Files
AIclinicalresearch/docs/02-通用能力层/散装派发与轮询收口任务模式指南.md
HaHafeng 371fa53956 docs(asl): Upgrade Tool 3 architecture from Fan-out to Scatter+Aggregator (v2.0)
Architecture transformation:
- Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern
- API layer directly dispatches N independent jobs (no Manager)
- Worker only writes its own Result row, never touches Task table (zero row-lock)
- Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper)
- Reduce red lines from 13 to 9, eliminate distributed complexity

Documents updated (10 files):
- 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks)
- 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator)
- 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria
- 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional)
- 08c-M3 sprint: milestone table wording update
- New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook)
- New: V2.0 architecture deep review and gap-fix report
- Updated: ASL module status, system status, capability layer index

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-24 22:11:09 +08:00

18 KiB
Raw Blame History

散装派发与轮询收口任务模式指南

版本: v1.1优化DB 级幂等 + groupBy 进度查询 + expireInMinutes 注释)
创建日期: 2026-02-24
定位: Level 2 批量任务实战 Cookbook1 触发 → N 个独立 Worker → Aggregator 收口)
底层规范: Postgres-Only异步任务处理指南.mdLevel 1队列命名、Payload、过期时间等强制规范
首个试点: ASL 工具 3 全文智能提取工作台
替代方案: 分布式Fan-out任务模式开发指南.mdLevel 3适用于万级任务 + 10+ Pod当前阶段不推荐


一、适用场景判断

维度 Level 1单体任务 Level 2散装派发本文 Level 3Fan-out
触发模式 1 触发 → 1 Worker → 结束 1 触发 → N 个独立 Worker → Aggregator 收口 1 触发 → Manager → N Child → Last Child Wins
典型案例 DC Tool C 解析 1 个 Excel ASL 工具 3 批量提取 100 篇文献 万级任务 + 严格实时计数
复杂度
团队要求 初级 初中级 高级(需掌握 12+ 并发模式)

判断公式: 如果你的任务是"1 次操作处理 N 个独立子项N 可能 > 10且团队 < 10 人",选本文的散装模式。


二、核心架构:散装派发 + 独立单兵 + 轮询收口

┌─ API 层 ──────────────────────────────────────┐
│  POST /tasks                                   │
│  1. 创建 1 个 Taskstatus: processing       │
│  2. 批量创建 N 个 Resultstatus: pending    │
│  3. 散装派发 N 个 pgBoss Job一次 insert    │
│  4. 立即返回 taskId< 0.1 秒)               │
└────────────────────────────────────────────────┘
       ↓ (N 个独立 Job)
┌─ Worker 层(多 Pod 各自抢单)─────────────────┐
│  每个 Worker 只管自己的 1 篇文献:             │
│  1. 幽灵重试守卫updateMany 防覆盖已完成)   │
│  2. 执行业务逻辑MinerU + LLM              │
│  3. 只更新自己的 Result 行                     │
│  4. 绝不触碰父任务 Task 表!                   │
│  5. 错误分级:致命 return / 临时回退 + throw   │
└────────────────────────────────────────────────┘

┌─ Aggregator全局唯一每 10 秒)─────────────┐
│  1. 清理僵尸extracting > 30min → error      │
│  2. 聚合统计GROUP BY Result.status           │
│  3. 收口判定pending=0 且 extracting=0        │
│     → 标记 Task completed                      │
└────────────────────────────────────────────────┘

┌─ 前端React Query 轮询)─────────────────────┐
│  GET /tasks/:taskId/status                     │
│  → 实时 COUNT(Result) 聚合进度                 │
│  → 无需 successCount/failedCount 冗余字段      │
└────────────────────────────────────────────────┘

为什么不用 Fan-out

Fan-out 的代价 散装模式如何规避
Last Child Wins终止器逻辑 Aggregator 轮询收口,零并发竞争
原子递增 successCount(行锁争用) Worker 不碰父表,进度由 COUNT 聚合
乐观锁与重试的互相绞杀 幽灵重试守卫 + Aggregator 僵尸清理,两层配合
Sweeper 清道夫 Aggregator 自带清理,一个组件两个职责
NOTIFY/LISTEN 跨 Pod 广播 纯轮询,无需跨 Pod 通信
12+ 设计模式、19 项检查清单 4 个核心模式 + 8 项检查清单

