WebSocket Voice Conversations

Connect to your Omnia Voice agents for real-time voice conversations

Real-time Audio

Getting Started

Set up voice conversations in 3 simple steps

1

Install Dependencies

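Install the required Python packages for audio streaming (requires Python 3.10 or later; the client uses the `websockets.asyncio` API, available in websockets 13.0 and later)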

pip install aiohttp pyee sounddevice websockets numpy
2

Set Your Credentials

Export your API key and agent ID as environment variables

export OMNIA_API_KEY="your_api_key_here"
export OMNIA_AGENT_ID="your_agent_id_here"
3

Run the Client

Save the Python code below as `omnia_websocket_client.py` and run it to start talking

python omnia_websocket_client.py

How Tools Work

Tools execute automatically — no client-side code needed

Tools can be assigned to your agent via the API or dashboard. Once assigned, they execute automatically during conversations without any client-side implementation.

Automatic Execution

The agent handles all tool invocations automatically

HTTP-Based

Tools make HTTP requests to your endpoints when needed

Real-time Processing

Results are processed and spoken in real-time

Python Client Code

Complete Python client for WebSocket voice conversations

#!/usr/bin/env python3
"""
Omnia Voice WebSocket Client - Real-time voice conversations with AI agents
"""

import argparse
import asyncio
import json
import logging
import os
import signal
import sys
import threading
from typing import Any, AsyncGenerator, Awaitable, Literal

import aiohttp
import numpy as np
import pyee.asyncio
import sounddevice
from websockets import exceptions as ws_exceptions
from websockets.asyncio import client as ws_client


class AudioPlayer:
    """Play raw PCM audio through the default output device.

    ``write`` appends incoming chunks to an in-memory byte buffer. The
    sounddevice output callback, which runs on the audio backend's own
    thread, drains that buffer in fixed-size frames and pads with silence
    when it runs dry. The stream is opened as mono int16, so each frame is
    2 bytes; chunks passed to ``write`` must be in that format to play back
    correctly.
    """

    def __init__(self, sample_rate: int = 48000) -> None:
        self._sample_rate = sample_rate
        self._buffer: bytearray = bytearray()
        # The output callback runs on sounddevice's thread while write() and
        # drop_buffer() run on the asyncio thread. Guard every buffer access
        # so an extend()/clear() cannot interleave with a partial read.
        self._lock = threading.Lock()

        def callback(outdata: np.ndarray, frame_count, time, status):
            # Mono int16 => 2 bytes per frame.
            frame = self._pop_frame(len(outdata) * 2)
            outdata[:] = np.frombuffer(frame, dtype="int16").reshape(
                (frame_count, 1)
            )

        self._stream = sounddevice.OutputStream(
            samplerate=sample_rate,
            channels=1,
            callback=callback,
            device=None,
            dtype="int16",
            # 10 ms of audio per callback.
            blocksize=sample_rate // 100,
        )
        self._stream.start()
        if not self._stream.active:
            # Release the device before bailing out; nothing else will close it.
            self._stream.close()
            raise RuntimeError("Failed to start audio output stream")

    def _pop_frame(self, size: int) -> bytes:
        """Remove and return the next ``size`` bytes, zero-padded on underrun."""
        with self._lock:
            frame = bytes(self._buffer[:size])
            # In-place delete instead of reassigning a sliced copy: avoids
            # copying the whole remaining buffer on every callback.
            del self._buffer[:size]
        if len(frame) < size:
            frame += b"\x00" * (size - len(frame))
        return frame

    def write(self, chunk: bytes) -> None:
        """Queue ``chunk`` for playback after any audio already buffered."""
        with self._lock:
            self._buffer.extend(chunk)

    def drop_buffer(self) -> None:
        """Discard all queued audio that has not been played yet."""
        with self._lock:
            self._buffer.clear()

    async def close(self) -> None:
        """Close the output stream."""
        if self._stream:
            self._stream.close()


class AudioRecorder:
    """Capture microphone input as raw mono int16 PCM chunks."""

    def __init__(self, sample_rate: int = 48000):
        self._sample_rate = sample_rate

    async def stream(self) -> AsyncGenerator[bytes, None]:
        """Yield ~10 ms chunks of captured audio for as long as the caller iterates.

        sounddevice invokes the capture callback on its own thread, so each
        chunk is handed over to the event loop with ``call_soon_threadsafe``.

        Raises:
            RuntimeError: if the input stream is not active after opening.
        """
        event_loop = asyncio.get_running_loop()
        chunks: asyncio.Queue[bytes] = asyncio.Queue()
        frames_per_block = self._sample_rate // 100

        def on_audio(indata: np.ndarray, frame_count, time, status):
            # tobytes() copies out of sounddevice's buffer before crossing threads.
            event_loop.call_soon_threadsafe(chunks.put_nowait, indata.tobytes())

        with sounddevice.InputStream(
            samplerate=self._sample_rate,
            channels=1,
            callback=on_audio,
            device=None,
            dtype="int16",
            blocksize=frames_per_block,
        ) as mic:
            if not mic.active:
                raise RuntimeError("Failed to start audio input stream")
            while True:
                chunk = await chunks.get()
                yield chunk


class OmniaVoiceSession(pyee.asyncio.AsyncIOEventEmitter):
    """WebSocket session manager for voice conversations."""

    def __init__(self, websocket_url: str, sample_rate: int = 48000):
        super().__init__()
        self._state: Literal["idle", "listening", "thinking", "speaking"] = "idle"
        self._pending_output = ""
        self._url = websocket_url
        self._sample_rate = sample_rate
        self._socket = None
        self._receive_task: asyncio.Task | None = None
        self._send_audio_task = asyncio.create_task(
            self._stream_microphone(AudioRecorder(sample_rate))
        )
        self._audio_player = AudioPlayer(sample_rate)

    async def start(self):
        logging.info(f"Connecting to WebSocket...")
        self._socket = await ws_client.connect(self._url)
        self._receive_task = asyncio.create_task(self._socket_receive(self._socket))

    async def _socket_receive(self, socket: ws_client.ClientConnection):
        try:
            async for message in socket:
                await self._on_socket_message(message)
        except asyncio.CancelledError:
            logging.info("Session cancelled")
        except ws_exceptions.ConnectionClosedOK:
            logging.info("Session closed normally")
        except ws_exceptions.ConnectionClosedError as e:
            self.emit("error", e)
            return
        logging.info("Session ended")
        self.emit("ended")

    async def stop(self):
        logging.info("Stopping session...")
        await cleanup_resources(
            self._audio_player.close(),
            self._socket.close() if self._socket else None,
            cancel_tasks(self._send_audio_task, self._receive_task),
        )
        if self._state != "idle":
            self._state = "idle"
            self.emit("state", "idle")

    async def _on_socket_message(self, payload: str | bytes):
        if isinstance(payload, bytes):
            self._audio_player.write(payload)
            return
        elif isinstance(payload, str):
            msg = json.loads(payload)
            await self._handle_data_message(msg)

    async def _handle_data_message(self, msg: dict[str, Any]):
        match msg["type"]:
            case "playback_clear_buffer":
                self._audio_player.drop_buffer()
                
            case "state":
                if msg["state"] != self._state:
                    self._state = msg["state"]
                    self.emit("state", msg["state"])
                    
            case "transcript":
                if msg["role"] != "agent":
                    return
                if msg.get("text", None):
                    self._pending_output = msg["text"]
                    self.emit("output", msg["text"], msg["final"])
                else:
                    self._pending_output += msg.get("delta", "")
                    self.emit("output", self._pending_output, msg["final"])
                if msg["final"]:
                    self._pending_output = ""
                    
            case "call_started":
                logging.info("Call started successfully")
                
            case "error":
                logging.warning(f"Error: {msg.get('message', 'Unknown error')}")
                
            case "debug":
                logging.info(f"Debug: {msg['message']}")
                
            case _:
                pass

    async def _stream_microphone(self, recorder: AudioRecorder):
        async for chunk in recorder.stream():
            if self._socket is None:
                continue
            await self._socket.send(chunk)


async def cleanup_resources(*awaitables_or_none: Awaitable | None):
    coros = [coro for coro in awaitables_or_none if coro is not None]
    if coros:
        maybe_exceptions = await asyncio.shield(
            asyncio.gather(*coros, return_exceptions=True)
        )
        non_cancelled_exceptions = [
            exc
            for exc in maybe_exceptions
            if isinstance(exc, Exception)
            and not isinstance(exc, asyncio.CancelledError)
        ]
        if non_cancelled_exceptions:
            to_report = (
                non_cancelled_exceptions[0]
                if len(non_cancelled_exceptions) == 1
                else ExceptionGroup("Multiple failures", non_cancelled_exceptions)
            )
            logging.warning("Error during cleanup", exc_info=to_report)


async def cancel_tasks(*tasks_or_none: asyncio.Task | None):
    """Cancel the given tasks and wait until all of them have finished.

    ``None`` entries are ignored. Tasks that had already finished are still
    awaited, so any exception they raised is retrieved and logged by
    ``cleanup_resources``. Otherwise it would surface later as asyncio's
    "Task exception was never retrieved" warning.
    """
    tasks = [task for task in tasks_or_none if task is not None]
    for task in tasks:
        # No-op (returns False) for tasks that are already done.
        task.cancel()
    await cleanup_resources(*tasks)


async def create_omnia_call(api_key: str, agent_id: str, sample_rate: int = 48000) -> str:
    """Create a new Omnia Voice call session and return its WebSocket URL.

    Args:
        api_key: Omnia API key, sent in the ``X-API-Key`` header.
        agent_id: ID of the agent to attach to the call.
        sample_rate: Sample rate (Hz) requested for both input and output audio.

    Returns:
        The ``websocketUrl`` field of the API's JSON response.

    Raises:
        RuntimeError: if the API answers with an error status (``response.ok``
            is false) or the response has no ``websocketUrl``. RuntimeError
            subclasses Exception, so existing ``except Exception`` callers
            still catch it.
    """
    target = "https://dashboard.omnia-voice.com/api/v1/calls/create"
    headers = {"X-API-Key": api_key}
    body = {
        "agentId": agent_id,
        "connectionType": "websocket",
        "websocket": {
            "inputSampleRate": sample_rate,
            "outputSampleRate": sample_rate,
        },
    }

    async with aiohttp.ClientSession() as session:
        logging.info(f"Creating call with agent {agent_id}...")
        async with session.post(target, headers=headers, json=body) as response:
            if not response.ok:
                error_text = await response.text()
                logging.error(f"Failed to create call: {response.status} - {error_text}")
                raise RuntimeError(f"Failed to create call: {response.status} - {error_text}")
            response_json = await response.json()

    websocket_url = (
        response_json.get("websocketUrl") if isinstance(response_json, dict) else None
    )
    if not websocket_url:
        # Fail with a clear message instead of an opaque KeyError.
        raise RuntimeError("Call creation response did not include a websocketUrl")
    return websocket_url


def _parse_args() -> argparse.Namespace:
    """Parse the client's command-line options."""
    parser = argparse.ArgumentParser(
        prog="omnia_websocket_client.py",
        description="Omnia Voice WebSocket Client for real-time AI conversations"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show verbose logging information"
    )
    parser.add_argument(
        "--sample-rate", "-s",
        type=int,
        default=48000,
        help="Audio sample rate in Hz (default: 48000)"
    )
    return parser.parse_args()


def _mask_secret(secret: str, visible: int = 4) -> str:
    """Return a display-safe form of ``secret`` revealing at most ``visible`` leading chars."""
    # Short secrets are masked completely: a prefix would reveal most of them.
    if len(secret) <= visible * 2:
        return "*" * 8
    return f"{secret[:visible]}..."


async def main():
    """Main entry point for the Omnia Voice WebSocket client.

    Parses options, creates a call, connects to the agent, and runs until
    the session ends, an error is reported, or SIGINT/SIGTERM arrives. The
    session is always stopped on the way out, including when connecting fails.

    Raises:
        ValueError: if OMNIA_API_KEY or OMNIA_AGENT_ID is not set.
    """
    # Parse first so that --help works without credentials in the environment.
    args = _parse_args()

    api_key = os.getenv("OMNIA_API_KEY")
    agent_id = os.getenv("OMNIA_AGENT_ID")

    if not api_key:
        raise ValueError("Please set your OMNIA_API_KEY environment variable")

    if not agent_id:
        raise ValueError("Please set your OMNIA_AGENT_ID environment variable")

    if args.verbose:
        logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    print("\n╔════════════════════════════════════════╗")
    print("║        Omnia Voice WebSocket Client    ║")
    print("║                                        ║")
    print("╚════════════════════════════════════════╝\n")

    print(f"Agent ID: {agent_id}")
    # Show only a short prefix so the key does not leak to the terminal or logs.
    print(f"API Key: {_mask_secret(api_key)}")
    print("")

    websocket_url = await create_omnia_call(api_key, agent_id, args.sample_rate)
    print("Call created, connecting to agent...")

    client = OmniaVoiceSession(websocket_url, args.sample_rate)
    done = asyncio.Event()
    loop = asyncio.get_running_loop()

    @client.on("state")
    async def on_state(state):
        if state == "listening":
            print("User:  ", end="\r")
        elif state == "thinking":
            print("Agent: ", end="\r")

    @client.on("output")
    async def on_output(text, final):
        # "\r" keeps rewriting the same line until the final transcript arrives.
        print("Agent: " + text.strip(), end="\n" if final else "\r")

    @client.on("error")
    async def on_error(error):
        logging.exception("Client error", exc_info=error)
        print(f"Error: {error}")
        done.set()

    @client.on("ended")
    async def on_ended():
        print("\nSession ended")
        done.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, done.set)
        except NotImplementedError:
            # Some event loops (e.g. on Windows) lack signal handlers; Ctrl+C
            # then arrives as KeyboardInterrupt and the finally below still runs.
            pass

    try:
        await client.start()
        print("Connected! Start speaking...\n")
        await done.wait()
    finally:
        # Also runs when start() fails, so the mic task and audio device are released.
        await client.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C before main() installs its signal handlers (e.g. while the
        # call is being created) would otherwise print a traceback. Exit
        # quietly with the conventional SIGINT status (128 + 2) instead.
        raise SystemExit(130)