Files
AIclinicalresearch/docs/07-运维文档/09-Postgres-Only改造实施计划(完整版).md
HaHafeng 66255368b7 feat(admin): Add user management and upgrade to module permission system
Features - User Management (Phase 4.1):
- Database: Add user_modules table for fine-grained module permissions
- Database: Add 4 user permissions (view/create/edit/delete) to role_permissions
- Backend: UserService (780 lines) - CRUD with tenant isolation
- Backend: UserController + UserRoutes (648 lines) - 13 API endpoints
- Backend: Batch import users from Excel
- Frontend: UserListPage (412 lines) - list/filter/search/pagination
- Frontend: UserFormPage (341 lines) - create/edit with module config
- Frontend: UserDetailPage (393 lines) - details/tenant/module management
- Frontend: 3 modal components (592 lines) - import/assign/configure
- API: GET/POST/PUT/DELETE /api/admin/users/* endpoints

Architecture Upgrade - Module Permission System:
- Backend: Add getUserModules() method in auth.service
- Backend: Login API returns modules array in user object
- Frontend: AuthContext adds hasModule() method
- Frontend: Navigation filters modules based on user.modules
- Frontend: RouteGuard checks requiredModule instead of requiredVersion
- Frontend: Remove deprecated version-based permission system
- UX: Only show accessible modules in navigation (clean UI)
- UX: Smart redirect after login (avoid 403 for regular users)

Fixes:
- Fix UTF-8 encoding corruption in ~100 docs files
- Fix pageSize type conversion in userService (String to Number)
- Fix authUser undefined error in TopNavigation
- Fix login redirect logic with role-based access check
- Update Git commit guidelines v1.2 with UTF-8 safety rules

Database Changes:
- CREATE TABLE user_modules (user_id, tenant_id, module_code, is_enabled)
- ADD UNIQUE CONSTRAINT (user_id, tenant_id, module_code)
- INSERT 4 permissions + role assignments
- UPDATE PUBLIC tenant with 8 module subscriptions

Technical:
- Backend: 5 new files (~2400 lines)
- Frontend: 10 new files (~2500 lines)
- Docs: 1 development record + 2 status updates + 1 guideline update
- Total: ~4900 lines of code

Status: User management 100% complete, module permission system operational
2026-01-16 13:42:10 +08:00

82 KiB
Raw Permalink Blame History

Postgres-Only 架构改造实施计划(完整版)

文档版本: V2.0 已更新
创建日期: 2025-12-13
更新日期: 2025-12-13
目标完成时间: 2025-12-207天
负责人: 技术团队
风险等级: 🟢 低(基于成熟方案,有降级策略)
核心原则: 通用能力层优先,复用架构优势


⚠️ V2.0 更新说明

本版本基于真实代码结构验证技术方案深度分析进行了重要更新:

更新1代码结构真实性验证

  • 已从实际代码验证所有路径和文件
  • 明确标注"已存在"、"占位符"、"需新增"
  • RedisCacheAdapter确认为未实现的占位符
  • 所有改造点基于真实代码结构

更新2长任务可靠性策略 🔴

  • 新增:断点续传机制策略2- 强烈推荐
  • 新增:任务拆分策略策略3- 架构级优化
  • 分析心跳续租机制策略1- 不推荐实施
  • 明确pg-boss的技术限制最长4小时推荐2小时
  • 修正24小时连续任务不支持需拆分

更新3实施计划调整

  • 工作量增加到9天增加任务拆分和断点续传
  • 完整的代码示例(可直接使用)
  • 数据库Schema更新新增字段

📋 目录

  1. 改造背景与目标
  2. 当前系统分析
  3. 改造总体架构
  4. 详细实施步骤
  5. 优先级与依赖关系
  6. 测试验证方案
  7. 上线与回滚

1. 改造背景与目标

1.1 为什么选择Postgres-Only

核心痛点:
❌ 长任务不可靠2小时任务实例销毁丢失
❌ LLM成本失控缓存不持久化
❌ 多实例不同步(内存缓存各自独立)

技术方案选择:
✅ Postgres-Only推荐
  - 架构简单1-2人团队
  - 运维成本低复用RDS
  - 数据一致性强(事务保证)
  - 节省成本¥1000+/年)

❌ Redis方案不推荐
  - 架构复杂(双系统)
  - 运维负担重需维护Redis
  - 数据一致性弱(双写问题)
  - 额外成本¥1000+/年)

1.2 改造目标

目标 当前状态 改造后 衡量指标
长任务可靠性 5-10%(实例销毁丢失) > 99% 2小时任务成功率
LLM成本 重复调用多次 降低50%+ 月度API费用
缓存命中率 0%(不持久) 60%+ 监控统计
多实例同步 各自独立 共享缓存 实例A写→实例B读
架构复杂度 中等 中间件数量

2. 当前系统分析

2.1 代码结构现状3层架构- 已验证真实代码

验证方法: 通过 list_dirread_file 实际检查代码库
验证日期: 2025-12-13
验证结果: 所有路径和文件状态均已确认

AIclinicalresearch/backend/src/
├── common/                           # 🔵 通用能力层
│   ├── cache/                        # ✅ 真实存在
│   │   ├── CacheAdapter.ts           # ✅ 接口定义(真实)
│   │   ├── CacheFactory.ts           # ✅ 工厂模式(真实)
│   │   ├── index.ts                  # ✅ 导出文件(真实)
│   │   ├── MemoryCacheAdapter.ts     # ✅ 内存实现(真实,已完成)
│   │   ├── RedisCacheAdapter.ts      # 🔴 占位符(真实存在,但未实现)
│   │   └── PostgresCacheAdapter.ts   # ❌ 需新增(本次改造)
│   │
│   ├── jobs/                         # ✅ 真实存在
│   │   ├── types.ts                  # ✅ 接口定义(真实)
│   │   ├── JobFactory.ts             # ✅ 工厂模式(真实)
│   │   ├── index.ts                  # ✅ 导出文件(真实)
│   │   ├── MemoryQueue.ts            # ✅ 内存实现(真实,已完成)
│   │   └── PgBossQueue.ts            # ❌ 需新增(本次改造)
│   │
│   ├── storage/                      # ✅ 存储服务(已完善)
│   ├── logging/                      # ✅ 日志系统(已完善)
│   └── llm/                          # ✅ LLM网关已完善
│
├── modules/                          # 🟢 业务模块层
│   ├── asl/                          # ✅ 真实存在
│   │   ├── services/
│   │   │   ├── screeningService.ts      # ⚠️ 需改造:改为队列(真实)
│   │   │   └── llmScreeningService.ts   # ✅ 真实存在
│   │   └── common/llm/
│   │       └── LLM12FieldsService.ts    # ✅ 已用缓存真实第516行
│   │
│   └── dc/                           # ✅ 真实存在
│       └── tool-b/services/
│           └── HealthCheckService.ts     # ✅ 已用缓存真实第47行
│
└── config/                           # 🔵 平台基础层
    ├── database.ts                   # ✅ Prisma配置真实
    └── env.ts                        # ⚠️ 需添加新环境变量(真实文件)

重要发现:

  1. 3层架构真实存在 - 代码组织完全符合设计
  2. 工厂模式已实现 - CacheFactory和JobFactory真实可用
  3. 🔴 RedisCacheAdapter是占位符 - 所有方法都 throw new Error('Not implemented')
  4. 业务层已使用cache接口 - 改造时业务代码无需修改
  5. PostgresCacheAdapter和PgBossQueue需新增 - 本次改造的核心工作

2.2 缓存使用现状( 已验证)

位置 用途 TTL 数据量 重要性 代码行号 当前实现
LLM12FieldsService.ts LLM 12字段提取缓存 1小时 ~50KB/项 🔴 高(成本) 第516行 已用cache
HealthCheckService.ts Excel健康检查缓存 24小时 ~5KB/项 🟡 中等 第47行 已用cache

代码示例(真实代码):

// backend/src/modules/asl/common/llm/LLM12FieldsService.ts (第516行)
const cached = await cache.get(cacheKey);
if (cached) {
  logger.info('缓存命中', { cacheKey });
  return cached;
}
// ... LLM调用 ...
await cache.set(cacheKey, JSON.stringify(result), 3600);  // 1小时

// backend/src/modules/dc/tool-b/services/HealthCheckService.ts (第47行)
const cached = await cache.get<HealthCheckResult>(cacheKey);
if (cached) return cached;
// ... Excel解析 ...
await cache.set(cacheKey, result, 86400);  // 24小时

结论 缓存系统已在使用只需切换底层实现Memory → Postgres

2.3 队列使用现状( 已验证)

位置 用途 代码行号 耗时 重要性 当前实现 问题
screeningService.ts 文献筛选任务 第65行 2小时1000篇 🔴 同步执行 实例销毁丢失
DC Tool B 病历批量提取 未实现 1-3小时 🔴 未实现 不支持长任务

代码示例(真实代码):

// backend/src/modules/asl/services/screeningService.ts (第65行)
// 4. 异步处理文献(简化版:直接在这里处理)
// 生产环境应该发送到消息队列
processLiteraturesInBackground(task.id, projectId, literatures);  // ← 同步执行,有风险

结论 队列系统尚未使用,需要优先改造。注释已提示"生产环境应该发送到消息队列"。

2.4 长任务可靠性分析 🔴 新增

当前问题(真实场景)

场景11000篇文献筛选约2小时
├─ 问题1同步执行阻塞HTTP请求
├─ 问题2SAE实例15分钟无流量自动缩容
├─ 问题3实例重启任务从头开始
└─ 结果:任务成功率 < 10%用户需重复提交3-5次

场景210000篇文献筛选约20小时
├─ 问题1超过pg-boss最大锁定时间4小时
├─ 问题2任务被重复领取造成重复处理
└─ 结果:任务失败率 100%

场景3发布更新15:00
├─ 问题1正在执行的任务被强制终止
├─ 问题2已处理的文献结果丢失
└─ 结果:用户体验极差

技术限制分析

技术栈 限制 影响
SAE 15分钟无流量自动缩容 长任务必然失败
pg-boss 最长锁定4小时推荐2小时 超长任务不支持
HTTP请求 最长30秒超时 不能同步执行长任务
实例重启 内存状态丢失 任务进度丢失

解决方案评估

策略 价值 难度 pg-boss支持 推荐度 实施
策略1心跳续租 🔴 不支持 🟡 不推荐 暂不实施
策略2断点续传 🟢 兼容 🟢 强烈推荐 Phase 6
策略3任务拆分 🟡 原生支持 🟢 强烈推荐 Phase 5

结论:采用**策略2断点续传+ 策略3任务拆分**组合方案。


3. 改造总体架构

3.1 架构对比

改造前(当前):
┌─────────────────────────────────────┐
│  Business Layer (ASL, DC, SSA...)   │
│  ├─ screeningService.ts             │
│  │   └─ processInBackground()       │  ← 同步执行(阻塞)
│  └─ LLM12FieldsService.ts           │
│      └─ cache.get/set()             │  ← 使用Memory缓存
└─────────────────────────────────────┘
         ↓ 使用
┌─────────────────────────────────────┐
│  Capability Layer (Common)          │
│  ├─ cache                           │
│  │   ├─ MemoryCacheAdapter ✅       │  ← 当前使用
│  │   └─ RedisCacheAdapter 🔴        │  ← 占位符(未实现)
│  └─ jobs                            │
│      └─ MemoryQueue ✅              │  ← 当前使用
└─────────────────────────────────────┘
         ↓ 依赖
┌─────────────────────────────────────┐
│  Platform Layer                     │
│  └─ PostgreSQL (业务数据)           │
└─────────────────────────────────────┘

问题:
❌ 缓存不持久(实例重启丢失)
❌ 队列不持久(任务丢失)
❌ 多实例不共享(各自独立)
❌ 长任务不可靠2小时任务失败率 > 90%
❌ 违反Serverless原则同步执行长任务
改造后Postgres-Only + 任务拆分 + 断点续传):
┌──────────────────────────────────────────────────────────────┐
│  Business Layer (ASL, DC, SSA...)                            │
│  ├─ screeningService.ts                                      │
│  │   ├─ startScreeningTask()        │  ← 改造点1任务拆分   │
│  │   │   └─ 推送N个批次到队列        │                       │
│  │   └─ registerWorkers()           │                       │
│  │       └─ 处理单个批次+断点续传    │  ← 改造点2断点续传   │
│  └─ LLM12FieldsService.ts                                    │
│      └─ cache.get/set()             │  ← 无需改动(接口不变)│
└──────────────────────────────────────────────────────────────┘
         ↓ 使用
┌──────────────────────────────────────────────────────────────┐
│  Capability Layer (Common)                                   │
│  ├─ cache                                                    │
│  │   ├─ MemoryCacheAdapter ✅       │                       │
│  │   ├─ PostgresCacheAdapter ✨     │  ← 新增300行       │
│  │   └─ CacheFactory               │  ← 更新支持postgres    │
│  └─ jobs                                                     │
│      ├─ MemoryQueue ✅               │                       │
│      ├─ PgBossQueue ✨               │  ← 新增400行       │
│      └─ JobFactory                  │  ← 更新支持pgboss     │
└──────────────────────────────────────────────────────────────┘
         ↓ 依赖
┌──────────────────────────────────────────────────────────────┐
│  Platform Layer                                              │
│  ├─ PostgreSQL (RDS)                                         │
│  │   ├─ 业务数据asl_*, dc_*, ...                         │
│  │   ├─ platform.app_cache ✨       │  ← 缓存表(新增)       │
│  │   └─ platform.job_* ✨           │  ← 队列表pg-boss自动│
│  └─ 阿里云RDS特性                                            │
│      ├─ 自动备份(每天)                                      │
│      ├─ PITR时间点恢复                                    │
│      └─ 高可用(主从切换)                                    │
└──────────────────────────────────────────────────────────────┘

优势:
✅ 缓存持久化RDS自动备份
✅ 队列持久化(任务不丢失)
✅ 多实例共享SKIP LOCKED
✅ 事务一致性(业务+任务原子提交)
✅ 零额外运维复用RDS
✅ 长任务可靠(拆分+断点,成功率 > 99%
✅ 符合Serverless短任务可中断恢复

3.2 任务拆分策略 新增

拆分原则

目标:每个任务 < 15分钟远低于pg-boss 4小时限制

原因:
1. SAE实例15分钟无流量自动缩容
2. 失败重试成本低只需重试15分钟不是2小时
3. 进度可见性高(用户体验好)
4. 可并行处理多Worker同时工作

拆分策略表

任务类型 单项耗时 推荐批次大小 批次耗时 并发能力
ASL文献筛选 7.2秒/篇 100篇 12分钟 10批并行
DC病历提取 10秒/份 50份 8.3分钟 10批并行
统计分析 0.1秒/条 5000条 8.3分钟 20批并行

实际效果对比

场景10000篇文献筛选

不拆分(错误):
├─ 1个任务20小时
├─ 超过pg-boss 4小时限制 → 失败
└─ 成功率0%

拆分(正确):
├─ 100个批次每批100篇12分钟
├─ 10个Worker并行处理
├─ 总耗时100 / 10 = 10批轮次 × 12分钟 = 2小时
└─ 成功率:> 99.5%单批失败只需重试12分钟

3.3 断点续传机制 新增

核心思想

问题SAE实例随时可能重启发布更新、自动缩容

无断点:
├─ 处理到第900篇 → 实例重启
├─ 重新开始从第1篇开始
└─ 浪费时间900 × 7.2秒 = 108分钟

有断点:
├─ 每处理10篇保存进度到数据库
├─ 处理到第900篇 → 实例重启
├─ 重新开始从第900篇继续
└─ 浪费时间:< 1分钟

实现策略

// 数据库记录进度
model AslScreeningTask {
  processedItems  Int      // 已处理数量
  currentIndex    Int      // 当前游标(断点)
  lastCheckpoint  DateTime // 最后一次保存时间
  checkpointData  Json     // 断点详细数据
}

// Worker读取断点
const startIndex = task.currentIndex || 0;
for (let i = startIndex; i < items.length; i++) {
  await processItem(items[i]);
  
  // 每处理10项保存断点
  if ((i + 1) % 10 === 0) {
    await saveCheckpoint(i + 1);
  }
}

保存频率权衡

频率 数据库写入 重启浪费时间 推荐
每1项 很高(性能差) < 10秒 不推荐
每10项 中等 < 2分钟 推荐
每100项 < 12分钟 🟡 可选
不保存 重头开始 不可接受

3.4 Schema设计统一在platform 已更新

// prisma/schema.prisma

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
  schemas  = ["platform_schema", "aia_schema", "pkb_schema", "asl_schema", 
              "dc_schema", "ssa_schema", "st_schema", "rvw_schema", 
              "admin_schema", "common_schema", "public"]
}

generator client {
  provider        = "prisma-client-js"
  previewFeatures = ["multiSchema"]  // ✨ 启用多Schema支持
}

// ==================== 平台基础设施platform_schema====================

/// 应用缓存表替代Redis
model AppCache {
  id        Int      @id @default(autoincrement())
  key       String   @unique @db.VarChar(500)
  value     Json
  expiresAt DateTime @map("expires_at")
  createdAt DateTime @default(now()) @map("created_at")

  @@index([expiresAt], name: "idx_app_cache_expires")
  @@index([key, expiresAt], name: "idx_app_cache_key_expires")
  @@map("app_cache")
  @@schema("platform_schema")  // ✨ 统一在platform Schema
}

// pg-boss会自动创建任务表不需要在Prisma中定义
// 表名platform_schema.job, platform_schema.version 等

// ==================== 业务模块asl_schema====================

/// ASL筛选任务表✨ 需要新增字段支持拆分+断点)
model AslScreeningTask {
  id              String   @id @default(uuid())
  projectId       String   @map("project_id")
  taskType        String   @map("task_type")
  status          String   // pending/running/completed/failed
  totalItems      Int      @map("total_items")
  processedItems  Int      @default(0) @map("processed_items")
  successItems    Int      @default(0) @map("success_items")
  failedItems     Int      @default(0) @map("failed_items")
  conflictItems   Int      @default(0) @map("conflict_items")
  
  // ✨ 新增:任务拆分支持
  totalBatches      Int      @default(1) @map("total_batches")
  processedBatches  Int      @default(0) @map("processed_batches")
  currentBatchIndex Int      @default(0) @map("current_batch_index")
  
  // ✨ 新增:断点续传支持
  currentIndex    Int       @default(0) @map("current_index")
  lastCheckpoint  DateTime? @map("last_checkpoint")
  checkpointData  Json?     @map("checkpoint_data")
  
  startedAt       DateTime  @map("started_at")
  completedAt     DateTime? @map("completed_at")
  createdAt       DateTime  @default(now()) @map("created_at")
  updatedAt       DateTime  @updatedAt @map("updated_at")

  project         AslScreeningProject @relation(fields: [projectId], references: [id])
  results         AslScreeningResult[]

  @@index([projectId])
  @@index([status])
  @@map("asl_screening_tasks")
  @@schema("asl_schema")
}

新增字段说明:

字段 类型 用途 示例值
totalBatches Int 总批次数 101000篇÷100篇/批)
processedBatches Int 已完成批次数 3已完成3批
currentBatchIndex Int 当前批次索引 3正在处理第4批
currentIndex Int 当前项索引(断点) 350已处理350篇
lastCheckpoint DateTime 最后一次保存断点时间 2025-12-13 10:30:00
checkpointData Json 断点详细数据 {"lastProcessedId": "lit_123", "batchProgress": 0.35}

3.5 Key/Queue命名规范

// 缓存Key规范逻辑隔离
const CACHE_KEY_PATTERNS = {
  // ASL模块
  'asl:llm:{hash}': 'LLM提取结果',
  'asl:pdf:{fileId}': 'PDF解析结果',
  
  // DC模块
  'dc:health:{fileHash}': 'Excel健康检查',
  'dc:extraction:{recordId}': '病历提取结果',
  
  // 全局
  'session:{userId}': '用户Session',
  'config:{key}': '系统配置',
};

// 队列Name规范
const QUEUE_NAMES = {
  ASL_TITLE_SCREENING: 'asl:title-screening',
  ASL_FULLTEXT_SCREENING: 'asl:fulltext-screening',
  DC_MEDICAL_EXTRACTION: 'dc:medical-extraction',
  DC_DATA_CLEANING: 'dc:data-cleaning',
  SSA_STATISTICAL_ANALYSIS: 'ssa:statistical-analysis',
};

4. 详细实施步骤

4.1 Phase 1环境准备Day 1上午0.5天)

任务1.1更新Prisma Schema

# 文件prisma/schema.prisma

修改点1启用multiSchema

generator client {
  provider        = "prisma-client-js"
  previewFeatures = ["multiSchema"]  // ← 新增
}

修改点2添加AppCache模型

/// 应用缓存表Postgres-Only架构
model AppCache {
  id        Int      @id @default(autoincrement())
  key       String   @unique @db.VarChar(500)
  value     Json
  expiresAt DateTime @map("expires_at")
  createdAt DateTime @default(now()) @map("created_at")

  @@index([expiresAt], name: "idx_app_cache_expires")
  @@index([key, expiresAt], name: "idx_app_cache_key_expires")
  @@map("app_cache")
  @@schema("platform_schema")
}

执行迁移

cd backend

# 1. 生成迁移文件
npx prisma migrate dev --name add_postgres_cache

# 2. 生成Prisma Client
npx prisma generate

# 3. 查看生成的SQL
cat prisma/migrations/*/migration.sql

