/**
 * Full-text re-screening service.
 *
 * Features:
 * - Batch full-text screening of literature
 * - Integrates the LLM service, validators and conflict detection
 * - Concurrency control and progress tracking
 * - Fault tolerance and retry
 *
 * @module FulltextScreeningService
 */

import { PrismaClient } from '@prisma/client';
import PQueue from 'p-queue';
import { LLM12FieldsService, LLM12FieldsMode } from '../../common/llm/LLM12FieldsService.js';
import { MedicalLogicValidator } from '../../common/validation/MedicalLogicValidator.js';
import { EvidenceChainValidator } from '../../common/validation/EvidenceChainValidator.js';
import { ConflictDetectionService } from '../../common/validation/ConflictDetectionService.js';
import { logger } from '../../../../common/logging/index.js';

const prisma = new PrismaClient();

/** Number of literatures screened in parallel when the config gives no valid value. */
const DEFAULT_CONCURRENCY = 3;
/** Number of screening attempts per literature when the config gives no value. */
const DEFAULT_MAX_ATTEMPTS = 2;
/** Prompt version recorded when the config does not specify one. */
const DEFAULT_PROMPT_VERSION = 'v1.0.0-mvp';
/** Delay before the second attempt; doubled for every further attempt. */
const RETRY_BASE_DELAY_MS = 1000;

// =====================================================
// Type definitions
// =====================================================

export interface FulltextScreeningConfig {
  modelA: string;
  modelB: string;
  promptVersion?: string;
  /** Number of literatures processed in parallel; defaults to 3. */
  concurrency?: number;
  /**
   * Upper bound on screening attempts per literature; defaults to 2.
   * Values below 1 are treated as a single attempt (no retry).
   */
  maxRetries?: number;
  /** Skip full-text extraction (used for testing). */
  skipExtraction?: boolean;
}

export interface ScreeningProgress {
  taskId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  totalCount: number;
  processedCount: number;
  successCount: number;
  failedCount: number;
  degradedCount: number;
  totalTokens: number;
  totalCost: number;
  startedAt: Date | null;
  completedAt: Date | null;
  estimatedEndAt: Date | null;
  currentLiterature?: string;
}

export interface SingleLiteratureResult {
  success: boolean;
  isDegraded: boolean;
  error?: string;
  tokens: number;
  cost: number;
}

// =====================================================
// Internal helpers
// =====================================================

/** Normalises an arbitrary thrown value into an `Error` so `.message`/`.stack` are safe to read. */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Returns a queue concurrency that is a number >= 1.
 * Missing, zero, negative or NaN values fall back to the default.
 */
function resolveConcurrency(value: number | undefined): number {
  return typeof value === 'number' && value >= 1 ? value : DEFAULT_CONCURRENCY;
}

/**
 * Returns the number of attempts to make for one literature.
 * Missing/NaN -> default; anything below 1 -> a single attempt; fractions are floored
 * so the "last attempt" check in the retry loop can always be reached.
 */
function resolveMaxAttempts(value: number | undefined): number {
  if (typeof value !== 'number' || Number.isNaN(value)) {
    return DEFAULT_MAX_ATTEMPTS;
  }
  return Math.max(1, Math.floor(value));
}

// =====================================================
// Full-text re-screening service
// =====================================================

export class FulltextScreeningService {
  private readonly llmService = new LLM12FieldsService();
  private readonly medicalLogicValidator = new MedicalLogicValidator();
  private readonly evidenceChainValidator = new EvidenceChainValidator();
  private readonly conflictDetectionService = new ConflictDetectionService();

  // =====================================================
  // 1. Task entry point
  // =====================================================

