feat(asl): Add DeepSearch smart literature retrieval MVP
Features: - Integrate unifuncs DeepSearch API (OpenAI compatible protocol) - SSE real-time streaming for AI thinking process display - Natural language input, auto-generate PubMed search strategy - Extract and display PubMed literature links - Database storage for task records (asl_research_tasks) Backend: - researchService.ts - Core business logic with SSE streaming - researchController.ts - SSE stream endpoint - researchWorker.ts - Async task worker (backup mode) - schema.prisma - AslResearchTask model Frontend: - ResearchSearch.tsx - Search page with unified content stream - ResearchSearch.css - Styling (unifuncs-inspired simple design) - ASLLayout.tsx - Enable menu item - api/index.ts - Add research API functions API Endpoints: - POST /api/v1/asl/research/stream - SSE streaming search - POST /api/v1/asl/research/tasks - Async task creation - GET /api/v1/asl/research/tasks/:taskId/status - Task status Documentation: - Development record for DeepSearch integration - Update ASL module status (v1.5) - Update system status (v3.7) Known limitations: - SSE mode, task interrupts when leaving page - Cost ~0.3 RMB per search (unifuncs API)
This commit is contained in:
137
backend/src/modules/asl/controllers/researchController.ts
Normal file
137
backend/src/modules/asl/controllers/researchController.ts
Normal file
@@ -0,0 +1,137 @@
|
||||
/**
|
||||
* 智能文献检索 Controller
|
||||
*
|
||||
* SSE 流式 + PubMed 链接提取
|
||||
*/
|
||||
|
||||
import { FastifyRequest, FastifyReply } from 'fastify';
|
||||
import { researchService } from '../services/researchService.js';
|
||||
import { logger } from '../../../common/logging/index.js';
|
||||
|
||||
interface SearchBody {
|
||||
projectId: string;
|
||||
query: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/v1/asl/research/stream
|
||||
* SSE 实时流式检索
|
||||
*/
|
||||
export async function streamSearch(
|
||||
request: FastifyRequest<{ Body: SearchBody }>,
|
||||
reply: FastifyReply
|
||||
) {
|
||||
const { projectId, query } = request.body;
|
||||
const userId = request.user?.userId;
|
||||
|
||||
if (!userId) {
|
||||
return reply.code(401).send({ success: false, error: '用户未认证' });
|
||||
}
|
||||
|
||||
if (!query?.trim()) {
|
||||
return reply.code(400).send({ success: false, error: '请输入检索问题' });
|
||||
}
|
||||
|
||||
logger.info('[ResearchController] Starting SSE stream', { userId, queryLength: query.length });
|
||||
|
||||
// 设置 SSE 响应头
|
||||
reply.raw.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
'Connection': 'keep-alive',
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
});
|
||||
|
||||
try {
|
||||
const task = await researchService.createTaskRecord(projectId, userId, query);
|
||||
|
||||
reply.raw.write(`data: ${JSON.stringify({ type: 'task_created', taskId: task.id })}\n\n`);
|
||||
|
||||
await researchService.executeStreamSearch(
|
||||
task.id,
|
||||
query,
|
||||
// 思考过程(统一追加)
|
||||
(reasoning: string) => {
|
||||
reply.raw.write(`data: ${JSON.stringify({ type: 'reasoning', content: reasoning })}\n\n`);
|
||||
},
|
||||
// 结果内容(统一追加)
|
||||
(content: string) => {
|
||||
reply.raw.write(`data: ${JSON.stringify({ type: 'content', content })}\n\n`);
|
||||
},
|
||||
// 完成(返回链接列表)
|
||||
(result: { links: string[] }) => {
|
||||
reply.raw.write(`data: ${JSON.stringify({ type: 'completed', links: result.links })}\n\n`);
|
||||
reply.raw.end();
|
||||
},
|
||||
// 错误
|
||||
(error: string) => {
|
||||
reply.raw.write(`data: ${JSON.stringify({ type: 'error', error })}\n\n`);
|
||||
reply.raw.end();
|
||||
}
|
||||
);
|
||||
} catch (error: any) {
|
||||
logger.error('[ResearchController] Stream search failed', { error: error.message });
|
||||
reply.raw.write(`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`);
|
||||
reply.raw.end();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/v1/asl/research/tasks
|
||||
* 创建检索任务(异步模式)
|
||||
*/
|
||||
export async function createTask(
|
||||
request: FastifyRequest<{ Body: SearchBody }>,
|
||||
reply: FastifyReply
|
||||
) {
|
||||
try {
|
||||
const { projectId, query } = request.body;
|
||||
const userId = request.user?.userId;
|
||||
|
||||
if (!userId) {
|
||||
return reply.code(401).send({ success: false, error: '用户未认证' });
|
||||
}
|
||||
|
||||
if (!projectId) {
|
||||
return reply.code(400).send({ success: false, error: '缺少 projectId' });
|
||||
}
|
||||
|
||||
if (!query?.trim()) {
|
||||
return reply.code(400).send({ success: false, error: '请输入检索问题' });
|
||||
}
|
||||
|
||||
const task = await researchService.createTask({ projectId, userId, query: query.trim() });
|
||||
return reply.send({ success: true, data: task });
|
||||
} catch (error: any) {
|
||||
logger.error('[ResearchController] Create task failed', { error: error.message });
|
||||
return reply.code(500).send({ success: false, error: error.message });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* GET /api/v1/asl/research/tasks/:taskId/status
|
||||
* 获取任务状态
|
||||
*/
|
||||
export async function getTaskStatus(
|
||||
request: FastifyRequest<{ Params: { taskId: string } }>,
|
||||
reply: FastifyReply
|
||||
) {
|
||||
try {
|
||||
const { taskId } = request.params;
|
||||
|
||||
if (!taskId) {
|
||||
return reply.code(400).send({ success: false, error: '缺少 taskId' });
|
||||
}
|
||||
|
||||
const status = await researchService.getTaskStatus(taskId);
|
||||
|
||||
if (!status) {
|
||||
return reply.code(404).send({ success: false, error: '任务不存在' });
|
||||
}
|
||||
|
||||
return reply.send({ success: true, data: status });
|
||||
} catch (error: any) {
|
||||
logger.error('[ResearchController] Get task status failed', { error: error.message });
|
||||
return reply.code(500).send({ success: false, error: error.message });
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import * as projectController from '../controllers/projectController.js';
|
||||
import * as literatureController from '../controllers/literatureController.js';
|
||||
import * as screeningController from '../controllers/screeningController.js';
|
||||
import * as fulltextScreeningController from '../fulltext-screening/controllers/FulltextScreeningController.js';
|
||||
import * as researchController from '../controllers/researchController.js';
|
||||
import { authenticate, requireModule } from '../../../common/auth/auth.middleware.js';
|
||||
|
||||
export async function aslRoutes(fastify: FastifyInstance) {
|
||||
@@ -77,6 +78,17 @@ export async function aslRoutes(fastify: FastifyInstance) {
|
||||
|
||||
// 导出Excel
|
||||
fastify.get('/fulltext-screening/tasks/:taskId/export', { preHandler: [authenticate, requireModule('ASL')] }, fulltextScreeningController.exportExcel);
|
||||
|
||||
// ==================== 智能文献检索路由 (DeepSearch) ====================
|
||||
|
||||
// SSE 流式检索(推荐,实时显示思考过程)
|
||||
fastify.post('/research/stream', { preHandler: [authenticate, requireModule('ASL')] }, researchController.streamSearch);
|
||||
|
||||
// 创建检索任务(异步模式,备用)
|
||||
fastify.post('/research/tasks', { preHandler: [authenticate, requireModule('ASL')] }, researchController.createTask);
|
||||
|
||||
// 获取任务状态(轮询)
|
||||
fastify.get('/research/tasks/:taskId/status', { preHandler: [authenticate, requireModule('ASL')] }, researchController.getTaskStatus);
|
||||
}
|
||||
|
||||
|
||||
|
||||
309
backend/src/modules/asl/services/researchService.ts
Normal file
309
backend/src/modules/asl/services/researchService.ts
Normal file
@@ -0,0 +1,309 @@
|
||||
/**
|
||||
* 智能文献检索服务(DeepSearch)
|
||||
*
|
||||
* SSE 流式 + 提取 PubMed 链接
|
||||
*/
|
||||
|
||||
import { prisma } from '../../../config/database.js';
|
||||
import { jobQueue } from '../../../common/jobs/index.js';
|
||||
import { logger } from '../../../common/logging/index.js';
|
||||
import OpenAI from 'openai';
|
||||
|
||||
const UNIFUNCS_API_KEY = process.env.UNIFUNCS_API_KEY;
|
||||
const UNIFUNCS_BASE_URL = 'https://api.unifuncs.com/deepsearch/v1';
|
||||
|
||||
class ResearchService {
|
||||
private client: OpenAI;
|
||||
|
||||
constructor() {
|
||||
this.client = new OpenAI({
|
||||
baseURL: UNIFUNCS_BASE_URL,
|
||||
apiKey: UNIFUNCS_API_KEY || '',
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建任务记录
|
||||
*/
|
||||
async createTaskRecord(projectId: string, userId: string, query: string) {
|
||||
const task = await prisma.aslResearchTask.create({
|
||||
data: {
|
||||
projectId,
|
||||
userId,
|
||||
query,
|
||||
status: 'processing',
|
||||
},
|
||||
});
|
||||
logger.info('[ResearchService] Task record created', { taskId: task.id });
|
||||
return task;
|
||||
}
|
||||
|
||||
/**
|
||||
* SSE 流式检索
|
||||
* 统一内容流 + 提取 PubMed 链接
|
||||
*/
|
||||
async executeStreamSearch(
|
||||
taskId: string,
|
||||
query: string,
|
||||
onReasoning: (content: string) => void,
|
||||
onContent: (content: string) => void,
|
||||
onComplete: (result: { links: string[] }) => void,
|
||||
onError: (error: string) => void
|
||||
) {
|
||||
if (!UNIFUNCS_API_KEY) {
|
||||
onError('UNIFUNCS_API_KEY 未配置');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const systemPrompt = this.buildSystemPrompt();
|
||||
let fullContent = '';
|
||||
|
||||
await prisma.aslResearchTask.update({
|
||||
where: { id: taskId },
|
||||
data: { status: 'searching' },
|
||||
});
|
||||
|
||||
const stream = await (this.client.chat.completions.create as any)({
|
||||
model: 's2',
|
||||
messages: [{ role: 'user', content: query }],
|
||||
stream: true,
|
||||
introduction: systemPrompt,
|
||||
max_depth: 15,
|
||||
domain_scope: ['https://pubmed.ncbi.nlm.nih.gov/'],
|
||||
domain_blacklist: ['wanfang.com', 'cnki.net'],
|
||||
reference_style: 'link',
|
||||
generate_summary: true,
|
||||
});
|
||||
|
||||
for await (const chunk of stream) {
|
||||
const delta = chunk.choices[0]?.delta;
|
||||
if (!delta) continue;
|
||||
|
||||
// 思考过程
|
||||
const reasoning = (delta as any).reasoning_content;
|
||||
if (reasoning) {
|
||||
fullContent += reasoning;
|
||||
onReasoning(reasoning);
|
||||
}
|
||||
|
||||
// 结果内容
|
||||
if (delta.content) {
|
||||
fullContent += delta.content;
|
||||
onContent(delta.content);
|
||||
}
|
||||
}
|
||||
|
||||
// 提取 PubMed 链接
|
||||
const links = this.extractPubMedLinks(fullContent);
|
||||
|
||||
// 更新数据库
|
||||
await prisma.aslResearchTask.update({
|
||||
where: { id: taskId },
|
||||
data: {
|
||||
status: 'completed',
|
||||
rawResult: fullContent,
|
||||
resultCount: links.length,
|
||||
literatures: links.map(link => ({ url: link })) as any,
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
logger.info('[ResearchService] Stream search completed', {
|
||||
taskId,
|
||||
linkCount: links.length
|
||||
});
|
||||
|
||||
onComplete({ links });
|
||||
|
||||
} catch (error: any) {
|
||||
logger.error('[ResearchService] Stream search failed', { taskId, error: error.message });
|
||||
|
||||
await prisma.aslResearchTask.update({
|
||||
where: { id: taskId },
|
||||
data: {
|
||||
status: 'failed',
|
||||
errorMessage: error.message,
|
||||
},
|
||||
});
|
||||
|
||||
onError(error.message || '检索失败');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 提取 PubMed 链接
|
||||
*/
|
||||
private extractPubMedLinks(content: string): string[] {
|
||||
const linkSet = new Set<string>();
|
||||
|
||||
// 匹配 PubMed URL(各种格式)
|
||||
const patterns = [
|
||||
/https?:\/\/pubmed\.ncbi\.nlm\.nih\.gov\/(\d+)\/?/gi,
|
||||
/pubmed\.ncbi\.nlm\.nih\.gov\/(\d+)/gi,
|
||||
];
|
||||
|
||||
for (const pattern of patterns) {
|
||||
let match;
|
||||
while ((match = pattern.exec(content)) !== null) {
|
||||
const pmid = match[1];
|
||||
linkSet.add(`https://pubmed.ncbi.nlm.nih.gov/${pmid}/`);
|
||||
}
|
||||
}
|
||||
|
||||
return Array.from(linkSet);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建异步任务(备用)
|
||||
*/
|
||||
async createTask(params: { projectId: string; userId: string; query: string }) {
|
||||
const { projectId, userId, query } = params;
|
||||
|
||||
if (!UNIFUNCS_API_KEY) {
|
||||
throw new Error('UNIFUNCS_API_KEY 未配置');
|
||||
}
|
||||
|
||||
const task = await prisma.aslResearchTask.create({
|
||||
data: {
|
||||
projectId,
|
||||
userId,
|
||||
query,
|
||||
status: 'processing',
|
||||
},
|
||||
});
|
||||
|
||||
await jobQueue.push('asl_research_execute', {
|
||||
taskId: task.id,
|
||||
query,
|
||||
});
|
||||
|
||||
return { id: task.id, status: task.status };
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步执行(Worker 调用)
|
||||
*/
|
||||
async executeSearch(taskId: string, query: string) {
|
||||
try {
|
||||
const systemPrompt = this.buildSystemPrompt();
|
||||
let fullContent = '';
|
||||
|
||||
await prisma.aslResearchTask.update({
|
||||
where: { id: taskId },
|
||||
data: { status: 'searching' },
|
||||
});
|
||||
|
||||
const stream = await (this.client.chat.completions.create as any)({
|
||||
model: 's2',
|
||||
messages: [{ role: 'user', content: query }],
|
||||
stream: true,
|
||||
introduction: systemPrompt,
|
||||
max_depth: 15,
|
||||
domain_scope: ['https://pubmed.ncbi.nlm.nih.gov/'],
|
||||
domain_blacklist: ['wanfang.com', 'cnki.net'],
|
||||
reference_style: 'link',
|
||||
generate_summary: true,
|
||||
});
|
||||
|
||||
for await (const chunk of stream) {
|
||||
const delta = chunk.choices[0]?.delta;
|
||||
if (!delta) continue;
|
||||
const reasoning = (delta as any).reasoning_content;
|
||||
if (reasoning) fullContent += reasoning;
|
||||
if (delta.content) fullContent += delta.content;
|
||||
}
|
||||
|
||||
const links = this.extractPubMedLinks(fullContent);
|
||||
|
||||
await prisma.aslResearchTask.update({
|
||||
where: { id: taskId },
|
||||
data: {
|
||||
status: 'completed',
|
||||
rawResult: fullContent,
|
||||
resultCount: links.length,
|
||||
literatures: links.map(link => ({ url: link })) as any,
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
return { success: true, linkCount: links.length };
|
||||
|
||||
} catch (error: any) {
|
||||
await prisma.aslResearchTask.update({
|
||||
where: { id: taskId },
|
||||
data: { status: 'failed', errorMessage: error.message },
|
||||
});
|
||||
return { success: false, error: error.message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取任务状态
|
||||
*/
|
||||
async getTaskStatus(taskId: string) {
|
||||
const task = await prisma.aslResearchTask.findUnique({
|
||||
where: { id: taskId },
|
||||
});
|
||||
|
||||
if (!task) return null;
|
||||
|
||||
let frontendStatus: 'processing' | 'ready' | 'error';
|
||||
let progress = 0;
|
||||
|
||||
switch (task.status) {
|
||||
case 'processing':
|
||||
case 'searching':
|
||||
frontendStatus = 'processing';
|
||||
progress = task.status === 'searching' ? 50 : 10;
|
||||
break;
|
||||
case 'completed':
|
||||
frontendStatus = 'ready';
|
||||
progress = 100;
|
||||
break;
|
||||
case 'failed':
|
||||
frontendStatus = 'error';
|
||||
break;
|
||||
default:
|
||||
frontendStatus = 'processing';
|
||||
progress = 10;
|
||||
}
|
||||
|
||||
// 提取链接
|
||||
const links: string[] = [];
|
||||
if (task.literatures && Array.isArray(task.literatures)) {
|
||||
for (const item of task.literatures as any[]) {
|
||||
if (item.url) links.push(item.url);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
taskId: task.id,
|
||||
status: frontendStatus,
|
||||
progress,
|
||||
query: task.query,
|
||||
resultCount: task.resultCount,
|
||||
links,
|
||||
errorMessage: task.errorMessage,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建系统提示词
|
||||
*/
|
||||
private buildSystemPrompt(): string {
|
||||
return `你是一名专业的临床研究文献检索专家。请在 PubMed 中检索相关文献。
|
||||
|
||||
检索要求:
|
||||
1. 优先检索高质量研究:系统综述、Meta分析、RCT
|
||||
2. 关注 PICOS 要素
|
||||
3. 优先近5年的研究
|
||||
|
||||
输出要求:
|
||||
1. 返回每篇文献的 PubMed 链接
|
||||
2. 按研究类型分组
|
||||
3. 按相关性排序`;
|
||||
}
|
||||
}
|
||||
|
||||
export const researchService = new ResearchService();
|
||||
86
backend/src/modules/asl/workers/researchWorker.ts
Normal file
86
backend/src/modules/asl/workers/researchWorker.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
/**
|
||||
* 智能文献检索 Worker
|
||||
*
|
||||
* ✅ 使用 OpenAI 兼容协议(已验证成功)
|
||||
* ✅ 严格遵循 Postgres-Only 异步任务处理指南
|
||||
*/
|
||||
|
||||
import { jobQueue } from '../../../common/jobs/index.js';
|
||||
import { logger } from '../../../common/logging/index.js';
|
||||
import { researchService } from '../services/researchService.js';
|
||||
import type { Job } from '../../../common/jobs/types.js';
|
||||
|
||||
/**
|
||||
* 检索任务数据结构
|
||||
*/
|
||||
interface ResearchExecuteJob {
|
||||
taskId: string;
|
||||
query: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册智能文献检索 Worker
|
||||
*/
|
||||
export function registerResearchWorker() {
|
||||
logger.info('[ResearchWorker] Registering worker');
|
||||
|
||||
// 注册执行任务的 Worker
|
||||
jobQueue.process<ResearchExecuteJob>('asl_research_execute', async (job: Job<ResearchExecuteJob>) => {
|
||||
const { taskId, query } = job.data;
|
||||
|
||||
logger.info('[ResearchWorker] Starting search', {
|
||||
jobId: job.id,
|
||||
taskId,
|
||||
queryLength: query.length,
|
||||
});
|
||||
|
||||
try {
|
||||
// 执行检索(使用 OpenAI 兼容协议 streaming)
|
||||
const result = await researchService.executeSearch(taskId, query);
|
||||
|
||||
if (result.success) {
|
||||
logger.info('[ResearchWorker] ✅ Search completed', {
|
||||
taskId,
|
||||
resultCount: result.resultCount,
|
||||
});
|
||||
|
||||
return {
|
||||
success: true,
|
||||
taskId,
|
||||
resultCount: result.resultCount,
|
||||
};
|
||||
} else {
|
||||
logger.error('[ResearchWorker] ❌ Search failed', {
|
||||
taskId,
|
||||
error: result.error,
|
||||
});
|
||||
|
||||
return {
|
||||
success: false,
|
||||
taskId,
|
||||
error: result.error,
|
||||
};
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.error('[ResearchWorker] ❌ Unexpected error', {
|
||||
taskId,
|
||||
error: error.message,
|
||||
});
|
||||
|
||||
// 更新任务状态为失败
|
||||
try {
|
||||
await researchService.executeSearch(taskId, query);
|
||||
} catch {
|
||||
// 忽略
|
||||
}
|
||||
|
||||
return {
|
||||
success: false,
|
||||
taskId,
|
||||
error: error.message,
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
logger.info('[ResearchWorker] ✅ Worker registered: asl_research_execute');
|
||||
}
|
||||
Reference in New Issue
Block a user