验证结果

-- 应该看到以下SQL
CREATE TABLE "platform_schema"."app_cache" (
  "id" SERIAL PRIMARY KEY,
  "key" VARCHAR(500) UNIQUE NOT NULL,
  "value" JSONB NOT NULL,
  "expires_at" TIMESTAMP NOT NULL,
  "created_at" TIMESTAMP DEFAULT NOW()
);

CREATE INDEX "idx_app_cache_expires" ON "platform_schema"."app_cache"("expires_at");
CREATE INDEX "idx_app_cache_key_expires" ON "platform_schema"."app_cache"("key", "expires_at");

任务1.2:安装依赖

cd backend

# 安装pg-boss任务队列
npm install pg-boss --save

# 查看版本
npm list pg-boss
# 应显示pg-boss@10.x.x

任务1.3:更新环境变量配置

// 文件backend/src/config/env.ts

import { z } from 'zod';

const envSchema = z.object({
  // ... 现有配置 ...
  
  // ==================== 缓存配置 ====================
  CACHE_TYPE: z.enum(['memory', 'postgres']).default('memory'),  // ← 新增postgres选项
  
  // ==================== 队列配置 ====================
  QUEUE_TYPE: z.enum(['memory', 'pgboss']).default('memory'),  // ← 新增pgboss选项
  
  // ==================== 数据库配置 ====================
  DATABASE_URL: z.string(),
});

