shop-bob/server/connection_manager.py
dan 98310bf062 Add server component: FastAPI + WebSocket speech pipeline
Voice-in/voice-out server for the Shop Bob machine shop assistant.
STT (faster-whisper), LLM (Ollama), TTS (Piper) with sentence-level
audio streaming over WebSocket for low-latency responses.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 13:23:01 -08:00

26 lines
827 B
Python

import logging
from fastapi import WebSocket
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ConnectionManager:
    """Registry of accepted WebSocket connections keyed by client id."""

    def __init__(self) -> None:
        """Create an empty registry."""
        # Maps client id -> the WebSocket most recently registered under it.
        self._connections: dict[str, WebSocket] = {}

    async def connect(self, client_id: str, websocket: WebSocket) -> None:
        """Accept *websocket* and register it under *client_id*.

        If *client_id* is already registered with a different socket, the
        new socket replaces it and a warning is logged. The previous socket
        is not closed here.

        Args:
            client_id: Key under which the connection is stored.
            websocket: Connection to accept and track.
        """
        await websocket.accept()
        previous = self._connections.get(client_id)
        if previous is not None and previous is not websocket:
            # Surface the replacement: the old socket is about to drop out
            # of the registry and would otherwise vanish without a trace.
            logger.warning("Replacing existing connection for client: %s", client_id)
        self._connections[client_id] = websocket
        logger.info("Client connected: %s (total: %d)", client_id, len(self._connections))

    def disconnect(self, client_id: str) -> None:
        """Remove *client_id* from the registry.

        Safe to call for an id that is not registered (e.g. a repeated
        disconnect); in that case nothing is removed and only a debug
        message is logged.

        Args:
            client_id: Key of the connection to forget.
        """
        removed = self._connections.pop(client_id, None)
        if removed is None:
            # Avoid reporting a disconnect that did not actually happen.
            logger.debug("Disconnect ignored, unknown client: %s", client_id)
            return
        logger.info("Client disconnected: %s (total: %d)", client_id, len(self._connections))

    def get_active_connections(self) -> dict[str, WebSocket]:
        """Return a shallow copy of the client id -> WebSocket mapping.

        Mutating the returned dict does not affect the registry.
        """
        return dict(self._connections)

    @property
    def active_count(self) -> int:
        """Number of connections currently registered."""
        return len(self._connections)