feat(asl): Complete Deep Research V2.0 core development

Backend:
- Add SSE streaming client (unifuncsSseClient) replacing async polling
- Add paragraph-based reasoning parser with mergeConsecutiveThinking
- Add requirement expansion service (DeepSeek-V3 PICOS+MeSH)
- Add Word export service with Pandoc, inline hyperlinks, reference link expansion
- Add deep research V2 worker with 2s log flush and Chinese source prompt
- Add 5 curated data sources config (PubMed/ClinicalTrials/Cochrane/CNKI/MedJournals)
- Add 4 API endpoints (generate-requirement/tasks/task-status/export-word)
- Update Prisma schema with 6 new V2.0 fields on AslResearchTask
- Add DB migration for V2.0 fields
- Simplify ASL_DEEP_RESEARCH_EXPANSION prompt (remove strategy section)

Frontend:
- Add waterfall-flow DeepResearchPage (phase 0-4 progressive reveal)
- Add LandingView, SetupPanel, StrategyConfirm, AgentTerminal, ResultsView
- Add react-markdown + remark-gfm for report rendering
- Add custom link component showing visible URLs after references
- Add useDeepResearchTask polling hook
- Add deep research TypeScript types

Tests:
- Add E2E test, smoke test, and Chinese data source test scripts

Docs:
- Update ASL module status (v2.0 - core features complete)
- Update system status (v6.1 - ASL V2.0 milestone)
- Update Unifuncs DeepSearch API guide (v2.0 - SSE mode + Chinese source results)
- Update module auth specification (test script guidelines)
- Update V2.0 development plan

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-02-23 13:21:52 +08:00
parent b06daecacd
commit 8f06d4f929
39 changed files with 5605 additions and 417 deletions

View File

@@ -0,0 +1,10 @@
-- Deep Research V2.0: Add 6 new fields to research_tasks
-- Backward compatible: all new columns are nullable
ALTER TABLE "asl_schema"."research_tasks"
ADD COLUMN IF NOT EXISTS "target_sources" JSONB,        -- selected data-source URLs (JSON array)
ADD COLUMN IF NOT EXISTS "confirmed_requirement" TEXT,  -- user-verified natural-language search brief
ADD COLUMN IF NOT EXISTS "ai_intent_summary" JSONB,     -- PICOS + MeSH structured summary
ADD COLUMN IF NOT EXISTS "execution_logs" JSONB,        -- terminal log entries [{type, title, text, ts}]
ADD COLUMN IF NOT EXISTS "synthesis_report" TEXT,       -- AI synthesis report (Markdown)
ADD COLUMN IF NOT EXISTS "result_list" JSONB;           -- structured literature metadata list

View File

@@ -1,100 +0,0 @@
-- =====================================================
-- Phase 2A: SSA intelligence core — database migration script
-- Date: 2026-02-20
-- Description: add workflow tables and the data-profile column
-- Note: ssa_sessions.id is TEXT (stores UUID strings)
-- =====================================================
-- 1. Add the data_profile column to ssa_sessions (if absent)
ALTER TABLE ssa_schema.ssa_sessions
ADD COLUMN IF NOT EXISTS data_profile JSONB;
COMMENT ON COLUMN ssa_schema.ssa_sessions.data_profile IS 'Python Tool C 生成的数据画像 (Phase 2A)';
-- 2. Create the ssa_workflows table (multi-step analysis flows)
CREATE TABLE IF NOT EXISTS ssa_schema.ssa_workflows (
  id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::TEXT,
  session_id TEXT NOT NULL,
  message_id TEXT,
  status VARCHAR(20) NOT NULL DEFAULT 'pending',
  total_steps INTEGER NOT NULL,
  completed_steps INTEGER NOT NULL DEFAULT 0,
  workflow_plan JSONB NOT NULL,
  reasoning TEXT,
  created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW(),
  started_at TIMESTAMP WITHOUT TIME ZONE,
  completed_at TIMESTAMP WITHOUT TIME ZONE,
  CONSTRAINT fk_ssa_workflow_session
    FOREIGN KEY (session_id)
    REFERENCES ssa_schema.ssa_sessions(id)
    ON DELETE CASCADE
);
-- Indexes for ssa_workflows
CREATE INDEX IF NOT EXISTS idx_ssa_workflow_session
ON ssa_schema.ssa_workflows(session_id);
CREATE INDEX IF NOT EXISTS idx_ssa_workflow_status
ON ssa_schema.ssa_workflows(status);
-- Column comments for ssa_workflows (the IS '...' text is stored data — left as authored)
COMMENT ON TABLE ssa_schema.ssa_workflows IS 'SSA 多步骤分析工作流 (Phase 2A)';
COMMENT ON COLUMN ssa_schema.ssa_workflows.status IS 'pending | running | completed | partial | error';
COMMENT ON COLUMN ssa_schema.ssa_workflows.workflow_plan IS 'LLM 生成的原始工作流计划 JSON';
COMMENT ON COLUMN ssa_schema.ssa_workflows.reasoning IS 'LLM 规划理由说明';
-- 3. Create the ssa_workflow_steps table (one row per step in a flow)
CREATE TABLE IF NOT EXISTS ssa_schema.ssa_workflow_steps (
  id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::TEXT,
  workflow_id TEXT NOT NULL,
  step_order INTEGER NOT NULL,
  tool_code VARCHAR(50) NOT NULL,
  tool_name VARCHAR(100) NOT NULL,
  status VARCHAR(20) NOT NULL DEFAULT 'pending',
  input_params JSONB,
  guardrail_checks JSONB,
  output_result JSONB,
  error_info JSONB,
  execution_ms INTEGER,
  started_at TIMESTAMP WITHOUT TIME ZONE,
  completed_at TIMESTAMP WITHOUT TIME ZONE,
  CONSTRAINT fk_ssa_workflow_step_workflow
    FOREIGN KEY (workflow_id)
    REFERENCES ssa_schema.ssa_workflows(id)
    ON DELETE CASCADE
);
-- Indexes for ssa_workflow_steps
CREATE INDEX IF NOT EXISTS idx_ssa_workflow_step_workflow
ON ssa_schema.ssa_workflow_steps(workflow_id);
CREATE INDEX IF NOT EXISTS idx_ssa_workflow_step_status
ON ssa_schema.ssa_workflow_steps(status);
-- Column comments for ssa_workflow_steps
COMMENT ON TABLE ssa_schema.ssa_workflow_steps IS 'SSA 工作流单步执行记录 (Phase 2A)';
COMMENT ON COLUMN ssa_schema.ssa_workflow_steps.status IS 'pending | running | success | warning | error | skipped';
COMMENT ON COLUMN ssa_schema.ssa_workflow_steps.guardrail_checks IS 'R Service JIT 护栏检验结果 (正态性、方差齐性等)';
COMMENT ON COLUMN ssa_schema.ssa_workflow_steps.output_result IS '工具执行结果 (已裁剪,符合 LLM 上下文限制)';
COMMENT ON COLUMN ssa_schema.ssa_workflow_steps.error_info IS '错误信息 (用于容错管道的部分成功场景)';
-- =====================================================
-- Verification queries: report whether each object now exists
-- =====================================================
SELECT 'ssa_sessions.data_profile 字段' as item,
CASE WHEN EXISTS (
  SELECT 1 FROM information_schema.columns
  WHERE table_schema = 'ssa_schema' AND table_name = 'ssa_sessions' AND column_name = 'data_profile'
) THEN '✅ 已创建' ELSE '❌ 未创建' END as status;
SELECT 'ssa_workflows 表' as item,
CASE WHEN EXISTS (
  SELECT 1 FROM information_schema.tables
  WHERE table_schema = 'ssa_schema' AND table_name = 'ssa_workflows'
) THEN '✅ 已创建' ELSE '❌ 未创建' END as status;
SELECT 'ssa_workflow_steps 表' as item,
CASE WHEN EXISTS (
  SELECT 1 FROM information_schema.tables
  WHERE table_schema = 'ssa_schema' AND table_name = 'ssa_workflow_steps'
) THEN '✅ 已创建' ELSE '❌ 未创建' END as status;

View File

