Connect to your Omnia Voice agents for real-time voice conversations
Set up voice conversations in 3 simple steps
Install the required Python packages for audio streaming
pip install aiohttp pyee sounddevice websockets numpy
Export your API key and agent ID as environment variables
export OMNIA_API_KEY="your_api_key_here"
export OMNIA_AGENT_ID="your_agent_id_here"
Save the Python code below and run it to start talking
python omnia_websocket_client.py
Tools execute automatically - no client-side code needed
Tools can be assigned to your agent via the API or dashboard. Once assigned, they execute automatically during conversations without any client-side implementation.
Automatic Execution
The agent handles all tool invocations automatically
HTTP-Based
Tools make HTTP requests to your endpoints when needed
Real-time Processing
Results are processed and spoken in real-time
Complete Python client for WebSocket voice conversations
#!/usr/bin/env python3
"""
Omnia Voice WebSocket Client - Real-time voice conversations with AI agents
"""
import argparse
import asyncio
import json
import logging
import os
import signal
import sys
from typing import Any, AsyncGenerator, Awaitable, Literal
import aiohttp
import numpy as np
import pyee.asyncio
import sounddevice
from websockets import exceptions as ws_exceptions
from websockets.asyncio import client as ws_client
class AudioPlayer:
    """Plays 16-bit mono PCM audio through a sounddevice output stream.

    Audio bytes are appended to an internal buffer with ``write``; the stream
    callback drains that buffer one block at a time and pads with silence
    (zero bytes) whenever the buffer holds less than a full block.
    """

    # Bytes per sample for the "int16" stream dtype used below.
    _BYTES_PER_SAMPLE = 2

    def __init__(self, sample_rate: int = 48000) -> None:
        """Open and start the output stream.

        Args:
            sample_rate: Output sample rate in Hz.

        Raises:
            RuntimeError: If the stream is not active after being started.
        """
        self._sample_rate = sample_rate
        self._buffer: bytearray = bytearray()
        self._stream = sounddevice.OutputStream(
            samplerate=sample_rate,
            channels=1,
            callback=self._fill_output,
            device=None,
            dtype="int16",
            blocksize=sample_rate // 100,  # one block = 1/100 s of audio
        )
        self._stream.start()
        if not self._stream.active:
            # Release the device before reporting the failure.
            self._stream.close()
            raise RuntimeError("Failed to start audio output stream")

    def _fill_output(self, outdata: np.ndarray, frame_count, time, status) -> None:
        """Stream callback: copy the next block of buffered audio into *outdata*.

        Args:
            outdata: Output array to fill, one int16 sample per row.
            frame_count: Number of frames requested for this block.
            time: Timing information supplied by the stream (unused).
            status: Status flags supplied by the stream (unused).
        """
        wanted = len(outdata) * self._BYTES_PER_SAMPLE
        frame = self._buffer[:wanted]
        # Drop the consumed prefix with one in-place operation. The callback is
        # expected to run on sounddevice's audio thread while ``write`` appends
        # from the event loop; rebuilding the buffer from a slice copy could
        # discard a chunk appended between the copy and the assignment.
        del self._buffer[:wanted]
        if len(frame) < wanted:
            # Underrun: pad the remainder of the block with silence.
            frame.extend(bytes(wanted - len(frame)))
        outdata[:] = np.frombuffer(frame, dtype="int16").reshape((frame_count, 1))

    def write(self, chunk: bytes) -> None:
        """Queue raw int16 PCM bytes for playback."""
        self._buffer.extend(chunk)

    def drop_buffer(self) -> None:
        """Discard all audio that has been queued but not yet played."""
        self._buffer.clear()

    async def close(self) -> None:
        """Close the output stream."""
        if self._stream:
            self._stream.close()
