Files
AIclinicalresearch/docs/03-业务模块/ASL-AI智能文献/04-开发计划/08a-工具3-M1-骨架管线冲刺清单.md
HaHafeng 371fa53956 docs(asl): Upgrade Tool 3 architecture from Fan-out to Scatter+Aggregator (v2.0)
Architecture transformation:
- Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern
- API layer directly dispatches N independent jobs (no Manager)
- Worker only writes its own Result row, never touches Task table (zero row-lock)
- Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper)
- Reduce red lines from 13 to 9, eliminate distributed complexity

Documents updated (10 files):
- 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks)
- 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator)
- 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria
- 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional)
- 08c-M3 sprint: milestone table wording update
- New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook)
- New: V2.0 architecture deep review and gap-fix report
- Updated: ASL module status, system status, capability layer index

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-24 22:11:09 +08:00

193 lines
9.4 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# M1骨架管线 — The Skeleton Pipeline
> **所属:** 工具 3 全文智能提取工作台 V2.0
> **架构总纲:** `08-工具3-全文智能提取工作台V2.0开发计划.md`
> **代码手册:** `08d-工具3-代码模式与技术规范.md`(所有代码模式均在此手册中,开发时按需查阅)
> **建议时间:** Week 15-6 天)
> **核心目标:** 证明 "API 散装派发 → Worker 单兵提取 → Aggregator 收口 → 前端轮询到 completed" 这条管线是通的。
> **异步模式指南:** `散装派发与轮询收口任务模式指南.md`Level 2 Cookbook
---
## Demo 形态
用户在前端点击按钮,系统后台静默跑完流程,前端 `useTaskStatus` 轮询到 `status = completed`,数据库能查到 JSON 提取结果。前端只需一个极简列表。
**关键妥协M1 不接 MinerU不做审核抽屉不做 SSE 日志流。**
---
## 任务清单
### M1-1Prisma 数据模型 + Migration + Seed1 天)
**做什么:**
- 新增 `AslExtractionTemplate``AslProjectTemplate``AslExtractionTask``AslExtractionResult` 四张表
- 运行 `npx prisma migrate dev --name add_extraction_template_engine`
- Seed 脚本注入 3 套系统内置模板RCT / Cohort / QC
**验收标准:**
- [ ] `npx prisma migrate deploy` 成功
- [ ] `npx prisma db seed` 后数据库有 3 套模板记录
- [ ] `AslExtractionTask``pkbKnowledgeBaseId` + `idempotencyKey @unique` 字段(无 `successCount/failedCount`
- [ ] `AslExtractionResult``snapshotStorageKey` + `snapshotFilename` 快照字段
- [ ] `AslExtractionResult``pkbDocumentId` 字段、`status``pending | extracting | completed | error`
- [ ] `AslExtractionResult``@@index([taskId, status])` 复合索引Aggregator 性能保障)
> 📖 Schema 详情见架构总纲 Task 1.1
---
### M1-2模板 API + 提取任务 API — 仅基座模板1.5 天)
**做什么:**
- `TemplateController.ts`GET 模板列表、GET 模板详情、POST 克隆到项目
- `ExtractionController.ts`
- POST 创建任务(含 DB 幂等 `idempotencyKey @unique` + P2002
- GET 任务状态(`groupBy` 聚合 Result 状态,驱动 React Query 轮询)
- GET 结果列表
- 🚀 **创建任务 = API 层散装派发(无 Manager**:锁定模板 → PKB 快照冻结 → `createMany` Result → `jobQueue.insert` N 个 `asl_extract_single` Job
**不做什么:**
- 不做自定义字段 CRUD APIM3
- 不做 SSE 端点M2
- 不做 Excel 导出M2
**验收标准:**
- [ ] `POST /api/v1/asl/extraction/tasks` 创建任务并散装派发 N 个 Job
- [ ] 重复 `idempotencyKey` 请求返回已有 taskId幂等验证
- [ ] `GET /api/v1/asl/extraction/tasks/:taskId` 返回 `groupBy` 聚合进度completedCount / errorCount / pendingCount / extractingCount
- [ ] `GET /api/v1/asl/extraction/tasks/:taskId/results` 返回提取结果列表
> 📖 端点完整列表见架构总纲 Task 1.3 + Task 2.4
> 📖 API 散装派发代码见 08d §4.2
---
### M1-3PKB ACL 防腐层 + 散装 Worker + Aggregator2 天)⚠️ 本里程碑最关键
**做什么(按顺序):**
**Step A — PKB 侧 ACL0.5 天):**
- `PkbExportService.ts`PKB 模块维护):`listKnowledgeBases()``listPdfDocuments()``getDocumentForExtraction()` 返回 DTO
- 通过依赖注入暴露给 ASL
**Step B — ASL 侧桥接0.5 天):**
- `PkbBridgeService.ts`:调用 `PkbExportService`,代理所有 PKB 数据访问
**Step C — ExtractionSingleWorker0.5 天)⚠️ 核心战役:**
- `ExtractionSingleWorker.ts` 完整逻辑:
1. **幽灵重试守卫**`updateMany({ where: { status: 'pending' }, data: { status: 'extracting' } })`
2. **纯文本降级提取**:从 PKB 读 `extractedText` + 写死 RCT Schema → 调用 DeepSeek
3. **只更新自己的 Result**`prisma.aslExtractionResult.update({ status: 'completed', extractedData })`
4. **绝不碰 Task 表**(无 `$transaction`、无 `increment`、无 Last Child Wins
5. **错误分级路由**:致命错误 → 标 error + return临时错误 → 回退 `pending` + throw
**Step D — ExtractionAggregator0.5 天):**
- `ExtractionAggregator.ts`pg-boss schedule 每 2 分钟执行
- 一人兼两职:**僵尸清理**`extracting > 30min` → error+ **收口**`pending === 0 && extracting === 0` → completed
- 使用 `groupBy` 一次查询统计所有状态
**Worker + Aggregator 注册(遵守队列命名规范):**
```
jobQueue.work('asl_extract_single', { teamConcurrency: 10 }, handler)
await jobQueue.schedule('asl_extraction_aggregator', '*/2 * * * *')
await jobQueue.work('asl_extraction_aggregator', aggregatorHandler)
```
**M1 阶段简化Worker 内部串行调 LLM不接 MinerUM2 再接)。**
**验收标准:**
- [ ] PkbExportService 能返回知识库列表和文档详情DTO
- [ ] API 创建任务后 `AslExtractionResult.snapshotStorageKey``snapshotFilename` 已填充PKB 快照验证)
- [ ] 手动删除 PKB 文档记录后Worker 仍能通过 `snapshotStorageKey` 从 OSS 获取 PDF一致性验证
- [ ] API 为 N 篇文献散装派发 N 个 `asl_extract_single` Job
- [ ] Worker 幽灵守卫正确:并发重试不会双倍处理(`lock.count === 0` 跳过)
- [ ] Worker 只写 Result 行Task 表零更新(确认无行锁争用)
- [ ] Aggregator 每 2 分钟轮询:`pending === 0 && extracting === 0` → Task `completed`
- [ ] Aggregator 僵尸清理:手动将 Result 卡在 `extracting` 超 30 分钟 → 被标为 `error`
- [ ] 致命错误PKB 文档不存在)→ 该篇标 error + 不重试 + 不阻塞其他篇
- [ ] 临时错误429→ 回退 `pending` + pg-boss 指数退避重试
- [ ] 临时错误回退后重试成功:模拟 429 → 重试 → 幽灵守卫通过 → 提取成功
- [ ] 空文献边界:`documentIds = []` → API 直接拒绝400 Bad Request
> 📖 散装架构图、Worker 代码模式见架构总纲 Task 2.3
> 📖 ACL 防腐层设计见架构总纲 Task 3.3b
> 📖 Worker / Aggregator 代码见 08d §4.3 / §4.6
> 📖 散装模式指南见 `散装派发与轮询收口任务模式指南.md`
---
### M1-4前端极简 Step 1 — 选模板 + 选 PKB 文献1 天)
**做什么:**
- `ExtractionSetup.tsx`:左栏模板下拉(只读,默认 RCT+ 右栏 PKB 知识库下拉 + 文献 Checkbox 列表
- `PkbKnowledgeBaseSelector.tsx`:调用 PKB API 加载知识库和文献
- 底部 "确认并开始提取" 按钮 → 调用 `POST /api/v1/asl/extraction/tasks`
**不做什么:**
- 不做自定义字段 UIM3
- 不做基座字段标签云展示M2 附带做)
**验收标准:**
- [ ] 能选择 PKB 知识库并展示 PDF 文档列表
- [ ] 能勾选文献并提交创建任务
- [ ] 空知识库时显示引导提示 + PKB 跳转链接
---
### M1-5前端极简 Step 2 + Step 3 — 轮询进度 + 极简列表1 天)
**做什么:**
- `ExtractionProgress.tsx``useTaskStatus` 轮询3s驱动进度条 + 检测 `completed` 跳转
- `ExtractionWorkbench.tsx`极简表格展示提取结果Study ID、状态
- `ExtractionPage.tsx`状态驱动路由pending→Step1 / processing→Step2 / completed→Step3
- 路由注册(前端 + 后端)
**不做什么:**
- 不做 SSE 日志终端M2
- 不做审核抽屉M2
- 不做 Excel 导出按钮M2
**验收标准:**
- [ ] 进度条从 0% 推进到 100%React Query 轮询驱动)
- [ ] `status = completed` 后自动跳转到 Step 3
- [ ] Step 3 能看到提取结果列表(状态列展示 completed/error
- [ ] 关闭浏览器重新打开 → 恢复到正确步骤(断点恢复)
---
## M1 研发红线(全员必须背诵)
| # | 红线 | 违反后果 |
|---|------|---------|
| 1 | 队列名用下划线(`asl_extract_single`),禁止点号 | pg-boss 路由截断 |
| 2 | **Worker 绝不碰 Task 表**,只写自己的 Result 行 | 行锁争用 / 死锁 |
| 3 | Worker 用 `updateMany({ status: 'pending' })` 幽灵守卫,禁止 `findUnique → if` | 并发穿透LLM 费用白烧 |
| 4 | 临时错误 `throw` 前必须 `update({ status: 'pending' })` 回退 | 重试被幽灵守卫误跳过 |
| 5 | `teamConcurrency: 10`,禁止无限拉取 Job | Node.js OOM |
| 6 | Job Payload 仅传 ID< 200 bytes禁止塞 PDF 正文 | pg-boss 阻塞 |
| 7 | ACL 防腐层ASL 不 import PKB 内部类型 | 模块耦合蔓延 |
| 8 | API 层快照 `snapshotStorageKey` + `snapshotFilename`Worker 禁止运行时回查 PKB | PKB 删文档 → 批量崩溃 |
| 9 | API 创建任务用 `idempotencyKey @unique` + P2002禁止 Read-then-Write | 并发穿透创建重复任务 |
---
## M1 结束时的状态
```
✅ Prisma 表 + 3 套 Seed 模板(含 idempotencyKey @unique
✅ PKB ACL 防腐层 → PkbExportService + PkbBridgeService
✅ 散装派发全链路API 散装 → N × Worker → Aggregator 收口 → completed
✅ 幽灵守卫 + 错误分级路由 + Aggregator 僵尸清理 — 所有并发 Bug 已验证
✅ API 层 DB 幂等 + PKB 快照冻结
✅ 前端三步走:选模板/选文献 → 轮询进度groupBy 聚合)→ 极简结果列表
❌ 无 MinerU纯文本降级
❌ 无 SSE 日志流
❌ 无审核抽屉
❌ 无自定义字段
❌ 无 Excel 导出
```
> **M1 的核心价值:** 散装架构天然消除了 Fan-out 的行锁争用、Last Child Wins 终点丢失等分布式 Bug。Worker 逻辑极简只写自己Aggregator 定时收口,第一周就能稳定跑通全链路。