@@ -477,7 +477,7 @@ model AslFulltextScreeningTask {
@@schema("asl_schema")
}
/// 智能文献检索任务DeepSearch
/// 智能文献检索任务DeepSearch V1.x + V2.0
model AslResearchTask {
id String @id @default(uuid())
@@ -486,23 +486,23 @@ model AslResearchTask {
userId String @map("user_id")
// 检索输入
query String // 用户的自然语言查询
filters Json? // 🔜 后续:高级筛选 { yearFrom, yearTo, articleTypes }
query String // 用户的自然语言查询V1.x 原始输入 / V2.0 Step 1 粗略想法)
filters Json? // 高级筛选 { yearRange, targetCount, requireOpenAccess }
// unifuncs 任务
externalTaskId String? @map("external_task_id")
// 状态
status String @default("pending") // pending/processing/completed/failed
// 状态: draft → pending → running → completed / failed
status String @default("pending")
errorMessage String? @map("error_message")
// 结果
// V1.x 结果字段(保留向后兼容)
resultCount Int? @map("result_count")
rawResult String? @map("raw_result") @db.Text
reasoningContent String? @map("reasoning_content") @db.Text // AI思考过程
literatures Json? // 解析后的文献列表
reasoningContent String? @map("reasoning_content") @db.Text
literatures Json?
// 统计(🔜 后续展示)
// 统计
tokenUsage Json? @map("token_usage")
searchCount Int? @map("search_count")
readCount Int? @map("read_count")
@@ -513,6 +513,15 @@ model AslResearchTask {
updatedAt DateTime @updatedAt @map("updated_at")
completedAt DateTime? @map("completed_at")
// ── V2.0 新增字段 ──────────────────────────────
targetSources Json? @map("target_sources") // 选中的数据源 ["https://pubmed.ncbi.nlm.nih.gov/", ...]
confirmedRequirement String? @map("confirmed_requirement") @db.Text // 用户核验后的自然语言检索指令书
aiIntentSummary Json? @map("ai_intent_summary") // PICOS + MeSH 结构化摘要
executionLogs Json? @map("execution_logs") // 终端日志数组 [{type, title, text, ts}]
synthesisReport String? @map("synthesis_report") @db.Text // AI综合报告Markdown
resultList Json? @map("result_list") // 结构化文献元数据列表
// ── 索引 ────────────────────────────
@@index([projectId], map: "idx_research_tasks_project_id")
@@index([userId], map: "idx_research_tasks_user_id")
@@index([status], map: "idx_research_tasks_status")

View File

@@ -55,7 +55,7 @@ const RVW_FALLBACKS: Record<string, FallbackPrompt> = {
};
/**
* ASL 模块兜底 Prompt(预留)
* ASL 模块兜底 Prompt
*/
const ASL_FALLBACKS: Record<string, FallbackPrompt> = {
ASL_SCREENING: {
@@ -64,6 +64,49 @@ const ASL_FALLBACKS: Record<string, FallbackPrompt> = {
请根据提供的标准对文献进行筛选输出JSON格式的结果。`,
modelConfig: { model: 'deepseek-v3', temperature: 0.2 },
},
ASL_DEEP_RESEARCH_EXPANSION: {
content: `你是一位经验丰富的医学信息官,擅长将研究者的模糊想法转化为精准的文献检索需求指令。
## 任务
根据用户输入的粗略研究想法,生成一份简洁的深度文献检索指令书。
## 输出规则
1. 自然语言风格:像写邮件一样,口语化但专业,方便研究者直接阅读和编辑
2. PICOS 拆解:明确 Population / Intervention / Comparison / Outcome / Study Design
3. MeSH 扩展:为关键术语补充 MeSH 同义词(用括号标注英文 MeSH 术语)
4. 研究设计偏好:若用户未指定,默认优先 RCT、Systematic Review/Meta-Analysis、Cohort Study
5. 不要输出"检索策略建议"章节
6. 不要使用 Markdown 加粗标记(即不要用 ** 符号)
7. 不得自行添加约束:不要擅自限定"仅开放获取"或"仅英文文献"
## 用户输入
- 研究想法:{{originalQuery}}
- 选择的数据源:{{targetSources}}
- 时间范围:{{yearRange}}
- 目标数量:{{targetCount}}
## 输出格式
请严格按以下两部分输出,不要添加额外内容:
### Part 1: 自然语言检索指令书
简洁的检索需求描述包含研究背景、PICOS要素、MeSH术语不要包含检索策略建议
### Part 2: 结构化摘要JSON
\`\`\`json
{
"objective": "研究目标一句话描述",
"population": "研究人群",
"intervention": "干预措施含英文MeSH",
"comparison": "对照组",
"outcome": "主要结局指标",
"studyDesign": ["RCT", "Meta-analysis"],
"meshTerms": ["MeSH术语1", "MeSH术语2"],
"condition": "疾病/状况"
}
\`\`\``,
modelConfig: { model: 'deepseek-v3', temperature: 0.4 },
},
};
/**

View File

@@ -0,0 +1,179 @@
/**
* 中文数据源专项测试
*
* 只搜索 CNKI + 中华医学期刊网,验证 Unifuncs API 能否检索中文文献。
*
* 运行: npx tsx src/modules/asl/__tests__/deep-research-chinese-sources.ts
*/
// Unifuncs DeepSearch API endpoint (OpenAI-compatible chat/completions).
const UNIFUNCS_BASE_URL = 'https://api.unifuncs.com/deepsearch/v1';
// SECURITY FIX: a live API key was previously committed as a literal fallback
// ('sk-...'); that key must be revoked. The key now comes from the environment
// only — without it the request fails with HTTP 401, which runTest reports.
const UNIFUNCS_API_KEY = process.env.UNIFUNCS_API_KEY || '';
// Chinese scholarly sources under test (passed as Unifuncs domain_scope).
const CHINESE_SOURCES = [
  'https://www.cnki.net/',
  'https://medjournals.cn/',
];
// Representative Chinese clinical-research queries.
const TEST_QUERIES = [
  '2型糖尿病患者SGLT2抑制剂的肾脏保护作用',
  '非小细胞肺癌免疫治疗的中国临床研究进展',
];
/**
 * Run one search against the Unifuncs DeepSearch SSE endpoint and print
 * streaming statistics plus a language/domain analysis of the result.
 *
 * BUGFIX: on `[DONE]` the original `break` only exited the inner for-loop,
 * so the outer read loop kept awaiting the reader until the server closed
 * the connection. A flag now terminates both loops immediately.
 *
 * @param query       natural-language search query (Chinese)
 * @param domainScope URL whitelist forwarded as Unifuncs `domain_scope`
 */
async function runTest(query: string, domainScope: string[]) {
  console.log('\n' + '='.repeat(80));
  console.log(`查询: ${query}`);
  console.log(`数据源: ${domainScope.join(', ')}`);
  console.log('='.repeat(80));
  // --- Test 1: SSE streaming mode ---
  console.log('\n--- SSE 流式模式测试 ---');
  try {
    const payload = {
      model: 's2',
      messages: [{ role: 'user', content: query }],
      stream: true,
      introduction: '你是一名专业的中国临床研究文献检索专家,擅长从中国学术数据库中检索中文医学文献。请使用中文关键词进行检索。',
      max_depth: 10,
      domain_scope: domainScope,
      reference_style: 'link',
      output_prompt: '请使用中文输出。列出所有检索到的文献,包含标题、作者、期刊、年份和链接。如果文献是中文的,请保留中文信息。',
    };
    const response = await fetch(`${UNIFUNCS_BASE_URL}/chat/completions`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${UNIFUNCS_API_KEY}`,
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream',
      },
      body: JSON.stringify(payload),
    });
    if (!response.ok) {
      const text = await response.text();
      console.error(`❌ SSE 请求失败 HTTP ${response.status}: ${text}`);
      return;
    }
    console.log(`✅ SSE 连接成功 (HTTP ${response.status})`);
    const reader = (response.body as any).getReader() as ReadableStreamDefaultReader<Uint8Array>;
    const decoder = new TextDecoder();
    let buffer = '';
    let reasoningContent = '';
    let content = '';
    let chunkCount = 0;
    let reasoningChunks = 0;
    let contentChunks = 0;
    const startTime = Date.now();
    // Single throttled progress line (deduplicates the two inline writes).
    const printProgress = () =>
      process.stdout.write(`\r reasoning chunks: ${reasoningChunks}, content chunks: ${contentChunks}`);
    let streamFinished = false; // set on [DONE] so BOTH loops stop
    while (!streamFinished) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || ''; // keep trailing partial line for the next chunk
      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed || trimmed.startsWith(':')) continue; // SSE comment / keep-alive
        if (!trimmed.startsWith('data: ')) continue;
        const data = trimmed.slice(6);
        if (data === '[DONE]') {
          console.log('\n✅ 流式传输完成 [DONE]');
          streamFinished = true;
          break;
        }
        try {
          const json = JSON.parse(data);
          const delta = json.choices?.[0]?.delta;
          chunkCount++;
          if (delta?.reasoning_content) {
            reasoningContent += delta.reasoning_content;
            reasoningChunks++;
            if (reasoningChunks % 50 === 0) printProgress();
          } else if (delta?.content) {
            content += delta.content;
            contentChunks++;
            if (contentChunks % 20 === 0) printProgress();
          }
        } catch {
          // Skip unparseable SSE payloads (e.g. partial JSON)
        }
      }
    }
    reader.releaseLock();
    const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
    console.log(`\n\n📊 统计:`);
    console.log(` 耗时: ${elapsed}s`);
    console.log(` 总 chunks: ${chunkCount}`);
    console.log(` reasoning chunks: ${reasoningChunks} (${reasoningContent.length} chars)`);
    console.log(` content chunks: ${contentChunks} (${content.length} chars)`);
    // Result preview
    console.log(`\n📝 Reasoning 内容(前 500 字):`);
    console.log(reasoningContent.slice(0, 500));
    console.log(`\n📝 Content 结果(前 1000 字):`);
    console.log(content.slice(0, 1000));
    // Did the answer actually contain Chinese text and Chinese-source links?
    const chineseCharCount = (content.match(/[\u4e00-\u9fa5]/g) || []).length;
    const hasChineseLinks = content.includes('cnki.net') || content.includes('medjournals.cn');
    const hasPubMedLinks = content.includes('pubmed.ncbi.nlm.nih.gov');
    console.log(`\n🔍 中文分析:`);
    console.log(` 中文字符数: ${chineseCharCount}`);
    console.log(` 包含 CNKI 链接: ${hasChineseLinks ? '✅ 是' : '❌ 否'}`);
    console.log(` 包含 PubMed 链接: ${hasPubMedLinks ? '⚠️ 是(非中文源)' : '✅ 否'}`);
    // Tally every URL in the answer by hostname
    const urls = content.match(/https?:\/\/[^\s)]+/g) || [];
    console.log(` 结果中的链接 (共 ${urls.length} 个):`);
    const domainCounts: Record<string, number> = {};
    for (const url of urls) {
      try {
        const domain = new URL(url).hostname;
        domainCounts[domain] = (domainCounts[domain] || 0) + 1;
      } catch { /* skip malformed URLs */ }
    }
    for (const [domain, count] of Object.entries(domainCounts).sort((a, b) => b[1] - a[1])) {
      console.log(` ${domain}: ${count}`);
    }
  } catch (err: any) {
    console.error(`❌ SSE 测试失败: ${err.message}`);
  }
}
/** Entry point: run each Chinese query, then one mixed Chinese/English run. */
async function main() {
  console.log('🔬 中文数据源专项测试');
  console.log(`API Key: ${UNIFUNCS_API_KEY.slice(0, 10)}...`);
  console.log(`测试数据源: ${CHINESE_SOURCES.join(', ')}`);
  // Sequential on purpose — parallel SSE streams would interleave console output.
  for (let i = 0; i < TEST_QUERIES.length; i++) {
    await runTest(TEST_QUERIES[i], CHINESE_SOURCES);
  }
  // Extra run: Chinese and English sources combined.
  console.log('\n\n' + '🔬'.repeat(20));
  console.log('附加测试: 中英文混合数据源');
  const mixedScope = ['https://pubmed.ncbi.nlm.nih.gov/', ...CHINESE_SOURCES];
  await runTest('他汀类药物在心血管疾病一级预防中的最新RCT证据', mixedScope);
  console.log('\n\n✅ 所有测试完成');
}
main().catch(console.error);

View File

@@ -0,0 +1,345 @@
/**
* Deep Research V2.0 — 端到端集成测试
*
* 测试全流程:
* Step 1: 登录获取 Token
* Step 2: 获取数据源列表
* Step 3: 需求扩写LLM 调用)
* Step 4: 查看 draft 任务
* Step 5: HITL 确认 → 启动执行
* Step 6: 轮询任务直到 completed/failed
* Step 7: 验证结果完整性
* Step 8: Word 导出(可选,需要 Pandoc 微服务)
* Step 9: 清理测试数据
*
* 运行方式:
* npx tsx src/modules/asl/__tests__/deep-research-v2-e2e.ts
*
* 前置条件:
* - 后端服务运行在 localhost:3001
* - PostgreSQL 运行中
* - UNIFUNCS_API_KEY 环境变量已设置
* - 可选Python 微服务运行在 localhost:8000Word 导出)
*
* 预计耗时2-5 分钟(取决于 Unifuncs 搜索深度)
* 预计成本:约 ¥0.05-0.2LLM 需求扩写 + Unifuncs DeepSearch
*/
// ─── Configuration ──────────────────────────────
// Backend base URL; override with TEST_BASE_URL for non-local runs.
const BASE_URL = process.env.TEST_BASE_URL || 'http://localhost:3001';
const API_PREFIX = `${BASE_URL}/api/v1`;
// Credentials of the seeded test account (override via env vars).
const TEST_PHONE = process.env.TEST_PHONE || '13800000001';
const TEST_PASSWORD = process.env.TEST_PASSWORD || '123456';
// Research query driven through the whole pipeline.
const TEST_QUERY = '他汀类药物在心血管疾病一级预防中的最新 RCT 证据';
// Poll up to 60 × 5 s = 5 min for the async deep-research task.
const MAX_POLL_ATTEMPTS = 60;
const POLL_INTERVAL_MS = 5000;
// Optional step toggles (set env var to the string 'true' to skip).
const SKIP_WORD_EXPORT = process.env.SKIP_WORD_EXPORT === 'true';
const SKIP_CLEANUP = process.env.SKIP_CLEANUP === 'true';
// ─── Helpers ────────────────────────────────────
/** Resolve after `ms` milliseconds. */
const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));
/** Bearer token captured at login and attached to subsequent requests. */
let authToken = '';
/**
 * Minimal fetch wrapper for the backend API: adds the JSON Content-Type and
 * (when present) the Authorization header, then decodes the response as JSON,
 * a binary summary (docx export), or plain text.
 */
async function api(
  path: string,
  options: RequestInit = {}
): Promise<{ status: number; data: any }> {
  const response = await fetch(`${API_PREFIX}${path}`, {
    ...options,
    headers: {
      'Content-Type': 'application/json',
      ...(authToken ? { Authorization: `Bearer ${authToken}` } : {}),
      ...(options.headers || {}),
    },
  });
  const mime = response.headers.get('content-type') || '';
  let payload: any;
  if (mime.includes('application/json')) {
    payload = await response.json();
  } else if (mime.includes('application/vnd.openxmlformats')) {
    // Word export — report only the size, not the bytes.
    const buf = await response.arrayBuffer();
    payload = { _binary: true, byteLength: buf.byteLength };
  } else {
    payload = await response.text();
  }
  return { status: response.status, data: payload };
}
/** Log the step as passed, or abort the whole run by throwing when `condition` is false. */
function assert(condition: boolean, message: string) {
  if (condition) {
    console.log(`${message}`);
    return;
  }
  throw new Error(`❌ 断言失败: ${message}`);
}
/** Print a ═-ruled section header to visually separate test phases. */
function printDivider(title: string) {
  const rule = '═'.repeat(70);
  console.log(`\n${rule}`);
  console.log(` ${title}`);
  console.log(`${rule}\n`);
}
// ─── Test flow ──────────────────────────────────

/**
 * Drive the full Deep Research V2.0 happy path against a locally running
 * backend: login → data sources → requirement expansion (LLM) → draft check →
 * HITL confirm/execute → poll to completion → validate results → optional
 * Word export → cleanup. Exits the process with code 1 on any failure.
 */
async function runE2E() {
  console.log('🧪 Deep Research V2.0 端到端集成测试');
  console.log(`⏰ 时间: ${new Date().toLocaleString('zh-CN')}`);
  console.log(`📍 后端: ${BASE_URL}`);
  console.log(`📝 查询: ${TEST_QUERY}`);
  console.log('');
  let taskId: string | null = null;
  const startTime = Date.now();
  try {
    // ═══════════ Step 1: Login ═══════════
    printDivider('Step 1: 登录获取 Token');
    const loginRes = await api('/auth/login/password', {
      method: 'POST',
      body: JSON.stringify({ phone: TEST_PHONE, password: TEST_PASSWORD }),
    });
    if (loginRes.status !== 200 || !loginRes.data.success) {
      console.log('⚠️ 登录返回:', JSON.stringify(loginRes.data, null, 2));
      throw new Error(
        `登录失败 (HTTP ${loginRes.status}): ${loginRes.data.error || loginRes.data.message || '未知错误'}\n` +
        `请确认测试账号存在phone=${TEST_PHONE}, password=${TEST_PASSWORD}\n` +
        `或通过环境变量指定TEST_PHONE=xxx TEST_PASSWORD=yyy`
      );
    }
    // Token location differs across auth response shapes; try each known spot.
    authToken = loginRes.data.data.tokens?.accessToken || loginRes.data.data.accessToken || loginRes.data.data.token;
    assert(!!authToken, `获取到 Token (${authToken.slice(0, 20)}...)`);
    // ═══════════ Step 2: Fetch data-source list ═══════════
    printDivider('Step 2: 获取数据源列表');
    const sourcesRes = await api('/asl/research/data-sources');
    assert(sourcesRes.status === 200, `HTTP 200 (实际: ${sourcesRes.status})`);
    assert(sourcesRes.data.success === true, 'success=true');
    const sources = sourcesRes.data.data;
    assert(Array.isArray(sources), `返回数组 (${sources.length} 个数据源)`);
    assert(sources.length >= 3, `至少 3 个数据源`);
    console.log('\n 数据源列表:');
    sources.forEach((s: any) => {
      console.log(` ${s.defaultChecked ? '☑' : '☐'} [${s.category}] ${s.label}${s.domainScope}`);
    });
    // NOTE(review): `defaultIds` carries domainScope URLs, not ids — the
    // generate-requirement endpoint below presumably accepts URLs; confirm.
    const defaultIds = sources.filter((s: any) => s.defaultChecked).map((s: any) => s.domainScope);
    console.log(`\n 默认选中: ${defaultIds.join(', ')}`);
    // ═══════════ Step 3: Requirement expansion (LLM call) ═══════════
    printDivider('Step 3: 需求扩写LLM 调用)');
    console.log(' ⏳ 调用 LLM 生成检索指令书,请稍候...\n');
    const expandStart = Date.now();
    const expandRes = await api('/asl/research/generate-requirement', {
      method: 'POST',
      body: JSON.stringify({
        originalQuery: TEST_QUERY,
        targetSources: defaultIds,
        filters: { yearRange: '近5年', targetCount: '约20篇' },
      }),
    });
    const expandMs = Date.now() - expandStart;
    assert(expandRes.status === 200, `HTTP 200 (实际: ${expandRes.status})`);
    assert(expandRes.data.success === true, 'success=true');
    const expandData = expandRes.data.data;
    taskId = expandData.taskId;
    assert(!!taskId, `创建 draft 任务: ${taskId}`);
    assert(!!expandData.generatedRequirement, `生成检索指令书 (${expandData.generatedRequirement.length} 字)`);
    assert(!!expandData.intentSummary, 'PICOS 结构化摘要已生成');
    console.log(`\n ⏱️ 耗时: ${(expandMs / 1000).toFixed(1)}s`);
    console.log(`\n 📋 PICOS 摘要:`);
    const summary = expandData.intentSummary;
    console.log(` 目标: ${summary.objective || '—'}`);
    console.log(` P: ${summary.population || '—'}`);
    console.log(` I: ${summary.intervention || '—'}`);
    console.log(` C: ${summary.comparison || '—'}`);
    console.log(` O: ${summary.outcome || '—'}`);
    console.log(` 研究设计: ${(summary.studyDesign || []).join(', ')}`);
    console.log(` MeSH: ${(summary.meshTerms || []).join(', ')}`);
    console.log(`\n 📝 指令书预览 (前 200 字):`);
    console.log(` ${expandData.generatedRequirement.slice(0, 200).replace(/\n/g, '\n ')}...`);
    // ═══════════ Step 4: Inspect the draft task ═══════════
    printDivider('Step 4: 查看 draft 状态任务');
    const draftRes = await api(`/asl/research/tasks/${taskId}`);
    assert(draftRes.status === 200, `HTTP 200`);
    assert(draftRes.data.data.status === 'draft', `状态为 draft`);
    assert(draftRes.data.data.query === TEST_QUERY, `query 匹配`);
    // ═══════════ Step 5: HITL confirm → start execution ═══════════
    printDivider('Step 5: HITL 确认 → 启动执行');
    const confirmedReq = expandData.generatedRequirement;
    const executeRes = await api(`/asl/research/tasks/${taskId}/execute`, {
      method: 'PUT',
      body: JSON.stringify({ confirmedRequirement: confirmedReq }),
    });
    assert(executeRes.status === 200, `HTTP 200 (实际: ${executeRes.status})`);
    assert(executeRes.data.success === true, '任务已入队');
    console.log(' 🚀 Deep Research 任务已启动!');
    // ═══════════ Step 6: Poll until completed/failed ═══════════
    printDivider('Step 6: 轮询任务进度');
    let lastLogCount = 0;
    let finalStatus = '';
    for (let i = 1; i <= MAX_POLL_ATTEMPTS; i++) {
      await sleep(POLL_INTERVAL_MS);
      const pollRes = await api(`/asl/research/tasks/${taskId}`);
      const task = pollRes.data.data;
      const logs = task.executionLogs || [];
      // Print only log entries that appeared since the previous poll.
      const newLogs = logs.slice(lastLogCount);
      if (newLogs.length > 0) {
        newLogs.forEach((log: any) => {
          const icon: Record<string, string> = {
            thinking: '💭', searching: '🔍', reading: '📖',
            analyzing: '🧪', summary: '📊', info: '',
          };
          console.log(` ${icon[log.type] || '•'} [${log.title}] ${log.text.slice(0, 80)}`);
        });
        lastLogCount = logs.length;
      }
      console.log(` [${i}/${MAX_POLL_ATTEMPTS}] status=${task.status} | logs=${logs.length} | elapsed=${((Date.now() - startTime) / 1000).toFixed(0)}s`);
      if (task.status === 'completed' || task.status === 'failed') {
        finalStatus = task.status;
        break;
      }
    }
    // finalStatus stays '' on timeout, which this assert reports explicitly.
    assert(finalStatus === 'completed', `任务完成 (实际: ${finalStatus || 'timeout'})`);
    // ═══════════ Step 7: Validate result completeness ═══════════
    printDivider('Step 7: 验证结果完整性');
    const resultRes = await api(`/asl/research/tasks/${taskId}`);
    const result = resultRes.data.data;
    assert(result.status === 'completed', '状态: completed');
    assert(!!result.synthesisReport, `AI 综合报告已生成 (${(result.synthesisReport || '').length} 字)`);
    assert(!!result.completedAt, `completedAt 已设置`);
    const hasResultList = Array.isArray(result.resultList) && result.resultList.length > 0;
    if (hasResultList) {
      console.log(` ✅ 结构化文献列表: ${result.resultList.length}`);
      console.log('\n 📚 文献样例 (前 3 篇):');
      result.resultList.slice(0, 3).forEach((item: any, i: number) => {
        console.log(` ${i + 1}. ${item.title || '(无标题)'}`);
        if (item.authors) console.log(` 作者: ${item.authors}`);
        if (item.journal) console.log(` 期刊: ${item.journal}`);
        if (item.pmid) console.log(` PMID: ${item.pmid}`);
      });
    } else {
      // Empty list is tolerated: the UI falls back to report-only display.
      console.log(' ⚠️ 结构化文献列表为空(降级到报告展示模式)');
    }
    const logs = result.executionLogs || [];
    console.log(`\n 📊 执行日志统计: ${logs.length}`);
    const typeCounts: Record<string, number> = {};
    logs.forEach((l: any) => { typeCounts[l.type] = (typeCounts[l.type] || 0) + 1; });
    Object.entries(typeCounts).forEach(([type, count]) => {
      console.log(` ${type}: ${count}`);
    });
    console.log(`\n 📝 报告预览 (前 300 字):`);
    console.log(` ${result.synthesisReport.slice(0, 300).replace(/\n/g, '\n ')}...`);
    // ═══════════ Step 8: Word export (requires the Pandoc microservice) ═══════════
    if (!SKIP_WORD_EXPORT) {
      printDivider('Step 8: Word 导出(需要 Pandoc 微服务)');
      try {
        const exportRes = await api(`/asl/research/tasks/${taskId}/export-word`);
        if (exportRes.status === 200 && exportRes.data._binary) {
          assert(true, `Word 导出成功 (${exportRes.data.byteLength} bytes)`);
        } else {
          console.log(` ⚠️ Word 导出返回异常 (HTTP ${exportRes.status})`);
          console.log(` 这通常是因为 Python 微服务Pandoc未运行可忽略。`);
        }
      } catch (e: any) {
        console.log(` ⚠️ Word 导出跳过: ${e.message}`);
        console.log(` 如需测试,请启动 Python 微服务: cd extraction-service && python app.py`);
      }
    } else {
      console.log('\n ⏭️ 跳过 Word 导出测试 (SKIP_WORD_EXPORT=true)');
    }
    // ═══════════ Step 9: Cleanup (delete the test task directly via Prisma) ═══════════
    if (!SKIP_CLEANUP && taskId) {
      printDivider('Step 9: 清理测试数据');
      try {
        const { PrismaClient } = await import('@prisma/client');
        const prisma = new PrismaClient();
        await prisma.aslResearchTask.delete({ where: { id: taskId } });
        await prisma.$disconnect();
        console.log(` 🗑️ 已删除测试任务: ${taskId}`);
      } catch (e: any) {
        console.log(` ⚠️ 清理失败: ${e.message} (可手动删除)`);
      }
    } else {
      console.log(`\n ⏭️ 保留测试数据 taskId=${taskId}`);
    }
    // ═══════════ Summary ═══════════
    printDivider('🎉 测试通过!');
    const totalMs = Date.now() - startTime;
    console.log(` 总耗时: ${(totalMs / 1000).toFixed(1)}s`);
    console.log(` 任务 ID: ${taskId}`);
    console.log(` 综合报告: ${(result.synthesisReport || '').length}`);
    console.log(` 文献数量: ${hasResultList ? result.resultList.length : 0}`);
    console.log(` 执行日志: ${logs.length}`);
    console.log('');
  } catch (error: any) {
    printDivider('💥 测试失败');
    console.error(` 错误: ${error.message}`);
    console.error(` 任务 ID: ${taskId || '未创建'}`);
    console.error(` 耗时: ${((Date.now() - startTime) / 1000).toFixed(1)}s`);
    console.error('');
    if (error.message.includes('登录失败')) {
      console.error(' 💡 提示: 请检查测试账号配置');
      console.error(' 可通过环境变量指定: TEST_PHONE=xxx TEST_PASSWORD=yyy npx tsx ...');
    }
    if (error.message.includes('fetch failed') || error.message.includes('ECONNREFUSED')) {
      console.error(' 💡 提示: 后端服务未启动,请先运行:');
      console.error(' cd backend && npm run dev');
    }
    process.exit(1);
  }
}
// ─── Entry point ────────────────────────────────
// runE2E handles its own failures (logs + process.exit(1)), so no .catch here.
void runE2E();

View File

@@ -0,0 +1,161 @@
/**
* Deep Research V2.0 — 冒烟测试Smoke Test
*
* 仅测试 API 接口连通性和基本参数校验,
* 不调用 LLM / Unifuncs无外部依赖几秒完成。
*
* 运行方式:
* npx tsx src/modules/asl/__tests__/deep-research-v2-smoke.ts
*
* 前置条件:
* - 后端服务运行在 localhost:3001
* - PostgreSQL 运行中
*/
// Backend under test; override with TEST_BASE_URL.
const BASE_URL = process.env.TEST_BASE_URL || 'http://localhost:3001';
const API_PREFIX = `${BASE_URL}/api/v1`;
// Seeded test account (override via env vars).
const TEST_PHONE = process.env.TEST_PHONE || '13800000001';
const TEST_PASSWORD = process.env.TEST_PASSWORD || '123456';
// Mutable run state: auth token plus pass/fail counters updated by check().
let authToken = '';
let passed = 0;
let failed = 0;
/** Thin fetch wrapper: JSON headers, optional bearer token, JSON-or-text decoding. */
async function api(path: string, options: RequestInit = {}): Promise<{ status: number; data: any }> {
  const response = await fetch(`${API_PREFIX}${path}`, {
    ...options,
    headers: {
      'Content-Type': 'application/json',
      ...(authToken ? { Authorization: `Bearer ${authToken}` } : {}),
      ...(options.headers || {}),
    },
  });
  const mime = response.headers.get('content-type') || '';
  const body = mime.includes('json') ? await response.json() : await response.text();
  return { status: response.status, data: body };
}
/**
 * Record one smoke-test expectation and print its outcome.
 * BUGFIX: both branches previously printed the identical `${label}` line,
 * making passes and failures indistinguishable in the console (the ✅/❌
 * markers used everywhere else in this script were missing here).
 */
function check(ok: boolean, label: string) {
  if (ok) {
    console.log(` ✅ ${label}`);
    passed++;
  } else {
    console.log(` ❌ ${label}`);
    failed++;
  }
}
/**
 * Run all smoke checks in order: connectivity, data-source listing, and
 * parameter/auth validation on each V2.0 endpoint. Fails fast only when the
 * backend is unreachable; otherwise records pass/fail per check and exits
 * non-zero at the end if anything failed.
 */
async function run() {
  console.log('🔥 Deep Research V2.0 冒烟测试\n');
  // ─── 1. Login ───
  console.log('[1] 登录');
  try {
    const res = await api('/auth/login/password', {
      method: 'POST',
      body: JSON.stringify({ phone: TEST_PHONE, password: TEST_PASSWORD }),
    });
    const ok = res.status === 200 && res.data.success;
    check(ok, `POST /auth/login/password → ${res.status}`);
    if (ok) {
      // Token location differs across auth response shapes; try each known spot.
      authToken = res.data.data.tokens?.accessToken || res.data.data.accessToken || res.data.data.token;
    } else {
      console.log(' ⚠️ 登录失败,后续需要认证的测试将跳过');
      console.log(` 返回: ${JSON.stringify(res.data).slice(0, 200)}`);
    }
  } catch (e: any) {
    // Connection-level failure — nothing else can run; abort immediately.
    check(false, `连接后端失败: ${e.message}`);
    console.log('\n💡 请先启动后端: cd backend && npm run dev\n');
    process.exit(1);
  }
  // ─── 2. GET /data-sources ───
  console.log('\n[2] GET /asl/research/data-sources');
  {
    const res = await api('/asl/research/data-sources');
    check(res.status === 200, `HTTP ${res.status} === 200`);
    check(res.data.success === true, 'success=true');
    check(Array.isArray(res.data.data) && res.data.data.length >= 3, `返回 ${res.data.data?.length} 个数据源`);
    const hasPubmed = res.data.data?.some((s: any) => s.id === 'pubmed');
    check(hasPubmed, 'PubMed 存在于数据源列表');
    const defaultChecked = res.data.data?.filter((s: any) => s.defaultChecked);
    check(defaultChecked?.length >= 1, `默认选中 ${defaultChecked?.length}`);
  }
  // ─── 3. POST /generate-requirement (parameter validation only — no LLM call) ───
  console.log('\n[3] POST /asl/research/generate-requirement — 参数校验');
  {
    const res = await api('/asl/research/generate-requirement', {
      method: 'POST',
      body: JSON.stringify({ originalQuery: '' }),
    });
    check(res.status === 400, `空 query → HTTP ${res.status} === 400`);
  }
  {
    const res = await api('/asl/research/generate-requirement', {
      method: 'POST',
      body: JSON.stringify({}),
    });
    check(res.status === 400, `缺少 query → HTTP ${res.status} === 400`);
  }
  // ─── 4. PUT /tasks/:taskId/execute (parameter validation) ───
  console.log('\n[4] PUT /asl/research/tasks/:taskId/execute — 参数校验');
  {
    const res = await api('/asl/research/tasks/nonexistent-id/execute', {
      method: 'PUT',
      body: JSON.stringify({ confirmedRequirement: 'test' }),
    });
    check(res.status === 404, `不存在的 taskId → HTTP ${res.status} === 404`);
  }
  {
    const res = await api('/asl/research/tasks/some-id/execute', {
      method: 'PUT',
      body: JSON.stringify({ confirmedRequirement: '' }),
    });
    check(res.status === 400, `空 confirmedRequirement → HTTP ${res.status} === 400`);
  }
  // ─── 5. GET /tasks/:taskId (nonexistent id) ───
  console.log('\n[5] GET /asl/research/tasks/:taskId — 不存在');
  {
    const res = await api('/asl/research/tasks/nonexistent-id');
    check(res.status === 404, `不存在 → HTTP ${res.status} === 404`);
  }
  // ─── 6. GET /tasks/:taskId/export-word (nonexistent id; 500 tolerated — see assertion) ───
  console.log('\n[6] GET /asl/research/tasks/:taskId/export-word — 不存在');
  {
    const res = await api('/asl/research/tasks/nonexistent-id/export-word');
    check(res.status === 500 || res.status === 404, `不存在 → HTTP ${res.status}`);
  }
  // ─── 7. Unauthenticated access (no token) ───
  console.log('\n[7] 未认证访问(无 Token');
  {
    // Temporarily drop the token so the request goes out unauthenticated.
    const savedToken = authToken;
    authToken = '';
    const res = await api('/asl/research/data-sources');
    check(res.status === 401, `无 Token → HTTP ${res.status} === 401`);
    authToken = savedToken;
  }
  // ─── Result summary ───
  console.log(`\n${'═'.repeat(50)}`);
  console.log(` 🏁 冒烟测试完成: ${passed} 通过, ${failed} 失败 (共 ${passed + failed})`);
  console.log(`${'═'.repeat(50)}\n`);
  if (failed > 0) {
    process.exit(1);
  }
}
run();

View File

@@ -0,0 +1,75 @@
/**
* Deep Research V2.0 — 精选数据源配置
*
* 基于 Unifuncs API 18 站实测结果2026-02-22精选的 5 个数据源。
* 前端 SetupPanel 直接消费此配置渲染 Checkbox 列表。
*/
/** Shape of one selectable literature data source rendered by the SetupPanel. */
export interface DataSourceConfig {
  id: string;                       // stable identifier referenced by the frontend
  label: string;                    // display label (may be Chinese)
  labelEn: string;                  // English display label
  domainScope: string;              // URL forwarded as Unifuncs `domain_scope`
  category: 'english' | 'chinese';  // grouping shown in the UI
  defaultChecked: boolean;          // pre-selected in the checkbox list
  note?: string;                    // optional remark
}

/** The 5 curated sources; order here is the display order. */
export const DEEP_RESEARCH_DATA_SOURCES: DataSourceConfig[] = [
  {
    id: 'pubmed',
    label: 'PubMed',
    labelEn: 'PubMed',
    domainScope: 'https://pubmed.ncbi.nlm.nih.gov/',
    category: 'english',
    defaultChecked: true,
  },
  {
    id: 'clinicaltrials',
    label: 'ClinicalTrials.gov',
    labelEn: 'ClinicalTrials.gov',
    domainScope: 'https://clinicaltrials.gov/',
    category: 'english',
    defaultChecked: false,
  },
  {
    id: 'cochrane',
    label: 'Cochrane Library',
    labelEn: 'Cochrane Library',
    domainScope: 'https://www.cochranelibrary.com/',
    category: 'english',
    defaultChecked: false,
  },
  {
    id: 'cnki',
    label: '中国知网 CNKI',
    labelEn: 'CNKI',
    domainScope: 'https://www.cnki.net/',
    category: 'chinese',
    defaultChecked: false,
  },
  {
    id: 'medjournals',
    label: '中华医学期刊网',
    labelEn: 'Chinese Medical Journals',
    domainScope: 'https://medjournals.cn/',
    category: 'chinese',
    defaultChecked: false,
  },
];

/** URLs of the sources that are pre-selected by default. */
export function getDefaultDataSources(): string[] {
  const defaults: string[] = [];
  for (const source of DEEP_RESEARCH_DATA_SOURCES) {
    if (source.defaultChecked) defaults.push(source.domainScope);
  }
  return defaults;
}

/** Map source ids to their `domain_scope` URLs, preserving config order. */
export function getDomainScopes(ids: string[]): string[] {
  const wanted = new Set(ids);
  return DEEP_RESEARCH_DATA_SOURCES
    .filter((source) => wanted.has(source.id))
    .map((source) => source.domainScope);
}

/**
 * True when 'clinicaltrials' is among the selected ids.
 * NOTE(review): hard-coded to ClinicalTrials.gov — presumably the only source
 * needing an English-only warning; confirm intent before extending.
 */
export function hasEnglishOnlySource(ids: string[]): boolean {
  return ids.some((id) => id === 'clinicaltrials');
}

View File

@@ -0,0 +1,210 @@
/**
* Deep Research V2.0 Controller
*
* 新增端点(保留 V1.x 不动):
* - POST /generate-requirement — 需求扩写
* - PUT /tasks/:taskId/execute — 启动异步执行
* - GET /tasks/:taskId — 状态 + 日志 + 结果
*/
import { FastifyRequest, FastifyReply } from 'fastify';
import { prisma } from '../../../config/database.js';
import { jobQueue } from '../../../common/jobs/index.js';
import { logger } from '../../../common/logging/index.js';
import { requirementExpansionService } from '../services/requirementExpansionService.js';
import { wordExportService } from '../services/wordExportService.js';
import { DEEP_RESEARCH_DATA_SOURCES } from '../config/dataSources.js';
// ─── Request types ──────────────────────────────

/** Body of POST /research/generate-requirement. */
interface GenerateRequirementBody {
  originalQuery: string;          // the user's rough research idea (required)
  targetSources?: string[];       // selected data-source URLs; service default applies when omitted
  filters?: {
    yearRange?: string;           // free-form, e.g. "近5年" — TODO confirm accepted formats
    targetCount?: string;         // free-form, e.g. "约20篇" — TODO confirm accepted formats
    requireOpenAccess?: boolean;  // restrict to open-access literature
  };
}

/** Body of PUT /research/tasks/:taskId/execute. */
interface ExecuteBody {
  confirmedRequirement: string;   // user-verified natural-language search brief (HITL step)
}

/** Route params for the /research/tasks/:taskId endpoints. */
interface TaskParams {
  taskId: string;
}
// ─── POST /research/generate-requirement ────────
/**
 * POST /research/generate-requirement
 * Expands a rough research idea into a PICOS-structured search
 * instruction and creates a draft task.
 * Responds 401 without an authenticated user, 400 on a blank query,
 * 500 when the expansion service fails.
 */
export async function generateRequirement(
  request: FastifyRequest<{ Body: GenerateRequirementBody }>,
  reply: FastifyReply
) {
  try {
    const userId = request.user?.userId;
    if (!userId) {
      return reply.code(401).send({ success: false, error: '用户未认证' });
    }
    const { originalQuery, targetSources, filters } = request.body;
    const trimmedQuery = originalQuery?.trim();
    if (!trimmedQuery) {
      return reply.code(400).send({ success: false, error: '请输入研究想法' });
    }
    const expansion = await requirementExpansionService.generateRequirement({
      projectId: 'default', // V2.0 has no project selection yet
      userId,
      originalQuery: trimmedQuery,
      targetSources,
      filters,
    });
    return reply.send({ success: true, data: expansion });
  } catch (error: any) {
    logger.error('[DeepResearchController] generateRequirement failed', {
      error: error.message,
    });
    return reply.code(500).send({ success: false, error: error.message });
  }
}
// ─── PUT /research/tasks/:taskId/execute ────────
/**
 * PUT /research/tasks/:taskId/execute
 * Stores the confirmed requirement, flips the task from 'draft' to
 * 'pending', and enqueues the asl_deep_research_v2 job.
 * Responds 401/400/404/403 on the corresponding validation failures;
 * only tasks in 'draft' status may be started.
 */
export async function executeTask(
  request: FastifyRequest<{ Params: TaskParams; Body: ExecuteBody }>,
  reply: FastifyReply
) {
  try {
    const userId = request.user?.userId;
    if (!userId) {
      return reply.code(401).send({ success: false, error: '用户未认证' });
    }
    const { taskId } = request.params;
    const requirement = request.body.confirmedRequirement?.trim();
    if (!requirement) {
      return reply.code(400).send({ success: false, error: '检索指令不能为空' });
    }
    const task = await prisma.aslResearchTask.findUnique({
      where: { id: taskId },
    });
    if (!task) {
      return reply.code(404).send({ success: false, error: '任务不存在' });
    }
    if (task.userId !== userId) {
      return reply.code(403).send({ success: false, error: '无权操作此任务' });
    }
    if (task.status !== 'draft') {
      return reply.code(400).send({
        success: false,
        error: `任务状态为 ${task.status},只有 draft 状态可启动`,
      });
    }
    await prisma.aslResearchTask.update({
      where: { id: taskId },
      data: {
        confirmedRequirement: requirement,
        status: 'pending',
      },
    });
    await jobQueue.push('asl_deep_research_v2', { taskId });
    logger.info('[DeepResearchController] Task pushed to queue', { taskId });
    return reply.send({ success: true });
  } catch (error: any) {
    logger.error('[DeepResearchController] executeTask failed', {
      error: error.message,
    });
    return reply.code(500).send({ success: false, error: error.message });
  }
}
// ─── GET /research/tasks/:taskId ────────────────
/**
 * GET /research/tasks/:taskId
 * Returns the V2 task snapshot: status, logs, synthesis report and
 * parsed result list. 404 when the task does not exist.
 *
 * NOTE(review): unlike executeTask there is no ownership check here —
 * any authenticated ASL user can read any task by id. Confirm intended.
 */
export async function getTask(
  request: FastifyRequest<{ Params: TaskParams }>,
  reply: FastifyReply
) {
  try {
    const task = await prisma.aslResearchTask.findUnique({
      where: { id: request.params.taskId },
    });
    if (!task) {
      return reply.code(404).send({ success: false, error: '任务不存在' });
    }
    const data = {
      taskId: task.id,
      status: task.status,
      query: task.query,
      targetSources: task.targetSources,
      confirmedRequirement: task.confirmedRequirement,
      aiIntentSummary: task.aiIntentSummary,
      executionLogs: task.executionLogs || [],
      synthesisReport: task.synthesisReport,
      resultList: task.resultList,
      resultCount: task.resultCount,
      errorMessage: task.errorMessage,
      createdAt: task.createdAt,
      completedAt: task.completedAt,
    };
    return reply.send({ success: true, data });
  } catch (error: any) {
    logger.error('[DeepResearchController] getTask failed', {
      error: error.message,
    });
    return reply.code(500).send({ success: false, error: error.message });
  }
}
// ─── GET /research/tasks/:taskId/export-word ────
/**
 * GET /research/tasks/:taskId/export-word
 * Streams the rendered .docx with an RFC 5987 encoded filename so
 * non-ASCII (Chinese) names survive the Content-Disposition header.
 * 500 when the task is missing, incomplete, or conversion fails.
 */
export async function exportWord(
  request: FastifyRequest<{ Params: TaskParams }>,
  reply: FastifyReply
) {
  try {
    const { taskId } = request.params;
    const { buffer, filename } = await wordExportService.exportTaskToWord(taskId);
    // FIX: return the reply like every other handler in this controller —
    // an async Fastify handler that sends manually should return the reply
    // rather than resolving with undefined.
    return reply
      .header('Content-Type', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document')
      .header('Content-Disposition', `attachment; filename*=UTF-8''${encodeURIComponent(filename)}`)
      .send(buffer);
  } catch (error: any) {
    logger.error('[DeepResearchController] exportWord failed', {
      error: error.message,
    });
    return reply.code(500).send({ success: false, error: error.message });
  }
}
// ─── GET /research/data-sources ─────────────────
/**
 * GET /research/data-sources
 * Static catalogue of selectable data sources for the setup UI.
 */
export async function getDataSources(
  _request: FastifyRequest,
  reply: FastifyReply
) {
  const data = DEEP_RESEARCH_DATA_SOURCES;
  return reply.send({ success: true, data });
}

View File

@@ -8,6 +8,7 @@ import * as literatureController from '../controllers/literatureController.js';
import * as screeningController from '../controllers/screeningController.js';
import * as fulltextScreeningController from '../fulltext-screening/controllers/FulltextScreeningController.js';
import * as researchController from '../controllers/researchController.js';
import * as deepResearchController from '../controllers/deepResearchController.js';
import { authenticate, requireModule } from '../../../common/auth/auth.middleware.js';
export async function aslRoutes(fastify: FastifyInstance) {
@@ -79,16 +80,33 @@ export async function aslRoutes(fastify: FastifyInstance) {
// 导出Excel
fastify.get('/fulltext-screening/tasks/:taskId/export', { preHandler: [authenticate, requireModule('ASL')] }, fulltextScreeningController.exportExcel);
// ==================== 智能文献检索路由 (DeepSearch) ====================
// ==================== 智能文献检索路由 (DeepSearch V1.x — 保留兼容) ====================
// SSE 流式检索(推荐,实时显示思考过程)
// SSE 流式检索
fastify.post('/research/stream', { preHandler: [authenticate, requireModule('ASL')] }, researchController.streamSearch);
// 创建检索任务(异步模式,备用
// 创建检索任务(V1.x 异步模式)
fastify.post('/research/tasks', { preHandler: [authenticate, requireModule('ASL')] }, researchController.createTask);
// 获取任务状态(轮询)
// 获取任务状态(V1.x 轮询)
fastify.get('/research/tasks/:taskId/status', { preHandler: [authenticate, requireModule('ASL')] }, researchController.getTaskStatus);
// ==================== Deep Research V2.0 路由 ====================
// 获取可选数据源列表
fastify.get('/research/data-sources', { preHandler: [authenticate, requireModule('ASL')] }, deepResearchController.getDataSources);
// 需求扩写PICOS + MeSH
fastify.post('/research/generate-requirement', { preHandler: [authenticate, requireModule('ASL')] }, deepResearchController.generateRequirement);
// 启动异步执行
fastify.put('/research/tasks/:taskId/execute', { preHandler: [authenticate, requireModule('ASL')] }, deepResearchController.executeTask);
// V2.0 任务详情(状态 + 日志 + 结果)
fastify.get('/research/tasks/:taskId', { preHandler: [authenticate, requireModule('ASL')] }, deepResearchController.getTask);
// V2.0 导出 Word
fastify.get('/research/tasks/:taskId/export-word', { preHandler: [authenticate, requireModule('ASL')] }, deepResearchController.exportWord);
}

View File

@@ -0,0 +1,180 @@
/**
* Deep Research V2.0 — 需求扩写服务
*
* 职责:
* 1. 通过 Prompt 管理服务获取 ASL_DEEP_RESEARCH_EXPANSION 模板
* 2. 调用 LLM 将用户粗略想法扩写为 PICOS 结构化检索指令
* 3. 解析 LLM 输出为 generatedRequirement + intentSummary
* 4. 创建 DB 记录status=draft
*/
import { prisma } from '../../../config/database.js';
import { getPromptService } from '../../../common/prompt/index.js';
import { LLMFactory } from '../../../common/llm/adapters/LLMFactory.js';
import { logger } from '../../../common/logging/index.js';
import { getDefaultDataSources, hasEnglishOnlySource } from '../config/dataSources.js';
import type { Message } from '../../../common/llm/adapters/types.js';
/** Input to RequirementExpansionService.generateRequirement. */
export interface GenerateRequirementInput {
  projectId: string;
  userId: string;
  // User's rough research idea, as typed (already trimmed by the controller).
  originalQuery: string;
  // Selected domain scopes; the service falls back to getDefaultDataSources() when empty.
  targetSources?: string[];
  // Constraints injected into the expansion prompt.
  // NOTE(review): requireOpenAccess is stored with the task but never
  // forwarded to the prompt variables — confirm intended.
  filters?: {
    yearRange?: string;
    targetCount?: string;
    requireOpenAccess?: boolean;
  };
}
/** PICOS + MeSH summary parsed from the LLM's fenced JSON block. */
export interface IntentSummary {
  objective: string;
  population: string;
  intervention: string;
  comparison: string;
  outcome: string;
  studyDesign: string[];
  meshTerms: string[];
  condition: string;
}
/** Result of a successful expansion: a draft task plus the generated text. */
export interface GenerateRequirementResult {
  taskId: string;
  generatedRequirement: string;
  intentSummary: IntentSummary;
}
/**
 * Expands a rough research idea into a PICOS-structured search
 * instruction via the ASL_DEEP_RESEARCH_EXPANSION prompt, parses the
 * LLM output, and persists a draft AslResearchTask.
 */
class RequirementExpansionService {
  /**
   * Run the expansion and create the draft task.
   *
   * Flow: resolve data sources → render prompt → call LLM → split the
   * output into requirement text + JSON intent summary → create a DB
   * record with status 'draft'.
   *
   * @throws propagates prompt-service, LLM adapter and Prisma errors unchanged.
   */
  async generateRequirement(input: GenerateRequirementInput): Promise<GenerateRequirementResult> {
    const {
      projectId,
      userId,
      originalQuery,
      targetSources,
      filters,
    } = input;
    // Fall back to the default-checked sources when none were selected.
    const sources = targetSources && targetSources.length > 0
      ? targetSources
      : getDefaultDataSources();
    logger.info('[RequirementExpansion] Starting expansion', {
      userId,
      queryLength: originalQuery.length,
      sources,
    });
    const promptService = getPromptService(prisma);
    // Prompt variables; yearRange/targetCount get Chinese defaults.
    // NOTE(review): filters.requireOpenAccess is not forwarded here.
    const rendered = await promptService.get(
      'ASL_DEEP_RESEARCH_EXPANSION',
      {
        originalQuery,
        targetSources: sources.join(', '),
        yearRange: filters?.yearRange || '不限',
        targetCount: filters?.targetCount || '全面检索',
      },
      { userId }
    );
    // Model comes from the prompt's model config; falls back to deepseek-v3.
    const adapter = LLMFactory.getAdapter(
      (rendered.modelConfig.model as any) || 'deepseek-v3'
    );
    const messages: Message[] = [
      { role: 'system', content: rendered.content },
      { role: 'user', content: originalQuery },
    ];
    const llmResponse = await adapter.chat(messages, {
      temperature: rendered.modelConfig.temperature ?? 0.4,
      maxTokens: rendered.modelConfig.maxTokens ?? 4096,
    });
    const rawOutput = llmResponse.content;
    const { requirement, intentSummary } = this.parseOutput(rawOutput);
    // Draft record: the user must still confirm before execution.
    const task = await prisma.aslResearchTask.create({
      data: {
        projectId,
        userId,
        query: originalQuery,
        status: 'draft',
        targetSources: sources as any,
        confirmedRequirement: requirement,
        aiIntentSummary: intentSummary as any,
        filters: filters as any,
      },
    });
    logger.info('[RequirementExpansion] Task created', {
      taskId: task.id,
      meshTerms: intentSummary.meshTerms?.length || 0,
    });
    return {
      taskId: task.id,
      generatedRequirement: requirement,
      intentSummary,
    };
  }
  /**
   * Split raw LLM output into the human-readable requirement ("### Part 1"
   * section) and the machine-readable intent summary (fenced JSON block).
   * Degrades gracefully: missing Part 1 header → text before the JSON
   * fence (or the whole output); malformed JSON → empty summary fields.
   */
  private parseOutput(raw: string): {
    requirement: string;
    intentSummary: IntentSummary;
  } {
    let requirement = '';
    let intentSummary: IntentSummary = {
      objective: '',
      population: '',
      intervention: '',
      comparison: '',
      outcome: '',
      studyDesign: [],
      meshTerms: [],
      condition: '',
    };
    const part1Match = raw.match(
      /### Part 1[:][^\n]*\n([\s\S]*?)(?=### Part 2|$)/i
    );
    if (part1Match) {
      requirement = part1Match[1].trim();
    } else {
      const jsonBlockStart = raw.indexOf('```json');
      // NOTE(review): '> 0' means a JSON fence at index 0 falls through to
      // raw.trim(), leaving the fence inside the requirement — confirm
      // whether '>= 0' (empty requirement) was intended.
      if (jsonBlockStart > 0) {
        requirement = raw.slice(0, jsonBlockStart).trim();
      } else {
        requirement = raw.trim();
      }
    }
    const jsonMatch = raw.match(/```json\s*([\s\S]*?)```/);
    if (jsonMatch) {
      try {
        let cleaned = jsonMatch[1].trim();
        // Strip trailing commas, a frequent LLM JSON defect.
        cleaned = cleaned.replace(/,\s*([}\]])/g, '$1');
        const parsed = JSON.parse(cleaned);
        intentSummary = {
          objective: parsed.objective || '',
          population: parsed.population || '',
          intervention: parsed.intervention || '',
          comparison: parsed.comparison || '',
          outcome: parsed.outcome || '',
          studyDesign: Array.isArray(parsed.studyDesign) ? parsed.studyDesign : [],
          meshTerms: Array.isArray(parsed.meshTerms) ? parsed.meshTerms : [],
          condition: parsed.condition || '',
        };
      } catch (e) {
        // Best-effort: keep the empty summary rather than failing the request.
        logger.warn('[RequirementExpansion] Failed to parse intent summary JSON', {
          error: (e as Error).message,
        });
      }
    }
    return { requirement, intentSummary };
  }
}
export const requirementExpansionService = new RequirementExpansionService();

View File

@@ -0,0 +1,155 @@
/**
* Unifuncs DeepSearch 异步客户端
*
* 封装 create_task / query_task 两个 API。
* Worker 通过此客户端与 Unifuncs 服务交互。
*/
import { logger } from '../../../common/logging/index.js';
const UNIFUNCS_BASE_URL = 'https://api.unifuncs.com/deepsearch/v1';
const UNIFUNCS_API_KEY = process.env.UNIFUNCS_API_KEY;
/** Options for create_task (camelCase here; mapped to snake_case wire fields). */
export interface CreateTaskParams {
  query: string;
  // System persona; a built-in Chinese default is used when omitted.
  introduction?: string;
  // Search depth; the client defaults this to 25.
  maxDepth?: number;
  // Site allow-list; the client defaults to PubMed only.
  domainScope?: string[];
  domainBlacklist?: string[];
  referenceStyle?: 'link' | 'character';
  // Defaults to true in the client.
  generateSummary?: boolean;
  // Extra output-formatting instructions; only sent when provided.
  outputPrompt?: string;
}
/** Envelope returned by POST /create_task (code 0 = success). */
export interface CreateTaskResponse {
  code: number;
  message: string;
  data: {
    task_id: string;
    status: string;
    created_at: string;
  };
}
/** Envelope returned by GET /query_task (code 0 = success). */
export interface QueryTaskResponse {
  code: number;
  message: string;
  data: {
    task_id: string;
    status: 'pending' | 'running' | 'completed' | 'failed';
    // Presumably populated once status is 'completed' — confirm with API docs.
    result?: {
      content: string;
      reasoning_content: string;
    };
    created_at: string;
    updated_at: string;
    progress?: {
      current: number;
      total: number;
      message: string;
    };
    statistics?: {
      iterations: number;
      search_count: number;
      read_count: number;
      token_usage?: {
        prompt_tokens: number;
        completion_tokens: number;
        total_tokens: number;
      };
    };
  };
}
/**
 * Thin client for the Unifuncs DeepSearch async REST API
 * (POST /create_task, GET /query_task). The API key is read from the
 * environment once at construction time.
 */
class UnifuncsAsyncClient {
  private apiKey: string;
  private baseUrl: string;

  constructor() {
    this.apiKey = UNIFUNCS_API_KEY || '';
    this.baseUrl = UNIFUNCS_BASE_URL;
  }

  /**
   * Submit a new deep-search task.
   * @throws on a missing API key, a non-2xx HTTP status, or a non-zero
   *         business code in the response envelope.
   */
  async createTask(params: CreateTaskParams): Promise<CreateTaskResponse> {
    this.ensureConfigured();
    const payload: Record<string, any> = {
      model: 's2',
      messages: [{ role: 'user', content: params.query }],
      introduction: params.introduction || this.defaultIntroduction(),
      max_depth: params.maxDepth ?? 25,
      domain_scope: params.domainScope || ['https://pubmed.ncbi.nlm.nih.gov/'],
      domain_blacklist: params.domainBlacklist || [],
      reference_style: params.referenceStyle || 'link',
      generate_summary: params.generateSummary ?? true,
    };
    if (params.outputPrompt) {
      payload.output_prompt = params.outputPrompt;
    }
    logger.info('[UnifuncsClient] Creating task', {
      queryLen: params.query.length,
      domainScope: payload.domain_scope,
      maxDepth: payload.max_depth,
    });
    const response = await fetch(`${this.baseUrl}/create_task`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(payload),
    });
    if (!response.ok) {
      const text = await response.text();
      throw new Error(`create_task HTTP ${response.status}: ${text}`);
    }
    const envelope = await response.json() as CreateTaskResponse;
    if (envelope.code !== 0) {
      throw new Error(`create_task error: ${envelope.message}`);
    }
    logger.info('[UnifuncsClient] Task created', {
      externalTaskId: envelope.data.task_id,
    });
    return envelope;
  }

  /**
   * Fetch the current status/result of a previously created task.
   * @throws on a missing API key, a non-2xx HTTP status, or a non-zero
   *         business code in the response envelope.
   */
  async queryTask(taskId: string): Promise<QueryTaskResponse> {
    this.ensureConfigured();
    const search = new URLSearchParams({ task_id: taskId });
    const response = await fetch(`${this.baseUrl}/query_task?${search.toString()}`, {
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
      },
    });
    if (!response.ok) {
      const text = await response.text();
      throw new Error(`query_task HTTP ${response.status}: ${text}`);
    }
    const envelope = await response.json() as QueryTaskResponse;
    if (envelope.code !== 0) {
      throw new Error(`query_task error: ${envelope.message}`);
    }
    return envelope;
  }

  /** Shared guard: both endpoints require the key captured at construction. */
  private ensureConfigured(): void {
    if (!this.apiKey) {
      throw new Error('UNIFUNCS_API_KEY not configured');
    }
  }

  /** Default Chinese system persona for the search agent. */
  private defaultIntroduction(): string {
    return '你是一名专业的临床研究文献检索专家,擅长从多个学术数据库中检索高质量的医学文献。请根据用户的检索需求,系统性地搜索并返回相关文献的详细信息。';
  }
}
export const unifuncsAsyncClient = new UnifuncsAsyncClient();

