/**
 * DC module - dual-model extraction service.
 *
 * Responsibilities:
 * - Send the same prompt to two LLMs concurrently (DeepSeek and Qwen) and keep both answers.
 * - Mask PII (ID-card numbers, mobile numbers, prefixed names) before text is sent to any model.
 * - Parse model output as JSON with layered fallbacks.
 * - Record token usage per model and per task.
 * - Run a whole extraction task item by item (`batchExtract`), persisting progress via Prisma.
 *
 * Platform capabilities reused:
 * - LLMFactory: LLM adapters
 * - logger: logging
 * - prisma: database access
 * NOTE(review): the original header also listed `jobQueue` for async tasks; it is not referenced
 * in this file, so presumably the job queue is what invokes `batchExtract` — confirm at the caller.
 */
import { LLMFactory } from '../../../../common/llm/adapters/LLMFactory.js';
import { logger } from '../../../../common/logging/index.js';
import { prisma } from '../../../../config/database.js';

/** One field the extraction template asks the models to fill in. */
export interface ExtractionField {
  /** Key expected in the model's JSON output. */
  name: string;
  /** Human-readable description of the field. */
  desc: string;
}

export interface ExtractionInput {
  /** Raw source text (unmasked; masking happens inside `extract`). */
  text: string;
  /** Fields every parsed result must contain. */
  fields: ExtractionField[];
  /** Instruction text placed before the record text in the prompt. */
  promptTemplate: string;
}

export interface ExtractionOutput {
  /** Parsed JSON object, or one placeholder value per field when parsing failed. */
  result: Record<string, any>;
  /** Total tokens reported by the adapter (0 when not reported). */
  tokensUsed: number;
  /** The model's raw response content. */
  rawOutput: any;
  /**
   * True when no parse strategy produced an object containing every field, in which case
   * `result` holds placeholder values only. Optional so existing producers stay valid.
   */
  parseFailed?: boolean;
}

/** Value stored for every field when a model's output cannot be parsed. */
const PARSE_FAILED_PLACEHOLDER = '解析失败';

/**
 * JSON.stringify with object keys sorted at every nesting level, so two results that differ
 * only in key order serialize to the same string. Array order is preserved (it is meaningful).
 */
function stableStringify(value: unknown): string {
  return JSON.stringify(value, (_key, val: unknown) => {
    if (val === null || typeof val !== 'object' || Array.isArray(val)) {
      return val;
    }
    const source = val as Record<string, unknown>;
    const sorted: Record<string, unknown> = {};
    for (const key of Object.keys(source).sort()) {
      sorted[key] = source[key];
    }
    return sorted;
  });
}

export class DualModelExtractionService {
  /**
   * Mask PII in `input.text`, then ask both models to extract `input.fields` concurrently.
   *
   * @param input  Text, field list and prompt template.
   * @param taskId Task id (used for logging only).
   * @param itemId Item id (used for logging only).
   * @returns Both models' outputs.
   * @throws Error('Dual model extraction failed') if either model call rejects.
   */
  async extract(input: ExtractionInput, taskId: string, itemId: string): Promise<{
    resultA: ExtractionOutput;
    resultB: ExtractionOutput;
  }> {
    try {
      logger.info('[DualExtraction] Starting extraction', { taskId, itemId });

      // 1. Mask PII before the text leaves the process.
      const maskedText = this.maskPII(input.text);

      // 2. Build the shared prompt.
      const prompt = this.buildPrompt(maskedText, input.fields, input.promptTemplate);

      // 3. Call both models concurrently; allSettled so one failure does not hide the other's reason.
      const [resultA, resultB] = await Promise.allSettled([
        this.callModel('deepseek', prompt, input.fields),
        this.callModel('qwen', prompt, input.fields),
      ]);

      // 4. Both answers are required for comparison, so any rejection fails the item.
      if (resultA.status === 'rejected' || resultB.status === 'rejected') {
        logger.error('[DualExtraction] One or both models failed', {
          taskId,
          itemId,
          errorA: resultA.status === 'rejected' ? resultA.reason : null,
          errorB: resultB.status === 'rejected' ? resultB.reason : null,
        });
        throw new Error('Dual model extraction failed');
      }

      logger.info('[DualExtraction] Extraction completed', {
        taskId,
        itemId,
        tokensA: resultA.value.tokensUsed,
        tokensB: resultB.value.tokensUsed,
      });

      return { resultA: resultA.value, resultB: resultB.value };
    } catch (error) {
      logger.error('[DualExtraction] Extraction failed', { error, taskId, itemId });
      throw error;
    }
  }