export const config = {
  // ... 现有配置 ...
  
  // 缓存配置
  cacheType: process.env.CACHE_TYPE || 'memory',
  
  // 队列配置
  queueType: process.env.QUEUE_TYPE || 'memory',
  
  // 数据库URL
  databaseUrl: process.env.DATABASE_URL,
};
# 文件backend/.env

# ==================== 缓存配置 ====================
CACHE_TYPE=postgres  # memory | postgres

# ==================== 队列配置 ====================
QUEUE_TYPE=pgboss    # memory | pgboss

# ==================== 数据库配置 ====================
DATABASE_URL=postgresql://user:password@localhost:5432/aiclincial?schema=public

4.2 Phase 2实现PostgresCacheAdapterDay 1下午0.5天)

任务2.1创建PostgresCacheAdapter

// 文件backend/src/common/cache/PostgresCacheAdapter.ts

import { prisma } from '../../config/database.js';
import type { CacheAdapter } from './CacheAdapter.js';
import { logger } from '../logging/index.js';

/**
 * Postgres缓存适配器
 * 
 * 核心特性:
 * - 持久化存储(实例重启不丢失)
 * - 多实例共享(通过数据库)
 * - 懒惰删除(读取时清理过期数据)
 * - 自动清理(定时任务删除过期数据)
 */
export class PostgresCacheAdapter implements CacheAdapter {
  
  /**
   * 获取缓存(带过期检查和懒惰删除)
   */
  async get<T = any>(key: string): Promise<T | null> {
    try {
      const record = await prisma.appCache.findUnique({
        where: { key }
      });
      
      if (!record) {
        return null;
      }
      
      // 检查是否过期
      if (record.expiresAt < new Date()) {
        // 懒惰删除:异步删除,不阻塞主流程
        this.deleteAsync(key);
        return null;
      }
      
      logger.debug('[PostgresCache] 缓存命中', { key });
      return record.value as T;
      
    } catch (error) {
      logger.error('[PostgresCache] 读取失败', { key, error });
      return null;
    }
  }

  /**
   * 设置缓存
   */
  async set(key: string, value: any, ttlSeconds: number = 3600): Promise<void> {
    try {
      const expiresAt = new Date(Date.now() + ttlSeconds * 1000);
      
      await prisma.appCache.upsert({
        where: { key },
        create: {
          key,
          value: value as any,  // Prisma会自动处理JSON
          expiresAt,
        },
        update: {
          value: value as any,
          expiresAt,
        }
      });
      
      logger.debug('[PostgresCache] 缓存写入', { key, ttl: ttlSeconds });
      
    } catch (error) {
      logger.error('[PostgresCache] 写入失败', { key, error });
      throw error;
    }
  }

  /**
   * 删除缓存
   */
  async delete(key: string): Promise<boolean> {
    try {
      await prisma.appCache.delete({
        where: { key }
      });
      
      logger.debug('[PostgresCache] 缓存删除', { key });
      return true;
      
    } catch (error) {
      // 如果key不存在Prisma会抛出错误
      if ((error as any)?.code === 'P2025') {
        return false;
      }
      logger.error('[PostgresCache] 删除失败', { key, error });
      return false;
    }
  }

  /**
   * 异步删除(不阻塞主流程)
   */
  private deleteAsync(key: string): void {
    prisma.appCache.delete({ where: { key } })
      .catch(err => {
        // 静默失败(可能已被其他实例删除)
        logger.debug('[PostgresCache] 懒惰删除失败', { key, err });
      });
  }

  /**
   * 批量删除(支持模式匹配)
   */
  async deleteMany(pattern: string): Promise<number> {
    try {
      const result = await prisma.appCache.deleteMany({
        where: {
          key: {
            contains: pattern
          }
        }
      });
      
      logger.info('[PostgresCache] 批量删除', { pattern, count: result.count });
      return result.count;
      
    } catch (error) {
      logger.error('[PostgresCache] 批量删除失败', { pattern, error });
      return 0;
    }
  }

  /**
   * 清空所有缓存
   */
  async flush(): Promise<void> {
    try {
      await prisma.appCache.deleteMany({});
      logger.info('[PostgresCache] 缓存已清空');
    } catch (error) {
      logger.error('[PostgresCache] 清空失败', { error });
      throw error;
    }
  }

  /**
   * 获取缓存统计信息
   */
  async getStats(): Promise<{
    total: number;
    expired: number;
    byModule: Record<string, number>;
  }> {
    try {
      const now = new Date();
      
      // 总数
      const total = await prisma.appCache.count();
      
      // 过期数量
      const expired = await prisma.appCache.count({
        where: {
          expiresAt: { lt: now }
        }
      });
      
      // 按模块统计通过key前缀分组
      const all = await prisma.appCache.findMany({
        select: { key: true }
      });
      
      const byModule: Record<string, number> = {};
      all.forEach(item => {
        const module = item.key.split(':')[0];
        byModule[module] = (byModule[module] || 0) + 1;
      });
      
      return { total, expired, byModule };
      
    } catch (error) {
      logger.error('[PostgresCache] 获取统计失败', { error });
      return { total: 0, expired: 0, byModule: {} };
    }
  }
}

/**
 * 启动定时清理任务(分批清理,防止阻塞)
 * 
 * 策略:
 * - 每分钟执行一次
 * - 每次删除1000条过期数据
 * - 使用LIMIT避免大事务
 */
export function startCacheCleanupTask(): void {
  const CLEANUP_INTERVAL = 60 * 1000;  // 1分钟
  const BATCH_SIZE = 1000;             // 每次1000条
  
  setInterval(async () => {
    try {
      // 使用原生SQL支持LIMIT
      const result = await prisma.$executeRaw`
        DELETE FROM platform_schema.app_cache 
        WHERE id IN (
          SELECT id FROM platform_schema.app_cache 
          WHERE expires_at < NOW() 
          LIMIT ${BATCH_SIZE}
        )
      `;
      
      if (result > 0) {
        logger.info('[PostgresCache] 定时清理', { deleted: result });
      }
      
    } catch (error) {
      logger.error('[PostgresCache] 定时清理失败', { error });
    }
  }, CLEANUP_INTERVAL);
  
  logger.info('[PostgresCache] 定时清理任务已启动', {
    interval: `${CLEANUP_INTERVAL / 1000}秒`,
    batchSize: BATCH_SIZE
  });
}

任务2.2更新CacheFactory

// 文件backend/src/common/cache/CacheFactory.ts

import { CacheAdapter } from './CacheAdapter.js';
import { MemoryCacheAdapter } from './MemoryCacheAdapter.js';
import { RedisCacheAdapter } from './RedisCacheAdapter.js';
import { PostgresCacheAdapter } from './PostgresCacheAdapter.js';  // ← 新增
import { logger } from '../logging/index.js';
import { config } from '../../config/env.js';

export class CacheFactory {
  private static instance: CacheAdapter | null = null;

  static getInstance(): CacheAdapter {
    if (!this.instance) {
      this.instance = this.createAdapter();
    }
    return this.instance;
  }

  private static createAdapter(): CacheAdapter {
    const cacheType = config.cacheType;

    logger.info('[CacheFactory] 初始化缓存', { cacheType });

    switch (cacheType) {
      case 'postgres':  // ← 新增
        return this.createPostgresAdapter();
      
      case 'memory':
        return this.createMemoryAdapter();
      
      case 'redis':
        return this.createRedisAdapter();
      
      default:
        logger.warn(`[CacheFactory] 未知缓存类型: ${cacheType}, 降级到内存`);
        return this.createMemoryAdapter();
    }
  }