class AudioRecorder:
    """Reads 16-bit mono audio from a sounddevice input stream."""

    def __init__(self, sample_rate=48000):
        """Remember the capture sample rate (Hz); no device is opened yet."""
        self._sample_rate = sample_rate

    async def stream(self) -> AsyncGenerator[bytes, None]:
        """Yield captured audio blocks as raw bytes, indefinitely.

        The input stream is opened when iteration begins and stays open for
        as long as the generator is running.

        Raises:
            RuntimeError: If the input stream is not active once opened.
        """
        event_loop = asyncio.get_running_loop()
        captured: asyncio.Queue[bytes] = asyncio.Queue()

        def on_audio(indata: np.ndarray, frame_count, time, status):
            # Hand the block to the event loop via its thread-safe entry point.
            event_loop.call_soon_threadsafe(captured.put_nowait, indata.tobytes())

        block_frames = self._sample_rate // 100
        microphone = sounddevice.InputStream(
            samplerate=self._sample_rate,
            channels=1,
            callback=on_audio,
            device=None,
            dtype="int16",
            blocksize=block_frames,
        )
        with microphone:
            if not microphone.active:
                raise RuntimeError("Failed to start audio input stream")
            while True:
                block = await captured.get()
                yield block
class OmniaVoiceSession(pyee.asyncio.AsyncIOEventEmitter):
"""WebSocket session manager for voice conversations."""
def __init__(self, websocket_url: str, sample_rate: int = 48000):
super().__init__()
self._state: Literal["idle", "listening", "thinking", "speaking"] = "idle"
self._pending_output = ""
self._url = websocket_url
self._sample_rate = sample_rate
self._socket = None
self._receive_task: asyncio.Task | None = None
self._send_audio_task = asyncio.create_task(
self._stream_microphone(AudioRecorder(sample_rate))
)
self._audio_player = AudioPlayer(sample_rate)
async def start(self):
logging.info(f"Connecting to WebSocket...")
self._socket = await ws_client.connect(self._url)
self._receive_task = asyncio.create_task(self._socket_receive(self._socket))
async def _socket_receive(self, socket: ws_client.ClientConnection):
try:
async for message in socket:
await self._on_socket_message(message)
except asyncio.CancelledError:
logging.info("Session cancelled")
except ws_exceptions.ConnectionClosedOK:
logging.info("Session closed normally")
except ws_exceptions.ConnectionClosedError as e:
self.emit("error", e)
return
logging.info("Session ended")
self.emit("ended")
async def stop(self):
logging.info("Stopping session...")
await cleanup_resources(
self._audio_player.close(),
self._socket.close() if self._socket else None,
cancel_tasks(self._send_audio_task, self._receive_task),
)
if self._state != "idle":
self._state = "idle"
self.emit("state", "idle")
async def _on_socket_message(self, payload: str | bytes):
if isinstance(payload, bytes):
self._audio_player.write(payload)
return
elif isinstance(payload, str):
msg = json.loads(payload)
await self._handle_data_message(msg)
async def _handle_data_message(self, msg: dict[str, Any]):
match msg["type"]:
case "playback_clear_buffer":
self._audio_player.drop_buffer()
case "state":
if msg["state"] != self._state:
self._state = msg["state"]
self.emit("state", msg["state"])
case "transcript":
if msg["role"] != "agent":
return
if msg.get("text", None):
self._pending_output = msg["text"]
self.emit("output", msg["text"], msg["final"])
else:
self._pending_output += msg.get("delta", "")
self.emit("output", self._pending_output, msg["final"])
if msg["final"]:
self._pending_output = ""
case "call_started":
logging.info("Call started successfully")
case "error":
logging.warning(f"Error: {msg.get('message', 'Unknown error')}")
case "debug":
logging.info(f"Debug: {msg['message']}")
case _:
pass
async def _stream_microphone(self, recorder: AudioRecorder):
async for chunk in recorder.stream():
if self._socket is None:
continue
await self._socket.send(chunk)
async def cleanup_resources(*awaitables_or_none: Awaitable | None):
coros = [coro for coro in awaitables_or_none if coro is not None]
if coros:
maybe_exceptions = await asyncio.shield(
asyncio.gather(*coros, return_exceptions=True)
)
non_cancelled_exceptions = [
exc
for exc in maybe_exceptions
if isinstance(exc, Exception)
and not isinstance(exc, asyncio.CancelledError)
]
if non_cancelled_exceptions:
to_report = (
non_cancelled_exceptions[0]
if len(non_cancelled_exceptions) == 1
else ExceptionGroup("Multiple failures", non_cancelled_exceptions)
)
logging.warning("Error during cleanup", exc_info=to_report)
async def cancel_tasks(*tasks_or_none: asyncio.Task | None):
    """Request cancellation of each given task and wait for them to finish.

    ``None`` entries are skipped. Only tasks whose ``cancel()`` call returned
    true are awaited (through ``cleanup_resources``).
    """
    cancelled = []
    for candidate in tasks_or_none:
        if candidate is None:
            continue
        if candidate.cancel():
            cancelled.append(candidate)
    await cleanup_resources(*cancelled)
