shop-bob/server/pipeline.py
dan 98310bf062 Add server component: FastAPI + WebSocket speech pipeline
Voice-in/voice-out server for the Shop Bob machine shop assistant.
STT (faster-whisper), LLM (Ollama), TTS (Piper) with sentence-level
audio streaming over WebSocket for low-latency responses.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 13:23:01 -08:00

85 lines
2.8 KiB
Python

import json
import logging
import re
from fastapi import WebSocket
from . import llm, stt, tts
logger = logging.getLogger(__name__)
# Split text on the whitespace that follows sentence-ending punctuation
# (. ! ?). The lookbehind keeps the punctuation attached to the preceding
# sentence; only the trailing whitespace is consumed by re.split().
_SENTENCE_RE = re.compile(r"(?<=[.!?])\s+")
async def _send_status(ws: WebSocket, state: str) -> None:
await ws.send_text(json.dumps({"type": "status", "state": state}))
async def process_request(
    audio_bytes: bytes,
    sample_rate: int,
    websocket: WebSocket,
) -> None:
    """Run the full speech-in → text-out → speech-out pipeline.

    Args:
        audio_bytes: Raw audio captured from the client.
        sample_rate: Sample rate of ``audio_bytes`` in Hz.
        websocket: Open client connection for status/text/audio messages.

    All exceptions are caught, logged, and reported to the client as an
    ``error`` message; this coroutine never raises.
    """
    try:
        # --- STT ---
        await _send_status(websocket, "transcribing")
        transcript = await stt.transcribe(audio_bytes, sample_rate)
        if not transcript.strip():
            # Nothing intelligible was said; report an empty transcript
            # and finish the exchange immediately.
            await websocket.send_text(
                json.dumps({"type": "transcript", "text": ""})
            )
            await websocket.send_text(json.dumps({"type": "response_end"}))
            return
        await websocket.send_text(
            json.dumps({"type": "transcript", "text": transcript})
        )

        # --- LLM with sentence-level TTS streaming ---
        await _send_status(websocket, "thinking")
        full_response = ""
        sentence_buffer = ""
        await _send_status(websocket, "speaking")
        async for token in llm.generate_response(transcript):
            full_response += token
            sentence_buffer += token
            # Everything before the last split part is a complete
            # sentence, ready to synthesize; the last part may still
            # be mid-sentence and stays buffered.
            parts = _SENTENCE_RE.split(sentence_buffer)
            if len(parts) > 1:
                for sentence in parts[:-1]:
                    await _speak_sentence(websocket, sentence)
                sentence_buffer = parts[-1]

        # Flush whatever text remains after the token stream ends.
        await _speak_sentence(websocket, sentence_buffer)

        # Send the full text response and signal completion.
        await websocket.send_text(
            json.dumps({"type": "response_text", "text": full_response})
        )
        await websocket.send_text(json.dumps({"type": "response_end"}))
    except Exception:
        logger.exception("Pipeline error")
        try:
            await websocket.send_text(
                json.dumps({"type": "error", "text": "Internal processing error"})
            )
            await websocket.send_text(json.dumps({"type": "response_end"}))
        except Exception:
            pass  # Client already disconnected


async def _speak_sentence(websocket: WebSocket, text: str) -> None:
    """Synthesize *text* and stream the resulting audio to the client.

    Whitespace-only input is skipped so we never synthesize silence.
    """
    text = text.strip()
    if text:
        audio_chunk = await tts.synthesize(text)
        await websocket.send_bytes(audio_chunk)