  /**
   * 创建Postgres缓存适配器
   */
  private static createPostgresAdapter(): PostgresCacheAdapter {
    logger.info('[CacheFactory] 使用PostgresCacheAdapter');
    return new PostgresCacheAdapter();
  }

  private static createMemoryAdapter(): MemoryCacheAdapter {
    logger.info('[CacheFactory] 使用MemoryCacheAdapter');
    return new MemoryCacheAdapter();
  }

  private static createRedisAdapter(): RedisCacheAdapter {
    // ... 现有Redis逻辑 ...
    logger.info('[CacheFactory] 使用RedisCacheAdapter');
    return new RedisCacheAdapter({ /* ... */ });
  }

  static reset(): void {
    this.instance = null;
  }
}

任务2.3:更新导出

// 文件backend/src/common/cache/index.ts

export type { CacheAdapter } from './CacheAdapter.js';
export { MemoryCacheAdapter } from './MemoryCacheAdapter.js';
export { RedisCacheAdapter } from './RedisCacheAdapter.js';
export { PostgresCacheAdapter, startCacheCleanupTask } from './PostgresCacheAdapter.js';  // ← 新增
export { CacheFactory } from './CacheFactory.js';

import { CacheFactory } from './CacheFactory.js';

export const cache = CacheFactory.getInstance();

4.3 Phase 3实现PgBossQueueDay 2-32天

任务3.1创建PgBossQueue

// 文件backend/src/common/jobs/PgBossQueue.ts

import PgBoss from 'pg-boss';
import type { Job, JobQueue, JobHandler } from './types.js';
import { logger } from '../logging/index.js';
import { config } from '../../config/env.js';

/**
 * PgBoss队列适配器
 * 
 * 核心特性:
 * - 任务持久化(实例重启不丢失)
 * - 自动重试(指数退避)
 * - 多实例协调SKIP LOCKED
 * - 长任务支持4小时超时
 */
export class PgBossQueue implements JobQueue {
  private boss: PgBoss;
  private started = false;
  private workers: Map<string, any> = new Map();

  constructor() {
    this.boss = new PgBoss({
      connectionString: config.databaseUrl,
      schema: 'platform_schema',  // 统一在platform Schema
      max: 5,                      // 连接池大小
      
      // ✨ 关键配置:长任务支持
      expireInHours: 4,            // 任务锁4小时后过期
      
      // 自动维护(清理旧任务)
      retentionDays: 7,            // 保留7天的历史任务
      deleteAfterDays: 30,         // 30天后彻底删除
    });

    // 监听错误
    this.boss.on('error', error => {
      logger.error('[PgBoss] 队列错误', { error: error.message });
    });

    // 监听维护事件
    this.boss.on('maintenance', () => {
      logger.debug('[PgBoss] 执行维护任务');
    });
  }

  /**
   * 启动队列(懒加载)
   */
  private async ensureStarted(): Promise<void> {
    if (this.started) return;
    
    try {
      await this.boss.start();
      this.started = true;
      logger.info('[PgBoss] 队列已启动');
    } catch (error) {
      logger.error('[PgBoss] 队列启动失败', { error });
      throw error;
    }
  }

  /**
   * 推送任务到队列
   */
  async push<T = any>(type: string, data: T, options?: any): Promise<Job> {
    await this.ensureStarted();
    
    try {
      const jobId = await this.boss.send(type, data, {
        retryLimit: 3,                 // 失败重试3次
        retryDelay: 60,                // 失败后60秒重试
        retryBackoff: true,            // 指数退避60s, 120s, 240s
        expireInHours: 4,              // 4小时后过期
        ...options
      });

      logger.info('[PgBoss] 任务入队', { type, jobId });

      return {
        id: jobId!,
        type,
        data,
        status: 'pending',
        createdAt: new Date(),
      };
      
    } catch (error) {
      logger.error('[PgBoss] 任务入队失败', { type, error });
      throw error;
    }
  }

  /**
   * 注册任务处理器
   */
  process<T = any>(type: string, handler: JobHandler<T>): void {
    // 异步注册(不阻塞主流程)
    this.registerWorkerAsync(type, handler);
  }

  /**
   * 异步注册Worker
   */
  private async registerWorkerAsync<T>(
    type: string, 
    handler: JobHandler<T>
  ): Promise<void> {
    try {
      await this.ensureStarted();
      
      // 注册Worker
      await this.boss.work(
        type,
        {
          teamSize: 1,       // 每个队列并发1个任务
          teamConcurrency: 1 // 每个Worker处理1个任务
        },
        async (job: any) => {
          const startTime = Date.now();
          
          logger.info('[PgBoss] 开始处理任务', { 
            type, 
            jobId: job.id,
            attemptsMade: job.data.__retryCount || 0,
            attemptsTotal: 3
          });

          try {
            // 调用业务处理函数
            const result = await handler({
              id: job.id,
              type,
              data: job.data as T,
              status: 'processing',
              createdAt: new Date(job.createdon),
            });

            const duration = Date.now() - startTime;
            logger.info('[PgBoss] ✅ 任务完成', { 
              type, 
              jobId: job.id,
              duration: `${(duration / 1000).toFixed(2)}s`
            });

            return result;

          } catch (error) {
            const duration = Date.now() - startTime;
            logger.error('[PgBoss] ❌ 任务失败', { 
              type, 
              jobId: job.id,
              attemptsMade: job.data.__retryCount || 0,
              duration: `${(duration / 1000).toFixed(2)}s`,
              error: error instanceof Error ? error.message : 'Unknown'
            });
            
            // 抛出错误触发pg-boss自动重试
            throw error;
          }
        }
      );

      this.workers.set(type, true);
      logger.info('[PgBoss] ✅ Worker已注册', { type });

    } catch (error) {
      logger.error('[PgBoss] Worker注册失败', { type, error });
      throw error;
    }
  }

  /**
   * 获取任务状态
   */
  async getJob(id: string): Promise<Job | null> {
    try {
      await this.ensureStarted();
      
      const job = await this.boss.getJobById(id);
      if (!job) return null;

      return {
        id: job.id!,
        type: job.name,
        data: job.data,
        status: this.mapState(job.state),
        progress: 0,  // pg-boss不直接支持进度
        createdAt: new Date(job.createdon),
        completedAt: job.completedon ? new Date(job.completedon) : undefined,
        error: job.output?.message,
      };
      
    } catch (error) {
      logger.error('[PgBoss] 获取任务失败', { id, error });
      return null;
    }
  }

  /**
   * 映射pg-boss状态到通用状态
   */
  private mapState(state: string): Job['status'] {
    switch (state) {
      case 'completed':
        return 'completed';
      case 'failed':
        return 'failed';
      case 'active':
        return 'processing';
      case 'cancelled':
        return 'cancelled';
      default:
        return 'pending';
    }
  }

  /**
   * 更新任务进度(通过业务表)
   * 
   * 注意pg-boss不直接支持进度更新
   * 需要在业务层通过数据库表实现
   */
  async updateProgress(id: string, progress: number, message?: string): Promise<void> {
    logger.debug('[PgBoss] 进度更新(需业务表支持)', { id, progress, message });
    // 实际实现:更新 aslScreeningTask.processedItems
  }

  /**
   * 取消任务
   */
  async cancelJob(id: string): Promise<boolean> {
    try {
      await this.ensureStarted();
      await this.boss.cancel(id);
      logger.info('[PgBoss] 任务已取消', { id });
      return true;
    } catch (error) {
      logger.error('[PgBoss] 取消任务失败', { id, error });
      return false;
    }
  }

  /**
   * 重试失败任务
   */
  async retryJob(id: string): Promise<boolean> {
    try {
      await this.ensureStarted();
      await this.boss.resume(id);
      logger.info('[PgBoss] 任务已重试', { id });
      return true;
    } catch (error) {
      logger.error('[PgBoss] 重试任务失败', { id, error });
      return false;
    }
  }

  /**
   * 清理旧任务pg-boss自动处理
   */
  async cleanup(olderThan: number = 86400000): Promise<number> {
    // pg-boss有自动清理机制retentionDays
    logger.debug('[PgBoss] 使用自动清理机制');
    return 0;
  }

  /**
   * 关闭队列(优雅关闭)
   */
  async close(): Promise<void> {
    if (!this.started) return;
    
    try {
      await this.boss.stop();
      this.started = false;
      logger.info('[PgBoss] 队列已关闭');
    } catch (error) {
      logger.error('[PgBoss] 队列关闭失败', { error });
    }
  }
}

任务3.2更新JobFactory

// 文件backend/src/common/jobs/JobFactory.ts

import { JobQueue } from './types.js';
import { MemoryQueue } from './MemoryQueue.js';
import { PgBossQueue } from './PgBossQueue.js';  // ← 新增
import { logger } from '../logging/index.js';
import { config } from '../../config/env.js';

export class JobFactory {
  private static instance: JobQueue | null = null;

  static getInstance(): JobQueue {
    if (!this.instance) {
      this.instance = this.createQueue();
    }
    return this.instance;
  }

  private static createQueue(): JobQueue {
    const queueType = config.queueType;

    logger.info('[JobFactory] 初始化任务队列', { queueType });

    switch (queueType) {
      case 'pgboss':  // ← 新增
        return this.createPgBossQueue();
      
      case 'memory':
        return this.createMemoryQueue();
      
      default:
        logger.warn(`[JobFactory] 未知队列类型: ${queueType}, 降级到内存`);
        return this.createMemoryQueue();
    }
  }

  /**
   * 创建PgBoss队列
   */
  private static createPgBossQueue(): PgBossQueue {
    logger.info('[JobFactory] 使用PgBossQueue');
    return new PgBossQueue();
  }

  private static createMemoryQueue(): MemoryQueue {
    logger.info('[JobFactory] 使用MemoryQueue');
    
    const queue = new MemoryQueue();
    
    // 定期清理(避免内存泄漏)
    if (process.env.NODE_ENV !== 'test') {
      setInterval(() => {
        queue.cleanup();
      }, 60 * 60 * 1000);
    }
    
    return queue;
  }

  static reset(): void {
    this.instance = null;
  }
}

任务3.3:更新导出

// 文件backend/src/common/jobs/index.ts