View File

@@ -0,0 +1,122 @@
/**
* Unifuncs DeepSearch SSE 流式客户端
*
* 通过 OpenAI 兼容协议的 SSE 流获取实时 reasoning_content 和 content。
* Worker 消费此流可实现逐条实时写入 executionLogs。
*/
import { logger } from '../../../common/logging/index.js';
const UNIFUNCS_BASE_URL = 'https://api.unifuncs.com/deepsearch/v1';
const UNIFUNCS_API_KEY = process.env.UNIFUNCS_API_KEY;
/** Options for one SSE deep-search request (mirrors the chat/completions payload). */
export interface SseStreamParams {
  query: string;
  // System persona; a built-in Chinese default is used when omitted.
  introduction?: string;
  // Search depth; streamDeepSearch defaults this to 25.
  maxDepth?: number;
  // Site allow-list; defaults to PubMed only.
  domainScope?: string[];
  domainBlacklist?: string[];
  referenceStyle?: 'link' | 'character';
  // Extra output-formatting instructions; only sent when provided.
  outputPrompt?: string;
}
/** One parsed stream event. */
export interface SseChunk {
  // NOTE(review): 'error' is declared but never emitted by streamDeepSearch
  // (failures throw instead) — confirm whether callers expect it.
  type: 'reasoning' | 'content' | 'done' | 'error';
  text: string;
}
/**
 * 以 AsyncGenerator 形式逐 chunk 返回 Unifuncs SSE 流数据
 *
 * Yields 'reasoning'/'content' deltas as they arrive, then a terminal
 * 'done' chunk.
 *
 * Fixes over the previous version:
 * 1. A final `data:` event arriving without a trailing newline was left
 *    in the line buffer and silently dropped — it is now flushed.
 * 2. The TextDecoder is flushed at end of stream.
 * 3. A 'done' chunk is always emitted when the server closes the stream,
 *    even without an explicit [DONE] marker, so consumers always observe
 *    a terminal chunk.
 *
 * @throws when UNIFUNCS_API_KEY is missing, the HTTP response is not ok,
 *         or the response has no body.
 */