三、数据库设计

model AslExtractionTask {
  id              String   @id @default(uuid())
  projectId       String
  templateId      String
  idempotencyKey  String?  @unique  // v1.1DB 级幂等(防并发重复创建)
  totalCount      Int      // 创建后不再变更
  status          String   @default("processing")  // 仅 Aggregator 改为 completed
  createdAt       DateTime @default(now())
  completedAt     DateTime?
  results         AslExtractionResult[]
  @@schema("asl_schema")
}

model AslExtractionResult {
  id              String   @id @default(uuid())
  taskId          String
  pkbDocumentId   String
  status          String   @default("pending")  // pending → extracting → completed/error
  extractedData   Json?
  errorMessage    String?
  task            AslExtractionTask @relation(fields: [taskId], references: [id])
  @@index([taskId, status])  // Aggregator 聚合查询加速
  @@schema("asl_schema")
}

关键设计Task 表无 successCount/failedCount 进度由 COUNT(Result WHERE status=...) 实时聚合,彻底消灭多 Worker 对同一行的写竞争。


四、4 项核心代码模式

模式 1API 层散装派发

async function createTask(req, reply) {
  const { projectId, templateId, documentIds, idempotencyKey } = req.body;

  if (documentIds.length === 0) throw new Error('No documents selected');

  // ═══════════════════════════════════════════════════════
  // v1.1 DB 级幂等:利用 @unique 索引 + Prisma P2002 冲突捕获。
  // 比 findFirst → if exists 的 Read-then-Write 安全 100 倍:
  // 即使两个请求在同一毫秒到达,数据库唯一约束也会物理拦截第二个。
  // ═══════════════════════════════════════════════════════
  let task;
  try {
    task = await prisma.aslExtractionTask.create({
      data: { projectId, templateId, totalCount: documentIds.length, status: 'processing', idempotencyKey },
    });
  } catch (error) {
    if (error.code === 'P2002' && idempotencyKey) {
      const existing = await prisma.aslExtractionTask.findFirst({ where: { idempotencyKey } });
      return reply.send({ success: true, taskId: existing.id, note: 'Idempotent return' });
    }
    throw error;
  }

  const resultsData = documentIds.map(docId => ({
    taskId: task.id, pkbDocumentId: docId, status: 'pending',
  }));
  await prisma.aslExtractionResult.createMany({ data: resultsData });

  const createdResults = await prisma.aslExtractionResult.findMany({ where: { taskId: task.id } });

  // 散装派发N 个独立 Job 一次入队
  const jobs = createdResults.map(result => ({
    name: 'asl_extract_single',
    data: { resultId: result.id, pkbDocumentId: result.pkbDocumentId },
    options: {
      retryLimit: 3,
      retryBackoff: true,
      expireInMinutes: 30,  // 执行超时Worker 拿到 Job 后 30min 未完成则标记 expired非排队等待超时
      singletonKey: `extract-${result.id}`,  // 防重复派发
    },
  }));
  await jobQueue.insert(jobs);

  return reply.send({ success: true, taskId: task.id });
}

要点:

  • singletonKey: extract-${result.id} 防止 API 重试导致重复派发
  • idempotencyKey + @unique 索引 + P2002 捕获 = DB 物理级幂等v1.1,替代 Read-then-Write
  • createMany + insert 批量操作100 篇文献入队 < 0.1 秒

模式 2Worker 单兵作战(含幽灵重试守卫)

jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, async (job) => {
  const { resultId, pkbDocumentId } = job.data;

  // ═══════════════════════════════════════════════════════
  // 幽灵重试守卫:只允许 pending → extracting。
  // 如果已经是 completed/error说明是 pg-boss 误重投的幽灵,
  // 直接跳过,省下一次 LLM API 费用。
  // ⚠️ 此守卫依赖模式 3 Aggregator 的僵尸清理配合!
  //    OOM 崩溃 → status 卡在 extracting → 重试被跳过
  //    → Aggregator 30 分钟后将 extracting 标为 error 兜底
  // ═══════════════════════════════════════════════════════
  const lock = await prisma.aslExtractionResult.updateMany({
    where: { id: resultId, status: 'pending' },
    data: { status: 'extracting' },
  });
  if (lock.count === 0) {
    return { success: true, note: 'Phantom retry skipped' };
  }

  try {
    const data = await extractLogic(pkbDocumentId);

    // 只更新自己的 Result 行,绝不碰 Task 表!
    await prisma.aslExtractionResult.update({
      where: { id: resultId },
      data: { status: 'completed', extractedData: data },
    });
  } catch (error) {
    if (isPermanentError(error)) {
      await prisma.aslExtractionResult.update({
        where: { id: resultId },
        data: { status: 'error', errorMessage: error.message },
      });
      return { success: false, note: 'Permanent error' };
    }

    // 临时错误:回退状态为 pending让下次重试能通过幽灵守卫
    await prisma.aslExtractionResult.update({
      where: { id: resultId },
      data: { status: 'pending' },
    });
    throw error;  // pg-boss 指数退避重试
  }
});

