// Project file: src/swarm/orchestrator.ts
/**
* @file swarm/orchestrator.ts
* The swarm orchestrator with:
* - Worker fan-out: each role spawns N sub-workers in parallel
* - Multi-lane DDG rate limiters: true parallel search across lanes
* - 10 specialized worker roles (up from 5)
* - Adaptive collection with stagnation + coverage termination
*/
import { runWorker, SharedCrawlState } from "./worker";
import {
buildQueryPlan,
buildAdaptiveGapFill,
summariseFindings,
} from "../planning/planner";
import { detectCoveredDimensions, DIMENSIONS } from "../planning/dimensions";
import {
ResearchConfig,
SwarmTask,
WorkerResult,
CrawledSource,
WorkerRole,
DynamicWorkerSpec,
AgentMessage,
StatusFn,
WarnFn,
} from "../types";
import { DepthProfile } from "../constants";
import { DdgRateLimiter, DdgLimiterPool, resetThrottle } from "../net/ddg";
/**
 * Mutable crawl state shared by every swarm worker.
 *
 * Tracks visited URLs, content hashes (duplicate-page detection), per-domain
 * page counts, and a cross-worker discovery queue so links found by one
 * worker can be crawled later.
 *
 * Fix: previously a URL discovered by two workers before being drained was
 * queued twice and could be returned twice from a single drain (pushDiscovery
 * only checked visited URLs, not the pending queue). A pending-URL set now
 * keeps the queue duplicate-free.
 */
class MutableCrawlState implements SharedCrawlState {
  private readonly _visitedUrls = new Set<string>();
  private readonly _contentHashes = new Set<string>();
  private readonly _domainCounts = new Map<string, number>();
  // URLs currently sitting in _discoveries; used to reject duplicate pushes.
  private readonly _pendingDiscoveryUrls = new Set<string>();
  private readonly _discoveries: Array<{
    url: string;
    title: string;
    fromWorker: string;
  }> = [];

  /** Read-only view of every URL the swarm has crawled. */
  get visitedUrls(): ReadonlySet<string> {
    return this._visitedUrls;
  }

  /** Read-only view of the content hashes recorded so far. */
  get contentHashes(): ReadonlySet<string> {
    return this._contentHashes;
  }

  /** Read-only view of how many pages were taken per domain. */
  get domainCounts(): ReadonlyMap<string, number> {
    return this._domainCounts;
  }

  /** Marks a URL as visited. */
  addVisited(url: string): void {
    this._visitedUrls.add(url);
  }

  /** Records a content hash. */
  addHash(hash: string): void {
    this._contentHashes.add(hash);
  }

  /** Bumps the page count for the URL's domain (no-op if unparseable). */
  incrementDomain(url: string): void {
    const host = safeHostname(url);
    if (host)
      this._domainCounts.set(host, (this._domainCounts.get(host) ?? 0) + 1);
  }

  /** Pages already taken from this URL's domain (0 when unknown). */
  domainCount(url: string): number {
    return this._domainCounts.get(safeHostname(url)) ?? 0;
  }

  /**
   * Queues a discovered link for later crawling unless it was already
   * visited or is already waiting in the queue.
   */
  pushDiscovery(url: string, title: string, fromWorker: string): void {
    if (this._visitedUrls.has(url) || this._pendingDiscoveryUrls.has(url)) {
      return;
    }
    this._pendingDiscoveryUrls.add(url);
    this._discoveries.push({ url, title, fromWorker });
  }

