Features - User Management (Phase 4.1): - Database: Add user_modules table for fine-grained module permissions - Database: Add 4 user permissions (view/create/edit/delete) to role_permissions - Backend: UserService (780 lines) - CRUD with tenant isolation - Backend: UserController + UserRoutes (648 lines) - 13 API endpoints - Backend: Batch import users from Excel - Frontend: UserListPage (412 lines) - list/filter/search/pagination - Frontend: UserFormPage (341 lines) - create/edit with module config - Frontend: UserDetailPage (393 lines) - details/tenant/module management - Frontend: 3 modal components (592 lines) - import/assign/configure - API: GET/POST/PUT/DELETE /api/admin/users/* endpoints Architecture Upgrade - Module Permission System: - Backend: Add getUserModules() method in auth.service - Backend: Login API returns modules array in user object - Frontend: AuthContext adds hasModule() method - Frontend: Navigation filters modules based on user.modules - Frontend: RouteGuard checks requiredModule instead of requiredVersion - Frontend: Remove deprecated version-based permission system - UX: Only show accessible modules in navigation (clean UI) - UX: Smart redirect after login (avoid 403 for regular users) Fixes: - Fix UTF-8 encoding corruption in ~100 docs files - Fix pageSize type conversion in userService (String to Number) - Fix authUser undefined error in TopNavigation - Fix login redirect logic with role-based access check - Update Git commit guidelines v1.2 with UTF-8 safety rules Database Changes: - CREATE TABLE user_modules (user_id, tenant_id, module_code, is_enabled) - ADD UNIQUE CONSTRAINT (user_id, tenant_id, module_code) - INSERT 4 permissions + role assignments - UPDATE PUBLIC tenant with 8 module subscriptions Technical: - Backend: 5 new files (~2400 lines) - Frontend: 10 new files (~2500 lines) - Docs: 1 development record + 2 status updates + 1 guideline update - Total: ~4900 lines of code Status: User management 100% complete, module permission system operational
2817 lines
82 KiB
Markdown
2817 lines
82 KiB
Markdown
# Postgres-Only 架构改造实施计划(完整版)
|
||
|
||
> **文档版本:** V2.0 ✨ **已更新**
|
||
> **创建日期:** 2025-12-13
|
||
> **更新日期:** 2025-12-13
|
||
> **目标完成时间:** 2025-12-20(7天)
|
||
> **负责人:** 技术团队
|
||
> **风险等级:** 🟢 低(基于成熟方案,有降级策略)
|
||
> **核心原则:** 通用能力层优先,复用架构优势
|
||
|
||
---
|
||
|
||
## ⚠️ **V2.0 更新说明**
|
||
|
||
本版本基于**真实代码结构验证**和**技术方案深度分析**进行了重要更新:
|
||
|
||
### **更新1:代码结构真实性验证** ✅
|
||
- ✅ 已从实际代码验证所有路径和文件
|
||
- ✅ 明确标注"已存在"、"占位符"、"需新增"
|
||
- ✅ RedisCacheAdapter确认为未实现的占位符
|
||
- ✅ 所有改造点基于真实代码结构
|
||
|
||
### **更新2:长任务可靠性策略** 🔴
|
||
- ✅ 新增:**断点续传机制**(策略2)- 强烈推荐
|
||
- ✅ 新增:**任务拆分策略**(策略3)- 架构级优化
|
||
- ✅ 分析:心跳续租机制(策略1)- 不推荐实施
|
||
- ✅ 明确:pg-boss的技术限制(最长4小时,推荐2小时)
|
||
- ✅ 修正:24小时连续任务不支持,需拆分
|
||
|
||
### **更新3:实施计划调整**
|
||
- ✅ 工作量增加到9天(增加任务拆分和断点续传)
|
||
- ✅ 完整的代码示例(可直接使用)
|
||
- ✅ 数据库Schema更新(新增字段)
|
||
|
||
---
|
||
|
||
## 📋 目录
|
||
|
||
1. [改造背景与目标](#1-改造背景与目标)
|
||
2. [当前系统分析](#2-当前系统分析)
|
||
3. [改造总体架构](#3-改造总体架构)
|
||
4. [详细实施步骤](#4-详细实施步骤)
|
||
5. [优先级与依赖关系](#5-优先级与依赖关系)
|
||
6. [测试验证方案](#6-测试验证方案)
|
||
7. [上线与回滚](#7-上线与回滚)
|
||
|
||
---
|
||
|
||
## 1. 改造背景与目标
|
||
|
||
### 1.1 为什么选择Postgres-Only?
|
||
|
||
```
|
||
核心痛点:
|
||
❌ 长任务不可靠(2小时任务实例销毁丢失)
|
||
❌ LLM成本失控(缓存不持久化)
|
||
❌ 多实例不同步(内存缓存各自独立)
|
||
|
||
技术方案选择:
|
||
✅ Postgres-Only(推荐)
|
||
- 架构简单(1-2人团队)
|
||
- 运维成本低(复用RDS)
|
||
- 数据一致性强(事务保证)
|
||
- 节省成本(¥1000+/年)
|
||
|
||
❌ Redis方案(不推荐)
|
||
- 架构复杂(双系统)
|
||
- 运维负担重(需维护Redis)
|
||
- 数据一致性弱(双写问题)
|
||
- 额外成本(¥1000+/年)
|
||
```
|
||
|
||
### 1.2 改造目标
|
||
|
||
| 目标 | 当前状态 | 改造后 | 衡量指标 |
|
||
|------|---------|--------|---------|
|
||
| **长任务可靠性** | 5-10%(实例销毁丢失) | > 99% | 2小时任务成功率 |
|
||
| **LLM成本** | 重复调用多次 | 降低50%+ | 月度API费用 |
|
||
| **缓存命中率** | 0%(不持久) | 60%+ | 监控统计 |
|
||
| **多实例同步** | ❌ 各自独立 | ✅ 共享缓存 | 实例A写→实例B读 |
|
||
| **架构复杂度** | 中等 | 低 | 中间件数量 |
|
||
|
||
---
|
||
|
||
## 2. 当前系统分析
|
||
|
||
### 2.1 代码结构现状(3层架构)- ✅ **已验证真实代码**
|
||
|
||
> **验证方法:** 通过 `list_dir` 和 `read_file` 实际检查代码库
|
||
> **验证日期:** 2025-12-13
|
||
> **验证结果:** 所有路径和文件状态均已确认
|
||
|
||
```
|
||
AIclinicalresearch/backend/src/
|
||
├── common/ # 🔵 通用能力层
|
||
│ ├── cache/ # ✅ 真实存在
|
||
│ │ ├── CacheAdapter.ts # ✅ 接口定义(真实)
|
||
│ │ ├── CacheFactory.ts # ✅ 工厂模式(真实)
|
||
│ │ ├── index.ts # ✅ 导出文件(真实)
|
||
│ │ ├── MemoryCacheAdapter.ts # ✅ 内存实现(真实,已完成)
|
||
│ │ ├── RedisCacheAdapter.ts # 🔴 占位符(真实存在,但未实现)
|
||
│ │ └── PostgresCacheAdapter.ts # ❌ 需新增(本次改造)
|
||
│ │
|
||
│ ├── jobs/ # ✅ 真实存在
|
||
│ │ ├── types.ts # ✅ 接口定义(真实)
|
||
│ │ ├── JobFactory.ts # ✅ 工厂模式(真实)
|
||
│ │ ├── index.ts # ✅ 导出文件(真实)
|
||
│ │ ├── MemoryQueue.ts # ✅ 内存实现(真实,已完成)
|
||
│ │ └── PgBossQueue.ts # ❌ 需新增(本次改造)
|
||
│ │
|
||
│ ├── storage/ # ✅ 存储服务(已完善)
|
||
│ ├── logging/ # ✅ 日志系统(已完善)
|
||
│ └── llm/ # ✅ LLM网关(已完善)
|
||
│
|
||
├── modules/ # 🟢 业务模块层
|
||
│ ├── asl/ # ✅ 真实存在
|
||
│ │ ├── services/
|
||
│ │ │ ├── screeningService.ts # ⚠️ 需改造:改为队列(真实)
|
||
│ │ │ └── llmScreeningService.ts # ✅ 真实存在
|
||
│ │ └── common/llm/
|
||
│ │ └── LLM12FieldsService.ts # ✅ 已用缓存(真实,第516行)
|
||
│ │
|
||
│ └── dc/ # ✅ 真实存在
|
||
│ └── tool-b/services/
|
||
│ └── HealthCheckService.ts # ✅ 已用缓存(真实,第47行)
|
||
│
|
||
└── config/ # 🔵 平台基础层
|
||
├── database.ts # ✅ Prisma配置(真实)
|
||
└── env.ts # ⚠️ 需添加新环境变量(真实文件)
|
||
```
|
||
|
||
**重要发现:**
|
||
|
||
1. **✅ 3层架构真实存在** - 代码组织完全符合设计
|
||
2. **✅ 工厂模式已实现** - CacheFactory和JobFactory真实可用
|
||
3. **🔴 RedisCacheAdapter是占位符** - 所有方法都 `throw new Error('Not implemented')`
|
||
4. **✅ 业务层已使用cache接口** - 改造时业务代码无需修改
|
||
5. **❌ PostgresCacheAdapter和PgBossQueue需新增** - 本次改造的核心工作
|
||
|
||
### 2.2 缓存使用现状(✅ 已验证)
|
||
|
||
| 位置 | 用途 | TTL | 数据量 | 重要性 | 代码行号 | 当前实现 |
|
||
|------|------|-----|--------|--------|---------|---------|
|
||
| **LLM12FieldsService.ts** | LLM 12字段提取缓存 | 1小时 | ~50KB/项 | 🔴 高(成本) | 第516行 | ✅ 已用cache |
|
||
| **HealthCheckService.ts** | Excel健康检查缓存 | 24小时 | ~5KB/项 | 🟡 中等 | 第47行 | ✅ 已用cache |
|
||
|
||
**代码示例(真实代码):**
|
||
|
||
```typescript
|
||
// backend/src/modules/asl/common/llm/LLM12FieldsService.ts (第516行)
|
||
const cached = await cache.get(cacheKey);
|
||
if (cached) {
|
||
logger.info('缓存命中', { cacheKey });
|
||
return cached;
|
||
}
|
||
// ... LLM调用 ...
|
||
await cache.set(cacheKey, JSON.stringify(result), 3600); // 1小时
|
||
|
||
// backend/src/modules/dc/tool-b/services/HealthCheckService.ts (第47行)
|
||
const cached = await cache.get<HealthCheckResult>(cacheKey);
|
||
if (cached) return cached;
|
||
// ... Excel解析 ...
|
||
await cache.set(cacheKey, result, 86400); // 24小时
|
||
```
|
||
|
||
**结论**:✅ 缓存系统已在使用,只需切换底层实现(Memory → Postgres)。
|
||
|
||
### 2.3 队列使用现状(✅ 已验证)
|
||
|
||
| 位置 | 用途 | 代码行号 | 耗时 | 重要性 | 当前实现 | 问题 |
|
||
|------|------|---------|------|--------|---------|------|
|
||
| **screeningService.ts** | 文献筛选任务 | 第65行 | 2小时(1000篇) | 🔴 高 | ❌ 同步执行 | 实例销毁丢失 |
|
||
| **DC Tool B** | 病历批量提取 | 未实现 | 1-3小时 | 🔴 高 | ❌ 未实现 | 不支持长任务 |
|
||
|
||
**代码示例(真实代码):**
|
||
|
||
```typescript
|
||
// backend/src/modules/asl/services/screeningService.ts (第65行)
|
||
// 4. 异步处理文献(简化版:直接在这里处理)
|
||
// 生产环境应该发送到消息队列
|
||
processLiteraturesInBackground(task.id, projectId, literatures); // ← 同步执行,有风险
|
||
```
|
||
|
||
**结论**:❌ 队列系统尚未使用,需要优先改造。注释已提示"生产环境应该发送到消息队列"。
|
||
|
||
### 2.4 长任务可靠性分析 🔴 **新增**
|
||
|
||
#### **当前问题(真实场景)**
|
||
|
||
```
|
||
场景1:1000篇文献筛选(约2小时)
|
||
├─ 问题1:同步执行,阻塞HTTP请求
|
||
├─ 问题2:SAE实例15分钟无流量自动缩容
|
||
├─ 问题3:实例重启,任务从头开始
|
||
└─ 结果:任务成功率 < 10%,用户需重复提交3-5次
|
||
|
||
场景2:10000篇文献筛选(约20小时)
|
||
├─ 问题1:超过pg-boss最大锁定时间(4小时)
|
||
├─ 问题2:任务被重复领取,造成重复处理
|
||
└─ 结果:任务失败率 100%
|
||
|
||
场景3:发布更新(15:00)
|
||
├─ 问题1:正在执行的任务被强制终止
|
||
├─ 问题2:已处理的文献结果丢失
|
||
└─ 结果:用户体验极差
|
||
```
|
||
|
||
#### **技术限制分析**
|
||
|
||
| 技术栈 | 限制 | 影响 |
|
||
|--------|------|------|
|
||
| **SAE** | 15分钟无流量自动缩容 | 长任务必然失败 |
|
||
| **pg-boss** | 最长锁定4小时(推荐2小时) | 超长任务不支持 |
|
||
| **HTTP请求** | 最长30秒超时 | 不能同步执行长任务 |
|
||
| **实例重启** | 内存状态丢失 | 任务进度丢失 |
|
||
|
||
#### **解决方案评估**
|
||
|
||
| 策略 | 价值 | 难度 | pg-boss支持 | 推荐度 | 实施 |
|
||
|------|------|------|------------|--------|------|
|
||
| **策略1:心跳续租** | ⭐⭐⭐⭐ | 🔴 高 | ❌ 不支持 | 🟡 不推荐 | 暂不实施 |
|
||
| **策略2:断点续传** | ⭐⭐⭐⭐⭐ | 🟢 低 | ✅ 兼容 | 🟢 强烈推荐 | **Phase 6** |
|
||
| **策略3:任务拆分** | ⭐⭐⭐⭐⭐ | 🟡 中 | ✅ 原生支持 | 🟢 强烈推荐 | **Phase 5** |
|
||
|
||
**结论**:采用**策略2(断点续传)+ 策略3(任务拆分)**组合方案。
|
||
|
||
---
|
||
|
||
## 3. 改造总体架构
|
||
|
||
### 3.1 架构对比
|
||
|
||
```
|
||
改造前(当前):
|
||
┌─────────────────────────────────────┐
|
||
│ Business Layer (ASL, DC, SSA...) │
|
||
│ ├─ screeningService.ts │
|
||
│ │ └─ processInBackground() │ ← 同步执行(阻塞)
|
||
│ └─ LLM12FieldsService.ts │
|
||
│ └─ cache.get/set() │ ← 使用Memory缓存
|
||
└─────────────────────────────────────┘
|
||
↓ 使用
|
||
┌─────────────────────────────────────┐
|
||
│ Capability Layer (Common) │
|
||
│ ├─ cache │
|
||
│ │ ├─ MemoryCacheAdapter ✅ │ ← 当前使用
|
||
│ │ └─ RedisCacheAdapter 🔴 │ ← 占位符(未实现)
|
||
│ └─ jobs │
|
||
│ └─ MemoryQueue ✅ │ ← 当前使用
|
||
└─────────────────────────────────────┘
|
||
↓ 依赖
|
||
┌─────────────────────────────────────┐
|
||
│ Platform Layer │
|
||
│ └─ PostgreSQL (业务数据) │
|
||
└─────────────────────────────────────┘
|
||
|
||
问题:
|
||
❌ 缓存不持久(实例重启丢失)
|
||
❌ 队列不持久(任务丢失)
|
||
❌ 多实例不共享(各自独立)
|
||
❌ 长任务不可靠(2小时任务失败率 > 90%)
|
||
❌ 违反Serverless原则(同步执行长任务)
|
||
```
|
||
|
||
```
|
||
改造后(Postgres-Only + 任务拆分 + 断点续传):
|
||
┌──────────────────────────────────────────────────────────────┐
|
||
│ Business Layer (ASL, DC, SSA...) │
|
||
│ ├─ screeningService.ts │
|
||
│ │ ├─ startScreeningTask() │ ← 改造点1:任务拆分 │
|
||
│ │ │ └─ 推送N个批次到队列 │ │
|
||
│ │ └─ registerWorkers() │ │
|
||
│ │ └─ 处理单个批次+断点续传 │ ← 改造点2:断点续传 │
|
||
│ └─ LLM12FieldsService.ts │
|
||
│ └─ cache.get/set() │ ← 无需改动(接口不变)│
|
||
└──────────────────────────────────────────────────────────────┘
|
||
↓ 使用
|
||
┌──────────────────────────────────────────────────────────────┐
|
||
│ Capability Layer (Common) │
|
||
│ ├─ cache │
|
||
│ │ ├─ MemoryCacheAdapter ✅ │ │
|
||
│ │ ├─ PostgresCacheAdapter ✨ │ ← 新增(300行) │
|
||
│ │ └─ CacheFactory │ ← 更新支持postgres │
|
||
│ └─ jobs │
|
||
│ ├─ MemoryQueue ✅ │ │
|
||
│ ├─ PgBossQueue ✨ │ ← 新增(400行) │
|
||
│ └─ JobFactory │ ← 更新支持pgboss │
|
||
└──────────────────────────────────────────────────────────────┘
|
||
↓ 依赖
|
||
┌──────────────────────────────────────────────────────────────┐
|
||
│ Platform Layer │
|
||
│ ├─ PostgreSQL (RDS) │
|
||
│ │ ├─ 业务数据(asl_*, dc_*, ...) │
|
||
│ │ ├─ platform.app_cache ✨ │ ← 缓存表(新增) │
|
||
│ │ └─ platform.job_* ✨ │ ← 队列表(pg-boss自动)│
|
||
│ └─ 阿里云RDS特性 │
|
||
│ ├─ 自动备份(每天) │
|
||
│ ├─ PITR(时间点恢复) │
|
||
│ └─ 高可用(主从切换) │
|
||
└──────────────────────────────────────────────────────────────┘
|
||
|
||
优势:
|
||
✅ 缓存持久化(RDS自动备份)
|
||
✅ 队列持久化(任务不丢失)
|
||
✅ 多实例共享(SKIP LOCKED)
|
||
✅ 事务一致性(业务+任务原子提交)
|
||
✅ 零额外运维(复用RDS)
|
||
✅ 长任务可靠(拆分+断点,成功率 > 99%)
|
||
✅ 符合Serverless(短任务,可中断恢复)
|
||
```
|
||
|
||
### 3.2 任务拆分策略 ✨ **新增**
|
||
|
||
#### **拆分原则**
|
||
|
||
```
|
||
目标:每个任务 < 15分钟(远低于pg-boss 4小时限制)
|
||
|
||
原因:
|
||
1. SAE实例15分钟无流量自动缩容
|
||
2. 失败重试成本低(只需重试15分钟,不是2小时)
|
||
3. 进度可见性高(用户体验好)
|
||
4. 可并行处理(多Worker同时工作)
|
||
```
|
||
|
||
#### **拆分策略表**
|
||
|
||
| 任务类型 | 单项耗时 | 推荐批次大小 | 批次耗时 | 并发能力 |
|
||
|---------|---------|------------|---------|---------|
|
||
| **ASL文献筛选** | 7.2秒/篇 | 100篇 | 12分钟 | 10批并行 |
|
||
| **DC病历提取** | 10秒/份 | 50份 | 8.3分钟 | 10批并行 |
|
||
| **统计分析** | 0.1秒/条 | 5000条 | 8.3分钟 | 20批并行 |
|
||
|
||
#### **实际效果对比**
|
||
|
||
```
|
||
场景:10000篇文献筛选
|
||
|
||
不拆分(错误):
|
||
├─ 1个任务,20小时
|
||
├─ 超过pg-boss 4小时限制 → 失败
|
||
└─ 成功率:0%
|
||
|
||
拆分(正确):
|
||
├─ 100个批次,每批100篇,12分钟
|
||
├─ 10个Worker并行处理
|
||
├─ 总耗时:100 / 10 = 10批轮次 × 12分钟 = 2小时
|
||
└─ 成功率:> 99.5%(单批失败只需重试12分钟)
|
||
```
|
||
|
||
### 3.3 断点续传机制 ✨ **新增**
|
||
|
||
#### **核心思想**
|
||
|
||
```
|
||
问题:SAE实例随时可能重启(发布更新、自动缩容)
|
||
|
||
无断点:
|
||
├─ 处理到第900篇 → 实例重启
|
||
├─ 重新开始,从第1篇开始
|
||
└─ 浪费时间:900 × 7.2秒 = 108分钟
|
||
|
||
有断点:
|
||
├─ 每处理10篇,保存进度到数据库
|
||
├─ 处理到第900篇 → 实例重启
|
||
├─ 重新开始,从第900篇继续
|
||
└─ 浪费时间:< 1分钟
|
||
```
|
||
|
||
#### **实现策略**
|
||
|
||
```typescript
|
||
// 数据库记录进度
|
||
model AslScreeningTask {
|
||
processedItems Int // 已处理数量
|
||
currentIndex Int // 当前游标(断点)
|
||
lastCheckpoint DateTime // 最后一次保存时间
|
||
checkpointData Json // 断点详细数据
|
||
}
|
||
|
||
// Worker读取断点
|
||
const startIndex = task.currentIndex || 0;
|
||
for (let i = startIndex; i < items.length; i++) {
|
||
await processItem(items[i]);
|
||
|
||
// 每处理10项,保存断点
|
||
if ((i + 1) % 10 === 0) {
|
||
await saveCheckpoint(i + 1);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### **保存频率权衡**
|
||
|
||
| 频率 | 数据库写入 | 重启浪费时间 | 推荐 |
|
||
|------|-----------|------------|------|
|
||
| 每1项 | 很高(性能差) | < 10秒 | ❌ 不推荐 |
|
||
| 每10项 | 中等 | < 2分钟 | ✅ 推荐 |
|
||
| 每100项 | 低 | < 12分钟 | 🟡 可选 |
|
||
| 不保存 | 无 | 重头开始 | ❌ 不可接受 |
|
||
|
||
### 3.4 Schema设计(统一在platform)✨ **已更新**
|
||
|
||
```prisma
|
||
// prisma/schema.prisma
|
||
|
||
datasource db {
|
||
provider = "postgresql"
|
||
url = env("DATABASE_URL")
|
||
schemas = ["platform_schema", "aia_schema", "pkb_schema", "asl_schema",
|
||
"dc_schema", "ssa_schema", "st_schema", "rvw_schema",
|
||
"admin_schema", "common_schema", "public"]
|
||
}
|
||
|
||
generator client {
|
||
provider = "prisma-client-js"
|
||
previewFeatures = ["multiSchema"] // ✨ 启用多Schema支持
|
||
}
|
||
|
||
// ==================== 平台基础设施(platform_schema)====================
|
||
|
||
/// 应用缓存表(替代Redis)
|
||
model AppCache {
|
||
id Int @id @default(autoincrement())
|
||
key String @unique @db.VarChar(500)
|
||
value Json
|
||
expiresAt DateTime @map("expires_at")
|
||
createdAt DateTime @default(now()) @map("created_at")
|
||
|
||
@@index([expiresAt], name: "idx_app_cache_expires")
|
||
@@index([key, expiresAt], name: "idx_app_cache_key_expires")
|
||
@@map("app_cache")
|
||
@@schema("platform_schema") // ✨ 统一在platform Schema
|
||
}
|
||
|
||
// pg-boss会自动创建任务表(不需要在Prisma中定义)
|
||
// 表名:platform_schema.job, platform_schema.version 等
|
||
|
||
// ==================== 业务模块(asl_schema)====================
|
||
|
||
/// ASL筛选任务表(✨ 需要新增字段支持拆分+断点)
|
||
model AslScreeningTask {
|
||
id String @id @default(uuid())
|
||
projectId String @map("project_id")
|
||
taskType String @map("task_type")
|
||
status String // pending/running/completed/failed
|
||
totalItems Int @map("total_items")
|
||
processedItems Int @default(0) @map("processed_items")
|
||
successItems Int @default(0) @map("success_items")
|
||
failedItems Int @default(0) @map("failed_items")
|
||
conflictItems Int @default(0) @map("conflict_items")
|
||
|
||
// ✨ 新增:任务拆分支持
|
||
totalBatches Int @default(1) @map("total_batches")
|
||
processedBatches Int @default(0) @map("processed_batches")
|
||
currentBatchIndex Int @default(0) @map("current_batch_index")
|
||
|
||
// ✨ 新增:断点续传支持
|
||
currentIndex Int @default(0) @map("current_index")
|
||
lastCheckpoint DateTime? @map("last_checkpoint")
|
||
checkpointData Json? @map("checkpoint_data")
|
||
|
||
startedAt DateTime @map("started_at")
|
||
completedAt DateTime? @map("completed_at")
|
||
createdAt DateTime @default(now()) @map("created_at")
|
||
updatedAt DateTime @updatedAt @map("updated_at")
|
||
|
||
project AslScreeningProject @relation(fields: [projectId], references: [id])
|
||
results AslScreeningResult[]
|
||
|
||
@@index([projectId])
|
||
@@index([status])
|
||
@@map("asl_screening_tasks")
|
||
@@schema("asl_schema")
|
||
}
|
||
```
|
||
|
||
**新增字段说明:**
|
||
|
||
| 字段 | 类型 | 用途 | 示例值 |
|
||
|------|------|------|--------|
|
||
| **totalBatches** | Int | 总批次数 | 10(1000篇÷100篇/批) |
|
||
| **processedBatches** | Int | 已完成批次数 | 3(已完成3批) |
|
||
| **currentBatchIndex** | Int | 当前批次索引 | 3(正在处理第4批) |
|
||
| **currentIndex** | Int | 当前项索引(断点) | 350(已处理350篇) |
|
||
| **lastCheckpoint** | DateTime | 最后一次保存断点时间 | 2025-12-13 10:30:00 |
|
||
| **checkpointData** | Json | 断点详细数据 | `{"lastProcessedId": "lit_123", "batchProgress": 0.35}` |
|
||
|
||
### 3.5 Key/Queue命名规范
|
||
|
||
```typescript
|
||
// 缓存Key规范(逻辑隔离)
|
||
const CACHE_KEY_PATTERNS = {
|
||
// ASL模块
|
||
'asl:llm:{hash}': 'LLM提取结果',
|
||
'asl:pdf:{fileId}': 'PDF解析结果',
|
||
|
||
// DC模块
|
||
'dc:health:{fileHash}': 'Excel健康检查',
|
||
'dc:extraction:{recordId}': '病历提取结果',
|
||
|
||
// 全局
|
||
'session:{userId}': '用户Session',
|
||
'config:{key}': '系统配置',
|
||
};
|
||
|
||
// 队列Name规范
|
||
const QUEUE_NAMES = {
|
||
ASL_TITLE_SCREENING: 'asl:title-screening',
|
||
ASL_FULLTEXT_SCREENING: 'asl:fulltext-screening',
|
||
DC_MEDICAL_EXTRACTION: 'dc:medical-extraction',
|
||
DC_DATA_CLEANING: 'dc:data-cleaning',
|
||
SSA_STATISTICAL_ANALYSIS: 'ssa:statistical-analysis',
|
||
};
|
||
```
|
||
|
||
---
|
||
|
||
## 4. 详细实施步骤
|
||
|
||
### 4.1 Phase 1:环境准备(Day 1上午,0.5天)
|
||
|
||
#### **任务1.1:更新Prisma Schema**
|
||
|
||
```bash
|
||
# 文件:prisma/schema.prisma
|
||
```
|
||
|
||
**修改点1:启用multiSchema**
|
||
```prisma
|
||
generator client {
|
||
provider = "prisma-client-js"
|
||
previewFeatures = ["multiSchema"] // ← 新增
|
||
}
|
||
```
|
||
|
||
**修改点2:添加AppCache模型**
|
||
```prisma
|
||
/// 应用缓存表(Postgres-Only架构)
|
||
model AppCache {
|
||
id Int @id @default(autoincrement())
|
||
key String @unique @db.VarChar(500)
|
||
value Json
|
||
expiresAt DateTime @map("expires_at")
|
||
createdAt DateTime @default(now()) @map("created_at")
|
||
|
||
@@index([expiresAt], name: "idx_app_cache_expires")
|
||
@@index([key, expiresAt], name: "idx_app_cache_key_expires")
|
||
@@map("app_cache")
|
||
@@schema("platform_schema")
|
||
}
|
||
```
|
||
|
||
**执行迁移**
|
||
```bash
|
||
cd backend
|
||
|
||
# 1. 生成迁移文件
|
||
npx prisma migrate dev --name add_postgres_cache
|
||
|
||
# 2. 生成Prisma Client
|
||
npx prisma generate
|
||
|
||
# 3. 查看生成的SQL
|
||
cat prisma/migrations/*/migration.sql
|
||
```
|
||
|
||
**验证结果**
|
||
```sql
|
||
-- 应该看到以下SQL
|
||
CREATE TABLE "platform_schema"."app_cache" (
|
||
"id" SERIAL PRIMARY KEY,
|
||
"key" VARCHAR(500) UNIQUE NOT NULL,
|
||
"value" JSONB NOT NULL,
|
||
"expires_at" TIMESTAMP NOT NULL,
|
||
"created_at" TIMESTAMP DEFAULT NOW()
|
||
);
|
||
|
||
CREATE INDEX "idx_app_cache_expires" ON "platform_schema"."app_cache"("expires_at");
|
||
CREATE INDEX "idx_app_cache_key_expires" ON "platform_schema"."app_cache"("key", "expires_at");
|
||
```
|
||
|
||
#### **任务1.2:安装依赖**
|
||
|
||
```bash
|
||
cd backend
|
||
|
||
# 安装pg-boss(任务队列)
|
||
npm install pg-boss --save
|
||
|
||
# 查看版本
|
||
npm list pg-boss
|
||
# 应显示:pg-boss@10.x.x
|
||
```
|
||
|
||
#### **任务1.3:更新环境变量配置**
|
||
|
||
```typescript
|
||
// 文件:backend/src/config/env.ts
|
||
|
||
import { z } from 'zod';
|
||
|
||
const envSchema = z.object({
|
||
// ... 现有配置 ...
|
||
|
||
// ==================== 缓存配置 ====================
|
||
CACHE_TYPE: z.enum(['memory', 'postgres']).default('memory'), // ← 新增postgres选项
|
||
|
||
// ==================== 队列配置 ====================
|
||
QUEUE_TYPE: z.enum(['memory', 'pgboss']).default('memory'), // ← 新增pgboss选项
|
||
|
||
// ==================== 数据库配置 ====================
|
||
DATABASE_URL: z.string(),
|
||
});
|
||
|
||
export const config = {
|
||
// ... 现有配置 ...
|
||
|
||
// 缓存配置
|
||
cacheType: process.env.CACHE_TYPE || 'memory',
|
||
|
||
// 队列配置
|
||
queueType: process.env.QUEUE_TYPE || 'memory',
|
||
|
||
// 数据库URL
|
||
databaseUrl: process.env.DATABASE_URL,
|
||
};
|
||
```
|
||
|
||
```bash
|
||
# 文件:backend/.env
|
||
|
||
# ==================== 缓存配置 ====================
|
||
CACHE_TYPE=postgres # memory | postgres
|
||
|
||
# ==================== 队列配置 ====================
|
||
QUEUE_TYPE=pgboss # memory | pgboss
|
||
|
||
# ==================== 数据库配置 ====================
|
||
DATABASE_URL=postgresql://user:password@localhost:5432/aiclincial?schema=public
|
||
```
|
||
|
||
---
|
||
|
||
### 4.2 Phase 2:实现PostgresCacheAdapter(Day 1下午,0.5天)
|
||
|
||
#### **任务2.1:创建PostgresCacheAdapter**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/cache/PostgresCacheAdapter.ts
|
||
|
||
import { prisma } from '../../config/database.js';
|
||
import type { CacheAdapter } from './CacheAdapter.js';
|
||
import { logger } from '../logging/index.js';
|
||
|
||
/**
|
||
* Postgres缓存适配器
|
||
*
|
||
* 核心特性:
|
||
* - 持久化存储(实例重启不丢失)
|
||
* - 多实例共享(通过数据库)
|
||
* - 懒惰删除(读取时清理过期数据)
|
||
* - 自动清理(定时任务删除过期数据)
|
||
*/
|
||
export class PostgresCacheAdapter implements CacheAdapter {
|
||
|
||
/**
|
||
* 获取缓存(带过期检查和懒惰删除)
|
||
*/
|
||
async get<T = any>(key: string): Promise<T | null> {
|
||
try {
|
||
const record = await prisma.appCache.findUnique({
|
||
where: { key }
|
||
});
|
||
|
||
if (!record) {
|
||
return null;
|
||
}
|
||
|
||
// 检查是否过期
|
||
if (record.expiresAt < new Date()) {
|
||
// 懒惰删除:异步删除,不阻塞主流程
|
||
this.deleteAsync(key);
|
||
return null;
|
||
}
|
||
|
||
logger.debug('[PostgresCache] 缓存命中', { key });
|
||
return record.value as T;
|
||
|
||
} catch (error) {
|
||
logger.error('[PostgresCache] 读取失败', { key, error });
|
||
return null;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 设置缓存
|
||
*/
|
||
async set(key: string, value: any, ttlSeconds: number = 3600): Promise<void> {
|
||
try {
|
||
const expiresAt = new Date(Date.now() + ttlSeconds * 1000);
|
||
|
||
await prisma.appCache.upsert({
|
||
where: { key },
|
||
create: {
|
||
key,
|
||
value: value as any, // Prisma会自动处理JSON
|
||
expiresAt,
|
||
},
|
||
update: {
|
||
value: value as any,
|
||
expiresAt,
|
||
}
|
||
});
|
||
|
||
logger.debug('[PostgresCache] 缓存写入', { key, ttl: ttlSeconds });
|
||
|
||
} catch (error) {
|
||
logger.error('[PostgresCache] 写入失败', { key, error });
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 删除缓存
|
||
*/
|
||
async delete(key: string): Promise<boolean> {
|
||
try {
|
||
await prisma.appCache.delete({
|
||
where: { key }
|
||
});
|
||
|
||
logger.debug('[PostgresCache] 缓存删除', { key });
|
||
return true;
|
||
|
||
} catch (error) {
|
||
// 如果key不存在,Prisma会抛出错误
|
||
if ((error as any)?.code === 'P2025') {
|
||
return false;
|
||
}
|
||
logger.error('[PostgresCache] 删除失败', { key, error });
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 异步删除(不阻塞主流程)
|
||
*/
|
||
private deleteAsync(key: string): void {
|
||
prisma.appCache.delete({ where: { key } })
|
||
.catch(err => {
|
||
// 静默失败(可能已被其他实例删除)
|
||
logger.debug('[PostgresCache] 懒惰删除失败', { key, err });
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 批量删除(支持模式匹配)
|
||
*/
|
||
async deleteMany(pattern: string): Promise<number> {
|
||
try {
|
||
const result = await prisma.appCache.deleteMany({
|
||
where: {
|
||
key: {
|
||
contains: pattern
|
||
}
|
||
}
|
||
});
|
||
|
||
logger.info('[PostgresCache] 批量删除', { pattern, count: result.count });
|
||
return result.count;
|
||
|
||
} catch (error) {
|
||
logger.error('[PostgresCache] 批量删除失败', { pattern, error });
|
||
return 0;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 清空所有缓存
|
||
*/
|
||
async flush(): Promise<void> {
|
||
try {
|
||
await prisma.appCache.deleteMany({});
|
||
logger.info('[PostgresCache] 缓存已清空');
|
||
} catch (error) {
|
||
logger.error('[PostgresCache] 清空失败', { error });
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取缓存统计信息
|
||
*/
|
||
async getStats(): Promise<{
|
||
total: number;
|
||
expired: number;
|
||
byModule: Record<string, number>;
|
||
}> {
|
||
try {
|
||
const now = new Date();
|
||
|
||
// 总数
|
||
const total = await prisma.appCache.count();
|
||
|
||
// 过期数量
|
||
const expired = await prisma.appCache.count({
|
||
where: {
|
||
expiresAt: { lt: now }
|
||
}
|
||
});
|
||
|
||
// 按模块统计(通过key前缀分组)
|
||
const all = await prisma.appCache.findMany({
|
||
select: { key: true }
|
||
});
|
||
|
||
const byModule: Record<string, number> = {};
|
||
all.forEach(item => {
|
||
const module = item.key.split(':')[0];
|
||
byModule[module] = (byModule[module] || 0) + 1;
|
||
});
|
||
|
||
return { total, expired, byModule };
|
||
|
||
} catch (error) {
|
||
logger.error('[PostgresCache] 获取统计失败', { error });
|
||
return { total: 0, expired: 0, byModule: {} };
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 启动定时清理任务(分批清理,防止阻塞)
|
||
*
|
||
* 策略:
|
||
* - 每分钟执行一次
|
||
* - 每次删除1000条过期数据
|
||
* - 使用LIMIT避免大事务
|
||
*/
|
||
export function startCacheCleanupTask(): void {
|
||
const CLEANUP_INTERVAL = 60 * 1000; // 1分钟
|
||
const BATCH_SIZE = 1000; // 每次1000条
|
||
|
||
setInterval(async () => {
|
||
try {
|
||
// 使用原生SQL,支持LIMIT
|
||
const result = await prisma.$executeRaw`
|
||
DELETE FROM platform_schema.app_cache
|
||
WHERE id IN (
|
||
SELECT id FROM platform_schema.app_cache
|
||
WHERE expires_at < NOW()
|
||
LIMIT ${BATCH_SIZE}
|
||
)
|
||
`;
|
||
|
||
if (result > 0) {
|
||
logger.info('[PostgresCache] 定时清理', { deleted: result });
|
||
}
|
||
|
||
} catch (error) {
|
||
logger.error('[PostgresCache] 定时清理失败', { error });
|
||
}
|
||
}, CLEANUP_INTERVAL);
|
||
|
||
logger.info('[PostgresCache] 定时清理任务已启动', {
|
||
interval: `${CLEANUP_INTERVAL / 1000}秒`,
|
||
batchSize: BATCH_SIZE
|
||
});
|
||
}
|
||
```
|
||
|
||
#### **任务2.2:更新CacheFactory**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/cache/CacheFactory.ts
|
||
|
||
import { CacheAdapter } from './CacheAdapter.js';
|
||
import { MemoryCacheAdapter } from './MemoryCacheAdapter.js';
|
||
import { RedisCacheAdapter } from './RedisCacheAdapter.js';
|
||
import { PostgresCacheAdapter } from './PostgresCacheAdapter.js'; // ← 新增
|
||
import { logger } from '../logging/index.js';
|
||
import { config } from '../../config/env.js';
|
||
|
||
export class CacheFactory {
|
||
private static instance: CacheAdapter | null = null;
|
||
|
||
static getInstance(): CacheAdapter {
|
||
if (!this.instance) {
|
||
this.instance = this.createAdapter();
|
||
}
|
||
return this.instance;
|
||
}
|
||
|
||
private static createAdapter(): CacheAdapter {
|
||
const cacheType = config.cacheType;
|
||
|
||
logger.info('[CacheFactory] 初始化缓存', { cacheType });
|
||
|
||
switch (cacheType) {
|
||
case 'postgres': // ← 新增
|
||
return this.createPostgresAdapter();
|
||
|
||
case 'memory':
|
||
return this.createMemoryAdapter();
|
||
|
||
case 'redis':
|
||
return this.createRedisAdapter();
|
||
|
||
default:
|
||
logger.warn(`[CacheFactory] 未知缓存类型: ${cacheType}, 降级到内存`);
|
||
return this.createMemoryAdapter();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 创建Postgres缓存适配器
|
||
*/
|
||
private static createPostgresAdapter(): PostgresCacheAdapter {
|
||
logger.info('[CacheFactory] 使用PostgresCacheAdapter');
|
||
return new PostgresCacheAdapter();
|
||
}
|
||
|
||
private static createMemoryAdapter(): MemoryCacheAdapter {
|
||
logger.info('[CacheFactory] 使用MemoryCacheAdapter');
|
||
return new MemoryCacheAdapter();
|
||
}
|
||
|
||
private static createRedisAdapter(): RedisCacheAdapter {
|
||
// ... 现有Redis逻辑 ...
|
||
logger.info('[CacheFactory] 使用RedisCacheAdapter');
|
||
return new RedisCacheAdapter({ /* ... */ });
|
||
}
|
||
|
||
static reset(): void {
|
||
this.instance = null;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### **任务2.3:更新导出**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/cache/index.ts
|
||
|
||
export type { CacheAdapter } from './CacheAdapter.js';
|
||
export { MemoryCacheAdapter } from './MemoryCacheAdapter.js';
|
||
export { RedisCacheAdapter } from './RedisCacheAdapter.js';
|
||
export { PostgresCacheAdapter, startCacheCleanupTask } from './PostgresCacheAdapter.js'; // ← 新增
|
||
export { CacheFactory } from './CacheFactory.js';
|
||
|
||
import { CacheFactory } from './CacheFactory.js';
|
||
|
||
export const cache = CacheFactory.getInstance();
|
||
```
|
||
|
||
---
|
||
|
||
### 4.3 Phase 3:实现PgBossQueue(Day 2-3,2天)
|
||
|
||
#### **任务3.1:创建PgBossQueue**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/jobs/PgBossQueue.ts
|
||
|
||
import PgBoss from 'pg-boss';
|
||
import type { Job, JobQueue, JobHandler } from './types.js';
|
||
import { logger } from '../logging/index.js';
|
||
import { config } from '../../config/env.js';
|
||
|
||
/**
|
||
* PgBoss队列适配器
|
||
*
|
||
* 核心特性:
|
||
* - 任务持久化(实例重启不丢失)
|
||
* - 自动重试(指数退避)
|
||
* - 多实例协调(SKIP LOCKED)
|
||
* - 长任务支持(4小时超时)
|
||
*/
|
||
export class PgBossQueue implements JobQueue {
|
||
private boss: PgBoss;
|
||
private started = false;
|
||
private workers: Map<string, any> = new Map();
|
||
|
||
constructor() {
|
||
this.boss = new PgBoss({
|
||
connectionString: config.databaseUrl,
|
||
schema: 'platform_schema', // 统一在platform Schema
|
||
max: 5, // 连接池大小
|
||
|
||
// ✨ 关键配置:长任务支持
|
||
expireInHours: 4, // 任务锁4小时后过期
|
||
|
||
// 自动维护(清理旧任务)
|
||
retentionDays: 7, // 保留7天的历史任务
|
||
deleteAfterDays: 30, // 30天后彻底删除
|
||
});
|
||
|
||
// 监听错误
|
||
this.boss.on('error', error => {
|
||
logger.error('[PgBoss] 队列错误', { error: error.message });
|
||
});
|
||
|
||
// 监听维护事件
|
||
this.boss.on('maintenance', () => {
|
||
logger.debug('[PgBoss] 执行维护任务');
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 启动队列(懒加载)
|
||
*/
|
||
private async ensureStarted(): Promise<void> {
|
||
if (this.started) return;
|
||
|
||
try {
|
||
await this.boss.start();
|
||
this.started = true;
|
||
logger.info('[PgBoss] 队列已启动');
|
||
} catch (error) {
|
||
logger.error('[PgBoss] 队列启动失败', { error });
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 推送任务到队列
|
||
*/
|
||
async push<T = any>(type: string, data: T, options?: any): Promise<Job> {
|
||
await this.ensureStarted();
|
||
|
||
try {
|
||
const jobId = await this.boss.send(type, data, {
|
||
retryLimit: 3, // 失败重试3次
|
||
retryDelay: 60, // 失败后60秒重试
|
||
retryBackoff: true, // 指数退避(60s, 120s, 240s)
|
||
expireInHours: 4, // 4小时后过期
|
||
...options
|
||
});
|
||
|
||
logger.info('[PgBoss] 任务入队', { type, jobId });
|
||
|
||
return {
|
||
id: jobId!,
|
||
type,
|
||
data,
|
||
status: 'pending',
|
||
createdAt: new Date(),
|
||
};
|
||
|
||
} catch (error) {
|
||
logger.error('[PgBoss] 任务入队失败', { type, error });
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 注册任务处理器
|
||
*/
|
||
process<T = any>(type: string, handler: JobHandler<T>): void {
|
||
// 异步注册(不阻塞主流程)
|
||
this.registerWorkerAsync(type, handler);
|
||
}
|
||
|
||
/**
|
||
* 异步注册Worker
|
||
*/
|
||
private async registerWorkerAsync<T>(
|
||
type: string,
|
||
handler: JobHandler<T>
|
||
): Promise<void> {
|
||
try {
|
||
await this.ensureStarted();
|
||
|
||
// 注册Worker
|
||
await this.boss.work(
|
||
type,
|
||
{
|
||
teamSize: 1, // 每个队列并发1个任务
|
||
teamConcurrency: 1 // 每个Worker处理1个任务
|
||
},
|
||
async (job: any) => {
|
||
const startTime = Date.now();
|
||
|
||
logger.info('[PgBoss] 开始处理任务', {
|
||
type,
|
||
jobId: job.id,
|
||
attemptsMade: job.data.__retryCount || 0,
|
||
attemptsTotal: 3
|
||
});
|
||
|
||
try {
|
||
// 调用业务处理函数
|
||
const result = await handler({
|
||
id: job.id,
|
||
type,
|
||
data: job.data as T,
|
||
status: 'processing',
|
||
createdAt: new Date(job.createdon),
|
||
});
|
||
|
||
const duration = Date.now() - startTime;
|
||
logger.info('[PgBoss] ✅ 任务完成', {
|
||
type,
|
||
jobId: job.id,
|
||
duration: `${(duration / 1000).toFixed(2)}s`
|
||
});
|
||
|
||
return result;
|
||
|
||
} catch (error) {
|
||
const duration = Date.now() - startTime;
|
||
logger.error('[PgBoss] ❌ 任务失败', {
|
||
type,
|
||
jobId: job.id,
|
||
attemptsMade: job.data.__retryCount || 0,
|
||
duration: `${(duration / 1000).toFixed(2)}s`,
|
||
error: error instanceof Error ? error.message : 'Unknown'
|
||
});
|
||
|
||
// 抛出错误,触发pg-boss自动重试
|
||
throw error;
|
||
}
|
||
}
|
||
);
|
||
|
||
this.workers.set(type, true);
|
||
logger.info('[PgBoss] ✅ Worker已注册', { type });
|
||
|
||
} catch (error) {
|
||
logger.error('[PgBoss] Worker注册失败', { type, error });
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取任务状态
|
||
*/
|
||
async getJob(id: string): Promise<Job | null> {
|
||
try {
|
||
await this.ensureStarted();
|
||
|
||
const job = await this.boss.getJobById(id);
|
||
if (!job) return null;
|
||
|
||
return {
|
||
id: job.id!,
|
||
type: job.name,
|
||
data: job.data,
|
||
status: this.mapState(job.state),
|
||
progress: 0, // pg-boss不直接支持进度
|
||
createdAt: new Date(job.createdon),
|
||
completedAt: job.completedon ? new Date(job.completedon) : undefined,
|
||
error: job.output?.message,
|
||
};
|
||
|
||
} catch (error) {
|
||
logger.error('[PgBoss] 获取任务失败', { id, error });
|
||
return null;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 映射pg-boss状态到通用状态
|
||
*/
|
||
private mapState(state: string): Job['status'] {
|
||
switch (state) {
|
||
case 'completed':
|
||
return 'completed';
|
||
case 'failed':
|
||
return 'failed';
|
||
case 'active':
|
||
return 'processing';
|
||
case 'cancelled':
|
||
return 'cancelled';
|
||
default:
|
||
return 'pending';
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 更新任务进度(通过业务表)
|
||
*
|
||
* 注意:pg-boss不直接支持进度更新,
|
||
* 需要在业务层通过数据库表实现
|
||
*/
|
||
async updateProgress(id: string, progress: number, message?: string): Promise<void> {
|
||
logger.debug('[PgBoss] 进度更新(需业务表支持)', { id, progress, message });
|
||
// 实际实现:更新 aslScreeningTask.processedItems
|
||
}
|
||
|
||
/**
|
||
* 取消任务
|
||
*/
|
||
async cancelJob(id: string): Promise<boolean> {
|
||
try {
|
||
await this.ensureStarted();
|
||
await this.boss.cancel(id);
|
||
logger.info('[PgBoss] 任务已取消', { id });
|
||
return true;
|
||
} catch (error) {
|
||
logger.error('[PgBoss] 取消任务失败', { id, error });
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 重试失败任务
|
||
*/
|
||
async retryJob(id: string): Promise<boolean> {
|
||
try {
|
||
await this.ensureStarted();
|
||
await this.boss.resume(id);
|
||
logger.info('[PgBoss] 任务已重试', { id });
|
||
return true;
|
||
} catch (error) {
|
||
logger.error('[PgBoss] 重试任务失败', { id, error });
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 清理旧任务(pg-boss自动处理)
|
||
*/
|
||
async cleanup(olderThan: number = 86400000): Promise<number> {
|
||
// pg-boss有自动清理机制(retentionDays)
|
||
logger.debug('[PgBoss] 使用自动清理机制');
|
||
return 0;
|
||
}
|
||
|
||
/**
|
||
* 关闭队列(优雅关闭)
|
||
*/
|
||
async close(): Promise<void> {
|
||
if (!this.started) return;
|
||
|
||
try {
|
||
await this.boss.stop();
|
||
this.started = false;
|
||
logger.info('[PgBoss] 队列已关闭');
|
||
} catch (error) {
|
||
logger.error('[PgBoss] 队列关闭失败', { error });
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### **任务3.2:更新JobFactory**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/jobs/JobFactory.ts
|
||
|
||
import { JobQueue } from './types.js';
|
||
import { MemoryQueue } from './MemoryQueue.js';
|
||
import { PgBossQueue } from './PgBossQueue.js'; // ← 新增
|
||
import { logger } from '../logging/index.js';
|
||
import { config } from '../../config/env.js';
|
||
|
||
export class JobFactory {
|
||
private static instance: JobQueue | null = null;
|
||
|
||
static getInstance(): JobQueue {
|
||
if (!this.instance) {
|
||
this.instance = this.createQueue();
|
||
}
|
||
return this.instance;
|
||
}
|
||
|
||
private static createQueue(): JobQueue {
|
||
const queueType = config.queueType;
|
||
|
||
logger.info('[JobFactory] 初始化任务队列', { queueType });
|
||
|
||
switch (queueType) {
|
||
case 'pgboss': // ← 新增
|
||
return this.createPgBossQueue();
|
||
|
||
case 'memory':
|
||
return this.createMemoryQueue();
|
||
|
||
default:
|
||
logger.warn(`[JobFactory] 未知队列类型: ${queueType}, 降级到内存`);
|
||
return this.createMemoryQueue();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 创建PgBoss队列
|
||
*/
|
||
private static createPgBossQueue(): PgBossQueue {
|
||
logger.info('[JobFactory] 使用PgBossQueue');
|
||
return new PgBossQueue();
|
||
}
|
||
|
||
private static createMemoryQueue(): MemoryQueue {
|
||
logger.info('[JobFactory] 使用MemoryQueue');
|
||
|
||
const queue = new MemoryQueue();
|
||
|
||
// 定期清理(避免内存泄漏)
|
||
if (process.env.NODE_ENV !== 'test') {
|
||
setInterval(() => {
|
||
queue.cleanup();
|
||
}, 60 * 60 * 1000);
|
||
}
|
||
|
||
return queue;
|
||
}
|
||
|
||
static reset(): void {
|
||
this.instance = null;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### **任务3.3:更新导出**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/jobs/index.ts
|
||
|
||
export type { Job, JobStatus, JobHandler, JobQueue } from './types.js';
|
||
export { MemoryQueue } from './MemoryQueue.js';
|
||
export { PgBossQueue } from './PgBossQueue.js'; // ← 新增
|
||
export { JobFactory } from './JobFactory.js';
|
||
|
||
import { JobFactory } from './JobFactory.js';
|
||
|
||
export const jobQueue = JobFactory.getInstance();
|
||
```
|
||
|
||
---
|
||
|
||
### 4.4 Phase 4:实现任务拆分机制(Day 4,1天)✨ **新增**
|
||
|
||
#### **任务4.1:创建任务拆分工具函数**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/jobs/utils.ts (新建文件)
|
||
|
||
import { logger } from '../logging/index.js';
|
||
|
||
/**
|
||
* 将数组拆分成多个批次
|
||
*
|
||
* @param items 要拆分的项目数组
|
||
* @param chunkSize 每批次大小
|
||
* @returns 拆分后的二维数组
|
||
*
|
||
* @example
|
||
* splitIntoChunks([1,2,3,4,5], 2) // [[1,2], [3,4], [5]]
|
||
*/
|
||
export function splitIntoChunks<T>(items: T[], chunkSize: number = 100): T[][] {
|
||
if (chunkSize <= 0) {
|
||
throw new Error('chunkSize must be greater than 0');
|
||
}
|
||
|
||
const chunks: T[][] = [];
|
||
for (let i = 0; i < items.length; i += chunkSize) {
|
||
chunks.push(items.slice(i, i + chunkSize));
|
||
}
|
||
|
||
logger.debug('[TaskSplit] 任务拆分完成', {
|
||
total: items.length,
|
||
chunkSize,
|
||
chunks: chunks.length
|
||
});
|
||
|
||
return chunks;
|
||
}
|
||
|
||
/**
|
||
* 估算处理时间
|
||
*
|
||
* @param itemCount 项目数量
|
||
* @param timePerItem 每项处理时间(秒)
|
||
* @returns 总时间(秒)
|
||
*/
|
||
export function estimateProcessingTime(
|
||
itemCount: number,
|
||
timePerItem: number
|
||
): number {
|
||
return itemCount * timePerItem;
|
||
}
|
||
|
||
/**
|
||
* 推荐批次大小
|
||
*
|
||
* 根据单项处理时间和最大批次时间,计算最优批次大小
|
||
*
|
||
* @param totalItems 总项目数
|
||
* @param timePerItem 每项处理时间(秒)
|
||
* @param maxChunkTime 单批次最大时间(秒,默认15分钟)
|
||
* @returns 推荐的批次大小
|
||
*
|
||
* @example
|
||
* recommendChunkSize(1000, 7.2, 900) // 返回 125(每批125项,15分钟)
|
||
*/
|
||
export function recommendChunkSize(
|
||
totalItems: number,
|
||
timePerItem: number,
|
||
maxChunkTime: number = 900 // 15分钟
|
||
): number {
|
||
// 计算每批次最多能处理多少项
|
||
const itemsPerChunk = Math.floor(maxChunkTime / timePerItem);
|
||
|
||
// 限制范围:最少10项,最多1000项
|
||
const recommended = Math.max(10, Math.min(itemsPerChunk, 1000));
|
||
|
||
logger.info('[TaskSplit] 批次大小推荐', {
|
||
totalItems,
|
||
timePerItem: `${timePerItem}秒`,
|
||
maxChunkTime: `${maxChunkTime}秒`,
|
||
recommended,
|
||
estimatedBatches: Math.ceil(totalItems / recommended),
|
||
estimatedTimePerBatch: `${(recommended * timePerItem / 60).toFixed(1)}分钟`
|
||
});
|
||
|
||
return recommended;
|
||
}
|
||
|
||
/**
|
||
* 批次任务配置表
|
||
*/
|
||
export const CHUNK_STRATEGIES = {
|
||
// ASL文献筛选:每批100篇,约12分钟
|
||
'asl:title-screening': {
|
||
chunkSize: 100,
|
||
timePerItem: 7.2, // 秒
|
||
estimatedTime: 720, // 12分钟
|
||
maxRetries: 3,
|
||
description: 'ASL标题摘要筛选(双模型并行)'
|
||
},
|
||
|
||
// ASL全文复筛:每批50篇,约15分钟
|
||
'asl:fulltext-screening': {
|
||
chunkSize: 50,
|
||
timePerItem: 18, // 秒
|
||
estimatedTime: 900, // 15分钟
|
||
maxRetries: 3,
|
||
description: 'ASL全文复筛(12字段提取)'
|
||
},
|
||
|
||
// DC病历提取:每批50份,约8分钟
|
||
'dc:medical-extraction': {
|
||
chunkSize: 50,
|
||
timePerItem: 10, // 秒
|
||
estimatedTime: 500, // 8分钟
|
||
maxRetries: 3,
|
||
description: 'DC医疗记录结构化提取'
|
||
},
|
||
|
||
// 统计分析:每批5000条,约8分钟
|
||
'ssa:statistical-analysis': {
|
||
chunkSize: 5000,
|
||
timePerItem: 0.1, // 秒
|
||
estimatedTime: 500, // 8分钟
|
||
maxRetries: 2,
|
||
description: 'SSA统计分析计算'
|
||
}
|
||
} as const;
|
||
|
||
/**
|
||
* 获取任务拆分策略
|
||
*/
|
||
export function getChunkStrategy(taskType: keyof typeof CHUNK_STRATEGIES) {
|
||
const strategy = CHUNK_STRATEGIES[taskType];
|
||
if (!strategy) {
|
||
logger.warn('[TaskSplit] 未找到任务策略,使用默认配置', { taskType });
|
||
return {
|
||
chunkSize: 100,
|
||
timePerItem: 10,
|
||
estimatedTime: 1000,
|
||
maxRetries: 3,
|
||
description: '默认任务策略'
|
||
};
|
||
}
|
||
return strategy;
|
||
}
|
||
```
|
||
|
||
#### **任务4.2:更新导出**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/jobs/index.ts
|
||
|
||
export type { Job, JobStatus, JobHandler, JobQueue } from './types.js';
|
||
export { MemoryQueue } from './MemoryQueue.js';
|
||
export { PgBossQueue } from './PgBossQueue.js';
|
||
export { JobFactory } from './JobFactory.js';
|
||
export { // ← 新增
|
||
splitIntoChunks,
|
||
estimateProcessingTime,
|
||
recommendChunkSize,
|
||
getChunkStrategy,
|
||
CHUNK_STRATEGIES
|
||
} from './utils.js';
|
||
|
||
import { JobFactory } from './JobFactory.js';
|
||
|
||
export const jobQueue = JobFactory.getInstance();
|
||
```
|
||
|
||
#### **任务4.3:单元测试**
|
||
|
||
```typescript
|
||
// 文件:backend/tests/common/jobs/utils.test.ts(新建)
|
||
|
||
import { describe, it, expect } from 'vitest';
|
||
import { splitIntoChunks, recommendChunkSize } from '../../../src/common/jobs/utils.js';
|
||
|
||
describe('Task Split Utils', () => {
|
||
describe('splitIntoChunks', () => {
|
||
it('should split array into chunks', () => {
|
||
const items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
|
||
const chunks = splitIntoChunks(items, 3);
|
||
|
||
expect(chunks).toEqual([
|
||
[1, 2, 3],
|
||
[4, 5, 6],
|
||
[7, 8, 9],
|
||
[10]
|
||
]);
|
||
});
|
||
|
||
it('should handle exact division', () => {
|
||
const items = [1, 2, 3, 4, 5, 6];
|
||
const chunks = splitIntoChunks(items, 2);
|
||
|
||
expect(chunks).toEqual([
|
||
[1, 2],
|
||
[3, 4],
|
||
[5, 6]
|
||
]);
|
||
});
|
||
|
||
it('should handle empty array', () => {
|
||
const chunks = splitIntoChunks([], 10);
|
||
expect(chunks).toEqual([]);
|
||
});
|
||
});
|
||
|
||
describe('recommendChunkSize', () => {
|
||
it('should recommend chunk size for ASL screening', () => {
|
||
const size = recommendChunkSize(1000, 7.2, 900); // 15分钟
|
||
expect(size).toBe(125); // 125 * 7.2 = 900秒
|
||
});
|
||
|
||
it('should not exceed max limit', () => {
|
||
const size = recommendChunkSize(10000, 0.1, 900);
|
||
expect(size).toBe(1000); // 最大1000
|
||
});
|
||
|
||
it('should not go below min limit', () => {
|
||
const size = recommendChunkSize(100, 100, 900);
|
||
expect(size).toBe(10); // 最小10
|
||
});
|
||
});
|
||
});
|
||
```
|
||
|
||
---
|
||
|
||
### 4.5 Phase 5:实现断点续传机制(Day 5,1天)✨ **新增**
|
||
|
||
#### **任务5.1:更新Prisma Schema**
|
||
|
||
```prisma
|
||
// 文件:prisma/schema.prisma
|
||
|
||
model AslScreeningTask {
|
||
// ... 现有字段 ...
|
||
|
||
// ✨ 新增字段:任务拆分
|
||
totalBatches Int @default(1) @map("total_batches")
|
||
processedBatches Int @default(0) @map("processed_batches")
|
||
currentBatchIndex Int @default(0) @map("current_batch_index")
|
||
|
||
// ✨ 新增字段:断点续传
|
||
currentIndex Int @default(0) @map("current_index")
|
||
lastCheckpoint DateTime? @map("last_checkpoint")
|
||
checkpointData Json? @map("checkpoint_data")
|
||
}
|
||
```
|
||
|
||
```bash
|
||
# 生成迁移
|
||
cd backend
|
||
npx prisma migrate dev --name add_task_split_checkpoint
|
||
|
||
# 验证
|
||
npx prisma migrate status
|
||
```
|
||
|
||
#### **任务5.2:创建断点续传服务**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/jobs/CheckpointService.ts(新建)
|
||
|
||
import { prisma } from '../../config/database.js';
|
||
import { logger } from '../logging/index.js';
|
||
|
||
export interface CheckpointData {
|
||
lastProcessedId?: string;
|
||
lastProcessedIndex: number;
|
||
batchProgress: number;
|
||
metadata?: Record<string, any>;
|
||
}
|
||
|
||
/**
|
||
* 断点续传服务
|
||
*
|
||
* 提供任务断点的保存、读取和恢复功能
|
||
*/
|
||
export class CheckpointService {
|
||
/**
|
||
* 保存断点
|
||
*
|
||
* @param taskId 任务ID
|
||
* @param currentIndex 当前索引
|
||
* @param data 断点数据
|
||
*/
|
||
static async saveCheckpoint(
|
||
taskId: string,
|
||
currentIndex: number,
|
||
data?: Partial<CheckpointData>
|
||
): Promise<void> {
|
||
try {
|
||
const checkpointData: CheckpointData = {
|
||
lastProcessedIndex: currentIndex,
|
||
batchProgress: 0,
|
||
...data
|
||
};
|
||
|
||
await prisma.aslScreeningTask.update({
|
||
where: { id: taskId },
|
||
data: {
|
||
currentIndex,
|
||
lastCheckpoint: new Date(),
|
||
checkpointData: checkpointData as any
|
||
}
|
||
});
|
||
|
||
logger.debug('[Checkpoint] 断点已保存', {
|
||
taskId,
|
||
currentIndex,
|
||
checkpoint: checkpointData
|
||
});
|
||
|
||
} catch (error) {
|
||
logger.error('[Checkpoint] 保存断点失败', { taskId, currentIndex, error });
|
||
// 不抛出错误,避免影响主流程
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 读取断点
|
||
*
|
||
* @param taskId 任务ID
|
||
* @returns 断点索引和数据
|
||
*/
|
||
static async loadCheckpoint(taskId: string): Promise<{
|
||
startIndex: number;
|
||
data: CheckpointData | null;
|
||
}> {
|
||
try {
|
||
const task = await prisma.aslScreeningTask.findUnique({
|
||
where: { id: taskId },
|
||
select: {
|
||
currentIndex: true,
|
||
lastCheckpoint: true,
|
||
checkpointData: true
|
||
}
|
||
});
|
||
|
||
if (!task) {
|
||
return { startIndex: 0, data: null };
|
||
}
|
||
|
||
const startIndex = task.currentIndex || 0;
|
||
const data = task.checkpointData as CheckpointData | null;
|
||
|
||
if (startIndex > 0) {
|
||
logger.info('[Checkpoint] 断点恢复', {
|
||
taskId,
|
||
startIndex,
|
||
lastCheckpoint: task.lastCheckpoint,
|
||
data
|
||
});
|
||
}
|
||
|
||
return { startIndex, data };
|
||
|
||
} catch (error) {
|
||
logger.error('[Checkpoint] 读取断点失败', { taskId, error });
|
||
return { startIndex: 0, data: null };
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 清除断点
|
||
*
|
||
* @param taskId 任务ID
|
||
*/
|
||
static async clearCheckpoint(taskId: string): Promise<void> {
|
||
try {
|
||
await prisma.aslScreeningTask.update({
|
||
where: { id: taskId },
|
||
data: {
|
||
currentIndex: 0,
|
||
lastCheckpoint: null,
|
||
checkpointData: null
|
||
}
|
||
});
|
||
|
||
logger.debug('[Checkpoint] 断点已清除', { taskId });
|
||
|
||
} catch (error) {
|
||
logger.error('[Checkpoint] 清除断点失败', { taskId, error });
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 批量保存进度(包含断点)
|
||
*
|
||
* @param taskId 任务ID
|
||
* @param updates 更新数据
|
||
*/
|
||
static async updateProgress(
|
||
taskId: string,
|
||
updates: {
|
||
processedItems?: number;
|
||
successItems?: number;
|
||
failedItems?: number;
|
||
conflictItems?: number;
|
||
currentIndex?: number;
|
||
checkpointData?: Partial<CheckpointData>;
|
||
}
|
||
): Promise<void> {
|
||
try {
|
||
const data: any = {};
|
||
|
||
// 累加字段
|
||
if (updates.processedItems !== undefined) {
|
||
data.processedItems = { increment: updates.processedItems };
|
||
}
|
||
if (updates.successItems !== undefined) {
|
||
data.successItems = { increment: updates.successItems };
|
||
}
|
||
if (updates.failedItems !== undefined) {
|
||
data.failedItems = { increment: updates.failedItems };
|
||
}
|
||
if (updates.conflictItems !== undefined) {
|
||
data.conflictItems = { increment: updates.conflictItems };
|
||
}
|
||
|
||
// 断点字段
|
||
if (updates.currentIndex !== undefined) {
|
||
data.currentIndex = updates.currentIndex;
|
||
data.lastCheckpoint = new Date();
|
||
}
|
||
if (updates.checkpointData) {
|
||
data.checkpointData = updates.checkpointData;
|
||
}
|
||
|
||
await prisma.aslScreeningTask.update({
|
||
where: { id: taskId },
|
||
data
|
||
});
|
||
|
||
logger.debug('[Checkpoint] 进度已更新', { taskId, updates });
|
||
|
||
} catch (error) {
|
||
logger.error('[Checkpoint] 更新进度失败', { taskId, error });
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### **任务5.3:更新导出**
|
||
|
||
```typescript
|
||
// 文件:backend/src/common/jobs/index.ts
|
||
|
||
export { CheckpointService } from './CheckpointService.js'; // ← 新增
|
||
```
|
||
|
||
---
|
||
|
||
### 4.6 Phase 6:改造业务代码(Day 6-7,2天)✨ **已更新**
|
||
|
||
#### **任务6.1:改造ASL筛选服务(✨ 完整版:拆分+断点+队列)**
|
||
|
||
```typescript
|
||
// 文件:backend/src/modules/asl/services/screeningService.ts
|
||
|
||
import { prisma } from '../../../config/database.js';
|
||
import { logger } from '../../../common/logging/index.js';
|
||
import {
|
||
jobQueue,
|
||
splitIntoChunks,
|
||
recommendChunkSize,
|
||
CheckpointService // ✨ 新增
|
||
} from '../../../common/jobs/index.js';
|
||
import { llmScreeningService } from './llmScreeningService.js';
|
||
|
||
/**
|
||
* 启动筛选任务(改造后:使用队列)
|
||
*/
|
||
export async function startScreeningTask(projectId: string, userId: string) {
|
||
try {
|
||
logger.info('Starting screening task', { projectId, userId });
|
||
|
||
// 1. 检查项目是否存在
|
||
const project = await prisma.aslScreeningProject.findFirst({
|
||
where: { id: projectId, userId },
|
||
});
|
||
|
||
if (!project) {
|
||
throw new Error('Project not found');
|
||
}
|
||
|
||
// 2. 获取该项目的所有文献
|
||
const literatures = await prisma.aslLiterature.findMany({
|
||
where: { projectId },
|
||
});
|
||
|
||
if (literatures.length === 0) {
|
||
throw new Error('No literatures found in project');
|
||
}
|
||
|
||
logger.info('Found literatures for screening', {
|
||
projectId,
|
||
count: literatures.length
|
||
});
|
||
|
||
// 3. 创建筛选任务(数据库记录)
|
||
const task = await prisma.aslScreeningTask.create({
|
||
data: {
|
||
projectId,
|
||
taskType: 'title_abstract',
|
||
status: 'pending', // ← 初始状态改为pending
|
||
totalItems: literatures.length,
|
||
processedItems: 0,
|
||
successItems: 0,
|
||
failedItems: 0,
|
||
conflictItems: 0,
|
||
startedAt: new Date(),
|
||
},
|
||
});
|
||
|
||
logger.info('Screening task created', { taskId: task.id });
|
||
|
||
// 4. ✨ 推送到队列(异步处理,不阻塞请求)
|
||
await jobQueue.push('asl:title-screening', {
|
||
taskId: task.id,
|
||
projectId,
|
||
literatureIds: literatures.map(lit => lit.id),
|
||
});
|
||
|
||
logger.info('Task pushed to queue', { taskId: task.id });
|
||
|
||
// 5. 立即返回任务ID(前端可以轮询进度)
|
||
return task;
|
||
|
||
} catch (error) {
|
||
logger.error('Failed to start screening task', { error, projectId });
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* ✨ 注册队列Worker(在应用启动时调用)
|
||
*
|
||
* 这个函数需要在 backend/src/index.ts 中调用
|
||
*/
|
||
export function registerScreeningWorkers() {
|
||
// 注册标题摘要筛选Worker
|
||
jobQueue.process('asl:title-screening', async (job) => {
|
||
const { taskId, projectId, literatureIds } = job.data;
|
||
|
||
logger.info('开始处理标题摘要筛选', {
|
||
taskId,
|
||
total: literatureIds.length
|
||
});
|
||
|
||
try {
|
||
// 更新任务状态为running
|
||
await prisma.aslScreeningTask.update({
|
||
where: { id: taskId },
|
||
data: { status: 'running' }
|
||
});
|
||
|
||
// 获取项目的PICOS标准
|
||
const project = await prisma.aslScreeningProject.findUnique({
|
||
where: { id: projectId },
|
||
});
|
||
|
||
if (!project) {
|
||
throw new Error('Project not found');
|
||
}
|
||
|
||
const rawPicoCriteria = project.picoCriteria as any;
|
||
const picoCriteria = {
|
||
P: rawPicoCriteria?.P || rawPicoCriteria?.population || '',
|
||
I: rawPicoCriteria?.I || rawPicoCriteria?.intervention || '',
|
||
C: rawPicoCriteria?.C || rawPicoCriteria?.comparison || '',
|
||
O: rawPicoCriteria?.O || rawPicoCriteria?.outcome || '',
|
||
S: rawPicoCriteria?.S || rawPicoCriteria?.studyDesign || '',
|
||
};
|
||
|
||
// 逐个处理文献
|
||
let successCount = 0;
|
||
let failedCount = 0;
|
||
let conflictCount = 0;
|
||
|
||
for (let i = 0; i < literatureIds.length; i++) {
|
||
const literatureId = literatureIds[i];
|
||
|
||
try {
|
||
// 获取文献信息
|
||
const literature = await prisma.aslLiterature.findUnique({
|
||
where: { id: literatureId },
|
||
});
|
||
|
||
if (!literature) {
|
||
failedCount++;
|
||
continue;
|
||
}
|
||
|
||
// 调用LLM筛选服务
|
||
const screeningResult = await llmScreeningService.screenSingleLiterature(
|
||
literature.title || '',
|
||
literature.abstract || '',
|
||
picoCriteria,
|
||
projectId
|
||
);
|
||
|
||
// 判断是否冲突
|
||
const isConflict = screeningResult.deepseekDecision !== screeningResult.qwenDecision;
|
||
|
||
// 保存筛选结果
|
||
await prisma.aslScreeningResult.create({
|
||
data: {
|
||
literatureId,
|
||
projectId,
|
||
taskId,
|
||
taskType: 'title_abstract',
|
||
|
||
deepseekDecision: screeningResult.deepseekDecision,
|
||
deepseekReason: screeningResult.deepseekReason,
|
||
deepseekRawResponse: screeningResult.deepseekRawResponse,
|
||
|
||
qwenDecision: screeningResult.qwenDecision,
|
||
qwenReason: screeningResult.qwenReason,
|
||
qwenRawResponse: screeningResult.qwenRawResponse,
|
||
|
||
finalDecision: screeningResult.finalDecision,
|
||
hasConflict: isConflict,
|
||
manualReviewRequired: isConflict,
|
||
},
|
||
});
|
||
|
||
if (isConflict) {
|
||
conflictCount++;
|
||
} else {
|
||
successCount++;
|
||
}
|
||
|
||
} catch (error) {
|
||
logger.error('处理文献失败', { literatureId, error });
|
||
failedCount++;
|
||
}
|
||
|
||
// 更新进度(每处理10篇或最后一篇时更新)
|
||
if ((i + 1) % 10 === 0 || i === literatureIds.length - 1) {
|
||
await prisma.aslScreeningTask.update({
|
||
where: { id: taskId },
|
||
data: {
|
||
processedItems: i + 1,
|
||
successItems: successCount,
|
||
failedItems: failedCount,
|
||
conflictItems: conflictCount,
|
||
}
|
||
});
|
||
}
|
||
}
|
||
|
||
// 标记任务完成
|
||
await prisma.aslScreeningTask.update({
|
||
where: { id: taskId },
|
||
data: {
|
||
status: 'completed',
|
||
completedAt: new Date(),
|
||
processedItems: literatureIds.length,
|
||
successItems: successCount,
|
||
failedItems: failedCount,
|
||
conflictItems: conflictCount,
|
||
}
|
||
});
|
||
|
||
logger.info('标题摘要筛选完成', {
|
||
taskId,
|
||
total: literatureIds.length,
|
||
success: successCount,
|
||
failed: failedCount,
|
||
conflict: conflictCount
|
||
});
|
||
|
||
return {
|
||
success: true,
|
||
processed: literatureIds.length,
|
||
successCount,
|
||
failedCount,
|
||
conflictCount,
|
||
};
|
||
|
||
} catch (error) {
|
||
// 任务失败,更新状态
|
||
await prisma.aslScreeningTask.update({
|
||
where: { id: taskId },
|
||
data: {
|
||
status: 'failed',
|
||
completedAt: new Date(),
|
||
}
|
||
});
|
||
|
||
logger.error('标题摘要筛选失败', { taskId, error });
|
||
throw error;
|
||
}
|
||
});
|
||
|
||
logger.info('✅ ASL筛选Worker已注册');
|
||
}
|
||
```
|
||
|
||
#### **任务4.2:更新应用入口(注册Workers)**
|
||
|
||
```typescript
|
||
// 文件:backend/src/index.ts
|
||
|
||
import Fastify from 'fastify';
|
||
import { logger } from './common/logging/index.js';
|
||
import { startCacheCleanupTask } from './common/cache/index.js'; // ← 新增
|
||
import { registerScreeningWorkers } from './modules/asl/services/screeningService.js'; // ← 新增
|
||
|
||
const app = Fastify({ logger: false });
|
||
|
||
// ... 注册路由等 ...
|
||
|
||
// ✨ 启动服务器后,注册队列Workers和定时任务
|
||
app.listen({ port: 3001, host: '0.0.0.0' }, async (err, address) => {
|
||
if (err) {
|
||
logger.error('Failed to start server', { error: err });
|
||
process.exit(1);
|
||
}
|
||
|
||
logger.info(`Server listening on ${address}`);
|
||
|
||
// ✨ 启动缓存定时清理
|
||
if (process.env.CACHE_TYPE === 'postgres') {
|
||
startCacheCleanupTask();
|
||
}
|
||
|
||
// ✨ 注册队列Workers
|
||
if (process.env.QUEUE_TYPE === 'pgboss') {
|
||
try {
|
||
registerScreeningWorkers(); // ASL模块
|
||
// registerDataCleaningWorkers(); // DC模块(待实现)
|
||
// registerStatisticalWorkers(); // SSA模块(待实现)
|
||
} catch (error) {
|
||
logger.error('Failed to register workers', { error });
|
||
}
|
||
}
|
||
});
|
||
|
||
// 优雅关闭
|
||
process.on('SIGTERM', async () => {
|
||
logger.info('SIGTERM received, closing server gracefully...');
|
||
await app.close();
|
||
process.exit(0);
|
||
});
|
||
```
|
||
|
||
#### **任务4.3:DC模块改造(次优先)**
|
||
|
||
```typescript
|
||
// 文件:backend/src/modules/dc/tool-b/services/MedicalRecordExtractionService.ts
|
||
// (仅示例,实际根据需求实现)
|
||
|
||
import { jobQueue } from '../../../../common/jobs/index.js';
|
||
|
||
export function registerMedicalExtractionWorkers() {
|
||
jobQueue.process('dc:medical-extraction', async (job) => {
|
||
const { taskId, recordIds } = job.data;
|
||
|
||
// 批量提取病历
|
||
for (const recordId of recordIds) {
|
||
await extractSingleRecord(recordId);
|
||
|
||
// 更新进度
|
||
// ...
|
||
}
|
||
|
||
return { success: true };
|
||
});
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 4.5 Phase 5:测试验证(Day 6,1天)
|
||
|
||
#### **任务5.1:单元测试**
|
||
|
||
```typescript
|
||
// 文件:backend/tests/common/cache/PostgresCacheAdapter.test.ts
|
||
|
||
import { describe, it, expect, beforeEach } from 'vitest';
|
||
import { PostgresCacheAdapter } from '../../../src/common/cache/PostgresCacheAdapter.js';
|
||
import { prisma } from '../../../src/config/database.js';
|
||
|
||
describe('PostgresCacheAdapter', () => {
|
||
let cache: PostgresCacheAdapter;
|
||
|
||
beforeEach(async () => {
|
||
cache = new PostgresCacheAdapter();
|
||
// 清空测试数据
|
||
await prisma.appCache.deleteMany({});
|
||
});
|
||
|
||
it('should set and get cache', async () => {
|
||
await cache.set('test:key', { value: 'hello' }, 60);
|
||
const result = await cache.get('test:key');
|
||
expect(result).toEqual({ value: 'hello' });
|
||
});
|
||
|
||
it('should return null for expired cache', async () => {
|
||
await cache.set('test:key', { value: 'hello' }, 1); // 1秒过期
|
||
await new Promise(resolve => setTimeout(resolve, 1100)); // 等待1.1秒
|
||
const result = await cache.get('test:key');
|
||
expect(result).toBeNull();
|
||
});
|
||
|
||
it('should delete cache', async () => {
|
||
await cache.set('test:key', { value: 'hello' }, 60);
|
||
await cache.delete('test:key');
|
||
const result = await cache.get('test:key');
|
||
expect(result).toBeNull();
|
||
});
|
||
|
||
// ... 更多测试 ...
|
||
});
|
||
```
|
||
|
||
#### **任务5.2:集成测试**
|
||
|
||
```bash
|
||
# 1. 启动本地Postgres
|
||
docker start ai-clinical-postgres
|
||
|
||
# 2. 设置环境变量
|
||
export CACHE_TYPE=postgres
|
||
export QUEUE_TYPE=pgboss
|
||
export DATABASE_URL=postgresql://postgres:123456@localhost:5432/aiclincial
|
||
|
||
# 3. 运行迁移
|
||
cd backend
|
||
npx prisma migrate deploy
|
||
|
||
# 4. 启动应用
|
||
npm run dev
|
||
|
||
# 应该看到日志:
|
||
# [CacheFactory] 使用PostgresCacheAdapter
|
||
# [JobFactory] 使用PgBossQueue
|
||
# [PgBoss] 队列已启动
|
||
# [PostgresCache] 定时清理任务已启动
|
||
# ✅ ASL筛选Worker已注册
|
||
```
|
||
|
||
#### **任务5.3:功能测试**
|
||
|
||
```bash
|
||
# 测试1:缓存功能
|
||
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening
|
||
|
||
# 观察日志:
|
||
# [PostgresCache] 缓存命中 { key: 'asl:llm:...' }
|
||
|
||
# 测试2:队列功能
|
||
# 提交任务 → 观察任务状态变化
|
||
# pending → running → completed
|
||
|
||
# 测试3:实例重启恢复
|
||
# 提交任务 → 等待处理到50% → Ctrl+C停止 → 重新启动
|
||
# 任务应该自动恢复并继续
|
||
```
|
||
|
||
---
|
||
|
||
## 5. 优先级与依赖关系
|
||
|
||
### 5.1 改造优先级矩阵(✨ 已更新)
|
||
|
||
| 模块 | 优先级 | 工作量 | 业务价值 | 风险 | 依赖 | 状态 |
|
||
|------|--------|--------|---------|------|------|------|
|
||
| **环境准备** | P0 | 0.5天 | - | 低 | 无 | ⬜ 待开始 |
|
||
| **PostgresCacheAdapter** | P0 | 0.5天 | 降低LLM成本50% | 低 | 环境准备 | ⬜ 待开始 |
|
||
| **PgBossQueue** | P0 | 2天 | 长任务可靠性 | 中 | 环境准备 | ⬜ 待开始 |
|
||
| **任务拆分机制** | P0 | 1天 | 任务成功率 > 99% | 中 | PgBossQueue | ⬜ 待开始 |
|
||
| **断点续传机制** | P0 | 1天 | 容错性提升10倍 | 低 | PgBossQueue | ⬜ 待开始 |
|
||
| **ASL筛选改造** | P0 | 1.5天 | 用户核心功能 | 中 | 任务拆分+断点 | ⬜ 待开始 |
|
||
| **DC提取改造** | P1 | 1天 | 用户核心功能 | 中 | 任务拆分+断点 | ⬜ 待开始 |
|
||
| **测试验证** | P0 | 1.5天 | 保证质量 | 低 | 所有改造 | ⬜ 待开始 |
|
||
| **SAE部署** | P1 | 0.5天 | 生产就绪 | 中 | 测试验证 | ⬜ 待开始 |
|
||
|
||
**总工作量:** 9天(比V1.0增加2天,增加任务拆分和断点续传)
|
||
|
||
### 5.2 依赖关系图(✨ 已更新)
|
||
|
||
```
|
||
Day 1: 环境准备 ────────────────┐
|
||
│
|
||
Day 1: PostgresCache ──────────┤
|
||
│
|
||
Day 2-3: PgBossQueue ───────────┤
|
||
│
|
||
Day 4: 任务拆分机制 ─────────────┤
|
||
├─→ Day 6-7: ASL筛选改造 ──┐
|
||
Day 5: 断点续传机制 ─────────────┤ │
|
||
│ │
|
||
Day 7: DC提取改造 ──────────────┘ │
|
||
│
|
||
├─→ Day 8-9: 测试 + 部署
|
||
```
|
||
|
||
### 5.3 可并行工作(✨ 已更新)
|
||
|
||
```
|
||
Day 1(并行):
|
||
├─ 环境准备(上午)
|
||
└─ PostgresCache实现(下午)
|
||
|
||
Day 2-3(串行):
|
||
└─ PgBossQueue实现(必须等环境准备完成)
|
||
|
||
Day 4-5(可并行):
|
||
├─ 任务拆分机制(工具函数)
|
||
└─ 断点续传机制(数据库字段)
|
||
|
||
Day 6-7(串行):
|
||
├─ ASL筛选改造(使用拆分+断点)
|
||
└─ DC提取改造(复用拆分+断点)
|
||
|
||
Day 8-9(并行):
|
||
├─ 测试验证
|
||
└─ 文档完善
|
||
```
|
||
|
||
---
|
||
|
||
## 6. 测试验证方案(✨ 已更新)
|
||
|
||
### 6.1 测试清单
|
||
|
||
#### **功能测试(基础)**
|
||
```bash
|
||
✅ 缓存读写正常
|
||
✅ 缓存过期自动清理(每分钟1000条)
|
||
✅ 缓存多实例共享(实例A写→实例B读)
|
||
✅ 队列任务入队(pg-boss)
|
||
✅ 队列任务处理(Worker)
|
||
✅ 队列任务重试(模拟失败,3次重试)
|
||
✅ 队列实例重启恢复
|
||
```
|
||
|
||
#### **任务拆分测试** ✨ **新增**
|
||
```bash
|
||
✅ 任务拆分工具函数正确性
|
||
- splitIntoChunks([1..100], 30) → 4批(30+30+30+10)
|
||
- recommendChunkSize(1000, 7.2, 900) → 125
|
||
|
||
✅ 拆分任务入队
|
||
- 1000篇文献 → 10批次,每批100篇
|
||
- 验证数据库:totalBatches=10, processedBatches=0
|
||
|
||
✅ 批次并行处理
|
||
- 多个Worker同时处理不同批次
|
||
- 验证无冲突(SKIP LOCKED)
|
||
|
||
✅ 批次失败重试
|
||
- 模拟第3批失败 → 自动重试
|
||
- 其他批次不受影响
|
||
```
|
||
|
||
#### **断点续传测试** ✨ **新增**
|
||
```bash
|
||
✅ 断点保存
|
||
- 处理到第50项 → 保存断点
|
||
- 验证数据库:currentIndex=50, lastCheckpoint更新
|
||
|
||
✅ 断点恢复
|
||
- 任务中断(Ctrl+C)
|
||
- 重启服务 → 从第50项继续
|
||
- 验证:前50项不重复处理
|
||
|
||
✅ 批次级断点
|
||
- 10批次任务,完成前3批
|
||
- 实例重启 → 从第4批继续
|
||
- 验证:processedBatches=3, currentBatchIndex=3
|
||
```
|
||
|
||
#### **长时间任务测试** 🔴 **重点**
|
||
```bash
|
||
✅ 1000篇文献筛选(约2小时)
|
||
- 拆分成10批,每批12分钟
|
||
- 成功率 > 99%
|
||
- 验证进度更新(每10篇)
|
||
|
||
✅ 10000篇文献筛选(约20小时)
|
||
- 拆分成100批,每批12分钟
|
||
- 10个Worker并行 → 2小时完成
|
||
- 成功率 > 99.5%
|
||
|
||
✅ 实例重启恢复(关键测试)
|
||
- 启动任务 → 等待50% → 停止服务(Ctrl+C)
|
||
- 重启服务 → 任务自动恢复
|
||
- 验证:从50%继续,不从0%开始
|
||
- 预期:总耗时 ≈ 原计划时间 × 1.05(误差5%内)
|
||
```
|
||
|
||
#### **性能测试**
|
||
```bash
|
||
✅ 缓存读取延迟 < 5ms(P99)
|
||
✅ 缓存写入延迟 < 10ms(P99)
|
||
✅ 队列吞吐量 > 100任务/小时
|
||
✅ 断点保存延迟 < 20ms
|
||
✅ 批次切换延迟 < 100ms
|
||
```
|
||
|
||
#### **故障测试(增强)**
|
||
```bash
|
||
✅ 实例销毁(SAE缩容)
|
||
- 正在处理任务 → 实例销毁
|
||
- 等待10分钟 → 新实例接管
|
||
- 任务从断点恢复
|
||
|
||
✅ 数据库连接断开
|
||
- 处理任务中 → 断开连接
|
||
- 自动重连 → 继续处理
|
||
|
||
✅ 任务处理失败
|
||
- 模拟LLM超时
|
||
- 自动重试3次
|
||
- 失败后标记为failed
|
||
|
||
✅ Postgres慢查询
|
||
- 模拟数据库慢(> 5秒)
|
||
- 任务不失败,等待完成
|
||
|
||
✅ 并发冲突
|
||
- 2个Worker领取同一批次
|
||
- pg-boss SKIP LOCKED机制
|
||
- 验证:只有1个Worker处理
|
||
|
||
✅ 发布更新(生产场景)
|
||
- 15:00发布更新
|
||
- 正在执行的5批任务
|
||
- 实例重启 → 5批任务自动恢复
|
||
```
|
||
|
||
### 6.2 测试脚本
|
||
|
||
#### **测试1:完整流程(1000篇文献)**
|
||
|
||
```bash
|
||
# 1. 准备测试数据
|
||
cd backend
|
||
npm run test:seed -- --literatures=1000
|
||
|
||
# 2. 启动服务
|
||
npm run dev
|
||
|
||
# 3. 提交任务
|
||
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening \
|
||
-H "Content-Type: application/json"
|
||
|
||
# 4. 观察日志
|
||
# [TaskSplit] 任务拆分完成 { total: 1000, chunkSize: 100, chunks: 10 }
|
||
# [PgBoss] 任务入队 { type: 'asl:title-screening-batch', jobId: '1' }
|
||
# ...
|
||
# [PgBoss] 批次处理完成 { taskId, batchIndex: 0, progress: '1/10' }
|
||
|
||
# 5. 查询进度
|
||
curl http://localhost:3001/api/v1/asl/tasks/:taskId
|
||
|
||
# 预期响应:
|
||
# {
|
||
# "id": "task_123",
|
||
# "status": "running",
|
||
# "totalItems": 1000,
|
||
# "processedItems": 350,
|
||
# "totalBatches": 10,
|
||
# "processedBatches": 3,
|
||
# "progress": 0.35
|
||
# }
|
||
```
|
||
|
||
#### **测试2:断点恢复(关键测试)**
|
||
|
||
```bash
|
||
# 1. 启动任务
|
||
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening
|
||
|
||
# 2. 等待处理到50%
|
||
# 观察日志:processedItems: 500
|
||
|
||
# 3. 强制停止服务
|
||
# Ctrl+C 或 kill -9 <pid>
|
||
|
||
# 4. 重新启动服务
|
||
npm run dev
|
||
|
||
# 5. 观察日志
|
||
# [Checkpoint] 断点恢复 { taskId, startIndex: 500 }
|
||
# [PgBoss] 开始处理任务 { batchIndex: 5 } ← 从第6批继续
|
||
|
||
# 6. 验证最终结果
|
||
# 总耗时应该约等于 2小时 + 重启时间(< 5分钟)
|
||
# 不应该是 4小时(从头开始)
|
||
```
|
||
|
||
#### **测试3:并发处理(10000篇)**
|
||
|
||
```bash
|
||
# 1. 准备大数据集
|
||
npm run test:seed -- --literatures=10000
|
||
|
||
# 2. 启动多个Worker实例(模拟SAE多实例)
|
||
# Terminal 1
|
||
npm run dev -- --port=3001
|
||
|
||
# Terminal 2
|
||
npm run dev -- --port=3002
|
||
|
||
# Terminal 3
|
||
npm run dev -- --port=3003
|
||
|
||
# 3. 提交任务(任意一个实例)
|
||
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening
|
||
|
||
# 4. 观察三个实例的日志
|
||
# 应该看到:3个实例同时处理不同批次
|
||
# Worker1: 处理批次 0, 3, 6, 9, ...
|
||
# Worker2: 处理批次 1, 4, 7, 10, ...
|
||
# Worker3: 处理批次 2, 5, 8, 11, ...
|
||
|
||
# 5. 验证完成时间
|
||
# 100批次 / 3个Worker ≈ 33.3批轮次 × 12分钟 ≈ 6.6小时
|
||
# 单Worker需要:100批 × 12分钟 = 20小时
|
||
# 加速比:20 / 6.6 ≈ 3倍
|
||
```
|
||
|
||
### 6.3 监控指标(✨ 已更新)
|
||
|
||
```typescript
|
||
// 缓存监控
|
||
- cache_hit_rate: 命中率 (目标 > 60%)
|
||
- cache_total_count: 总数
|
||
- cache_expired_count: 过期数量
|
||
- cache_by_module: 各模块分布
|
||
|
||
// 队列监控(基础)
|
||
- queue_pending_count: 待处理任务
|
||
- queue_processing_count: 处理中任务
|
||
- queue_completed_count: 完成任务
|
||
- queue_failed_count: 失败任务
|
||
- queue_avg_duration: 平均耗时
|
||
|
||
// 任务拆分监控 ✨ 新增
|
||
- task_total_batches: 总批次数
|
||
- task_processed_batches: 已完成批次数
|
||
- task_batch_success_rate: 批次成功率 (目标 > 99%)
|
||
- task_avg_batch_duration: 平均批次耗时
|
||
|
||
// 断点续传监控 ✨ 新增
|
||
- checkpoint_save_count: 断点保存次数
|
||
- checkpoint_restore_count: 断点恢复次数
|
||
- checkpoint_save_duration: 保存耗时 (目标 < 20ms)
|
||
- task_recovery_success_rate: 恢复成功率 (目标 100%)
|
||
```
|
||
|
||
### 6.4 成功标准(✨ 已更新)
|
||
|
||
```bash
|
||
基础功能:
|
||
✅ 所有单元测试通过
|
||
✅ 所有集成测试通过
|
||
✅ 缓存命中率 > 60%
|
||
✅ LLM API调用次数下降 > 40%
|
||
|
||
任务可靠性 🔴 关键:
|
||
✅ 1000篇文献筛选成功率 > 99%
|
||
✅ 10000篇文献筛选成功率 > 99.5%
|
||
✅ 实例重启后任务自动恢复成功率 100%
|
||
✅ 断点恢复后不重复处理已完成项
|
||
✅ 批次失败只需重试单批(不重试全部)
|
||
|
||
性能指标:
|
||
✅ 单批次处理时间 < 15分钟
|
||
✅ 断点保存延迟 < 20ms
|
||
✅ 10个Worker并行加速比 > 8倍
|
||
|
||
生产验证:
|
||
✅ 生产环境运行48小时无错误
|
||
✅ 处理3个完整的1000篇文献筛选任务
|
||
✅ 至少1次实例重启恢复测试成功
|
||
✅ 无用户投诉任务丢失
|
||
✅ 系统可用性 > 99.9%
|
||
```
|
||
|
||
---
|
||
|
||
## 7. 上线与回滚
|
||
|
||
### 7.1 上线步骤
|
||
|
||
```bash
|
||
# Step 1: 数据库迁移(生产环境)
|
||
npx prisma migrate deploy
|
||
|
||
# Step 2: 更新SAE环境变量
|
||
CACHE_TYPE=postgres
|
||
QUEUE_TYPE=pgboss
|
||
|
||
# Step 3: 灰度发布(1个实例)
|
||
# 观察24小时,监控指标正常
|
||
|
||
# Step 4: 全量发布(2-3个实例)
|
||
# 逐步扩容
|
||
|
||
# Step 5: 清理旧代码
|
||
# 移除MemoryQueue相关代码(可选)
|
||
```
|
||
|
||
### 7.2 回滚方案
|
||
|
||
```bash
|
||
# 如果出现问题,立即回滚:
|
||
|
||
# 方案1:环境变量回滚(最快)
|
||
CACHE_TYPE=memory
|
||
QUEUE_TYPE=memory
|
||
# 重启应用,降级到内存模式
|
||
|
||
# 方案2:代码回滚
|
||
git revert <commit>
|
||
# 回滚到改造前版本
|
||
|
||
# 方案3:数据库回滚
|
||
npx prisma migrate down
|
||
# 删除 app_cache 表(可选)
|
||
```
|
||
|
||
### 7.3 风险预案
|
||
|
||
| 风险 | 概率 | 影响 | 预案 |
|
||
|------|------|------|------|
|
||
| Postgres性能不足 | 低 | 中 | 回滚到内存模式 |
|
||
| pg-boss连接失败 | 低 | 高 | 降级到同步处理 |
|
||
| 缓存数据过大 | 低 | 低 | 增加清理频率 |
|
||
| 长任务卡死 | 低 | 中 | 手动kill任务 |
|
||
|
||
---
|
||
|
||
## 8. 成功标准(✨ V2.0更新)
|
||
|
||
### 8.1 技术指标
|
||
|
||
| 指标类别 | 指标 | 目标值 | 衡量方法 |
|
||
|---------|------|--------|---------|
|
||
| **缓存** | 命中率 | > 60% | 监控统计 |
|
||
| | 持久化 | ✅ 实例重启不丢失 | 重启测试 |
|
||
| | 多实例共享 | ✅ A写B能读 | 并发测试 |
|
||
| **队列** | 任务持久化 | ✅ 实例销毁不丢失 | 销毁测试 |
|
||
| | 长任务可靠性 | > 99% | 1000篇筛选 |
|
||
| | 超长任务可靠性 | > 99.5% | 10000篇筛选 |
|
||
| **拆分** | 批次成功率 | > 99% | 批次统计 |
|
||
| | 批次耗时 | < 15分钟 | 监控统计 |
|
||
| | 并行加速比 | > 8倍(10 Worker) | 对比测试 |
|
||
| **断点** | 断点保存延迟 | < 20ms | 性能测试 |
|
||
| | 恢复成功率 | 100% | 重启测试 |
|
||
| | 重复处理率 | 0% | 数据验证 |
|
||
|
||
### 8.2 业务指标
|
||
|
||
| 指标 | 改造前 | 改造后 | 改进幅度 |
|
||
|------|--------|--------|---------|
|
||
| **LLM API成本** | 基线 | -40~60% | 节省¥X/月 |
|
||
| **任务成功率** | 10-30% | > 99% | 提升3-10倍 |
|
||
| **用户重复提交** | 平均3次 | < 1.1次 | 减少70% |
|
||
| **任务完成时间** | 不确定(可能失败) | 稳定可预测 | 体验提升 |
|
||
| **用户满意度** | 基线 | 显著提升 | 问卷调查 |
|
||
|
||
### 8.3 验收清单
|
||
|
||
```bash
|
||
Phase 1-3:基础设施 ✅
|
||
✅ PostgresCacheAdapter实现并通过测试
|
||
✅ PgBossQueue实现并通过测试
|
||
✅ 本地环境验证通过
|
||
|
||
Phase 4-5:高级特性 ✅
|
||
✅ 任务拆分工具函数测试通过
|
||
✅ 断点续传服务测试通过
|
||
✅ 数据库Schema迁移成功
|
||
|
||
Phase 6-7:业务集成 ✅
|
||
✅ ASL筛选服务改造完成
|
||
✅ DC提取服务改造完成
|
||
✅ Worker注册成功
|
||
|
||
关键功能测试 🔴:
|
||
✅ 1000篇文献筛选(2小时)成功率 > 99%
|
||
✅ 实例重启恢复测试通过(3次)
|
||
✅ 断点续传测试通过(从50%恢复)
|
||
✅ 批次并行处理测试通过
|
||
✅ 失败重试测试通过
|
||
|
||
生产环境验证 🔴:
|
||
✅ 生产环境运行48小时无致命错误
|
||
✅ 完成至少3个真实用户任务(1000篇+)
|
||
✅ 至少经历1次SAE实例重启,任务成功恢复
|
||
✅ 缓存命中率 > 60%
|
||
✅ LLM API调用量下降 > 40%
|
||
✅ 无用户投诉任务丢失
|
||
```
|
||
|
||
---
|
||
|
||
## 9. 附录
|
||
|
||
### 9.1 参考文档
|
||
|
||
- [Postgres-Only 全能架构解决方案](./08-Postgres-Only 全能架构解决方案.md)
|
||
- [pg-boss 官方文档](https://github.com/timgit/pg-boss)
|
||
- [Prisma 多Schema支持](https://www.prisma.io/docs/concepts/components/prisma-schema/multi-schema)
|
||
|
||
### 9.2 关键代码位置(✨ V2.0更新)
|
||
|
||
```
|
||
backend/src/
|
||
├── common/cache/ # 缓存系统
|
||
│ ├── CacheAdapter.ts # ✅ 已存在
|
||
│ ├── CacheFactory.ts # ⚠️ 需修改(+10行)
|
||
│ ├── MemoryCacheAdapter.ts # ✅ 已存在
|
||
│ ├── RedisCacheAdapter.ts # 🔴 占位符(不用管)
|
||
│ ├── PostgresCacheAdapter.ts # ❌ 需新增(~300行)
|
||
│ └── index.ts # ⚠️ 需修改(导出)
|
||
│
|
||
├── common/jobs/ # 任务队列
|
||
│ ├── types.ts # ✅ 已存在
|
||
│ ├── JobFactory.ts # ⚠️ 需修改(+10行)
|
||
│ ├── MemoryQueue.ts # ✅ 已存在
|
||
│ ├── PgBossQueue.ts # ❌ 需新增(~400行)
|
||
│ ├── utils.ts # ❌ 需新增(~200行)✨
|
||
│ ├── CheckpointService.ts # ❌ 需新增(~150行)✨
|
||
│ └── index.ts # ⚠️ 需修改(导出)
|
||
│
|
||
├── modules/asl/services/ # ASL业务层
|
||
│ ├── screeningService.ts # ⚠️ 需改造(~150行改动)
|
||
│ └── llmScreeningService.ts # ✅ 无需改动
|
||
│
|
||
├── modules/dc/tool-b/services/ # DC业务层
|
||
│ └── (类似ASL,按需改造)
|
||
│
|
||
├── config/
|
||
│ └── env.ts # ⚠️ 需添加环境变量
|
||
│
|
||
└── index.ts # ⚠️ 需修改(注册Workers + 启动清理)
|
||
|
||
prisma/
|
||
├── schema.prisma # ⚠️ 需修改(+AppCache +字段)
|
||
└── migrations/ # 自动生成
|
||
|
||
tests/ # 测试文件
|
||
├── common/cache/
|
||
│ └── PostgresCacheAdapter.test.ts # ❌ 需新增
|
||
├── common/jobs/
|
||
│ ├── PgBossQueue.test.ts # ❌ 需新增
|
||
│ ├── utils.test.ts # ❌ 需新增 ✨
|
||
│ └── CheckpointService.test.ts # ❌ 需新增 ✨
|
||
└── modules/asl/
|
||
└── screening-integration.test.ts # ❌ 需新增
|
||
|
||
backend/.env # ⚠️ 需修改
|
||
```
|
||
|
||
**文件状态说明:**
|
||
- ✅ 已存在 - 无需改动
|
||
- ⚠️ 需修改 - 少量改动(< 50行)
|
||
- ❌ 需新增 - 全新文件
|
||
- 🔴 占位符 - 忽略(不影响本次改造)
|
||
- ✨ V2.0新增 - 支持拆分+断点
|
||
|
||
**代码行数统计:**
|
||
```
|
||
总新增代码:~1800行
|
||
├─ PostgresCacheAdapter.ts: 300行
|
||
├─ PgBossQueue.ts: 400行
|
||
├─ utils.ts: 200行 ✨
|
||
├─ CheckpointService.ts: 150行 ✨
|
||
├─ screeningService.ts改造: 200行
|
||
├─ 测试代码: 400行
|
||
└─ 其他(Factory、导出等): 150行
|
||
|
||
总修改代码:~100行
|
||
├─ CacheFactory.ts: 10行
|
||
├─ JobFactory.ts: 10行
|
||
├─ index.ts: 20行
|
||
├─ env.ts: 20行
|
||
├─ schema.prisma: 40行
|
||
└─ 各处导出: 约10处
|
||
```
|
||
|
||
---
|
||
|
||
## 10. V2.0 vs V1.0 对比
|
||
|
||
| 维度 | V1.0(原计划) | V2.0(当前版本) | 变化 |
|
||
|------|--------------|-----------------|------|
|
||
| **工作量** | 7天 | 9天 | +2天 |
|
||
| **代码行数** | ~1000行 | ~1900行 | +900行 |
|
||
| **核心策略** | 缓存+队列 | 缓存+队列+拆分+断点 | +2个策略 |
|
||
| **长任务支持** | < 4小时 | 任意时长(拆分后) | 质的提升 |
|
||
| **任务成功率** | 85-90% | > 99% | 提升10%+ |
|
||
| **实例重启恢复** | 从头开始 | 断点续传 | 避免浪费 |
|
||
| **并发能力** | 单实例串行 | 多实例并行 | 加速N倍 |
|
||
| **生产就绪度** | 基本可用 | 完全就绪 | 企业级 |
|
||
|
||
**为什么增加2天?**
|
||
- Day 4:任务拆分机制(工具函数+测试)
|
||
- Day 5:断点续传机制(服务+数据库)
|
||
|
||
**增加的价值:**
|
||
- ✅ 长任务可靠性从85% → 99%+
|
||
- ✅ 支持任意时长任务(通过拆分)
|
||
- ✅ 实例重启不浪费已处理结果
|
||
- ✅ 多Worker并行,速度提升N倍
|
||
- ✅ 符合Serverless最佳实践
|
||
|
||
**结论:** 多花2天,换取质的飞跃,**非常值得**!
|
||
|
||
---
|
||
|
||
## 11. 快速开始
|
||
|
||
### 11.1 一键检查清单
|
||
|
||
```bash
|
||
# 1. 检查代码结构(应该都存在)
|
||
ls backend/src/common/cache/CacheAdapter.ts # ✅
|
||
ls backend/src/common/cache/MemoryCacheAdapter.ts # ✅
|
||
ls backend/src/common/jobs/types.ts # ✅
|
||
ls backend/src/common/jobs/MemoryQueue.ts # ✅
|
||
|
||
# 2. 检查需要新增的文件(应该不存在)
|
||
ls backend/src/common/cache/PostgresCacheAdapter.ts # ❌ 待新增
|
||
ls backend/src/common/jobs/PgBossQueue.ts # ❌ 待新增
|
||
ls backend/src/common/jobs/utils.ts # ❌ 待新增
|
||
ls backend/src/common/jobs/CheckpointService.ts # ❌ 待新增
|
||
|
||
# 3. 检查依赖
|
||
cd backend
|
||
npm list pg-boss # ❌ 需安装
|
||
|
||
# 4. 检查数据库
|
||
psql -d aiclincial -c "\dt platform_schema.*"
|
||
# 应该看到现有的业务表,但没有app_cache
|
||
```
|
||
|
||
### 11.2 立即开始
|
||
|
||
```bash
|
||
# Phase 1:环境准备(30分钟)
|
||
cd backend
|
||
npm install pg-boss --save
|
||
# 修改 prisma/schema.prisma(添加AppCache)
|
||
npx prisma migrate dev --name add_postgres_cache
|
||
npx prisma generate
|
||
|
||
# Phase 2:实现PostgresCacheAdapter(4小时)
|
||
# 创建 src/common/cache/PostgresCacheAdapter.ts
|
||
# 修改 src/common/cache/CacheFactory.ts
|
||
# 测试验证
|
||
|
||
# Phase 3:实现PgBossQueue(16小时,2天)
|
||
# 创建 src/common/jobs/PgBossQueue.ts
|
||
# 修改 src/common/jobs/JobFactory.ts
|
||
# 测试验证
|
||
|
||
# ... 按计划继续
|
||
```
|
||
|
||
---
|
||
|
||
**🎯 现在就开始?**
|
||
|
||
建议:
|
||
1. **先通读文档** - 理解整体架构(30分钟)
|
||
2. **验证代码结构** - 确认真实文件(10分钟)
|
||
3. **开始Phase 1** - 环境准备(30分钟)
|
||
4. **逐步推进** - 每完成一个Phase就测试
|
||
|
||
有任何问题随时沟通!🚀
|
||
|