Files
AIclinicalresearch/docs/02-通用能力层/分布式Fan-out任务模式开发指南.md
HaHafeng 85fda830c2 feat(ssa): Complete Phase V-A editable analysis plan variables
Features:
- Add editable variable selection in workflow plan (SingleVarSelect + MultiVarTags)
- Implement 3-layer flexible interception (warning bar + icon + blocking dialog)
- Add tool_param_constraints.json for 12 statistical tools parameter validation
- Add PATCH /workflow/:id/params API with Zod structural validation
- Implement synchronous parameter sync before execution (Promise chaining)
- Fix LLM hallucination by strict system prompt constraints
- Fix DynamicReport object-based rows compatibility (R baseline_table)
- Fix Word export row.map error with same normalization logic
- Restore inferGroupingVar for smart default variable selection
- Add ReactMarkdown rendering in SSAChatPane
- Update SSA module status document to v3.5

Modified files:
- backend: workflow.routes, ChatHandlerService, SystemPromptService, FlowTemplateService
- frontend: WorkflowTimeline, SSAWorkspacePane, DynamicReport, SSAChatPane, ssaStore, ssa.css
- config: tool_param_constraints.json (new)
- docs: SSA status doc, team review reports

Tested: Cohort study end-to-end execution + report export verified
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-24 13:08:29 +08:00

432 lines
21 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 分布式 Fan-out 任务模式开发指南
> **版本:** v1.2(逐行级审查修正:乐观锁释放 + Sweeper updatedAt + 空集合守卫 + pg_notify 参数化)
> **创建日期:** 2026-02-23
> **定位:** 实战 Cookbook开发时按需查阅
> **互补文档:** `系统级异步架构风险剖析与演进技术蓝图.md`Why→ 本文How
> **Postgres-Only 指南:** `Postgres-Only异步任务处理指南.md`(底层规范)
> **首个试点:** ASL 工具 3 全文智能提取工作台(`docs/03-业务模块/ASL-AI智能文献/04-开发计划/08-工具3-*.md`
> **状态:** 🟡 设计阶段经验总结,待 ASL 工具 3 M1/M2 实战后升级为 v2.0
---
## 一、适用场景判断
| 维度 | Level 1单体任务 | Level 2Fan-out 任务 |
|------|-------------------|----------------------|
| **触发模式** | 1 触发 → 1 Worker → 结束 | 1 触发 → 1 Manager → N 个 Child Worker |
| **典型案例** | DC Tool C 解析 1 个 Excel | ASL 工具 3 批量提取 100 篇文献 |
| **失败代价** | 小(重跑 40 秒) | 极大(第 99 篇失败不应导致前 98 篇白做) |
| **并发挑战** | 无(单 Worker | 高N 个 Child 跨 Pod 竞争同一父任务计数器) |
**判断公式:** 如果你的任务是"1 次操作处理 N 个独立子项,且 N 可能 > 10",就必须使用 Fan-out 模式。
---
## 二、核心架构Manager + Child + Last Child Wins
```
┌─ API 层 ──────────────────────────────────┐
│ POST /tasks → 创建业务记录 → pgBoss.send │
│ (module_task_manager) │
└────────────────────────────────────────────┘
┌─ Manager Job ─────────────────────────────┐
│ 1. 读取 N 个子项 │
│ 1.5 🆕 if N=0 → 直接 completed + return │
│ 2. 快照外部依赖数据(防源头失踪) │
│ 3. for each → pgBoss.send(child_queue) │
│ 4. 派发完毕 → 退出Fire-and-forget
└────────────────────────────────────────────┘
↓ (N 个)
┌─ Child Job ───────────────────────────────┐
│ 1. 乐观锁抢占updateMany where status │
│ = pending → processing
│ 2. 执行业务逻辑 │
│ 3. 事务内:更新子项 + 原子递增父任务计数 │
│ 4. 判断 successCount + failedCount >= │
│ totalCount → 翻转父任务 completed │
│ 5. 错误分级:致命 return / 临时 throw │
└────────────────────────────────────────────┘
```
---
## 三、8 项关键设计模式
### 模式 1原子递增禁止 Read-then-Write
**问题:** 多个 Child 同时完成时,`count = count + 1` 的读写逻辑导致计数丢失。
```typescript
// ❌ 错误Read-then-Write 反模式
const task = await prisma.task.findUnique({ where: { id } });
await prisma.task.update({ data: { successCount: task.successCount + 1 } });
// ✅ 正确:数据库级原子操作
const taskAfterUpdate = await prisma.task.update({
where: { id: taskId },
data: { successCount: { increment: 1 } },
});
```
Prisma 的 `{ increment: 1 }` 编译为 SQL `SET success_count = success_count + 1`,数据库行锁保证原子性。
**🚨 v1.1 补充:短事务原则(防行锁争用)**
> **场景推演:** 100 个极轻量 Child Job缓存命中瞬间完成在不同 Pod 中几乎同时走到 `prisma.$transaction`。
> 这 100 个事务都需要对同一父任务行执行 `{ increment: 1 }`PostgreSQL 在这一行上加排他行锁Row-Level Lock
> 100 个并发请求排队等一把锁,极易触发 Lock wait timeout大量本已成功的任务在最后一步报数据库错误。
**强制规范:**
- **绝不允许**在更新父任务的 `$transaction` 内发起任何网络请求或耗时操作
- 事务必须极度纯粹:更新子项(Result) + 递增父亲(Task),确保事务在 **< 1ms** 内提交并释放行锁
- 高并发下若仍出现行锁超时,可在 Prisma 连接串中适当调大 `pool_timeout`
### 模式 2Last Child Wins终止器
**问题:** Manager 派发完就退出,没有人负责把父任务从 `processing` 翻转为 `completed`
**解法:** 每个 Child无论成功还是失败在原子递增后立即检查
```typescript
if (taskAfterUpdate.successCount + taskAfterUpdate.failedCount >= taskAfterUpdate.totalCount) {
await prisma.task.update({
where: { id: taskId },
data: { status: 'completed', completedAt: new Date() },
});
// 广播完成事件(如 NOTIFY
}
```
**关键:** 成功路径和失败路径都必须有这段检查。漏掉任何一条路径,任务就可能永远卡在 `processing`
**🚨 v1.1 补充Sweeper 清道夫 — 应对进程硬崩溃(最危险场景)**
> **场景推演:** N=100第 99 个 Child Worker 解析超大 PDF 触发 Node.js V8 OOM或容器被云平台 SIGKILL。
> 进程瞬间蒸发,代码根本没有机会走到 `catch` 块。`failedCount` 永远不会 +1。
> pg-boss 虽然会在 `expireInMinutes` 后标记该 Job 为 failed但业务表里
> `successCount + failedCount = 99`,永远达不到 100。父任务永远卡在 `processing`。
>
> **本质:** 单兵 Worker 无法处理自身猝死,必须有系统级外部兜底。
**解法:注册全局定时清道夫 `FanOut_Task_Sweeper`(每 10 分钟运行一次):**
```typescript
// 全局 Cron Job — 清道夫(建议用 pg-boss 的 schedule 功能注册)
async function fanOutTaskSweeper() {
const stuckTasks = await prisma.task.findMany({
where: {
status: 'processing',
// 🚨 v1.2 修正:使用 updatedAt最后活跃时间而非 startedAt
// 原因500 篇文献正常排队可能需要 3+ 小时,用 startedAt 会误杀健康任务。
// 只要子任务还在完成原子递增updatedAt 就会持续刷新。
// 超过 2 小时没有任何进度更新的,才是真正卡死。
updatedAt: { lt: new Date(Date.now() - 2 * 60 * 60 * 1000) },
},
});
for (const task of stuckTasks) {
await prisma.task.update({
where: { id: task.id },
data: {
status: 'failed',
errorMessage: '[Sweeper] No progress update for 2h. Likely Child Worker hard crash (OOM/SIGKILL). Force-closed.',
completedAt: new Date(),
},
});
logger.warn(`[Sweeper] Force-closed stuck task ${task.id} (no progress for 2h)`);
}
}
// 注册为 pg-boss 定时任务
await pgBoss.schedule('fanout_task_sweeper', '*/10 * * * *'); // 每 10 分钟
await pgBoss.work('fanout_task_sweeper', fanOutTaskSweeper);
```
**Sweeper 是 Fan-out 模式的终极保险丝。** 即使所有 Last Child Wins 逻辑都正确进程硬崩溃仍然是不可避免的物理级异常。Sweeper 确保任何"卡死"的任务最终都会被收口。
> **⚠️ v1.2 关键修正:** 判断"卡死"的依据是 `updatedAt`(最后活跃时间),而非 `startedAt`(任务创建时间)。
> 只要 Child 还在完成并递增计数Prisma 的 `update` 会自动刷新 `updatedAt`。
> 超大批量任务500+ 文献)正常排队执行可能需要数小时,用 `startedAt` 会导致 Sweeper 误杀正在健康运行的任务("友军之火")。
### 模式 3乐观锁抢占Optimistic Locking
**问题:** pg-boss 的 at-least-once 语义意味着同一 Child Job 可能被投递多次。如果用 `findUnique → if (status !== 'pending') return` 做幂等检查,两个 Worker 可能同时读到 `pending` 然后同时处理。
```typescript
// ❌ 错误Read-then-Write 幂等检查
const existing = await prisma.result.findUnique({ where: { id } });
if (existing?.status === 'completed') return; // 两个 Worker 可能同时到这里
// ✅ 正确:原子抢占
const lock = await prisma.result.updateMany({
where: { id: resultId, status: 'pending' },
data: { status: 'processing' },
});
if (lock.count === 0) return { success: true, note: 'Idempotent skip' };
```
`updateMany` 的 WHERE 条件充当乐观锁,数据库保证只有一个 Worker 能成功更新。
**🚨 v1.1 补充:子任务派发防重 — singletonKey 的真正意图**
> **场景推演:** Manager Job 在派发了 50 个 Child Job 后进程崩溃。pg-boss 的 at-least-once 语义会
> 重新投递 Manager Job。重试时 Manager 重新查出 100 个子项,再次循环派发 100 个 Child Job。
> 前 50 个任务被重复派发,队列瞬间塞满垃圾数据,并导致 Child Worker 重复处理。
**强制规范Manager 必须为每个 Child 赋予基于业务 ID 的 `singletonKey`**
```typescript
// Manager 内循环派发
for (const item of items) {
await pgBoss.send('module_task_child', { taskId, itemId: item.id }, {
singletonKey: `child-${item.id}`, // ← 基于业务 ID 去重!
retryLimit: 3,
retryBackoff: true,
expireInMinutes: 30,
});
}
```
**`singletonKey` 是保证 Manager 自身崩溃重试时不会导致子任务指数级爆炸的唯一防线。**
pg-boss 在收到重复 `singletonKey` 时自动去重(忽略重复插入),无需手动判断。新手开发**绝不可省略**此字段。
### 模式 4错误分级路由
**问题:** pg-boss 默认对所有失败 Job 进行指数退避重试。但"PDF 损坏"这类永久错误重试 3 次也不会好。
```typescript
try {
await doWork();
} catch (error) {
if (isPermanentError(error)) {
// 致命错误:更新业务状态为 error + 原子递增 failedCount
await markAsFailed(resultId, taskId, error.message);
// ⚠️ 别忘了 Last Child Wins 检查!
return { success: false }; // return 而非 throw → pg-boss 视为"成功消费",停止重试
}
// 🚨 v1.2 补丁:临时错误 throw 前必须释放乐观锁!
// 否则 pg-boss 重试时 updateMany({ where: { status: 'pending' } }) 返回 0
// 被误判为"幂等跳过"计数永远少一票Last Child Wins 永远无法触发。
await prisma.result.update({
where: { id: resultId },
data: { status: 'pending' },
});
// 临时错误 (429/5xx/网络抖动)throw → pg-boss 指数退避自动重试
throw error;
}
```
| 错误类型 | 处理方式 | pg-boss 行为 |
|---------|---------|-------------|
| 永久错误4xx、数据不存在、格式损坏 | `return` | 停止重试 |
| 临时错误429、5xx、网络超时 | `throw` | 指数退避重试 |
### 模式 5三级限流teamConcurrency
**问题:** 如果不限制 Child 并发1000 个 Job 被同时拉起 → 1000 个 `await` 挂起的闭包 → Node.js OOM。
```typescript
// 第一级Child Worker — 控制内存中的并发闭包数量
jobQueue.work('module_task_child', { teamConcurrency: 10 }, handler);
// 第二级:昂贵 API — 保护外部服务
jobQueue.work('module_expensive_api', { teamConcurrency: 2 }, handler);
// 第三级LLM 调用 — 保护 LLM 并发
jobQueue.work('module_llm_call', { teamConcurrency: 5 }, handler);
```
**`teamConcurrency` vs `P-Queue`**
- `P-Queue` 是进程内信号量,多 Pod 下每个 Pod 各自限流 → 全局并发 = 限制值 × Pod 数 → API 429
- `teamConcurrency` 是 PostgreSQL 行锁,跨所有 Node.js 实例全局生效
- **结论Fan-out 场景禁止使用 P-Queue必须用 teamConcurrency**
### 模式 6SSE 跨实例广播NOTIFY/LISTEN
**问题:** `sseEmitter.emit()` 基于内存 EventEmitter用户连 Pod A、Worker 跑 Pod B → Pod A 收不到日志。
```typescript
// Worker 端(发送)— v1.2 使用 pg_notify + 参数化查询(免疫 SQL 注入)
const payloadStr = JSON.stringify({ taskId, type: 'log', data: logEntry });
const safePayload = payloadStr.length > 7000
? payloadStr.substring(0, 7000) + '..."}'
: payloadStr;
// 🚨 v1.2 修正:抛弃 $executeRawUnsafe + 字符串拼接!
// 使用 PostgreSQL 内置 pg_notify() 函数 + Prisma Tagged Template参数化绑定
await prisma.$executeRaw`SELECT pg_notify('sse_channel', ${safePayload})`;
// API 端(接收)— Pod 启动时初始化
const pgClient = new Client({ connectionString: DATABASE_URL });
await pgClient.connect();
await pgClient.query('LISTEN sse_channel');
pgClient.on('notification', (msg) => {
const { taskId, type, data } = JSON.parse(msg.payload);
const clients = sseClients.get(taskId);
if (clients?.size > 0) {
for (const res of clients) {
res.write(`event: ${type}\ndata: ${JSON.stringify(data)}\n\n`);
}
}
});
```
**约束:**
- LISTEN 连接必须独立于连接池(归还后 LISTEN 失效)
- **NOTIFY payload 物理上限 ~8000 bytes**(超出直接报错,阻断业务流程!)
- **强制规范v1.1** 发送前必须安全截断至 7000 bytes 以内(预留 JSON 结构和转义开销)
- LLM 错误堆栈、超长乱码是最常见的超限来源
- **🚨 v1.2 强制规范:禁止 `$executeRawUnsafe` + 字符串拼接发送 NOTIFY**
- 必须使用 `$executeRaw` Tagged Template + `pg_notify()` 函数(参数化绑定,彻底免疫 SQL 注入)
- 不同编码、特殊换行符、反斜杠均可能绕过手动 `.replace` 转义
- fire-and-forget无持久化适合日志流这类"丢了不影响业务"的场景
### 模式 7数据一致性快照
**问题:** Fan-out 任务可能持续数十分钟。期间用户在源模块删改数据 → Child Worker 找不到依赖数据而崩溃。
**解法:** Manager 派发前一次性快照关键元数据,冻结到子项记录中:
```typescript
// Manager 中:批量快照
const pkbDocs = await Promise.all(
results.map(r => pkbBridge.getDocumentDetail(r.pkbDocumentId))
);
const docMap = new Map(pkbDocs.map(d => [d.documentId, d]));
await prisma.$transaction(
results.map(result => {
const doc = docMap.get(result.pkbDocumentId);
return prisma.result.update({
where: { id: result.id },
data: {
snapshotStorageKey: doc?.storageKey ?? null,
snapshotFilename: doc?.filename ?? null,
}
});
})
);
```
**原则:** 快照轻量元数据storageKey、filename 等 < 1KB到数据库。大文件内容不快照通过错误分级路由兜底。
### 🆕 模式 8Manager 空集合边界守卫v1.2
**问题:** 如果源数据被过滤后 `items.length === 0`空列表、数据异常等极端情况Manager 的 `for` 循环不执行,没有任何 Child 被派发Last Child Wins 永远不会触发,父任务永远卡在 `processing`
```typescript
// Manager Worker — 派发前必须检查空集合
if (items.length === 0) {
await prisma.task.update({
where: { id: taskId },
data: { status: 'completed', completedAt: new Date() },
});
logger.info(`[Manager] Task ${taskId}: 0 items, auto-completed`);
return;
}
// 正常路径:继续快照 + 派发 Child Job...
```
**这是 Last Child Wins 的唯一盲区。** 当 N=0 时Manager 必须自己充当"收口人"直接完成任务。
---
## 四、反模式速查表
| 反模式 | 后果 | 正确做法 |
|--------|------|---------|
| 内存计数 `count + 1` | 多 Pod 计数丢失 | Prisma `{ increment: 1 }` |
| `findUnique → if → update` 幂等 | 并发穿透 | `updateMany({ where: { status: 'pending' } })` |
| Manager 等待所有 Child 完成 | Manager 进程挂起,消耗连接 | Fire-and-forget + Last Child Wins |
| P-Queue 限流 | 多 Pod 失效 | pg-boss `teamConcurrency` |
| 内存 EventEmitter 跨 Pod | SSE 日志断裂 | PostgreSQL NOTIFY/LISTEN |
| Job payload 塞大数据 | pg-boss 阻塞 | 仅传 ID< 1KB数据存 DB/OSS |
| 队列名用点号 | pg-boss 路由截断 | 下划线命名(`module_task_child` |
| 不设 `expireInMinutes` | 僵尸 Job 占据队列名额 | Manager: 60min, Child: 30min |
| 成功路径漏检 Last Child Wins | 任务永远卡在 processing | 成功 + 失败路径都检查 |
| Child 运行时回查外部模块数据 | 源头删改导致批量崩溃 | Manager 快照元数据到子项记录 |
| 🆕 无 Sweeper 清道夫 | 进程 OOM/SIGKILL 后任务永远卡死 | 全局 Cron 扫描 processing > 2h 强制收口 |
| 🆕 事务内做网络请求 | 父表行锁长时间持有 → Lock timeout | 短事务:仅更新 Result + 递增 Task |
| 🆕 Child 派发漏 singletonKey | Manager 重试导致子任务指数级爆炸 | `singletonKey: child-${itemId}` |
| 🆕 NOTIFY payload 不截断 | 超 8000 bytes 直接报错阻断流程 | 发送前截断至 7000 bytes |
| 🆕 临时错误 throw 前不释放乐观锁 | 重试时被误判"幂等跳过",计数永远缺一票 | throw 前 `update({ status: 'pending' })` |
| 🆕 Sweeper 用 `startedAt` 判断卡死 | 误杀正在排队的健康超大批量任务 | 用 `updatedAt`(最后活跃时间) |
| 🆕 Manager 不检查空集合 | N=0 时无 Child → Last Child Wins 死锁 | `if (items.length === 0)` 直接 completed |
| 🆕 NOTIFY 用 `$executeRawUnsafe` 拼接 | SQL 注入高危 | `$executeRaw` + `pg_notify()` 参数化 |
---
## 五、pg-boss 配置速查
```typescript
// Manager Job 派发
await pgBoss.send('module_task_manager', { taskId }, {
retryLimit: 2,
expireInMinutes: 60,
singletonKey: `manager-${taskId}`, // 防止同一任务重复派发
});
// Child Job 派发Manager 内循环)
await pgBoss.send('module_task_child', { taskId, itemId }, {
retryLimit: 3,
retryDelay: 10, // 10 秒后重试
retryBackoff: true, // 指数退避10s, 20s, 40s
expireInMinutes: 30,
singletonKey: `child-${itemId}`, // ← 派发防重Manager 崩溃重试时 pg-boss 自动去重
});
// Worker 注册(队列名必须用下划线!)
jobQueue.work('module_task_child', { teamConcurrency: 10 }, handler);
```
---
## 六、开发检查清单
在 Code Review 时,逐项核对以下问题:
- [ ] **原子递增**:父任务计数器是否使用 `{ increment: 1 }`
- [ ] **Last Child Wins**:成功路径和失败路径是否都检查了 `successCount + failedCount >= totalCount`
- [ ] **乐观锁**Child Worker 是否使用 `updateMany({ where: { status: 'pending' } })` 而非 `findUnique → if`
- [ ] **错误分级**:永久错误是否 `return`(停止重试)?临时错误是否 `throw`(指数退避)?
- [ ] **teamConcurrency**Child 队列是否设置了全局并发限制?是否禁用了 P-Queue
- [ ] **Payload 轻量**Job data 是否仅传 ID< 1KB
- [ ] **过期时间**:是否设置了 `expireInMinutes`
- [ ] **队列命名**:是否使用下划线(`module_task_child`),而非点号?
- [ ] **数据快照**Manager 是否在派发前快照了外部依赖数据?
- [ ] **NOTIFY 广播**SSE 日志推送是否经过 PostgreSQL NOTIFY如需跨 Pod
- [ ] **事务保障**:子项状态更新 + 父任务原子递增是否在同一事务中?
- [ ] 🆕 **Sweeper 清道夫**:是否注册了全局定时任务扫描 processing > 2h 的父任务并强制收口?
- [ ] 🆕 **短事务原则**`$transaction` 内是否仅包含纯 DB 操作(无网络请求/无耗时计算)?
- [ ] 🆕 **派发防重**Manager 循环派发 Child 时是否设置了 `singletonKey: child-${itemId}`
- [ ] 🆕 **NOTIFY 截断**NOTIFY payload 发送前是否截断至 7000 bytes 以内?
- [ ] 🆕 **乐观锁释放**:临时错误 `throw` 前是否将子项状态回退为 `pending`(防重试时被幂等跳过)?
- [ ] 🆕 **Sweeper 活跃判定**:清道夫是否基于 `updatedAt`(而非 `startedAt`)判断任务卡死?
- [ ] 🆕 **空集合守卫**Manager 是否在 `items.length === 0` 时直接将任务标记为 `completed`
- [ ] 🆕 **NOTIFY 参数化**:是否使用 `$executeRaw` + `pg_notify()` 而非 `$executeRawUnsafe` + 字符串拼接?
---
## 七、演进路线
| 阶段 | 时间 | 内容 |
|------|------|------|
| v1.0 设计沉淀 | 2026-02 | 基于 ASL 工具 3 架构审查经验编写本指南(当前) |
| v1.5 实战验证 | ASL M1 完成后 | 将 M1 开发中遇到的实际问题补充到本文 |
| v2.0 基建抽象 | ASL M2 完成后 | 将 Fan-out 通用逻辑抽离为 `common/jobs/FanOutHelper.ts` |
| v2.5 全量推广 | 后续模块 | IIT Agent 批量质控、DC 批量 ETL 等模块复用 Fan-out 基建 |
> **设计原则:** 先在 ASL 工具 3 中"打样",踩完坑后再抽象为平台能力。避免过早抽象导致接口不合理。
---
*本文档基于 ASL 工具 3 全文智能提取工作台开发计划v1.5,经 6 轮架构审查)的设计经验总结。*
*v1.1 补充 4 项生产级防御策略Sweeper 清道夫、短事务原则、singletonKey 派发防重、NOTIFY 安全截断。*
*v1.2 逐行级审查修正 4 项致命漏洞乐观锁与重试绞杀、Sweeper 友军之火、空集合死锁、SQL 注入隐患。*
*待 M1/M2 实战后升级为 v2.0,届时补充真实踩坑记录和性能数据。*