  /**
   * Regex-based PII masking:
   * - ID-card number: 330102********1234 (keeps the first 6 and last 4 characters)
   * - Mobile number:  138****5678
   * - Name after "患者" or "姓名:" prefixes: 张** (keeps the first character)
   *
   * ID numbers are masked BEFORE mobile numbers: an 18-digit ID contains 11-digit runs that
   * also match the mobile pattern (e.g. "19900101123" inside "330102199001011234"). Masking
   * phones first would mangle the ID so the ID pattern no longer matches, leaving most of
   * it (including the birth year) visible.
   *
   * The name rule is a heuristic: any 2-4 non-space characters after the prefix are masked.
   */
  private maskPII(text: string): string {
    const idMasked = text.replace(
      /\d{6}(19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dxX]/g,
      (id) => id.substring(0, 6) + '********' + id.substring(14),
    );

    const phoneMasked = idMasked.replace(
      /1[3-9]\d{9}/g,
      (phone) => phone.substring(0, 3) + '****' + phone.substring(7),
    );

    return phoneMasked.replace(
      /(患者|姓名[::])\s*([^\s,。,]{2,4})/g,
      (_match, prefix: string, name: string) => prefix + name.charAt(0) + '*'.repeat(name.length - 1),
    );
  }

  /**
   * Build the prompt: the template, then the (masked) record text under its own heading,
   * then a strict JSON-only output instruction. Each part sits on its own lines so the
   * boundaries between instructions and record text are unambiguous to the model.
   *
   * The field list is not used here; presumably the template already describes the fields.
   */
  private buildPrompt(text: string, _fields: ExtractionField[], template: string): string {
    return [
      template,
      '',
      '**病历原文:**',
      text,
      '',
      '请严格按照JSON格式输出,不要有任何额外文字。',
    ].join('\n');
  }

  /**
   * Call one model and parse its answer.
   *
   * Parse failures do not throw: the result holds a placeholder per field and `parseFailed`
   * is set. Adapter errors are logged and rethrown unchanged.
   */
  private async callModel(
    modelType: 'deepseek' | 'qwen',
    prompt: string,
    fields: ExtractionField[],
  ): Promise<ExtractionOutput> {
    const tag = `[${modelType.toUpperCase()}]`;
    try {
      // NOTE(review): the module header historically mentioned Qwen-Max, but the id used is
      // 'qwen3-72b' — confirm which model is intended.
      const modelName = modelType === 'deepseek' ? 'deepseek-v3' : 'qwen3-72b';

      logger.info(`${tag} Getting adapter`, { modelName });
      // The factory's model-id type is declared elsewhere; kept loose as in the original.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const adapter = LLMFactory.getAdapter(modelName as any);
      logger.info(`${tag} Adapter created successfully`);

      logger.info(`${tag} Calling model with prompt`, {
        modelName,
        promptLength: prompt.length,
        promptPreview: prompt.substring(0, 100) + '...',
      });

      const startTime = Date.now();
      const response = await adapter.chat([{ role: 'user', content: prompt }], {
        temperature: 0, // maximum determinism
        maxTokens: 1000,
      });
      const elapsedTime = Date.now() - startTime;

      logger.info(`${tag} Model responded successfully`, {
        modelName,
        tokensUsed: response.usage?.totalTokens,
        elapsedMs: elapsedTime,
        contentLength: response.content.length,
        contentPreview: response.content.substring(0, 200),
      });

      logger.info(`${tag} Parsing JSON response`);
      const parsed = this.tryParseJSON(response.content, fields);
      if (parsed !== null) {
        logger.info(`${tag} JSON parsed successfully`, {
          fieldCount: Object.keys(parsed).length,
        });
      }

      return {
        result: parsed ?? this.parseFailedResult(response.content, fields),
        tokensUsed: response.usage?.totalTokens ?? 0,
        rawOutput: response.content,
        parseFailed: parsed === null,
      };
    } catch (error: unknown) {
      const err = error instanceof Error ? error : new Error(String(error));
      logger.error(`${tag} Model call failed`, {
        error: err.message,
        stack: err.stack,
        modelType,
      });
      throw error;
    }
  }