  /**
   * Creates a full-text screening task and starts processing it in the background.
   *
   * @param projectId - Project ID
   * @param literatureIds - IDs of the literatures to screen
   * @param config - Screening configuration
   * @returns The ID of the created task (returned before processing finishes)
   * @throws Error when the project does not exist or none of the IDs match a
   *         literature of that project
   */
  async createAndProcessTask(
    projectId: string,
    literatureIds: string[],
    config: FulltextScreeningConfig
  ): Promise<string> {
    logger.info('Creating fulltext screening task', {
      projectId,
      literatureCount: literatureIds.length,
      config,
    });

    // 1. Load the project and the literatures that belong to it
    const project = await prisma.aslScreeningProject.findUnique({
      where: { id: projectId },
    });

    if (!project) {
      throw new Error(`Project not found: ${projectId}`);
    }

    const literatures = await prisma.aslLiterature.findMany({
      where: {
        id: { in: literatureIds },
        projectId,
      },
    });

    if (literatures.length === 0) {
      throw new Error('No valid literatures found');
    }

    logger.info(`Found ${literatures.length} literatures to process`);

    // 2. Create the task record
    const task = await prisma.aslFulltextScreeningTask.create({
      data: {
        id: `task_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`,
        projectId,
        modelA: config.modelA,
        modelB: config.modelB,
        promptVersion: config.promptVersion || DEFAULT_PROMPT_VERSION,
        status: 'pending',
        totalCount: literatures.length,
        processedCount: 0,
        successCount: 0,
        failedCount: 0,
        degradedCount: 0,
        totalTokens: 0,
        totalCost: 0,
      },
    });

    logger.info(`Task created: ${task.id}`);

    // 3. Fire-and-forget: processing continues after this method returns.
    void this.processTaskInBackground(task.id, literatures, project, config).catch((error) => {
      logger.error('Task processing failed', { taskId: task.id, error });
    });

    return task.id;
  }

  /**
   * Processes every literature of a task through a bounded-concurrency queue
   * and keeps the task record's counters up to date.
   *
   * A failure of a single literature is counted and logged but does not abort
   * the task. Any other error marks the task as `failed`.
   *
   * @param taskId - Task ID
   * @param literatures - Literature records to screen
   * @param project - Project record (supplies PICOS and inclusion/exclusion criteria)
   * @param config - Screening configuration
   */
  private async processTaskInBackground(
    taskId: string,
    literatures: any[],
    project: any,
    config: FulltextScreeningConfig
  ): Promise<void> {
    const startTime = Date.now();
    const concurrency = resolveConcurrency(config.concurrency);

    try {
      // 1. Mark the task as running
      await prisma.aslFulltextScreeningTask.update({
        where: { id: taskId },
        data: {
          status: 'running',
          startedAt: new Date(),
        },
      });

      logger.info(`Task started: ${taskId}`, {
        totalCount: literatures.length,
        concurrency,
      });

      // 2. Build the PICOS context (tolerates a project without picoCriteria)
      const criteria = project.picoCriteria ?? {};
      const picosContext = {
        P: criteria.P || '',
        I: criteria.I || '',
        C: criteria.C || '',
        O: criteria.O || '',
        S: criteria.S || '',
        inclusionCriteria: project.inclusionCriteria || '',
        exclusionCriteria: project.exclusionCriteria || '',
      };

      // 3. Screen literatures concurrently
      const queue = new PQueue({ concurrency });
      const stats = {
        processedCount: 0,
        successCount: 0,
        failedCount: 0,
        degradedCount: 0,
        totalTokens: 0,
        totalCost: 0,
      };

      // Progress writes are chained so they reach the database in order, and the
      // snapshot is taken when the write actually runs; a slow earlier write can
      // therefore never overwrite newer counters with stale ones. A failed write
      // is logged and swallowed: progress is best-effort and must not change the
      // outcome recorded for a literature.
      let progressWrites: Promise<void> = Promise.resolve();
      const reportProgress = (): Promise<void> => {
        progressWrites = progressWrites.then(async () => {
          try {
            await this.updateTaskProgress(taskId, {
              ...stats,
              startTime,
              totalCount: literatures.length,
            });
          } catch (caught: unknown) {
            logger.warn(`Failed to update progress for task ${taskId}`, {
              error: toError(caught).message,
            });
          }
        });
        return progressWrites;
      };

      const jobs = literatures.map((literature, index) =>
        queue.add(async () => {
          const label = `[${index + 1}/${literatures.length}]`;
          const litStartTime = Date.now();
          logger.info(`${label} Processing: ${literature.title}`);

          let result: SingleLiteratureResult;
          try {
            result = await this.screenLiteratureWithRetry(
              taskId,
              project.id,
              literature,
              picosContext,
              config
            );
          } catch (caught: unknown) {
            // Only a screening failure lands here, so each literature is counted once.
            stats.processedCount++;
            stats.failedCount++;
            logger.error(`${label} ❌ Failed: ${literature.title}`, {
              error: toError(caught).message,
            });
            await reportProgress();
            return;
          }

          stats.processedCount++;
          if (result.success) {
            stats.successCount++;
            if (result.isDegraded) {
              stats.degradedCount++;
            }
          } else {
            stats.failedCount++;
          }
          stats.totalTokens += result.tokens;
          stats.totalCost += result.cost;

          await reportProgress();

          const litDuration = Date.now() - litStartTime;
          logger.info(
            `${label} ✅ Success: ${literature.title} (${litDuration}ms, ${result.tokens} tokens, $${result.cost.toFixed(4)})`
          );
        })
      );

      // Wait for every queued job to finish
      await Promise.all(jobs);

      // 4. Finalise the task with the authoritative in-memory counters
      await this.completeTask(taskId, {
        status: 'completed',
        processedCount: stats.processedCount,
        totalTokens: stats.totalTokens,
        totalCost: stats.totalCost,
        successCount: stats.successCount,
        failedCount: stats.failedCount,
        degradedCount: stats.degradedCount,
      });

      const duration = Date.now() - startTime;
      logger.info(`Task completed: ${taskId}`, {
        duration: `${(duration / 1000).toFixed(1)}s`,
        totalCount: literatures.length,
        successCount: stats.successCount,
        failedCount: stats.failedCount,
        degradedCount: stats.degradedCount,
        totalTokens: stats.totalTokens,
        totalCost: `$${stats.totalCost.toFixed(4)}`,
      });
    } catch (caught: unknown) {
      const error = toError(caught);
      logger.error(`Task failed: ${taskId}`, { error: error.message, stack: error.stack });

      await prisma.aslFulltextScreeningTask.update({
        where: { id: taskId },
        data: {
          status: 'failed',
          completedAt: new Date(),
          errorMessage: error.message,
          errorStack: error.stack,
        },
      });
    }
  }