export type { Job, JobStatus, JobHandler, JobQueue } from './types.js';
export { MemoryQueue } from './MemoryQueue.js';
export { PgBossQueue } from './PgBossQueue.js';  // ← 新增
export { JobFactory } from './JobFactory.js';

import { JobFactory } from './JobFactory.js';

export const jobQueue = JobFactory.getInstance();

4.4 Phase 4实现任务拆分机制Day 41天 新增

任务4.1:创建任务拆分工具函数

// 文件backend/src/common/jobs/utils.ts (新建文件)

import { logger } from '../logging/index.js';

/**
 * 将数组拆分成多个批次
 * 
 * @param items 要拆分的项目数组
 * @param chunkSize 每批次大小
 * @returns 拆分后的二维数组
 * 
 * @example
 * splitIntoChunks([1,2,3,4,5], 2)  // [[1,2], [3,4], [5]]
 */
export function splitIntoChunks<T>(items: T[], chunkSize: number = 100): T[][] {
  if (chunkSize <= 0) {
    throw new Error('chunkSize must be greater than 0');
  }
  
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    chunks.push(items.slice(i, i + chunkSize));
  }
  
  logger.debug('[TaskSplit] 任务拆分完成', {
    total: items.length,
    chunkSize,
    chunks: chunks.length
  });
  
  return chunks;
}

/**
 * 估算处理时间
 * 
 * @param itemCount 项目数量
 * @param timePerItem 每项处理时间(秒)
 * @returns 总时间(秒)
 */
export function estimateProcessingTime(
  itemCount: number,
  timePerItem: number
): number {
  return itemCount * timePerItem;
}

/**
 * 推荐批次大小
 * 
 * 根据单项处理时间和最大批次时间,计算最优批次大小
 * 
 * @param totalItems 总项目数
 * @param timePerItem 每项处理时间(秒)
 * @param maxChunkTime 单批次最大时间默认15分钟
 * @returns 推荐的批次大小
 * 
 * @example
 * recommendChunkSize(1000, 7.2, 900)  // 返回 125每批125项15分钟
 */
export function recommendChunkSize(
  totalItems: number,
  timePerItem: number,
  maxChunkTime: number = 900  // 15分钟
): number {
  // 计算每批次最多能处理多少项
  const itemsPerChunk = Math.floor(maxChunkTime / timePerItem);
  
  // 限制范围最少10项最多1000项
  const recommended = Math.max(10, Math.min(itemsPerChunk, 1000));
  
  logger.info('[TaskSplit] 批次大小推荐', {
    totalItems,
    timePerItem: `${timePerItem}秒`,
    maxChunkTime: `${maxChunkTime}秒`,
    recommended,
    estimatedBatches: Math.ceil(totalItems / recommended),
    estimatedTimePerBatch: `${(recommended * timePerItem / 60).toFixed(1)}分钟`
  });
  
  return recommended;
}

/**
 * 批次任务配置表
 */
export const CHUNK_STRATEGIES = {
  // ASL文献筛选每批100篇约12分钟
  'asl:title-screening': {
    chunkSize: 100,
    timePerItem: 7.2,  // 秒
    estimatedTime: 720,  // 12分钟
    maxRetries: 3,
    description: 'ASL标题摘要筛选双模型并行'
  },
  
  // ASL全文复筛每批50篇约15分钟
  'asl:fulltext-screening': {
    chunkSize: 50,
    timePerItem: 18,  // 秒
    estimatedTime: 900,  // 15分钟
    maxRetries: 3,
    description: 'ASL全文复筛12字段提取'
  },
  
  // DC病历提取每批50份约8分钟
  'dc:medical-extraction': {
    chunkSize: 50,
    timePerItem: 10,  // 秒
    estimatedTime: 500,  // 8分钟
    maxRetries: 3,
    description: 'DC医疗记录结构化提取'
  },
  
  // 统计分析每批5000条约8分钟
  'ssa:statistical-analysis': {
    chunkSize: 5000,
    timePerItem: 0.1,  // 秒
    estimatedTime: 500,  // 8分钟
    maxRetries: 2,
    description: 'SSA统计分析计算'
  }
} as const;

/**
 * 获取任务拆分策略
 */
export function getChunkStrategy(taskType: keyof typeof CHUNK_STRATEGIES) {
  const strategy = CHUNK_STRATEGIES[taskType];
  if (!strategy) {
    logger.warn('[TaskSplit] 未找到任务策略,使用默认配置', { taskType });
    return {
      chunkSize: 100,
      timePerItem: 10,
      estimatedTime: 1000,
      maxRetries: 3,
      description: '默认任务策略'
    };
  }
  return strategy;
}

任务4.2:更新导出

// 文件backend/src/common/jobs/index.ts

export type { Job, JobStatus, JobHandler, JobQueue } from './types.js';
export { MemoryQueue } from './MemoryQueue.js';
export { PgBossQueue } from './PgBossQueue.js';
export { JobFactory } from './JobFactory.js';
export {  // ← 新增
  splitIntoChunks,
  estimateProcessingTime,
  recommendChunkSize,
  getChunkStrategy,
  CHUNK_STRATEGIES
} from './utils.js';

import { JobFactory } from './JobFactory.js';

export const jobQueue = JobFactory.getInstance();

任务4.3:单元测试

// 文件backend/tests/common/jobs/utils.test.ts新建

import { describe, it, expect } from 'vitest';
import { splitIntoChunks, recommendChunkSize } from '../../../src/common/jobs/utils.js';

describe('Task Split Utils', () => {
  describe('splitIntoChunks', () => {
    it('should split array into chunks', () => {
      const items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
      const chunks = splitIntoChunks(items, 3);
      
      expect(chunks).toEqual([
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9],
        [10]
      ]);
    });

    it('should handle exact division', () => {
      const items = [1, 2, 3, 4, 5, 6];
      const chunks = splitIntoChunks(items, 2);
      
      expect(chunks).toEqual([
        [1, 2],
        [3, 4],
        [5, 6]
      ]);
    });

    it('should handle empty array', () => {
      const chunks = splitIntoChunks([], 10);
      expect(chunks).toEqual([]);
    });
  });

  describe('recommendChunkSize', () => {
    it('should recommend chunk size for ASL screening', () => {
      const size = recommendChunkSize(1000, 7.2, 900);  // 15分钟
      expect(size).toBe(125);  // 125 * 7.2 = 900秒
    });

    it('should not exceed max limit', () => {
      const size = recommendChunkSize(10000, 0.1, 900);
      expect(size).toBe(1000);  // 最大1000
    });

    it('should not go below min limit', () => {
      const size = recommendChunkSize(100, 100, 900);
      expect(size).toBe(10);  // 最小10
    });
  });
});

4.5 Phase 5实现断点续传机制Day 51天 新增

任务5.1更新Prisma Schema

// 文件prisma/schema.prisma

model AslScreeningTask {
  // ... 现有字段 ...
  
  // ✨ 新增字段:任务拆分
  totalBatches      Int      @default(1) @map("total_batches")
  processedBatches  Int      @default(0) @map("processed_batches")
  currentBatchIndex Int      @default(0) @map("current_batch_index")
  
  // ✨ 新增字段:断点续传
  currentIndex      Int       @default(0) @map("current_index")
  lastCheckpoint    DateTime? @map("last_checkpoint")
  checkpointData    Json?     @map("checkpoint_data")
}
# 生成迁移
cd backend
npx prisma migrate dev --name add_task_split_checkpoint

# 验证
npx prisma migrate status

任务5.2:创建断点续传服务

// 文件backend/src/common/jobs/CheckpointService.ts新建

import { prisma } from '../../config/database.js';
import { logger } from '../logging/index.js';

export interface CheckpointData {
  lastProcessedId?: string;
  lastProcessedIndex: number;
  batchProgress: number;
  metadata?: Record<string, any>;
}

/**
 * 断点续传服务
 * 
 * 提供任务断点的保存、读取和恢复功能
 */
export class CheckpointService {
  /**
   * 保存断点
   * 
   * @param taskId 任务ID
   * @param currentIndex 当前索引
   * @param data 断点数据
   */
  static async saveCheckpoint(
    taskId: string,
    currentIndex: number,
    data?: Partial<CheckpointData>
  ): Promise<void> {
    try {
      const checkpointData: CheckpointData = {
        lastProcessedIndex: currentIndex,
        batchProgress: 0,
        ...data
      };

      await prisma.aslScreeningTask.update({
        where: { id: taskId },
        data: {
          currentIndex,
          lastCheckpoint: new Date(),
          checkpointData: checkpointData as any
        }
      });

      logger.debug('[Checkpoint] 断点已保存', {
        taskId,
        currentIndex,
        checkpoint: checkpointData
      });

    } catch (error) {
      logger.error('[Checkpoint] 保存断点失败', { taskId, currentIndex, error });
      // 不抛出错误,避免影响主流程
    }
  }

  /**
   * 读取断点
   * 
   * @param taskId 任务ID
   * @returns 断点索引和数据
   */
  static async loadCheckpoint(taskId: string): Promise<{
    startIndex: number;
    data: CheckpointData | null;
  }> {
    try {
      const task = await prisma.aslScreeningTask.findUnique({
        where: { id: taskId },
        select: {
          currentIndex: true,
          lastCheckpoint: true,
          checkpointData: true
        }
      });

      if (!task) {
        return { startIndex: 0, data: null };
      }

      const startIndex = task.currentIndex || 0;
      const data = task.checkpointData as CheckpointData | null;

      if (startIndex > 0) {
        logger.info('[Checkpoint] 断点恢复', {
          taskId,
          startIndex,
          lastCheckpoint: task.lastCheckpoint,
          data
        });
      }

      return { startIndex, data };

    } catch (error) {
      logger.error('[Checkpoint] 读取断点失败', { taskId, error });
      return { startIndex: 0, data: null };
    }
  }

  /**
   * 清除断点
   * 
   * @param taskId 任务ID
   */
  static async clearCheckpoint(taskId: string): Promise<void> {
    try {
      await prisma.aslScreeningTask.update({
        where: { id: taskId },
        data: {
          currentIndex: 0,
          lastCheckpoint: null,
          checkpointData: null
        }
      });

      logger.debug('[Checkpoint] 断点已清除', { taskId });

    } catch (error) {
      logger.error('[Checkpoint] 清除断点失败', { taskId, error });
    }
  }