  /**
   * Parse model output as JSON, trying in order:
   * 1. the whole text;
   * 2. the body of the first ``` fenced block (the "json" tag is optional, case-insensitive);
   * 3. the span from the first "{" to the last "}".
   *
   * A candidate is accepted only if it parses to a non-array object that has every field.
   *
   * @returns The first accepted object, or null if every strategy fails.
   */
  private tryParseJSON(text: string, fields: ExtractionField[]): Record<string, any> | null {
    const candidates: (string | undefined)[] = [
      text,
      /```(?:json)?\s*([\s\S]*?)```/i.exec(text)?.[1],
      /\{[\s\S]*\}/.exec(text)?.[0],
    ];

    for (const candidate of candidates) {
      if (candidate === undefined) {
        continue;
      }
      let parsed: unknown;
      try {
        parsed = JSON.parse(candidate);
      } catch {
        continue; // fall through to the next strategy
      }
      if (this.validateFields(parsed, fields)) {
        return parsed;
      }
    }
    return null;
  }

  /** Log the unparseable output and build a result with the placeholder for every field. */
  private parseFailedResult(text: string, fields: ExtractionField[]): Record<string, any> {
    logger.warn('[JSON] All parse strategies failed', { text });
    const placeholder: Record<string, any> = {};
    for (const { name } of fields) {
      placeholder[name] = PARSE_FAILED_PLACEHOLDER;
    }
    return placeholder;
  }

