From f98a21f96bcaf477d45ea638eeebbc1b266da9f4 Mon Sep 17 00:00:00 2001 From: dullfig Date: Tue, 27 Jan 2026 21:27:48 -0800 Subject: [PATCH] Wire budget cleanup to thread lifecycle When threads terminate (handler returns None or chain exhausted), the pump now calls budget_registry.cleanup_thread() to: - Free memory for completed threads - Return final budget for logging/billing - Log token usage at debug level This ensures budgets don't accumulate for completed conversations. Also adds: - has_budget() method to check if thread exists without creating - Tests for cleanup behavior Co-Authored-By: Claude Opus 4.5 --- tests/test_token_budget.py | 61 +++++++++++++++++++++++++ xml_pipeline/message_bus/stream_pump.py | 18 ++++++++ 2 files changed, 79 insertions(+) diff --git a/tests/test_token_budget.py b/tests/test_token_budget.py index 1b2b712..1c93438 100644 --- a/tests/test_token_budget.py +++ b/tests/test_token_budget.py @@ -571,3 +571,64 @@ class TestLLMRouterBudgetIntegration: ) assert response.content == "Hello!" 
+
+
+# ============================================================================
+# Budget Cleanup Tests
+# ============================================================================
+
+class TestBudgetCleanup:
+    """Tests for ThreadBudgetRegistry.cleanup_thread() when a thread completes."""
+
+    @pytest.fixture(autouse=True)
+    def reset_all(self):
+        """Reset the global budget registry before and after every test."""
+        reset_budget_registry()
+        yield
+        reset_budget_registry()
+
+    def test_cleanup_thread_returns_budget(self):
+        """cleanup_thread should return the final budget with its token counts."""
+        registry = ThreadBudgetRegistry()
+        registry.consume("thread-1", prompt_tokens=500, completion_tokens=200)
+
+        final = registry.cleanup_thread("thread-1")
+
+        assert final is not None
+        assert final.prompt_tokens == 500
+        assert final.completion_tokens == 200
+        assert final.total_tokens == 700
+
+    def test_cleanup_thread_removes_budget(self):
+        """cleanup_thread should drop the thread's entry from the registry."""
+        registry = ThreadBudgetRegistry()
+        registry.consume("thread-1", prompt_tokens=500, completion_tokens=200)
+
+        registry.cleanup_thread("thread-1")
+
+        # After cleanup neither has_budget nor get_usage may report the thread
+        assert not registry.has_budget("thread-1")
+        assert registry.get_usage("thread-1") is None
+
+    def test_cleanup_nonexistent_thread_returns_none(self):
+        """cleanup_thread for a thread that was never tracked should return None."""
+        registry = ThreadBudgetRegistry()
+
+        result = registry.cleanup_thread("nonexistent")
+
+        assert result is None
+
+    def test_global_cleanup(self):
+        """cleanup_thread should also work on the registry from get_budget_registry()."""
+        configure_budget_registry(max_tokens_per_thread=10000)
+        registry = get_budget_registry()
+
+        # Consuming tokens is expected to create the thread's budget entry
+        registry.consume("test-thread", prompt_tokens=1000, completion_tokens=500)
+        assert registry.has_budget("test-thread")
+
+        # Cleanup should hand back the final totals and remove the entry
+        final = registry.cleanup_thread("test-thread")
+
+        assert final.total_tokens == 1500
+        assert not registry.has_budget("test-thread")
diff --git a/xml_pipeline/message_bus/stream_pump.py
b/xml_pipeline/message_bus/stream_pump.py index 50dea7f..d3270d6 100644 --- a/xml_pipeline/message_bus/stream_pump.py +++ b/xml_pipeline/message_bus/stream_pump.py @@ -44,6 +44,7 @@ from xml_pipeline.message_bus.steps.thread_assignment import thread_assignment_s from xml_pipeline.message_bus.message_state import MessageState, HandlerMetadata, HandlerResponse, SystemError, ROUTING_ERROR from xml_pipeline.message_bus.thread_registry import get_registry from xml_pipeline.message_bus.todo_registry import get_todo_registry +from xml_pipeline.message_bus.budget_registry import get_budget_registry from xml_pipeline.memory import get_context_buffer pump_logger = logging.getLogger(__name__) @@ -681,6 +682,15 @@ class StreamPump: # None means "no response needed" - don't re-inject if response is None: + # Thread terminates here - cleanup budget + budget_registry = get_budget_registry() + final_budget = budget_registry.cleanup_thread(current_thread) + if final_budget: + pump_logger.debug( + f"Thread {current_thread[:8]}... completed: " + f"{final_budget.total_tokens} tokens used" + ) + # Emit idle state self._emit_event(AgentStateEvent( agent_name=listener.name, @@ -698,6 +708,14 @@ class StreamPump: target, new_thread_id = registry.prune_for_response(current_thread) if target is None: # Chain exhausted - nowhere to respond to + # Cleanup thread budget + budget_registry = get_budget_registry() + final_budget = budget_registry.cleanup_thread(current_thread) + if final_budget: + pump_logger.debug( + f"Thread {current_thread[:8]}... chain exhausted: " + f"{final_budget.total_tokens} tokens used" + ) continue to_id = target thread_id = new_thread_id