  // =====================================================
  // 2. Single-literature screening
  // =====================================================

  /**
   * Screens one literature, retrying on failure.
   *
   * `config.maxRetries` is the total number of attempts (default 2, minimum 1).
   * Between attempts the method waits 1s, 2s, 4s, ... (exponential backoff).
   *
   * @param taskId - Task ID
   * @param projectId - Project ID
   * @param literature - Literature record
   * @param picosContext - PICOS context passed to the LLM service
   * @param config - Screening configuration
   * @returns The screening result of the first successful attempt
   * @throws The error of the last attempt when every attempt fails
   */
  private async screenLiteratureWithRetry(
    taskId: string,
    projectId: string,
    literature: any,
    picosContext: any,
    config: FulltextScreeningConfig
  ): Promise<SingleLiteratureResult> {
    const maxAttempts = resolveMaxAttempts(config.maxRetries);

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return await this.screenLiterature(taskId, projectId, literature, picosContext, config);
      } catch (caught: unknown) {
        logger.warn(`Retry ${attempt}/${maxAttempts} for literature ${literature.id}`, {
          error: toError(caught).message,
        });

        if (attempt === maxAttempts) {
          // Last attempt failed: surface the original error to the caller.
          throw caught;
        }

        // Exponential backoff before the next attempt.
        await sleep(RETRY_BASE_DELAY_MS * 2 ** (attempt - 1));
      }
    }

    // maxAttempts is always >= 1, so the loop either returns or throws.
    throw new Error('Unreachable code');
  }

  /**
   * Screens one literature: runs both models, validates their output, detects
   * conflicts between them and persists one result row.
   *
   * @param taskId - Task ID
   * @param projectId - Project ID
   * @param literature - Literature record
   * @param picosContext - PICOS context passed to the LLM service
   * @param config - Screening configuration
   * @returns Token/cost totals and whether the LLM service reported degraded mode
   * @throws Error when no PDF reference exists (outside test mode) or both models
   *         return no result
   */
  private async screenLiterature(
    taskId: string,
    projectId: string,
    literature: any,
    picosContext: any,
    config: FulltextScreeningConfig
  ): Promise<SingleLiteratureResult> {
    // 1. Obtain the PDF buffer
    const placeholderContent = `# ${literature.title}\n\n## Abstract\n${literature.abstract}`;
    let pdfBuffer: Buffer;
    let filename: string;

    if (config.skipExtraction) {
      // Test mode: a plain-text buffer built from title + abstract stands in for the PDF.
      pdfBuffer = Buffer.from(placeholderContent, 'utf-8');
      filename = `test_${literature.id}.txt`;
      logger.info(`[TEST MODE] Using title+abstract as test PDF`);
    } else {
      // Production mode: the PDF is supposed to come from storage.
      if (!literature.pdfStorageRef) {
        throw new Error(`No PDF available for literature ${literature.id}`);
      }

      // TODO: load the PDF buffer from OSS/Dify
      // pdfBuffer = await pdfStorageService.downloadPDF(literature.pdfStorageRef);

      // Temporary workaround: the same title+abstract placeholder as test mode is used.
      pdfBuffer = Buffer.from(placeholderContent, 'utf-8');
      filename = `${literature.id}.pdf`;
      logger.warn(`[TODO] PDF loading not implemented, using test data for ${literature.id}`);
    }

    // 2. Call the LLM service (dual model)
    const llmResult = await this.llmService.processDualModels(
      LLM12FieldsMode.SCREENING,
      config.modelA,
      config.modelB,
      pdfBuffer,
      filename,
      picosContext
    );
    const { resultA, resultB } = llmResult;

    // At least one model must have produced a result
    if (!resultA && !resultB) {
      throw new Error(`Both models failed in dual-model processing`);
    }

    // 3. Validators
    // 3.1 Medical logic validation
    const medicalLogicIssuesA = resultA?.result
      ? this.medicalLogicValidator.validate(resultA.result)
      : [];
    const medicalLogicIssuesB = resultB?.result
      ? this.medicalLogicValidator.validate(resultB.result)
      : [];

    // 3.2 Evidence chain validation
    const evidenceChainIssuesA = resultA?.result
      ? this.evidenceChainValidator.validate(resultA.result)
      : [];
    const evidenceChainIssuesB = resultB?.result
      ? this.evidenceChainValidator.validate(resultB.result)
      : [];

    // 3.3 Conflict detection (only possible when both models produced a result)
    let conflictResult = null;
    if (resultA?.result && resultB?.result) {
      conflictResult = this.conflictDetectionService.detectScreeningConflict(
        resultA.result,
        resultB.result
      );
    }

    // 4. Persist the result
    const totalTokens = (resultA?.tokenUsage || 0) + (resultB?.tokenUsage || 0);
    const totalCost = (resultA?.cost || 0) + (resultB?.cost || 0);
    const isDegraded = llmResult.degradedMode || false;

    await prisma.aslFulltextScreeningResult.create({
      data: {
        id: `result_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`,
        taskId,
        projectId,
        literatureId: literature.id,

        // Model A result
        modelAName: config.modelA,
        modelAStatus: resultA ? 'success' : 'failed',
        modelAFields: resultA?.result?.fields || null,
        modelAOverall: resultA?.result?.overall || null,
        modelAProcessingLog: resultA?.result?.processingLog || null,
        modelAVerification: resultA?.result?.verification || null,
        modelATokens: resultA?.tokenUsage || 0,
        modelACost: resultA?.cost || 0,
        modelAError: null,

        // Model B result
        modelBName: config.modelB,
        modelBStatus: resultB ? 'success' : 'failed',
        modelBFields: resultB?.result?.fields || null,
        modelBOverall: resultB?.result?.overall || null,
        modelBProcessingLog: resultB?.result?.processingLog || null,
        modelBVerification: resultB?.result?.verification || null,
        modelBTokens: resultB?.tokenUsage || 0,
        modelBCost: resultB?.cost || 0,
        modelBError: null,

        // Validation results
        medicalLogicIssues: {
          modelA: medicalLogicIssuesA,
          modelB: medicalLogicIssuesB,
        } as any,
        evidenceChainIssues: {
          modelA: evidenceChainIssuesA,
          modelB: evidenceChainIssuesB,
        } as any,

        // Conflict detection
        isConflict: conflictResult ? conflictResult.hasConflict : false,
        conflictSeverity: conflictResult?.severity || null,
        conflictFields: conflictResult?.conflictFields || [],
        conflictDetails: (conflictResult || null) as any,
        reviewPriority: conflictResult?.reviewPriority || 50,

        // Processing status
        processingStatus: 'completed',
        isDegraded,
        degradedModel: llmResult.failedModel || null,
        processedAt: new Date(),
        promptVersion: config.promptVersion || DEFAULT_PROMPT_VERSION,

        // Raw output (kept for auditing)
        rawOutputA: (resultA || null) as any,
        rawOutputB: (resultB || null) as any,
      },
    });

    // 5. Return the summary
    return {
      success: true,
      isDegraded,
      tokens: totalTokens,
      cost: totalCost,
    };
  }

  // =====================================================
  // 3. Progress update
  // =====================================================

  /**
   * Writes the current counters and an estimated end time to the task record.
   *
   * The estimate is `now + (elapsed / processedCount) * remaining`; it is left
   * untouched while `processedCount` is 0 because no average exists yet.
   *
   * @param taskId - Task ID
   * @param progress - Counter snapshot. `startTime` is the epoch-ms start of the
   *        task. `totalCount` is optional; when omitted it is read from the task
   *        record, and the update is skipped with a warning if the record is missing.
   */
  private async updateTaskProgress(
    taskId: string,
    progress: {
      processedCount: number;
      successCount: number;
      failedCount: number;
      degradedCount: number;
      totalTokens: number;
      totalCost: number;
      startTime: number;
      totalCount?: number;
    }
  ): Promise<void> {
    let totalCount = progress.totalCount;

    if (totalCount === undefined) {
      const task = await prisma.aslFulltextScreeningTask.findUnique({
        where: { id: taskId },
      });

      if (!task) {
        logger.warn(`Task not found: ${taskId}`);
        return;
      }
      totalCount = task.totalCount;
    }

    // Estimate the end time; `undefined` tells Prisma to leave the column as is.
    let estimatedEndAt: Date | undefined;
    if (progress.processedCount > 0) {
      const elapsed = Date.now() - progress.startTime;
      const avgTimePerItem = elapsed / progress.processedCount;
      const remainingItems = Math.max(0, totalCount - progress.processedCount);
      estimatedEndAt = new Date(Date.now() + avgTimePerItem * remainingItems);
    }

    await prisma.aslFulltextScreeningTask.update({
      where: { id: taskId },
      data: {
        processedCount: progress.processedCount,
        successCount: progress.successCount,
        failedCount: progress.failedCount,
        degradedCount: progress.degradedCount,
        totalTokens: progress.totalTokens,
        totalCost: progress.totalCost,
        estimatedEndAt,
      },
    });
  }

  // =====================================================
  // 4. Task completion
  // =====================================================

  /**
   * Writes the final status, completion time and counters to the task record.
   *
   * @param taskId - Task ID
   * @param summary - Final counters. `processedCount` is optional; when omitted
   *        the stored value is left unchanged.
   */
  private async completeTask(
    taskId: string,
    summary: {
      status: 'completed' | 'failed';
      totalTokens: number;
      totalCost: number;
      successCount: number;
      failedCount: number;
      degradedCount: number;
      processedCount?: number;
    }
  ): Promise<void> {
    await prisma.aslFulltextScreeningTask.update({
      where: { id: taskId },
      data: {
        status: summary.status,
        completedAt: new Date(),
        processedCount: summary.processedCount,
        totalTokens: summary.totalTokens,
        totalCost: summary.totalCost,
        successCount: summary.successCount,
        failedCount: summary.failedCount,
        degradedCount: summary.degradedCount,
      },
    });

    logger.info(`Task marked as ${summary.status}: ${taskId}`);
  }

  // =====================================================
  // 5. Query API
  // =====================================================

  /**
   * Reads the progress of a task.
   *
   * @param taskId - Task ID
   * @returns The progress snapshot, or `null` when the task does not exist
   */
  async getTaskProgress(taskId: string): Promise<ScreeningProgress | null> {
    const task = await prisma.aslFulltextScreeningTask.findUnique({
      where: { id: taskId },
    });

    if (!task) {
      return null;
    }

    return {
      taskId: task.id,
      status: task.status as ScreeningProgress['status'],
      totalCount: task.totalCount,
      processedCount: task.processedCount,
      successCount: task.successCount,
      failedCount: task.failedCount,
      degradedCount: task.degradedCount,
      totalTokens: task.totalTokens || 0,
      totalCost: task.totalCost || 0,
      startedAt: task.startedAt,
      completedAt: task.completedAt,
      estimatedEndAt: task.estimatedEndAt,
    };
  }

  /**
   * Lists the results of a task, one page at a time. Conflicting results come
   * first, then higher review priority, then most recently processed.
   *
   * @param taskId - Task ID
   * @param filter - Optional filter: `conflictOnly` restricts to conflicting
   *        results; `page` (default 1) and `pageSize` (default 50) are clamped
   *        to whole numbers >= 1
   * @returns The requested page and the total number of matching results
   */
  async getTaskResults(
    taskId: string,
    filter?: {
      conflictOnly?: boolean;
      page?: number;
      pageSize?: number;
    }
  ): Promise<{ results: any[]; total: number }> {
    // Clamp so a zero/negative/NaN input can never produce a negative skip or take.
    const page = Math.max(1, Math.floor(filter?.page || 1));
    const pageSize = Math.max(1, Math.floor(filter?.pageSize || 50));
    const skip = (page - 1) * pageSize;

    const where: { taskId: string; isConflict?: boolean } = { taskId };
    if (filter?.conflictOnly) {
      where.isConflict = true;
    }

    const [results, total] = await Promise.all([
      prisma.aslFulltextScreeningResult.findMany({
        where,
        include: {
          literature: {
            select: {
              id: true,
              title: true,
              authors: true,
              journal: true,
              publicationYear: true,
            },
          },
        },
        orderBy: [
          { isConflict: 'desc' },
          { reviewPriority: 'desc' },
          { processedAt: 'desc' },
        ],
        skip,
        take: pageSize,
      }),
      prisma.aslFulltextScreeningResult.count({ where }),
    ]);

    return { results, total };
  }

  /**
   * Stores a human reviewer's final decision on a screening result.
   *
   * @param resultId - Result ID
   * @param decision - Final decision, who made it, and optional reason/notes
   */
  async updateReviewDecision(
    resultId: string,
    decision: {
      finalDecision: 'include' | 'exclude';
      finalDecisionBy: string;
      exclusionReason?: string;
      reviewNotes?: string;
    }
  ): Promise<void> {
    await prisma.aslFulltextScreeningResult.update({
      where: { id: resultId },
      data: {
        finalDecision: decision.finalDecision,
        finalDecisionBy: decision.finalDecisionBy,
        finalDecisionAt: new Date(),
        exclusionReason: decision.exclusionReason || null,
        reviewNotes: decision.reviewNotes || null,
      },
    });

    logger.info(`Review decision updated: ${resultId}`, { decision: decision.finalDecision });
  }
}

// Shared singleton instance
export const fulltextScreeningService = new FulltextScreeningService();