Project Files
src/tools/rag_tool.py
"""
tools/rag_tool.py — MCP tool schema + handler for rag_search.
"""
from __future__ import annotations
from mcp import types
from config import DEFAULT_MIN_SCORE, DEFAULT_TOP_K
from src.retrieval.engine import RAGEngine
from src.utils.cache import TTLCache
from src.utils.logging import get_logger
log = get_logger("tools.rag")
def build_rag_tool_schema(top_k: int, min_score: float, engine: RAGEngine) -> types.Tool:
    """Return the rag_search Tool definition exposed to the LLM."""
    cfg = engine.model_cfg

    # Advertise only the retrieval features this engine instance actually has enabled.
    flags = []
    if engine.use_hybrid:
        flags.append("hybrid BM25 + dense retrieval")
    if engine._reranker is not None:
        flags.append("cross-encoder reranking")
    if engine.use_hyde:
        flags.append("HyDE query expansion")
    flag_str = " · ".join(flags) if flags else "dense retrieval"

    return types.Tool(
        name="rag_search",
        description=(
            f"Search the local document knowledge base using semantic retrieval. "
            f"Features: {flag_str}. "
            f"Embedding model: {cfg['label']}. "
            f"Chunking: {engine.chunk_strategy}. "
            "Use this for any question that may be answered by the loaded documents. "
            "Returns ranked chunks with source file attribution."
        ),
        inputSchema={
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": (
                        "The question or search query to look up in the document knowledge base."
                    ),
                },
                "top_k": {
                    "type": "integer",
                    "description": (
                        f"Number of document chunks to return. "
                        f"Server default: {top_k}. Max recommended: 20."
                    ),
                },
                "min_score": {
                    "type": "number",
                    "description": (
                        f"Minimum similarity score threshold [0.0–1.0]. "
                        f"Server default: {min_score}. "
                        "Raise this (e.g. 0.4) to filter weak matches."
                    ),
                },
            },
            "required": ["query"],
        },
    )
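
# For reference, a fully configured engine produces a description that begins:
#   "Search the local document knowledge base using semantic retrieval.
#    Features: hybrid BM25 + dense retrieval · cross-encoder reranking ·
#    HyDE query expansion. ..."
# With no optional features enabled, the feature string falls back to "dense retrieval".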

async def handle_rag_search(
    arguments: dict,
    engine: RAGEngine,
    cache: TTLCache[str],
    cache_ttl: float,
) -> list[types.TextContent]:
    """Execute a rag_search tool call."""
    query = arguments.get("query", "").strip()
    # Fall back to the configured defaults when the model omits the optional args,
    # so we never pass None through to the engine.
    top_k = arguments.get("top_k", DEFAULT_TOP_K)
    min_score = arguments.get("min_score", DEFAULT_MIN_SCORE)

    if not query:
        return [types.TextContent(type="text", text="Error: 'query' must not be empty.")]

    # Dedup cache: key on the effective parameters too, so the same query with a
    # different top_k/min_score is not served a stale result.
    cache_key = f"{query}|k={top_k}|s={min_score}"
    cached = cache.get(cache_key)
    if cached is not None:
        log.info("RAG cache hit: %s…", query[:60])
        return [
            types.TextContent(
                type="text",
                text=f"[Cached result — TTL {cache_ttl:.0f}s]\n\n{cached}",
            )
        ]
    results = await engine.search(query, top_k=top_k, min_score=min_score)
    if not results:
        return [
            types.TextContent(
                type="text",
                text=(
                    "No relevant documents found for this query.\n"
                    "Suggestions:\n"
                    "• Lower `min_score` (e.g. 0.1)\n"
                    "• Increase `top_k`\n"
                    "• Rephrase the query\n"
                    "• Check that documents are indexed (ingest completes at startup)"
                ),
            )
        ]
    # Format ranked chunks as markdown; include rerank scores when the engine produced them.
    has_rerank = "rerank_score" in results[0]
    lines = [f"## RAG Results — '{query}'\n"]
    for i, r in enumerate(results, start=1):
        score_parts = [f"dense={r['score']:.3f}"]
        if has_rerank:
            score_parts.append(f"rerank={r['rerank_score']:.3f}")
        score_str = " | ".join(score_parts)
        lines.append(f"### [{i}] {r['source']} ({score_str})")
        lines.append(r["text"])
        lines.append("\n---\n")

    output = "\n".join(lines)
    cache.set(cache_key, output)
    return [types.TextContent(type="text", text=output)]
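
For reference, the two entry points above would typically be wired into the server roughly as follows. This is a minimal sketch, not part of the file: it assumes the stock `mcp.server.Server` decorator API, and the server name, engine/cache construction, and TTL value shown are illustrative stand-ins for whatever the project's server module actually builds.

# sketch: wiring rag_tool into an MCP server (illustrative, not part of rag_tool.py)
from mcp import types
from mcp.server import Server

from config import DEFAULT_MIN_SCORE, DEFAULT_TOP_K
from src.retrieval.engine import RAGEngine
from src.tools.rag_tool import build_rag_tool_schema, handle_rag_search
from src.utils.cache import TTLCache

server = Server("rag-server")        # server name is illustrative
engine = RAGEngine()                 # constructor args are project-specific; assumed here
cache: TTLCache[str] = TTLCache()    # constructor args are project-specific; assumed here
CACHE_TTL = 300.0                    # seconds; illustrative value


@server.list_tools()
async def list_tools() -> list[types.Tool]:
    # Advertise rag_search with the server's configured defaults.
    return [build_rag_tool_schema(DEFAULT_TOP_K, DEFAULT_MIN_SCORE, engine)]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    if name == "rag_search":
        return await handle_rag_search(arguments, engine, cache, CACHE_TTL)
    raise ValueError(f"Unknown tool: {name}")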