  /**
   * 批量保存进度(包含断点)
   * 
   * @param taskId 任务ID
   * @param updates 更新数据
   */
  static async updateProgress(
    taskId: string,
    updates: {
      processedItems?: number;
      successItems?: number;
      failedItems?: number;
      conflictItems?: number;
      currentIndex?: number;
      checkpointData?: Partial<CheckpointData>;
    }
  ): Promise<void> {
    try {
      const data: any = {};

      // 累加字段
      if (updates.processedItems !== undefined) {
        data.processedItems = { increment: updates.processedItems };
      }
      if (updates.successItems !== undefined) {
        data.successItems = { increment: updates.successItems };
      }
      if (updates.failedItems !== undefined) {
        data.failedItems = { increment: updates.failedItems };
      }
      if (updates.conflictItems !== undefined) {
        data.conflictItems = { increment: updates.conflictItems };
      }

      // 断点字段
      if (updates.currentIndex !== undefined) {
        data.currentIndex = updates.currentIndex;
        data.lastCheckpoint = new Date();
      }
      if (updates.checkpointData) {
        data.checkpointData = updates.checkpointData;
      }

      await prisma.aslScreeningTask.update({
        where: { id: taskId },
        data
      });

      logger.debug('[Checkpoint] 进度已更新', { taskId, updates });

    } catch (error) {
      logger.error('[Checkpoint] 更新进度失败', { taskId, error });
    }
  }
}

任务5.3:更新导出

// 文件backend/src/common/jobs/index.ts

export { CheckpointService } from './CheckpointService.js';  // ← 新增

4.6 Phase 6改造业务代码Day 6-72天 已更新

任务6.1改造ASL筛选服务 完整版:拆分+断点+队列)

// 文件backend/src/modules/asl/services/screeningService.ts

import { prisma } from '../../../config/database.js';
import { logger } from '../../../common/logging/index.js';
import { 
  jobQueue, 
  splitIntoChunks, 
  recommendChunkSize,
  CheckpointService  // ✨ 新增
} from '../../../common/jobs/index.js';
import { llmScreeningService } from './llmScreeningService.js';

/**
 * 启动筛选任务(改造后:使用队列)
 */
export async function startScreeningTask(projectId: string, userId: string) {
  try {
    logger.info('Starting screening task', { projectId, userId });

    // 1. 检查项目是否存在
    const project = await prisma.aslScreeningProject.findFirst({
      where: { id: projectId, userId },
    });

    if (!project) {
      throw new Error('Project not found');
    }

    // 2. 获取该项目的所有文献
    const literatures = await prisma.aslLiterature.findMany({
      where: { projectId },
    });

    if (literatures.length === 0) {
      throw new Error('No literatures found in project');
    }

    logger.info('Found literatures for screening', { 
      projectId, 
      count: literatures.length 
    });

    // 3. 创建筛选任务(数据库记录)
    const task = await prisma.aslScreeningTask.create({
      data: {
        projectId,
        taskType: 'title_abstract',
        status: 'pending',  // ← 初始状态改为pending
        totalItems: literatures.length,
        processedItems: 0,
        successItems: 0,
        failedItems: 0,
        conflictItems: 0,
        startedAt: new Date(),
      },
    });

    logger.info('Screening task created', { taskId: task.id });

    // 4. ✨ 推送到队列(异步处理,不阻塞请求)
    await jobQueue.push('asl:title-screening', {
      taskId: task.id,
      projectId,
      literatureIds: literatures.map(lit => lit.id),
    });

    logger.info('Task pushed to queue', { taskId: task.id });

    // 5. 立即返回任务ID前端可以轮询进度
    return task;
    
  } catch (error) {
    logger.error('Failed to start screening task', { error, projectId });
    throw error;
  }
}

/**
 * ✨ 注册队列Worker在应用启动时调用
 * 
 * 这个函数需要在 backend/src/index.ts 中调用
 */
export function registerScreeningWorkers() {
  // 注册标题摘要筛选Worker
  jobQueue.process('asl:title-screening', async (job) => {
    const { taskId, projectId, literatureIds } = job.data;
    
    logger.info('开始处理标题摘要筛选', { 
      taskId, 
      total: literatureIds.length 
    });

    try {
      // 更新任务状态为running
      await prisma.aslScreeningTask.update({
        where: { id: taskId },
        data: { status: 'running' }
      });

      // 获取项目的PICOS标准
      const project = await prisma.aslScreeningProject.findUnique({
        where: { id: projectId },
      });

      if (!project) {
        throw new Error('Project not found');
      }

      const rawPicoCriteria = project.picoCriteria as any;
      const picoCriteria = {
        P: rawPicoCriteria?.P || rawPicoCriteria?.population || '',
        I: rawPicoCriteria?.I || rawPicoCriteria?.intervention || '',
        C: rawPicoCriteria?.C || rawPicoCriteria?.comparison || '',
        O: rawPicoCriteria?.O || rawPicoCriteria?.outcome || '',
        S: rawPicoCriteria?.S || rawPicoCriteria?.studyDesign || '',
      };

      // 逐个处理文献
      let successCount = 0;
      let failedCount = 0;
      let conflictCount = 0;

      for (let i = 0; i < literatureIds.length; i++) {
        const literatureId = literatureIds[i];
        
        try {
          // 获取文献信息
          const literature = await prisma.aslLiterature.findUnique({
            where: { id: literatureId },
          });

          if (!literature) {
            failedCount++;
            continue;
          }

          // 调用LLM筛选服务
          const screeningResult = await llmScreeningService.screenSingleLiterature(
            literature.title || '',
            literature.abstract || '',
            picoCriteria,
            projectId
          );

          // 判断是否冲突
          const isConflict = screeningResult.deepseekDecision !== screeningResult.qwenDecision;

          // 保存筛选结果
          await prisma.aslScreeningResult.create({
            data: {
              literatureId,
              projectId,
              taskId,
              taskType: 'title_abstract',
              
              deepseekDecision: screeningResult.deepseekDecision,
              deepseekReason: screeningResult.deepseekReason,
              deepseekRawResponse: screeningResult.deepseekRawResponse,
              
              qwenDecision: screeningResult.qwenDecision,
              qwenReason: screeningResult.qwenReason,
              qwenRawResponse: screeningResult.qwenRawResponse,
              
              finalDecision: screeningResult.finalDecision,
              hasConflict: isConflict,
              manualReviewRequired: isConflict,
            },
          });

          if (isConflict) {
            conflictCount++;
          } else {
            successCount++;
          }

        } catch (error) {
          logger.error('处理文献失败', { literatureId, error });
          failedCount++;
        }

        // 更新进度每处理10篇或最后一篇时更新
        if ((i + 1) % 10 === 0 || i === literatureIds.length - 1) {
          await prisma.aslScreeningTask.update({
            where: { id: taskId },
            data: {
              processedItems: i + 1,
              successItems: successCount,
              failedItems: failedCount,
              conflictItems: conflictCount,
            }
          });
        }
      }

      // 标记任务完成
      await prisma.aslScreeningTask.update({
        where: { id: taskId },
        data: {
          status: 'completed',
          completedAt: new Date(),
          processedItems: literatureIds.length,
          successItems: successCount,
          failedItems: failedCount,
          conflictItems: conflictCount,
        }
      });

      logger.info('标题摘要筛选完成', { 
        taskId, 
        total: literatureIds.length,
        success: successCount,
        failed: failedCount,
        conflict: conflictCount
      });

      return {
        success: true,
        processed: literatureIds.length,
        successCount,
        failedCount,
        conflictCount,
      };

    } catch (error) {
      // 任务失败,更新状态
      await prisma.aslScreeningTask.update({
        where: { id: taskId },
        data: {
          status: 'failed',
          completedAt: new Date(),
        }
      });

      logger.error('标题摘要筛选失败', { taskId, error });
      throw error;
    }
  });

  logger.info('✅ ASL筛选Worker已注册');
}

任务4.2更新应用入口注册Workers

// 文件backend/src/index.ts

import Fastify from 'fastify';
import { logger } from './common/logging/index.js';
import { startCacheCleanupTask } from './common/cache/index.js';  // ← 新增
import { registerScreeningWorkers } from './modules/asl/services/screeningService.js';  // ← 新增

const app = Fastify({ logger: false });

// ... 注册路由等 ...

// ✨ 启动服务器后注册队列Workers和定时任务
app.listen({ port: 3001, host: '0.0.0.0' }, async (err, address) => {
  if (err) {
    logger.error('Failed to start server', { error: err });
    process.exit(1);
  }

  logger.info(`Server listening on ${address}`);

  // ✨ 启动缓存定时清理
  if (process.env.CACHE_TYPE === 'postgres') {
    startCacheCleanupTask();
  }

  // ✨ 注册队列Workers
  if (process.env.QUEUE_TYPE === 'pgboss') {
    try {
      registerScreeningWorkers();  // ASL模块
      // registerDataCleaningWorkers();  // DC模块待实现
      // registerStatisticalWorkers();   // SSA模块待实现
    } catch (error) {
      logger.error('Failed to register workers', { error });
    }
  }
});

// 优雅关闭
process.on('SIGTERM', async () => {
  logger.info('SIGTERM received, closing server gracefully...');
  await app.close();
  process.exit(0);
});

任务4.3DC模块改造次优先

// 文件backend/src/modules/dc/tool-b/services/MedicalRecordExtractionService.ts
// (仅示例,实际根据需求实现)

import { jobQueue } from '../../../../common/jobs/index.js';

export function registerMedicalExtractionWorkers() {
  jobQueue.process('dc:medical-extraction', async (job) => {
    const { taskId, recordIds } = job.data;
    
    // 批量提取病历
    for (const recordId of recordIds) {
      await extractSingleRecord(recordId);
      
      // 更新进度
      // ...
    }
    
    return { success: true };
  });
}

4.5 Phase 5测试验证Day 61天

任务5.1:单元测试

// 文件backend/tests/common/cache/PostgresCacheAdapter.test.ts

import { describe, it, expect, beforeEach } from 'vitest';
import { PostgresCacheAdapter } from '../../../src/common/cache/PostgresCacheAdapter.js';
import { prisma } from '../../../src/config/database.js';

