# **🔬 工具 3 终极代码级同步审计与修正清单 (基于 Fan-out v1.2)** **审计背景:** 确保《工具 3 开发计划 (v1.4.2)》及其代码模式(08d)完全、无死角地落实了《分布式 Fan-out 开发指南 v1.2》中的所有极端场景防御策略。 **审计结论:** 理论已同步,但**代码落地存在 4 处断层**。必须修改 08d-代码模式与技术规范.md 中的具体代码片段。 ## **🚨 审计点 1:Child Worker 临时错误重试的“死锁穿透” (必须修改)** **🔍 逐行审查发现:** 在 08d 文档的 §4.3 ExtractionChildWorker 的 catch 块中,针对临时错误的代码目前是: // 当前 08d 代码 // 临时错误 (429/网络抖动):直接 throw,让 pg-boss 自动指数退避重试 throw error; **💥 业务危害:** 这直接违背了 Fan-out 指南 v1.2 的核心补丁!因为上方使用了 updateMany 乐观锁把 AslExtractionResult 的状态改为了 extracting。如果直接 throw,pg-boss 在 10 秒后重试时,数据库里该行还是 extracting,乐观锁 updateMany 会返回 count: 0,导致 Worker 误以为任务已完成而直接 return success。**最终导致父任务 AslExtractionTask 永远少一个计数,彻底卡死在 processing。** **✅ 代码修正指令:** 必须在 ExtractionChildWorker 的 catch 块末尾,throw error 之前,强制释放当前业务表的锁: // 修正后的 08d §4.3 代码 } catch (error) { if (isPermanentError(error)) { // 致命错误处理逻辑不变... return { success: false }; } // ⚡ 必须增加的解锁代码:临时错误退避前,回退状态为 pending! await prisma.aslExtractionResult.update({ where: { id: resultId }, data: { status: 'pending' } }); // 让出状态后,再抛出异常让 pg-boss 重试 throw error; } ## **🚨 审计点 2:Manager Worker 空文献的“无头挂起” (必须修改)** **🔍 逐行审查发现:** 在 08d 文档的 §4.2 ExtractionManagerWorker 中,代码是: // 当前 08d 代码 const results \= await prisma.aslExtractionResult.findMany({ where: { taskId: task.id } }); for (const result of results) { await pgBoss.send('asl\_extraction\_child', ...); } // Manager 退出 **💥 业务危害:** 在工具 3 的业务流中,如果用户在 Step 1 勾选的 PKB 文献因为某种原因(如被其他协作者删除)导致 results.length \=== 0,Manager 会直接退出。因为没有任何 Child 被派发,Last Child Wins 永远不触发,AslExtractionTask 状态永远是 processing,前端进度条永远转圈。 **✅ 代码修正指令:** 在 ExtractionManagerWorker 获取到 results 后,必须增加边界拦截: // 修正后的 08d §4.2 代码 const results \= await prisma.aslExtractionResult.findMany({ where: { taskId: task.id } }); // ⚡ 必须增加的空集合守卫 if (results.length \=== 0\) { await prisma.aslExtractionTask.update({ where: { id: task.id }, data: { status: 'completed', completedAt: new Date() } }); // 触发 SSE 完成事件 await prisma.$executeRaw\`SELECT pg\_notify('asl\_extraction\_sse', '{"taskId":"${task.id}","type":"complete"}')\`; return; } // 正常循环派发... ## **🚨 审计点 3:工具 3 专属 Sweeper 的缺位 (必须新增)** **🔍 逐行审查发现:** 《Fan-out 开发指南 v1.2》规定必须有 Sweeper 清道夫。但在《工具 3 开发计划》的所有 Task 清单(M1/M2/M3)中,**完全没有分配开发 Sweeper 的任务**。 **💥 业务危害:** 如果没有针对 AslExtractionTask 写具体的清道夫代码,一旦遇到极度变态的 PDF 导致 MinerU 或 pymupdf4llm 的 Node.js 宿主进程 OOM 崩溃,该 Task 会永久挂起在前端工作台,医生无法进行后续操作。 **✅ 代码修正指令:** 必须在后端模块初始化时(如 backend/src/modules/asl/extraction/index.ts),专门为工具 3 注册一个清道夫 Worker: // ⚡ 必须在工具 3 模块启动时注册 async function aslExtractionSweeper() { const stuckTasks \= await prisma.aslExtractionTask.findMany({ where: { status: 'processing', // 工具 3 独有逻辑:使用 updatedAt 判断最后活跃时间超 2 小时 updatedAt: { lt: new Date(Date.now() \- 2 \* 60 \* 60 \* 1000\) }, }, }); for (const task of stuckTasks) { await prisma.aslExtractionTask.update({ where: { id: task.id }, data: { status: 'failed', errorMessage: 'System timeout (OOM/Crash)', completedAt: new Date() }, }); } } await jobQueue.schedule('asl\_extraction\_sweeper', '\*/10 \* \* \* \*'); await jobQueue.work('asl\_extraction\_sweeper', aslExtractionSweeper); ## **🚨 审计点 4:SSE 广播代码的安全隐患 (必须修改)** **🔍 逐行审查发现:** 在 08d 的 §4.3 中,Child Worker 完成提取后,依然在使用: // 当前 08d 代码 this.sseEmitter.emit(taskId, { type: 'log', data: { ... } }); **💥 业务危害:** 这还是单机内存的 EventEmitter!在 SAE 多实例(多 Pods)部署下,Pod A 上的用户绝对收不到 Pod B 产生的日志。前端日志流会严重断裂。 **✅ 代码修正指令:** 必须将所有 this.sseEmitter.emit 替换为安全的、截断的、参数化的 pg\_notify SQL 注入免疫调用: // 修正后的 08d §4.3 代码 const logEntry \= { source: 'system', message: \`✅ ${extractResult.filename} extracted\` }; const payloadStr \= JSON.stringify({ taskId, type: 'log', data: logEntry }); // ⚡ 必须进行的 7000 bytes 安全截断(防 PostgreSQL 报错) const safePayload \= payloadStr.length \> 7000 ? payloadStr.substring(0, 7000\) \+ '..."}' : payloadStr; // ⚡ 必须使用的参数化 pg\_notify await prisma.$executeRaw\`SELECT pg\_notify('asl\_extraction\_sse', ${safePayload})\`; ## **🏁 架构师最终放行许可** 只要开发团队在编写工具 3 代码时,把上述 **4 段具体的代码** 替换到工程中: 1. Child Worker catch 释放锁 2. Manager Worker 拦截空数组 3. 注册 asl\_extraction\_sweeper 4. 替换 sseEmitter 为参数化 pg\_notify 您的系统在抗压能力、容错能力和数据一致性上,将绝对达到顶尖大厂的微服务水准!**这一次,您可以 100% 放心闭眼放行了!祝团队开发顺利!**