# 散装派发与轮询收口任务模式指南 > **版本:** v1.1(优化:DB 级幂等 + groupBy 进度查询 + expireInMinutes 注释) > **创建日期:** 2026-02-24 > **定位:** Level 2 批量任务实战 Cookbook(1 触发 → N 个独立 Worker → Aggregator 收口) > **底层规范:** `Postgres-Only异步任务处理指南.md`(Level 1,队列命名、Payload、过期时间等强制规范) > **首个试点:** ASL 工具 3 全文智能提取工作台 > **替代方案:** `分布式Fan-out任务模式开发指南.md`(Level 3,适用于万级任务 + 10+ Pod,当前阶段不推荐) --- ## 一、适用场景判断 | 维度 | Level 1:单体任务 | **Level 2:散装派发(本文)** | Level 3:Fan-out | |------|-------------------|---------------------------|-----------------| | **触发模式** | 1 触发 → 1 Worker → 结束 | 1 触发 → N 个独立 Worker → Aggregator 收口 | 1 触发 → Manager → N Child → Last Child Wins | | **典型案例** | DC Tool C 解析 1 个 Excel | **ASL 工具 3 批量提取 100 篇文献** | 万级任务 + 严格实时计数 | | **复杂度** | ⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | | **团队要求** | 初级 | 初中级 | 高级(需掌握 12+ 并发模式) | **判断公式:** 如果你的任务是"1 次操作处理 N 个独立子项,N 可能 > 10,且团队 < 10 人",选本文的散装模式。 --- ## 二、核心架构:散装派发 + 独立单兵 + 轮询收口 ``` ┌─ API 层 ──────────────────────────────────────┐ │ POST /tasks │ │ 1. 创建 1 个 Task(status: processing) │ │ 2. 批量创建 N 个 Result(status: pending) │ │ 3. 散装派发 N 个 pgBoss Job(一次 insert) │ │ 4. 立即返回 taskId(< 0.1 秒) │ └────────────────────────────────────────────────┘ ↓ (N 个独立 Job) ┌─ Worker 层(多 Pod 各自抢单)─────────────────┐ │ 每个 Worker 只管自己的 1 篇文献: │ │ 1. 幽灵重试守卫(updateMany 防覆盖已完成) │ │ 2. 执行业务逻辑(MinerU + LLM) │ │ 3. 只更新自己的 Result 行 │ │ 4. 绝不触碰父任务 Task 表! │ │ 5. 错误分级:致命 return / 临时回退 + throw │ └────────────────────────────────────────────────┘ ┌─ Aggregator(全局唯一,每 10 秒)─────────────┐ │ 1. 清理僵尸:extracting > 30min → error │ │ 2. 聚合统计:GROUP BY Result.status │ │ 3. 收口判定:pending=0 且 extracting=0 │ │ → 标记 Task completed │ └────────────────────────────────────────────────┘ ┌─ 前端(React Query 轮询)─────────────────────┐ │ GET /tasks/:taskId/status │ │ → 实时 COUNT(Result) 聚合进度 │ │ → 无需 successCount/failedCount 冗余字段 │ └────────────────────────────────────────────────┘ ``` ### 为什么不用 Fan-out? | Fan-out 的代价 | 散装模式如何规避 | |---------------|----------------| | Last Child Wins(终止器逻辑) | **Aggregator 轮询收口**,零并发竞争 | | 原子递增 `successCount`(行锁争用) | **Worker 不碰父表**,进度由 COUNT 聚合 | | 乐观锁与重试的互相绞杀 | **幽灵重试守卫 + Aggregator 僵尸清理**,两层配合 | | Sweeper 清道夫 | **Aggregator 自带清理**,一个组件两个职责 | | NOTIFY/LISTEN 跨 Pod 广播 | **纯轮询**,无需跨 Pod 通信 | | 12+ 设计模式、19 项检查清单 | **4 个核心模式 + 8 项检查清单** | --- ## 三、数据库设计 ```prisma model AslExtractionTask { id String @id @default(uuid()) projectId String templateId String idempotencyKey String? @unique // v1.1:DB 级幂等(防并发重复创建) totalCount Int // 创建后不再变更 status String @default("processing") // 仅 Aggregator 改为 completed createdAt DateTime @default(now()) completedAt DateTime? results AslExtractionResult[] @@schema("asl_schema") } model AslExtractionResult { id String @id @default(uuid()) taskId String pkbDocumentId String status String @default("pending") // pending → extracting → completed/error extractedData Json? errorMessage String? task AslExtractionTask @relation(fields: [taskId], references: [id]) @@index([taskId, status]) // Aggregator 聚合查询加速 @@schema("asl_schema") } ``` **关键设计:Task 表无 `successCount`/`failedCount`。** 进度由 `COUNT(Result WHERE status=...)` 实时聚合,彻底消灭多 Worker 对同一行的写竞争。 --- ## 四、4 项核心代码模式 ### 模式 1:API 层散装派发 ```typescript async function createTask(req, reply) { const { projectId, templateId, documentIds, idempotencyKey } = req.body; if (documentIds.length === 0) throw new Error('No documents selected'); // ═══════════════════════════════════════════════════════ // v1.1 DB 级幂等:利用 @unique 索引 + Prisma P2002 冲突捕获。 // 比 findFirst → if exists 的 Read-then-Write 安全 100 倍: // 即使两个请求在同一毫秒到达,数据库唯一约束也会物理拦截第二个。 // ═══════════════════════════════════════════════════════ let task; try { task = await prisma.aslExtractionTask.create({ data: { projectId, templateId, totalCount: documentIds.length, status: 'processing', idempotencyKey }, }); } catch (error) { if (error.code === 'P2002' && idempotencyKey) { const existing = await prisma.aslExtractionTask.findFirst({ where: { idempotencyKey } }); return reply.send({ success: true, taskId: existing.id, note: 'Idempotent return' }); } throw error; } const resultsData = documentIds.map(docId => ({ taskId: task.id, pkbDocumentId: docId, status: 'pending', })); await prisma.aslExtractionResult.createMany({ data: resultsData }); const createdResults = await prisma.aslExtractionResult.findMany({ where: { taskId: task.id } }); // 散装派发:N 个独立 Job 一次入队 const jobs = createdResults.map(result => ({ name: 'asl_extract_single', data: { resultId: result.id, pkbDocumentId: result.pkbDocumentId }, options: { retryLimit: 3, retryBackoff: true, expireInMinutes: 30, // 执行超时(Worker 拿到 Job 后 30min 未完成则标记 expired,非排队等待超时) singletonKey: `extract-${result.id}`, // 防重复派发 }, })); await jobQueue.insert(jobs); return reply.send({ success: true, taskId: task.id }); } ``` **要点:** - `singletonKey: extract-${result.id}` 防止 API 重试导致重复派发 - `idempotencyKey` + `@unique` 索引 + P2002 捕获 = **DB 物理级幂等**(v1.1,替代 Read-then-Write) - `createMany` + `insert` 批量操作,100 篇文献入队 < 0.1 秒 ### 模式 2:Worker 单兵作战(含幽灵重试守卫) ```typescript jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, async (job) => { const { resultId, pkbDocumentId } = job.data; // ═══════════════════════════════════════════════════════ // 幽灵重试守卫:只允许 pending → extracting。 // 如果已经是 completed/error,说明是 pg-boss 误重投的幽灵, // 直接跳过,省下一次 LLM API 费用。 // ⚠️ 此守卫依赖模式 3 Aggregator 的僵尸清理配合! // OOM 崩溃 → status 卡在 extracting → 重试被跳过 // → Aggregator 30 分钟后将 extracting 标为 error 兜底 // ═══════════════════════════════════════════════════════ const lock = await prisma.aslExtractionResult.updateMany({ where: { id: resultId, status: 'pending' }, data: { status: 'extracting' }, }); if (lock.count === 0) { return { success: true, note: 'Phantom retry skipped' }; } try { const data = await extractLogic(pkbDocumentId); // 只更新自己的 Result 行,绝不碰 Task 表! await prisma.aslExtractionResult.update({ where: { id: resultId }, data: { status: 'completed', extractedData: data }, }); } catch (error) { if (isPermanentError(error)) { await prisma.aslExtractionResult.update({ where: { id: resultId }, data: { status: 'error', errorMessage: error.message }, }); return { success: false, note: 'Permanent error' }; } // 临时错误:回退状态为 pending,让下次重试能通过幽灵守卫 await prisma.aslExtractionResult.update({ where: { id: resultId }, data: { status: 'pending' }, }); throw error; // pg-boss 指数退避重试 } }); ``` **要点:** - Worker **只写自己的 Result 行**,零行锁争用 - 临时错误 throw 前回退 `status → pending`,确保重试时幽灵守卫不拦截 - `teamConcurrency: 10` 全局限流,防 OOM ### 模式 3:Aggregator 轮询收口(含僵尸清理) ```typescript // 注册为 pg-boss 定时任务(多 Pod 下只有 1 个实例执行) await jobQueue.schedule('asl_extraction_aggregator', '*/10 * * * * *'); // 每 10 秒 jobQueue.work('asl_extraction_aggregator', async () => { const activeTasks = await prisma.aslExtractionTask.findMany({ where: { status: 'processing' }, }); for (const task of activeTasks) { // ═══════════════════════════════════════════════════════ // 僵尸清理:extracting 超过 30 分钟 → 标记 error // 场景:Worker OOM 崩溃 → status 卡在 extracting // → pg-boss 重试时被幽灵守卫跳过 // → 必须由 Aggregator 兜底收口 // ⚠️ 此逻辑与模式 2 的幽灵守卫是绑定关系,缺一不可! // ═══════════════════════════════════════════════════════ await prisma.aslExtractionResult.updateMany({ where: { taskId: task.id, status: 'extracting', updatedAt: { lt: new Date(Date.now() - 30 * 60 * 1000) }, }, data: { status: 'error', errorMessage: '[Aggregator] Extraction timeout (30min) — likely Worker crash.', }, }); // 聚合统计(有索引,100 条 < 1ms) const stats = await prisma.aslExtractionResult.groupBy({ by: ['status'], where: { taskId: task.id }, _count: true, }); const pendingCount = stats.find(s => s.status === 'pending')?._count || 0; const extractingCount = stats.find(s => s.status === 'extracting')?._count || 0; // 收口:没有人在排队或干活了 if (pendingCount === 0 && extractingCount === 0) { await prisma.aslExtractionTask.update({ where: { id: task.id }, data: { status: 'completed', completedAt: new Date() }, }); logger.info(`[Aggregator] Task ${task.id} completed`); } } }); ``` **要点:** - **一个组件两个职责**:僵尸清理 + 完成收口,无需额外 Sweeper - `groupBy` + 索引 `@@index([taskId, status])`,聚合极快 - pg-boss `schedule` 保证多 Pod 下只有 1 个实例执行,无并发问题 ### 模式 4:前端进度查询(实时 COUNT) ```typescript async function getTaskStatus(req, reply) { const { taskId } = req.params; const task = await prisma.aslExtractionTask.findUnique({ where: { id: taskId } }); // v1.1:1x groupBy 替代 2x count,DB 只扫描一次索引,查询量减半 const stats = await prisma.aslExtractionResult.groupBy({ by: ['status'], where: { taskId }, _count: true, }); const successCount = stats.find(s => s.status === 'completed')?._count || 0; const failedCount = stats.find(s => s.status === 'error')?._count || 0; return reply.send({ status: task.status, progress: { total: task.totalCount, success: successCount, failed: failedCount, percent: Math.round(((successCount + failedCount) / task.totalCount) * 100), }, }); } ``` --- ## 五、反模式速查表 | 反模式 | 后果 | 正确做法 | |--------|------|---------| | Worker 更新父 Task 的 successCount | 行锁争用,Lock timeout | Worker 只写自己的 Result,进度由 COUNT 聚合 | | Worker 无幽灵守卫,无条件 `extracting` | pg-boss 误重投时覆盖已完成数据,浪费 LLM 费用 | `updateMany({ where: { status: 'pending' } })` | | 临时错误 throw 前不回退 pending | 重试被幽灵守卫拦截,Aggregator 30min 后才能兜底 | throw 前 `update({ status: 'pending' })` | | Aggregator 不清理僵尸 extracting | OOM 崩溃后 Task 永远卡在 processing | 30min 超时 → error | | API 派发不加 singletonKey | 网络重试导致重复 Job | `singletonKey: extract-${resultId}` | | API 幂等用 Read-then-Write(findFirst → if exists) | 并发请求穿透检查,创建重复 Task | `@unique` 索引 + P2002 捕获(DB 物理级幂等)| | 进度查询发 2 次 count | 索引扫描 2 次,查询量翻倍 | 1 次 `groupBy` 聚合所有状态 | | 误解 `expireInMinutes` 为排队超时 | 队列繁忙时正常等待的 Job 被误杀 | `expireInMinutes` = 执行超时,与排队等待无关 | | Aggregator 用 `startedAt` 判断超时 | 误杀正在排队的大批量任务 | 用 `updatedAt`(最后活跃时间) | | 不设 `teamConcurrency` | 1000 个 Job 同时拉起 → OOM | `teamConcurrency: 10` | --- ## 六、开发检查清单 - [ ] **Worker 独立性**:Worker 是否只更新自己的 Result 行?是否完全不碰 Task 表? - [ ] **幽灵守卫**:Worker 是否用 `updateMany({ where: { status: 'pending' } })` 防覆盖? - [ ] **临时错误回退**:throw 前是否回退 `status → pending`? - [ ] **Aggregator 僵尸清理**:是否清理 extracting > 30min 的 Result? - [ ] **Aggregator 收口**:是否在 `pending=0 && extracting=0` 时标记 Task completed? - [ ] **singletonKey**:Job 派发是否设置了 `singletonKey: extract-${resultId}`? - [ ] **teamConcurrency**:Worker 是否设置了全局并发限制? - [ ] **索引**:Result 表是否有 `@@index([taskId, status])` 加速聚合? - [ ] **DB 级幂等**:API 是否用 `@unique(idempotencyKey)` + P2002 捕获?(禁止 Read-then-Write) - [ ] **groupBy 进度查询**:前端进度接口是否用 1 次 `groupBy` 而非多次 `count`? - [ ] **expireInMinutes 注释**:Job 配置中是否注明 `expireInMinutes` = 执行超时(非排队超时)? --- ## 七、与 Fan-out 模式的演进关系 ``` Level 1: Postgres-Only 单体任务(现有指南,DC Tool C) ↓ 当 N > 10 Level 2: 散装派发 + 轮询收口(本文,ASL 工具 3) ↓ 当日处理 > 10,000 且 Pod > 10 Level 3: 分布式 Fan-out(预留指南,暂不启用) ``` **升级条件:** 仅当同时满足以下条件时,才考虑升级到 Level 3: 1. 单次批量 > 2000 篇,且 Aggregator 10 秒轮询延迟不可接受 2. 部署 Pod > 10 个,需要实时精确计数(而非 COUNT 聚合) 3. 团队有 2+ 名熟悉分布式并发的高级开发 > **设计哲学:** 用最简单的架构解决当前问题。所有 Fan-out 的知识沉淀都在 `分布式Fan-out任务模式开发指南.md` 中保存完好,等真正需要时再启用。 --- ## 八、遵守底层规范 本文所有代码模式均遵守 `Postgres-Only异步任务处理指南.md` 的强制规范: | 规范 | 本文实现 | |------|---------| | 队列名用下划线 | `asl_extract_single`、`asl_extraction_aggregator` | | Payload 轻量 (< 1KB) | 仅传 `{ resultId, pkbDocumentId }` | | Worker 必须幂等 | 幽灵守卫 + 临时错误回退 pending | | 设置过期时间 | `expireInMinutes: 30`(执行超时,非排队超时) | | 错误分级 | 永久 return / 临时 throw | | API 幂等 | `@unique(idempotencyKey)` + P2002 捕获(DB 物理级) | --- *本文档基于 ASL 工具 3 全文智能提取工作台的架构选型经验总结。* *散装派发模式是 Fan-out 模式的降维替代方案,以 80% 的复杂度降低换取 95% 的功能覆盖。* *待实战验证后升级为 v2.0。* --- ### 版本记录 | 版本 | 日期 | 变更内容 | |------|------|---------| | v1.0 | 2026-02-24 | 初版,4 项核心模式 + 8 项检查清单 | | v1.1 | 2026-02-24 | 优化 1:API 幂等升级为 `@unique` + P2002(替代 Read-then-Write);优化 2:进度查询 2x count → 1x groupBy;优化 3:`expireInMinutes` 加注释说明含义;反模式 +2,检查清单 +3 |