  /**
   * Removes up to `limit` queued discoveries, silently dropping any URL that
   * was visited after it was queued. Each returned URL is unique.
   */
  drainDiscoveries(
    limit: number,
  ): ReadonlyArray<{ url: string; title: string }> {
    const results: Array<{ url: string; title: string }> = [];
    while (results.length < limit && this._discoveries.length > 0) {
      const item = this._discoveries.shift()!;
      this._pendingDiscoveryUrls.delete(item.url);
      if (!this._visitedUrls.has(item.url)) {
        results.push({ url: item.url, title: item.title });
      }
    }
    return results;
  }
}
/** Core roles always used, regardless of depth preset. */
const CORE_ROLES: ReadonlyArray<WorkerRole> = [
"breadth",
"depth",
"recency",
"academic",
"critical",
];
/** Extended roles added for deep/deeper/exhaustive presets. */
const EXTENDED_ROLES: ReadonlyArray<WorkerRole> = [
"statistical",
"regulatory",
"technical",
"primary",
"comparative",
];
/** Human-readable label per role, used when building static task labels. */
const ROLE_LABELS: Readonly<Record<WorkerRole, string>> = {
breadth: "Breadth",
depth: "Depth",
recency: "Recency",
academic: "Academic",
critical: "Critical",
statistical: "Statistical/Data",
regulatory: "Regulatory/Policy",
technical: "Technical Deep-Dive",
primary: "Primary Sources",
comparative: "Comparative Analysis",
};
/**
 * Chooses the worker roles for a depth profile:
 * - 10+ depth rounds: all core and extended roles,
 * - 5-9 rounds: core roles plus technical/comparative/statistical,
 * - fewer: the five core roles only.
 */
function rolesForProfile(profile: DepthProfile): ReadonlyArray<WorkerRole> {
  const rounds = profile.depthRounds;
  if (rounds >= 10) {
    return [...CORE_ROLES, ...EXTENDED_ROLES];
  }
  if (rounds >= 5) {
    return [...CORE_ROLES, "technical", "comparative", "statistical"];
  }
  return [...CORE_ROLES];
}
/**
 * Assembles the crawl/search tuning fields shared by every SwarmTask.
 * Depth-profile knobs are copied through under their own names; user-level
 * toggles come from the config. Callers spread the result and add the
 * task-specific fields (id, role, label, queries, pageBudget, …).
 */
function buildTaskBase(
  profile: DepthProfile,
  cfg: ResearchConfig,
): Pick<
  SwarmTask,
  | "contentLimit"
  | "safeSearch"
  | "searchResultsPerQuery"
  | "maxPagesPerDomain"
  | "maxLinksToEvaluate"
  | "maxLinksToFollow"
  | "candidatePoolMultiplier"
  | "workerConcurrency"
  | "minRelevanceScore"
  | "maxOutlinksPerPage"
  | "searchPages"
  | "extraEngines"
  | "linkCrawlDepth"
  | "queryMutationThreshold"
  | "enableLocalSources"
  | "localCollectionIds"
  | "roleCollectionMap"
> {
  // Pull the same-named fields straight off the profile…
  const {
    searchResultsPerQuery,
    maxPagesPerDomain,
    maxLinksToEvaluate,
    maxLinksToFollow,
    candidatePoolMultiplier,
    workerConcurrency,
    minRelevanceScore,
    maxOutlinksPerPage,
    searchPages,
    extraEngines,
    linkCrawlDepth,
    queryMutationThreshold,
  } = profile;
  // …and the user toggles off the config.
  const { enableLocalSources, localCollectionIds, roleCollectionMap } = cfg;
  return {
    contentLimit: cfg.contentLimitPerPage,
    safeSearch: cfg.safeSearch,
    searchResultsPerQuery,
    maxPagesPerDomain,
    maxLinksToEvaluate,
    maxLinksToFollow,
    candidatePoolMultiplier,
    workerConcurrency,
    minRelevanceScore,
    maxOutlinksPerPage,
    searchPages,
    extraEngines,
    linkCrawlDepth,
    queryMutationThreshold,
    enableLocalSources,
    localCollectionIds,
    roleCollectionMap,
  };
}
/**
 * Builds a SwarmTask from an AI-decomposed worker spec.
 * The page budget scales with the spec's weight (×2 of the per-worker
 * budget) but never drops below half the per-worker budget. `subIdx > 0`
 * marks a fan-out sub-worker and gets a numbered label.
 */
function buildDynamicTask(
  spec: DynamicWorkerSpec,
  profile: DepthProfile,
  cfg: ResearchConfig,
  subIdx: number = 0,
): SwarmTask {
  const weightedBudget = Math.round(
    profile.pageBudgetPerWorker * spec.budgetWeight * 2,
  );
  const budgetFloor = Math.ceil(profile.pageBudgetPerWorker / 2);
  const label = subIdx > 0 ? `${spec.label} #${subIdx + 1}` : spec.label;
  return {
    ...buildTaskBase(profile, cfg),
    id: `${spec.role}-${spec.label.slice(0, 20)}-s${subIdx}-${Date.now()}`,
    role: spec.role,
    label,
    queries: spec.queries,
    pageBudget: Math.max(weightedBudget, budgetFloor),
    // Link-following requires both the global toggle and the spec's opt-in.
    followLinks: cfg.enableLinkFollowing && spec.followLinks,
    preferredTiers: spec.preferredTiers,
  };
}
/**
 * Builds a SwarmTask for one of the fixed worker roles.
 * Depth/academic/technical/primary workers follow outlinks (when globally
 * enabled); academic and regulatory workers prefer authoritative source
 * tiers. `subIdx > 0` marks a fan-out sub-worker and gets a numbered label.
 */