describe('PostgresCacheAdapter', () => {
  let cache: PostgresCacheAdapter;

  beforeEach(async () => {
    cache = new PostgresCacheAdapter();
    // 清空测试数据
    await prisma.appCache.deleteMany({});
  });

  it('should set and get cache', async () => {
    await cache.set('test:key', { value: 'hello' }, 60);
    const result = await cache.get('test:key');
    expect(result).toEqual({ value: 'hello' });
  });

  it('should return null for expired cache', async () => {
    await cache.set('test:key', { value: 'hello' }, 1);  // 1秒过期
    await new Promise(resolve => setTimeout(resolve, 1100));  // 等待1.1秒
    const result = await cache.get('test:key');
    expect(result).toBeNull();
  });

  it('should delete cache', async () => {
    await cache.set('test:key', { value: 'hello' }, 60);
    await cache.delete('test:key');
    const result = await cache.get('test:key');
    expect(result).toBeNull();
  });

  // ... 更多测试 ...
});

任务5.2:集成测试

# 1. 启动本地Postgres
docker start ai-clinical-postgres

# 2. 设置环境变量
export CACHE_TYPE=postgres
export QUEUE_TYPE=pgboss
export DATABASE_URL=postgresql://postgres:123456@localhost:5432/aiclincial

# 3. 运行迁移
cd backend
npx prisma migrate deploy

# 4. 启动应用
npm run dev

# 应该看到日志:
# [CacheFactory] 使用PostgresCacheAdapter
# [JobFactory] 使用PgBossQueue
# [PgBoss] 队列已启动
# [PostgresCache] 定时清理任务已启动
# ✅ ASL筛选Worker已注册

任务5.3:功能测试

# 测试1缓存功能
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening

# 观察日志:
# [PostgresCache] 缓存命中 { key: 'asl:llm:...' }

# 测试2队列功能
# 提交任务 → 观察任务状态变化
# pending → running → completed

# 测试3实例重启恢复
# 提交任务 → 等待处理到50% → Ctrl+C停止 → 重新启动
# 任务应该自动恢复并继续

5. 优先级与依赖关系

5.1 改造优先级矩阵( 已更新)

模块 优先级 工作量 业务价值 风险 依赖 状态
环境准备 P0 0.5天 - 待开始
PostgresCacheAdapter P0 0.5天 降低LLM成本50% 环境准备 待开始
PgBossQueue P0 2天 长任务可靠性 环境准备 待开始
任务拆分机制 P0 1天 任务成功率 > 99% PgBossQueue 待开始
断点续传机制 P0 1天 容错性提升10倍 PgBossQueue 待开始
ASL筛选改造 P0 1.5天 用户核心功能 任务拆分+断点 待开始
DC提取改造 P1 1天 用户核心功能 任务拆分+断点 待开始
测试验证 P0 1.5天 保证质量 所有改造 待开始
SAE部署 P1 0.5天 生产就绪 测试验证 待开始

总工作量: 9天比V1.0增加2天增加任务拆分和断点续传

5.2 依赖关系图( 已更新)

Day 1: 环境准备 ────────────────┐
                               │
Day 1: PostgresCache ──────────┤
                               │
Day 2-3: PgBossQueue ───────────┤
                               │
Day 4: 任务拆分机制 ─────────────┤
                               ├─→ Day 6-7: ASL筛选改造 ──┐
Day 5: 断点续传机制 ─────────────┤                         │
                               │                         │
Day 7: DC提取改造 ──────────────┘                         │
                                                         │
                                                         ├─→ Day 8-9: 测试 + 部署

5.3 可并行工作( 已更新)

Day 1并行
├─ 环境准备(上午)
└─ PostgresCache实现下午

Day 2-3串行
└─ PgBossQueue实现必须等环境准备完成

Day 4-5可并行
├─ 任务拆分机制(工具函数)
└─ 断点续传机制(数据库字段)

Day 6-7串行
├─ ASL筛选改造使用拆分+断点)
└─ DC提取改造复用拆分+断点)

Day 8-9并行
├─ 测试验证
└─ 文档完善

6. 测试验证方案( 已更新)

6.1 测试清单

功能测试(基础)

✅ 缓存读写正常
✅ 缓存过期自动清理每分钟1000条
✅ 缓存多实例共享实例A写→实例B读
✅ 队列任务入队pg-boss
✅ 队列任务处理Worker
✅ 队列任务重试模拟失败3次重试
✅ 队列实例重启恢复

任务拆分测试 新增

✅ 任务拆分工具函数正确性
  - splitIntoChunks([1..100], 30) → 4批30+30+30+10
  - recommendChunkSize(1000, 7.2, 900)125

✅ 拆分任务入队
  - 1000篇文献 → 10批次每批100篇
  - 验证数据库totalBatches=10, processedBatches=0

✅ 批次并行处理
  - 多个Worker同时处理不同批次
  - 验证无冲突SKIP LOCKED

✅ 批次失败重试
  - 模拟第3批失败 → 自动重试
  - 其他批次不受影响

断点续传测试 新增

✅ 断点保存
  - 处理到第50项 → 保存断点
  - 验证数据库currentIndex=50, lastCheckpoint更新

✅ 断点恢复
  - 任务中断Ctrl+C
  - 重启服务 → 从第50项继续
  - 验证前50项不重复处理

✅ 批次级断点
  - 10批次任务完成前3批
  - 实例重启 → 从第4批继续
  - 验证processedBatches=3, currentBatchIndex=3

长时间任务测试 🔴 重点

✅ 1000篇文献筛选约2小时
  - 拆分成10批每批12分钟
  - 成功率 > 99%
  - 验证进度更新每10篇

✅ 10000篇文献筛选约20小时
  - 拆分成100批每批12分钟
  - 10个Worker并行 → 2小时完成
  - 成功率 > 99.5%

✅ 实例重启恢复(关键测试)
  - 启动任务 → 等待50% → 停止服务Ctrl+C
  - 重启服务 → 任务自动恢复
  - 验证从50%继续不从0%开始
  - 预期:总耗时 ≈ 原计划时间 × 1.05误差5%内)

性能测试

✅ 缓存读取延迟 < 5msP99
✅ 缓存写入延迟 < 10msP99
✅ 队列吞吐量 > 100任务/小时
✅ 断点保存延迟 < 20ms
✅ 批次切换延迟 < 100ms

故障测试(增强)

✅ 实例销毁SAE缩容
  - 正在处理任务 → 实例销毁
  - 等待10分钟 → 新实例接管
  - 任务从断点恢复

✅ 数据库连接断开
  - 处理任务中 → 断开连接
  - 自动重连 → 继续处理

✅ 任务处理失败
  - 模拟LLM超时
  - 自动重试3次
  - 失败后标记为failed