export async function* streamDeepSearch(
  params: SseStreamParams
): AsyncGenerator<SseChunk> {
  const apiKey = UNIFUNCS_API_KEY;
  if (!apiKey) throw new Error('UNIFUNCS_API_KEY not configured');
  const payload: Record<string, any> = {
    model: 's2',
    messages: [{ role: 'user', content: params.query }],
    stream: true,
    introduction: params.introduction || defaultIntroduction(),
    max_depth: params.maxDepth ?? 25,
    domain_scope: params.domainScope || ['https://pubmed.ncbi.nlm.nih.gov/'],
    domain_blacklist: params.domainBlacklist || [],
    reference_style: params.referenceStyle || 'link',
  };
  if (params.outputPrompt) {
    payload.output_prompt = params.outputPrompt;
  }
  logger.info('[UnifuncsSse] Starting SSE stream', {
    queryLen: params.query.length,
    domainScope: payload.domain_scope,
    maxDepth: payload.max_depth,
  });
  const response = await fetch(`${UNIFUNCS_BASE_URL}/chat/completions`, {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${apiKey}`,
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream',
    },
    body: JSON.stringify(payload),
  });
  if (!response.ok) {
    const text = await response.text();
    throw new Error(`SSE request failed HTTP ${response.status}: ${text}`);
  }
  if (!response.body) {
    throw new Error('Response body is null');
  }
  // Parse one SSE line into a chunk; null for blanks, comments (":…"),
  // non-data lines and unparseable JSON payloads.
  const parseLine = (line: string): SseChunk | null => {
    const trimmed = line.trim();
    if (!trimmed || trimmed.startsWith(':') || !trimmed.startsWith('data: ')) return null;
    const data = trimmed.slice(6);
    if (data === '[DONE]') return { type: 'done', text: '' };
    try {
      const json = JSON.parse(data);
      const delta = json.choices?.[0]?.delta;
      if (delta?.reasoning_content) return { type: 'reasoning', text: delta.reasoning_content };
      if (delta?.content) return { type: 'content', text: delta.content };
    } catch {
      // Skip unparseable chunks
    }
    return null;
  };
  const reader = (response.body as any).getReader() as ReadableStreamDefaultReader<Uint8Array>;
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || ''; // keep the (possibly partial) last line
      for (const line of lines) {
        const chunk = parseLine(line);
        if (!chunk) continue;
        yield chunk;
        if (chunk.type === 'done') return;
      }
    }
    // BUG FIX: flush the decoder and the last buffered line — previously a
    // final event without a trailing newline was silently discarded.
    buffer += decoder.decode();
    const tail = parseLine(buffer);
    if (tail && tail.type !== 'done') yield tail;
    // Always terminate with 'done' so consumers see a terminal chunk.
    yield { type: 'done', text: '' };
  } finally {
    reader.releaseLock();
  }
}
/** Default Chinese system persona used when callers supply no introduction. */
function defaultIntroduction(): string {
  const introduction =
    '你是一名专业的临床研究文献检索专家,擅长从多个学术数据库中检索高质量的医学文献。请根据用户的检索需求,系统性地搜索并返回相关文献的详细信息。';
  return introduction;
}

View File

@@ -0,0 +1,140 @@
/**
* Deep Research V2.0 — Word 导出服务
*
* 将 synthesisReport + resultList 拼接为完整 Markdown
* 调用 Python 微服务Pandoc转换为 .docx。
*
* 文献清单中标题直接作为超链接,确保 Word 中可点击。
*/
import axios from 'axios';
import { prisma } from '../../../config/database.js';
import { logger } from '../../../common/logging/index.js';
import type { LiteratureItem } from '../utils/resultParser.js';
const EXTRACTION_SERVICE_URL = process.env.EXTRACTION_SERVICE_URL || 'http://localhost:8000';
/**
 * Builds a single Markdown document from a completed task's synthesis
 * report + literature list, then converts it to .docx by calling the
 * extraction microservice (POST /api/convert/docx, Pandoc-backed).
 */
class WordExportService {
  /**
   * Export a completed task as a Word document.
   * @returns the .docx bytes plus a filesystem-safe filename
   *          (CJK + alphanumerics only, max 30 chars of the query).
   * @throws '任务不存在' when the task id is unknown,
   *         '任务尚未完成' when status is not 'completed',
   *         'Word 转换失败: …' when the conversion service fails.
   */
  async exportTaskToWord(taskId: string): Promise<{
    buffer: Buffer;
    filename: string;
  }> {
    const task = await prisma.aslResearchTask.findUnique({
      where: { id: taskId },
    });
    if (!task) throw new Error('任务不存在');
    if (task.status !== 'completed') throw new Error('任务尚未完成');
    const markdown = this.buildMarkdown(
      task.query,
      task.synthesisReport,
      task.resultList as LiteratureItem[] | null,
      task.completedAt,
    );
    const docxBuffer = await this.convertToDocx(markdown, task.query);
    // Keep only CJK + alphanumerics so the name is safe across filesystems.
    const safeQuery = task.query.replace(/[^\u4e00-\u9fa5a-zA-Z0-9]/g, '').slice(0, 30);
    const dateStr = new Date().toISOString().slice(0, 10).replace(/-/g, '');
    const filename = `DeepResearch_${safeQuery}_${dateStr}.docx`;
    return { buffer: docxBuffer, filename };
  }
  /**
   * Assemble the full Markdown document: header, synthesis report
   * (with reference links expanded), then a numbered literature list
   * whose titles are inline hyperlinks so they stay clickable in Word.
   */
  private buildMarkdown(
    query: string,
    report: string | null,
    resultList: LiteratureItem[] | null,
    completedAt: Date | null,
  ): string {
    const parts: string[] = [];
    parts.push(`# Deep Research 报告\n`);
    parts.push(`检索主题: ${query}\n`);
    parts.push(`生成时间: ${completedAt ? new Date(completedAt).toLocaleString('zh-CN') : new Date().toLocaleString('zh-CN')}\n`);
    parts.push('---\n');
    if (report) {
      parts.push('## 综合分析报告\n');
      // Strip **bold** markers from the report body — presumably so the
      // converted doc stays plain; TODO confirm against the Pandoc template.
      let cleaned = report.replace(/\*\*([^*]+)\*\*/g, '$1');
      cleaned = this.expandReferenceLinks(cleaned);
      parts.push(cleaned);
      parts.push('\n');
    }
    if (resultList && resultList.length > 0) {
      parts.push('---\n');
      parts.push(`## 文献清单(共 ${resultList.length} 篇)\n`);
      resultList.forEach((item, idx) => {
        const title = item.title || '(无标题)';
        // Fall back to a PubMed URL derived from the PMID when no URL is given.
        const url = item.url || (item.pmid ? `https://pubmed.ncbi.nlm.nih.gov/${item.pmid}/` : '');
        const titleLine = url ? `[${title}](${url})` : title;
        parts.push(`### ${idx + 1}. ${titleLine}\n`);
        const details: string[] = [];
        if (item.authors) details.push(`作者: ${item.authors}`);
        if (item.journal) details.push(`期刊: ${item.journal}`);
        if (item.year) details.push(`年份: ${item.year}`);
        if (item.studyType) details.push(`研究类型: ${item.studyType}`);
        if (item.pmid) details.push(`PMID: ${item.pmid}`);
        if (details.length > 0) {
          parts.push(details.join(' | '));
        }
        // Also print the bare URL so it is visible (not only clickable) in Word.
        if (url) {
          parts.push(`\n链接: ${url}`);
        }
        parts.push('\n');
      });
    }
    parts.push('---\n');
    parts.push('*本报告由 AI Clinical Research 平台 Deep Research 引擎自动生成*\n');
    return parts.join('\n');
  }
  /**
   * 将 [[N]](url) 格式的引用链接展开为 [N] url 形式,
   * 使 Word 中引用旁边可见完整 URL。
   */
  private expandReferenceLinks(text: string): string {
    return text.replace(
      /\[\[(\d+)\]\]\((https?:\/\/[^\s)]+)\)/g,
      '[$1]($2) ($2)'
    );
  }
  /**
   * POST the Markdown to the extraction service and return the raw
   * .docx bytes (arraybuffer response, 30s timeout).
   */
  private async convertToDocx(markdown: string, title: string): Promise<Buffer> {
    try {
      logger.info('[WordExport] Converting Markdown → Word');
      const response = await axios.post(
        `${EXTRACTION_SERVICE_URL}/api/convert/docx`,
        {
          content: markdown,
          use_template: true,
          title: `Deep Research: ${title.slice(0, 50)}`,
        },
        {
          responseType: 'arraybuffer',
          timeout: 30000,
        }
      );
      logger.info(`[WordExport] Conversion success, size: ${response.data.length} bytes`);
      return Buffer.from(response.data);
    } catch (error) {
      logger.error('[WordExport] Conversion failed:', error);
      throw new Error(`Word 转换失败: ${error instanceof Error ? error.message : 'Unknown error'}`);
    }
  }
}
export const wordExportService = new WordExportService();

