Architecture transformation: - Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern - API layer directly dispatches N independent jobs (no Manager) - Worker only writes its own Result row, never touches Task table (zero row-lock) - Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper) - Reduce red lines from 13 to 9, eliminate distributed complexity Documents updated (10 files): - 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks) - 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator) - 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria - 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional) - 08c-M3 sprint: milestone table wording update - New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook) - New: V2.0 architecture deep review and gap-fix report - Updated: ASL module status, system status, capability layer index Co-authored-by: Cursor <cursoragent@cursor.com>
193 lines
9.4 KiB
Markdown
193 lines
9.4 KiB
Markdown
# M1:骨架管线 — The Skeleton Pipeline
|
||
|
||
> **所属:** 工具 3 全文智能提取工作台 V2.0
|
||
> **架构总纲:** `08-工具3-全文智能提取工作台V2.0开发计划.md`
|
||
> **代码手册:** `08d-工具3-代码模式与技术规范.md`(所有代码模式均在此手册中,开发时按需查阅)
|
||
> **建议时间:** Week 1(5-6 天)
|
||
> **核心目标:** 证明 "API 散装派发 → Worker 单兵提取 → Aggregator 收口 → 前端轮询到 completed" 这条管线是通的。
|
||
> **异步模式指南:** `散装派发与轮询收口任务模式指南.md`(Level 2 Cookbook)
|
||
|
||
---
|
||
|
||
## Demo 形态
|
||
|
||
用户在前端点击按钮,系统后台静默跑完流程,前端 `useTaskStatus` 轮询到 `status = completed`,数据库能查到 JSON 提取结果。前端只需一个极简列表。
|
||
|
||
**关键妥协:M1 不接 MinerU,不做审核抽屉,不做 SSE 日志流。**
|
||
|
||
---
|
||
|
||
## 任务清单
|
||
|
||
### M1-1:Prisma 数据模型 + Migration + Seed(1 天)
|
||
|
||
**做什么:**
|
||
- 新增 `AslExtractionTemplate`、`AslProjectTemplate`、`AslExtractionTask`、`AslExtractionResult` 四张表
|
||
- 运行 `npx prisma migrate dev --name add_extraction_template_engine`
|
||
- Seed 脚本注入 3 套系统内置模板(RCT / Cohort / QC)
|
||
|
||
**验收标准:**
|
||
- [ ] `npx prisma migrate deploy` 成功
|
||
- [ ] `npx prisma db seed` 后数据库有 3 套模板记录
|
||
- [ ] `AslExtractionTask` 含 `pkbKnowledgeBaseId` + `idempotencyKey @unique` 字段(无 `successCount/failedCount`)
|
||
- [ ] `AslExtractionResult` 含 `snapshotStorageKey` + `snapshotFilename` 快照字段
|
||
- [ ] `AslExtractionResult` 含 `pkbDocumentId` 字段、`status` 含 `pending | extracting | completed | error`
|
||
- [ ] `AslExtractionResult` 含 `@@index([taskId, status])` 复合索引(Aggregator 性能保障)
|
||
|
||
> 📖 Schema 详情见架构总纲 Task 1.1
|
||
|
||
---
|
||
|
||
### M1-2:模板 API + 提取任务 API — 仅基座模板(1.5 天)
|
||
|
||
**做什么:**
|
||
- `TemplateController.ts`:GET 模板列表、GET 模板详情、POST 克隆到项目
|
||
- `ExtractionController.ts`:
|
||
- POST 创建任务(含 DB 幂等 `idempotencyKey @unique` + P2002)
|
||
- GET 任务状态(`groupBy` 聚合 Result 状态,驱动 React Query 轮询)
|
||
- GET 结果列表
|
||
- 🚀 **创建任务 = API 层散装派发(无 Manager)**:锁定模板 → PKB 快照冻结 → `createMany` Result → `jobQueue.insert` N 个 `asl_extract_single` Job
|
||
|
||
**不做什么:**
|
||
- 不做自定义字段 CRUD API(M3)
|
||
- 不做 SSE 端点(M2)
|
||
- 不做 Excel 导出(M2)
|
||
|
||
**验收标准:**
|
||
- [ ] `POST /api/v1/asl/extraction/tasks` 创建任务并散装派发 N 个 Job
|
||
- [ ] 重复 `idempotencyKey` 请求返回已有 taskId(幂等验证)
|
||
- [ ] `GET /api/v1/asl/extraction/tasks/:taskId` 返回 `groupBy` 聚合进度(completedCount / errorCount / pendingCount / extractingCount)
|
||
- [ ] `GET /api/v1/asl/extraction/tasks/:taskId/results` 返回提取结果列表
|
||
|
||
> 📖 端点完整列表见架构总纲 Task 1.3 + Task 2.4
|
||
> 📖 API 散装派发代码见 08d §4.2
|
||
|
||
---
|
||
|
||
### M1-3:PKB ACL 防腐层 + 散装 Worker + Aggregator(2 天)⚠️ 本里程碑最关键
|
||
|
||
**做什么(按顺序):**
|
||
|
||
**Step A — PKB 侧 ACL(0.5 天):**
|
||
- `PkbExportService.ts`(PKB 模块维护):`listKnowledgeBases()`、`listPdfDocuments()`、`getDocumentForExtraction()` 返回 DTO
|
||
- 通过依赖注入暴露给 ASL
|
||
|
||
**Step B — ASL 侧桥接(0.5 天):**
|
||
- `PkbBridgeService.ts`:调用 `PkbExportService`,代理所有 PKB 数据访问
|
||
|
||
**Step C — ExtractionSingleWorker(0.5 天)⚠️ 核心战役:**
|
||
- `ExtractionSingleWorker.ts` 完整逻辑:
|
||
1. **幽灵重试守卫**:`updateMany({ where: { status: 'pending' }, data: { status: 'extracting' } })`
|
||
2. **纯文本降级提取**:从 PKB 读 `extractedText` + 写死 RCT Schema → 调用 DeepSeek
|
||
3. **只更新自己的 Result**:`prisma.aslExtractionResult.update({ status: 'completed', extractedData })`
|
||
4. **绝不碰 Task 表**(无 `$transaction`、无 `increment`、无 Last Child Wins)
|
||
5. **错误分级路由**:致命错误 → 标 error + return;临时错误 → 回退 `pending` + throw
|
||
|
||
**Step D — ExtractionAggregator(0.5 天):**
|
||
- `ExtractionAggregator.ts`:pg-boss schedule 每 2 分钟执行
|
||
- 一人兼两职:**僵尸清理**(`extracting > 30min` → error)+ **收口**(`pending === 0 && extracting === 0` → completed)
|
||
- 使用 `groupBy` 一次查询统计所有状态
|
||
|
||
**Worker + Aggregator 注册(遵守队列命名规范):**
|
||
```
|
||
jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, handler)
|
||
await jobQueue.schedule('asl_extraction_aggregator', '*/2 * * * *')
|
||
await jobQueue.work('asl_extraction_aggregator', aggregatorHandler)
|
||
```
|
||
|
||
**M1 阶段简化:Worker 内部串行调 LLM(不接 MinerU,M2 再接)。**
|
||
|
||
**验收标准:**
|
||
- [ ] PkbExportService 能返回知识库列表和文档详情(DTO)
|
||
- [ ] API 创建任务后 `AslExtractionResult.snapshotStorageKey` 和 `snapshotFilename` 已填充(PKB 快照验证)
|
||
- [ ] 手动删除 PKB 文档记录后,Worker 仍能通过 `snapshotStorageKey` 从 OSS 获取 PDF(一致性验证)
|
||
- [ ] API 为 N 篇文献散装派发 N 个 `asl_extract_single` Job
|
||
- [ ] Worker 幽灵守卫正确:并发重试不会双倍处理(`lock.count === 0` 跳过)
|
||
- [ ] Worker 只写 Result 行,Task 表零更新(确认无行锁争用)
|
||
- [ ] Aggregator 每 2 分钟轮询:`pending === 0 && extracting === 0` → Task `completed`
|
||
- [ ] Aggregator 僵尸清理:手动将 Result 卡在 `extracting` 超 30 分钟 → 被标为 `error`
|
||
- [ ] 致命错误(PKB 文档不存在)→ 该篇标 error + 不重试 + 不阻塞其他篇
|
||
- [ ] 临时错误(429)→ 回退 `pending` + pg-boss 指数退避重试
|
||
- [ ] 临时错误回退后重试成功:模拟 429 → 重试 → 幽灵守卫通过 → 提取成功
|
||
- [ ] 空文献边界:`documentIds = []` → API 直接拒绝(400 Bad Request)
|
||
|
||
> 📖 散装架构图、Worker 代码模式见架构总纲 Task 2.3
|
||
> 📖 ACL 防腐层设计见架构总纲 Task 3.3b
|
||
> 📖 Worker / Aggregator 代码见 08d §4.3 / §4.6
|
||
> 📖 散装模式指南见 `散装派发与轮询收口任务模式指南.md`
|
||
|
||
---
|
||
|
||
### M1-4:前端极简 Step 1 — 选模板 + 选 PKB 文献(1 天)
|
||
|
||
**做什么:**
|
||
- `ExtractionSetup.tsx`:左栏模板下拉(只读,默认 RCT)+ 右栏 PKB 知识库下拉 + 文献 Checkbox 列表
|
||
- `PkbKnowledgeBaseSelector.tsx`:调用 PKB API 加载知识库和文献
|
||
- 底部 "确认并开始提取" 按钮 → 调用 `POST /api/v1/asl/extraction/tasks`
|
||
|
||
**不做什么:**
|
||
- 不做自定义字段 UI(M3)
|
||
- 不做基座字段标签云展示(M2 附带做)
|
||
|
||
**验收标准:**
|
||
- [ ] 能选择 PKB 知识库并展示 PDF 文档列表
|
||
- [ ] 能勾选文献并提交创建任务
|
||
- [ ] 空知识库时显示引导提示 + PKB 跳转链接
|
||
|
||
---
|
||
|
||
### M1-5:前端极简 Step 2 + Step 3 — 轮询进度 + 极简列表(1 天)
|
||
|
||
**做什么:**
|
||
- `ExtractionProgress.tsx`:`useTaskStatus` 轮询(3s)驱动进度条 + 检测 `completed` 跳转
|
||
- `ExtractionWorkbench.tsx`:极简表格展示提取结果(Study ID、状态)
|
||
- `ExtractionPage.tsx`:状态驱动路由(pending→Step1 / processing→Step2 / completed→Step3)
|
||
- 路由注册(前端 + 后端)
|
||
|
||
**不做什么:**
|
||
- 不做 SSE 日志终端(M2)
|
||
- 不做审核抽屉(M2)
|
||
- 不做 Excel 导出按钮(M2)
|
||
|
||
**验收标准:**
|
||
- [ ] 进度条从 0% 推进到 100%(React Query 轮询驱动)
|
||
- [ ] `status = completed` 后自动跳转到 Step 3
|
||
- [ ] Step 3 能看到提取结果列表(状态列展示 completed/error)
|
||
- [ ] 关闭浏览器重新打开 → 恢复到正确步骤(断点恢复)
|
||
|
||
---
|
||
|
||
## M1 研发红线(全员必须背诵)
|
||
|
||
| # | 红线 | 违反后果 |
|
||
|---|------|---------|
|
||
| 1 | 队列名用下划线(`asl_extract_single`),禁止点号 | pg-boss 路由截断 |
|
||
| 2 | **Worker 绝不碰 Task 表**,只写自己的 Result 行 | 行锁争用 / 死锁 |
|
||
| 3 | Worker 用 `updateMany({ status: 'pending' })` 幽灵守卫,禁止 `findUnique → if` | 并发穿透,LLM 费用白烧 |
|
||
| 4 | 临时错误 `throw` 前必须 `update({ status: 'pending' })` 回退 | 重试被幽灵守卫误跳过 |
|
||
| 5 | `teamConcurrency: 10`,禁止无限拉取 Job | Node.js OOM |
|
||
| 6 | Job Payload 仅传 ID(< 200 bytes),禁止塞 PDF 正文 | pg-boss 阻塞 |
|
||
| 7 | ACL 防腐层:ASL 不 import PKB 内部类型 | 模块耦合蔓延 |
|
||
| 8 | API 层快照 `snapshotStorageKey` + `snapshotFilename`,Worker 禁止运行时回查 PKB | PKB 删文档 → 批量崩溃 |
|
||
| 9 | API 创建任务用 `idempotencyKey @unique` + P2002,禁止 Read-then-Write | 并发穿透创建重复任务 |
|
||
|
||
---
|
||
|
||
## M1 结束时的状态
|
||
|
||
```
|
||
✅ Prisma 表 + 3 套 Seed 模板(含 idempotencyKey @unique)
|
||
✅ PKB ACL 防腐层 → PkbExportService + PkbBridgeService
|
||
✅ 散装派发全链路:API 散装 → N × Worker → Aggregator 收口 → completed
|
||
✅ 幽灵守卫 + 错误分级路由 + Aggregator 僵尸清理 — 所有并发 Bug 已验证
|
||
✅ API 层 DB 幂等 + PKB 快照冻结
|
||
✅ 前端三步走:选模板/选文献 → 轮询进度(groupBy 聚合)→ 极简结果列表
|
||
❌ 无 MinerU(纯文本降级)
|
||
❌ 无 SSE 日志流
|
||
❌ 无审核抽屉
|
||
❌ 无自定义字段
|
||
❌ 无 Excel 导出
|
||
```
|
||
|
||
> **M1 的核心价值:** 散装架构天然消除了 Fan-out 的行锁争用、Last Child Wins 终点丢失等分布式 Bug。Worker 逻辑极简(只写自己),Aggregator 定时收口,第一周就能稳定跑通全链路。
|