Architecture transformation: - Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern - API layer directly dispatches N independent jobs (no Manager) - Worker only writes its own Result row, never touches Task table (zero row-lock) - Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper) - Reduce red lines from 13 to 9, eliminate distributed complexity Documents updated (10 files): - 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks) - 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator) - 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria - 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional) - 08c-M3 sprint: milestone table wording update - New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook) - New: V2.0 architecture deep review and gap-fix report - Updated: ASL module status, system status, capability layer index Co-authored-by: Cursor <cursoragent@cursor.com>
9.4 KiB
M1:骨架管线 — The Skeleton Pipeline
所属: 工具 3 全文智能提取工作台 V2.0
架构总纲:08-工具3-全文智能提取工作台V2.0开发计划.md
代码手册:08d-工具3-代码模式与技术规范.md(所有代码模式均在此手册中,开发时按需查阅)
建议时间: Week 1(5-6 天)
核心目标: 证明 "API 散装派发 → Worker 单兵提取 → Aggregator 收口 → 前端轮询到 completed" 这条管线是通的。
异步模式指南:散装派发与轮询收口任务模式指南.md(Level 2 Cookbook)
Demo 形态
用户在前端点击按钮,系统后台静默跑完流程,前端 useTaskStatus 轮询到 status = completed,数据库能查到 JSON 提取结果。前端只需一个极简列表。
关键妥协:M1 不接 MinerU,不做审核抽屉,不做 SSE 日志流。
任务清单
M1-1:Prisma 数据模型 + Migration + Seed(1 天)
做什么:
- 新增
AslExtractionTemplate、AslProjectTemplate、AslExtractionTask、AslExtractionResult四张表 - 运行
npx prisma migrate dev --name add_extraction_template_engine - Seed 脚本注入 3 套系统内置模板(RCT / Cohort / QC)
验收标准:
npx prisma migrate deploy成功npx prisma db seed后数据库有 3 套模板记录AslExtractionTask含pkbKnowledgeBaseId+idempotencyKey @unique字段(无successCount/failedCount)AslExtractionResult含snapshotStorageKey+snapshotFilename快照字段AslExtractionResult含pkbDocumentId字段、status含pending | extracting | completed | errorAslExtractionResult含@@index([taskId, status])复合索引(Aggregator 性能保障)
📖 Schema 详情见架构总纲 Task 1.1
M1-2:模板 API + 提取任务 API — 仅基座模板(1.5 天)
做什么:
TemplateController.ts:GET 模板列表、GET 模板详情、POST 克隆到项目ExtractionController.ts:- POST 创建任务(含 DB 幂等
idempotencyKey @unique+ P2002) - GET 任务状态(
groupBy聚合 Result 状态,驱动 React Query 轮询) - GET 结果列表
- POST 创建任务(含 DB 幂等
- 🚀 创建任务 = API 层散装派发(无 Manager):锁定模板 → PKB 快照冻结 →
createManyResult →jobQueue.insertN 个asl_extract_singleJob
不做什么:
- 不做自定义字段 CRUD API(M3)
- 不做 SSE 端点(M2)
- 不做 Excel 导出(M2)
验收标准:
POST /api/v1/asl/extraction/tasks创建任务并散装派发 N 个 Job- 重复
idempotencyKey请求返回已有 taskId(幂等验证) GET /api/v1/asl/extraction/tasks/:taskId返回groupBy聚合进度(completedCount / errorCount / pendingCount / extractingCount)GET /api/v1/asl/extraction/tasks/:taskId/results返回提取结果列表
📖 端点完整列表见架构总纲 Task 1.3 + Task 2.4
📖 API 散装派发代码见 08d §4.2
M1-3:PKB ACL 防腐层 + 散装 Worker + Aggregator(2 天)⚠️ 本里程碑最关键
做什么(按顺序):
Step A — PKB 侧 ACL(0.5 天):
PkbExportService.ts(PKB 模块维护):listKnowledgeBases()、listPdfDocuments()、getDocumentForExtraction()返回 DTO- 通过依赖注入暴露给 ASL
Step B — ASL 侧桥接(0.5 天):
PkbBridgeService.ts:调用PkbExportService,代理所有 PKB 数据访问
Step C — ExtractionSingleWorker(0.5 天)⚠️ 核心战役:
ExtractionSingleWorker.ts完整逻辑:- 幽灵重试守卫:
updateMany({ where: { status: 'pending' }, data: { status: 'extracting' } }) - 纯文本降级提取:从 PKB 读
extractedText+ 写死 RCT Schema → 调用 DeepSeek - 只更新自己的 Result:
prisma.aslExtractionResult.update({ status: 'completed', extractedData }) - 绝不碰 Task 表(无
$transaction、无increment、无 Last Child Wins) - 错误分级路由:致命错误 → 标 error + return;临时错误 → 回退
pending+ throw
- 幽灵重试守卫:
Step D — ExtractionAggregator(0.5 天):
ExtractionAggregator.ts:pg-boss schedule 每 2 分钟执行- 一人兼两职:僵尸清理(
extracting > 30min→ error)+ 收口(pending === 0 && extracting === 0→ completed) - 使用
groupBy一次查询统计所有状态
Worker + Aggregator 注册(遵守队列命名规范):
jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, handler)
await jobQueue.schedule('asl_extraction_aggregator', '*/2 * * * *')
await jobQueue.work('asl_extraction_aggregator', aggregatorHandler)
M1 阶段简化:Worker 内部串行调 LLM(不接 MinerU,M2 再接)。
验收标准:
- PkbExportService 能返回知识库列表和文档详情(DTO)
- API 创建任务后
AslExtractionResult.snapshotStorageKey和snapshotFilename已填充(PKB 快照验证) - 手动删除 PKB 文档记录后,Worker 仍能通过
snapshotStorageKey从 OSS 获取 PDF(一致性验证) - API 为 N 篇文献散装派发 N 个
asl_extract_singleJob - Worker 幽灵守卫正确:并发重试不会双倍处理(
lock.count === 0跳过) - Worker 只写 Result 行,Task 表零更新(确认无行锁争用)
- Aggregator 每 2 分钟轮询:
pending === 0 && extracting === 0→ Taskcompleted - Aggregator 僵尸清理:手动将 Result 卡在
extracting超 30 分钟 → 被标为error - 致命错误(PKB 文档不存在)→ 该篇标 error + 不重试 + 不阻塞其他篇
- 临时错误(429)→ 回退
pending+ pg-boss 指数退避重试 - 临时错误回退后重试成功:模拟 429 → 重试 → 幽灵守卫通过 → 提取成功
- 空文献边界:
documentIds = []→ API 直接拒绝(400 Bad Request)
📖 散装架构图、Worker 代码模式见架构总纲 Task 2.3
📖 ACL 防腐层设计见架构总纲 Task 3.3b
📖 Worker / Aggregator 代码见 08d §4.3 / §4.6
📖 散装模式指南见散装派发与轮询收口任务模式指南.md
M1-4:前端极简 Step 1 — 选模板 + 选 PKB 文献(1 天)
做什么:
ExtractionSetup.tsx:左栏模板下拉(只读,默认 RCT)+ 右栏 PKB 知识库下拉 + 文献 Checkbox 列表PkbKnowledgeBaseSelector.tsx:调用 PKB API 加载知识库和文献- 底部 "确认并开始提取" 按钮 → 调用
POST /api/v1/asl/extraction/tasks
不做什么:
- 不做自定义字段 UI(M3)
- 不做基座字段标签云展示(M2 附带做)
验收标准:
- 能选择 PKB 知识库并展示 PDF 文档列表
- 能勾选文献并提交创建任务
- 空知识库时显示引导提示 + PKB 跳转链接
M1-5:前端极简 Step 2 + Step 3 — 轮询进度 + 极简列表(1 天)
做什么:
ExtractionProgress.tsx:useTaskStatus轮询(3s)驱动进度条 + 检测completed跳转ExtractionWorkbench.tsx:极简表格展示提取结果(Study ID、状态)ExtractionPage.tsx:状态驱动路由(pending→Step1 / processing→Step2 / completed→Step3)- 路由注册(前端 + 后端)
不做什么:
- 不做 SSE 日志终端(M2)
- 不做审核抽屉(M2)
- 不做 Excel 导出按钮(M2)
验收标准:
- 进度条从 0% 推进到 100%(React Query 轮询驱动)
status = completed后自动跳转到 Step 3- Step 3 能看到提取结果列表(状态列展示 completed/error)
- 关闭浏览器重新打开 → 恢复到正确步骤(断点恢复)
M1 研发红线(全员必须背诵)
| # | 红线 | 违反后果 |
|---|---|---|
| 1 | 队列名用下划线(asl_extract_single),禁止点号 |
pg-boss 路由截断 |
| 2 | Worker 绝不碰 Task 表,只写自己的 Result 行 | 行锁争用 / 死锁 |
| 3 | Worker 用 updateMany({ status: 'pending' }) 幽灵守卫,禁止 findUnique → if |
并发穿透,LLM 费用白烧 |
| 4 | 临时错误 throw 前必须 update({ status: 'pending' }) 回退 |
重试被幽灵守卫误跳过 |
| 5 | teamConcurrency: 10,禁止无限拉取 Job |
Node.js OOM |
| 6 | Job Payload 仅传 ID(< 200 bytes),禁止塞 PDF 正文 | pg-boss 阻塞 |
| 7 | ACL 防腐层:ASL 不 import PKB 内部类型 | 模块耦合蔓延 |
| 8 | API 层快照 snapshotStorageKey + snapshotFilename,Worker 禁止运行时回查 PKB |
PKB 删文档 → 批量崩溃 |
| 9 | API 创建任务用 idempotencyKey @unique + P2002,禁止 Read-then-Write |
并发穿透创建重复任务 |
M1 结束时的状态
✅ Prisma 表 + 3 套 Seed 模板(含 idempotencyKey @unique)
✅ PKB ACL 防腐层 → PkbExportService + PkbBridgeService
✅ 散装派发全链路:API 散装 → N × Worker → Aggregator 收口 → completed
✅ 幽灵守卫 + 错误分级路由 + Aggregator 僵尸清理 — 所有并发 Bug 已验证
✅ API 层 DB 幂等 + PKB 快照冻结
✅ 前端三步走:选模板/选文献 → 轮询进度(groupBy 聚合)→ 极简结果列表
❌ 无 MinerU(纯文本降级)
❌ 无 SSE 日志流
❌ 无审核抽屉
❌ 无自定义字段
❌ 无 Excel 导出
M1 的核心价值: 散装架构天然消除了 Fan-out 的行锁争用、Last Child Wins 终点丢失等分布式 Bug。Worker 逻辑极简(只写自己),Aggregator 定时收口,第一周就能稳定跑通全链路。