// Project Files
// src / rag / retriever.ts
/**
* Retriever
* High-level interface for indexing and retrieving documents
*/
import path from 'path';
import { promises as fsPromises } from 'fs';
import { VectorStore } from './vectorStore';
import { EmbeddingsClient } from './embeddings';
import { TextChunker } from './chunker';
import { BM25Index } from './bm25';
import { DocumentLoader } from '../documents/loader';
import { FileWatcher } from '../documents/fileWatcher';
import type { SearchResult } from '../types';
import { computeLanguageNeutrality, guessLanguage, languageMatchScore } from '../utils/language';
import type { IndexedDocument } from '../sources/types';
import { loadRemoteIndexedDocuments } from '../sources/registry';
import { defaultLmStudioHome } from '../sources/lmStudioConversationMarkdown';
import { isLmStudioConversationSource } from '../sources/adapters/lmStudioConversationSourceAdapter';
import { ragDebug } from '../utils/ragLogger.js';
// Minimum age (ms, measured from the file's mtime) a conversation file must reach before it is
// indexed; also used as the retry delay while the file belongs to the active chat. 2 minutes.
const CONVERSATION_INDEX_MIN_AGE_MS = 2 * 60 * 1000;
/**
* Configuration consumed by the Retriever.
*/
export interface RetrieverConfig {
/** Database path handed to the VectorStore. */
dbPath: string;
/** Local directories to scan and watch; an entry may also be a single `<13 digits>.conversation.json` file. */
contentDirectories: string[];
/** Model name passed to the EmbeddingsClient. */
embeddingModel: string;
/** Base URL passed to the EmbeddingsClient. */
embeddingBaseUrl: string;
/** Optional API key passed to the EmbeddingsClient. */
embeddingApiKey?: string;
/** Chunk size passed to the TextChunker. */
chunkSize: number;
/** Chunk overlap passed to the TextChunker. */
chunkOverlap: number;
/** Chunks per embedding request; floored and clamped to 1..10, defaults to 1. */
embeddingBatchSize?: number;
/** Passed to the EmbeddingsClient as its `timeout` option. */
embeddingRequestTimeoutMs?: number;
/** Default number of results returned by `search()` when no explicit limit is given. */
retrievalLimit: number;
/** When true (and directories are configured) a FileWatcher is started after the initial index pass. */
watchFiles: boolean;
hybridSearchWeight: number; // 0 = pure semantic, 1 = pure keyword, 0.3 = balanced
languageBiasStrength: number; // 0 = off, 1 = strongest (still soft)
/** Remote sources forwarded to `loadRemoteIndexedDocuments`; LM Studio conversation sources are filtered out of the remote pass. */
remoteSources?: string[];
/** Remote fetch timeout in ms; defaults to 10000 when omitted. */
remoteFetchTimeoutMs?: number;
/** Remote size cap in bytes; defaults to 15728640 when omitted. */
remoteMaxBytes?: number;
/** Remote page cap; defaults to 50 when omitted (conversation sources always use 1). */
remoteMaxPages?: number;
/** Forwarded unchanged to `loadRemoteIndexedDocuments`. */
githubToken?: string;
/** Forwarded unchanged to `loadRemoteIndexedDocuments`. */
huggingFaceToken?: string;
/** Initial active chat id; only a 13-digit string is accepted, anything else is treated as "none". */
activeChatId?: string;
}
/**
 * Normalize a chat id candidate.
 *
 * @param value Raw value; may be missing.
 * @returns The trimmed value when it consists of exactly 13 digits, otherwise `null`.
 */
