import asyncio
import io
import logging
import wave
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from piper.voice import PiperVoice

from .config import settings

logger = logging.getLogger(__name__)

# Populated by load_model(); stays None until then.
_voice: PiperVoice | None = None
# Dedicated pool so synthesis calls do not run on the event-loop thread.
_executor = ThreadPoolExecutor(max_workers=2)


def load_model() -> None:
    """Load the Piper voice named by ``settings.piper_model``.

    The loaded voice is stored in the module-level ``_voice`` and is used by
    every subsequent synthesis call. Must be called before ``synthesize``.
    """
    global _voice
    logger.info("Loading Piper TTS voice %s...", settings.piper_model)
    _voice = PiperVoice.load(settings.piper_model)
    logger.info("Piper TTS loaded.")


def _synthesize_sync(text: str) -> bytes:
    """Synthesize ``text`` and return the raw PCM frames as bytes.

    Piper writes a WAV stream into an in-memory buffer; the buffer is then
    re-read to strip the WAV header so only the audio frames are returned.
    The sample format is whatever the voice writes into the WAV header
    (documented by the original author as 16-bit mono — not enforced here).

    Raises:
        RuntimeError: if ``load_model()`` has not been called yet.
    """
    # Snapshot the global so the check and the use refer to the same object.
    voice = _voice
    if voice is None:
        # Explicit raise rather than ``assert``: asserts are stripped under
        # ``python -O``, which would turn this into an opaque AttributeError.
        raise RuntimeError("Piper voice not loaded — call load_model() first")

    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        voice.synthesize(text, wf)

    # Extract raw PCM from the WAV container
    buf.seek(0)
    with wave.open(buf, "rb") as wf:
        pcm_data = wf.readframes(wf.getnframes())

    logger.debug("Synthesized %d chars → %d bytes PCM", len(text), len(pcm_data))
    return pcm_data


async def synthesize(text: str) -> bytes:
    """Async wrapper around ``_synthesize_sync``.

    Submits the blocking synthesis call to the module's thread pool and
    awaits its result, returning the same PCM bytes.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        _executor,
        partial(_synthesize_sync, text),
    )