Architecture transformation: - Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern - API layer directly dispatches N independent jobs (no Manager) - Worker only writes its own Result row, never touches Task table (zero row-lock) - Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper) - Reduce red lines from 13 to 9, eliminate distributed complexity Documents updated (10 files): - 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks) - 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator) - 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria - 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional) - 08c-M3 sprint: milestone table wording update - New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook) - New: V2.0 architecture deep review and gap-fix report - Updated: ASL module status, system status, capability layer index Co-authored-by: Cursor <cursoragent@cursor.com>
385 lines
18 KiB
Markdown
385 lines
18 KiB
Markdown
# 散装派发与轮询收口任务模式指南
|
||
|
||
> **版本:** v1.1(优化:DB 级幂等 + groupBy 进度查询 + expireInMinutes 注释)
|
||
> **创建日期:** 2026-02-24
|
||
> **定位:** Level 2 批量任务实战 Cookbook(1 触发 → N 个独立 Worker → Aggregator 收口)
|
||
> **底层规范:** `Postgres-Only异步任务处理指南.md`(Level 1,队列命名、Payload、过期时间等强制规范)
|
||
> **首个试点:** ASL 工具 3 全文智能提取工作台
|
||
> **替代方案:** `分布式Fan-out任务模式开发指南.md`(Level 3,适用于万级任务 + 10+ Pod,当前阶段不推荐)
|
||
|
||
---
|
||
|
||
## 一、适用场景判断
|
||
|
||
| 维度 | Level 1:单体任务 | **Level 2:散装派发(本文)** | Level 3:Fan-out |
|
||
|------|-------------------|---------------------------|-----------------|
|
||
| **触发模式** | 1 触发 → 1 Worker → 结束 | 1 触发 → N 个独立 Worker → Aggregator 收口 | 1 触发 → Manager → N Child → Last Child Wins |
|
||
| **典型案例** | DC Tool C 解析 1 个 Excel | **ASL 工具 3 批量提取 100 篇文献** | 万级任务 + 严格实时计数 |
|
||
| **复杂度** | ⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
|
||
| **团队要求** | 初级 | 初中级 | 高级(需掌握 12+ 并发模式) |
|
||
|
||
**判断公式:** 如果你的任务是"1 次操作处理 N 个独立子项,N 可能 > 10,且团队 < 10 人",选本文的散装模式。
|
||
|
||
---
|
||
|
||
## 二、核心架构:散装派发 + 独立单兵 + 轮询收口
|
||
|
||
```
|
||
┌─ API 层 ──────────────────────────────────────┐
|
||
│ POST /tasks │
|
||
│ 1. 创建 1 个 Task(status: processing) │
|
||
│ 2. 批量创建 N 个 Result(status: pending) │
|
||
│ 3. 散装派发 N 个 pgBoss Job(一次 insert) │
|
||
│ 4. 立即返回 taskId(< 0.1 秒) │
|
||
└────────────────────────────────────────────────┘
|
||
↓ (N 个独立 Job)
|
||
┌─ Worker 层(多 Pod 各自抢单)─────────────────┐
|
||
│ 每个 Worker 只管自己的 1 篇文献: │
|
||
│ 1. 幽灵重试守卫(updateMany 防覆盖已完成) │
|
||
│ 2. 执行业务逻辑(MinerU + LLM) │
|
||
│ 3. 只更新自己的 Result 行 │
|
||
│ 4. 绝不触碰父任务 Task 表! │
|
||
│ 5. 错误分级:致命 return / 临时回退 + throw │
|
||
└────────────────────────────────────────────────┘
|
||
|
||
┌─ Aggregator(全局唯一,每 10 秒)─────────────┐
|
||
│ 1. 清理僵尸:extracting > 30min → error │
|
||
│ 2. 聚合统计:GROUP BY Result.status │
|
||
│ 3. 收口判定:pending=0 且 extracting=0 │
|
||
│ → 标记 Task completed │
|
||
└────────────────────────────────────────────────┘
|
||
|
||
┌─ 前端(React Query 轮询)─────────────────────┐
|
||
│ GET /tasks/:taskId/status │
|
||
│ → 实时 COUNT(Result) 聚合进度 │
|
||
│ → 无需 successCount/failedCount 冗余字段 │
|
||
└────────────────────────────────────────────────┘
|
||
```
|
||
|
||
### 为什么不用 Fan-out?
|
||
|
||
| Fan-out 的代价 | 散装模式如何规避 |
|
||
|---------------|----------------|
|
||
| Last Child Wins(终止器逻辑) | **Aggregator 轮询收口**,零并发竞争 |
|
||
| 原子递增 `successCount`(行锁争用) | **Worker 不碰父表**,进度由 COUNT 聚合 |
|
||
| 乐观锁与重试的互相绞杀 | **幽灵重试守卫 + Aggregator 僵尸清理**,两层配合 |
|
||
| Sweeper 清道夫 | **Aggregator 自带清理**,一个组件两个职责 |
|
||
| NOTIFY/LISTEN 跨 Pod 广播 | **纯轮询**,无需跨 Pod 通信 |
|
||
| 12+ 设计模式、19 项检查清单 | **4 个核心模式 + 8 项检查清单** |
|
||
|
||
---
|
||
|
||
## 三、数据库设计
|
||
|
||
```prisma
|
||
model AslExtractionTask {
|
||
id String @id @default(uuid())
|
||
projectId String
|
||
templateId String
|
||
idempotencyKey String? @unique // v1.1:DB 级幂等(防并发重复创建)
|
||
totalCount Int // 创建后不再变更
|
||
status String @default("processing") // 仅 Aggregator 改为 completed
|
||
createdAt DateTime @default(now())
|
||
completedAt DateTime?
|
||
results AslExtractionResult[]
|
||
@@schema("asl_schema")
|
||
}
|
||
|
||
model AslExtractionResult {
|
||
id String @id @default(uuid())
|
||
taskId String
|
||
pkbDocumentId String
|
||
status String @default("pending") // pending → extracting → completed/error
|
||
extractedData Json?
|
||
errorMessage String?
|
||
task AslExtractionTask @relation(fields: [taskId], references: [id])
|
||
@@index([taskId, status]) // Aggregator 聚合查询加速
|
||
@@schema("asl_schema")
|
||
}
|
||
```
|
||
|
||
**关键设计:Task 表无 `successCount`/`failedCount`。** 进度由 `COUNT(Result WHERE status=...)` 实时聚合,彻底消灭多 Worker 对同一行的写竞争。
|
||
|
||
---
|
||
|
||
## 四、4 项核心代码模式
|
||
|
||
### 模式 1:API 层散装派发
|
||
|
||
```typescript
|
||
async function createTask(req, reply) {
|
||
const { projectId, templateId, documentIds, idempotencyKey } = req.body;
|
||
|
||
if (documentIds.length === 0) throw new Error('No documents selected');
|
||
|
||
// ═══════════════════════════════════════════════════════
|
||
// v1.1 DB 级幂等:利用 @unique 索引 + Prisma P2002 冲突捕获。
|
||
// 比 findFirst → if exists 的 Read-then-Write 安全 100 倍:
|
||
// 即使两个请求在同一毫秒到达,数据库唯一约束也会物理拦截第二个。
|
||
// ═══════════════════════════════════════════════════════
|
||
let task;
|
||
try {
|
||
task = await prisma.aslExtractionTask.create({
|
||
data: { projectId, templateId, totalCount: documentIds.length, status: 'processing', idempotencyKey },
|
||
});
|
||
} catch (error) {
|
||
if (error.code === 'P2002' && idempotencyKey) {
|
||
const existing = await prisma.aslExtractionTask.findFirst({ where: { idempotencyKey } });
|
||
return reply.send({ success: true, taskId: existing.id, note: 'Idempotent return' });
|
||
}
|
||
throw error;
|
||
}
|
||
|
||
const resultsData = documentIds.map(docId => ({
|
||
taskId: task.id, pkbDocumentId: docId, status: 'pending',
|
||
}));
|
||
await prisma.aslExtractionResult.createMany({ data: resultsData });
|
||
|
||
const createdResults = await prisma.aslExtractionResult.findMany({ where: { taskId: task.id } });
|
||
|
||
// 散装派发:N 个独立 Job 一次入队
|
||
const jobs = createdResults.map(result => ({
|
||
name: 'asl_extract_single',
|
||
data: { resultId: result.id, pkbDocumentId: result.pkbDocumentId },
|
||
options: {
|
||
retryLimit: 3,
|
||
retryBackoff: true,
|
||
expireInMinutes: 30, // 执行超时(Worker 拿到 Job 后 30min 未完成则标记 expired,非排队等待超时)
|
||
singletonKey: `extract-${result.id}`, // 防重复派发
|
||
},
|
||
}));
|
||
await jobQueue.insert(jobs);
|
||
|
||
return reply.send({ success: true, taskId: task.id });
|
||
}
|
||
```
|
||
|
||
**要点:**
|
||
- `singletonKey: extract-${result.id}` 防止 API 重试导致重复派发
|
||
- `idempotencyKey` + `@unique` 索引 + P2002 捕获 = **DB 物理级幂等**(v1.1,替代 Read-then-Write)
|
||
- `createMany` + `insert` 批量操作,100 篇文献入队 < 0.1 秒
|
||
|
||
### 模式 2:Worker 单兵作战(含幽灵重试守卫)
|
||
|
||
```typescript
|
||
jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, async (job) => {
|
||
const { resultId, pkbDocumentId } = job.data;
|
||
|
||
// ═══════════════════════════════════════════════════════
|
||
// 幽灵重试守卫:只允许 pending → extracting。
|
||
// 如果已经是 completed/error,说明是 pg-boss 误重投的幽灵,
|
||
// 直接跳过,省下一次 LLM API 费用。
|
||
// ⚠️ 此守卫依赖模式 3 Aggregator 的僵尸清理配合!
|
||
// OOM 崩溃 → status 卡在 extracting → 重试被跳过
|
||
// → Aggregator 30 分钟后将 extracting 标为 error 兜底
|
||
// ═══════════════════════════════════════════════════════
|
||
const lock = await prisma.aslExtractionResult.updateMany({
|
||
where: { id: resultId, status: 'pending' },
|
||
data: { status: 'extracting' },
|
||
});
|
||
if (lock.count === 0) {
|
||
return { success: true, note: 'Phantom retry skipped' };
|
||
}
|
||
|
||
try {
|
||
const data = await extractLogic(pkbDocumentId);
|
||
|
||
// 只更新自己的 Result 行,绝不碰 Task 表!
|
||
await prisma.aslExtractionResult.update({
|
||
where: { id: resultId },
|
||
data: { status: 'completed', extractedData: data },
|
||
});
|
||
} catch (error) {
|
||
if (isPermanentError(error)) {
|
||
await prisma.aslExtractionResult.update({
|
||
where: { id: resultId },
|
||
data: { status: 'error', errorMessage: error.message },
|
||
});
|
||
return { success: false, note: 'Permanent error' };
|
||
}
|
||
|
||
// 临时错误:回退状态为 pending,让下次重试能通过幽灵守卫
|
||
await prisma.aslExtractionResult.update({
|
||
where: { id: resultId },
|
||
data: { status: 'pending' },
|
||
});
|
||
throw error; // pg-boss 指数退避重试
|
||
}
|
||
});
|
||
```
|
||
|
||
**要点:**
|
||
- Worker **只写自己的 Result 行**,零行锁争用
|
||
- 临时错误 throw 前回退 `status → pending`,确保重试时幽灵守卫不拦截
|
||
- `teamConcurrency: 10` 全局限流,防 OOM
|
||
|
||
### 模式 3:Aggregator 轮询收口(含僵尸清理)
|
||
|
||
```typescript
|
||
// 注册为 pg-boss 定时任务(多 Pod 下只有 1 个实例执行)
|
||
await jobQueue.schedule('asl_extraction_aggregator', '*/10 * * * * *'); // 每 10 秒
|
||
|
||
jobQueue.work('asl_extraction_aggregator', async () => {
|
||
const activeTasks = await prisma.aslExtractionTask.findMany({
|
||
where: { status: 'processing' },
|
||
});
|
||
|
||
for (const task of activeTasks) {
|
||
// ═══════════════════════════════════════════════════════
|
||
// 僵尸清理:extracting 超过 30 分钟 → 标记 error
|
||
// 场景:Worker OOM 崩溃 → status 卡在 extracting
|
||
// → pg-boss 重试时被幽灵守卫跳过
|
||
// → 必须由 Aggregator 兜底收口
|
||
// ⚠️ 此逻辑与模式 2 的幽灵守卫是绑定关系,缺一不可!
|
||
// ═══════════════════════════════════════════════════════
|
||
await prisma.aslExtractionResult.updateMany({
|
||
where: {
|
||
taskId: task.id,
|
||
status: 'extracting',
|
||
updatedAt: { lt: new Date(Date.now() - 30 * 60 * 1000) },
|
||
},
|
||
data: {
|
||
status: 'error',
|
||
errorMessage: '[Aggregator] Extraction timeout (30min) — likely Worker crash.',
|
||
},
|
||
});
|
||
|
||
// 聚合统计(有索引,100 条 < 1ms)
|
||
const stats = await prisma.aslExtractionResult.groupBy({
|
||
by: ['status'],
|
||
where: { taskId: task.id },
|
||
_count: true,
|
||
});
|
||
|
||
const pendingCount = stats.find(s => s.status === 'pending')?._count || 0;
|
||
const extractingCount = stats.find(s => s.status === 'extracting')?._count || 0;
|
||
|
||
// 收口:没有人在排队或干活了
|
||
if (pendingCount === 0 && extractingCount === 0) {
|
||
await prisma.aslExtractionTask.update({
|
||
where: { id: task.id },
|
||
data: { status: 'completed', completedAt: new Date() },
|
||
});
|
||
logger.info(`[Aggregator] Task ${task.id} completed`);
|
||
}
|
||
}
|
||
});
|
||
```
|
||
|
||
**要点:**
|
||
- **一个组件两个职责**:僵尸清理 + 完成收口,无需额外 Sweeper
|
||
- `groupBy` + 索引 `@@index([taskId, status])`,聚合极快
|
||
- pg-boss `schedule` 保证多 Pod 下只有 1 个实例执行,无并发问题
|
||
|
||
### 模式 4:前端进度查询(实时 COUNT)
|
||
|
||
```typescript
|
||
async function getTaskStatus(req, reply) {
|
||
const { taskId } = req.params;
|
||
|
||
const task = await prisma.aslExtractionTask.findUnique({ where: { id: taskId } });
|
||
|
||
// v1.1:1x groupBy 替代 2x count,DB 只扫描一次索引,查询量减半
|
||
const stats = await prisma.aslExtractionResult.groupBy({
|
||
by: ['status'],
|
||
where: { taskId },
|
||
_count: true,
|
||
});
|
||
|
||
const successCount = stats.find(s => s.status === 'completed')?._count || 0;
|
||
const failedCount = stats.find(s => s.status === 'error')?._count || 0;
|
||
|
||
return reply.send({
|
||
status: task.status,
|
||
progress: {
|
||
total: task.totalCount,
|
||
success: successCount,
|
||
failed: failedCount,
|
||
percent: Math.round(((successCount + failedCount) / task.totalCount) * 100),
|
||
},
|
||
});
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 五、反模式速查表
|
||
|
||
| 反模式 | 后果 | 正确做法 |
|
||
|--------|------|---------|
|
||
| Worker 更新父 Task 的 successCount | 行锁争用,Lock timeout | Worker 只写自己的 Result,进度由 COUNT 聚合 |
|
||
| Worker 无幽灵守卫,无条件 `extracting` | pg-boss 误重投时覆盖已完成数据,浪费 LLM 费用 | `updateMany({ where: { status: 'pending' } })` |
|
||
| 临时错误 throw 前不回退 pending | 重试被幽灵守卫拦截,Aggregator 30min 后才能兜底 | throw 前 `update({ status: 'pending' })` |
|
||
| Aggregator 不清理僵尸 extracting | OOM 崩溃后 Task 永远卡在 processing | 30min 超时 → error |
|
||
| API 派发不加 singletonKey | 网络重试导致重复 Job | `singletonKey: extract-${resultId}` |
|
||
| API 幂等用 Read-then-Write(findFirst → if exists) | 并发请求穿透检查,创建重复 Task | `@unique` 索引 + P2002 捕获(DB 物理级幂等)|
|
||
| 进度查询发 2 次 count | 索引扫描 2 次,查询量翻倍 | 1 次 `groupBy` 聚合所有状态 |
|
||
| 误解 `expireInMinutes` 为排队超时 | 队列繁忙时正常等待的 Job 被误杀 | `expireInMinutes` = 执行超时,与排队等待无关 |
|
||
| Aggregator 用 `startedAt` 判断超时 | 误杀正在排队的大批量任务 | 用 `updatedAt`(最后活跃时间) |
|
||
| 不设 `teamConcurrency` | 1000 个 Job 同时拉起 → OOM | `teamConcurrency: 10` |
|
||
|
||
---
|
||
|
||
## 六、开发检查清单
|
||
|
||
- [ ] **Worker 独立性**:Worker 是否只更新自己的 Result 行?是否完全不碰 Task 表?
|
||
- [ ] **幽灵守卫**:Worker 是否用 `updateMany({ where: { status: 'pending' } })` 防覆盖?
|
||
- [ ] **临时错误回退**:throw 前是否回退 `status → pending`?
|
||
- [ ] **Aggregator 僵尸清理**:是否清理 extracting > 30min 的 Result?
|
||
- [ ] **Aggregator 收口**:是否在 `pending=0 && extracting=0` 时标记 Task completed?
|
||
- [ ] **singletonKey**:Job 派发是否设置了 `singletonKey: extract-${resultId}`?
|
||
- [ ] **teamConcurrency**:Worker 是否设置了全局并发限制?
|
||
- [ ] **索引**:Result 表是否有 `@@index([taskId, status])` 加速聚合?
|
||
- [ ] **DB 级幂等**:API 是否用 `@unique(idempotencyKey)` + P2002 捕获?(禁止 Read-then-Write)
|
||
- [ ] **groupBy 进度查询**:前端进度接口是否用 1 次 `groupBy` 而非多次 `count`?
|
||
- [ ] **expireInMinutes 注释**:Job 配置中是否注明 `expireInMinutes` = 执行超时(非排队超时)?
|
||
|
||
---
|
||
|
||
## 七、与 Fan-out 模式的演进关系
|
||
|
||
```
|
||
Level 1: Postgres-Only 单体任务(现有指南,DC Tool C)
|
||
↓ 当 N > 10
|
||
Level 2: 散装派发 + 轮询收口(本文,ASL 工具 3)
|
||
↓ 当日处理 > 10,000 且 Pod > 10
|
||
Level 3: 分布式 Fan-out(预留指南,暂不启用)
|
||
```
|
||
|
||
**升级条件:** 仅当同时满足以下条件时,才考虑升级到 Level 3:
|
||
1. 单次批量 > 2000 篇,且 Aggregator 10 秒轮询延迟不可接受
|
||
2. 部署 Pod > 10 个,需要实时精确计数(而非 COUNT 聚合)
|
||
3. 团队有 2+ 名熟悉分布式并发的高级开发
|
||
|
||
> **设计哲学:** 用最简单的架构解决当前问题。所有 Fan-out 的知识沉淀都在 `分布式Fan-out任务模式开发指南.md` 中保存完好,等真正需要时再启用。
|
||
|
||
---
|
||
|
||
## 八、遵守底层规范
|
||
|
||
本文所有代码模式均遵守 `Postgres-Only异步任务处理指南.md` 的强制规范:
|
||
|
||
| 规范 | 本文实现 |
|
||
|------|---------|
|
||
| 队列名用下划线 | `asl_extract_single`、`asl_extraction_aggregator` |
|
||
| Payload 轻量 (< 1KB) | 仅传 `{ resultId, pkbDocumentId }` |
|
||
| Worker 必须幂等 | 幽灵守卫 + 临时错误回退 pending |
|
||
| 设置过期时间 | `expireInMinutes: 30`(执行超时,非排队超时) |
|
||
| 错误分级 | 永久 return / 临时 throw |
|
||
| API 幂等 | `@unique(idempotencyKey)` + P2002 捕获(DB 物理级) |
|
||
|
||
---
|
||
|
||
*本文档基于 ASL 工具 3 全文智能提取工作台的架构选型经验总结。*
|
||
*散装派发模式是 Fan-out 模式的降维替代方案,以 80% 的复杂度降低换取 95% 的功能覆盖。*
|
||
*待实战验证后升级为 v2.0。*
|
||
|
||
---
|
||
|
||
### 版本记录
|
||
|
||
| 版本 | 日期 | 变更内容 |
|
||
|------|------|---------|
|
||
| v1.0 | 2026-02-24 | 初版,4 项核心模式 + 8 项检查清单 |
|
||
| v1.1 | 2026-02-24 | 优化 1:API 幂等升级为 `@unique` + P2002(替代 Read-then-Write);优化 2:进度查询 2x count → 1x groupBy;优化 3:`expireInMinutes` 加注释说明含义;反模式 +2,检查清单 +3 |
|