Voice-in/voice-out server for the Shop Bob machine shop assistant. STT (faster-whisper), LLM (Ollama), TTS (Piper) with sentence-level audio streaming over WebSocket for low-latency responses. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
26 lines
827 B
Python
26 lines
827 B
Python
import logging
|
|
|
|
from fastapi import WebSocket
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ConnectionManager:
|
|
def __init__(self) -> None:
|
|
self._connections: dict[str, WebSocket] = {}
|
|
|
|
async def connect(self, client_id: str, websocket: WebSocket) -> None:
|
|
await websocket.accept()
|
|
self._connections[client_id] = websocket
|
|
logger.info("Client connected: %s (total: %d)", client_id, len(self._connections))
|
|
|
|
def disconnect(self, client_id: str) -> None:
|
|
self._connections.pop(client_id, None)
|
|
logger.info("Client disconnected: %s (total: %d)", client_id, len(self._connections))
|
|
|
|
def get_active_connections(self) -> dict[str, WebSocket]:
|
|
return dict(self._connections)
|
|
|
|
@property
|
|
def active_count(self) -> int:
|
|
return len(self._connections)
|