function buildStaticTask(
  role: WorkerRole,
  queries: ReadonlyArray<string>,
  profile: DepthProfile,
  cfg: ResearchConfig,
  subIdx: number = 0,
): SwarmTask {
  // Roles that benefit from crawling outlinks off landing pages.
  const followRoles: ReadonlyArray<WorkerRole> = [
    "depth",
    "academic",
    "technical",
    "primary",
  ];
  const authoritativeTiers = ["academic", "government", "reference"] as const;
  const prefersAuthoritative = role === "academic" || role === "regulatory";
  const label =
    subIdx > 0 ? `${ROLE_LABELS[role]} #${subIdx + 1}` : ROLE_LABELS[role];
  return {
    ...buildTaskBase(profile, cfg),
    id: `${role}-s${subIdx}-${Date.now()}`,
    role,
    label,
    queries,
    pageBudget: profile.pageBudgetPerWorker,
    followLinks: cfg.enableLinkFollowing && followRoles.includes(role),
    preferredTiers: prefersAuthoritative ? authoritativeTiers : undefined,
  };
}
/**
 * Splits a query list into up to `fanOut` round-robin groups so sibling
 * workers can search disjoint slices in parallel.
 * Returns the whole list as a single group when fan-out is disabled
 * (fanOut <= 1) or the list is too small to be worth splitting (<= 2).
 * Empty groups (when fanOut exceeds the query count) are dropped.
 */
function fanOutQueries(
  queries: ReadonlyArray<string>,
  fanOut: number,
): ReadonlyArray<ReadonlyArray<string>> {
  if (fanOut <= 1 || queries.length <= 2) return [queries];
  const groups: string[][] = Array.from({ length: fanOut }, () => []);
  queries.forEach((query, idx) => {
    groups[idx % fanOut].push(query);
  });
  return groups.filter((group) => group.length > 0);
}
/** Aggregate output of a complete swarm run (round 1 plus all gap rounds). */
export interface OrchestratorResult {
/** Every source collected across all rounds. */
readonly sources: ReadonlyArray<CrawledSource>;
/** Deduplicated list of every search query any worker issued. */
readonly queriesUsed: ReadonlyArray<string>;
/** Errors reported by workers, including stringified worker crashes. */
readonly workerErrors: ReadonlyArray<string>;
/** True when the round-1 query plan came from AI planning. */
readonly usedAI: boolean;
/** Topic keywords produced by the planner and passed to round-1 workers. */
readonly topicKeywords: ReadonlyArray<string>;
}
/**
 * Runs the full research swarm for `cfg.topic` and returns everything found.
 *
 * Round 1 fans out workers either from AI-decomposed specs (when the planner
 * returns at least 3) or from the static role list for this depth profile.
 * Rounds 2..profile.depthRounds are adaptive gap-fill rounds that stop early
 * when: all research dimensions are covered, the planner returns no gap
 * plans, or several consecutive rounds add no new sources (stagnation).
 *
 * @param cfg User research configuration (topic, focus areas, toggles).
 * @param profile Depth preset knobs (rounds, budgets, lanes, fan-out).
 * @param status Progress-message sink.
 * @param warn Non-fatal warning sink.
 * @param signal Abort signal checked between rounds and passed to workers.
 * @returns Aggregated sources, deduplicated queries, worker errors, whether
 *          AI planning was used, and the planner's topic keywords.
 */