function normalizeChatId(value: string | null | undefined): string | null {
  if (typeof value === 'string') {
    const candidate = value.trim();
    if (/^\d{13}$/.test(candidate)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Extract the 13-digit chat id from a `<id>.conversation.json` file path.
 *
 * The id must be the whole file name stem, i.e. it is either at the start of the string or
 * directly preceded by a path separator. Both `/` and `\` are accepted as separators so that
 * Windows-style paths are recognised as well.
 *
 * @param filePath File path or bare file name.
 * @returns The chat id, or `null` when the path is not a conversation file.
 */
function chatIdFromConversationFile(filePath: string): string | null {
  const match = /(?:^|[\\/])(\d{13})\.conversation\.json$/i.exec(filePath);
  return match ? match[1] : null;
}
/**
 * Extract the chat id from a `lmstudio-conversation://<13 digits>` source id.
 *
 * @param sourceId Source id as stored in the index (matched case-insensitively, no trimming).
 * @returns The 13-digit chat id, or `null` when the id has a different shape.
 */
function chatIdFromConversationSourceId(sourceId: string): string | null {
  const parsed = sourceId.match(/^lmstudio-conversation:\/\/(\d{13})$/i);
  if (parsed === null) {
    return null;
  }
  return parsed[1];
}
/**
 * Tell whether a source string uses the `lmstudio-conversation://` / `lmstudio-conversations://`
 * scheme, with either nothing or exactly 13 digits after the `//`.
 *
 * @param source Source string; surrounding whitespace is ignored.
 */
function isConversationCollectionSource(source: string): boolean {
  const candidate = source.trim();
  const pattern = /^lmstudio-conversations?:\/\/(?:\d{13})?$/i;
  return pattern.exec(candidate) !== null;
}
/**
 * Resolve the directory that holds conversation files for a configured source.
 *
 * - An `lmstudio-conversation(s)://` source maps to `<LM Studio home>/conversations`.
 * - An absolute path whose last segment is `conversations` maps to itself.
 * - An absolute path to a `<13 digits>.conversation.json` file maps to its parent directory.
 *
 * @param source Configured source string; surrounding whitespace is ignored.
 * @returns The directory, or `null` when the source is not conversation-related.
 */
function conversationDirectoryFromSource(source: string): string | null {
  const candidate = source.trim();
  if (isConversationCollectionSource(candidate)) {
    return path.join(defaultLmStudioHome(), 'conversations');
  }
  if (!path.isAbsolute(candidate)) {
    return null;
  }
  if (path.basename(candidate) === 'conversations') {
    return candidate;
  }
  return chatIdFromConversationFile(candidate) ? path.dirname(candidate) : null;
}
export class Retriever {
// Chunk/document store. Set to a null placeholder by the constructor and created in
// initialize() once the embedding dimension is known.
private vectorStore: VectorStore;
// Client used for both document and query embeddings.
private embeddings: EmbeddingsClient;
// In-memory keyword index, rebuilt from stored chunks during initialize().
private bm25Index: BM25Index;
private config: RetrieverConfig;
// Present only while file watching is running.
private fileWatcher: FileWatcher | null = null;
private isInitialized = false;
// File paths / source ids currently being indexed; prevents duplicate concurrent indexing.
private indexingQueue = new Set<string>();
// Conversation files whose indexing was postponed (active chat or recently modified).
private deferredConversationFiles = new Set<string>();
// One pending retry timer per deferred conversation file, keyed by file path.
private conversationRetryTimers = new Map<string, ReturnType<typeof setTimeout>>();
// Normalized 13-digit id of the chat currently open, or null.
private activeChatId: string | null = null;
private statusCallback: ((text: string) => void) | null = null;
// Last status text and its timestamp; used to suppress identical updates within 5 seconds.
private lastStatusText = '';
private lastStatusAt = 0;
/**
 * Store the configuration and create the embeddings client and keyword index.
 * The vector store is NOT created here; see `initialize()`.
 */
constructor(config: RetrieverConfig) {
  const {
    embeddingBaseUrl: baseUrl,
    embeddingModel: model,
    embeddingApiKey: apiKey,
    embeddingRequestTimeoutMs: timeout,
  } = config;

  this.config = config;
  this.activeChatId = normalizeChatId(config.activeChatId);
  this.embeddings = new EmbeddingsClient({ baseUrl, model, apiKey, timeout });

  // Keyword (BM25) side of the hybrid search.
  this.bm25Index = new BM25Index();

  // Placeholder: the real store needs the embedding dimension, which is only
  // known after the first embedding request made in initialize().
  this.vectorStore = null as any;
}
/**
 * Change the active chat. Invalid ids are treated as "no active chat".
 * When the value actually changes, deferred conversation files are re-evaluated
 * in the background (fire-and-forget).
 */
setActiveChatId(chatId: string | null | undefined): void {
  const incoming = normalizeChatId(chatId);
  const outgoing = this.activeChatId;
  if (incoming === outgoing) {
    return;
  }

  this.activeChatId = incoming;
  const describe = (id: string | null): string => id ?? '(none)';
  ragDebug('RAG', `Active chat changed: ${describe(outgoing)} -> ${describe(incoming)}`);
  void this.flushDeferredConversationFiles();
}
/**
 * Decide whether a conversation file may be indexed right now.
 *
 * @returns `true` when indexing must NOT proceed: the file belongs to the active chat,
 *          was modified too recently (both cases schedule a retry), or could not be
 *          stat'ed (the file is dropped from the deferred set). `false` when it is ready.
 */
private async deferIfConversationNotReady(filePath: string): Promise<boolean> {
  const fileChatId = chatIdFromConversationFile(filePath);
  const belongsToActiveChat = Boolean(fileChatId) && fileChatId === this.activeChatId;
  if (belongsToActiveChat) {
    this.deferConversationFile(filePath, CONVERSATION_INDEX_MIN_AGE_MS, 'active conversation');
    return true;
  }

  try {
    const { mtimeMs } = await fsPromises.stat(filePath);
    const elapsedMs = Date.now() - mtimeMs;
    if (!(elapsedMs < CONVERSATION_INDEX_MIN_AGE_MS)) {
      // Old enough: safe to index.
      return false;
    }
    const remainingMs = CONVERSATION_INDEX_MIN_AGE_MS - elapsedMs;
    const reason = `conversation changed ${Math.ceil(elapsedMs / 1000)}s ago`;
    this.deferConversationFile(filePath, remainingMs, reason);
    return true;
  } catch {
    // Stat failed: forget the file instead of retrying it.
    this.deferredConversationFiles.delete(filePath);
    this.cancelDeferredConversationFlush(filePath);
    return true;
  }
}
/**
 * Remember a conversation file as deferred and schedule its retry.
 * The retry delay is rounded up to whole milliseconds and never shorter than one second.
 */
private deferConversationFile(filePath: string, delayMs: number, reason: string): void {
  const retryDelayMs = Math.max(1000, Math.ceil(delayMs));
  const retrySeconds = Math.ceil(retryDelayMs / 1000);

  this.deferredConversationFiles.add(filePath);
  ragDebug('RAG', `Deferring conversation index (${reason}, retry in ${retrySeconds}s): ${filePath}`);
  this.scheduleDeferredConversationFlush(filePath, retryDelayMs);
}
/**
 * (Re)arm the retry timer for one deferred conversation file.
 * Any timer already pending for the same file is cancelled first.
 */
private scheduleDeferredConversationFlush(filePath: string, delayMs: number): void {
  this.cancelDeferredConversationFlush(filePath);

  const onTimerFired = (): void => {
    // The timer has fired, so it must no longer be tracked as pending.
    this.conversationRetryTimers.delete(filePath);
    void this.flushDeferredConversationFile(filePath);
  };

  this.conversationRetryTimers.set(filePath, setTimeout(onTimerFired, delayMs));
}
/**
 * Cancel and forget the pending retry timer of a conversation file, if there is one.
 */
private cancelDeferredConversationFlush(filePath: string): void {
  const pending = this.conversationRetryTimers.get(filePath);
  if (pending) {
    clearTimeout(pending);
    this.conversationRetryTimers.delete(filePath);
  }
}
/**
 * True when `sourceId` is a conversation source id whose chat id equals the active chat.
 */
private isActiveConversationSourceId(sourceId: string): boolean {
  const sourceChatId = chatIdFromConversationSourceId(sourceId);
  if (!sourceChatId) {
    return false;
  }
  return sourceChatId === this.activeChatId;
}
/**
 * Re-evaluate every deferred conversation file, one after another.
 * Works on a snapshot because entries are removed or re-added while flushing.
 */
private async flushDeferredConversationFiles(): Promise<void> {
  const snapshot = Array.from(this.deferredConversationFiles);
  for (const deferredPath of snapshot) {
    await this.flushDeferredConversationFile(deferredPath);
  }
}
/**
 * Try to index one deferred conversation file.
 * Does nothing when the file is no longer deferred or is still not ready
 * (in which case it has been re-deferred). Indexing failures are logged, not thrown.
 */
private async flushDeferredConversationFile(filePath: string): Promise<void> {
  const stillDeferred = this.deferredConversationFiles.has(filePath);
  if (!stillDeferred) {
    return;
  }

  const notReady = await this.deferIfConversationNotReady(filePath);
  if (notReady) {
    return;
  }

  // Ready: drop the bookkeeping before indexing.
  this.deferredConversationFiles.delete(filePath);
  this.cancelDeferredConversationFlush(filePath);

  try {
    await this.indexConversationSource(filePath);
  } catch (error) {
    console.warn(`[RAG] Failed to index deferred conversation ${filePath}:`, String(error));
  }
}
/**
 * Collect the distinct conversation directories implied by the configured
 * content directories and remote sources, in first-seen order.
 */
private getConversationDirectories(): string[] {
  const sources = this.config.contentDirectories.concat(this.config.remoteSources ?? []);
  const unique = new Set<string>();
  sources.forEach((source) => {
    const directory = conversationDirectoryFromSource(source);
    if (directory) {
      unique.add(directory);
    }
  });
  return Array.from(unique);
}
/**
 * Remove all chunks of one document from both the keyword index and the vector store.
 * The chunk ids are read first because the keyword index needs them after the rows are gone.
 */
private deleteChunksFromIndex(documentId: number): void {
  const staleChunkIds = this.vectorStore.getChunkIdsByDocumentId(documentId);

  this.bm25Index.removeDocumentChunks(staleChunkIds);
  this.vectorStore.deleteChunksByDocumentId(documentId);
}
/**
 * Remove a document (by stored path / source id) together with its chunks.
 * The document delete is issued even when no matching document was found.
 */
private deleteDocumentFromIndex(docPath: string): void {
  const indexed = this.vectorStore.getDocumentByPath(docPath);
  if (indexed) this.deleteChunksFromIndex(indexed.id);

  this.vectorStore.deleteDocument(docPath);
}
/**
 * Register (or, with `null`, remove) the listener that receives human-readable
 * progress text, e.g. for a UI progress indicator.
 */
setStatusCallback(callback: ((text: string) => void) | null): void {
  this.statusCallback = callback;
}
/**
 * Publish a status text to the registered callback.
 * An update identical to the previous one is dropped when it arrives within 5 seconds.
 * Exceptions thrown by the callback are ignored.
 */
private updateStatus(text: string): void {
  const timestamp = Date.now();
  const isRecentDuplicate = text === this.lastStatusText && timestamp - this.lastStatusAt < 5000;
  if (isRecentDuplicate) {
    return;
  }

  this.lastStatusText = text;
  this.lastStatusAt = timestamp;

  if (!this.statusCallback) {
    return;
  }
  try {
    this.statusCallback(text);
  } catch {
    // Non-fatal: UI update failed
  }
}
/**
* Initialize the retriever and vector store.
*
* Returns immediately once `isInitialized` has been set by an earlier successful call.
* Steps, in order: probe the embedding dimension, create and open the VectorStore, rebuild the
* BM25 index from stored chunks, remove orphaned / out-of-scope documents, index the local
* content directories, index remote sources, then start the file watcher (only when
* `watchFiles` is enabled and at least one content directory is configured).
*
* There is no try/catch here: a rejection from any awaited step propagates to the caller and
* `isInitialized` stays false.
*/
async initialize(): Promise<void> {
if (this.isInitialized) return;
ragDebug('RAG', 'Initializing retriever...');
ragDebug('RAG', 'Configuration:');
ragDebug('RAG', `Content directories: ${this.config.contentDirectories.join(', ')}`);
ragDebug('RAG', `Remote sources: ${(this.config.remoteSources ?? []).join(', ') || '(none)'}`);
ragDebug('RAG', `Watch files: ${this.config.watchFiles}`);
ragDebug('RAG', `Embedding model: "${this.config.embeddingModel}"`);
ragDebug('RAG', `Embedding API URL: ${this.config.embeddingBaseUrl}`);
// Get embedding dimension - this will trigger a test embedding
ragDebug('RAG', 'Checking embedding model dimension...');
const dimension = await this.embeddings.getDimension();
ragDebug('RAG', `Embedding model confirmed - Dimension: ${dimension}`);
// The store is created only now because it needs the embedding dimension.
this.vectorStore = new VectorStore({
dbPath: this.config.dbPath,
embeddingDimension: dimension,
});
// Initialize the database
await this.vectorStore.init();
ragDebug('RAG', 'VectorStore initialized');
// Build BM25 index from existing chunks
this.rebuildBM25Index();
// Clean up orphaned documents (indexed but no longer exist on disk)
await this.cleanupOrphanedDocuments();
// Index existing local files in a controlled foreground pass. The watcher is
// only for later changes, so startup does not keep indexing after ready.
await this.indexLocalContentDirectories();
// Index remote sources as first-class documents.
await this.indexRemoteSources();
// Start file watcher if enabled
if (this.config.watchFiles && this.config.contentDirectories.length > 0) {
await this.setupFileWatcher();
} else {
ragDebug('RAG', 'FileWatcher disabled or no directories configured');
}
// Only mark as initialized after every step above has succeeded.
this.isInitialized = true;
ragDebug('RAG', 'Retriever initialized successfully');
}
/**
 * Discard the in-memory keyword index and repopulate it from every chunk
 * currently stored in the vector store.
 */
private rebuildBM25Index(): void {
  ragDebug('RAG', 'Rebuilding BM25 keyword index...');
  this.bm25Index.clear();

  const storedChunks = this.vectorStore.getAllChunksForBM25();
  for (const { id, content } of storedChunks) {
    this.bm25Index.addDocument(id, content);
  }

  const { documents, terms, avgLength } = this.bm25Index.getStats();
  ragDebug('RAG', `BM25 index rebuilt: ${documents} docs, ${terms} unique terms, avg length=${avgLength}`);
}
/**
 * Remove documents from the index that should no longer be there:
 *
 * - conversation documents (`lmstudio-conversation://<id>`) whose
 *   `<id>.conversation.json` file is missing from every known conversation directory;
 * - local documents whose file no longer exists on disk;
 * - local documents that are not located under any configured content directory.
 *
 * Other URL-style source ids (anything with a `scheme://` prefix) are left untouched.
 *
 * The directory-scope test also compares normalized paths, so a configured directory written
 * with a trailing separator or redundant segments (`/notes/`, `/notes/./sub`) still covers its
 * files. Without this, every document under such a directory was deleted on each startup.
 */
private async cleanupOrphanedDocuments(): Promise<void> {
  const { existsSync } = await import('fs');
  const indexedPaths = this.vectorStore.getAllDocumentPaths();
  const conversationDirs = this.getConversationDirectories();
  const configuredDirs = this.config.contentDirectories;
  const urlLikePattern = /^[a-z][a-z0-9+.-]*:\/\//i;

  // "<normalized dir><sep>" prefixes, computed once for all documents.
  const configuredPrefixes = configuredDirs.map((dir) => {
    const normalized = path.normalize(dir);
    return normalized.endsWith(path.sep) ? normalized : normalized + path.sep;
  });

  // When contentDirectories is empty (notesDirectory + contentDirectories both
  // unconfigured), every local document is out-of-scope and must be removed.
  const isUnderConfiguredDir = (docPath: string): boolean => {
    // Literal comparison first (also accepts '/' separators on Windows).
    if (configuredDirs.some((dir) => docPath.startsWith(dir + path.sep) || docPath.startsWith(dir + '/'))) {
      return true;
    }
    const normalizedDoc = path.normalize(docPath);
    return configuredPrefixes.some((prefix) => normalizedDoc.startsWith(prefix));
  };

  let removedCount = 0;
  for (const docPath of indexedPaths) {
    const conversationChatId = chatIdFromConversationSourceId(docPath);
    if (conversationChatId) {
      const exists = conversationDirs.some((dir) =>
        existsSync(path.join(dir, `${conversationChatId}.conversation.json`))
      );
      if (!exists) {
        ragDebug('RAG', `Removing orphaned conversation: ${docPath}`);
        this.deleteDocumentFromIndex(docPath);
        removedCount++;
      }
      continue;
    }

    // Remote documents are not backed by a local file; nothing to verify here.
    if (urlLikePattern.test(docPath)) continue;

    if (!existsSync(docPath)) {
      ragDebug('RAG', `Removing orphaned document: ${docPath}`);
      this.deleteDocumentFromIndex(docPath);
      removedCount++;
      continue;
    }

    if (!isUnderConfiguredDir(docPath)) {
      ragDebug('RAG', `Removing out-of-scope document: ${docPath}`);
      this.deleteDocumentFromIndex(docPath);
      removedCount++;
    }
  }

  if (removedCount > 0) {
    ragDebug('RAG', `Cleaned up ${removedCount} orphaned documents`);
  }
}
/**
 * Wait until all pending indexing operations are complete.
 *
 * Polls the indexing queue once per second. The completion message is logged only when the
 * queue actually drained; after a timeout only the timeout warning is emitted.
 *
 * @param timeoutMs - Maximum time to wait; null/undefined (or a non-positive / non-finite
 *                    value) means wait until indexing is done
 * @param onProgress - Optional callback for progress updates; invoked only when the remaining
 *                     count or the current file changed. Exceptions it throws are ignored.
 */
async waitForIndexing(
  timeoutMs: number | null = null,
  onProgress?: (filesRemaining: number, currentFile?: string) => void
): Promise<void> {
  const startTime = Date.now();
  let lastProgressKey = '';
  let timedOut = false;
  const hasTimeout = typeof timeoutMs === 'number' && Number.isFinite(timeoutMs) && timeoutMs > 0;

  while (this.indexingQueue.size > 0) {
    if (hasTimeout && Date.now() - startTime > timeoutMs) {
      console.warn(`[RAG] Indexing timeout after ${timeoutMs}ms, ${this.indexingQueue.size} files still pending`);
      timedOut = true;
      break;
    }

    const filesRemaining = this.indexingQueue.size;
    const currentFile = this.indexingQueue.values().next().value;
    ragDebug('RAG', `Waiting for indexing to complete... (${filesRemaining} files remaining)`);

    const progressKey = `${filesRemaining}:${currentFile ?? ''}`;
    if (onProgress && progressKey !== lastProgressKey) {
      lastProgressKey = progressKey;
      try {
        onProgress(filesRemaining, currentFile);
      } catch {
        // Non-fatal: UI update failed
      }
    }

    await new Promise(resolve => setTimeout(resolve, 1000));
  }

  if (!timedOut) {
    ragDebug('RAG', 'All indexing operations completed');
  }
}
/**
 * Create and start the file watcher over the configured content directories.
 *
 * Existing files are not reported (`ignoreInitial`); only later additions, changes and
 * deletions are handled. Conversation files go through the readiness check before indexing.
 */
private async setupFileWatcher(): Promise<void> {
  const directories = this.config.contentDirectories;
  ragDebug('RAG', `Setting up FileWatcher for directories: ${directories.join(', ')}`);

  // Shared by the "added" and "changed" handlers for conversation files.
  const indexConversationWhenReady = async (conversationPath: string): Promise<void> => {
    const deferred = await this.deferIfConversationNotReady(conversationPath);
    if (deferred) return;
    await this.indexConversationSource(conversationPath);
  };

  this.fileWatcher = new FileWatcher({
    directories,
    ignoreInitial: true,
    changeDebounceMs: 1500,
    isSupportedFile: (filePath) => DocumentLoader.isSupported(filePath) || isLmStudioConversationSource(filePath),
    onFileAdded: async (filePath) => {
      ragDebug('RAG', `FileWatcher: File added - ${filePath}`);
      if (!isLmStudioConversationSource(filePath)) {
        await this.indexFile(filePath);
        return;
      }
      await indexConversationWhenReady(filePath);
    },
    onFileChanged: async (filePath) => {
      ragDebug('RAG', `FileWatcher: File changed - ${filePath}`);
      if (!isLmStudioConversationSource(filePath)) {
        await this.reindexFile(filePath);
        return;
      }
      await indexConversationWhenReady(filePath);
    },
    onFileDeleted: async (filePath) => {
      ragDebug('RAG', `FileWatcher: File deleted - ${filePath}`);
      if (!isLmStudioConversationSource(filePath)) {
        this.deleteDocumentFromIndex(filePath);
        return;
      }
      // Drop any pending retry before removing the indexed conversation.
      this.deferredConversationFiles.delete(filePath);
      this.cancelDeferredConversationFlush(filePath);
      this.deleteConversationSource(filePath);
    },
  });

  await this.fileWatcher.start();
  ragDebug('RAG', 'FileWatcher is ready and has scanned all directories');
}
/**
 * Index a single file.
 *
 * The file's SHA-256 is compared with the stored hash first; an unchanged file is skipped
 * without parsing. Otherwise the file is parsed, chunked, embedded in batches and written to
 * both the vector store and the BM25 index. Each chunk is embedded/indexed with a
 * "title + tags" prefix so frontmatter metadata is searchable.
 *
 * Failures never propagate: they are logged and the file is skipped. If the failure happens
 * after the document row (with the new hash) was written but before all chunks were stored,
 * the document is removed from the index again. Otherwise the stored hash would match on the
 * next pass and the file would be fast-skipped forever with a missing or partial set of chunks.
 *
 * @param filePath Path of the file to index.
 * @param onProgress Optional parse-progress callback forwarded to the document loader.
 */
async indexFile(filePath: string, onProgress?: (progress: number) => void): Promise<void> {
  // Prevent duplicate indexing
  if (this.indexingQueue.has(filePath)) {
    ragDebug('RAG', `Skipping ${filePath} - already in queue`);
    return;
  }
  this.indexingQueue.add(filePath);
  const fileName = path.basename(filePath);
  ragDebug('RAG', `Starting to index: ${filePath}`);

  // True while the document row carries the new hash but its chunks are not fully written.
  let indexIncomplete = false;

  try {
    // Check if file needs indexing by comparing file hash BEFORE expensive parsing
    const existingDoc = this.vectorStore.getDocumentByPath(filePath);

    // Quick file hash check (just reads file bytes, no parsing). Read asynchronously so a
    // large file does not block the event loop.
    const { createHash } = await import('crypto');
    const fileBuffer = await fsPromises.readFile(filePath);
    const currentFileHash = createHash('sha256').update(fileBuffer).digest('hex');
    ragDebug('RAG', `Quick hash check for ${filePath}:`);
    ragDebug('RAG', `Current file hash: ${currentFileHash.substring(0, 16)}...`);
    ragDebug('RAG', `Stored hash: ${existingDoc ? existingDoc.hash.substring(0, 16) + '...' : 'none'}`);

    // Skip early if hash matches - no need for expensive parsing!
    if (existingDoc && existingDoc.hash === currentFileHash) {
      ragDebug('RAG', `Fast skip: ${filePath} - already indexed (hash unchanged)`);
      return;
    }

    // Now do the expensive document parsing
    const startTime = Date.now();
    this.updateStatus(`Parsing: ${fileName}...`);
    ragDebug('RAG', `Loading document (hash changed or new): ${filePath}`);
    const parsed = await DocumentLoader.load(filePath, {
      extractTables: true,
      onProgress,
    });
    const parseTime = Math.floor((Date.now() - startTime) / 1000);
    ragDebug('RAG', `Document loaded: ${filePath}, content length=${parsed.content.length}, time=${parseTime}s`);
    this.updateStatus(`Indexing: ${fileName}`);

    // Use the file hash we already calculated (more reliable than content-based hash)
    parsed.hash = currentFileHash;

    // Upsert document
    ragDebug('RAG', `Upserting document to DB: ${filePath}`);
    const documentId = this.vectorStore.upsertDocument({
      path: filePath,
      hash: parsed.hash,
      title: parsed.metadata.title,
      metadata: parsed.metadata,
    });
    indexIncomplete = true;
    ragDebug('RAG', `Document upserted with ID: ${documentId}`);

    // Delete old chunks if re-indexing
    if (existingDoc) {
      ragDebug('RAG', `Deleting old chunks for document ${documentId}`);
      this.deleteChunksFromIndex(documentId);
    }

    // Chunk the content
    this.updateStatus(`Chunking: ${fileName}`);
    ragDebug('RAG', `Chunking content for: ${filePath}`);
    const chunker = new TextChunker({
      chunkSize: this.config.chunkSize,
      chunkOverlap: this.config.chunkOverlap,
      documentPath: filePath,
      documentId,
    });
    const chunks = chunker.chunk(parsed.content);
    ragDebug('RAG', `Generated ${chunks.length} chunks for ${filePath}`);
    if (chunks.length === 0) {
      // A document without chunks is a complete (empty) result, not a failure.
      indexIncomplete = false;
      console.warn(`[RAG] No chunks generated for ${filePath}`);
      return;
    }

    // Build a prefix from title + tags to prepend to each chunk for embedding and BM25.
    // This mirrors the playbook indexer strategy so frontmatter metadata is searchable.
    const docTitle: string = parsed.metadata.title ?? path.basename(filePath, path.extname(filePath));
    const rawTags = parsed.metadata.tags;
    const docTags: string[] = Array.isArray(rawTags)
      ? rawTags.map(String).filter(Boolean)
      : typeof rawTags === 'string' && rawTags.trim()
        ? [rawTags.trim()]
        : [];
    const metaPrefix = docTags.length > 0
      ? `${docTitle}\ntags: ${docTags.join(', ')}\n`
      : `${docTitle}\n`;

    // Generate embeddings in batches
    ragDebug('RAG', `Generating embeddings for ${chunks.length} chunks...`);
    const batchSize = this.embeddingBatchSize;
    const totalBatches = Math.ceil(chunks.length / batchSize);
    let lastBatchStatusAt = 0;
    for (let i = 0; i < chunks.length; i += batchSize) {
      const batch = chunks.slice(i, i + batchSize);
      const texts = batch.map(c => `${metaPrefix}${c.content}`);
      const batchNum = Math.floor(i / batchSize) + 1;

      // Report the first and last batch always, others at most every 5 seconds.
      const now = Date.now();
      if (batchNum === 1 || batchNum === totalBatches || now - lastBatchStatusAt >= 5000) {
        this.updateStatus(`Embedding: ${fileName} (${batchNum}/${totalBatches})`);
        lastBatchStatusAt = now;
      }
      ragDebug('RAG', `Embedding batch ${batchNum}/${totalBatches}`);
      const embeddings = await this.embeddings.embed(texts);

      // Attach embeddings to chunks
      const chunksWithEmbeddings = batch.map((chunk, idx) => ({
        ...chunk,
        embedding: embeddings[idx],
      }));
      ragDebug('RAG', `Inserting ${chunksWithEmbeddings.length} chunks into DB`);
      const insertedIds = this.vectorStore.insertChunks(chunksWithEmbeddings);

      // Also index in BM25 for keyword search (include title+tags prefix so they are term-searchable)
      for (let j = 0; j < batch.length; j++) {
        this.bm25Index.addDocument(insertedIds[j], `${metaPrefix}${batch[j].content}`);
      }
    }
    indexIncomplete = false;

    ragDebug('RAG', `Successfully indexed ${filePath}: ${chunks.length} chunks`);
    this.updateStatus(`Indexed: ${fileName} (${chunks.length} chunks)`);
    const bm25Stats = this.bm25Index.getStats();
    ragDebug('RAG', `BM25 index: ${bm25Stats.documents} docs, ${bm25Stats.terms} terms`);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`[RAG] Skipping ${filePath}: ${reason}`);
    if (error instanceof Error && error.stack) {
      ragDebug('RAG', `Index error stack for ${filePath}: ${error.stack}`);
    }
    if (indexIncomplete) {
      // Roll back the half-written document so the next pass re-indexes it.
      try {
        this.deleteDocumentFromIndex(filePath);
      } catch (cleanupError) {
        console.warn(`[RAG] Failed to roll back partial index for ${filePath}:`, String(cleanupError));
      }
    }
  } finally {
    this.indexingQueue.delete(filePath);
    ragDebug('RAG', `Removed ${filePath} from queue. Queue size: ${this.indexingQueue.size}`);
  }
}
/**
 * Load and index all configured remote sources.
 * LM Studio conversation sources are excluded here, and duplicates are removed.
 * A failure on one document is logged and does not stop the others.
 */
private async indexRemoteSources(): Promise<void> {
  const configured = this.config.remoteSources ?? [];
  const nonConversation = configured.filter((source) => !isLmStudioConversationSource(source));
  const remoteSources = Array.from(new Set(nonConversation));
  const sourceCount = remoteSources.length;
  if (sourceCount === 0) {
    return;
  }

  this.updateStatus(`Loading ${sourceCount} remote source${sourceCount !== 1 ? 's' : ''}...`);
  const { remoteFetchTimeoutMs, remoteMaxBytes, remoteMaxPages, githubToken, huggingFaceToken } = this.config;
  const docs = await loadRemoteIndexedDocuments({
    remoteSources,
    fetchTimeoutMs: remoteFetchTimeoutMs ?? 10000,
    maxBytes: remoteMaxBytes ?? 15728640,
    maxPages: remoteMaxPages ?? 50,
    githubToken,
    huggingFaceToken,
  });
  ragDebug('RAG', `Remote source documents loaded: ${docs.length}`);

  for (const doc of docs) {
    try {
      await this.indexIndexedDocument(doc);
    } catch (err) {
      console.warn(`[RAG] Failed to index remote document ${doc.sourceId}:`, String(err));
    }
  }
}
/**
 * Index every indexable file found in the configured content directories, sequentially.
 * A configured entry that is itself a conversation file is taken as-is instead of being
 * enumerated. Each file is processed once even when reachable through several entries.
 */
private async indexLocalContentDirectories(): Promise<void> {
  // A Set keeps first-seen order and removes duplicates in one step.
  const ordered = new Set<string>();
  for (const directory of this.config.contentDirectories) {
    const candidates = chatIdFromConversationFile(directory)
      ? [directory]
      : await this.enumerateIndexableFiles(directory);
    candidates.forEach((candidate) => ordered.add(candidate));
  }

  const files = Array.from(ordered);
  const total = files.length;
  if (total === 0) {
    return;
  }

  this.updateStatus(`Checking ${total} local document${total !== 1 ? 's' : ''}...`);
  let position = 0;
  for (const filePath of files) {
    position += 1;
    this.updateStatus(`Checking: ${path.basename(filePath)} (${position}/${total})`);

    if (!isLmStudioConversationSource(filePath)) {
      await this.indexFile(filePath);
      continue;
    }
    // Conversation files are indexed only once they are considered ready.
    if (await this.deferIfConversationNotReady(filePath)) continue;
    await this.indexConversationSource(filePath);
  }
}
/**
 * Recursively list the indexable files below `root`, sorted.
 *
 * Entries named `node_modules` or starting with `.` are skipped, unreadable directories are
 * ignored, and only regular files accepted by the document loader or recognised as
 * LM Studio conversation sources are returned.
 */
private async enumerateIndexableFiles(root: string): Promise<string[]> {
  const found: string[] = [];
  const pending: string[] = [root];

  // Explicit work list instead of recursion; the result is sorted, so visit order is irrelevant.
  for (let current = pending.pop(); current !== undefined; current = pending.pop()) {
    let entries;
    try {
      entries = await fsPromises.readdir(current, { withFileTypes: true });
    } catch {
      continue;
    }

    for (const entry of entries) {
      const { name } = entry;
      if (name === 'node_modules' || name.startsWith('.')) continue;

      const childPath = path.join(current, name);
      if (entry.isDirectory()) {
        pending.push(childPath);
        continue;
      }
      const indexable = DocumentLoader.isSupported(childPath) || isLmStudioConversationSource(childPath);
      if (entry.isFile() && indexable) {
        found.push(childPath);
      }
    }
  }

  return found.sort();
}
/**
 * Load one conversation source through the remote-document loader (page limit 1) and index
 * every document it yields. Loader errors propagate; per-document indexing errors are logged.
 */
private async indexConversationSource(source: string): Promise<void> {
  const { remoteFetchTimeoutMs, remoteMaxBytes, githubToken, huggingFaceToken } = this.config;
  const docs = await loadRemoteIndexedDocuments({
    remoteSources: [source],
    fetchTimeoutMs: remoteFetchTimeoutMs ?? 10000,
    maxBytes: remoteMaxBytes ?? 15728640,
    maxPages: 1,
    githubToken,
    huggingFaceToken,
  });

  for (const conversationDoc of docs) {
    try {
      await this.indexIndexedDocument(conversationDoc);
    } catch (err) {
      console.warn(`[RAG] Failed to index conversation document ${conversationDoc.sourceId}:`, String(err));
    }
  }
}
/**
 * Remove the indexed conversation that corresponds to a `<13 digits>.conversation.json` file.
 * Paths that do not name a conversation file are ignored.
 *
 * Both `/` and `\` are accepted as the separator before the file name, so paths reported
 * with Windows separators are recognised as well.
 */
private deleteConversationSource(filePath: string): void {
  const match = /(?:^|[\\/])(\d{13})\.conversation\.json$/i.exec(filePath);
  if (!match) return;
  this.deleteDocumentFromIndex(`lmstudio-conversation://${match[1]}`);
}
/**
 * Index an already-loaded document (remote source or conversation).
 *
 * The document is skipped when it is currently being indexed or when its content hash equals
 * the stored one. Otherwise it is chunked, embedded in batches and written to both the vector
 * store and the BM25 index.
 *
 * Errors are rethrown to the caller. If the failure happens after the document row (with the
 * new hash) was written but before all chunks were stored, the document is first removed from
 * the index, so the next attempt is not fast-skipped on a matching hash with partial chunks.
 *
 * @param doc Document to index; `doc.sourceId` is used as its stored path.
 * @throws Whatever the store, chunker or embeddings client throws.
 */
async indexIndexedDocument(doc: IndexedDocument): Promise<void> {
  if (this.indexingQueue.has(doc.sourceId)) {
    ragDebug('RAG', `Skipping remote ${doc.sourceId} - already in queue`);
    return;
  }
  this.indexingQueue.add(doc.sourceId);

  // True while the document row carries the new hash but its chunks are not fully written.
  let indexIncomplete = false;

  try {
    const existingDoc = this.vectorStore.getDocumentByPath(doc.sourceId);
    if (existingDoc && existingDoc.hash === doc.contentHash) {
      ragDebug('RAG', `Fast skip remote: ${doc.sourceId} - already indexed (hash unchanged)`);
      return;
    }

    const statusKind = doc.sourceKind === 'conversation' ? 'chat' : 'remote';
    this.updateStatus(`Indexing ${statusKind}: ${doc.title ?? doc.sourceId}`);
    ragDebug('RAG', `Upserting remote document to DB: ${doc.sourceId}`);
    const documentId = this.vectorStore.upsertDocument({
      path: doc.sourceId,
      hash: doc.contentHash,
      title: doc.title,
      metadata: doc.metadata,
    });
    indexIncomplete = true;

    // Re-indexing: drop the chunks of the previous version first.
    if (existingDoc) {
      this.deleteChunksFromIndex(documentId);
    }

    const chunker = new TextChunker({
      chunkSize: this.config.chunkSize,
      chunkOverlap: this.config.chunkOverlap,
      documentPath: doc.sourceId,
      documentId,
    });
    // Copy source-level attributes onto every chunk's metadata.
    const chunks = chunker.chunk(doc.content).map((chunk) => ({
      ...chunk,
      metadata: {
        ...chunk.metadata,
        sourceKind: doc.sourceKind,
        canonicalUrl: doc.canonicalUrl,
        baseUrl: doc.baseUrl,
        version: doc.version,
        ...(doc.sourceKind === 'conversation' ? { imageRefs: doc.imageRefs } : {}),
      },
    }));
    if (chunks.length === 0) {
      // A document without chunks is a complete (empty) result, not a failure.
      indexIncomplete = false;
      console.warn(`[RAG] No chunks generated for remote document ${doc.sourceId}`);
      return;
    }

    const batchSize = this.embeddingBatchSize;
    const totalBatches = Math.ceil(chunks.length / batchSize);
    let lastBatchStatusAt = 0;
    for (let i = 0; i < chunks.length; i += batchSize) {
      const batch = chunks.slice(i, i + batchSize);
      const batchNum = Math.floor(i / batchSize) + 1;

      // Report the first and last batch always, others at most every 5 seconds.
      const now = Date.now();
      if (batchNum === 1 || batchNum === totalBatches || now - lastBatchStatusAt >= 5000) {
        this.updateStatus(`Embedding ${statusKind}: ${doc.title ?? doc.sourceId} (${batchNum}/${totalBatches})`);
        lastBatchStatusAt = now;
      }

      const embeddings = await this.embeddings.embed(batch.map(c => c.content));
      const chunksWithEmbeddings = batch.map((chunk, idx) => ({
        ...chunk,
        embedding: embeddings[idx],
      }));
      const insertedIds = this.vectorStore.insertChunks(chunksWithEmbeddings);
      for (let j = 0; j < batch.length; j++) {
        this.bm25Index.addDocument(insertedIds[j], batch[j].content);
      }
    }
    indexIncomplete = false;

    ragDebug('RAG', `Successfully indexed remote ${doc.sourceId}: ${chunks.length} chunks`);
  } catch (error) {
    if (indexIncomplete) {
      // Roll back the half-written document so a later attempt re-indexes it.
      try {
        this.deleteDocumentFromIndex(doc.sourceId);
      } catch (cleanupError) {
        console.warn(`[RAG] Failed to roll back partial index for ${doc.sourceId}:`, String(cleanupError));
      }
    }
    throw error;
  } finally {
    this.indexingQueue.delete(doc.sourceId);
    ragDebug('RAG', `Removed remote ${doc.sourceId} from queue. Queue size: ${this.indexingQueue.size}`);
  }
}
/**
 * Re-index a file. Delegates to `indexFile`, which replaces the stored chunks
 * when the file's hash differs from the indexed one.
 */
async reindexFile(filePath: string): Promise<void> {
  return this.indexFile(filePath);
}
/**
 * Hybrid Search - combines semantic (embedding) and keyword (BM25) search.
 * Uses Reciprocal Rank Fusion (RRF) to combine results from both methods.
 *
 * Each list contributes `1 / (60 + rank)`, weighted by `1 - hybridSearchWeight` for the
 * semantic list and `hybridSearchWeight` for the BM25 list. Candidates are then taken in fused
 * order until `limit` results are collected. Chunks of the active conversation and BM25 hits
 * whose chunk/document can no longer be loaded are skipped BEFORE the limit is applied, so
 * they do not use up result slots (previously they were removed after truncation, which could
 * return fewer results than requested, or none). Finally an optional, soft language bias may
 * re-order (never filter) the collected results.
 *
 * @param query Search text.
 * @param limit Maximum number of results; defaults to `config.retrievalLimit`.
 * @returns Up to `limit` results, best first. `score`/`distance` come from the semantic search;
 *          BM25-only hits carry a synthetic 0.5. Ranking details are stored under the
 *          `_hybridScore`, `_semanticRank`, `_bm25Rank`, `_bm25Only` and `_language*` metadata keys.
 * @throws Error when `initialize()` has not completed.
 */
async search(query: string, limit?: number): Promise<SearchResult[]> {
  if (!this.isInitialized) {
    throw new Error('Retriever not initialized. Call initialize() first.');
  }
  const actualLimit = limit ?? this.config.retrievalLimit;
  const hybridWeight = this.config.hybridSearchWeight;
  ragDebug('RAG', `Hybrid Search query: "${query.substring(0, 50)}..."`);
  ragDebug('RAG', `Search params: limit=${actualLimit}, hybridWeight=${hybridWeight}`);

  // 1. Semantic Search (Embeddings)
  ragDebug('RAG', 'Generating query embedding...');
  const queryEmbedding = await this.embeddings.embedSingle(query);
  // Fetch more candidates for fusion, use low threshold to get diverse results
  const semanticResults = this.vectorStore.search(
    queryEmbedding,
    actualLimit * 3,
    0.05 // Very low - RRF ranking decides what's relevant
  );
  ragDebug('RAG', `Semantic search: ${semanticResults.length} results`);

  // 2. BM25 Keyword Search
  const bm25Results = this.bm25Index.search(query, actualLimit * 3);
  ragDebug('RAG', `BM25 keyword search: ${bm25Results.length} results`);

  // 3. Reciprocal Rank Fusion (RRF)
  // Score = sum of 1/(k + rank) across both lists
  const k = 60; // RRF constant (standard value)
  const fusedScores = new Map<number, { score: number; semanticRank?: number; bm25Rank?: number }>();

  // Add semantic scores (key by chunk.id)
  semanticResults.forEach((result, idx) => {
    const rrf = 1 / (k + idx + 1);
    const weighted = rrf * (1 - hybridWeight);
    fusedScores.set(result.chunk.id, {
      score: weighted,
      semanticRank: idx + 1,
    });
  });

  // Add BM25 scores
  bm25Results.forEach((result, idx) => {
    const rrf = 1 / (k + idx + 1);
    const weighted = rrf * hybridWeight;
    const existing = fusedScores.get(result.id);
    if (existing) {
      existing.score += weighted;
      existing.bm25Rank = idx + 1;
    } else {
      fusedScores.set(result.id, {
        score: weighted,
        bm25Rank: idx + 1,
      });
    }
  });

  // Sort by fused score. Truncation happens while building the results below, so that
  // skipped candidates can be replaced by the next best ones.
  const rankedFused = Array.from(fusedScores.entries())
    .sort((a, b) => b[1].score - a[1].score);

  // Build final results with original data
  const semanticMap = new Map(semanticResults.map(r => [r.chunk.id, r]));
  const finalResults: SearchResult[] = [];
  for (const [chunkId, fusedData] of rankedFused) {
    if (finalResults.length >= actualLimit) break;

    // Try to get from semantic results first (has more data)
    const semantic = semanticMap.get(chunkId);
    if (semantic) {
      // Never surface chunks of the conversation that is currently open.
      if (this.isActiveConversationSourceId(semantic.document.path)) continue;
      // In hybrid mode, trust the RRF ranking - don't double-filter by threshold
      // The threshold was already applied in the initial semantic search
      finalResults.push({
        chunk: {
          ...semantic.chunk,
          metadata: {
            ...semantic.chunk.metadata,
            _hybridScore: fusedData.score,
            _semanticRank: fusedData.semanticRank,
            _bm25Rank: fusedData.bm25Rank,
          },
        },
        document: semantic.document,
        score: semantic.score,
        distance: semantic.distance,
      });
    } else if (fusedData.bm25Rank) {
      // BM25-only result - fetch from DB (these are keyword matches not found by semantic)
      const chunkData = this.vectorStore.getChunkById(chunkId);
      const docData = chunkData ? this.vectorStore.getDocumentById(chunkData.documentId) : null;
      if (chunkData && docData) {
        if (this.isActiveConversationSourceId(docData.path)) continue;
        // Ensure required metadata fields
        const baseMetadata = {
          documentPath: docData.path,
          chunkIndex: (chunkData.metadata.chunkIndex as number) ?? 0,
          ...chunkData.metadata,
        };
        finalResults.push({
          chunk: {
            id: chunkId,
            documentId: chunkData.documentId,
            content: chunkData.content,
            metadata: {
              ...baseMetadata,
              _hybridScore: fusedData.score,
              _bm25Rank: fusedData.bm25Rank,
              _bm25Only: true,
            },
          },
          document: docData,
          score: 0.5, // Synthetic score for BM25-only
          distance: 0.5,
        });
      }
    }
  }
  ragDebug('RAG', `Hybrid search: ${finalResults.length} final results`);

  // Optional soft language bias re-ranking.
  // Goal: prefer chunks in the conversation language without sacrificing recall.
  // This is intentionally conservative: it never filters results, only adjusts ordering slightly.
  {
    const strengthRaw = Number(this.config.languageBiasStrength ?? 0);
    const strength = Number.isFinite(strengthRaw) ? Math.max(0, Math.min(1, strengthRaw)) : 0;
    if (strength > 0 && finalResults.length > 1) {
      const q = guessLanguage(query);
      const queryLang = q.language;
      const queryConf = q.confidence;
      // If query is mixed/unknown, bias is unlikely to help; keep it near zero.
      const effectiveQueryConf = queryLang === 'unknown' ? 0 : queryLang === 'mixed' ? 0.15 : queryConf;
      if (effectiveQueryConf > 0) {
        // RRF hybrid scores are around ~0.01. Keep max bias small so we don't hurt relevance.
        const biasBase = 0.003; // max additive delta when strength=1 and signals are strong
        for (const r of finalResults) {
          const hybridScore = (r.chunk.metadata._hybridScore as number | undefined) ?? 0;
          // Use chunk content (not doc title) to decide neutrality + language.
          const content = r.chunk.content ?? '';
          const neutrality = computeLanguageNeutrality(content); // 0..1
          const docGuess = guessLanguage(content);
          const match = languageMatchScore(queryLang, docGuess.language);
          const docConf = docGuess.confidence;
          const effectiveDocConf = docGuess.language === 'unknown' ? 0 : docGuess.language === 'mixed' ? 0.15 : docConf;
          const bias = strength * biasBase * effectiveQueryConf * effectiveDocConf * neutrality * match;
          r.chunk.metadata._languageQueryLang = queryLang;
          r.chunk.metadata._languageChunkLang = docGuess.language;
          r.chunk.metadata._languageBias = bias;
          r.chunk.metadata._hybridScoreBiased = hybridScore + bias;
        }
        finalResults.sort((a, b) => {
          const as = ((a.chunk.metadata._hybridScoreBiased as number | undefined) ?? (a.chunk.metadata._hybridScore as number | undefined) ?? 0);
          const bs = ((b.chunk.metadata._hybridScoreBiased as number | undefined) ?? (b.chunk.metadata._hybridScore as number | undefined) ?? 0);
          return bs - as;
        });
      }
    }
  }

  // Log top results for debugging
  finalResults.slice(0, 3).forEach((r, i) => {
    const ranks: string[] = [];
    if (r.chunk.metadata._semanticRank) ranks.push(`sem:#${r.chunk.metadata._semanticRank}`);
    if (r.chunk.metadata._bm25Rank) ranks.push(`bm25:#${r.chunk.metadata._bm25Rank}`);
    const hs = (r.chunk.metadata._hybridScore as number) ?? 0;
    const hsb = (r.chunk.metadata._hybridScoreBiased as number | undefined);
    const lb = (r.chunk.metadata._languageBias as number | undefined);
    if (hsb != null && lb != null) {
      ragDebug('RAG', `#${i + 1}: ${ranks.join(', ')}, hybrid=${hs.toFixed(4)}, biased=${hsb.toFixed(4)} (langBias=${lb.toFixed(4)})`);
    } else {
      ragDebug('RAG', `#${i + 1}: ${ranks.join(', ')}, hybrid=${hs.toFixed(4)}`);
    }
  });

  return finalResults;
}
/**
 * Return the vector store's statistics extended with the indexing queue size,
 * the watcher status (false when no watcher exists) and the configured directories.
 */
getStats() {
  const storeStats = this.vectorStore.getStats();
  const watcherActive = this.fileWatcher?.getStatus() ?? false;

  return {
    ...storeStats,
    indexingQueue: this.indexingQueue.size,
    watcherActive,
    watchedDirectories: this.config.contentDirectories,
  };
}
/**
 * Effective embedding batch size: the configured value floored and clamped to 1..10.
 * Falls back to 1 when the value is missing or not a finite number.
 */
private get embeddingBatchSize(): number {
  const requested = Number(this.config.embeddingBatchSize ?? 1);
  return Number.isFinite(requested)
    ? Math.min(10, Math.max(1, Math.floor(requested)))
    : 1;
}
/**
 * Currently configured content directories.
 * Returns the configuration's own array, not a copy.
 */
getContentDirectories(): string[] {
  const { contentDirectories } = this.config;
  return contentDirectories;
}
/**
 * Update the set of watched content directories at runtime.
 * - Documents under removed directories are pruned from the index immediately, unless they
 *   are still covered by one of the new directories (e.g. a removed nested directory whose
 *   parent stays configured, or the same directory re-entered with a different spelling).
 *   Such documents were previously pruned and never re-indexed.
 * - The file watcher is restarted with the new directory set.
 * - Files in newly added directories are indexed.
 *
 * Directory matching also compares normalized paths, so trailing separators and redundant
 * segments in a configured directory do not prevent a match.
 *
 * @param newDirs Complete new list of content directories (replaces the old list).
 */
async updateContentDirectories(newDirs: string[]): Promise<void> {
  const oldDirs = this.config.contentDirectories;
  const newDirSet = new Set(newDirs);
  const oldDirSet = new Set(oldDirs);

  // "<normalized dir><sep>" prefix used for the normalized comparison.
  const dirPrefix = (dir: string): string => {
    const normalized = path.normalize(dir);
    return normalized.endsWith(path.sep) ? normalized : normalized + path.sep;
  };
  const isUnderAny = (docPath: string, dirs: string[]): boolean => {
    const normalizedDoc = path.normalize(docPath);
    return dirs.some((dir) =>
      docPath.startsWith(dir + path.sep) ||
      docPath.startsWith(dir + '/') ||
      normalizedDoc.startsWith(dirPrefix(dir))
    );
  };

  // Prune documents from removed directories.
  const removedDirs = oldDirs.filter((d) => !newDirSet.has(d));
  if (removedDirs.length > 0) {
    const indexedPaths = this.vectorStore.getAllDocumentPaths();
    for (const docPath of indexedPaths) {
      // URL-style source ids are not local files.
      if (/^[a-z][a-z0-9+.-]*:\/\//i.test(docPath)) continue;
      if (!isUnderAny(docPath, removedDirs)) continue;
      // Still in scope through a remaining directory: keep it.
      if (isUnderAny(docPath, newDirs)) continue;
      ragDebug('RAG', `updateContentDirectories: removing doc under removed dir: ${docPath}`);
      this.deleteDocumentFromIndex(docPath);
    }
  }

  const addedDirs = newDirs.filter((d) => !oldDirSet.has(d));

  // Update config before restarting the watcher and indexing new dirs.
  this.config.contentDirectories = newDirs;

  // Restart file watcher with the new directory set.
  if (this.fileWatcher) {
    await this.fileWatcher.stop();
    this.fileWatcher = null;
  }
  if (this.config.watchFiles && newDirs.length > 0) {
    await this.setupFileWatcher();
  }

  // Index files in newly added directories.
  if (addedDirs.length > 0) {
    // indexLocalContentDirectories() reads the directory list from the config, so the config
    // temporarily holds only the added directories and is restored afterwards.
    this.config.contentDirectories = addedDirs;
    try {
      await this.indexLocalContentDirectories();
    } finally {
      this.config.contentDirectories = newDirs;
    }
  }
}
/**
 * Clear all indexed data: the vector store and the in-memory BM25 keyword index.
 *
 * The keyword index must be cleared together with the store; otherwise keyword search keeps
 * returning chunk ids that no longer exist in the store.
 */
clearAll(): void {
  this.vectorStore.clearAll();
  this.bm25Index.clear();
}
/**
 * Shutdown retriever and cleanup resources.
 *
 * Stops the file watcher, cancels every pending conversation retry timer (so no indexing is
 * attempted against a closed store and the timers do not keep the process alive), forgets the
 * deferred conversation files and closes the vector store. Safe to call when `initialize()`
 * never created the store.
 */
async shutdown(): Promise<void> {
  // Stop the watcher first so it cannot schedule new retries while shutting down.
  if (this.fileWatcher) {
    await this.fileWatcher.stop();
    this.fileWatcher = null;
  }

  this.conversationRetryTimers.forEach((timer) => clearTimeout(timer));
  this.conversationRetryTimers.clear();
  this.deferredConversationFiles.clear();

  // The store is only a placeholder until initialize() has created it.
  if (this.vectorStore) {
    this.vectorStore.close();
  }
  this.isInitialized = false;
}
}