  /**
   * True when `parsed` is a plain (non-array) object that has every required field as an
   * own property. Uses Object.prototype.hasOwnProperty.call so a JSON key named
   * "hasOwnProperty" cannot break the check.
   */
  private validateFields(parsed: unknown, fields: ExtractionField[]): parsed is Record<string, any> {
    if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
      return false;
    }
    return fields.every((f) => Object.prototype.hasOwnProperty.call(parsed, f.name));
  }

  /**
   * Run extraction for every item of a task.
   *
   * Sets the task to 'processing', extracts items one at a time, and writes per-item results
   * plus running task counters. Each item ends as 'clean' (both models agree and both outputs
   * parsed; result A is adopted as finalResult), 'conflict' (needs human review), or 'failed'
   * (extraction threw). Per-item failures do not stop the batch. A task-level failure marks
   * the task 'failed' (best effort) and rethrows the original error.
   *
   * @param taskId Task id.
   */
  async batchExtract(taskId: string): Promise<void> {
    try {
      logger.info('[Batch] ===== Starting batch extraction =====', { taskId });

      // 1. Load the task with its items.
      logger.info('[Batch] Step 1: Fetching task from database', { taskId });
      const task = await prisma.dCExtractionTask.findUnique({
        where: { id: taskId },
        include: { items: true },
      });

      if (!task) {
        logger.error('[Batch] Task not found in database', { taskId });
        throw new Error(`Task not found: ${taskId}`);
      }

      logger.info('[Batch] Task fetched successfully', {
        taskId,
        itemCount: task.items.length,
        diseaseType: task.diseaseType,
        reportType: task.reportType,
      });

      // 2. Mark the task as running.
      await prisma.dCExtractionTask.update({
        where: { id: taskId },
        data: { status: 'processing', startedAt: new Date() },
      });

      // 3. Load the template for this disease/report combination.
      const template = await prisma.dCTemplate.findUnique({
        where: {
          diseaseType_reportType: {
            diseaseType: task.diseaseType,
            reportType: task.reportType,
          },
        },
      });

      if (!template) {
        throw new Error(`Template not found: ${task.diseaseType}/${task.reportType}`);
      }

      // Fail the task up front instead of failing every item on a malformed field list.
      if (!Array.isArray(template.fields)) {
        throw new Error(`Template fields are not an array: ${task.diseaseType}/${task.reportType}`);
      }
      const fields = template.fields as ExtractionField[];

      // 4. Process items sequentially.
      let processedCount = 0;
      let cleanCount = 0;
      let conflictCount = 0;
      let totalTokens = 0;

      for (const item of task.items) {
        try {
          const { resultA, resultB } = await this.extract(
            { text: item.originalText, fields, promptTemplate: template.promptTemplate },
            taskId,
            item.id,
          );

          // Simple agreement check (a dedicated ConflictDetectionService is meant to replace
          // it). Key order is ignored so the same answer in a different order is not a
          // conflict. Unparseable output never counts as agreement: two identical placeholder
          // results must go to human review rather than be auto-adopted.
          const hasConflict = stableStringify(resultA.result) !== stableStringify(resultB.result);
          const needsReview = hasConflict || resultA.parseFailed === true || resultB.parseFailed === true;

          await prisma.dCExtractionItem.update({
            where: { id: item.id },
            data: {
              resultA: resultA.result as any,
              resultB: resultB.result as any,
              tokensA: resultA.tokensUsed,
              tokensB: resultB.tokensUsed,
              status: needsReview ? 'conflict' : 'clean',
              // Auto-adopt result A only when it can be trusted.
              // NOTE(review): Prisma >= 4 may reject plain `null` for Json columns
              // (Prisma.DbNull / Prisma.JsonNull) — confirm against the installed version.
              finalResult: (needsReview ? null : resultA.result) as any,
            },
          });

          processedCount++;
          if (needsReview) {
            conflictCount++;
          } else {
            cleanCount++;
          }
          totalTokens += resultA.tokensUsed + resultB.tokensUsed;

          // Persist running progress after every successful item.
          await prisma.dCExtractionTask.update({
            where: { id: taskId },
            data: { processedCount, cleanCount, conflictCount, totalTokens },
          });
        } catch (error) {
          logger.error('[Batch] Item extraction failed', { error, itemId: item.id });
          await prisma.dCExtractionItem.update({
            where: { id: item.id },
            data: { status: 'failed', error: String(error) },
          });
        }
      }

      // 5. Mark the task as finished.
      await prisma.dCExtractionTask.update({
        where: { id: taskId },
        data: { status: 'completed', completedAt: new Date() },
      });

      logger.info('[Batch] Batch extraction completed', {
        taskId,
        processedCount,
        cleanCount,
        conflictCount,
        totalTokens,
      });
    } catch (error) {
      logger.error('[Batch] Batch extraction failed', { error, taskId });

      // Best effort: this update itself throws when the task row does not exist (the
      // "Task not found" path). That must not replace the original error.
      try {
        await prisma.dCExtractionTask.update({
          where: { id: taskId },
          data: { status: 'failed', error: String(error) },
        });
      } catch (updateError) {
        logger.error('[Batch] Failed to mark task as failed', { updateError, taskId });
      }

      throw error;
    }
  }
}

// Shared singleton instance.
export const dualModelExtractionService = new DualModelExtractionService();