async def create_omnia_call(api_key: str, agent_id: str, sample_rate: int = 48000) -> str:
    """Create a new Omnia Voice call and return its WebSocket URL.

    Args:
        api_key: Value sent in the ``X-API-Key`` request header.
        agent_id: Agent to create the call for.
        sample_rate: Sample rate in Hz requested for both input and output.

    Returns:
        The ``websocketUrl`` field of the JSON response.

    Raises:
        Exception: If the HTTP response status is not OK.
    """
    endpoint = "https://dashboard.omnia-voice.com/api/v1/calls/create"
    payload = {
        "agentId": agent_id,
        "connectionType": "websocket",
        "websocket": {
            "inputSampleRate": sample_rate,
            "outputSampleRate": sample_rate,
        },
    }
    async with aiohttp.ClientSession() as http:
        logging.info(f"Creating call with agent {agent_id}...")
        request = http.post(endpoint, headers={"X-API-Key": api_key}, json=payload)
        async with request as reply:
            if reply.ok:
                created = await reply.json()
                return created["websocketUrl"]
            # Failure path: include the response body in the log and the error.
            detail = await reply.text()
            failure = f"Failed to create call: {reply.status} - {detail}"
            logging.error(failure)
            raise Exception(failure)
async def main():
    """Main entry point for the Omnia Voice WebSocket client.

    Parses command-line options, reads ``OMNIA_API_KEY`` and
    ``OMNIA_AGENT_ID`` from the environment, creates a call, and runs the
    voice session until it ends, fails, or SIGINT/SIGTERM is received.

    Raises:
        ValueError: If either environment variable is missing or empty.
    """
    # Parse arguments first so that --help works without credentials set.
    parser = argparse.ArgumentParser(
        prog="omnia_websocket_client.py",
        description="Omnia Voice WebSocket Client for real-time AI conversations",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show verbose logging information",
    )
    parser.add_argument(
        "--sample-rate", "-s",
        type=int,
        default=48000,
        help="Audio sample rate in Hz (default: 48000)",
    )
    args = parser.parse_args()

    api_key = os.getenv("OMNIA_API_KEY")
    agent_id = os.getenv("OMNIA_AGENT_ID")
    if not api_key:
        raise ValueError("Please set your OMNIA_API_KEY environment variable")
    if not agent_id:
        raise ValueError("Please set your OMNIA_AGENT_ID environment variable")

    if args.verbose:
        logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    print("\n╔════════════════════════════════════════╗")
    print("║ Omnia Voice WebSocket Client ║")
    print("║ ║")
    print("╚════════════════════════════════════════╝\n")
    print(f"Agent ID: {agent_id}")
    # Show only a short prefix: the key is a secret and terminal output is
    # routinely captured in logs and screenshots.
    print(f"API Key: {api_key[:4]}...")
    print("")

    websocket_url = await create_omnia_call(api_key, agent_id, args.sample_rate)
    print("Call created, connecting to agent...")

    client = OmniaVoiceSession(websocket_url, args.sample_rate)
    done = asyncio.Event()
    loop = asyncio.get_running_loop()

    @client.on("state")
    async def on_state(state):
        if state == "listening":
            print("User: ", end="\r")
        elif state == "thinking":
            print("Agent: ", end="\r")

    @client.on("output")
    async def on_output(text, final):
        display_text = f"{text.strip()}"
        # Non-final text ends with "\r" so the next update overwrites the line.
        print("Agent: " + display_text, end="\n" if final else "\r")

    @client.on("error")
    async def on_error(error):
        logging.exception("Client error", exc_info=error)
        print(f"Error: {error}")
        done.set()

    @client.on("ended")
    async def on_ended():
        print("\nSession ended")
        done.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, done.set)
        except NotImplementedError:
            # add_signal_handler is Unix-only; fall back to a plain signal
            # handler that wakes the loop from outside it.
            signal.signal(sig, lambda *_: loop.call_soon_threadsafe(done.set))

    try:
        await client.start()
        print("Connected! Start speaking...\n")
        await done.wait()
    finally:
        # Always release the audio devices and socket, even if start() fails.
        await client.stop()


if __name__ == "__main__":
    asyncio.run(main())