View File

@@ -0,0 +1,115 @@
/**
* Reasoning Content 解析器
*
* 将 Unifuncs 返回的 reasoning_content 增量文本解析为结构化日志条目。
*
* 核心策略:按段落(\n\n拆分同一段落内的思考内容合并为一条日志
* 只有 搜索/阅读/分析 等动作才单独成条。
*/
import { logger } from '../../../common/logging/index.js';
/** One structured entry in a task's executionLogs JSON column. */
export interface ExecutionLogEntry {
  // Category of the entry (the parser emits thinking/searching/reading/analyzing;
  // the worker adds summary/info milestones).
  type: 'thinking' | 'searching' | 'reading' | 'analyzing' | 'summary' | 'info';
  // Short Chinese heading, e.g. '思考' / '搜索' / '阅读页面'.
  title: string;
  // Entry body; the parser caps thinking text at 800 chars.
  text: string;
  // ISO-8601 timestamp assigned when the entry was parsed.
  ts: string;
}
// Action-detection patterns applied line-by-line to reasoning text.
const SEARCH_PATTERN = /(?:搜索|searching|search(?:ing)?\s+for|查找|检索|looking\s+for)[:\s]+(.+)/i;
const READ_PATTERN = /(?:阅读|reading|read(?:ing)?|访问|打开|visiting|open(?:ing)?)\s*[:\s]*(https?:\/\/\S+|\S+\.(?:com|org|net|gov|cn)\S*)/i;
const ANALYZE_PATTERN = /(?:分析|analyz|发现|总结|归纳|结论|found|result|finding|conclud|summariz)/i;
/**
 * 将增量 reasoning 文本解析为段落级日志条目。
 * 连续的思考行合并为一段,动作行(搜索/阅读/分析)独立成条。
 *
 * Splits `newText` on blank lines; within each paragraph, action lines
 * (search / read / analyze) each become their own entry, while the
 * remaining lines are buffered and flushed as one 'thinking' entry
 * (capped at 800 chars, dropped when 10 chars or shorter).
 *
 * @param newText          Freshly received reasoning text (a delta, not
 *                         the full transcript).
 * @param _previousLength  Unused; kept for interface compatibility with
 *                         callers that track transcript offsets.
 */
