diff --git a/agentserver/message_bus/stream_pump.py b/agentserver/message_bus/stream_pump.py index 583c379..7773601 100644 --- a/agentserver/message_bus/stream_pump.py +++ b/agentserver/message_bus/stream_pump.py @@ -381,9 +381,11 @@ class StreamPump: # === CONTEXT BUFFER: Record incoming message === # Append validated payload to thread's context buffer + # The returned BufferSlot becomes the single source of truth + slot = None if current_thread and state.payload: try: - context_buffer.append( + slot = context_buffer.append( thread_id=current_thread, payload=state.payload, from_id=state.from_id or "unknown", @@ -400,16 +402,24 @@ class StreamPump: f"Thread {current_thread[:8]}... exceeded context buffer limit" ) - metadata = HandlerMetadata( - thread_id=current_thread, - from_id=state.from_id or "", - own_name=listener.name if listener.is_agent else None, - is_self_call=is_self_call, - usage_instructions=listener.usage_instructions, - todo_nudge=todo_nudge, - ) + # Derive metadata from slot (single source of truth) + # Fall back to manual construction if no slot (e.g., buffer overflow) + if slot: + from agentserver.memory import slot_to_handler_metadata + metadata = slot_to_handler_metadata(slot) + payload_ref = slot.payload # Same reference as in buffer + else: + metadata = HandlerMetadata( + thread_id=current_thread, + from_id=state.from_id or "", + own_name=listener.name if listener.is_agent else None, + is_self_call=is_self_call, + usage_instructions=listener.usage_instructions, + todo_nudge=todo_nudge, + ) + payload_ref = state.payload - response = await listener.handler(state.payload, metadata) + response = await listener.handler(payload_ref, metadata) # None means "no response needed" - don't re-inject if response is None: