# 分布式 Fan-out 任务模式开发指南 > **版本:** v1.2(逐行级审查修正:乐观锁释放 + Sweeper updatedAt + 空集合守卫 + pg_notify 参数化) > **创建日期:** 2026-02-23 > **定位:** 实战 Cookbook,开发时按需查阅 > **互补文档:** `系统级异步架构风险剖析与演进技术蓝图.md`(Why)→ 本文(How) > **Postgres-Only 指南:** `Postgres-Only异步任务处理指南.md`(底层规范) > **首个试点:** ASL 工具 3 全文智能提取工作台(`docs/03-业务模块/ASL-AI智能文献/04-开发计划/08-工具3-*.md`) > **状态:** 🟡 设计阶段经验总结,待 ASL 工具 3 M1/M2 实战后升级为 v2.0 --- ## 一、适用场景判断 | 维度 | Level 1:单体任务 | Level 2:Fan-out 任务 | |------|-------------------|----------------------| | **触发模式** | 1 触发 → 1 Worker → 结束 | 1 触发 → 1 Manager → N 个 Child Worker | | **典型案例** | DC Tool C 解析 1 个 Excel | ASL 工具 3 批量提取 100 篇文献 | | **失败代价** | 小(重跑 40 秒) | 极大(第 99 篇失败不应导致前 98 篇白做) | | **并发挑战** | 无(单 Worker) | 高(N 个 Child 跨 Pod 竞争同一父任务计数器) | **判断公式:** 如果你的任务是"1 次操作处理 N 个独立子项,且 N 可能 > 10",就必须使用 Fan-out 模式。 --- ## 二、核心架构:Manager + Child + Last Child Wins ``` ┌─ API 层 ──────────────────────────────────┐ │ POST /tasks → 创建业务记录 → pgBoss.send │ │ (module_task_manager) │ └────────────────────────────────────────────┘ ↓ ┌─ Manager Job ─────────────────────────────┐ │ 1. 读取 N 个子项 │ │ 1.5 🆕 if N=0 → 直接 completed + return │ │ 2. 快照外部依赖数据(防源头失踪) │ │ 3. for each → pgBoss.send(child_queue) │ │ 4. 派发完毕 → 退出(Fire-and-forget) │ └────────────────────────────────────────────┘ ↓ (N 个) ┌─ Child Job ───────────────────────────────┐ │ 1. 乐观锁抢占(updateMany where status │ │ = pending → processing) │ │ 2. 执行业务逻辑 │ │ 3. 事务内:更新子项 + 原子递增父任务计数 │ │ 4. 判断 successCount + failedCount >= │ │ totalCount → 翻转父任务 completed │ │ 5. 错误分级:致命 return / 临时 throw │ └────────────────────────────────────────────┘ ``` --- ## 三、8 项关键设计模式 ### 模式 1:原子递增(禁止 Read-then-Write) **问题:** 多个 Child 同时完成时,`count = count + 1` 的读写逻辑导致计数丢失。 ```typescript // ❌ 错误:Read-then-Write 反模式 const task = await prisma.task.findUnique({ where: { id } }); await prisma.task.update({ data: { successCount: task.successCount + 1 } }); // ✅ 正确:数据库级原子操作 const taskAfterUpdate = await prisma.task.update({ where: { id: taskId }, data: { successCount: { increment: 1 } }, }); ``` Prisma 的 `{ increment: 1 }` 编译为 SQL `SET success_count = success_count + 1`,数据库行锁保证原子性。 **🚨 v1.1 补充:短事务原则(防行锁争用)** > **场景推演:** 100 个极轻量 Child Job(缓存命中,瞬间完成)在不同 Pod 中几乎同时走到 `prisma.$transaction`。 > 这 100 个事务都需要对同一父任务行执行 `{ increment: 1 }`,PostgreSQL 在这一行上加排他行锁(Row-Level Lock)。 > 100 个并发请求排队等一把锁,极易触发 Lock wait timeout,大量本已成功的任务在最后一步报数据库错误。 **强制规范:** - **绝不允许**在更新父任务的 `$transaction` 内发起任何网络请求或耗时操作 - 事务必须极度纯粹:更新子项(Result) + 递增父亲(Task),确保事务在 **< 1ms** 内提交并释放行锁 - 高并发下若仍出现行锁超时,可在 Prisma 连接串中适当调大 `pool_timeout` ### 模式 2:Last Child Wins(终止器) **问题:** Manager 派发完就退出,没有人负责把父任务从 `processing` 翻转为 `completed`。 **解法:** 每个 Child(无论成功还是失败)在原子递增后立即检查: ```typescript if (taskAfterUpdate.successCount + taskAfterUpdate.failedCount >= taskAfterUpdate.totalCount) { await prisma.task.update({ where: { id: taskId }, data: { status: 'completed', completedAt: new Date() }, }); // 广播完成事件(如 NOTIFY) } ``` **关键:** 成功路径和失败路径都必须有这段检查。漏掉任何一条路径,任务就可能永远卡在 `processing`。 **🚨 v1.1 补充:Sweeper 清道夫 — 应对进程硬崩溃(最危险场景)** > **场景推演:** N=100,第 99 个 Child Worker 解析超大 PDF 触发 Node.js V8 OOM,或容器被云平台 SIGKILL。 > 进程瞬间蒸发,代码根本没有机会走到 `catch` 块。`failedCount` 永远不会 +1。 > pg-boss 虽然会在 `expireInMinutes` 后标记该 Job 为 failed,但业务表里 > `successCount + failedCount = 99`,永远达不到 100。父任务永远卡在 `processing`。 > > **本质:** 单兵 Worker 无法处理自身猝死,必须有系统级外部兜底。 **解法:注册全局定时清道夫 `FanOut_Task_Sweeper`(每 10 分钟运行一次):** ```typescript // 全局 Cron Job — 清道夫(建议用 pg-boss 的 schedule 功能注册) async function fanOutTaskSweeper() { const stuckTasks = await prisma.task.findMany({ where: { status: 'processing', // 🚨 v1.2 修正:使用 updatedAt(最后活跃时间)而非 startedAt! // 原因:500 篇文献正常排队可能需要 3+ 小时,用 startedAt 会误杀健康任务。 // 只要子任务还在完成(原子递增),updatedAt 就会持续刷新。 // 超过 2 小时没有任何进度更新的,才是真正卡死。 updatedAt: { lt: new Date(Date.now() - 2 * 60 * 60 * 1000) }, }, }); for (const task of stuckTasks) { await prisma.task.update({ where: { id: task.id }, data: { status: 'failed', errorMessage: '[Sweeper] No progress update for 2h. Likely Child Worker hard crash (OOM/SIGKILL). Force-closed.', completedAt: new Date(), }, }); logger.warn(`[Sweeper] Force-closed stuck task ${task.id} (no progress for 2h)`); } } // 注册为 pg-boss 定时任务 await pgBoss.schedule('fanout_task_sweeper', '*/10 * * * *'); // 每 10 分钟 await pgBoss.work('fanout_task_sweeper', fanOutTaskSweeper); ``` **Sweeper 是 Fan-out 模式的终极保险丝。** 即使所有 Last Child Wins 逻辑都正确,进程硬崩溃仍然是不可避免的物理级异常。Sweeper 确保任何"卡死"的任务最终都会被收口。 > **⚠️ v1.2 关键修正:** 判断"卡死"的依据是 `updatedAt`(最后活跃时间),而非 `startedAt`(任务创建时间)。 > 只要 Child 还在完成并递增计数,Prisma 的 `update` 会自动刷新 `updatedAt`。 > 超大批量任务(500+ 文献)正常排队执行可能需要数小时,用 `startedAt` 会导致 Sweeper 误杀正在健康运行的任务("友军之火")。 ### 模式 3:乐观锁抢占(Optimistic Locking) **问题:** pg-boss 的 at-least-once 语义意味着同一 Child Job 可能被投递多次。如果用 `findUnique → if (status !== 'pending') return` 做幂等检查,两个 Worker 可能同时读到 `pending` 然后同时处理。 ```typescript // ❌ 错误:Read-then-Write 幂等检查 const existing = await prisma.result.findUnique({ where: { id } }); if (existing?.status === 'completed') return; // 两个 Worker 可能同时到这里 // ✅ 正确:原子抢占 const lock = await prisma.result.updateMany({ where: { id: resultId, status: 'pending' }, data: { status: 'processing' }, }); if (lock.count === 0) return { success: true, note: 'Idempotent skip' }; ``` `updateMany` 的 WHERE 条件充当乐观锁,数据库保证只有一个 Worker 能成功更新。 **🚨 v1.1 补充:子任务派发防重 — singletonKey 的真正意图** > **场景推演:** Manager Job 在派发了 50 个 Child Job 后,进程崩溃。pg-boss 的 at-least-once 语义会 > 重新投递 Manager Job。重试时 Manager 重新查出 100 个子项,再次循环派发 100 个 Child Job。 > 前 50 个任务被重复派发,队列瞬间塞满垃圾数据,并导致 Child Worker 重复处理。 **强制规范:Manager 必须为每个 Child 赋予基于业务 ID 的 `singletonKey`:** ```typescript // Manager 内循环派发 for (const item of items) { await pgBoss.send('module_task_child', { taskId, itemId: item.id }, { singletonKey: `child-${item.id}`, // ← 基于业务 ID 去重! retryLimit: 3, retryBackoff: true, expireInMinutes: 30, }); } ``` **`singletonKey` 是保证 Manager 自身崩溃重试时不会导致子任务指数级爆炸的唯一防线。** pg-boss 在收到重复 `singletonKey` 时自动去重(忽略重复插入),无需手动判断。新手开发**绝不可省略**此字段。 ### 模式 4:错误分级路由 **问题:** pg-boss 默认对所有失败 Job 进行指数退避重试。但"PDF 损坏"这类永久错误重试 3 次也不会好。 ```typescript try { await doWork(); } catch (error) { if (isPermanentError(error)) { // 致命错误:更新业务状态为 error + 原子递增 failedCount await markAsFailed(resultId, taskId, error.message); // ⚠️ 别忘了 Last Child Wins 检查! return { success: false }; // return 而非 throw → pg-boss 视为"成功消费",停止重试 } // 🚨 v1.2 补丁:临时错误 throw 前必须释放乐观锁! // 否则 pg-boss 重试时 updateMany({ where: { status: 'pending' } }) 返回 0, // 被误判为"幂等跳过",计数永远少一票,Last Child Wins 永远无法触发。 await prisma.result.update({ where: { id: resultId }, data: { status: 'pending' }, }); // 临时错误 (429/5xx/网络抖动):throw → pg-boss 指数退避自动重试 throw error; } ``` | 错误类型 | 处理方式 | pg-boss 行为 | |---------|---------|-------------| | 永久错误(4xx、数据不存在、格式损坏) | `return` | 停止重试 | | 临时错误(429、5xx、网络超时) | `throw` | 指数退避重试 | ### 模式 5:三级限流(teamConcurrency) **问题:** 如果不限制 Child 并发,1000 个 Job 被同时拉起 → 1000 个 `await` 挂起的闭包 → Node.js OOM。 ```typescript // 第一级:Child Worker — 控制内存中的并发闭包数量 jobQueue.work('module_task_child', { teamConcurrency: 10 }, handler); // 第二级:昂贵 API — 保护外部服务 jobQueue.work('module_expensive_api', { teamConcurrency: 2 }, handler); // 第三级:LLM 调用 — 保护 LLM 并发 jobQueue.work('module_llm_call', { teamConcurrency: 5 }, handler); ``` **`teamConcurrency` vs `P-Queue`:** - `P-Queue` 是进程内信号量,多 Pod 下每个 Pod 各自限流 → 全局并发 = 限制值 × Pod 数 → API 429 - `teamConcurrency` 是 PostgreSQL 行锁,跨所有 Node.js 实例全局生效 - **结论:Fan-out 场景禁止使用 P-Queue,必须用 teamConcurrency** ### 模式 6:SSE 跨实例广播(NOTIFY/LISTEN) **问题:** `sseEmitter.emit()` 基于内存 EventEmitter,用户连 Pod A、Worker 跑 Pod B → Pod A 收不到日志。 ```typescript // Worker 端(发送)— v1.2 使用 pg_notify + 参数化查询(免疫 SQL 注入) const payloadStr = JSON.stringify({ taskId, type: 'log', data: logEntry }); const safePayload = payloadStr.length > 7000 ? payloadStr.substring(0, 7000) + '..."}' : payloadStr; // 🚨 v1.2 修正:抛弃 $executeRawUnsafe + 字符串拼接! // 使用 PostgreSQL 内置 pg_notify() 函数 + Prisma Tagged Template(参数化绑定) await prisma.$executeRaw`SELECT pg_notify('sse_channel', ${safePayload})`; // API 端(接收)— Pod 启动时初始化 const pgClient = new Client({ connectionString: DATABASE_URL }); await pgClient.connect(); await pgClient.query('LISTEN sse_channel'); pgClient.on('notification', (msg) => { const { taskId, type, data } = JSON.parse(msg.payload); const clients = sseClients.get(taskId); if (clients?.size > 0) { for (const res of clients) { res.write(`event: ${type}\ndata: ${JSON.stringify(data)}\n\n`); } } }); ``` **约束:** - LISTEN 连接必须独立于连接池(归还后 LISTEN 失效) - **NOTIFY payload 物理上限 ~8000 bytes**(超出直接报错,阻断业务流程!) - **强制规范(v1.1):** 发送前必须安全截断至 7000 bytes 以内(预留 JSON 结构和转义开销) - LLM 错误堆栈、超长乱码是最常见的超限来源 - **🚨 v1.2 强制规范:禁止 `$executeRawUnsafe` + 字符串拼接发送 NOTIFY!** - 必须使用 `$executeRaw` Tagged Template + `pg_notify()` 函数(参数化绑定,彻底免疫 SQL 注入) - 不同编码、特殊换行符、反斜杠均可能绕过手动 `.replace` 转义 - fire-and-forget(无持久化),适合日志流这类"丢了不影响业务"的场景 ### 模式 7:数据一致性快照 **问题:** Fan-out 任务可能持续数十分钟。期间用户在源模块删改数据 → Child Worker 找不到依赖数据而崩溃。 **解法:** Manager 派发前一次性快照关键元数据,冻结到子项记录中: ```typescript // Manager 中:批量快照 const pkbDocs = await Promise.all( results.map(r => pkbBridge.getDocumentDetail(r.pkbDocumentId)) ); const docMap = new Map(pkbDocs.map(d => [d.documentId, d])); await prisma.$transaction( results.map(result => { const doc = docMap.get(result.pkbDocumentId); return prisma.result.update({ where: { id: result.id }, data: { snapshotStorageKey: doc?.storageKey ?? null, snapshotFilename: doc?.filename ?? null, } }); }) ); ``` **原则:** 快照轻量元数据(storageKey、filename 等 < 1KB)到数据库。大文件内容不快照,通过错误分级路由兜底。 ### 🆕 模式 8:Manager 空集合边界守卫(v1.2) **问题:** 如果源数据被过滤后 `items.length === 0`(空列表、数据异常等极端情况),Manager 的 `for` 循环不执行,没有任何 Child 被派发,Last Child Wins 永远不会触发,父任务永远卡在 `processing`。 ```typescript // Manager Worker — 派发前必须检查空集合 if (items.length === 0) { await prisma.task.update({ where: { id: taskId }, data: { status: 'completed', completedAt: new Date() }, }); logger.info(`[Manager] Task ${taskId}: 0 items, auto-completed`); return; } // 正常路径:继续快照 + 派发 Child Job... ``` **这是 Last Child Wins 的唯一盲区。** 当 N=0 时,Manager 必须自己充当"收口人"直接完成任务。 --- ## 四、反模式速查表 | 反模式 | 后果 | 正确做法 | |--------|------|---------| | 内存计数 `count + 1` | 多 Pod 计数丢失 | Prisma `{ increment: 1 }` | | `findUnique → if → update` 幂等 | 并发穿透 | `updateMany({ where: { status: 'pending' } })` | | Manager 等待所有 Child 完成 | Manager 进程挂起,消耗连接 | Fire-and-forget + Last Child Wins | | P-Queue 限流 | 多 Pod 失效 | pg-boss `teamConcurrency` | | 内存 EventEmitter 跨 Pod | SSE 日志断裂 | PostgreSQL NOTIFY/LISTEN | | Job payload 塞大数据 | pg-boss 阻塞 | 仅传 ID(< 1KB),数据存 DB/OSS | | 队列名用点号 | pg-boss 路由截断 | 下划线命名(`module_task_child`) | | 不设 `expireInMinutes` | 僵尸 Job 占据队列名额 | Manager: 60min, Child: 30min | | 成功路径漏检 Last Child Wins | 任务永远卡在 processing | 成功 + 失败路径都检查 | | Child 运行时回查外部模块数据 | 源头删改导致批量崩溃 | Manager 快照元数据到子项记录 | | 🆕 无 Sweeper 清道夫 | 进程 OOM/SIGKILL 后任务永远卡死 | 全局 Cron 扫描 processing > 2h 强制收口 | | 🆕 事务内做网络请求 | 父表行锁长时间持有 → Lock timeout | 短事务:仅更新 Result + 递增 Task | | 🆕 Child 派发漏 singletonKey | Manager 重试导致子任务指数级爆炸 | `singletonKey: child-${itemId}` | | 🆕 NOTIFY payload 不截断 | 超 8000 bytes 直接报错阻断流程 | 发送前截断至 7000 bytes | | 🆕 临时错误 throw 前不释放乐观锁 | 重试时被误判"幂等跳过",计数永远缺一票 | throw 前 `update({ status: 'pending' })` | | 🆕 Sweeper 用 `startedAt` 判断卡死 | 误杀正在排队的健康超大批量任务 | 用 `updatedAt`(最后活跃时间) | | 🆕 Manager 不检查空集合 | N=0 时无 Child → Last Child Wins 死锁 | `if (items.length === 0)` 直接 completed | | 🆕 NOTIFY 用 `$executeRawUnsafe` 拼接 | SQL 注入高危 | `$executeRaw` + `pg_notify()` 参数化 | --- ## 五、pg-boss 配置速查 ```typescript // Manager Job 派发 await pgBoss.send('module_task_manager', { taskId }, { retryLimit: 2, expireInMinutes: 60, singletonKey: `manager-${taskId}`, // 防止同一任务重复派发 }); // Child Job 派发(Manager 内循环) await pgBoss.send('module_task_child', { taskId, itemId }, { retryLimit: 3, retryDelay: 10, // 10 秒后重试 retryBackoff: true, // 指数退避(10s, 20s, 40s) expireInMinutes: 30, singletonKey: `child-${itemId}`, // ← 派发防重!Manager 崩溃重试时 pg-boss 自动去重 }); // Worker 注册(队列名必须用下划线!) jobQueue.work('module_task_child', { teamConcurrency: 10 }, handler); ``` --- ## 六、开发检查清单 在 Code Review 时,逐项核对以下问题: - [ ] **原子递增**:父任务计数器是否使用 `{ increment: 1 }`? - [ ] **Last Child Wins**:成功路径和失败路径是否都检查了 `successCount + failedCount >= totalCount`? - [ ] **乐观锁**:Child Worker 是否使用 `updateMany({ where: { status: 'pending' } })` 而非 `findUnique → if`? - [ ] **错误分级**:永久错误是否 `return`(停止重试)?临时错误是否 `throw`(指数退避)? - [ ] **teamConcurrency**:Child 队列是否设置了全局并发限制?是否禁用了 P-Queue? - [ ] **Payload 轻量**:Job data 是否仅传 ID(< 1KB)? - [ ] **过期时间**:是否设置了 `expireInMinutes`? - [ ] **队列命名**:是否使用下划线(`module_task_child`),而非点号? - [ ] **数据快照**:Manager 是否在派发前快照了外部依赖数据? - [ ] **NOTIFY 广播**:SSE 日志推送是否经过 PostgreSQL NOTIFY(如需跨 Pod)? - [ ] **事务保障**:子项状态更新 + 父任务原子递增是否在同一事务中? - [ ] 🆕 **Sweeper 清道夫**:是否注册了全局定时任务扫描 processing > 2h 的父任务并强制收口? - [ ] 🆕 **短事务原则**:`$transaction` 内是否仅包含纯 DB 操作(无网络请求/无耗时计算)? - [ ] 🆕 **派发防重**:Manager 循环派发 Child 时是否设置了 `singletonKey: child-${itemId}`? - [ ] 🆕 **NOTIFY 截断**:NOTIFY payload 发送前是否截断至 7000 bytes 以内? - [ ] 🆕 **乐观锁释放**:临时错误 `throw` 前是否将子项状态回退为 `pending`(防重试时被幂等跳过)? - [ ] 🆕 **Sweeper 活跃判定**:清道夫是否基于 `updatedAt`(而非 `startedAt`)判断任务卡死? - [ ] 🆕 **空集合守卫**:Manager 是否在 `items.length === 0` 时直接将任务标记为 `completed`? - [ ] 🆕 **NOTIFY 参数化**:是否使用 `$executeRaw` + `pg_notify()` 而非 `$executeRawUnsafe` + 字符串拼接? --- ## 七、演进路线 | 阶段 | 时间 | 内容 | |------|------|------| | v1.0 设计沉淀 | 2026-02 | 基于 ASL 工具 3 架构审查经验编写本指南(当前) | | v1.5 实战验证 | ASL M1 完成后 | 将 M1 开发中遇到的实际问题补充到本文 | | v2.0 基建抽象 | ASL M2 完成后 | 将 Fan-out 通用逻辑抽离为 `common/jobs/FanOutHelper.ts` | | v2.5 全量推广 | 后续模块 | IIT Agent 批量质控、DC 批量 ETL 等模块复用 Fan-out 基建 | > **设计原则:** 先在 ASL 工具 3 中"打样",踩完坑后再抽象为平台能力。避免过早抽象导致接口不合理。 --- *本文档基于 ASL 工具 3 全文智能提取工作台开发计划(v1.5,经 6 轮架构审查)的设计经验总结。* *v1.1 补充 4 项生产级防御策略:Sweeper 清道夫、短事务原则、singletonKey 派发防重、NOTIFY 安全截断。* *v1.2 逐行级审查修正 4 项致命漏洞:乐观锁与重试绞杀、Sweeper 友军之火、空集合死锁、SQL 注入隐患。* *待 M1/M2 实战后升级为 v2.0,届时补充真实踩坑记录和性能数据。*