export function parseReasoningIncrement(
  newText: string,
  _previousLength: number
): ExecutionLogEntry[] {
  if (!newText) return [];
  const entries: ExecutionLogEntry[] = [];
  const now = new Date().toISOString();
  const paragraphs = newText.split(/\n{2,}/);
  for (const para of paragraphs) {
    const lines = para.split('\n').filter(l => l.trim());
    if (lines.length === 0) continue;
    let thinkingBuf: string[] = [];
    const flushThinking = () => {
      if (thinkingBuf.length === 0) return;
      // BUG FIX: join with a space — joining with '' mashed adjacent lines
      // together ("…endStart…") and was inconsistent with
      // mergeConsecutiveThinking, which joins entries with ' '.
      const text = thinkingBuf.join(' ').slice(0, 800);
      if (text.length > 10) {
        entries.push({ type: 'thinking', title: '思考', text, ts: now });
      }
      thinkingBuf = [];
    };
    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;
      const searchMatch = trimmed.match(SEARCH_PATTERN);
      if (searchMatch) {
        flushThinking();
        entries.push({ type: 'searching', title: '搜索', text: searchMatch[1].trim(), ts: now });
        continue;
      }
      const readMatch = trimmed.match(READ_PATTERN);
      if (readMatch) {
        flushThinking();
        entries.push({ type: 'reading', title: '阅读页面', text: readMatch[1].trim(), ts: now });
        continue;
      }
      if (ANALYZE_PATTERN.test(trimmed) && trimmed.length > 20) {
        flushThinking();
        entries.push({ type: 'analyzing', title: '分析', text: trimmed.slice(0, 500), ts: now });
        continue;
      }
      thinkingBuf.push(trimmed);
    }
    flushThinking();
  }
  return entries;
}
/**
 * Collapse runs of consecutive 'thinking' entries into a single entry,
 * joining their texts with spaces and capping at 800 chars.
 * Called before the worker persists logs, to reduce fragmentation.
 */
