From ce8a9ae0e75c9e416b756fc8a8d34c6e41812f53 Mon Sep 17 00:00:00 2001 From: dullfig Date: Tue, 27 Jan 2026 23:07:19 -0800 Subject: [PATCH] Add Premium Librarian MVP for codebase intelligence Implements an RLM-powered codebase intelligence system that: - Ingests git repositories and chunks code intelligently - Stores chunks in eXist-db for RAG retrieval - Answers natural language queries using LLM synthesis New package xml_pipeline/librarian/ with: - chunker.py: AST-based code chunking (Python, JS/TS, C++) - ingest.py: Git clone + file walking + chunk storage - index.py: Structural index building (files, functions, classes) - query.py: RAG search + LLM synthesis with source citations - primitives.py: XML payloads (LibrarianIngest, LibrarianQuery, etc.) - handler.py: Message handlers for organism integration Also adds GitPython and aiohttp as optional [librarian] dependencies. Co-Authored-By: Claude Opus 4.5 --- pyproject.toml | 5 +- tests/test_librarian_chunker.py | 375 +++++++++++++++ tests/test_librarian_query.py | 292 ++++++++++++ xml_pipeline/librarian/__init__.py | 103 ++++ xml_pipeline/librarian/chunker.py | 677 +++++++++++++++++++++++++++ xml_pipeline/librarian/handler.py | 246 ++++++++++ xml_pipeline/librarian/index.py | 328 +++++++++++++ xml_pipeline/librarian/ingest.py | 393 ++++++++++++++++ xml_pipeline/librarian/primitives.py | 167 +++++++ xml_pipeline/librarian/query.py | 436 +++++++++++++++++ 10 files changed, 3021 insertions(+), 1 deletion(-) create mode 100644 tests/test_librarian_chunker.py create mode 100644 tests/test_librarian_query.py create mode 100644 xml_pipeline/librarian/__init__.py create mode 100644 xml_pipeline/librarian/chunker.py create mode 100644 xml_pipeline/librarian/handler.py create mode 100644 xml_pipeline/librarian/index.py create mode 100644 xml_pipeline/librarian/ingest.py create mode 100644 xml_pipeline/librarian/primitives.py create mode 100644 xml_pipeline/librarian/query.py diff --git a/pyproject.toml b/pyproject.toml index d706668..0d05ee6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,6 +77,9 @@ openai = ["openai>=1.0"] redis = ["redis>=5.0"] # Distributed key-value store search = ["duckduckgo-search>=6.0"] # Web search tool +# Premium Librarian (codebase intelligence) +librarian = ["gitpython>=3.1", "aiohttp>=3.9"] + # Console example (optional, for interactive use) console = ["prompt_toolkit>=3.0"] @@ -91,7 +94,7 @@ server = [ llm = ["xml-pipeline[anthropic,openai]"] # All tools -tools = ["xml-pipeline[redis,search]"] +tools = ["xml-pipeline[redis,search,librarian]"] # Everything (for local development) all = ["xml-pipeline[llm,tools,console,server]"] diff --git a/tests/test_librarian_chunker.py b/tests/test_librarian_chunker.py new file mode 100644 index 0000000..175b50d --- /dev/null +++ b/tests/test_librarian_chunker.py @@ -0,0 +1,375 @@ +""" +Tests for the Premium Librarian code chunker. 
+""" + +import pytest + +from xml_pipeline.librarian.chunker import ( + Chunk, + chunk_file, + chunk_python, + chunk_javascript, + chunk_cpp, + chunk_prose, + chunk_generic, + detect_language, +) + + +class TestLanguageDetection: + """Tests for language detection from file paths.""" + + def test_python_detection(self) -> None: + assert detect_language("foo.py") == "python" + assert detect_language("path/to/module.py") == "python" + assert detect_language("types.pyi") == "python" + + def test_javascript_detection(self) -> None: + assert detect_language("app.js") == "javascript" + assert detect_language("component.jsx") == "javascript" + assert detect_language("index.mjs") == "javascript" + + def test_typescript_detection(self) -> None: + assert detect_language("app.ts") == "typescript" + assert detect_language("component.tsx") == "typescript" + + def test_cpp_detection(self) -> None: + assert detect_language("main.cpp") == "cpp" + assert detect_language("header.hpp") == "cpp" + assert detect_language("source.cc") == "cpp" + + def test_c_detection(self) -> None: + assert detect_language("main.c") == "c" + assert detect_language("header.h") == "c" + + def test_unknown_language(self) -> None: + assert detect_language("data.xyz") == "unknown" + assert detect_language("noextension") == "unknown" + + def test_case_insensitive(self) -> None: + assert detect_language("Module.PY") == "python" + assert detect_language("APP.JS") == "javascript" + + +class TestPythonChunker: + """Tests for Python AST-based chunking.""" + + def test_simple_function(self) -> None: + code = ''' +def hello(name: str) -> str: + """Say hello.""" + return f"Hello, {name}!" +''' + chunks = chunk_python(code, "test.py") + assert len(chunks) == 1 + assert chunks[0].name == "hello" + assert chunks[0].chunk_type == "function" + assert chunks[0].docstring == "Say hello." + assert "str" in chunks[0].signature + + def test_async_function(self) -> None: + code = ''' +async def fetch_data(url: str) -> dict: + """Fetch data from URL.""" + pass +''' + chunks = chunk_python(code, "test.py") + assert len(chunks) == 1 + assert chunks[0].name == "fetch_data" + assert chunks[0].chunk_type == "function" + assert "async" in chunks[0].signature + + def test_class_with_methods(self) -> None: + code = ''' +class Calculator: + """A simple calculator.""" + + def add(self, a: int, b: int) -> int: + """Add two numbers.""" + return a + b + + def subtract(self, a: int, b: int) -> int: + """Subtract two numbers.""" + return a - b +''' + chunks = chunk_python(code, "test.py") + # Should create a class chunk (small enough to keep together) + assert len(chunks) >= 1 + class_chunk = [c for c in chunks if c.chunk_type == "class"] + assert len(class_chunk) == 1 + assert class_chunk[0].name == "Calculator" + assert class_chunk[0].docstring == "A simple calculator." 
+
+    def test_imports_extracted(self) -> None:
+        code = '''
+import os
+from typing import Optional, List
+
+def process():
+    pass
+'''
+        chunks = chunk_python(code, "test.py")
+        assert len(chunks) == 1
+        assert "import os" in chunks[0].imports
+        assert any("from typing import" in imp for imp in chunks[0].imports)
+
+    def test_empty_file(self) -> None:
+        chunks = chunk_python("", "test.py")
+        assert len(chunks) == 0
+
+    def test_module_with_only_imports(self) -> None:
+        code = '''
+import os
+import sys
+'''
+        chunks = chunk_python(code, "test.py")
+        # Should create a module chunk for files with no functions/classes
+        assert len(chunks) == 0 or chunks[0].chunk_type == "module"
+
+    def test_syntax_error_fallback(self) -> None:
+        code = '''
+def broken(
+    # Missing closing paren
+'''
+        chunks = chunk_python(code, "test.py")
+        # Should fall back to generic chunking
+        assert len(chunks) >= 0  # May or may not produce chunks
+
+
+class TestJavaScriptChunker:
+    """Tests for JavaScript regex-based chunking."""
+
+    def test_function_declaration(self) -> None:
+        code = '''
+function greet(name) {
+    return `Hello, ${name}!`;
+}
+'''
+        chunks = chunk_javascript(code, "test.js")
+        assert len(chunks) == 1
+        assert chunks[0].name == "greet"
+        assert chunks[0].chunk_type == "function"
+
+    def test_async_function(self) -> None:
+        code = '''
+async function fetchData(url) {
+    const response = await fetch(url);
+    return response.json();
+}
+'''
+        chunks = chunk_javascript(code, "test.js")
+        assert len(chunks) == 1
+        assert chunks[0].name == "fetchData"
+
+    def test_arrow_function(self) -> None:
+        code = '''
+const multiply = (a, b) => {
+    return a * b;
+};
+'''
+        chunks = chunk_javascript(code, "test.js")
+        assert len(chunks) == 1
+        assert chunks[0].name == "multiply"
+        assert chunks[0].chunk_type == "function"
+
+    def test_class_definition(self) -> None:
+        code = '''
+class Calculator {
+    constructor() {
+        this.result = 0;
+    }
+
+    add(value) {
+        this.result += value;
+        return this;
+    }
+}
+'''
+        chunks = chunk_javascript(code, "test.js")
+        assert len(chunks) >= 1
+        class_chunks = [c for c in chunks if c.chunk_type == "class"]
+        assert len(class_chunks) == 1
+        assert class_chunks[0].name == "Calculator"
+
+    def test_export_function(self) -> None:
+        code = '''
+export function exportedFunc() {
+    return 42;
+}
+'''
+        chunks = chunk_javascript(code, "test.js")
+        assert len(chunks) == 1
+        assert chunks[0].name == "exportedFunc"
+
+    def test_imports_extracted(self) -> None:
+        code = '''
+import React from 'react';
+import { useState } from 'react';
+const lodash = require('lodash');
+
+function Component() {
+    return null;
+}
+'''
+        chunks = chunk_javascript(code, "test.jsx")
+        assert len(chunks) >= 1
+        assert any("import React" in imp for imp in chunks[0].imports)
+
+
+class TestCppChunker:
+    """Tests for C++ regex-based chunking."""
+
+    def test_function_definition(self) -> None:
+        code = '''
+int add(int a, int b) {
+    return a + b;
+}
+'''
+        chunks = chunk_cpp(code, "test.cpp")
+        assert len(chunks) >= 1
+        func_chunks = [c for c in chunks if c.chunk_type == "function"]
+        assert len(func_chunks) == 1
+        assert func_chunks[0].name == "add"
+
+    def test_class_definition(self) -> None:
+        code = '''
+class Calculator {
+public:
+    int add(int a, int b);
+    int subtract(int a, int b);
+};
+'''
+        chunks = chunk_cpp(code, "test.cpp")
+        assert len(chunks) >= 1
+        class_chunks = [c for c in chunks if c.chunk_type == "class"]
+        assert len(class_chunks) == 1
+        assert class_chunks[0].name == "Calculator"
+
+    def test_includes_extracted(self) -> None:
+        code = '''
+#include <iostream>
+#include "myheader.h"
+
+int main() {
+    return 0;
+}
+'''
+        chunks = chunk_cpp(code, "test.cpp")
+        assert len(chunks) >= 1
+        assert any("#include <iostream>" in imp for imp in chunks[0].imports)
+
+
+class TestProseChunker:
+    """Tests for prose document chunking."""
+
+    def test_markdown_headings(self) -> None:
+        content = '''# Introduction
+
+This is the introduction section.
+
+## Getting Started
+
+Follow these steps to get started.
+
+## Advanced Topics
+
+More advanced content here.
+'''
+        chunks = chunk_prose(content, "readme.md", "markdown")
+        assert len(chunks) >= 2
+        # First chunk should be introduction
+        assert chunks[0].name == "Introduction"
+
+    def test_empty_document(self) -> None:
+        chunks = chunk_prose("", "empty.md", "markdown")
+        assert len(chunks) == 0
+
+
+class TestGenericChunker:
+    """Tests for generic line-based chunking."""
+
+    def test_small_file(self) -> None:
+        content = "line1\nline2\nline3"
+        chunks = chunk_generic(content, "test.txt", "text")
+        assert len(chunks) == 1
+        assert chunks[0].content == content
+
+    def test_empty_file(self) -> None:
+        chunks = chunk_generic("", "empty.txt", "text")
+        assert len(chunks) == 0
+
+
+class TestChunkFile:
+    """Tests for the main chunk_file dispatcher."""
+
+    def test_dispatches_to_python(self) -> None:
+        code = "def foo(): pass"
+        chunks = chunk_file(code, "test.py")
+        assert all(c.language == "python" for c in chunks)
+
+    def test_dispatches_to_javascript(self) -> None:
+        code = "function foo() {}"
+        chunks = chunk_file(code, "test.js")
+        assert all(c.language == "javascript" for c in chunks)
+
+    def test_dispatches_to_cpp(self) -> None:
+        code = "int main() { return 0; }"
+        chunks = chunk_file(code, "test.cpp")
+        assert all(c.language == "cpp" for c in chunks)
+
+    def test_unknown_language_uses_generic(self) -> None:
+        content = "some content"
+        chunks = chunk_file(content, "test.xyz")
+        assert all(c.language == "unknown" for c in chunks)
+
+
+class TestChunkProperties:
+    """Tests for Chunk dataclass properties."""
+
+    def test_chunk_id_generation(self) -> None:
+        chunk = Chunk(
+            content="def foo(): pass",
+            file_path="test.py",
+            start_line=1,
+            end_line=1,
+            chunk_type="function",
+            name="foo",
+            language="python",
+        )
+        assert chunk.chunk_id
+        assert "test.py" in chunk.chunk_id
+        assert "foo" in chunk.chunk_id
+
+    def test_chunk_id_uniqueness(self) -> None:
+        chunk1 = Chunk(
+            content="def foo(): pass",
+            file_path="test.py",
+            start_line=1,
+            end_line=1,
+            chunk_type="function",
+            name="foo",
+            language="python",
+        )
+        chunk2 = Chunk(
+            content="def foo(): return 1",
+            file_path="test.py",
+            start_line=1,
+            end_line=1,
+            chunk_type="function",
+            name="foo",
+            language="python",
+        )
+        # Different content should produce different IDs
+        assert chunk1.chunk_id != chunk2.chunk_id
+
+    def test_line_count(self) -> None:
+        chunk = Chunk(
+            content="line1\nline2\nline3",
+            file_path="test.py",
+            start_line=10,
+            end_line=12,
+            chunk_type="block",
+            name="test",
+            language="python",
+        )
+        assert chunk.line_count == 3
diff --git a/tests/test_librarian_query.py b/tests/test_librarian_query.py
new file mode 100644
index 0000000..07e0b34
--- /dev/null
+++ b/tests/test_librarian_query.py
@@ -0,0 +1,292 @@
+"""
+Integration tests for Premium Librarian query system.
+
+These tests require:
+- eXist-db running (for storage)
+- LLM router configured (for synthesis)
+
+Mark with @pytest.mark.integration to skip in CI without dependencies.
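+
+To run them explicitly (a sketch; assumes the `integration` marker is
+registered in the pytest configuration):
+
+    pytest tests/test_librarian_query.py -m integration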
+""" + +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + +from xml_pipeline.librarian.query import ( + QueryResult, + Source, + RetrievedChunk, + _build_rag_prompt, + format_sources_xml, +) +from xml_pipeline.librarian.index import LibraryIndex + + +class TestBuildRagPrompt: + """Tests for RAG prompt construction.""" + + def test_builds_prompt_with_context(self) -> None: + chunks = [ + RetrievedChunk( + chunk_id="test:foo:abc123", + file_path="src/utils.py", + name="calculate", + chunk_type="function", + language="python", + start_line=10, + end_line=20, + content="def calculate(x): return x * 2", + docstring="Calculate double.", + signature="def calculate(x) -> int", + score=0.9, + ), + RetrievedChunk( + chunk_id="test:bar:def456", + file_path="src/main.py", + name="main", + chunk_type="function", + language="python", + start_line=1, + end_line=5, + content="def main(): print('hello')", + docstring="", + signature="def main()", + score=0.7, + ), + ] + + prompt = _build_rag_prompt( + question="How does the calculate function work?", + chunks=chunks, + library_name="test-lib", + ) + + # Verify prompt structure + assert "test-lib" in prompt + assert "calculate function" in prompt + assert "src/utils.py" in prompt + assert "src/main.py" in prompt + assert "[1]" in prompt + assert "[2]" in prompt + assert "```python" in prompt + + def test_truncates_long_content(self) -> None: + long_content = "x" * 3000 # Longer than 2000 char limit + chunks = [ + RetrievedChunk( + chunk_id="test:long:123", + file_path="long.py", + name="long_func", + chunk_type="function", + language="python", + start_line=1, + end_line=100, + content=long_content, + docstring="", + signature="", + score=0.5, + ), + ] + + prompt = _build_rag_prompt("What?", chunks, "lib") + + # Content should be truncated + assert "(truncated)" in prompt + # Should not contain full content + assert long_content not in prompt + + def test_empty_chunks_list(self) -> None: + prompt = _build_rag_prompt("What?", [], "lib") + assert "lib" in prompt + assert "Question" in prompt + + +class TestFormatSourcesXml: + """Tests for XML source formatting.""" + + def test_formats_sources_as_xml(self) -> None: + sources = [ + Source( + file_path="src/app.py", + name="process", + chunk_type="function", + start_line=10, + end_line=25, + relevance_score=0.95, + snippet="def process(data): ...", + ), + ] + + xml = format_sources_xml(sources) + + assert "" in xml + assert "" in xml + assert "" in xml + assert "src/app.py" in xml + assert "process" in xml + assert "function" in xml + assert "10-25" in xml + assert "0.95" in xml + + def test_escapes_special_characters(self) -> None: + sources = [ + Source( + file_path="src/.py", + name="func&name", + chunk_type="function", + start_line=1, + end_line=1, + relevance_score=0.5, + snippet="code with & entities", + ), + ] + + xml = format_sources_xml(sources) + + # XML entities should be escaped + assert "<special>" in xml + assert "func&name" in xml + + def test_empty_sources_list(self) -> None: + xml = format_sources_xml([]) + + assert "" in xml + assert "" in xml + + +class TestQueryResultDataclass: + """Tests for QueryResult dataclass.""" + + def test_default_values(self) -> None: + result = QueryResult(answer="Test answer") + + assert result.answer == "Test answer" + assert result.sources == [] + assert result.tokens_used == 0 + assert result.chunks_examined == 0 + assert result.error == "" + + def test_with_sources(self) -> None: + sources = [ + Source( + file_path="test.py", + name="test", 
+ chunk_type="function", + start_line=1, + end_line=10, + relevance_score=0.9, + ), + ] + + result = QueryResult( + answer="Test answer", + sources=sources, + tokens_used=100, + chunks_examined=5, + ) + + assert len(result.sources) == 1 + assert result.tokens_used == 100 + assert result.chunks_examined == 5 + + +class TestRetrievedChunk: + """Tests for RetrievedChunk dataclass.""" + + def test_all_fields(self) -> None: + chunk = RetrievedChunk( + chunk_id="lib:file:hash", + file_path="src/module.py", + name="my_function", + chunk_type="function", + language="python", + start_line=10, + end_line=20, + content="def my_function(): pass", + docstring="Does something.", + signature="def my_function() -> None", + score=0.85, + ) + + assert chunk.chunk_id == "lib:file:hash" + assert chunk.file_path == "src/module.py" + assert chunk.name == "my_function" + assert chunk.language == "python" + assert chunk.score == 0.85 + + +@pytest.mark.integration +class TestQueryLibraryIntegration: + """Integration tests requiring eXist-db and LLM.""" + + async def test_query_nonexistent_library(self) -> None: + """Query should return error for non-existent library.""" + from xml_pipeline.librarian.query import query_library + + # Mock get_index to return None - patch at index module level + with patch("xml_pipeline.librarian.index.get_index", new_callable=AsyncMock) as mock_get_index: + mock_get_index.return_value = None + + result = await query_library( + library_id="nonexistent-lib-xyz", + question="What does this do?", + ) + + assert result.error + assert "not found" in result.error.lower() + + async def test_query_with_no_relevant_chunks(self) -> None: + """Query should handle case where search returns no results.""" + from xml_pipeline.librarian.query import query_library + + mock_index = LibraryIndex( + library_id="test-lib", + name="Test Library", + source_url="https://example.com/repo", + created_at="2024-01-01T00:00:00Z", + ) + + # Patch get_index at the index module level (where it's defined) + # and _search_chunks at query module level + with patch("xml_pipeline.librarian.index.get_index", new_callable=AsyncMock) as mock_get_index: + mock_get_index.return_value = mock_index + + with patch("xml_pipeline.librarian.query._search_chunks", new_callable=AsyncMock) as mock_search: + mock_search.return_value = [] + + result = await query_library( + library_id="test-lib", + question="What does foo do?", + ) + + assert "No relevant code found" in result.answer + assert result.chunks_examined == 0 + + +class TestLibraryIndex: + """Tests for LibraryIndex dataclass.""" + + def test_properties(self) -> None: + index = LibraryIndex( + library_id="test-id", + name="Test Lib", + source_url="https://github.com/test/repo", + created_at="2024-01-01", + files=["a.py", "b.py", "c.py"], + functions={"func1": "a.py", "func2": "b.py"}, + classes={"MyClass": "c.py"}, + stats={"chunks": 10, "files": 3}, + ) + + assert index.total_chunks == 10 + assert index.total_files == 3 + + def test_empty_stats(self) -> None: + index = LibraryIndex( + library_id="test", + name="Test", + source_url="", + created_at="", + ) + + assert index.total_chunks == 0 + assert index.total_files == 0 diff --git a/xml_pipeline/librarian/__init__.py b/xml_pipeline/librarian/__init__.py new file mode 100644 index 0000000..db0661d --- /dev/null +++ b/xml_pipeline/librarian/__init__.py @@ -0,0 +1,103 @@ +""" +Premium Librarian — RLM-powered codebase intelligence. 
+ +Ingests codebases, chunks them intelligently, stores in eXist-db, +and answers natural language queries using Online LLM + RAG. + +Usage: + from xml_pipeline.librarian import ingest_git_repo, query_library + + # Ingest a codebase + result = await ingest_git_repo( + url="https://github.com/example/repo.git", + library_name="my-lib", + ) + + # Query it + answer = await query_library( + library_id=result.library_id, + question="What does this codebase do?", + ) +""" + +from xml_pipeline.librarian.chunker import ( + Chunk, + chunk_file, + chunk_python, + chunk_javascript, + chunk_cpp, + detect_language, +) +from xml_pipeline.librarian.ingest import ( + IngestResult, + ingest_git_repo, +) +from xml_pipeline.librarian.index import ( + LibraryIndex, + build_index, + get_index, +) +from xml_pipeline.librarian.query import ( + Source, + QueryResult, + query_library, +) +from xml_pipeline.librarian.primitives import ( + LibrarianIngest, + LibrarianIngested, + LibrarianQuery, + LibrarianAnswer, + LibrarianList, + LibrarianLibraries, + LibrarianDelete, + LibrarianDeleted, + LibrarianGetChunk, + LibrarianChunk, + LibraryInfo, +) +from xml_pipeline.librarian.handler import ( + handle_librarian_ingest, + handle_librarian_query, + handle_librarian_list, + handle_librarian_delete, + handle_librarian_get_chunk, +) + +__all__ = [ + # Chunker + "Chunk", + "chunk_file", + "chunk_python", + "chunk_javascript", + "chunk_cpp", + "detect_language", + # Ingest + "IngestResult", + "ingest_git_repo", + # Index + "LibraryIndex", + "build_index", + "get_index", + # Query + "Source", + "QueryResult", + "query_library", + # Primitives + "LibrarianIngest", + "LibrarianIngested", + "LibrarianQuery", + "LibrarianAnswer", + "LibrarianList", + "LibrarianLibraries", + "LibrarianDelete", + "LibrarianDeleted", + "LibrarianGetChunk", + "LibrarianChunk", + "LibraryInfo", + # Handlers + "handle_librarian_ingest", + "handle_librarian_query", + "handle_librarian_list", + "handle_librarian_delete", + "handle_librarian_get_chunk", +] diff --git a/xml_pipeline/librarian/chunker.py b/xml_pipeline/librarian/chunker.py new file mode 100644 index 0000000..a91070e --- /dev/null +++ b/xml_pipeline/librarian/chunker.py @@ -0,0 +1,677 @@ +""" +chunker.py — AST-based code chunking for intelligent RAG retrieval. + +Chunks source files into semantically meaningful units (functions, classes, modules) +preserving context like docstrings, signatures, and imports. 
+ +Supported languages: +- Python (ast.parse) +- JavaScript/TypeScript (regex-based) +- C++ (regex-based) +""" + +from __future__ import annotations + +import ast +import re +import hashlib +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + + +@dataclass +class Chunk: + """A semantically meaningful code chunk.""" + + content: str + file_path: str + start_line: int + end_line: int + chunk_type: str # "function", "class", "method", "module", "block" + name: str # Function/class name or file name for modules + language: str + imports: list[str] = field(default_factory=list) + docstring: str = "" + signature: str = "" # Function signature for context + parent_class: str = "" # Class name if this is a method + + @property + def chunk_id(self) -> str: + """Generate unique ID for this chunk.""" + content_hash = hashlib.sha256(self.content.encode()).hexdigest()[:12] + return f"{self.file_path}:{self.name}:{content_hash}" + + @property + def line_count(self) -> int: + """Number of lines in this chunk.""" + return self.end_line - self.start_line + 1 + + +# Language detection by file extension +LANGUAGE_MAP = { + ".py": "python", + ".pyi": "python", + ".js": "javascript", + ".jsx": "javascript", + ".ts": "typescript", + ".tsx": "typescript", + ".mjs": "javascript", + ".cjs": "javascript", + ".c": "c", + ".h": "c", + ".cpp": "cpp", + ".cxx": "cpp", + ".cc": "cpp", + ".hpp": "cpp", + ".hxx": "cpp", + ".rs": "rust", + ".go": "go", + ".java": "java", + ".kt": "kotlin", + ".rb": "ruby", + ".php": "php", + ".cs": "csharp", + ".swift": "swift", + ".scala": "scala", + ".md": "markdown", + ".rst": "restructuredtext", + ".txt": "text", +} + +# Max lines per chunk before splitting +MAX_CHUNK_LINES = 500 + + +def detect_language(file_path: str) -> str: + """Detect language from file extension.""" + suffix = Path(file_path).suffix.lower() + return LANGUAGE_MAP.get(suffix, "unknown") + + +def chunk_file(content: str, file_path: str) -> list[Chunk]: + """ + Chunk a file based on detected language. + + Dispatches to language-specific chunker or falls back to + line-based chunking for unknown languages. + """ + language = detect_language(file_path) + + if language == "python": + return chunk_python(content, file_path) + elif language in ("javascript", "typescript"): + return chunk_javascript(content, file_path) + elif language in ("c", "cpp"): + return chunk_cpp(content, file_path) + elif language in ("markdown", "restructuredtext", "text"): + return chunk_prose(content, file_path, language) + else: + # Generic line-based chunking + return chunk_generic(content, file_path, language) + + +def chunk_python(content: str, file_path: str) -> list[Chunk]: + """ + AST-based Python chunking. 
+
+    Extracts:
+    - Module-level imports (as context)
+    - Functions (with docstrings)
+    - Classes (with methods)
+    - Top-level code blocks
+    """
+    chunks: list[Chunk] = []
+    lines = content.splitlines()
+
+    try:
+        tree = ast.parse(content)
+    except SyntaxError:
+        # Fall back to generic chunking on parse error
+        return chunk_generic(content, file_path, "python")
+
+    # Extract imports for context
+    imports: list[str] = []
+    for node in ast.walk(tree):
+        if isinstance(node, ast.Import):
+            for alias in node.names:
+                imports.append(f"import {alias.name}")
+        elif isinstance(node, ast.ImportFrom):
+            module = node.module or ""
+            names = ", ".join(a.name for a in node.names)
+            imports.append(f"from {module} import {names}")
+
+    # Process top-level definitions
+    for node in ast.iter_child_nodes(tree):
+        if isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
+            chunk = _extract_python_function(node, lines, file_path, imports)
+            chunks.append(chunk)
+
+        elif isinstance(node, ast.ClassDef):
+            # Create chunk for class definition + methods
+            class_chunks = _extract_python_class(node, lines, file_path, imports)
+            chunks.extend(class_chunks)
+
+    # If no chunks extracted, create a module chunk
+    if not chunks and content.strip():
+        chunks.append(
+            Chunk(
+                content=content,
+                file_path=file_path,
+                start_line=1,
+                end_line=len(lines),
+                chunk_type="module",
+                name=Path(file_path).stem,
+                language="python",
+                imports=imports,
+            )
+        )
+
+    return chunks
+
+
+def _extract_python_function(
+    node: ast.FunctionDef | ast.AsyncFunctionDef,
+    lines: list[str],
+    file_path: str,
+    imports: list[str],
+    parent_class: str = "",
+) -> Chunk:
+    """Extract a Python function as a chunk."""
+    start_line = node.lineno
+    end_line = node.end_lineno or start_line
+
+    # Get source lines (1-indexed)
+    func_lines = lines[start_line - 1 : end_line]
+    content = "\n".join(func_lines)
+
+    # Extract docstring
+    docstring = ast.get_docstring(node) or ""
+
+    # Build signature
+    args = []
+    for arg in node.args.args:
+        arg_str = arg.arg
+        if arg.annotation:
+            try:
+                arg_str += f": {ast.unparse(arg.annotation)}"
+            except Exception:
+                pass
+        args.append(arg_str)
+
+    returns = ""
+    if node.returns:
+        try:
+            returns = f" -> {ast.unparse(node.returns)}"
+        except Exception:
+            pass
+
+    async_prefix = "async " if isinstance(node, ast.AsyncFunctionDef) else ""
+    signature = f"{async_prefix}def {node.name}({', '.join(args)}){returns}"
+
+    chunk_type = "method" if parent_class else "function"
+
+    return Chunk(
+        content=content,
+        file_path=file_path,
+        start_line=start_line,
+        end_line=end_line,
+        chunk_type=chunk_type,
+        name=node.name,
+        language="python",
+        imports=imports,
+        docstring=docstring,
+        signature=signature,
+        parent_class=parent_class,
+    )
+
+
+def _extract_python_class(
+    node: ast.ClassDef,
+    lines: list[str],
+    file_path: str,
+    imports: list[str],
+) -> list[Chunk]:
+    """Extract a Python class and its methods as chunks."""
+    chunks: list[Chunk] = []
+
+    start_line = node.lineno
+    end_line = node.end_lineno or start_line
+
+    # Get full class source
+    class_lines = lines[start_line - 1 : end_line]
+    class_content = "\n".join(class_lines)
+
+    # Class docstring
+    docstring = ast.get_docstring(node) or ""
+
+    # Build class signature with bases
+    bases = []
+    for base in node.bases:
+        try:
+            bases.append(ast.unparse(base))
+        except Exception:
+            pass
+
+    base_str = f"({', '.join(bases)})" if bases else ""
+    signature = f"class {node.name}{base_str}"
+
+    # If class is small enough, keep as single chunk
+    if len(class_lines) <= MAX_CHUNK_LINES:
+        chunks.append(
+            Chunk(
+                content=class_content,
+                file_path=file_path,
+                start_line=start_line,
+                end_line=end_line,
+                chunk_type="class",
+                name=node.name,
+                language="python",
+                imports=imports,
+                docstring=docstring,
+                signature=signature,
+            )
+        )
+    else:
+        # Large class: chunk into class header + individual methods
+        # First, create a class header chunk (up to first method or ~50 lines)
+        header_end = start_line + min(50, len(class_lines) - 1)
+
+        for child in node.body:
+            if isinstance(child, ast.FunctionDef | ast.AsyncFunctionDef):
+                header_end = child.lineno - 1
+                break
+
+        header_lines = lines[start_line - 1 : header_end]
+        chunks.append(
+            Chunk(
+                content="\n".join(header_lines),
+                file_path=file_path,
+                start_line=start_line,
+                end_line=header_end,
+                chunk_type="class",
+                name=node.name,
+                language="python",
+                imports=imports,
+                docstring=docstring,
+                signature=signature,
+            )
+        )
+
+        # Then extract each method
+        for child in node.body:
+            if isinstance(child, ast.FunctionDef | ast.AsyncFunctionDef):
+                method_chunk = _extract_python_function(
+                    child, lines, file_path, imports, parent_class=node.name
+                )
+                chunks.append(method_chunk)
+
+    return chunks
+
+
+def chunk_javascript(content: str, file_path: str) -> list[Chunk]:
+    """
+    Regex-based JavaScript/TypeScript chunking.
+
+    Extracts:
+    - Function declarations
+    - Arrow functions assigned to const/let
+    - Class definitions
+    - Export statements
+    """
+    chunks: list[Chunk] = []
+    lines = content.splitlines()
+    language = detect_language(file_path)
+
+    # Extract imports
+    imports: list[str] = []
+    import_pattern = re.compile(
+        r'^(?:import\s+.*?from\s+[\'"].*?[\'"]|import\s+[\'"].*?[\'"]|'
+        r'const\s+\w+\s*=\s*require\([\'"].*?[\'"]\))',
+        re.MULTILINE,
+    )
+    for match in import_pattern.finditer(content):
+        imports.append(match.group(0))
+
+    # Function pattern: function name(...) or async function name(...)
+    func_pattern = re.compile(
+        r"^(?:export\s+)?(?:async\s+)?function\s+(\w+)\s*\([^)]*\)",
+        re.MULTILINE,
+    )
+
+    # Arrow function pattern: const name = (...) => or const name = async (...) =>
+    arrow_pattern = re.compile(
+        r"^(?:export\s+)?(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s*)?\([^)]*\)\s*=>",
+        re.MULTILINE,
+    )
+
+    # Class pattern
+    class_pattern = re.compile(
+        r"^(?:export\s+)?(?:default\s+)?class\s+(\w+)",
+        re.MULTILINE,
+    )
+
+    # Find all definitions and their positions
+    definitions: list[tuple[int, str, str, str]] = []  # (line, type, name, signature)
+
+    for match in func_pattern.finditer(content):
+        line_num = content[: match.start()].count("\n") + 1
+        definitions.append((line_num, "function", match.group(1), match.group(0)))
+
+    for match in arrow_pattern.finditer(content):
+        line_num = content[: match.start()].count("\n") + 1
+        definitions.append((line_num, "function", match.group(1), match.group(0)))
+
+    for match in class_pattern.finditer(content):
+        line_num = content[: match.start()].count("\n") + 1
+        definitions.append((line_num, "class", match.group(1), match.group(0)))
+
+    # Sort by line number
+    definitions.sort(key=lambda x: x[0])
+
+    # Create chunks
+    for i, (start_line, chunk_type, name, signature) in enumerate(definitions):
+        # End line is start of next definition - 1, or end of file
+        if i + 1 < len(definitions):
+            end_line = definitions[i + 1][0] - 1
+        else:
+            end_line = len(lines)
+
+        # Trim trailing empty lines
+        while end_line > start_line and not lines[end_line - 1].strip():
+            end_line -= 1
+
+        chunk_lines = lines[start_line - 1 : end_line]
+        chunk_content = "\n".join(chunk_lines)
+
+        # Extract JSDoc comment if present
+        docstring = ""
+        if start_line > 1:
+            prev_line = lines[start_line - 2].strip()
+            if prev_line.endswith("*/"):
+                # Look back for JSDoc start
+                doc_lines = []
+                for j in range(start_line - 2, max(0, start_line - 20), -1):
+                    doc_lines.insert(0, lines[j])
+                    if "/**" in lines[j]:
+                        break
+                docstring = "\n".join(doc_lines)
+
+        chunks.append(
+            Chunk(
+                content=chunk_content,
+                file_path=file_path,
+                start_line=start_line,
+                end_line=end_line,
+                chunk_type=chunk_type,
+                name=name,
+                language=language,
+                imports=imports,
+                docstring=docstring,
+                signature=signature,
+            )
+        )
+
+    # If no chunks, create module chunk
+    if not chunks and content.strip():
+        chunks.append(
+            Chunk(
+                content=content,
+                file_path=file_path,
+                start_line=1,
+                end_line=len(lines),
+                chunk_type="module",
+                name=Path(file_path).stem,
+                language=language,
+                imports=imports,
+            )
+        )
+
+    return chunks
+
+
+def chunk_cpp(content: str, file_path: str) -> list[Chunk]:
+    """
+    Regex-based C/C++ chunking.
+ + Extracts: + - Function definitions + - Class definitions + - Struct definitions + """ + chunks: list[Chunk] = [] + lines = content.splitlines() + language = detect_language(file_path) + + # Extract includes + imports: list[str] = [] + include_pattern = re.compile(r'^#include\s+[<"].*?[>"]', re.MULTILINE) + for match in include_pattern.finditer(content): + imports.append(match.group(0)) + + # Function pattern (simplified): return_type name(params) { + # This is a simplified pattern that won't catch all cases + func_pattern = re.compile( + r"^(?:(?:static|inline|virtual|explicit|constexpr|template\s*<[^>]*>\s*)*" + r"(?:\w+(?:::\w+)*\s+)+)" # Return type + r"(\w+)\s*\([^)]*\)\s*(?:const\s*)?(?:override\s*)?(?:noexcept\s*)?[{;]", + re.MULTILINE, + ) + + # Class/struct pattern + class_pattern = re.compile( + r"^(?:template\s*<[^>]*>\s*)?(?:class|struct)\s+(\w+)", + re.MULTILINE, + ) + + definitions: list[tuple[int, str, str, str]] = [] + + for match in func_pattern.finditer(content): + line_num = content[: match.start()].count("\n") + 1 + name = match.group(1) + # Skip common false positives + if name not in ("if", "while", "for", "switch", "return"): + definitions.append((line_num, "function", name, match.group(0).strip())) + + for match in class_pattern.finditer(content): + line_num = content[: match.start()].count("\n") + 1 + definitions.append((line_num, "class", match.group(1), match.group(0))) + + definitions.sort(key=lambda x: x[0]) + + # Create chunks (similar to JS) + for i, (start_line, chunk_type, name, signature) in enumerate(definitions): + if i + 1 < len(definitions): + end_line = definitions[i + 1][0] - 1 + else: + end_line = len(lines) + + while end_line > start_line and not lines[end_line - 1].strip(): + end_line -= 1 + + # For functions, try to find matching brace + if chunk_type == "function": + brace_count = 0 + found_open = False + for j in range(start_line - 1, min(end_line, len(lines))): + for char in lines[j]: + if char == "{": + brace_count += 1 + found_open = True + elif char == "}": + brace_count -= 1 + if found_open and brace_count == 0: + end_line = j + 1 + break + if found_open and brace_count == 0: + break + + chunk_lines = lines[start_line - 1 : end_line] + chunk_content = "\n".join(chunk_lines) + + # Extract Doxygen comment if present + docstring = "" + if start_line > 1: + prev_line = lines[start_line - 2].strip() + if prev_line.endswith("*/"): + doc_lines = [] + for j in range(start_line - 2, max(0, start_line - 30), -1): + doc_lines.insert(0, lines[j]) + if "/**" in lines[j] or "/*!" in lines[j]: + break + docstring = "\n".join(doc_lines) + + chunks.append( + Chunk( + content=chunk_content, + file_path=file_path, + start_line=start_line, + end_line=end_line, + chunk_type=chunk_type, + name=name, + language=language, + imports=imports, + docstring=docstring, + signature=signature, + ) + ) + + if not chunks and content.strip(): + chunks.append( + Chunk( + content=content, + file_path=file_path, + start_line=1, + end_line=len(lines), + chunk_type="module", + name=Path(file_path).stem, + language=language, + imports=imports, + ) + ) + + return chunks + + +def chunk_prose(content: str, file_path: str, language: str) -> list[Chunk]: + """ + Chunk prose documents (Markdown, RST, plain text). + + Splits on headings/sections, keeping chunks under MAX_CHUNK_LINES. 
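+
+    A behaviour sketch (section names come from markdown headings):
+
+        chunks = chunk_prose("# Title\n\nBody text.", "readme.md", "markdown")
+        assert chunks[0].chunk_type == "section"
+        assert chunks[0].name == "Title"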
+ """ + chunks: list[Chunk] = [] + lines = content.splitlines() + + # Markdown heading pattern + if language == "markdown": + heading_pattern = re.compile(r"^#{1,6}\s+(.+)$") + else: + heading_pattern = re.compile(r"^[=\-~]+$") # RST underline headings + + current_chunk_lines: list[str] = [] + current_start = 1 + current_name = Path(file_path).stem + + for i, line in enumerate(lines, 1): + match = heading_pattern.match(line) + + # New section or chunk too large + if match or len(current_chunk_lines) >= MAX_CHUNK_LINES: + # Save current chunk if non-empty + if current_chunk_lines: + chunks.append( + Chunk( + content="\n".join(current_chunk_lines), + file_path=file_path, + start_line=current_start, + end_line=i - 1, + chunk_type="section", + name=current_name, + language=language, + ) + ) + + # Start new chunk + current_chunk_lines = [line] + current_start = i + if match: + current_name = match.group(1) if language == "markdown" else lines[i - 2] if i > 1 else current_name + else: + current_chunk_lines.append(line) + + # Save final chunk + if current_chunk_lines: + chunks.append( + Chunk( + content="\n".join(current_chunk_lines), + file_path=file_path, + start_line=current_start, + end_line=len(lines), + chunk_type="section", + name=current_name, + language=language, + ) + ) + + return chunks + + +def chunk_generic(content: str, file_path: str, language: str) -> list[Chunk]: + """ + Generic line-based chunking for unknown languages. + + Splits content into MAX_CHUNK_LINES chunks, trying to break at empty lines. + """ + chunks: list[Chunk] = [] + lines = content.splitlines() + + if not lines: + return chunks + + current_chunk_lines: list[str] = [] + current_start = 1 + + for i, line in enumerate(lines, 1): + current_chunk_lines.append(line) + + # Check if we should split + if len(current_chunk_lines) >= MAX_CHUNK_LINES: + # Try to find a good break point (empty line in last 50 lines) + break_at = len(current_chunk_lines) + for j in range(len(current_chunk_lines) - 1, max(0, len(current_chunk_lines) - 50), -1): + if not current_chunk_lines[j].strip(): + break_at = j + break + + # Create chunk up to break point + chunks.append( + Chunk( + content="\n".join(current_chunk_lines[:break_at]), + file_path=file_path, + start_line=current_start, + end_line=current_start + break_at - 1, + chunk_type="block", + name=f"{Path(file_path).stem}:{current_start}", + language=language, + ) + ) + + # Keep remaining lines for next chunk + current_chunk_lines = current_chunk_lines[break_at:] + current_start = current_start + break_at + + # Save final chunk + if current_chunk_lines: + chunks.append( + Chunk( + content="\n".join(current_chunk_lines), + file_path=file_path, + start_line=current_start, + end_line=len(lines), + chunk_type="block", + name=f"{Path(file_path).stem}:{current_start}", + language=language, + ) + ) + + return chunks diff --git a/xml_pipeline/librarian/handler.py b/xml_pipeline/librarian/handler.py new file mode 100644 index 0000000..06b8651 --- /dev/null +++ b/xml_pipeline/librarian/handler.py @@ -0,0 +1,246 @@ +""" +handler.py — Message handlers for Premium Librarian. + +These handlers process librarian requests through the organism's message bus. 
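+
+Handlers can also be exercised directly, e.g. from a test. A sketch (how
+HandlerMetadata is constructed depends on the message-bus wiring; the
+library id is a placeholder):
+
+    response = await handle_librarian_query(
+        LibrarianQuery(library_id="my-lib-a1b2c3d4", question="What does foo do?"),
+        metadata,
+    )
+    # response carries a LibrarianAnswer payload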
+""" + +from __future__ import annotations + +import logging +from xml.sax.saxutils import escape as xml_escape + +from xml_pipeline.message_bus.message_state import HandlerMetadata, HandlerResponse + +from xml_pipeline.librarian.primitives import ( + LibrarianIngest, + LibrarianIngested, + LibrarianQuery, + LibrarianAnswer, + LibrarianList, + LibrarianLibraries, + LibrarianDelete, + LibrarianDeleted, + LibrarianGetChunk, + LibrarianChunk, +) + +logger = logging.getLogger(__name__) + + +async def handle_librarian_ingest( + payload: LibrarianIngest, + metadata: HandlerMetadata, +) -> HandlerResponse: + """ + Handle a codebase ingestion request. + + Clones the git repository, chunks all files, and stores in eXist-db. + """ + from xml_pipeline.librarian.ingest import ingest_git_repo + + logger.info(f"Ingesting codebase from {payload.git_url}") + + try: + result = await ingest_git_repo( + url=payload.git_url, + branch=payload.branch, + library_name=payload.library_name, + ) + + return HandlerResponse.respond( + payload=LibrarianIngested( + library_id=result.library_id, + library_name=result.library_name, + files_processed=result.files_processed, + chunks_created=result.chunks_created, + index_built=result.index_built, + errors="\n".join(result.errors) if result.errors else "", + ) + ) + + except Exception as e: + logger.error(f"Ingest failed: {e}") + return HandlerResponse.respond( + payload=LibrarianIngested( + library_id="", + library_name=payload.library_name or "", + files_processed=0, + chunks_created=0, + index_built=False, + errors=str(e), + ) + ) + + +async def handle_librarian_query( + payload: LibrarianQuery, + metadata: HandlerMetadata, +) -> HandlerResponse: + """ + Handle a library query request. + + Searches for relevant code chunks and synthesizes an answer using LLM. + """ + from xml_pipeline.librarian.query import query_library, format_sources_xml + + logger.info(f"Querying library {payload.library_id}: {payload.question[:100]}...") + + try: + result = await query_library( + library_id=payload.library_id, + question=payload.question, + max_chunks=payload.max_chunks, + model=payload.model, + ) + + sources_xml = format_sources_xml(result.sources) if result.sources else "" + + return HandlerResponse.respond( + payload=LibrarianAnswer( + answer=result.answer, + sources=sources_xml, + tokens_used=result.tokens_used, + chunks_examined=result.chunks_examined, + error=result.error, + ) + ) + + except Exception as e: + logger.error(f"Query failed: {e}") + return HandlerResponse.respond( + payload=LibrarianAnswer( + answer="", + sources="", + tokens_used=0, + chunks_examined=0, + error=str(e), + ) + ) + + +async def handle_librarian_list( + payload: LibrarianList, + metadata: HandlerMetadata, +) -> HandlerResponse: + """ + Handle a request to list all ingested libraries. 
+    """
+    from xml_pipeline.librarian.index import list_libraries
+
+    logger.info("Listing all libraries")
+
+    try:
+        libraries = await list_libraries()
+
+        # Format libraries as XML
+        lib_items = []
+        for lib in libraries:
+            lib_items.append(
+                f"""  <library>
+    <id>{xml_escape(lib.library_id)}</id>
+    <name>{xml_escape(lib.name)}</name>
+    <source-url>{xml_escape(lib.source_url)}</source-url>
+    <created-at>{xml_escape(lib.created_at)}</created-at>
+    <files>{lib.total_files}</files>
+    <chunks>{lib.total_chunks}</chunks>
+  </library>"""
+            )
+
+        libraries_xml = "<libraries>\n" + "\n".join(lib_items) + "\n</libraries>"
+
+        return HandlerResponse.respond(
+            payload=LibrarianLibraries(
+                count=len(libraries),
+                libraries=libraries_xml,
+            )
+        )
+
+    except Exception as e:
+        logger.error(f"List failed: {e}")
+        return HandlerResponse.respond(
+            payload=LibrarianLibraries(
+                count=0,
+                libraries="",
+            )
+        )
+
+
+async def handle_librarian_delete(
+    payload: LibrarianDelete,
+    metadata: HandlerMetadata,
+) -> HandlerResponse:
+    """
+    Handle a request to delete a library.
+    """
+    from xml_pipeline.librarian.index import delete_library
+
+    logger.info(f"Deleting library {payload.library_id}")
+
+    try:
+        success = await delete_library(payload.library_id)
+
+        return HandlerResponse.respond(
+            payload=LibrarianDeleted(
+                library_id=payload.library_id,
+                success=success,
+                error="" if success else "Delete operation failed",
+            )
+        )
+
+    except Exception as e:
+        logger.error(f"Delete failed: {e}")
+        return HandlerResponse.respond(
+            payload=LibrarianDeleted(
+                library_id=payload.library_id,
+                success=False,
+                error=str(e),
+            )
+        )
+
+
+async def handle_librarian_get_chunk(
+    payload: LibrarianGetChunk,
+    metadata: HandlerMetadata,
+) -> HandlerResponse:
+    """
+    Handle a request to retrieve a specific code chunk.
+    """
+    from xml_pipeline.librarian.query import get_chunk_by_id
+
+    logger.info(f"Getting chunk {payload.chunk_id} from library {payload.library_id}")
+
+    try:
+        chunk = await get_chunk_by_id(payload.library_id, payload.chunk_id)
+
+        if chunk is None:
+            return HandlerResponse.respond(
+                payload=LibrarianChunk(
+                    chunk_id=payload.chunk_id,
+                    error=f"Chunk not found: {payload.chunk_id}",
+                )
+            )
+
+        return HandlerResponse.respond(
+            payload=LibrarianChunk(
+                chunk_id=chunk.chunk_id,
+                file_path=chunk.file_path,
+                name=chunk.name,
+                chunk_type=chunk.chunk_type,
+                language=chunk.language,
+                start_line=chunk.start_line,
+                end_line=chunk.end_line,
+                content=chunk.content,
+                docstring=chunk.docstring,
+                signature=chunk.signature,
+                error="",
+            )
+        )
+
+    except Exception as e:
+        logger.error(f"Get chunk failed: {e}")
+        return HandlerResponse.respond(
+            payload=LibrarianChunk(
+                chunk_id=payload.chunk_id,
+                error=str(e),
+            )
+        )
diff --git a/xml_pipeline/librarian/index.py b/xml_pipeline/librarian/index.py
new file mode 100644
index 0000000..4dc9417
--- /dev/null
+++ b/xml_pipeline/librarian/index.py
@@ -0,0 +1,328 @@
+"""
+index.py — Library index management for Premium Librarian.
+
+Builds and queries structural indices for ingested codebases.
+The index provides fast lookup of files, functions, and classes
+without needing full-text search.
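+
+Each library's index is stored as a single document at
+/db/librarian/{library-id}/index.xml. A sketch of its shape, as emitted by
+_index_to_xml and read back by _parse_index_xml below (values are
+placeholders):
+
+    <library-index xmlns="https://xml-pipeline.org/ns/librarian/v1">
+      <library-id>my-lib-a1b2c3d4</library-id>
+      <name>my-lib</name>
+      <source-url>https://github.com/example/repo.git</source-url>
+      <created-at>2024-01-01T00:00:00+00:00</created-at>
+      <files>
+        <file>src/app.py</file>
+      </files>
+      <functions>
+        <function name="main" file="src/app.py"/>
+      </functions>
+      <classes>
+      </classes>
+      <modules>
+      </modules>
+      <stats>
+        <stat name="chunks">1</stat>
+        <stat name="files">1</stat>
+      </stats>
+    </library-index>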
+""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Optional +from xml.sax.saxutils import escape as xml_escape + +logger = logging.getLogger(__name__) + + +@dataclass +class LibraryIndex: + """Structural index for an ingested library.""" + + library_id: str + name: str + source_url: str + created_at: str + files: list[str] = field(default_factory=list) + functions: dict[str, str] = field(default_factory=dict) # name → file path + classes: dict[str, str] = field(default_factory=dict) # name → file path + modules: list[str] = field(default_factory=list) + stats: dict[str, int] = field(default_factory=dict) + + @property + def total_chunks(self) -> int: + """Total number of chunks in this library.""" + return self.stats.get("chunks", 0) + + @property + def total_files(self) -> int: + """Total number of files in this library.""" + return len(self.files) + + +def _index_to_xml(index: LibraryIndex) -> str: + """Convert index to XML document for storage.""" + files_xml = "\n".join(f" {xml_escape(f)}" for f in index.files) + + functions_xml = "\n".join( + f' ' + for name, path in index.functions.items() + ) + + classes_xml = "\n".join( + f' ' + for name, path in index.classes.items() + ) + + modules_xml = "\n".join(f" {xml_escape(m)}" for m in index.modules) + + stats_xml = "\n".join( + f' {v}' + for k, v in index.stats.items() + ) + + return f""" + + {xml_escape(index.library_id)} + {xml_escape(index.name)} + {xml_escape(index.source_url)} + {xml_escape(index.created_at)} + +{files_xml} + + +{functions_xml} + + +{classes_xml} + + +{modules_xml} + + +{stats_xml} + +""" + + +def _parse_index_xml(xml_content: str) -> Optional[LibraryIndex]: + """Parse index XML back to LibraryIndex object.""" + try: + from lxml import etree + + root = etree.fromstring(xml_content.encode()) + ns = {"l": "https://xml-pipeline.org/ns/librarian/v1"} + + library_id = root.findtext("l:library-id", "", namespaces=ns) + name = root.findtext("l:name", "", namespaces=ns) + source_url = root.findtext("l:source-url", "", namespaces=ns) + created_at = root.findtext("l:created-at", "", namespaces=ns) + + files = [f.text or "" for f in root.findall("l:files/l:file", namespaces=ns)] + + functions = { + f.get("name", ""): f.get("file", "") + for f in root.findall("l:functions/l:function", namespaces=ns) + } + + classes = { + c.get("name", ""): c.get("file", "") + for c in root.findall("l:classes/l:class", namespaces=ns) + } + + modules = [m.text or "" for m in root.findall("l:modules/l:module", namespaces=ns)] + + stats = { + s.get("name", ""): int(s.text or 0) + for s in root.findall("l:stats/l:stat", namespaces=ns) + } + + return LibraryIndex( + library_id=library_id, + name=name, + source_url=source_url, + created_at=created_at, + files=files, + functions=functions, + classes=classes, + modules=modules, + stats=stats, + ) + + except Exception as e: + logger.error(f"Failed to parse index XML: {e}") + return None + + +async def build_index( + library_id: str, + library_name: str, + source_url: str, +) -> LibraryIndex: + """ + Build structural index from stored chunks. + + Queries eXist-db for all chunks belonging to this library + and extracts structural information. 
+    """
+    from xml_pipeline.tools.librarian import librarian_query, librarian_store
+
+    # Query for all chunks in this library
+    xquery = f"""
+    declare namespace l = "https://xml-pipeline.org/ns/librarian/v1";
+    for $chunk in collection("/db/librarian/{library_id}/chunks")//l:chunk
+    return
+        <item>
+            <file>{{$chunk/l:file-path/text()}}</file>
+            <type>{{$chunk/l:chunk-type/text()}}</type>
+            <name>{{$chunk/l:name/text()}}</name>
+            <language>{{$chunk/l:language/text()}}</language>
+        </item>
+    """
+
+    result = await librarian_query(query=xquery, collection=f"/db/librarian/{library_id}")
+
+    if not result.success:
+        logger.warning(f"Failed to query chunks for index: {result.error}")
+        # Create minimal index
+        index = LibraryIndex(
+            library_id=library_id,
+            name=library_name,
+            source_url=source_url,
+            created_at=datetime.now(timezone.utc).isoformat(),
+        )
+    else:
+        # Parse results
+        files: set[str] = set()
+        functions: dict[str, str] = {}
+        classes: dict[str, str] = {}
+        modules: list[str] = []
+        lang_stats: dict[str, int] = {}
+        chunk_count = 0
+
+        try:
+            from lxml import etree
+
+            # Wrap results in root element for parsing
+            xml_str = f"<results>{result.data.get('results', '')}</results>"
+            root = etree.fromstring(xml_str.encode())
+
+            for item in root.findall("item"):
+                chunk_count += 1
+                file_path = item.findtext("file", "")
+                chunk_type = item.findtext("type", "")
+                name = item.findtext("name", "")
+                language = item.findtext("language", "")
+
+                if file_path:
+                    files.add(file_path)
+
+                if chunk_type == "function" or chunk_type == "method":
+                    functions[name] = file_path
+                elif chunk_type == "class":
+                    classes[name] = file_path
+                elif chunk_type == "module":
+                    modules.append(file_path)
+
+                if language:
+                    lang_stats[language] = lang_stats.get(language, 0) + 1
+
+        except Exception as e:
+            logger.warning(f"Failed to parse chunk query results: {e}")
+
+        index = LibraryIndex(
+            library_id=library_id,
+            name=library_name,
+            source_url=source_url,
+            created_at=datetime.now(timezone.utc).isoformat(),
+            files=sorted(files),
+            functions=functions,
+            classes=classes,
+            modules=modules,
+            stats={
+                "chunks": chunk_count,
+                "files": len(files),
+                "functions": len(functions),
+                "classes": len(classes),
+                **{f"lang_{k}": v for k, v in lang_stats.items()},
+            },
+        )
+
+    # Store index document
+    index_xml = _index_to_xml(index)
+    store_result = await librarian_store(
+        collection=f"/db/librarian/{library_id}",
+        document_name="index.xml",
+        content=index_xml,
+    )
+
+    if not store_result.success:
+        logger.warning(f"Failed to store index: {store_result.error}")
+
+    return index
+
+
+async def get_index(library_id: str) -> Optional[LibraryIndex]:
+    """
+    Retrieve library index from eXist-db.
+
+    Returns None if index doesn't exist.
+    """
+    from xml_pipeline.tools.librarian import librarian_get
+
+    result = await librarian_get(f"/db/librarian/{library_id}/index.xml")
+
+    if not result.success:
+        return None
+
+    content = result.data.get("content", "")
+    return _parse_index_xml(content)
+
+
+async def list_libraries() -> list[LibraryIndex]:
+    """
+    List all ingested libraries.
+
+    Returns list of LibraryIndex objects for all libraries in eXist-db.
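+
+    A usage sketch:
+
+        for lib in await list_libraries():
+            print(lib.library_id, lib.total_files, lib.total_chunks)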
+    """
+    from xml_pipeline.tools.librarian import librarian_query
+
+    xquery = """
+    declare namespace l = "https://xml-pipeline.org/ns/librarian/v1";
+    for $index in collection("/db/librarian")//l:library-index
+    return $index
+    """
+
+    result = await librarian_query(query=xquery, collection="/db/librarian")
+
+    if not result.success:
+        logger.warning(f"Failed to list libraries: {result.error}")
+        return []
+
+    libraries: list[LibraryIndex] = []
+
+    try:
+        from lxml import etree
+
+        # Parse each index document
+        xml_str = result.data.get("results", "")
+        if xml_str.strip():
+            # Wrap in root element
+            wrapped = f"<results>{xml_str}</results>"
+            root = etree.fromstring(wrapped.encode())
+
+            for index_elem in root.findall(
+                "{https://xml-pipeline.org/ns/librarian/v1}library-index"
+            ):
+                index_xml = etree.tostring(index_elem, encoding="unicode")
+                index = _parse_index_xml(index_xml)
+                if index:
+                    libraries.append(index)
+
+    except Exception as e:
+        logger.warning(f"Failed to parse library list: {e}")
+
+    return libraries
+
+
+async def delete_library(library_id: str) -> bool:
+    """
+    Delete a library and all its chunks from eXist-db.
+
+    Returns True if successful.
+    """
+    from xml_pipeline.tools.librarian import librarian_query
+
+    # Delete the entire collection
+    xquery = f"""
+    xmldb:remove("/db/librarian/{library_id}")
+    """
+
+    result = await librarian_query(query=xquery)
+
+    if not result.success:
+        logger.warning(f"Failed to delete library {library_id}: {result.error}")
+        return False
+
+    return True
diff --git a/xml_pipeline/librarian/ingest.py b/xml_pipeline/librarian/ingest.py
new file mode 100644
index 0000000..6f73430
--- /dev/null
+++ b/xml_pipeline/librarian/ingest.py
@@ -0,0 +1,393 @@
+"""
+ingest.py — Codebase ingestion for Premium Librarian.
+
+Clones git repositories, walks files, chunks them, and stores in eXist-db.
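+
+Typical flow, as a sketch (uses GitPython when installed via the optional
+[librarian] extra, otherwise the git CLI; eXist-db must be reachable; the
+URL is a placeholder):
+
+    result = await ingest_git_repo(
+        url="https://github.com/example/repo.git",
+        branch="main",
+    )
+    print(result.library_id, result.files_processed, result.chunks_created)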
+""" + +from __future__ import annotations + +import asyncio +import hashlib +import logging +import shutil +import tempfile +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import AsyncIterator, Optional +from xml.sax.saxutils import escape as xml_escape + +from xml_pipeline.librarian.chunker import Chunk, chunk_file, detect_language + +logger = logging.getLogger(__name__) + + +# File patterns to skip during ingestion +SKIP_PATTERNS = { + # Version control + ".git", + ".svn", + ".hg", + # Dependencies + "node_modules", + "vendor", + ".venv", + "venv", + "__pycache__", + ".pytest_cache", + ".mypy_cache", + ".ruff_cache", + # Build artifacts + "dist", + "build", + "target", + "out", + ".next", + # IDE + ".idea", + ".vscode", + # OS + ".DS_Store", + "Thumbs.db", +} + +# File extensions to process +CODE_EXTENSIONS = { + ".py", ".pyi", # Python + ".js", ".jsx", ".ts", ".tsx", ".mjs", ".cjs", # JavaScript/TypeScript + ".c", ".h", ".cpp", ".cxx", ".cc", ".hpp", ".hxx", # C/C++ + ".rs", # Rust + ".go", # Go + ".java", # Java + ".kt", ".kts", # Kotlin + ".rb", # Ruby + ".php", # PHP + ".cs", # C# + ".swift", # Swift + ".scala", # Scala + ".md", ".rst", ".txt", # Documentation + ".yaml", ".yml", ".toml", ".json", # Config + ".xml", ".xsd", # XML + ".sql", # SQL + ".sh", ".bash", ".zsh", # Shell + ".dockerfile", ".containerfile", # Docker +} + +# Max file size to process (1MB) +MAX_FILE_SIZE = 1024 * 1024 + + +@dataclass +class IngestResult: + """Result of a codebase ingestion.""" + + library_id: str + library_name: str + files_processed: int + chunks_created: int + index_built: bool + errors: list[str] = field(default_factory=list) + stats: dict[str, int] = field(default_factory=dict) + + +@dataclass +class IngestConfig: + """Configuration for ingestion.""" + + branch: str = "main" + max_file_size: int = MAX_FILE_SIZE + skip_patterns: set[str] = field(default_factory=lambda: SKIP_PATTERNS.copy()) + extensions: set[str] = field(default_factory=lambda: CODE_EXTENSIONS.copy()) + + +def _should_skip_path(path: Path, config: IngestConfig) -> bool: + """Check if a path should be skipped.""" + for part in path.parts: + if part in config.skip_patterns: + return True + if part.startswith(".") and part not in {".github", ".gitlab"}: + return True + return False + + +def _should_process_file(path: Path, config: IngestConfig) -> bool: + """Check if a file should be processed.""" + # Check extension + suffix = path.suffix.lower() + if suffix not in config.extensions: + # Also check for files without extension (Dockerfile, Makefile, etc.) 
+ name_lower = path.name.lower() + if name_lower not in {"dockerfile", "makefile", "rakefile", "gemfile"}: + return False + + # Check size + try: + if path.stat().st_size > config.max_file_size: + return False + except OSError: + return False + + return True + + +async def _clone_repo(url: str, branch: str, target_dir: Path) -> None: + """Clone a git repository.""" + try: + # Try using GitPython + from git import Repo + logger.info(f"Cloning {url} (branch: {branch}) to {target_dir}") + Repo.clone_from(url, target_dir, branch=branch, depth=1) + except ImportError: + # Fall back to git CLI + logger.info(f"GitPython not available, using git CLI") + proc = await asyncio.create_subprocess_exec( + "git", "clone", "--depth", "1", "--branch", branch, url, str(target_dir), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + if proc.returncode != 0: + raise RuntimeError(f"git clone failed: {stderr.decode()}") + + +async def _walk_files(root: Path, config: IngestConfig) -> AsyncIterator[Path]: + """Walk directory tree, yielding files to process.""" + for path in root.rglob("*"): + if path.is_file(): + rel_path = path.relative_to(root) + if not _should_skip_path(rel_path, config): + if _should_process_file(path, config): + yield path + + +def _chunk_to_xml(chunk: Chunk, library_id: str) -> str: + """Convert a chunk to XML document for storage.""" + # Escape content for XML + content_escaped = xml_escape(chunk.content) + docstring_escaped = xml_escape(chunk.docstring) if chunk.docstring else "" + signature_escaped = xml_escape(chunk.signature) if chunk.signature else "" + + imports_xml = "\n".join(f" {xml_escape(imp)}" for imp in chunk.imports) + + return f""" + + {xml_escape(chunk.chunk_id)} + {xml_escape(library_id)} + {xml_escape(chunk.file_path)} + {chunk.start_line} + {chunk.end_line} + {xml_escape(chunk.chunk_type)} + {xml_escape(chunk.name)} + {xml_escape(chunk.language)} + {xml_escape(chunk.parent_class)} + {signature_escaped} + {docstring_escaped} + +{imports_xml} + + +""" + + +async def _store_chunk( + chunk: Chunk, + library_id: str, + collection: str, +) -> bool: + """Store a chunk in eXist-db.""" + from xml_pipeline.tools.librarian import librarian_store + + xml_content = _chunk_to_xml(chunk, library_id) + + # Generate document name from chunk ID + doc_name = f"{chunk.chunk_id.replace(':', '_').replace('/', '_')}.xml" + + result = await librarian_store( + collection=collection, + document_name=doc_name, + content=xml_content, + ) + + return result.success + + +async def ingest_git_repo( + url: str, + branch: str = "main", + library_name: str = "", + config: Optional[IngestConfig] = None, +) -> IngestResult: + """ + Clone and ingest a git repository. 
+
+
+async def ingest_git_repo(
+    url: str,
+    branch: str = "main",
+    library_name: str = "",
+    config: Optional[IngestConfig] = None,
+) -> IngestResult:
+    """
+    Clone and ingest a git repository.
+
+    Args:
+        url: Git repository URL
+        branch: Branch to clone (default: main)
+        library_name: Human-readable name (derived from URL if empty)
+        config: Ingestion configuration
+
+    Returns:
+        IngestResult with statistics and library_id
+    """
+    if config is None:
+        config = IngestConfig(branch=branch)
+
+    # Derive library name from URL if not provided
+    if not library_name:
+        # Extract repo name from URL:
+        #   https://github.com/user/repo.git -> repo
+        #   git@github.com:user/repo.git    -> repo
+        # Use removesuffix, not rstrip: rstrip(".git") strips characters, so a
+        # repo named "config" would come out as "confi".
+        name = url.rstrip("/").removesuffix(".git").split("/")[-1].split(":")[-1]
+        library_name = name
+
+    # Generate unique library ID
+    library_id = f"{library_name}-{uuid.uuid4().hex[:8]}"
+
+    result = IngestResult(
+        library_id=library_id,
+        library_name=library_name,
+        files_processed=0,
+        chunks_created=0,
+        index_built=False,
+    )
+
+    # Create temp directory for clone
+    temp_dir = Path(tempfile.mkdtemp(prefix="librarian_"))
+
+    try:
+        # Clone repository
+        await _clone_repo(url, config.branch, temp_dir)
+
+        # Collection path in eXist-db
+        collection = f"/db/librarian/{library_id}/chunks"
+
+        # Track language statistics
+        lang_stats: dict[str, int] = {}
+
+        # Process files
+        async for file_path in _walk_files(temp_dir, config):
+            try:
+                # Read file content
+                content = file_path.read_text(encoding="utf-8", errors="replace")
+
+                # Get relative path for storage
+                rel_path = str(file_path.relative_to(temp_dir))
+
+                # Detect language and update stats
+                language = detect_language(rel_path)
+                lang_stats[language] = lang_stats.get(language, 0) + 1
+
+                # Chunk the file
+                chunks = chunk_file(content, rel_path)
+
+                # Store each chunk
+                for chunk in chunks:
+                    success = await _store_chunk(chunk, library_id, collection)
+                    if success:
+                        result.chunks_created += 1
+                    else:
+                        result.errors.append(f"Failed to store chunk: {chunk.chunk_id}")
+
+                result.files_processed += 1
+
+            except Exception as e:
+                result.errors.append(f"Error processing {file_path}: {e}")
+                logger.warning(f"Error processing {file_path}: {e}")
+
+        result.stats = lang_stats
+
+        # Build index
+        from xml_pipeline.librarian.index import build_index
+
+        try:
+            await build_index(library_id, library_name, url)
+            result.index_built = True
+        except Exception as e:
+            result.errors.append(f"Index build failed: {e}")
+            logger.warning(f"Index build failed: {e}")
+
+        logger.info(
+            f"Ingested {library_name}: {result.files_processed} files, "
+            f"{result.chunks_created} chunks"
+        )
+
+    finally:
+        # Clean up temp directory
+        shutil.rmtree(temp_dir, ignore_errors=True)
+
+    return result
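+
+
+async def _example_ingest() -> None:
+    """Illustrative usage sketch (the URL is hypothetical): clone, ingest,
+    and report. Assumes eXist-db is reachable via xml_pipeline.tools.librarian
+    and the optional [librarian] extras are installed."""
+    result = await ingest_git_repo(
+        "https://github.com/user/repo.git",  # hypothetical URL
+        branch="main",
+    )
+    logger.info(
+        "Ingested %s: %d files, %d chunks, %d errors",
+        result.library_id,
+        result.files_processed,
+        result.chunks_created,
+        len(result.errors),
+    )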
+
+
+async def ingest_directory(
+    path: str | Path,
+    library_name: str,
+    config: Optional[IngestConfig] = None,
+) -> IngestResult:
+    """
+    Ingest a local directory (for testing or local codebases).
+
+    Args:
+        path: Path to directory
+        library_name: Human-readable name
+        config: Ingestion configuration
+
+    Returns:
+        IngestResult with statistics and library_id
+    """
+    if config is None:
+        config = IngestConfig()
+
+    root = Path(path)
+    if not root.is_dir():
+        raise ValueError(f"Not a directory: {path}")
+
+    # Generate unique library ID
+    library_id = f"{library_name}-{uuid.uuid4().hex[:8]}"
+
+    result = IngestResult(
+        library_id=library_id,
+        library_name=library_name,
+        files_processed=0,
+        chunks_created=0,
+        index_built=False,
+    )
+
+    collection = f"/db/librarian/{library_id}/chunks"
+    lang_stats: dict[str, int] = {}
+
+    async for file_path in _walk_files(root, config):
+        try:
+            content = file_path.read_text(encoding="utf-8", errors="replace")
+            rel_path = str(file_path.relative_to(root))
+
+            language = detect_language(rel_path)
+            lang_stats[language] = lang_stats.get(language, 0) + 1
+
+            chunks = chunk_file(content, rel_path)
+
+            for chunk in chunks:
+                success = await _store_chunk(chunk, library_id, collection)
+                if success:
+                    result.chunks_created += 1
+                else:
+                    result.errors.append(f"Failed to store chunk: {chunk.chunk_id}")
+
+            result.files_processed += 1
+
+        except Exception as e:
+            result.errors.append(f"Error processing {file_path}: {e}")
+            logger.warning(f"Error processing {file_path}: {e}")
+
+    result.stats = lang_stats
+
+    # Build index
+    from xml_pipeline.librarian.index import build_index
+
+    try:
+        await build_index(library_id, library_name, str(root))
+        result.index_built = True
+    except Exception as e:
+        result.errors.append(f"Index build failed: {e}")
+        logger.warning(f"Index build failed: {e}")
+
+    return result
diff --git a/xml_pipeline/librarian/primitives.py b/xml_pipeline/librarian/primitives.py
new file mode 100644
index 0000000..ecb5423
--- /dev/null
+++ b/xml_pipeline/librarian/primitives.py
@@ -0,0 +1,167 @@
+"""
+primitives.py — XML payload dataclasses for Premium Librarian.
+
+These are the message types that flow through the organism's message bus.
+
+Note: Do NOT use `from __future__ import annotations` here,
+as it breaks the xmlify decorator, which needs concrete types.
+"""
+
+from dataclasses import dataclass
+
+from third_party.xmlable import xmlify
+
+
+@xmlify
+@dataclass
+class LibrarianIngest:
+    """
+    Request to ingest a codebase into the Premium Librarian.
+
+    Supports git URLs. The library will be cloned, chunked, and stored
+    in eXist-db for subsequent querying.
+    """
+
+    git_url: str = ""
+    branch: str = "main"
+    library_name: str = ""  # Optional; derived from URL if empty
+
+
+@xmlify
+@dataclass
+class LibrarianIngested:
+    """
+    Response after successful codebase ingestion.
+
+    Contains the library_id needed for subsequent queries.
+    """
+
+    library_id: str = ""
+    library_name: str = ""
+    files_processed: int = 0
+    chunks_created: int = 0
+    index_built: bool = False
+    errors: str = ""  # Newline-separated error messages
+
+
+@xmlify
+@dataclass
+class LibrarianQuery:
+    """
+    Query an ingested library with a natural language question.
+
+    The system will search for relevant code chunks and synthesize
+    an answer using the configured LLM.
+    """
+
+    library_id: str = ""
+    question: str = ""
+    max_chunks: int = 20  # Max chunks to include in context
+    model: str = ""  # Optional; uses default if empty
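+
+
+# On-the-wire sketch (illustrative): a query payload as it might appear on
+# the message bus. The exact envelope and element names are produced by the
+# xmlify decorator, so the shape below is an assumption, not a spec.
+#
+#   <LibrarianQuery>
+#     <library_id>myrepo-1a2b3c4d</library_id>
+#     <question>Where is authentication handled?</question>
+#     <max_chunks>20</max_chunks>
+#     <model></model>
+#   </LibrarianQuery>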
+ """ + + answer: str = "" + sources: str = "" # XML-formatted source list + tokens_used: int = 0 + chunks_examined: int = 0 + error: str = "" + + +@xmlify +@dataclass +class LibrarianList: + """ + Request to list all ingested libraries. + """ + + pass # No parameters needed + + +@xmlify +@dataclass +class LibraryInfo: + """ + Information about a single ingested library. + """ + + library_id: str = "" + name: str = "" + source_url: str = "" + created_at: str = "" + total_files: int = 0 + total_chunks: int = 0 + + +@xmlify +@dataclass +class LibrarianLibraries: + """ + Response listing all ingested libraries. + """ + + count: int = 0 + libraries: str = "" # XML-formatted library list + + +@xmlify +@dataclass +class LibrarianDelete: + """ + Request to delete an ingested library. + """ + + library_id: str = "" + + +@xmlify +@dataclass +class LibrarianDeleted: + """ + Response after library deletion. + """ + + library_id: str = "" + success: bool = False + error: str = "" + + +@xmlify +@dataclass +class LibrarianGetChunk: + """ + Request to retrieve a specific code chunk. + + Useful for examining source code referenced in a query response. + """ + + library_id: str = "" + chunk_id: str = "" + + +@xmlify +@dataclass +class LibrarianChunk: + """ + Response with a specific code chunk. + """ + + chunk_id: str = "" + file_path: str = "" + name: str = "" + chunk_type: str = "" + language: str = "" + start_line: int = 0 + end_line: int = 0 + content: str = "" + docstring: str = "" + signature: str = "" + error: str = "" diff --git a/xml_pipeline/librarian/query.py b/xml_pipeline/librarian/query.py new file mode 100644 index 0000000..f045742 --- /dev/null +++ b/xml_pipeline/librarian/query.py @@ -0,0 +1,436 @@ +""" +query.py — RAG-based query system for Premium Librarian. + +Searches indexed codebases and synthesizes answers using Online LLM. +The flow: Search → Retrieve → Synthesize → Return with sources. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Optional +from xml.sax.saxutils import escape as xml_escape + +logger = logging.getLogger(__name__) + + +@dataclass +class Source: + """A source chunk used in answering a query.""" + + file_path: str + name: str + chunk_type: str + start_line: int + end_line: int + relevance_score: float + snippet: str = "" # First ~200 chars of content + + +@dataclass +class QueryResult: + """Result of a library query.""" + + answer: str + sources: list[Source] = field(default_factory=list) + tokens_used: int = 0 + chunks_examined: int = 0 + error: str = "" + + +@dataclass +class RetrievedChunk: + """A chunk retrieved from eXist-db for RAG.""" + + chunk_id: str + file_path: str + name: str + chunk_type: str + language: str + start_line: int + end_line: int + content: str + docstring: str + signature: str + score: float + + +async def _search_chunks( + library_id: str, + query: str, + max_results: int = 20, +) -> list[RetrievedChunk]: + """ + Search for relevant chunks using Lucene full-text search. + + Returns chunks sorted by relevance score. 
+ """ + from xml_pipeline.tools.librarian import librarian_query + + # Escape query for XQuery + query_escaped = query.replace('"', '\\"').replace("'", "\\'") + + # Full-text search using Lucene + xquery = f""" + declare namespace l = "https://xml-pipeline.org/ns/librarian/v1"; + import module namespace ft = "http://exist-db.org/xquery/lucene"; + + for $chunk in collection("/db/librarian/{library_id}/chunks")//l:chunk + let $content := $chunk/l:content/text() + let $name := $chunk/l:name/text() + let $docstring := $chunk/l:docstring/text() + let $score := ( + if (ft:query($content, "{query_escaped}")) then ft:score($content) * 2 + else if (ft:query($name, "{query_escaped}")) then ft:score($name) * 3 + else if (ft:query($docstring, "{query_escaped}")) then ft:score($docstring) + else 0 + ) + where $score > 0 + order by $score descending + return + {{$chunk/l:id/text()}} + {{$chunk/l:file-path/text()}} + {{$chunk/l:name/text()}} + {{$chunk/l:chunk-type/text()}} + {{$chunk/l:language/text()}} + {{$chunk/l:start-line/text()}} + {{$chunk/l:end-line/text()}} + {{$chunk/l:signature/text()}} + {{$chunk/l:docstring/text()}} + {{$chunk/l:content/text()}} + + """ + + result = await librarian_query( + query=xquery, + collection=f"/db/librarian/{library_id}", + ) + + chunks: list[RetrievedChunk] = [] + + if not result.success: + logger.warning(f"Search failed: {result.error}") + # Fall back to simple query without Lucene + return await _search_chunks_fallback(library_id, query, max_results) + + try: + from lxml import etree + + xml_str = f"{result.data.get('results', '')}" + root = etree.fromstring(xml_str.encode()) + + for item in root.findall("result")[:max_results]: + score = float(item.get("score", 0)) + + chunks.append( + RetrievedChunk( + chunk_id=item.findtext("id", ""), + file_path=item.findtext("file-path", ""), + name=item.findtext("name", ""), + chunk_type=item.findtext("chunk-type", ""), + language=item.findtext("language", ""), + start_line=int(item.findtext("start-line", "0")), + end_line=int(item.findtext("end-line", "0")), + content=item.findtext("content", ""), + docstring=item.findtext("docstring", ""), + signature=item.findtext("signature", ""), + score=score, + ) + ) + + except Exception as e: + logger.warning(f"Failed to parse search results: {e}") + + return chunks + + +async def _search_chunks_fallback( + library_id: str, + query: str, + max_results: int = 20, +) -> list[RetrievedChunk]: + """ + Fallback search using contains() when Lucene is not available. + + Less accurate but works without Lucene indexing. 
+ """ + from xml_pipeline.tools.librarian import librarian_query + + # Simple contains search + query_lower = query.lower().replace('"', '\\"').replace("'", "\\'") + terms = query_lower.split() + + # Build contains conditions + conditions = [] + for term in terms[:5]: # Limit to 5 terms + conditions.append( + f'(contains(lower-case($chunk/l:content), "{term}") or ' + f'contains(lower-case($chunk/l:name), "{term}") or ' + f'contains(lower-case($chunk/l:docstring), "{term}"))' + ) + + where_clause = " or ".join(conditions) if conditions else "true()" + + xquery = f""" + declare namespace l = "https://xml-pipeline.org/ns/librarian/v1"; + + for $chunk in collection("/db/librarian/{library_id}/chunks")//l:chunk + where {where_clause} + return + {{$chunk/l:id/text()}} + {{$chunk/l:file-path/text()}} + {{$chunk/l:name/text()}} + {{$chunk/l:chunk-type/text()}} + {{$chunk/l:language/text()}} + {{$chunk/l:start-line/text()}} + {{$chunk/l:end-line/text()}} + {{$chunk/l:signature/text()}} + {{$chunk/l:docstring/text()}} + {{$chunk/l:content/text()}} + + """ + + result = await librarian_query( + query=xquery, + collection=f"/db/librarian/{library_id}", + ) + + chunks: list[RetrievedChunk] = [] + + if not result.success: + logger.warning(f"Fallback search failed: {result.error}") + return chunks + + try: + from lxml import etree + + xml_str = f"{result.data.get('results', '')}" + root = etree.fromstring(xml_str.encode()) + + for i, item in enumerate(root.findall("result")[:max_results]): + # Assign decreasing score based on order + score = 1.0 - (i * 0.05) + + chunks.append( + RetrievedChunk( + chunk_id=item.findtext("id", ""), + file_path=item.findtext("file-path", ""), + name=item.findtext("name", ""), + chunk_type=item.findtext("chunk-type", ""), + language=item.findtext("language", ""), + start_line=int(item.findtext("start-line", "0")), + end_line=int(item.findtext("end-line", "0")), + content=item.findtext("content", ""), + docstring=item.findtext("docstring", ""), + signature=item.findtext("signature", ""), + score=score, + ) + ) + + except Exception as e: + logger.warning(f"Failed to parse fallback search results: {e}") + + return chunks + + +def _build_rag_prompt( + question: str, + chunks: list[RetrievedChunk], + library_name: str, +) -> str: + """Build the RAG prompt with retrieved context.""" + context_parts = [] + + for i, chunk in enumerate(chunks, 1): + header = f"[{i}] {chunk.file_path}:{chunk.start_line}-{chunk.end_line}" + if chunk.signature: + header += f"\n {chunk.signature}" + + # Truncate content if too long + content = chunk.content + if len(content) > 2000: + content = content[:2000] + "\n... (truncated)" + + context_parts.append(f"{header}\n```{chunk.language}\n{content}\n```") + + context = "\n\n".join(context_parts) + + return f"""You are a code assistant analyzing the "{library_name}" codebase. + +Answer the following question based ONLY on the provided code context. +If the answer is not in the context, say so clearly. +Reference specific files and line numbers when relevant. + +## Code Context + +{context} + +## Question + +{question} + +## Instructions + +1. Answer based on the code context above +2. Cite sources using [1], [2], etc. format +3. Include relevant code snippets if helpful +4. Be concise but complete""" + + +async def query_library( + library_id: str, + question: str, + max_chunks: int = 20, + model: str = "", +) -> QueryResult: + """ + Query an ingested library using RAG. 
+
+
+async def query_library(
+    library_id: str,
+    question: str,
+    max_chunks: int = 20,
+    model: str = "",
+) -> QueryResult:
+    """
+    Query an ingested library using RAG.
+
+    Args:
+        library_id: ID of the ingested library
+        question: Natural language question
+        max_chunks: Maximum chunks to retrieve for context
+        model: LLM model to use (empty = use default)
+
+    Returns:
+        QueryResult with answer and sources
+    """
+    from xml_pipeline.librarian.index import get_index
+    from xml_pipeline.llm import complete
+
+    # Get library info
+    index = await get_index(library_id)
+    if not index:
+        return QueryResult(
+            answer="",
+            error=f"Library not found: {library_id}",
+        )
+
+    # Search for relevant chunks
+    chunks = await _search_chunks(library_id, question, max_chunks)
+
+    if not chunks:
+        return QueryResult(
+            answer=f"No relevant code found for your question in the '{index.name}' codebase.",
+            chunks_examined=0,
+        )
+
+    # Build RAG prompt
+    prompt = _build_rag_prompt(question, chunks, index.name)
+
+    # Call LLM
+    try:
+        response = await complete(
+            model=model or "grok-4.1",  # Default model
+            messages=[
+                {"role": "user", "content": prompt},
+            ],
+        )
+
+        answer = response.content
+        tokens_used = response.usage.get("total_tokens", 0)
+
+    except Exception as e:
+        logger.error(f"LLM call failed: {e}")
+        return QueryResult(
+            answer="",
+            error=f"Failed to generate answer: {e}",
+            chunks_examined=len(chunks),
+        )
+
+    # Build sources list
+    sources = [
+        Source(
+            file_path=chunk.file_path,
+            name=chunk.name,
+            chunk_type=chunk.chunk_type,
+            start_line=chunk.start_line,
+            end_line=chunk.end_line,
+            relevance_score=chunk.score,
+            snippet=chunk.content[:200] if chunk.content else "",
+        )
+        for chunk in chunks
+    ]
+
+    return QueryResult(
+        answer=answer,
+        sources=sources,
+        tokens_used=tokens_used,
+        chunks_examined=len(chunks),
+    )
+
+
+def format_sources_xml(sources: list[Source]) -> str:
+    """Format sources as XML for the LibrarianAnswer payload."""
+    source_items = []
+
+    for i, source in enumerate(sources, 1):
+        snippet_escaped = xml_escape(source.snippet[:100]) if source.snippet else ""
+        source_items.append(
+            f"""  <source index="{i}">
+    <file-path>{xml_escape(source.file_path)}</file-path>
+    <name>{xml_escape(source.name)}</name>
+    <chunk-type>{xml_escape(source.chunk_type)}</chunk-type>
+    <lines>{source.start_line}-{source.end_line}</lines>
+    <score>{source.relevance_score:.2f}</score>
+    <snippet>{snippet_escaped}</snippet>
+  </source>"""
+        )
+
+    return "<sources>\n" + "\n".join(source_items) + "\n</sources>"
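+
+
+async def _example_query() -> None:
+    """Illustrative usage sketch (library_id and question are hypothetical):
+    run a query and render its sources for a LibrarianAnswer payload."""
+    result = await query_library(
+        library_id="myrepo-1a2b3c4d",  # hypothetical ID from a prior ingest
+        question="Where are incoming messages validated?",
+        max_chunks=10,
+    )
+    if result.error:
+        logger.error("Query failed: %s", result.error)
+        return
+    sources_xml = format_sources_xml(result.sources)
+    logger.info(
+        "Answer (%d tokens):\n%s\n%s", result.tokens_used, result.answer, sources_xml
+    )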
+ """ + from xml_pipeline.tools.librarian import librarian_query + + chunk_id_escaped = chunk_id.replace('"', '\\"') + + xquery = f""" + declare namespace l = "https://xml-pipeline.org/ns/librarian/v1"; + + for $chunk in collection("/db/librarian/{library_id}/chunks")//l:chunk + where $chunk/l:id = "{chunk_id_escaped}" + return $chunk + """ + + result = await librarian_query( + query=xquery, + collection=f"/db/librarian/{library_id}", + ) + + if not result.success: + return None + + try: + from lxml import etree + + ns = {"l": "https://xml-pipeline.org/ns/librarian/v1"} + root = etree.fromstring(result.data.get("results", "").encode()) + + chunk_elem = root if root.tag.endswith("chunk") else root.find("l:chunk", namespaces=ns) + if chunk_elem is None: + return None + + return RetrievedChunk( + chunk_id=chunk_elem.findtext("l:id", "", namespaces=ns), + file_path=chunk_elem.findtext("l:file-path", "", namespaces=ns), + name=chunk_elem.findtext("l:name", "", namespaces=ns), + chunk_type=chunk_elem.findtext("l:chunk-type", "", namespaces=ns), + language=chunk_elem.findtext("l:language", "", namespaces=ns), + start_line=int(chunk_elem.findtext("l:start-line", "0", namespaces=ns)), + end_line=int(chunk_elem.findtext("l:end-line", "0", namespaces=ns)), + content=chunk_elem.findtext("l:content", "", namespaces=ns), + docstring=chunk_elem.findtext("l:docstring", "", namespaces=ns), + signature=chunk_elem.findtext("l:signature", "", namespaces=ns), + score=1.0, + ) + + except Exception as e: + logger.warning(f"Failed to parse chunk: {e}") + return None