Files
AIclinicalresearch/docs/02-通用能力层/散装派发与轮询收口任务模式指南.md
HaHafeng 371fa53956 docs(asl): Upgrade Tool 3 architecture from Fan-out to Scatter+Aggregator (v2.0)
Architecture transformation:
- Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern
- API layer directly dispatches N independent jobs (no Manager)
- Worker only writes its own Result row, never touches Task table (zero row-lock)
- Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper)
- Reduce red lines from 13 to 9, eliminate distributed complexity

Documents updated (10 files):
- 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks)
- 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator)
- 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria
- 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional)
- 08c-M3 sprint: milestone table wording update
- New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook)
- New: V2.0 architecture deep review and gap-fix report
- Updated: ASL module status, system status, capability layer index

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-24 22:11:09 +08:00

385 lines
18 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 散装派发与轮询收口任务模式指南
> **版本:** v1.1优化DB 级幂等 + groupBy 进度查询 + expireInMinutes 注释)
> **创建日期:** 2026-02-24
> **定位:** Level 2 批量任务实战 Cookbook1 触发 → N 个独立 Worker → Aggregator 收口)
> **底层规范:** `Postgres-Only异步任务处理指南.md`Level 1队列命名、Payload、过期时间等强制规范
> **首个试点:** ASL 工具 3 全文智能提取工作台
> **替代方案:** `分布式Fan-out任务模式开发指南.md`Level 3适用于万级任务 + 10+ Pod当前阶段不推荐
---
## 一、适用场景判断
| 维度 | Level 1单体任务 | **Level 2散装派发本文** | Level 3Fan-out |
|------|-------------------|---------------------------|-----------------|
| **触发模式** | 1 触发 → 1 Worker → 结束 | 1 触发 → N 个独立 Worker → Aggregator 收口 | 1 触发 → Manager → N Child → Last Child Wins |
| **典型案例** | DC Tool C 解析 1 个 Excel | **ASL 工具 3 批量提取 100 篇文献** | 万级任务 + 严格实时计数 |
| **复杂度** | ⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
| **团队要求** | 初级 | 初中级 | 高级(需掌握 12+ 并发模式) |
**判断公式:** 如果你的任务是"1 次操作处理 N 个独立子项N 可能 > 10且团队 < 10 人",选本文的散装模式。
---
## 二、核心架构:散装派发 + 独立单兵 + 轮询收口
```
┌─ API 层 ──────────────────────────────────────┐
│ POST /tasks │
│ 1. 创建 1 个 Taskstatus: processing
│ 2. 批量创建 N 个 Resultstatus: pending
│ 3. 散装派发 N 个 pgBoss Job一次 insert
│ 4. 立即返回 taskId< 0.1 秒) │
└────────────────────────────────────────────────┘
↓ (N 个独立 Job)
┌─ Worker 层(多 Pod 各自抢单)─────────────────┐
│ 每个 Worker 只管自己的 1 篇文献: │
│ 1. 幽灵重试守卫updateMany 防覆盖已完成) │
│ 2. 执行业务逻辑MinerU + LLM
│ 3. 只更新自己的 Result 行 │
│ 4. 绝不触碰父任务 Task 表! │
│ 5. 错误分级:致命 return / 临时回退 + throw │
└────────────────────────────────────────────────┘
┌─ Aggregator全局唯一每 10 秒)─────────────┐
│ 1. 清理僵尸extracting > 30min → error │
│ 2. 聚合统计GROUP BY Result.status │
│ 3. 收口判定pending=0 且 extracting=0 │
│ → 标记 Task completed │
└────────────────────────────────────────────────┘
┌─ 前端React Query 轮询)─────────────────────┐
│ GET /tasks/:taskId/status │
│ → 实时 COUNT(Result) 聚合进度 │
│ → 无需 successCount/failedCount 冗余字段 │
└────────────────────────────────────────────────┘
```
### 为什么不用 Fan-out
| Fan-out 的代价 | 散装模式如何规避 |
|---------------|----------------|
| Last Child Wins终止器逻辑 | **Aggregator 轮询收口**,零并发竞争 |
| 原子递增 `successCount`(行锁争用) | **Worker 不碰父表**,进度由 COUNT 聚合 |
| 乐观锁与重试的互相绞杀 | **幽灵重试守卫 + Aggregator 僵尸清理**,两层配合 |
| Sweeper 清道夫 | **Aggregator 自带清理**,一个组件两个职责 |
| NOTIFY/LISTEN 跨 Pod 广播 | **纯轮询**,无需跨 Pod 通信 |
| 12+ 设计模式、19 项检查清单 | **4 个核心模式 + 8 项检查清单** |
---
## 三、数据库设计
```prisma
model AslExtractionTask {
id String @id @default(uuid())
projectId String
templateId String
idempotencyKey String? @unique // v1.1DB 级幂等(防并发重复创建)
totalCount Int // 创建后不再变更
status String @default("processing") // 仅 Aggregator 改为 completed
createdAt DateTime @default(now())
completedAt DateTime?
results AslExtractionResult[]
@@schema("asl_schema")
}
model AslExtractionResult {
id String @id @default(uuid())
taskId String
pkbDocumentId String
status String @default("pending") // pending → extracting → completed/error
extractedData Json?
errorMessage String?
task AslExtractionTask @relation(fields: [taskId], references: [id])
@@index([taskId, status]) // Aggregator 聚合查询加速
@@schema("asl_schema")
}
```
**关键设计Task 表无 `successCount`/`failedCount`。** 进度由 `COUNT(Result WHERE status=...)` 实时聚合,彻底消灭多 Worker 对同一行的写竞争。
---
## 四、4 项核心代码模式
### 模式 1API 层散装派发
```typescript
async function createTask(req, reply) {
const { projectId, templateId, documentIds, idempotencyKey } = req.body;
if (documentIds.length === 0) throw new Error('No documents selected');
// ═══════════════════════════════════════════════════════
// v1.1 DB 级幂等:利用 @unique 索引 + Prisma P2002 冲突捕获。
// 比 findFirst → if exists 的 Read-then-Write 安全 100 倍:
// 即使两个请求在同一毫秒到达,数据库唯一约束也会物理拦截第二个。
// ═══════════════════════════════════════════════════════
let task;
try {
task = await prisma.aslExtractionTask.create({
data: { projectId, templateId, totalCount: documentIds.length, status: 'processing', idempotencyKey },
});
} catch (error) {
if (error.code === 'P2002' && idempotencyKey) {
const existing = await prisma.aslExtractionTask.findFirst({ where: { idempotencyKey } });
return reply.send({ success: true, taskId: existing.id, note: 'Idempotent return' });
}
throw error;
}
const resultsData = documentIds.map(docId => ({
taskId: task.id, pkbDocumentId: docId, status: 'pending',
}));
await prisma.aslExtractionResult.createMany({ data: resultsData });
const createdResults = await prisma.aslExtractionResult.findMany({ where: { taskId: task.id } });
// 散装派发N 个独立 Job 一次入队
const jobs = createdResults.map(result => ({
name: 'asl_extract_single',
data: { resultId: result.id, pkbDocumentId: result.pkbDocumentId },
options: {
retryLimit: 3,
retryBackoff: true,
expireInMinutes: 30, // 执行超时Worker 拿到 Job 后 30min 未完成则标记 expired非排队等待超时
singletonKey: `extract-${result.id}`, // 防重复派发
},
}));
await jobQueue.insert(jobs);
return reply.send({ success: true, taskId: task.id });
}
```
**要点:**
- `singletonKey: extract-${result.id}` 防止 API 重试导致重复派发
- `idempotencyKey` + `@unique` 索引 + P2002 捕获 = **DB 物理级幂等**v1.1,替代 Read-then-Write
- `createMany` + `insert` 批量操作100 篇文献入队 < 0.1 秒
### 模式 2Worker 单兵作战(含幽灵重试守卫)
```typescript
jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, async (job) => {
const { resultId, pkbDocumentId } = job.data;
// ═══════════════════════════════════════════════════════
// 幽灵重试守卫:只允许 pending → extracting。
// 如果已经是 completed/error说明是 pg-boss 误重投的幽灵,
// 直接跳过,省下一次 LLM API 费用。
// ⚠️ 此守卫依赖模式 3 Aggregator 的僵尸清理配合!
// OOM 崩溃 → status 卡在 extracting → 重试被跳过
// → Aggregator 30 分钟后将 extracting 标为 error 兜底
// ═══════════════════════════════════════════════════════
const lock = await prisma.aslExtractionResult.updateMany({
where: { id: resultId, status: 'pending' },
data: { status: 'extracting' },
});
if (lock.count === 0) {
return { success: true, note: 'Phantom retry skipped' };
}
try {
const data = await extractLogic(pkbDocumentId);
// 只更新自己的 Result 行,绝不碰 Task 表!
await prisma.aslExtractionResult.update({
where: { id: resultId },
data: { status: 'completed', extractedData: data },
});
} catch (error) {
if (isPermanentError(error)) {
await prisma.aslExtractionResult.update({
where: { id: resultId },
data: { status: 'error', errorMessage: error.message },
});
return { success: false, note: 'Permanent error' };
}
// 临时错误:回退状态为 pending让下次重试能通过幽灵守卫
await prisma.aslExtractionResult.update({
where: { id: resultId },
data: { status: 'pending' },
});
throw error; // pg-boss 指数退避重试
}
});
```
**要点:**
- Worker **只写自己的 Result 行**,零行锁争用
- 临时错误 throw 前回退 `status → pending`,确保重试时幽灵守卫不拦截
- `teamConcurrency: 10` 全局限流,防 OOM
### 模式 3Aggregator 轮询收口(含僵尸清理)
```typescript
// 注册为 pg-boss 定时任务(多 Pod 下只有 1 个实例执行)
await jobQueue.schedule('asl_extraction_aggregator', '*/10 * * * * *'); // 每 10 秒
jobQueue.work('asl_extraction_aggregator', async () => {
const activeTasks = await prisma.aslExtractionTask.findMany({
where: { status: 'processing' },
});
for (const task of activeTasks) {
// ═══════════════════════════════════════════════════════
// 僵尸清理extracting 超过 30 分钟 → 标记 error
// 场景Worker OOM 崩溃 → status 卡在 extracting
// → pg-boss 重试时被幽灵守卫跳过
// → 必须由 Aggregator 兜底收口
// ⚠️ 此逻辑与模式 2 的幽灵守卫是绑定关系,缺一不可!
// ═══════════════════════════════════════════════════════
await prisma.aslExtractionResult.updateMany({
where: {
taskId: task.id,
status: 'extracting',
updatedAt: { lt: new Date(Date.now() - 30 * 60 * 1000) },
},
data: {
status: 'error',
errorMessage: '[Aggregator] Extraction timeout (30min) — likely Worker crash.',
},
});
// 聚合统计有索引100 条 < 1ms
const stats = await prisma.aslExtractionResult.groupBy({
by: ['status'],
where: { taskId: task.id },
_count: true,
});
const pendingCount = stats.find(s => s.status === 'pending')?._count || 0;
const extractingCount = stats.find(s => s.status === 'extracting')?._count || 0;
// 收口:没有人在排队或干活了
if (pendingCount === 0 && extractingCount === 0) {
await prisma.aslExtractionTask.update({
where: { id: task.id },
data: { status: 'completed', completedAt: new Date() },
});
logger.info(`[Aggregator] Task ${task.id} completed`);
}
}
});
```
**要点:**
- **一个组件两个职责**:僵尸清理 + 完成收口,无需额外 Sweeper
- `groupBy` + 索引 `@@index([taskId, status])`,聚合极快
- pg-boss `schedule` 保证多 Pod 下只有 1 个实例执行,无并发问题
### 模式 4前端进度查询实时 COUNT
```typescript
async function getTaskStatus(req, reply) {
const { taskId } = req.params;
const task = await prisma.aslExtractionTask.findUnique({ where: { id: taskId } });
// v1.11x groupBy 替代 2x countDB 只扫描一次索引,查询量减半
const stats = await prisma.aslExtractionResult.groupBy({
by: ['status'],
where: { taskId },
_count: true,
});
const successCount = stats.find(s => s.status === 'completed')?._count || 0;
const failedCount = stats.find(s => s.status === 'error')?._count || 0;
return reply.send({
status: task.status,
progress: {
total: task.totalCount,
success: successCount,
failed: failedCount,
percent: Math.round(((successCount + failedCount) / task.totalCount) * 100),
},
});
}
```
---
## 五、反模式速查表
| 反模式 | 后果 | 正确做法 |
|--------|------|---------|
| Worker 更新父 Task 的 successCount | 行锁争用Lock timeout | Worker 只写自己的 Result进度由 COUNT 聚合 |
| Worker 无幽灵守卫,无条件 `extracting` | pg-boss 误重投时覆盖已完成数据,浪费 LLM 费用 | `updateMany({ where: { status: 'pending' } })` |
| 临时错误 throw 前不回退 pending | 重试被幽灵守卫拦截Aggregator 30min 后才能兜底 | throw 前 `update({ status: 'pending' })` |
| Aggregator 不清理僵尸 extracting | OOM 崩溃后 Task 永远卡在 processing | 30min 超时 → error |
| API 派发不加 singletonKey | 网络重试导致重复 Job | `singletonKey: extract-${resultId}` |
| API 幂等用 Read-then-WritefindFirst → if exists | 并发请求穿透检查,创建重复 Task | `@unique` 索引 + P2002 捕获DB 物理级幂等)|
| 进度查询发 2 次 count | 索引扫描 2 次,查询量翻倍 | 1 次 `groupBy` 聚合所有状态 |
| 误解 `expireInMinutes` 为排队超时 | 队列繁忙时正常等待的 Job 被误杀 | `expireInMinutes` = 执行超时,与排队等待无关 |
| Aggregator 用 `startedAt` 判断超时 | 误杀正在排队的大批量任务 | 用 `updatedAt`(最后活跃时间) |
| 不设 `teamConcurrency` | 1000 个 Job 同时拉起 → OOM | `teamConcurrency: 10` |
---
## 六、开发检查清单
- [ ] **Worker 独立性**Worker 是否只更新自己的 Result 行?是否完全不碰 Task 表?
- [ ] **幽灵守卫**Worker 是否用 `updateMany({ where: { status: 'pending' } })` 防覆盖?
- [ ] **临时错误回退**throw 前是否回退 `status → pending`
- [ ] **Aggregator 僵尸清理**:是否清理 extracting > 30min 的 Result
- [ ] **Aggregator 收口**:是否在 `pending=0 && extracting=0` 时标记 Task completed
- [ ] **singletonKey**Job 派发是否设置了 `singletonKey: extract-${resultId}`
- [ ] **teamConcurrency**Worker 是否设置了全局并发限制?
- [ ] **索引**Result 表是否有 `@@index([taskId, status])` 加速聚合?
- [ ] **DB 级幂等**API 是否用 `@unique(idempotencyKey)` + P2002 捕获?(禁止 Read-then-Write
- [ ] **groupBy 进度查询**:前端进度接口是否用 1 次 `groupBy` 而非多次 `count`
- [ ] **expireInMinutes 注释**Job 配置中是否注明 `expireInMinutes` = 执行超时(非排队超时)?
---
## 七、与 Fan-out 模式的演进关系
```
Level 1: Postgres-Only 单体任务现有指南DC Tool C
↓ 当 N > 10
Level 2: 散装派发 + 轮询收口本文ASL 工具 3
↓ 当日处理 > 10,000 且 Pod > 10
Level 3: 分布式 Fan-out预留指南暂不启用
```
**升级条件:** 仅当同时满足以下条件时,才考虑升级到 Level 3
1. 单次批量 > 2000 篇,且 Aggregator 10 秒轮询延迟不可接受
2. 部署 Pod > 10 个,需要实时精确计数(而非 COUNT 聚合)
3. 团队有 2+ 名熟悉分布式并发的高级开发
> **设计哲学:** 用最简单的架构解决当前问题。所有 Fan-out 的知识沉淀都在 `分布式Fan-out任务模式开发指南.md` 中保存完好,等真正需要时再启用。
---
## 八、遵守底层规范
本文所有代码模式均遵守 `Postgres-Only异步任务处理指南.md` 的强制规范:
| 规范 | 本文实现 |
|------|---------|
| 队列名用下划线 | `asl_extract_single``asl_extraction_aggregator` |
| Payload 轻量 (< 1KB) | 仅传 `{ resultId, pkbDocumentId }` |
| Worker 必须幂等 | 幽灵守卫 + 临时错误回退 pending |
| 设置过期时间 | `expireInMinutes: 30`(执行超时,非排队超时) |
| 错误分级 | 永久 return / 临时 throw |
| API 幂等 | `@unique(idempotencyKey)` + P2002 捕获DB 物理级) |
---
*本文档基于 ASL 工具 3 全文智能提取工作台的架构选型经验总结。*
*散装派发模式是 Fan-out 模式的降维替代方案,以 80% 的复杂度降低换取 95% 的功能覆盖。*
*待实战验证后升级为 v2.0。*
---
### 版本记录
| 版本 | 日期 | 变更内容 |
|------|------|---------|
| v1.0 | 2026-02-24 | 初版4 项核心模式 + 8 项检查清单 |
| v1.1 | 2026-02-24 | 优化 1API 幂等升级为 `@unique` + P2002替代 Read-then-Write优化 2进度查询 2x count → 1x groupBy优化 3`expireInMinutes` 加注释说明含义;反模式 +2检查清单 +3 |