export function mergeConsecutiveThinking(entries: ExecutionLogEntry[]): ExecutionLogEntry[] {
  if (entries.length <= 1) return entries;
  const merged: ExecutionLogEntry[] = [];
  for (const entry of entries) {
    const last = merged[merged.length - 1];
    if (last && last.type === 'thinking' && entry.type === 'thinking') {
      last.text = `${last.text} ${entry.text}`.slice(0, 800);
    } else {
      merged.push({ ...entry });
    }
  }
  return merged;
}
/**
 * One-shot parse of a complete reasoning_content transcript into
 * structured log entries (same rules as the incremental parser).
 */
export function parseFullReasoning(fullText: string): ExecutionLogEntry[] {
  return fullText ? parseReasoningIncrement(fullText, 0) : [];
}

View File

@@ -0,0 +1,113 @@
/**
* Deep Research V2.0 — 结果解析器
*
* 职责:
* 1. 从 Unifuncs content 中切割出 synthesisReport + resultList
* 2. safeParseJsonList: 4 层防崩溃 JSON 解析
*/
import { logger } from '../../../common/logging/index.js';
/** One literature record parsed from the LLM's JSON list (all fields best-effort). */
export interface LiteratureItem {
  title: string;
  authors?: string;
  journal?: string;
  // Year may come back as a number or a string from the LLM JSON.
  year?: number | string;
  doi?: string;
  pmid?: string;
  url?: string;
  abstract?: string;
  // e.g. "RCT" — free text from the model.
  studyType?: string;
}
/**
 * 从 Unifuncs 返回的 content 中拆分报告和文献列表
 *
 * Strategy: prefer a fenced ```json block (report = text around the
 * fence, list = parsed fence body); otherwise harvest bare PubMed links;
 * otherwise the whole content is the report with no list.
 */
export function parseContent(content: string): {
  synthesisReport: string;
  resultList: LiteratureItem[] | null;
} {
  if (!content) {
    return { synthesisReport: '', resultList: null };
  }
  const fenceMatch = content.match(/```json\s*([\s\S]*?)```/);
  if (fenceMatch) {
    // indexOf is safe here: the regex just proved the fence exists.
    const fenceStart = content.indexOf('```json');
    const closeEnd = content.indexOf('```', fenceStart + 7) + 3;
    const before = content.slice(0, fenceStart).trim();
    const after = content.slice(closeEnd).trim();
    const resultList = safeParseJsonList(fenceMatch[1]);
    const synthesisReport = (before + (after ? '\n\n' + after : '')).trim();
    return { synthesisReport: synthesisReport || content, resultList };
  }
  const pubmedLinks = extractPubMedLinks(content);
  if (pubmedLinks.length === 0) {
    return { synthesisReport: content, resultList: null };
  }
  const resultList: LiteratureItem[] = pubmedLinks.map(url => ({
    title: '',
    url,
    pmid: extractPmidFromUrl(url) || undefined,
  }));
  return { synthesisReport: content, resultList };
}
/**
 * Defensive JSON parsing for LLM-emitted literature lists.
 * Layers: strip code fences → drop trailing commas → JSON.parse →
 * regex-extract individual flat objects. Returns null when nothing can
 * be recovered; a non-array result is wrapped in a one-element array.
 */
