Architecture transformation: - Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern - API layer directly dispatches N independent jobs (no Manager) - Worker only writes its own Result row, never touches Task table (zero row-lock) - Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper) - Reduce red lines from 13 to 9, eliminate distributed complexity Documents updated (10 files): - 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks) - 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator) - 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria - 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional) - 08c-M3 sprint: milestone table wording update - New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook) - New: V2.0 architecture deep review and gap-fix report - Updated: ASL module status, system status, capability layer index Co-authored-by: Cursor <cursoragent@cursor.com>
18 KiB
18 KiB
散装派发与轮询收口任务模式指南
版本: v1.1(优化:DB 级幂等 + groupBy 进度查询 + expireInMinutes 注释)
创建日期: 2026-02-24
定位: Level 2 批量任务实战 Cookbook(1 触发 → N 个独立 Worker → Aggregator 收口)
底层规范:Postgres-Only异步任务处理指南.md(Level 1,队列命名、Payload、过期时间等强制规范)
首个试点: ASL 工具 3 全文智能提取工作台
替代方案:分布式Fan-out任务模式开发指南.md(Level 3,适用于万级任务 + 10+ Pod,当前阶段不推荐)
一、适用场景判断
| 维度 | Level 1:单体任务 | Level 2:散装派发(本文) | Level 3:Fan-out |
|---|---|---|---|
| 触发模式 | 1 触发 → 1 Worker → 结束 | 1 触发 → N 个独立 Worker → Aggregator 收口 | 1 触发 → Manager → N Child → Last Child Wins |
| 典型案例 | DC Tool C 解析 1 个 Excel | ASL 工具 3 批量提取 100 篇文献 | 万级任务 + 严格实时计数 |
| 复杂度 | ⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
| 团队要求 | 初级 | 初中级 | 高级(需掌握 12+ 并发模式) |
判断公式: 如果你的任务是"1 次操作处理 N 个独立子项,N 可能 > 10,且团队 < 10 人",选本文的散装模式。
二、核心架构:散装派发 + 独立单兵 + 轮询收口
┌─ API 层 ──────────────────────────────────────┐
│ POST /tasks │
│ 1. 创建 1 个 Task(status: processing) │
│ 2. 批量创建 N 个 Result(status: pending) │
│ 3. 散装派发 N 个 pgBoss Job(一次 insert) │
│ 4. 立即返回 taskId(< 0.1 秒) │
└────────────────────────────────────────────────┘
↓ (N 个独立 Job)
┌─ Worker 层(多 Pod 各自抢单)─────────────────┐
│ 每个 Worker 只管自己的 1 篇文献: │
│ 1. 幽灵重试守卫(updateMany 防覆盖已完成) │
│ 2. 执行业务逻辑(MinerU + LLM) │
│ 3. 只更新自己的 Result 行 │
│ 4. 绝不触碰父任务 Task 表! │
│ 5. 错误分级:致命 return / 临时回退 + throw │
└────────────────────────────────────────────────┘
┌─ Aggregator(全局唯一,每 10 秒)─────────────┐
│ 1. 清理僵尸:extracting > 30min → error │
│ 2. 聚合统计:GROUP BY Result.status │
│ 3. 收口判定:pending=0 且 extracting=0 │
│ → 标记 Task completed │
└────────────────────────────────────────────────┘
┌─ 前端(React Query 轮询)─────────────────────┐
│ GET /tasks/:taskId/status │
│ → 实时 COUNT(Result) 聚合进度 │
│ → 无需 successCount/failedCount 冗余字段 │
└────────────────────────────────────────────────┘
为什么不用 Fan-out?
| Fan-out 的代价 | 散装模式如何规避 |
|---|---|
| Last Child Wins(终止器逻辑) | Aggregator 轮询收口,零并发竞争 |
原子递增 successCount(行锁争用) |
Worker 不碰父表,进度由 COUNT 聚合 |
| 乐观锁与重试的互相绞杀 | 幽灵重试守卫 + Aggregator 僵尸清理,两层配合 |
| Sweeper 清道夫 | Aggregator 自带清理,一个组件两个职责 |
| NOTIFY/LISTEN 跨 Pod 广播 | 纯轮询,无需跨 Pod 通信 |
| 12+ 设计模式、19 项检查清单 | 4 个核心模式 + 8 项检查清单 |
三、数据库设计
model AslExtractionTask {
id String @id @default(uuid())
projectId String
templateId String
idempotencyKey String? @unique // v1.1:DB 级幂等(防并发重复创建)
totalCount Int // 创建后不再变更
status String @default("processing") // 仅 Aggregator 改为 completed
createdAt DateTime @default(now())
completedAt DateTime?
results AslExtractionResult[]
@@schema("asl_schema")
}
model AslExtractionResult {
id String @id @default(uuid())
taskId String
pkbDocumentId String
status String @default("pending") // pending → extracting → completed/error
extractedData Json?
errorMessage String?
task AslExtractionTask @relation(fields: [taskId], references: [id])
@@index([taskId, status]) // Aggregator 聚合查询加速
@@schema("asl_schema")
}
关键设计:Task 表无 successCount/failedCount。 进度由 COUNT(Result WHERE status=...) 实时聚合,彻底消灭多 Worker 对同一行的写竞争。
四、4 项核心代码模式
模式 1:API 层散装派发
async function createTask(req, reply) {
const { projectId, templateId, documentIds, idempotencyKey } = req.body;
if (documentIds.length === 0) throw new Error('No documents selected');
// ═══════════════════════════════════════════════════════
// v1.1 DB 级幂等:利用 @unique 索引 + Prisma P2002 冲突捕获。
// 比 findFirst → if exists 的 Read-then-Write 安全 100 倍:
// 即使两个请求在同一毫秒到达,数据库唯一约束也会物理拦截第二个。
// ═══════════════════════════════════════════════════════
let task;
try {
task = await prisma.aslExtractionTask.create({
data: { projectId, templateId, totalCount: documentIds.length, status: 'processing', idempotencyKey },
});
} catch (error) {
if (error.code === 'P2002' && idempotencyKey) {
const existing = await prisma.aslExtractionTask.findFirst({ where: { idempotencyKey } });
return reply.send({ success: true, taskId: existing.id, note: 'Idempotent return' });
}
throw error;
}
const resultsData = documentIds.map(docId => ({
taskId: task.id, pkbDocumentId: docId, status: 'pending',
}));
await prisma.aslExtractionResult.createMany({ data: resultsData });
const createdResults = await prisma.aslExtractionResult.findMany({ where: { taskId: task.id } });
// 散装派发:N 个独立 Job 一次入队
const jobs = createdResults.map(result => ({
name: 'asl_extract_single',
data: { resultId: result.id, pkbDocumentId: result.pkbDocumentId },
options: {
retryLimit: 3,
retryBackoff: true,
expireInMinutes: 30, // 执行超时(Worker 拿到 Job 后 30min 未完成则标记 expired,非排队等待超时)
singletonKey: `extract-${result.id}`, // 防重复派发
},
}));
await jobQueue.insert(jobs);
return reply.send({ success: true, taskId: task.id });
}
要点:
singletonKey: extract-${result.id}防止 API 重试导致重复派发idempotencyKey+@unique索引 + P2002 捕获 = DB 物理级幂等(v1.1,替代 Read-then-Write)createMany+insert批量操作,100 篇文献入队 < 0.1 秒
模式 2:Worker 单兵作战(含幽灵重试守卫)
jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, async (job) => {
const { resultId, pkbDocumentId } = job.data;
// ═══════════════════════════════════════════════════════
// 幽灵重试守卫:只允许 pending → extracting。
// 如果已经是 completed/error,说明是 pg-boss 误重投的幽灵,
// 直接跳过,省下一次 LLM API 费用。
// ⚠️ 此守卫依赖模式 3 Aggregator 的僵尸清理配合!
// OOM 崩溃 → status 卡在 extracting → 重试被跳过
// → Aggregator 30 分钟后将 extracting 标为 error 兜底
// ═══════════════════════════════════════════════════════
const lock = await prisma.aslExtractionResult.updateMany({
where: { id: resultId, status: 'pending' },
data: { status: 'extracting' },
});
if (lock.count === 0) {
return { success: true, note: 'Phantom retry skipped' };
}
try {
const data = await extractLogic(pkbDocumentId);
// 只更新自己的 Result 行,绝不碰 Task 表!
await prisma.aslExtractionResult.update({
where: { id: resultId },
data: { status: 'completed', extractedData: data },
});
} catch (error) {
if (isPermanentError(error)) {
await prisma.aslExtractionResult.update({
where: { id: resultId },
data: { status: 'error', errorMessage: error.message },
});
return { success: false, note: 'Permanent error' };
}
// 临时错误:回退状态为 pending,让下次重试能通过幽灵守卫
await prisma.aslExtractionResult.update({
where: { id: resultId },
data: { status: 'pending' },
});
throw error; // pg-boss 指数退避重试
}
});
要点:
- Worker 只写自己的 Result 行,零行锁争用
- 临时错误 throw 前回退
status → pending,确保重试时幽灵守卫不拦截 teamConcurrency: 10全局限流,防 OOM
模式 3:Aggregator 轮询收口(含僵尸清理)
// 注册为 pg-boss 定时任务(多 Pod 下只有 1 个实例执行)
await jobQueue.schedule('asl_extraction_aggregator', '*/10 * * * * *'); // 每 10 秒
jobQueue.work('asl_extraction_aggregator', async () => {
const activeTasks = await prisma.aslExtractionTask.findMany({
where: { status: 'processing' },
});
for (const task of activeTasks) {
// ═══════════════════════════════════════════════════════
// 僵尸清理:extracting 超过 30 分钟 → 标记 error
// 场景:Worker OOM 崩溃 → status 卡在 extracting
// → pg-boss 重试时被幽灵守卫跳过
// → 必须由 Aggregator 兜底收口
// ⚠️ 此逻辑与模式 2 的幽灵守卫是绑定关系,缺一不可!
// ═══════════════════════════════════════════════════════
await prisma.aslExtractionResult.updateMany({
where: {
taskId: task.id,
status: 'extracting',
updatedAt: { lt: new Date(Date.now() - 30 * 60 * 1000) },
},
data: {
status: 'error',
errorMessage: '[Aggregator] Extraction timeout (30min) — likely Worker crash.',
},
});
// 聚合统计(有索引,100 条 < 1ms)
const stats = await prisma.aslExtractionResult.groupBy({
by: ['status'],
where: { taskId: task.id },
_count: true,
});
const pendingCount = stats.find(s => s.status === 'pending')?._count || 0;
const extractingCount = stats.find(s => s.status === 'extracting')?._count || 0;
// 收口:没有人在排队或干活了
if (pendingCount === 0 && extractingCount === 0) {
await prisma.aslExtractionTask.update({
where: { id: task.id },
data: { status: 'completed', completedAt: new Date() },
});
logger.info(`[Aggregator] Task ${task.id} completed`);
}
}
});
要点:
- 一个组件两个职责:僵尸清理 + 完成收口,无需额外 Sweeper
groupBy+ 索引@@index([taskId, status]),聚合极快- pg-boss
schedule保证多 Pod 下只有 1 个实例执行,无并发问题
模式 4:前端进度查询(实时 COUNT)
async function getTaskStatus(req, reply) {
const { taskId } = req.params;
const task = await prisma.aslExtractionTask.findUnique({ where: { id: taskId } });
// v1.1:1x groupBy 替代 2x count,DB 只扫描一次索引,查询量减半
const stats = await prisma.aslExtractionResult.groupBy({
by: ['status'],
where: { taskId },
_count: true,
});
const successCount = stats.find(s => s.status === 'completed')?._count || 0;
const failedCount = stats.find(s => s.status === 'error')?._count || 0;
return reply.send({
status: task.status,
progress: {
total: task.totalCount,
success: successCount,
failed: failedCount,
percent: Math.round(((successCount + failedCount) / task.totalCount) * 100),
},
});
}
五、反模式速查表
| 反模式 | 后果 | 正确做法 |
|---|---|---|
| Worker 更新父 Task 的 successCount | 行锁争用,Lock timeout | Worker 只写自己的 Result,进度由 COUNT 聚合 |
Worker 无幽灵守卫,无条件 extracting |
pg-boss 误重投时覆盖已完成数据,浪费 LLM 费用 | updateMany({ where: { status: 'pending' } }) |
| 临时错误 throw 前不回退 pending | 重试被幽灵守卫拦截,Aggregator 30min 后才能兜底 | throw 前 update({ status: 'pending' }) |
| Aggregator 不清理僵尸 extracting | OOM 崩溃后 Task 永远卡在 processing | 30min 超时 → error |
| API 派发不加 singletonKey | 网络重试导致重复 Job | singletonKey: extract-${resultId} |
| API 幂等用 Read-then-Write(findFirst → if exists) | 并发请求穿透检查,创建重复 Task | @unique 索引 + P2002 捕获(DB 物理级幂等) |
| 进度查询发 2 次 count | 索引扫描 2 次,查询量翻倍 | 1 次 groupBy 聚合所有状态 |
误解 expireInMinutes 为排队超时 |
队列繁忙时正常等待的 Job 被误杀 | expireInMinutes = 执行超时,与排队等待无关 |
Aggregator 用 startedAt 判断超时 |
误杀正在排队的大批量任务 | 用 updatedAt(最后活跃时间) |
不设 teamConcurrency |
1000 个 Job 同时拉起 → OOM | teamConcurrency: 10 |
六、开发检查清单
- Worker 独立性:Worker 是否只更新自己的 Result 行?是否完全不碰 Task 表?
- 幽灵守卫:Worker 是否用
updateMany({ where: { status: 'pending' } })防覆盖? - 临时错误回退:throw 前是否回退
status → pending? - Aggregator 僵尸清理:是否清理 extracting > 30min 的 Result?
- Aggregator 收口:是否在
pending=0 && extracting=0时标记 Task completed? - singletonKey:Job 派发是否设置了
singletonKey: extract-${resultId}? - teamConcurrency:Worker 是否设置了全局并发限制?
- 索引:Result 表是否有
@@index([taskId, status])加速聚合? - DB 级幂等:API 是否用
@unique(idempotencyKey)+ P2002 捕获?(禁止 Read-then-Write) - groupBy 进度查询:前端进度接口是否用 1 次
groupBy而非多次count? - expireInMinutes 注释:Job 配置中是否注明
expireInMinutes= 执行超时(非排队超时)?
七、与 Fan-out 模式的演进关系
Level 1: Postgres-Only 单体任务(现有指南,DC Tool C)
↓ 当 N > 10
Level 2: 散装派发 + 轮询收口(本文,ASL 工具 3)
↓ 当日处理 > 10,000 且 Pod > 10
Level 3: 分布式 Fan-out(预留指南,暂不启用)
升级条件: 仅当同时满足以下条件时,才考虑升级到 Level 3:
- 单次批量 > 2000 篇,且 Aggregator 10 秒轮询延迟不可接受
- 部署 Pod > 10 个,需要实时精确计数(而非 COUNT 聚合)
- 团队有 2+ 名熟悉分布式并发的高级开发
设计哲学: 用最简单的架构解决当前问题。所有 Fan-out 的知识沉淀都在
分布式Fan-out任务模式开发指南.md中保存完好,等真正需要时再启用。
八、遵守底层规范
本文所有代码模式均遵守 Postgres-Only异步任务处理指南.md 的强制规范:
| 规范 | 本文实现 |
|---|---|
| 队列名用下划线 | asl_extract_single、asl_extraction_aggregator |
| Payload 轻量 (< 1KB) | 仅传 { resultId, pkbDocumentId } |
| Worker 必须幂等 | 幽灵守卫 + 临时错误回退 pending |
| 设置过期时间 | expireInMinutes: 30(执行超时,非排队超时) |
| 错误分级 | 永久 return / 临时 throw |
| API 幂等 | @unique(idempotencyKey) + P2002 捕获(DB 物理级) |
本文档基于 ASL 工具 3 全文智能提取工作台的架构选型经验总结。 散装派发模式是 Fan-out 模式的降维替代方案,以 80% 的复杂度降低换取 95% 的功能覆盖。 待实战验证后升级为 v2.0。
版本记录
| 版本 | 日期 | 变更内容 |
|---|---|---|
| v1.0 | 2026-02-24 | 初版,4 项核心模式 + 8 项检查清单 |
| v1.1 | 2026-02-24 | 优化 1:API 幂等升级为 @unique + P2002(替代 Read-then-Write);优化 2:进度查询 2x count → 1x groupBy;优化 3:expireInMinutes 加注释说明含义;反模式 +2,检查清单 +3 |