要点:

  • Worker 只写自己的 Result 行,零行锁争用
  • 临时错误 throw 前回退 status → pending,确保重试时幽灵守卫不拦截
  • teamConcurrency: 10 全局限流,防 OOM

模式 3Aggregator 轮询收口(含僵尸清理)

// 注册为 pg-boss 定时任务(多 Pod 下只有 1 个实例执行)
await jobQueue.schedule('asl_extraction_aggregator', '*/10 * * * * *');  // 每 10 秒

jobQueue.work('asl_extraction_aggregator', async () => {
  const activeTasks = await prisma.aslExtractionTask.findMany({
    where: { status: 'processing' },
  });

  for (const task of activeTasks) {
    // ═══════════════════════════════════════════════════════
    // 僵尸清理extracting 超过 30 分钟 → 标记 error
    // 场景Worker OOM 崩溃 → status 卡在 extracting
    //       → pg-boss 重试时被幽灵守卫跳过
    //       → 必须由 Aggregator 兜底收口
    // ⚠️ 此逻辑与模式 2 的幽灵守卫是绑定关系,缺一不可!
    // ═══════════════════════════════════════════════════════
    await prisma.aslExtractionResult.updateMany({
      where: {
        taskId: task.id,
        status: 'extracting',
        updatedAt: { lt: new Date(Date.now() - 30 * 60 * 1000) },
      },
      data: {
        status: 'error',
        errorMessage: '[Aggregator] Extraction timeout (30min) — likely Worker crash.',
      },
    });

    // 聚合统计有索引100 条 < 1ms
    const stats = await prisma.aslExtractionResult.groupBy({
      by: ['status'],
      where: { taskId: task.id },
      _count: true,
    });

    const pendingCount = stats.find(s => s.status === 'pending')?._count || 0;
    const extractingCount = stats.find(s => s.status === 'extracting')?._count || 0;

    // 收口:没有人在排队或干活了
    if (pendingCount === 0 && extractingCount === 0) {
      await prisma.aslExtractionTask.update({
        where: { id: task.id },
        data: { status: 'completed', completedAt: new Date() },
      });
      logger.info(`[Aggregator] Task ${task.id} completed`);
    }
  }
});

要点:

  • 一个组件两个职责:僵尸清理 + 完成收口,无需额外 Sweeper
  • groupBy + 索引 @@index([taskId, status]),聚合极快
  • pg-boss schedule 保证多 Pod 下只有 1 个实例执行,无并发问题

模式 4前端进度查询实时 COUNT

async function getTaskStatus(req, reply) {
  const { taskId } = req.params;

  const task = await prisma.aslExtractionTask.findUnique({ where: { id: taskId } });

  // v1.11x groupBy 替代 2x countDB 只扫描一次索引,查询量减半
  const stats = await prisma.aslExtractionResult.groupBy({
    by: ['status'],
    where: { taskId },
    _count: true,
  });

  const successCount = stats.find(s => s.status === 'completed')?._count || 0;
  const failedCount = stats.find(s => s.status === 'error')?._count || 0;

  return reply.send({
    status: task.status,
    progress: {
      total: task.totalCount,
      success: successCount,
      failed: failedCount,
      percent: Math.round(((successCount + failedCount) / task.totalCount) * 100),
    },
  });
}

五、反模式速查表