export function safeParseJsonList(raw: string | null): LiteratureItem[] | null {
  if (!raw) return null;
  const cleaned = raw
    .replace(/```json\s*/gi, '')
    .replace(/```\s*/g, '')
    .replace(/,\s*([}\]])/g, '$1');
  try {
    const parsed = JSON.parse(cleaned);
    return Array.isArray(parsed) ? parsed : [parsed];
  } catch {
    logger.warn('[resultParser] Standard JSON.parse failed, trying regex extraction');
  }
  const recovered: any[] = [];
  for (const fragment of cleaned.match(/\{[^{}]*\}/g) ?? []) {
    try {
      recovered.push(JSON.parse(fragment));
    } catch {
      // skip unparseable fragment
    }
  }
  if (recovered.length > 0) {
    logger.info('[resultParser] Regex extraction recovered items', { count: recovered.length });
    return recovered;
  }
  logger.warn('[resultParser] All parsing strategies failed');
  return null;
}
/** Collect unique PubMed article URLs from `content`, normalized to a trailing slash. */
function extractPubMedLinks(content: string): string[] {
  const urls = new Set<string>();
  for (const match of content.matchAll(/https?:\/\/pubmed\.ncbi\.nlm\.nih\.gov\/(\d+)\/?/gi)) {
    urls.add(`https://pubmed.ncbi.nlm.nih.gov/${match[1]}/`);
  }
  return [...urls];
}
/** Extract the numeric PMID from a PubMed URL, or null when absent. */
function extractPmidFromUrl(url: string): string | null {
  const match = /pubmed\.ncbi\.nlm\.nih\.gov\/(\d+)/.exec(url);
  return match ? match[1] : null;
}

View File

@@ -0,0 +1,223 @@
/**
* Deep Research V2.0 Worker — SSE 流式架构
*
* 核心流程:
* 1. 从 DB 读取任务(含 confirmedRequirement + targetSources
* 2. 通过 SSE 流连接 Unifuncs OpenAI 兼容接口
* 3. 实时读取 reasoning_content 增量 → 解析为 executionLogs → 每 2s 刷写 DB
* 4. 读取 content 增量 → 累积
* 5. 流结束后解析 content → synthesisReport + resultList
* 6. 超时保护: 15 分钟
*/
import { prisma } from '../../../config/database.js';
import { logger } from '../../../common/logging/index.js';
import { streamDeepSearch } from '../services/unifuncsSseClient.js';
import { parseReasoningIncrement, mergeConsecutiveThinking, type ExecutionLogEntry } from '../utils/reasoningParser.js';
import { parseContent } from '../utils/resultParser.js';
const MAX_DURATION_MS = 15 * 60 * 1000;
const LOG_FLUSH_INTERVAL_MS = 2000;
/**
 * Job handler for `asl_deep_research_v2`.
 *
 * Flow: load task → mark 'running' → open the Unifuncs SSE stream →
 * parse reasoning increments into executionLogs (flushed to DB at most
 * every LOG_FLUSH_INTERVAL_MS) → accumulate content → on stream end,
 * split content into synthesisReport + resultList and mark 'completed'.
 * Any error marks the task 'failed'. Hard timeout: MAX_DURATION_MS.
 *
 * Fixes over the previous version:
 * 1. The un-parsed reasoning tail (< 200 chars since the last batch) was
 *    only flushed when pendingLogEntries was non-empty, silently dropping
 *    the final thoughts otherwise — it is now always parsed and flushed.
 * 2. The failure-status DB update is guarded so a secondary DB error
 *    cannot escape the error path.
 */
export async function processDeepResearchV2(job: { data: { taskId: string } }) {
  const { taskId } = job.data;
  logger.info('[DeepResearchV2Worker] Starting', { taskId });
  const task = await prisma.aslResearchTask.findUnique({
    where: { id: taskId },
  });
  if (!task) {
    logger.error('[DeepResearchV2Worker] Task not found', { taskId });
    return;
  }
  // Prefer the user-confirmed instruction; fall back to the raw query.
  const searchQuery = task.confirmedRequirement || task.query;
  const targetSources = (task.targetSources as string[]) || ['https://pubmed.ncbi.nlm.nih.gov/'];
  try {
    await prisma.aslResearchTask.update({
      where: { id: taskId },
      data: { status: 'running', executionLogs: [] as any },
    });
    // A selected Chinese database triggers an extra output instruction.
    const hasChinese = targetSources.some(
      s => s.includes('cnki') || s.includes('medjournals')
    );
    const outputPrompt = buildOutputPrompt(hasChinese);
    const allLogs: ExecutionLogEntry[] = [{
      type: 'info',
      title: '任务已提交',
      text: '正在连接深度检索引擎...',
      ts: new Date().toISOString(),
    }];
    await updateLogs(taskId, allLogs);
    const startTime = Date.now();
    let reasoningAccumulated = '';
    let contentAccumulated = '';
    let lastFlushTime = Date.now();
    let previousReasoningLength = 0;      // offset of reasoning already parsed
    let pendingLogEntries: ExecutionLogEntry[] = []; // parsed but not yet persisted
    let contentStarted = false;
    const stream = streamDeepSearch({
      query: searchQuery,
      domainScope: targetSources,
      maxDepth: 25,
      referenceStyle: 'link',
      outputPrompt,
    });
    allLogs.push({
      type: 'info',
      title: '连接成功',
      text: 'AI 开始深度检索,正在分析检索策略...',
      ts: new Date().toISOString(),
    });
    await updateLogs(taskId, allLogs);
    for await (const chunk of stream) {
      if (Date.now() - startTime > MAX_DURATION_MS) {
        throw new Error('Task exceeded 15-minute timeout');
      }
      if (chunk.type === 'reasoning') {
        reasoningAccumulated += chunk.text;
        const newText = reasoningAccumulated.slice(previousReasoningLength);
        // Parse in >=200-char batches so paragraph boundaries are stable.
        if (newText.length > 200) {
          const newEntries = parseReasoningIncrement(newText, previousReasoningLength);
          if (newEntries.length > 0) {
            pendingLogEntries.push(...newEntries);
          }
          previousReasoningLength = reasoningAccumulated.length;
        }
        // Persist at most once per flush interval to limit DB writes.
        if (pendingLogEntries.length > 0 && Date.now() - lastFlushTime >= LOG_FLUSH_INTERVAL_MS) {
          const merged = mergeConsecutiveThinking(pendingLogEntries);
          allLogs.push(...merged);
          pendingLogEntries = [];
          await updateLogs(taskId, allLogs);
          lastFlushTime = Date.now();
        }
      } else if (chunk.type === 'content') {
        contentAccumulated += chunk.text;
        if (pendingLogEntries.length > 0) {
          allLogs.push(...pendingLogEntries);
          pendingLogEntries = [];
        }
        if (!contentStarted) {
          contentStarted = true;
          allLogs.push({
            type: 'summary',
            title: '思考完毕',
            text: '正在生成综合分析报告...',
            ts: new Date().toISOString(),
          });
          await updateLogs(taskId, allLogs);
        }
      } else if (chunk.type === 'done') {
        break;
      }
    }
    // BUG FIX: always parse the reasoning tail that never reached the
    // 200-char batch threshold (previously dropped when pendingLogEntries
    // happened to be empty), then flush everything remaining.
    const remainingReasoning = reasoningAccumulated.slice(previousReasoningLength);
    if (remainingReasoning.length > 0) {
      pendingLogEntries.push(...parseReasoningIncrement(remainingReasoning, previousReasoningLength));
    }
    if (pendingLogEntries.length > 0) {
      allLogs.push(...mergeConsecutiveThinking(pendingLogEntries));
      pendingLogEntries = [];
    }
    allLogs.push({
      type: 'summary',
      title: '检索完成',
      text: 'AI 深度检索已完成,正在整理结果...',
      ts: new Date().toISOString(),
    });
    await updateLogs(taskId, allLogs);
    const { synthesisReport, resultList } = parseContent(contentAccumulated);
    await prisma.aslResearchTask.update({
      where: { id: taskId },
      data: {
        status: 'completed',
        rawResult: contentAccumulated,
        reasoningContent: reasoningAccumulated,
        synthesisReport,
        resultList: resultList as any,
        resultCount: resultList?.length || 0,
        executionLogs: allLogs as any,
        completedAt: new Date(),
      },
    });
    logger.info('[DeepResearchV2Worker] Completed', {
      taskId,
      resultCount: resultList?.length || 0,
      reasoningLen: reasoningAccumulated.length,
      contentLen: contentAccumulated.length,
      duration: Date.now() - startTime,
    });
  } catch (error: any) {
    logger.error('[DeepResearchV2Worker] Failed', {
      taskId,
      error: error.message,
    });
    // Best-effort bookkeeping: never let a secondary DB failure escape
    // the error path and crash the worker.
    try {
      await prisma.aslResearchTask.update({
        where: { id: taskId },
        data: {
          status: 'failed',
          errorMessage: error.message,
        },
      });
    } catch (updateError: any) {
      logger.error('[DeepResearchV2Worker] Failed to persist failure status', {
        taskId,
        error: updateError.message,
      });
    }
  }
}
/**
 * Build the output_prompt passed to Unifuncs: a synthesis report with
 * inline reference links followed by a fenced-JSON literature list.
 * When a Chinese database is selected, append an instruction to search
 * with Chinese keywords and preserve Chinese metadata.
 */
function buildOutputPrompt(hasChinese: boolean): string {
  const basePrompt = `请按以下格式输出:
## 综合分析报告
(对检索到的文献进行综合分析,包括研究现状、主要发现、研究趋势等。引用文献时请使用内联链接格式 [标题](url),不要使用脚注式引用。)
## 文献清单
请以 JSON 数组格式输出所有检索到的文献元数据:
\`\`\`json
[
{
"title": "文献标题",
"authors": "第一作者 et al.",
"journal": "期刊名",
"year": 2024,
"doi": "10.xxxx/xxxxx",
"pmid": "12345678",
"url": "文献完整链接",
"abstract": "摘要前200字...",
"studyType": "RCT"
}
]
\`\`\``;
  const chineseNote = `\n\n重要提示:用户选择了中文数据库(CNKI/中华医学期刊网),请务必使用中文关键词检索中文文献,确保结果中包含中文文献。如有中文文献,请保留中文标题和作者信息,url 使用原始中文数据库链接。`;
  return hasChinese ? basePrompt + chineseNote : basePrompt;
}
/** Persist the current executionLogs snapshot for a task. */
async function updateLogs(taskId: string, logs: ExecutionLogEntry[]) {
  const executionLogs = logs as any; // Prisma JSON column
  await prisma.aslResearchTask.update({
    where: { id: taskId },
    data: { executionLogs },
  });
}

View File

@@ -8,6 +8,7 @@
import { jobQueue } from '../../../common/jobs/index.js';
import { logger } from '../../../common/logging/index.js';
import { researchService } from '../services/researchService.js';
import { processDeepResearchV2 } from './deepResearchV2Worker.js';
import type { Job } from '../../../common/jobs/types.js';
/**
@@ -79,4 +80,18 @@ export function registerResearchWorker() {
});
logger.info('[ResearchWorker] ✅ Worker registered: asl_research_execute');
// ── Deep Research V2.0 Worker ──
jobQueue.process<{ taskId: string }>('asl_deep_research_v2', async (job: Job<{ taskId: string }>) => {
logger.info('[ResearchWorker] Starting V2 deep research', {
jobId: job.id,
taskId: job.data.taskId,
});
await processDeepResearchV2(job);
return { success: true, taskId: job.data.taskId };
});
logger.info('[ResearchWorker] ✅ Worker registered: asl_deep_research_v2');
}