Features: - Add editable variable selection in workflow plan (SingleVarSelect + MultiVarTags) - Implement 3-layer flexible interception (warning bar + icon + blocking dialog) - Add tool_param_constraints.json for 12 statistical tools parameter validation - Add PATCH /workflow/:id/params API with Zod structural validation - Implement synchronous parameter sync before execution (Promise chaining) - Fix LLM hallucination by strict system prompt constraints - Fix DynamicReport object-based rows compatibility (R baseline_table) - Fix Word export row.map error with same normalization logic - Restore inferGroupingVar for smart default variable selection - Add ReactMarkdown rendering in SSAChatPane - Update SSA module status document to v3.5 Modified files: - backend: workflow.routes, ChatHandlerService, SystemPromptService, FlowTemplateService - frontend: WorkflowTimeline, SSAWorkspacePane, DynamicReport, SSAChatPane, ssaStore, ssa.css - config: tool_param_constraints.json (new) - docs: SSA status doc, team review reports Tested: Cohort study end-to-end execution + report export verified Co-authored-by: Cursor <cursoragent@cursor.com>
154 lines
7.2 KiB
Markdown
154 lines
7.2 KiB
Markdown
# **🔬 分布式 Fan-out 任务模式开发指南:逐行级审查与修正报告**
|
||
|
||
**审查人:** 资深架构师 & 分布式系统专家
|
||
|
||
**审查对象:** 《分布式 Fan-out 任务模式开发指南 v1.1》
|
||
|
||
**审查深度:** 代码级、变量级、多进程时序推演
|
||
|
||
**核心结论:** 整体框架卓越!但在“重试机制与乐观锁的冲突”、“清道夫的误伤”、“空任务死锁”以及“底层 SQL 注入”上,存在 **4 处极其隐蔽且致命的系统级 Bug**。必须修正后方可发布 v2.0。
|
||
|
||
## **🚨 致命漏洞 1:“乐观锁”与“重试机制”的互相绞杀 (The Retry Paradox)**
|
||
|
||
### **❌ 逐行推演发现的问题(位于 模式 3 与 模式 4)**
|
||
|
||
在指南的《模式 3:乐观锁抢占》中,您的代码写道:
|
||
|
||
const lock \= await prisma.result.updateMany({
|
||
where: { id: resultId, status: 'pending' },
|
||
data: { status: 'processing' },
|
||
});
|
||
if (lock.count \=== 0\) return { success: true, note: 'Idempotent skip' };
|
||
|
||
在《模式 4:错误分级路由》中,当发生临时错误(如 API 超时)时,您的代码写道:
|
||
|
||
// 临时错误 (429/5xx/网络抖动):throw → pg-boss 指数退避自动重试
|
||
throw error;
|
||
|
||
**🔥 灾难时序爆发:**
|
||
|
||
1. Child Job 第一次运行,成功拿到锁,数据库 status 变为 **processing**。
|
||
2. 调用外部大模型,发生网络抖动抛出 Error,走到 catch 块,执行了 throw error。
|
||
3. pg-boss 捕获异常,决定 10 秒后**重试**这个 Job。
|
||
4. 10 秒后,Child Job 第二次运行,执行 updateMany({ where: { status: 'pending' } })。
|
||
5. **致命时刻:** 因为第一次失败时**没有把状态改回 pending**,此时数据库里的状态依然是 processing!
|
||
6. updateMany 返回 count \=== 0。代码打印 "Idempotent skip",然后直接 return { success: true }。
|
||
7. **后果:** 这个重试的任务什么都没做就“成功”退出了。它**不会**递增父任务的失败数或成功数,父任务**永远缺少一次计数**,"Last Child Wins" 永远无法触发,整个任务死锁卡住。
|
||
|
||
### **✅ 骨灰级修正方案**
|
||
|
||
在《模式 4:错误分级路由》的 catch 块中,针对临时错误,**必须在 throw error 释放锁(回退状态)**:
|
||
|
||
} catch (error) {
|
||
if (isPermanentError(error)) {
|
||
// 永久错误逻辑不变...
|
||
return { success: false };
|
||
}
|
||
|
||
// 核心补丁:临时错误在交给 pg-boss 重试前,必须释放乐观锁!
|
||
await prisma.result.update({
|
||
where: { id: resultId },
|
||
data: { status: 'pending' } // 让出状态,允许下一次重试抢占
|
||
});
|
||
|
||
throw error; // 继续抛出,触发 pg-boss 退避重试
|
||
}
|
||
|
||
## **🚨 致命漏洞 2:清道夫 (Sweeper) 的“友军之火” (Friendly Fire)**
|
||
|
||
### **❌ 逐行推演发现的问题(位于 模式 2:Sweeper 清道夫)**
|
||
|
||
指南中建议这样筛选卡死的任务:
|
||
|
||
where: {
|
||
status: 'processing',
|
||
startedAt: { lt: new Date(Date.now() \- 2 \* 60 \* 60 \* 1000\) }, // 超过 2 小时
|
||
}
|
||
|
||
**🔥 灾难时序爆发:**
|
||
|
||
1. 用户提交了一个包含 **500 篇**复杂 PDF 的超级批量任务。
|
||
2. 系统限流 MinerU teamConcurrency: 2,导致这 500 篇文献正常排队执行,总共需要花费 **3 个小时**才能跑完。
|
||
3. 跑到第 2 小时零 1 分钟时,任务非常健康,已经完成了 350 篇。
|
||
4. **致命时刻:** Sweeper 定时任务被唤醒。它发现这个任务的 startedAt 是 2 小时前,不管三七二十一,直接把这个健康运行的巨型任务标记为 failed 并强制收口!
|
||
|
||
### **✅ 骨灰级修正方案**
|
||
|
||
判断一个任务是否“卡死”,不能看它“什么时候开始 (startedAt)”,而必须看它\*\*“上一次产生进度是什么时候 (updatedAt)”\*\*!
|
||
|
||
只要子任务还在不断完成,Task 表的 updatedAt 就会不断被刷新。超过 2 小时没有进度更新的,才是真死机。
|
||
|
||
// 修正后的 Sweeper 筛选条件
|
||
const stuckTasks \= await prisma.task.findMany({
|
||
where: {
|
||
status: 'processing',
|
||
// 核心补丁:使用 updatedAt(最后活跃时间)而非 startedAt
|
||
updatedAt: { lt: new Date(Date.now() \- 2 \* 60 \* 60 \* 1000\) },
|
||
},
|
||
});
|
||
|
||
## **🚨 致命漏洞 3:Manager 的“空集合”黑洞 (The Empty Batch Deadlock)**
|
||
|
||
### **❌ 逐行推演发现的问题(位于 二、核心架构 Manager Job)**
|
||
|
||
架构图中写道:Manager 读取 N 个子项 \-\> for each 派发 \-\> 退出。
|
||
|
||
但是!如果因为某种极端业务情况(比如用户传了一个空的列表,或者源数据被过滤后 results.length \=== 0)。
|
||
|
||
**🔥 灾难时序爆发:**
|
||
|
||
1. Manager 查出文献列表,发现长度为 0。
|
||
2. for 循环不执行,直接退出。
|
||
3. **致命时刻:** 因为没有任何 Child Job 被派发,所以永远不会有 Child Job 去触发 "Last Child Wins" 收口逻辑。
|
||
4. 父任务 Task 将永远停留在 status: 'processing'。
|
||
|
||
### **✅ 骨灰级修正方案**
|
||
|
||
在 Manager Job 派发子任务之前,必须增加边界拦截:
|
||
|
||
// Manager Worker 核心补丁
|
||
if (items.length \=== 0\) {
|
||
// 如果没有任何子项,Manager 必须自己充当收口人
|
||
await prisma.task.update({
|
||
where: { id: taskId },
|
||
data: { status: 'completed', completedAt: new Date() }
|
||
});
|
||
return; // 直接退出
|
||
}
|
||
|
||
// 继续执行 for 循环派发...
|
||
|
||
## **🚨 隐患 4:NOTIFY 的底层 SQL 注入与转义灾难**
|
||
|
||
### **❌ 逐行推演发现的问题(位于 模式 6:SSE 跨实例广播)**
|
||
|
||
指南中使用了原生的 SQL 拼接执行 NOTIFY:
|
||
|
||
await prisma.$executeRawUnsafe(
|
||
\`NOTIFY sse\_channel, '${safePayload.replace(/'/g, "''")}'\`
|
||
);
|
||
|
||
**🔥 灾难时序爆发:**
|
||
|
||
1. 这种通过字符串拼接执行 SQL 的方式,在任何正规后端的代码审计中都会被标为**高危 (Critical)**。
|
||
2. 虽然加了 .replace 单引号,但在不同编码或遇到特殊换行符、反斜杠 \\ 时,仍然可能导致 PostgreSQL 语法解析错误,甚至引发 SQL 注入。
|
||
|
||
### **✅ 骨灰级修正方案**
|
||
|
||
**抛弃拼接!使用 PostgreSQL 内置的 pg\_notify 函数配合 Prisma 的参数化查询(Tagged Template Literal)。**
|
||
|
||
这不仅彻底免疫 SQL 注入,而且完全不需要手动写 replace 转义:
|
||
|
||
const payloadStr \= JSON.stringify({ taskId, type: 'log', data: logEntry });
|
||
const safePayload \= payloadStr.length \> 7000 ? payloadStr.substring(0, 7000\) \+ '..."}' : payloadStr;
|
||
|
||
// 核心补丁:使用内置函数与参数化绑定,绝对安全!
|
||
await prisma.$executeRaw\`SELECT pg\_notify('sse\_channel', ${safePayload})\`;
|
||
|
||
## **🎯 终极审查结论**
|
||
|
||
您团队产出的这套《分布式 Fan-out 任务模式开发指南》底子极其优良,这证明了你们的技术选型方向是绝对正确的。
|
||
|
||
我指出的这 4 个致命漏洞,属于\*\*“分布式并发架构下极其隐蔽的角落”\*\*。即使是互联网大厂的高级开发,如果没踩过这些坑,单看代码也很难发现。
|
||
|
||
请立即要求开发团队将这 4 处“骨灰级补丁”更新到开发指南(升级为 v1.2 或 v2.0)中,然后再指导其他类似任务的开发。修复这些问题后,这套系统才算真正拥有了“抗造”的底气! |