反模式 后果 正确做法
Worker 更新父 Task 的 successCount 行锁争用Lock timeout Worker 只写自己的 Result进度由 COUNT 聚合
Worker 无幽灵守卫,无条件 extracting pg-boss 误重投时覆盖已完成数据,浪费 LLM 费用 updateMany({ where: { status: 'pending' } })
临时错误 throw 前不回退 pending 重试被幽灵守卫拦截Aggregator 30min 后才能兜底 throw 前 update({ status: 'pending' })
Aggregator 不清理僵尸 extracting OOM 崩溃后 Task 永远卡在 processing 30min 超时 → error
API 派发不加 singletonKey 网络重试导致重复 Job singletonKey: extract-${resultId}
API 幂等用 Read-then-WritefindFirst → if exists 并发请求穿透检查,创建重复 Task @unique 索引 + P2002 捕获DB 物理级幂等)
进度查询发 2 次 count 索引扫描 2 次,查询量翻倍 1 次 groupBy 聚合所有状态
误解 expireInMinutes 为排队超时 队列繁忙时正常等待的 Job 被误杀 expireInMinutes = 执行超时,与排队等待无关
Aggregator 用 startedAt 判断超时 误杀正在排队的大批量任务 updatedAt(最后活跃时间)
不设 teamConcurrency 1000 个 Job 同时拉起 → OOM teamConcurrency: 10

六、开发检查清单

  • Worker 独立性Worker 是否只更新自己的 Result 行?是否完全不碰 Task 表?
  • 幽灵守卫Worker 是否用 updateMany({ where: { status: 'pending' } }) 防覆盖?
  • 临时错误回退throw 前是否回退 status → pending
  • Aggregator 僵尸清理:是否清理 extracting > 30min 的 Result
  • Aggregator 收口:是否在 pending=0 && extracting=0 时标记 Task completed
  • singletonKeyJob 派发是否设置了 singletonKey: extract-${resultId}
  • teamConcurrencyWorker 是否设置了全局并发限制?
  • 索引Result 表是否有 @@index([taskId, status]) 加速聚合?
  • DB 级幂等API 是否用 @unique(idempotencyKey) + P2002 捕获?(禁止 Read-then-Write
  • groupBy 进度查询:前端进度接口是否用 1 次 groupBy 而非多次 count
  • expireInMinutes 注释Job 配置中是否注明 expireInMinutes = 执行超时(非排队超时)?

七、与 Fan-out 模式的演进关系

Level 1: Postgres-Only 单体任务现有指南DC Tool C
  ↓ 当 N > 10
Level 2: 散装派发 + 轮询收口本文ASL 工具 3
  ↓ 当日处理 > 10,000 且 Pod > 10
Level 3: 分布式 Fan-out预留指南暂不启用

升级条件: 仅当同时满足以下条件时,才考虑升级到 Level 3

  1. 单次批量 > 2000 篇,且 Aggregator 10 秒轮询延迟不可接受
  2. 部署 Pod > 10 个,需要实时精确计数(而非 COUNT 聚合)
  3. 团队有 2+ 名熟悉分布式并发的高级开发

设计哲学: 用最简单的架构解决当前问题。所有 Fan-out 的知识沉淀都在 分布式Fan-out任务模式开发指南.md 中保存完好,等真正需要时再启用。


八、遵守底层规范

本文所有代码模式均遵守 Postgres-Only异步任务处理指南.md 的强制规范:

规范 本文实现
队列名用下划线 asl_extract_singleasl_extraction_aggregator
Payload 轻量 (< 1KB) 仅传 { resultId, pkbDocumentId }
Worker 必须幂等 幽灵守卫 + 临时错误回退 pending
设置过期时间 expireInMinutes: 30(执行超时,非排队超时)
错误分级 永久 return / 临时 throw
API 幂等 @unique(idempotencyKey) + P2002 捕获DB 物理级)

本文档基于 ASL 工具 3 全文智能提取工作台的架构选型经验总结。 散装派发模式是 Fan-out 模式的降维替代方案,以 80% 的复杂度降低换取 95% 的功能覆盖。 待实战验证后升级为 v2.0。


版本记录

版本 日期 变更内容
v1.0 2026-02-24 初版4 项核心模式 + 8 项检查清单
v1.1 2026-02-24 优化 1API 幂等升级为 @unique + P2002替代 Read-then-Write优化 2进度查询 2x count → 1x groupBy优化 3expireInMinutes 加注释说明含义;反模式 +2检查清单 +3