✅ Postgres慢查询
  - 模拟数据库慢(> 5秒
  - 任务不失败,等待完成

✅ 并发冲突
  - 2个Worker领取同一批次
  - pg-boss SKIP LOCKED机制
  - 验证只有1个Worker处理

✅ 发布更新(生产场景)
  - 15:00发布更新
  - 正在执行的5批任务
  - 实例重启 → 5批任务自动恢复

6.2 测试脚本

测试1完整流程1000篇文献

# 1. 准备测试数据
cd backend
npm run test:seed -- --literatures=1000

# 2. 启动服务
npm run dev

# 3. 提交任务
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening \
  -H "Content-Type: application/json"

# 4. 观察日志
# [TaskSplit] 任务拆分完成 { total: 1000, chunkSize: 100, chunks: 10 }
# [PgBoss] 任务入队 { type: 'asl:title-screening-batch', jobId: '1' }
# ...
# [PgBoss] 批次处理完成 { taskId, batchIndex: 0, progress: '1/10' }

# 5. 查询进度
curl http://localhost:3001/api/v1/asl/tasks/:taskId

# 预期响应:
# {
#   "id": "task_123",
#   "status": "running",
#   "totalItems": 1000,
#   "processedItems": 350,
#   "totalBatches": 10,
#   "processedBatches": 3,
#   "progress": 0.35
# }

测试2断点恢复关键测试

# 1. 启动任务
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening

# 2. 等待处理到50%
# 观察日志processedItems: 500

# 3. 强制停止服务
# Ctrl+C 或 kill -9 <pid>

# 4. 重新启动服务
npm run dev

# 5. 观察日志
# [Checkpoint] 断点恢复 { taskId, startIndex: 500 }
# [PgBoss] 开始处理任务 { batchIndex: 5 }  ← 从第6批继续

# 6. 验证最终结果
# 总耗时应该约等于 2小时 + 重启时间(< 5分钟
# 不应该是 4小时从头开始

测试3并发处理10000篇

# 1. 准备大数据集
npm run test:seed -- --literatures=10000

# 2. 启动多个Worker实例模拟SAE多实例
# Terminal 1
npm run dev -- --port=3001

# Terminal 2
npm run dev -- --port=3002

# Terminal 3
npm run dev -- --port=3003

# 3. 提交任务(任意一个实例)
curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening

# 4. 观察三个实例的日志
# 应该看到3个实例同时处理不同批次
# Worker1: 处理批次 0, 3, 6, 9, ...
# Worker2: 处理批次 1, 4, 7, 10, ...
# Worker3: 处理批次 2, 5, 8, 11, ...

# 5. 验证完成时间
# 100批次 / 3个Worker ≈ 33.3批轮次 × 12分钟 ≈ 6.6小时
# 单Worker需要100批 × 12分钟 = 20小时
# 加速比20 / 6.6 ≈ 3倍

6.3 监控指标( 已更新)

// 缓存监控
- cache_hit_rate: 命中率 (目标 > 60%)
- cache_total_count: 总数
- cache_expired_count: 过期数量
- cache_by_module: 各模块分布

// 队列监控(基础)
- queue_pending_count: 待处理任务
- queue_processing_count: 处理中任务
- queue_completed_count: 完成任务
- queue_failed_count: 失败任务
- queue_avg_duration: 平均耗时

// 任务拆分监控 ✨ 新增
- task_total_batches: 总批次数
- task_processed_batches: 已完成批次数
- task_batch_success_rate: 批次成功率 (目标 > 99%)
- task_avg_batch_duration: 平均批次耗时

// 断点续传监控 ✨ 新增
- checkpoint_save_count: 断点保存次数
- checkpoint_restore_count: 断点恢复次数
- checkpoint_save_duration: 保存耗时 (目标 < 20ms)
- task_recovery_success_rate: 恢复成功率 (目标 100%)

6.4 成功标准( 已更新)

基础功能:
✅ 所有单元测试通过
✅ 所有集成测试通过
✅ 缓存命中率 > 60%
✅ LLM API调用次数下降 > 40%

任务可靠性 🔴 关键:
✅ 1000篇文献筛选成功率 > 99%
✅ 10000篇文献筛选成功率 > 99.5%
✅ 实例重启后任务自动恢复成功率 100%
✅ 断点恢复后不重复处理已完成项
✅ 批次失败只需重试单批(不重试全部)

性能指标:
✅ 单批次处理时间 < 15分钟
✅ 断点保存延迟 < 20ms
✅ 10个Worker并行加速比 > 8倍

生产验证:
✅ 生产环境运行48小时无错误
✅ 处理3个完整的1000篇文献筛选任务
✅ 至少1次实例重启恢复测试成功
✅ 无用户投诉任务丢失
✅ 系统可用性 > 99.9%

7. 上线与回滚

7.1 上线步骤

# Step 1: 数据库迁移(生产环境)
npx prisma migrate deploy

# Step 2: 更新SAE环境变量
CACHE_TYPE=postgres
QUEUE_TYPE=pgboss

# Step 3: 灰度发布1个实例
# 观察24小时监控指标正常

# Step 4: 全量发布2-3个实例
# 逐步扩容

# Step 5: 清理旧代码
# 移除MemoryQueue相关代码可选

7.2 回滚方案

# 如果出现问题,立即回滚:

# 方案1环境变量回滚最快
CACHE_TYPE=memory
QUEUE_TYPE=memory
# 重启应用,降级到内存模式

# 方案2代码回滚
git revert <commit>
# 回滚到改造前版本

# 方案3数据库回滚
npx prisma migrate down
# 删除 app_cache 表(可选)

7.3 风险预案

风险 概率 影响 预案
Postgres性能不足 回滚到内存模式
pg-boss连接失败 降级到同步处理
缓存数据过大 增加清理频率
长任务卡死 手动kill任务

8. 成功标准( V2.0更新)

8.1 技术指标

指标类别 指标 目标值 衡量方法
缓存 命中率 > 60% 监控统计
持久化 实例重启不丢失 重启测试
多实例共享 A写B能读 并发测试
队列 任务持久化 实例销毁不丢失 销毁测试
长任务可靠性 > 99% 1000篇筛选
超长任务可靠性 > 99.5% 10000篇筛选
拆分 批次成功率 > 99% 批次统计
批次耗时 < 15分钟 监控统计
并行加速比 > 8倍10 Worker 对比测试
断点 断点保存延迟 < 20ms 性能测试
恢复成功率 100% 重启测试
重复处理率 0% 数据验证

8.2 业务指标

指标 改造前 改造后 改进幅度
LLM API成本 基线 -40~60% 节省¥X/月
任务成功率 10-30% > 99% 提升3-10倍
用户重复提交 平均3次 < 1.1次 减少70%
任务完成时间 不确定(可能失败) 稳定可预测 体验提升
用户满意度 基线 显著提升 问卷调查

8.3 验收清单

Phase 1-3基础设施 ✅
✅ PostgresCacheAdapter实现并通过测试
✅ PgBossQueue实现并通过测试
✅ 本地环境验证通过

Phase 4-5高级特性 ✅
✅ 任务拆分工具函数测试通过
✅ 断点续传服务测试通过
✅ 数据库Schema迁移成功

Phase 6-7业务集成 ✅
✅ ASL筛选服务改造完成
✅ DC提取服务改造完成
✅ Worker注册成功

关键功能测试 🔴:
✅ 1000篇文献筛选2小时成功率 > 99%
✅ 实例重启恢复测试通过3次
✅ 断点续传测试通过从50%恢复)
✅ 批次并行处理测试通过
✅ 失败重试测试通过

生产环境验证 🔴:
✅ 生产环境运行48小时无致命错误
✅ 完成至少3个真实用户任务1000篇+
✅ 至少经历1次SAE实例重启任务成功恢复
✅ 缓存命中率 > 60%
✅ LLM API调用量下降 > 40%
✅ 无用户投诉任务丢失

9. 附录

9.1 参考文档

9.2 关键代码位置( V2.0更新)

backend/src/
├── common/cache/                           # 缓存系统
│   ├── CacheAdapter.ts                     # ✅ 已存在
│   ├── CacheFactory.ts                     # ⚠️ 需修改(+10行
│   ├── MemoryCacheAdapter.ts               # ✅ 已存在
│   ├── RedisCacheAdapter.ts                # 🔴 占位符(不用管)
│   ├── PostgresCacheAdapter.ts             # ❌ 需新增(~300行
│   └── index.ts                            # ⚠️ 需修改(导出)
│
├── common/jobs/                            # 任务队列
│   ├── types.ts                            # ✅ 已存在
│   ├── JobFactory.ts                       # ⚠️ 需修改(+10行
│   ├── MemoryQueue.ts                      # ✅ 已存在
│   ├── PgBossQueue.ts                      # ❌ 需新增(~400行
│   ├── utils.ts                            # ❌ 需新增(~200行✨
│   ├── CheckpointService.ts                # ❌ 需新增(~150行✨
│   └── index.ts                            # ⚠️ 需修改(导出)
│
├── modules/asl/services/                   # ASL业务层
│   ├── screeningService.ts                 # ⚠️ 需改造(~150行改动
│   └── llmScreeningService.ts              # ✅ 无需改动
│
├── modules/dc/tool-b/services/             # DC业务层
│   └── 类似ASL按需改造
│
├── config/
│   └── env.ts                              # ⚠️ 需添加环境变量
│
└── index.ts                                # ⚠️ 需修改注册Workers + 启动清理)

prisma/
├── schema.prisma                           # ⚠️ 需修改(+AppCache +字段)
└── migrations/                             # 自动生成

tests/                                      # 测试文件
├── common/cache/
│   └── PostgresCacheAdapter.test.ts        # ❌ 需新增
├── common/jobs/
│   ├── PgBossQueue.test.ts                 # ❌ 需新增
│   ├── utils.test.ts                       # ❌ 需新增 ✨
│   └── CheckpointService.test.ts           # ❌ 需新增 ✨
└── modules/asl/
    └── screening-integration.test.ts       # ❌ 需新增

backend/.env                                # ⚠️ 需修改

文件状态说明:

  • 已存在 - 无需改动
  • ⚠️ 需修改 - 少量改动(< 50行
  • 需新增 - 全新文件
  • 🔴 占位符 - 忽略(不影响本次改造)
  • V2.0新增 - 支持拆分+断点

代码行数统计:

总新增代码:~1800行
├─ PostgresCacheAdapter.ts: 300行
├─ PgBossQueue.ts: 400行
├─ utils.ts: 200行 ✨
├─ CheckpointService.ts: 150行 ✨
├─ screeningService.ts改造: 200行
├─ 测试代码: 400行
└─ 其他Factory、导出等: 150行

总修改代码:~100行
├─ CacheFactory.ts: 10行
├─ JobFactory.ts: 10行
├─ index.ts: 20行
├─ env.ts: 20行
├─ schema.prisma: 40行
└─ 各处导出: 约10处

10. V2.0 vs V1.0 对比

维度 V1.0(原计划) V2.0(当前版本) 变化
工作量 7天 9天 +2天
代码行数 ~1000行 ~1900行 +900行
核心策略 缓存+队列 缓存+队列+拆分+断点 +2个策略
长任务支持 < 4小时 任意时长(拆分后) 质的提升
任务成功率 85-90% > 99% 提升10%+
实例重启恢复 从头开始 断点续传 避免浪费
并发能力 单实例串行 多实例并行 加速N倍
生产就绪度 基本可用 完全就绪 企业级

为什么增加2天

  • Day 4任务拆分机制工具函数+测试)
  • Day 5断点续传机制服务+数据库)

增加的价值:

  • 长任务可靠性从85% → 99%+
  • 支持任意时长任务(通过拆分)
  • 实例重启不浪费已处理结果
  • 多Worker并行速度提升N倍
  • 符合Serverless最佳实践

结论: 多花2天换取质的飞跃非常值得


11. 快速开始

11.1 一键检查清单

# 1. 检查代码结构(应该都存在)
ls backend/src/common/cache/CacheAdapter.ts     # ✅
ls backend/src/common/cache/MemoryCacheAdapter.ts # ✅
ls backend/src/common/jobs/types.ts              # ✅
ls backend/src/common/jobs/MemoryQueue.ts        # ✅

# 2. 检查需要新增的文件(应该不存在)
ls backend/src/common/cache/PostgresCacheAdapter.ts  # ❌ 待新增
ls backend/src/common/jobs/PgBossQueue.ts            # ❌ 待新增
ls backend/src/common/jobs/utils.ts                  # ❌ 待新增
ls backend/src/common/jobs/CheckpointService.ts     # ❌ 待新增

# 3. 检查依赖
cd backend
npm list pg-boss   # ❌ 需安装

# 4. 检查数据库
psql -d aiclincial -c "\dt platform_schema.*"
# 应该看到现有的业务表但没有app_cache

11.2 立即开始

# Phase 1环境准备30分钟
cd backend
npm install pg-boss --save
# 修改 prisma/schema.prisma添加AppCache
npx prisma migrate dev --name add_postgres_cache
npx prisma generate

# Phase 2实现PostgresCacheAdapter4小时
# 创建 src/common/cache/PostgresCacheAdapter.ts
# 修改 src/common/cache/CacheFactory.ts
# 测试验证

# Phase 3实现PgBossQueue16小时2天
# 创建 src/common/jobs/PgBossQueue.ts
# 修改 src/common/jobs/JobFactory.ts
# 测试验证

# ... 按计划继续

🎯 现在就开始?

建议:

  1. 先通读文档 - 理解整体架构30分钟
  2. 验证代码结构 - 确认真实文件10分钟
  3. 开始Phase 1 - 环境准备30分钟
  4. 逐步推进 - 每完成一个Phase就测试

有任何问题随时沟通!🚀