export async function runSwarm(
cfg: ResearchConfig,
profile: DepthProfile,
status: StatusFn,
warn: WarnFn,
signal: AbortSignal,
): Promise<OrchestratorResult> {
// Crawl state (visited URLs, hashes, domain counts) shared by all workers.
const state = new MutableCrawlState();
const allSources: CrawledSource[] = [];
const allQueries: string[] = [];
const allErrors: string[] = [];
let usedAI = false;
// Reset the global DDG throttle, then build one rate limiter per search
// lane so lanes can search in parallel.
resetThrottle();
const pool = new DdgLimiterPool(profile.searchLanes, profile.ddgRateLimitMs);
status(
`\n Launching swarm for: "${cfg.topic}" [${cfg.depthPreset} — ` +
`${profile.depthRounds} rounds, ${profile.pageBudgetPerWorker} pages/worker, ` +
`${profile.searchLanes} search lanes, fan-out ×${profile.workerFanOut}` +
`${cfg.enableLocalSources ? ", local sources enabled" : ""}]`,
);
// Build the round-1 query plan; may include AI-decomposed worker specs.
const plan = await buildQueryPlan(
cfg.topic,
cfg.focusAreas,
cfg.enableAIPlanning,
status,
profile,
);
usedAI = plan.usedAI;
let round1Tasks: SwarmTask[] = [];
// Prefer dynamic AI specs when the planner produced at least 3 of them;
// otherwise fall back to the static role list for this profile.
if (plan.dynamicSpecs && plan.dynamicSpecs.length >= 3) {
for (const spec of plan.dynamicSpecs) {
// Fan a spec out across sub-workers only when fan-out is enabled and
// there are more than 2 queries to split.
if (profile.workerFanOut > 1 && spec.queries.length > 2) {
const queryGroups = fanOutQueries(spec.queries, profile.workerFanOut);
for (let si = 0; si < queryGroups.length; si++) {
const subSpec = { ...spec, queries: queryGroups[si] };
round1Tasks.push(buildDynamicTask(subSpec, profile, cfg, si));
}
} else {
round1Tasks.push(buildDynamicTask(spec, profile, cfg));
}
}
status(
`\n ${round1Tasks.length} AI-decomposed workers (with fan-out) launching in parallel…`,
);
} else {
const roles = rolesForProfile(profile);
for (const role of roles) {
// Roles with no planned queries are skipped entirely.
const roleQueries = plan.queriesByRole[role] ?? [];
if (roleQueries.length === 0) continue;
if (profile.workerFanOut > 1 && roleQueries.length > 2) {
const queryGroups = fanOutQueries(roleQueries, profile.workerFanOut);
for (let si = 0; si < queryGroups.length; si++) {
round1Tasks.push(
buildStaticTask(role, queryGroups[si], profile, cfg, si),
);
}
} else {
round1Tasks.push(buildStaticTask(role, roleQueries, profile, cfg));
}
}
status(
`\n ${round1Tasks.length} workers (${rolesForProfile(profile).length} roles × fan-out) launching in parallel…`,
);
}
// Launch every round-1 worker in parallel. Each crash is converted into an
// empty WorkerResult so a single failure cannot reject the Promise.all.
const round1Results = await Promise.all(
round1Tasks.map((task, idx) => {
// NOTE(review): `idx` is unused here.
// Take the next limiter from the lane pool for this worker.
const limiter = pool.next();
return runWorker(
task,
state,
signal,
status,
warn,
plan.topicKeywords,
limiter,
).catch((err) => {
// Aborts are expected on cancellation; only warn on real crashes.
if (!(err instanceof DOMException && err.name === "AbortError")) {
warn(
`Worker ${task.label} crashed: ${err instanceof Error ? err.message : String(err)}`,
);
}
return {
taskId: task.id,
role: task.role,
label: task.label,
sources: [] as CrawledSource[],
queries: [] as string[],
errors: [String(err)],
} satisfies WorkerResult;
});
}),
);
aggregateResults(round1Results, allSources, allQueries, allErrors);
status(
`\n Round 1 complete — ${allSources.length} sources from ${round1Tasks.length} parallel workers`,
);
// Summarise round-1 findings once so gap-fill planning has context.
let priorMessages: ReadonlyArray<AgentMessage> = [];
if (profile.depthRounds > 1 && cfg.enableAIPlanning) {
status(`\n Summarising Round 1 findings for gap-fill workers…`);
priorMessages = await summariseFindings(
allSources,
cfg.topic,
cfg.enableAIPlanning,
status,
);
}
// Adaptive gap-fill rounds with three early exits: full dimension coverage,
// an empty gap plan, or stagnation (consecutive rounds with no new sources).
let consecutiveStagnant = 0;
for (let round = 2; round <= profile.depthRounds; round++) {
if (signal.aborted) break;
const coveredIds = detectCoveredDimensions(allSources.map((s) => s.text));
if (coveredIds.length >= DIMENSIONS.length) {
status(
`\n All ${DIMENSIONS.length} research dimensions covered — stopping early at round ${round}`,
);
break;
}
status(
`\n Analysing coverage gaps for round ${round} (${coveredIds.length}/${DIMENSIONS.length} dimensions covered)…`,
);
const gapPlans = await buildAdaptiveGapFill(
cfg.topic,
coveredIds,
priorMessages,
cfg.enableAIPlanning,
status,
profile,
);
if (gapPlans.length === 0) {
status("Research coverage is comprehensive, stopping early");
break;
}
const roundName =
round <= 2 ? "Follow-up" : round <= 5 ? "Deep-dive" : "Exhaustive";
status(
`\n ${roundName} round ${round} — ${gapPlans.length} targeted gap-fill worker(s), ` +
`${profile.pageBudgetPerGapWorker} pages each…`,
);
const sourcesBefore = allSources.length;
const gapTasks: SwarmTask[] = [];
for (const gapPlan of gapPlans) {
gapTasks.push({
...buildTaskBase(profile, cfg),
// Random suffix keeps ids unique when several tasks share the same ms.
id: `gap-${gapPlan.role}-r${round}-${Date.now()}-${Math.random().toString(36).slice(2, 6)}`,
role: gapPlan.role,
label: gapPlan.label,
queries: gapPlan.queries,
pageBudget: profile.pageBudgetPerGapWorker,
followLinks: cfg.enableLinkFollowing && gapPlan.followLinks,
preferredTiers: gapPlan.preferredTiers,
});
}
const gapResults = await Promise.all(
gapTasks.map((task) => {
const limiter = pool.next();
return runWorker(
task,
state,
signal,
status,
warn,
task.queries,
limiter,
).catch(
// NOTE(review): unlike round 1, a gap-worker crash is not surfaced via
// warn() here — it is only recorded in errors. Confirm if intentional.
(err) =>
({
taskId: task.id,
role: task.role,
label: task.label,
sources: [] as CrawledSource[],
queries: [] as string[],
errors: [String(err)],
}) satisfies WorkerResult,
);
}),
);
aggregateResults(gapResults, allSources, allQueries, allErrors);
const newSources = allSources.length - sourcesBefore;
status(
`Round ${round} done — ${newSources} new sources this round, ${allSources.length} total`,
);
// Stagnation tracking: stop after N consecutive zero-source rounds.
if (newSources === 0) {
consecutiveStagnant++;
if (consecutiveStagnant >= profile.stagnationThreshold) {
status(
`\n ${consecutiveStagnant} consecutive round(s) with no new sources — stopping (stagnation)`,
);
break;
}
} else {
consecutiveStagnant = 0;
}
// Re-summarise only when this round added sources and another round runs.
if (newSources > 0 && round < profile.depthRounds && cfg.enableAIPlanning) {
priorMessages = await summariseFindings(
allSources,
cfg.topic,
cfg.enableAIPlanning,
status,
);
}
}
return {
sources: allSources,
queriesUsed: [...new Set(allQueries)],
workerErrors: allErrors,
usedAI,
topicKeywords: plan.topicKeywords,
};
}
/**
 * Flattens a batch of worker results into the shared accumulator arrays.
 * Mutates `sources`, `queries`, and `errors` in place; results are appended
 * in the order the workers appear in `results`.
 */
function aggregateResults(
  results: ReadonlyArray<WorkerResult>,
  sources: CrawledSource[],
  queries: string[],
  errors: string[],
): void {
  for (const { sources: s, queries: q, errors: e } of results) {
    sources.push(...s);
    queries.push(...q);
    errors.push(...e);
  }
}
/**
 * Extracts the hostname from a URL with any single leading "www." stripped.
 * Returns "" for strings that are not parseable as URLs.
 */
function safeHostname(url: string): string {
  let host: string;
  try {
    host = new URL(url).hostname;
  } catch {
    return "";
  }
  return host.startsWith("www.") ? host.slice(4) : host;
}