# Postgres-Only 架构改造实施计划(完整版) > **文档版本?* V2.0 ?**已更?* > **创建日期?* 2025-12-13 > **更新日期?* 2025-12-13 > **目标完成时间?* 2025-12-20?天) > **负责人:** 技术团? > **风险等级?* 🟢 低(基于成熟方案,有降级策略? > **核心原则?* 通用能力层优先,复用架构优势 --- ## ⚠️ **V2.0 更新说明** 本版本基?*真实代码结构验证**?*技术方案深度分?*进行了重要更新: ### **更新1:代码结构真实性验?* ?- ?已从实际代码验证所有路径和文件 - ?明确标注"已存??占位??需新增" - ?RedisCacheAdapter确认为未实现的占位符 - ?所有改造点基于真实代码结构 ### **更新2:长任务可靠性策?* 🔴 - ?新增?*断点续传机制**(策?? 强烈推荐 - ?新增?*任务拆分策略**(策?? 架构级优?- ?分析:心跳续租机制(策略1? 不推荐实?- ?明确:pg-boss的技术限制(最?小时,推?小时?- ?修正?4小时连续任务不支持,需拆分 ### **更新3:实施计划调?* - ?工作量增加到9天(增加任务拆分和断点续传) - ?完整的代码示例(可直接使用) - ?数据库Schema更新(新增字段) --- ## 📋 目录 1. [改造背景与目标](#1-改造背景与目标) 2. [当前系统分析](#2-当前系统分析) 3. [改造总体架构](#3-改造总体架构) 4. [详细实施步骤](#4-详细实施步骤) 5. [优先级与依赖关系](#5-优先级与依赖关系) 6. [测试验证方案](#6-测试验证方案) 7. [上线与回滚](#7-上线与回? --- ## 1. 改造背景与目标 ### 1.1 为什么选择Postgres-Only? ``` 核心痛点??长任务不可靠?小时任务实例销毁丢失) ?LLM成本失控(缓存不持久化) ?多实例不同步(内存缓存各自独立) 技术方案选择??Postgres-Only(推荐) - 架构简单(1-2人团队) - 运维成本低(复用RDS? - 数据一致性强(事务保证) - 节省成本(?000+/年) ?Redis方案(不推荐? - 架构复杂(双系统? - 运维负担重(需维护Redis? - 数据一致性弱(双写问题) - 额外成本(?000+/年) ``` ### 1.2 改造目? | 目标 | 当前状?| 改造后 | 衡量指标 | |------|---------|--------|---------| | **长任务可靠?* | 5-10%(实例销毁丢失) | > 99% | 2小时任务成功?| | **LLM成本** | 重复调用多次 | 降低50%+ | 月度API费用 | | **缓存命中?* | 0%(不持久?| 60%+ | 监控统计 | | **多实例同?* | ?各自独立 | ?共享缓存 | 实例A写→实例B?| | **架构复杂?* | 中等 | ?| 中间件数?| --- ## 2. 当前系统分析 ### 2.1 代码结构现状?层架构)- ?**已验证真实代?* > **验证方法?* 通过 `list_dir` ?`read_file` 实际检查代码库 > **验证日期?* 2025-12-13 > **验证结果?* 所有路径和文件状态均已确? ``` AIclinicalresearch/backend/src/ ├── common/ # 🔵 通用能力?? ├── cache/ # ?真实存在 ? ? ├── CacheAdapter.ts # ?接口定义(真实) ? ? ├── CacheFactory.ts # ?工厂模式(真实) ? ? ├── index.ts # ?导出文件(真实) ? ? ├── MemoryCacheAdapter.ts # ?内存实现(真实,已完成) ? ? ├── RedisCacheAdapter.ts # 🔴 占位符(真实存在,但未实现) ? ? └── PostgresCacheAdapter.ts # ?需新增(本次改造) ? ?? ├── jobs/ # ?真实存在 ? ? ├── types.ts # ?接口定义(真实) ? ? ├── JobFactory.ts # ?工厂模式(真实) ? ? ├── index.ts # ?导出文件(真实) ? ? ├── MemoryQueue.ts # ?内存实现(真实,已完成) ? ? └── PgBossQueue.ts # ?需新增(本次改造) ? ?? ├── storage/ # ?存储服务(已完善?? ├── logging/ # ?日志系统(已完善?? └── llm/ # ?LLM网关(已完善??├── modules/ # 🟢 业务模块?? ├── asl/ # ?真实存在 ? ? ├── services/ ? ? ? ├── screeningService.ts # ⚠️ 需改造:改为队列(真实) ? ? ? └── llmScreeningService.ts # ?真实存在 ? ? └── common/llm/ ? ? └── LLM12FieldsService.ts # ?已用缓存(真实,?16行) ? ?? └── dc/ # ?真实存在 ? └── tool-b/services/ ? └── HealthCheckService.ts # ?已用缓存(真实,?7行) ?└── config/ # 🔵 平台基础? ├── database.ts # ?Prisma配置(真实) └── env.ts # ⚠️ 需添加新环境变量(真实文件?``` **重要发现?* 1. **?3层架构真实存?* - 代码组织完全符合设计 2. **?工厂模式已实?* - CacheFactory和JobFactory真实可用 3. **🔴 RedisCacheAdapter是占位符** - 所有方法都 `throw new Error('Not implemented')` 4. **?业务层已使用cache接口** - 改造时业务代码无需修改 5. **?PostgresCacheAdapter和PgBossQueue需新增** - 本次改造的核心工作 ### 2.2 缓存使用现状(✅ 已验证) | 位置 | 用?| TTL | 数据?| 重要?| 代码行号 | 当前实现 | |------|------|-----|--------|--------|---------|---------| | **LLM12FieldsService.ts** | LLM 12字段提取缓存 | 1小时 | ~50KB/?| 🔴 高(成本?| ?16?| ?已用cache | | **HealthCheckService.ts** | Excel健康检查缓?| 24小时 | ~5KB/?| 🟡 中等 | ?7?| ?已用cache | **代码示例(真实代码)?* ```typescript // backend/src/modules/asl/common/llm/LLM12FieldsService.ts (?16? const cached = await cache.get(cacheKey); if (cached) { logger.info('缓存命中', { cacheKey }); return cached; } // ... LLM调用 ... await cache.set(cacheKey, JSON.stringify(result), 3600); // 1小时 // backend/src/modules/dc/tool-b/services/HealthCheckService.ts (?7? const cached = await cache.get(cacheKey); if (cached) return cached; // ... Excel解析 ... await cache.set(cacheKey, result, 86400); // 24小时 ``` **结论**:✅ 缓存系统已在使用,只需切换底层实现(Memory ?Postgres)? ### 2.3 队列使用现状(✅ 已验证) | 位置 | 用?| 代码行号 | 耗时 | 重要?| 当前实现 | 问题 | |------|------|---------|------|--------|---------|------| | **screeningService.ts** | 文献筛选任?| ?5?| 2小时?000篇) | 🔴 ?| ?同步执行 | 实例销毁丢?| | **DC Tool B** | 病历批量提取 | 未实?| 1-3小时 | 🔴 ?| ?未实?| 不支持长任务 | **代码示例(真实代码)?* ```typescript // backend/src/modules/asl/services/screeningService.ts (?5? // 4. 异步处理文献(简化版:直接在这里处理?// 生产环境应该发送到消息队列 processLiteraturesInBackground(task.id, projectId, literatures); // ?同步执行,有风险 ``` **结论**:❌ 队列系统尚未使用,需要优先改造。注释已提示"生产环境应该发送到消息队列"? ### 2.4 长任务可靠性分?🔴 **新增** #### **当前问题(真实场景)** ``` 场景1?000篇文献筛选(?小时?├─ 问题1:同步执行,阻塞HTTP请求 ├─ 问题2:SAE实例15分钟无流量自动缩?├─ 问题3:实例重启,任务从头开?└─ 结果:任务成功率 < 10%,用户需重复提交3-5? 场景2?0000篇文献筛选(?0小时?├─ 问题1:超过pg-boss最大锁定时间(4小时?├─ 问题2:任务被重复领取,造成重复处理 └─ 结果:任务失败率 100% 场景3:发布更新(15:00?├─ 问题1:正在执行的任务被强制终?├─ 问题2:已处理的文献结果丢?└─ 结果:用户体验极?``` #### **技术限制分?* | 技术栈 | 限制 | 影响 | |--------|------|------| | **SAE** | 15分钟无流量自动缩?| 长任务必然失?| | **pg-boss** | 最长锁?小时(推?小时?| 超长任务不支?| | **HTTP请求** | 最?0秒超?| 不能同步执行长任?| | **实例重启** | 内存状态丢?| 任务进度丢失 | #### **解决方案评估** | 策略 | 价?| 难度 | pg-boss支持 | 推荐?| 实施 | |------|------|------|------------|--------|------| | **策略1:心跳续?* | ⭐⭐⭐⭐ | 🔴 ?| ?不支?| 🟡 不推?| 暂不实施 | | **策略2:断点续?* | ⭐⭐⭐⭐?| 🟢 ?| ?兼容 | 🟢 强烈推荐 | **Phase 6** | | **策略3:任务拆?* | ⭐⭐⭐⭐?| 🟡 ?| ?原生支持 | 🟢 强烈推荐 | **Phase 5** | **结论**:采?*策略2(断点续传)+ 策略3(任务拆分)**组合方案? --- ## 3. 改造总体架构 ### 3.1 架构对比 ``` 改造前(当前)?┌─────────────────────────────────────?? Business Layer (ASL, DC, SSA...) ?? ├─ screeningService.ts ?? ? └─ processInBackground() ? ?同步执行(阻塞) ? └─ LLM12FieldsService.ts ?? └─ cache.get/set() ? ?使用Memory缓存 └─────────────────────────────────────? ?使用 ┌─────────────────────────────────────?? Capability Layer (Common) ?? ├─ cache ?? ? ├─ MemoryCacheAdapter ? ? ?当前使用 ? ? └─ RedisCacheAdapter 🔴 ? ?占位符(未实现) ? └─ jobs ?? └─ MemoryQueue ? ? ?当前使用 └─────────────────────────────────────? ?依赖 ┌─────────────────────────────────────?? Platform Layer ?? └─ PostgreSQL (业务数据) ?└─────────────────────────────────────? 问题??缓存不持久(实例重启丢失??队列不持久(任务丢失??多实例不共享(各自独立) ?长任务不可靠?小时任务失败?> 90%??违反Serverless原则(同步执行长任务?``` ``` 改造后(Postgres-Only + 任务拆分 + 断点续传): ┌──────────────────────────────────────────────────────────────?? Business Layer (ASL, DC, SSA...) ?? ├─ screeningService.ts ?? ? ├─ startScreeningTask() ? ?改造点1:任务拆? ?? ? ? └─ 推送N个批次到队列 ? ?? ? └─ registerWorkers() ? ?? ? └─ 处理单个批次+断点续传 ? ?改造点2:断点续? ?? └─ LLM12FieldsService.ts ?? └─ cache.get/set() ? ?无需改动(接口不变)?└──────────────────────────────────────────────────────────────? ?使用 ┌──────────────────────────────────────────────────────────────?? Capability Layer (Common) ?? ├─ cache ?? ? ├─ MemoryCacheAdapter ? ? ?? ? ├─ PostgresCacheAdapter ? ? ?新增?00行) ?? ? └─ CacheFactory ? ?更新支持postgres ?? └─ jobs ?? ├─ MemoryQueue ? ? ?? ├─ PgBossQueue ? ? ?新增?00行) ?? └─ JobFactory ? ?更新支持pgboss ?└──────────────────────────────────────────────────────────────? ?依赖 ┌──────────────────────────────────────────────────────────────?? Platform Layer ?? ├─ PostgreSQL (RDS) ?? ? ├─ 业务数据(asl_*, dc_*, ...? ?? ? ├─ platform.app_cache ? ? ?缓存表(新增? ?? ? └─ platform.job_* ? ? ?队列表(pg-boss自动)│ ? └─ 阿里云RDS特? ?? ├─ 自动备份(每天) ?? ├─ PITR(时间点恢复? ?? └─ 高可用(主从切换? ?└──────────────────────────────────────────────────────────────? 优势??缓存持久化(RDS自动备份??队列持久化(任务不丢失) ?多实例共享(SKIP LOCKED??事务一致性(业务+任务原子提交??零额外运维(复用RDS??长任务可靠(拆分+断点,成功率 > 99%??符合Serverless(短任务,可中断恢复?``` ### 3.2 任务拆分策略 ?**新增** #### **拆分原则** ``` 目标:每个任?< 15分钟(远低于pg-boss 4小时限制? 原因?1. SAE实例15分钟无流量自动缩?2. 失败重试成本低(只需重试15分钟,不?小时?3. 进度可见性高(用户体验好?4. 可并行处理(多Worker同时工作?``` #### **拆分策略?* | 任务类型 | 单项耗时 | 推荐批次大小 | 批次耗时 | 并发能力 | |---------|---------|------------|---------|---------| | **ASL文献筛?* | 7.2??| 100?| 12分钟 | 10批并?| | **DC病历提取** | 10??| 50?| 8.3分钟 | 10批并?| | **统计分析** | 0.1??| 5000?| 8.3分钟 | 20批并?| #### **实际效果对比** ``` 场景?0000篇文献筛? 不拆分(错误): ├─ 1个任务,20小时 ├─ 超过pg-boss 4小时限制 ?失败 └─ 成功率:0% 拆分(正确)?├─ 100个批次,每批100篇,12分钟 ├─ 10个Worker并行处理 ├─ 总耗时?00 / 10 = 10批轮?× 12分钟 = 2小时 └─ 成功率:> 99.5%(单批失败只需重试12分钟?``` ### 3.3 断点续传机制 ?**新增** #### **核心思想** ``` 问题:SAE实例随时可能重启(发布更新、自动缩容) 无断点: ├─ 处理到第900??实例重启 ├─ 重新开始,从第1篇开?└─ 浪费时间?00 × 7.2?= 108分钟 有断点: ├─ 每处?0篇,保存进度到数据库 ├─ 处理到第900??实例重启 ├─ 重新开始,从第900篇继?└─ 浪费时间? 1分钟 ``` #### **实现策略** ```typescript // 数据库记录进?model AslScreeningTask { processedItems Int // 已处理数? currentIndex Int // 当前游标(断点) lastCheckpoint DateTime // 最后一次保存时? checkpointData Json // 断点详细数据 } // Worker读取断点 const startIndex = task.currentIndex || 0; for (let i = startIndex; i < items.length; i++) { await processItem(items[i]); // 每处?0项,保存断点 if ((i + 1) % 10 === 0) { await saveCheckpoint(i + 1); } } ``` #### **保存频率权衡** | 频率 | 数据库写?| 重启浪费时间 | 推荐 | |------|-----------|------------|------| | ??| 很高(性能差) | < 10?| ?不推?| | ?0?| 中等 | < 2分钟 | ?推荐 | | ?00?| ?| < 12分钟 | 🟡 可?| | 不保?| ?| 重头开?| ?不可接受 | ### 3.4 Schema设计(统一在platform)✨ **已更?* ```prisma // prisma/schema.prisma datasource db { provider = "postgresql" url = env("DATABASE_URL") schemas = ["platform_schema", "aia_schema", "pkb_schema", "asl_schema", "dc_schema", "ssa_schema", "st_schema", "rvw_schema", "admin_schema", "common_schema", "public"] } generator client { provider = "prisma-client-js" previewFeatures = ["multiSchema"] // ?启用多Schema支持 } // ==================== 平台基础设施(platform_schema?=================== /// 应用缓存表(替代Redis?model AppCache { id Int @id @default(autoincrement()) key String @unique @db.VarChar(500) value Json expiresAt DateTime @map("expires_at") createdAt DateTime @default(now()) @map("created_at") @@index([expiresAt], name: "idx_app_cache_expires") @@index([key, expiresAt], name: "idx_app_cache_key_expires") @@map("app_cache") @@schema("platform_schema") // ?统一在platform Schema } // pg-boss会自动创建任务表(不需要在Prisma中定义) // 表名:platform_schema.job, platform_schema.version ? // ==================== 业务模块(asl_schema?=================== /// ASL筛选任务表(✨ 需要新增字段支持拆?断点?model AslScreeningTask { id String @id @default(uuid()) projectId String @map("project_id") taskType String @map("task_type") status String // pending/running/completed/failed totalItems Int @map("total_items") processedItems Int @default(0) @map("processed_items") successItems Int @default(0) @map("success_items") failedItems Int @default(0) @map("failed_items") conflictItems Int @default(0) @map("conflict_items") // ?新增:任务拆分支? totalBatches Int @default(1) @map("total_batches") processedBatches Int @default(0) @map("processed_batches") currentBatchIndex Int @default(0) @map("current_batch_index") // ?新增:断点续传支? currentIndex Int @default(0) @map("current_index") lastCheckpoint DateTime? @map("last_checkpoint") checkpointData Json? @map("checkpoint_data") startedAt DateTime @map("started_at") completedAt DateTime? @map("completed_at") createdAt DateTime @default(now()) @map("created_at") updatedAt DateTime @updatedAt @map("updated_at") project AslScreeningProject @relation(fields: [projectId], references: [id]) results AslScreeningResult[] @@index([projectId]) @@index([status]) @@map("asl_screening_tasks") @@schema("asl_schema") } ``` **新增字段说明?* | 字段 | 类型 | 用?| 示例?| |------|------|------|--------| | **totalBatches** | Int | 总批次数 | 10?000篇?00?批) | | **processedBatches** | Int | 已完成批次数 | 3(已完成3批) | | **currentBatchIndex** | Int | 当前批次索引 | 3(正在处理第4批) | | **currentIndex** | Int | 当前项索引(断点?| 350(已处理350篇) | | **lastCheckpoint** | DateTime | 最后一次保存断点时?| 2025-12-13 10:30:00 | | **checkpointData** | Json | 断点详细数据 | `{"lastProcessedId": "lit_123", "batchProgress": 0.35}` | ### 3.5 Key/Queue命名规范 ```typescript // 缓存Key规范(逻辑隔离?const CACHE_KEY_PATTERNS = { // ASL模块 'asl:llm:{hash}': 'LLM提取结果', 'asl:pdf:{fileId}': 'PDF解析结果', // DC模块 'dc:health:{fileHash}': 'Excel健康检?, 'dc:extraction:{recordId}': '病历提取结果', // 全局 'session:{userId}': '用户Session', 'config:{key}': '系统配置', }; // 队列Name规范 const QUEUE_NAMES = { ASL_TITLE_SCREENING: 'asl:title-screening', ASL_FULLTEXT_SCREENING: 'asl:fulltext-screening', DC_MEDICAL_EXTRACTION: 'dc:medical-extraction', DC_DATA_CLEANING: 'dc:data-cleaning', SSA_STATISTICAL_ANALYSIS: 'ssa:statistical-analysis', }; ``` --- ## 4. 详细实施步骤 ### 4.1 Phase 1:环境准备(Day 1上午?.5天) #### **任务1.1:更新Prisma Schema** ```bash # 文件:prisma/schema.prisma ``` **修改?:启用multiSchema** ```prisma generator client { provider = "prisma-client-js" previewFeatures = ["multiSchema"] // ?新增 } ``` **修改?:添加AppCache模型** ```prisma /// 应用缓存表(Postgres-Only架构?model AppCache { id Int @id @default(autoincrement()) key String @unique @db.VarChar(500) value Json expiresAt DateTime @map("expires_at") createdAt DateTime @default(now()) @map("created_at") @@index([expiresAt], name: "idx_app_cache_expires") @@index([key, expiresAt], name: "idx_app_cache_key_expires") @@map("app_cache") @@schema("platform_schema") } ``` **执行迁移** ```bash cd backend # 1. 生成迁移文件 npx prisma migrate dev --name add_postgres_cache # 2. 生成Prisma Client npx prisma generate # 3. 查看生成的SQL cat prisma/migrations/*/migration.sql ``` **验证结果** ```sql -- 应该看到以下SQL CREATE TABLE "platform_schema"."app_cache" ( "id" SERIAL PRIMARY KEY, "key" VARCHAR(500) UNIQUE NOT NULL, "value" JSONB NOT NULL, "expires_at" TIMESTAMP NOT NULL, "created_at" TIMESTAMP DEFAULT NOW() ); CREATE INDEX "idx_app_cache_expires" ON "platform_schema"."app_cache"("expires_at"); CREATE INDEX "idx_app_cache_key_expires" ON "platform_schema"."app_cache"("key", "expires_at"); ``` #### **任务1.2:安装依?* ```bash cd backend # 安装pg-boss(任务队列) npm install pg-boss --save # 查看版本 npm list pg-boss # 应显示:pg-boss@10.x.x ``` #### **任务1.3:更新环境变量配?* ```typescript // 文件:backend/src/config/env.ts import { z } from 'zod'; const envSchema = z.object({ // ... 现有配置 ... // ==================== 缓存配置 ==================== CACHE_TYPE: z.enum(['memory', 'postgres']).default('memory'), // ?新增postgres选项 // ==================== 队列配置 ==================== QUEUE_TYPE: z.enum(['memory', 'pgboss']).default('memory'), // ?新增pgboss选项 // ==================== 数据库配?==================== DATABASE_URL: z.string(), }); export const config = { // ... 现有配置 ... // 缓存配置 cacheType: process.env.CACHE_TYPE || 'memory', // 队列配置 queueType: process.env.QUEUE_TYPE || 'memory', // 数据库URL databaseUrl: process.env.DATABASE_URL, }; ``` ```bash # 文件:backend/.env # ==================== 缓存配置 ==================== CACHE_TYPE=postgres # memory | postgres # ==================== 队列配置 ==================== QUEUE_TYPE=pgboss # memory | pgboss # ==================== 数据库配?==================== DATABASE_URL=postgresql://user:password@localhost:5432/aiclincial?schema=public ``` --- ### 4.2 Phase 2:实现PostgresCacheAdapter(Day 1下午?.5天) #### **任务2.1:创建PostgresCacheAdapter** ```typescript // 文件:backend/src/common/cache/PostgresCacheAdapter.ts import { prisma } from '../../config/database.js'; import type { CacheAdapter } from './CacheAdapter.js'; import { logger } from '../logging/index.js'; /** * Postgres缓存适配? * * 核心特性: * - 持久化存储(实例重启不丢失) * - 多实例共享(通过数据库) * - 懒惰删除(读取时清理过期数据? * - 自动清理(定时任务删除过期数据) */ export class PostgresCacheAdapter implements CacheAdapter { /** * 获取缓存(带过期检查和懒惰删除? */ async get(key: string): Promise { try { const record = await prisma.appCache.findUnique({ where: { key } }); if (!record) { return null; } // 检查是否过? if (record.expiresAt < new Date()) { // 懒惰删除:异步删除,不阻塞主流程 this.deleteAsync(key); return null; } logger.debug('[PostgresCache] 缓存命中', { key }); return record.value as T; } catch (error) { logger.error('[PostgresCache] 读取失败', { key, error }); return null; } } /** * 设置缓存 */ async set(key: string, value: any, ttlSeconds: number = 3600): Promise { try { const expiresAt = new Date(Date.now() + ttlSeconds * 1000); await prisma.appCache.upsert({ where: { key }, create: { key, value: value as any, // Prisma会自动处理JSON expiresAt, }, update: { value: value as any, expiresAt, } }); logger.debug('[PostgresCache] 缓存写入', { key, ttl: ttlSeconds }); } catch (error) { logger.error('[PostgresCache] 写入失败', { key, error }); throw error; } } /** * 删除缓存 */ async delete(key: string): Promise { try { await prisma.appCache.delete({ where: { key } }); logger.debug('[PostgresCache] 缓存删除', { key }); return true; } catch (error) { // 如果key不存在,Prisma会抛出错? if ((error as any)?.code === 'P2025') { return false; } logger.error('[PostgresCache] 删除失败', { key, error }); return false; } } /** * 异步删除(不阻塞主流程) */ private deleteAsync(key: string): void { prisma.appCache.delete({ where: { key } }) .catch(err => { // 静默失败(可能已被其他实例删除) logger.debug('[PostgresCache] 懒惰删除失败', { key, err }); }); } /** * 批量删除(支持模式匹配) */ async deleteMany(pattern: string): Promise { try { const result = await prisma.appCache.deleteMany({ where: { key: { contains: pattern } } }); logger.info('[PostgresCache] 批量删除', { pattern, count: result.count }); return result.count; } catch (error) { logger.error('[PostgresCache] 批量删除失败', { pattern, error }); return 0; } } /** * 清空所有缓? */ async flush(): Promise { try { await prisma.appCache.deleteMany({}); logger.info('[PostgresCache] 缓存已清?); } catch (error) { logger.error('[PostgresCache] 清空失败', { error }); throw error; } } /** * 获取缓存统计信息 */ async getStats(): Promise<{ total: number; expired: number; byModule: Record; }> { try { const now = new Date(); // 总数 const total = await prisma.appCache.count(); // 过期数量 const expired = await prisma.appCache.count({ where: { expiresAt: { lt: now } } }); // 按模块统计(通过key前缀分组? const all = await prisma.appCache.findMany({ select: { key: true } }); const byModule: Record = {}; all.forEach(item => { const module = item.key.split(':')[0]; byModule[module] = (byModule[module] || 0) + 1; }); return { total, expired, byModule }; } catch (error) { logger.error('[PostgresCache] 获取统计失败', { error }); return { total: 0, expired: 0, byModule: {} }; } } } /** * 启动定时清理任务(分批清理,防止阻塞? * * 策略? * - 每分钟执行一? * - 每次删除1000条过期数? * - 使用LIMIT避免大事? */ export function startCacheCleanupTask(): void { const CLEANUP_INTERVAL = 60 * 1000; // 1分钟 const BATCH_SIZE = 1000; // 每次1000? setInterval(async () => { try { // 使用原生SQL,支持LIMIT const result = await prisma.$executeRaw` DELETE FROM platform_schema.app_cache WHERE id IN ( SELECT id FROM platform_schema.app_cache WHERE expires_at < NOW() LIMIT ${BATCH_SIZE} ) `; if (result > 0) { logger.info('[PostgresCache] 定时清理', { deleted: result }); } } catch (error) { logger.error('[PostgresCache] 定时清理失败', { error }); } }, CLEANUP_INTERVAL); logger.info('[PostgresCache] 定时清理任务已启?, { interval: `${CLEANUP_INTERVAL / 1000}秒`, batchSize: BATCH_SIZE }); } ``` #### **任务2.2:更新CacheFactory** ```typescript // 文件:backend/src/common/cache/CacheFactory.ts import { CacheAdapter } from './CacheAdapter.js'; import { MemoryCacheAdapter } from './MemoryCacheAdapter.js'; import { RedisCacheAdapter } from './RedisCacheAdapter.js'; import { PostgresCacheAdapter } from './PostgresCacheAdapter.js'; // ?新增 import { logger } from '../logging/index.js'; import { config } from '../../config/env.js'; export class CacheFactory { private static instance: CacheAdapter | null = null; static getInstance(): CacheAdapter { if (!this.instance) { this.instance = this.createAdapter(); } return this.instance; } private static createAdapter(): CacheAdapter { const cacheType = config.cacheType; logger.info('[CacheFactory] 初始化缓?, { cacheType }); switch (cacheType) { case 'postgres': // ?新增 return this.createPostgresAdapter(); case 'memory': return this.createMemoryAdapter(); case 'redis': return this.createRedisAdapter(); default: logger.warn(`[CacheFactory] 未知缓存类型: ${cacheType}, 降级到内存`); return this.createMemoryAdapter(); } } /** * 创建Postgres缓存适配? */ private static createPostgresAdapter(): PostgresCacheAdapter { logger.info('[CacheFactory] 使用PostgresCacheAdapter'); return new PostgresCacheAdapter(); } private static createMemoryAdapter(): MemoryCacheAdapter { logger.info('[CacheFactory] 使用MemoryCacheAdapter'); return new MemoryCacheAdapter(); } private static createRedisAdapter(): RedisCacheAdapter { // ... 现有Redis逻辑 ... logger.info('[CacheFactory] 使用RedisCacheAdapter'); return new RedisCacheAdapter({ /* ... */ }); } static reset(): void { this.instance = null; } } ``` #### **任务2.3:更新导?* ```typescript // 文件:backend/src/common/cache/index.ts export type { CacheAdapter } from './CacheAdapter.js'; export { MemoryCacheAdapter } from './MemoryCacheAdapter.js'; export { RedisCacheAdapter } from './RedisCacheAdapter.js'; export { PostgresCacheAdapter, startCacheCleanupTask } from './PostgresCacheAdapter.js'; // ?新增 export { CacheFactory } from './CacheFactory.js'; import { CacheFactory } from './CacheFactory.js'; export const cache = CacheFactory.getInstance(); ``` --- ### 4.3 Phase 3:实现PgBossQueue(Day 2-3?天) #### **任务3.1:创建PgBossQueue** ```typescript // 文件:backend/src/common/jobs/PgBossQueue.ts import PgBoss from 'pg-boss'; import type { Job, JobQueue, JobHandler } from './types.js'; import { logger } from '../logging/index.js'; import { config } from '../../config/env.js'; /** * PgBoss队列适配? * * 核心特性: * - 任务持久化(实例重启不丢失) * - 自动重试(指数退避) * - 多实例协调(SKIP LOCKED? * - 长任务支持(4小时超时? */ export class PgBossQueue implements JobQueue { private boss: PgBoss; private started = false; private workers: Map = new Map(); constructor() { this.boss = new PgBoss({ connectionString: config.databaseUrl, schema: 'platform_schema', // 统一在platform Schema max: 5, // 连接池大? // ?关键配置:长任务支持 expireInHours: 4, // 任务?小时后过? // 自动维护(清理旧任务? retentionDays: 7, // 保留7天的历史任务 deleteAfterDays: 30, // 30天后彻底删除 }); // 监听错误 this.boss.on('error', error => { logger.error('[PgBoss] 队列错误', { error: error.message }); }); // 监听维护事件 this.boss.on('maintenance', () => { logger.debug('[PgBoss] 执行维护任务'); }); } /** * 启动队列(懒加载? */ private async ensureStarted(): Promise { if (this.started) return; try { await this.boss.start(); this.started = true; logger.info('[PgBoss] 队列已启?); } catch (error) { logger.error('[PgBoss] 队列启动失败', { error }); throw error; } } /** * 推送任务到队列 */ async push(type: string, data: T, options?: any): Promise { await this.ensureStarted(); try { const jobId = await this.boss.send(type, data, { retryLimit: 3, // 失败重试3? retryDelay: 60, // 失败?0秒重? retryBackoff: true, // 指数退避(60s, 120s, 240s? expireInHours: 4, // 4小时后过? ...options }); logger.info('[PgBoss] 任务入队', { type, jobId }); return { id: jobId!, type, data, status: 'pending', createdAt: new Date(), }; } catch (error) { logger.error('[PgBoss] 任务入队失败', { type, error }); throw error; } } /** * 注册任务处理? */ process(type: string, handler: JobHandler): void { // 异步注册(不阻塞主流程) this.registerWorkerAsync(type, handler); } /** * 异步注册Worker */ private async registerWorkerAsync( type: string, handler: JobHandler ): Promise { try { await this.ensureStarted(); // 注册Worker await this.boss.work( type, { teamSize: 1, // 每个队列并发1个任? teamConcurrency: 1 // 每个Worker处理1个任? }, async (job: any) => { const startTime = Date.now(); logger.info('[PgBoss] 开始处理任?, { type, jobId: job.id, attemptsMade: job.data.__retryCount || 0, attemptsTotal: 3 }); try { // 调用业务处理函数 const result = await handler({ id: job.id, type, data: job.data as T, status: 'processing', createdAt: new Date(job.createdon), }); const duration = Date.now() - startTime; logger.info('[PgBoss] ?任务完成', { type, jobId: job.id, duration: `${(duration / 1000).toFixed(2)}s` }); return result; } catch (error) { const duration = Date.now() - startTime; logger.error('[PgBoss] ?任务失败', { type, jobId: job.id, attemptsMade: job.data.__retryCount || 0, duration: `${(duration / 1000).toFixed(2)}s`, error: error instanceof Error ? error.message : 'Unknown' }); // 抛出错误,触发pg-boss自动重试 throw error; } } ); this.workers.set(type, true); logger.info('[PgBoss] ?Worker已注?, { type }); } catch (error) { logger.error('[PgBoss] Worker注册失败', { type, error }); throw error; } } /** * 获取任务状? */ async getJob(id: string): Promise { try { await this.ensureStarted(); const job = await this.boss.getJobById(id); if (!job) return null; return { id: job.id!, type: job.name, data: job.data, status: this.mapState(job.state), progress: 0, // pg-boss不直接支持进? createdAt: new Date(job.createdon), completedAt: job.completedon ? new Date(job.completedon) : undefined, error: job.output?.message, }; } catch (error) { logger.error('[PgBoss] 获取任务失败', { id, error }); return null; } } /** * 映射pg-boss状态到通用状? */ private mapState(state: string): Job['status'] { switch (state) { case 'completed': return 'completed'; case 'failed': return 'failed'; case 'active': return 'processing'; case 'cancelled': return 'cancelled'; default: return 'pending'; } } /** * 更新任务进度(通过业务表) * * 注意:pg-boss不直接支持进度更新, * 需要在业务层通过数据库表实现 */ async updateProgress(id: string, progress: number, message?: string): Promise { logger.debug('[PgBoss] 进度更新(需业务表支持)', { id, progress, message }); // 实际实现:更?aslScreeningTask.processedItems } /** * 取消任务 */ async cancelJob(id: string): Promise { try { await this.ensureStarted(); await this.boss.cancel(id); logger.info('[PgBoss] 任务已取?, { id }); return true; } catch (error) { logger.error('[PgBoss] 取消任务失败', { id, error }); return false; } } /** * 重试失败任务 */ async retryJob(id: string): Promise { try { await this.ensureStarted(); await this.boss.resume(id); logger.info('[PgBoss] 任务已重?, { id }); return true; } catch (error) { logger.error('[PgBoss] 重试任务失败', { id, error }); return false; } } /** * 清理旧任务(pg-boss自动处理? */ async cleanup(olderThan: number = 86400000): Promise { // pg-boss有自动清理机制(retentionDays? logger.debug('[PgBoss] 使用自动清理机制'); return 0; } /** * 关闭队列(优雅关闭) */ async close(): Promise { if (!this.started) return; try { await this.boss.stop(); this.started = false; logger.info('[PgBoss] 队列已关?); } catch (error) { logger.error('[PgBoss] 队列关闭失败', { error }); } } } ``` #### **任务3.2:更新JobFactory** ```typescript // 文件:backend/src/common/jobs/JobFactory.ts import { JobQueue } from './types.js'; import { MemoryQueue } from './MemoryQueue.js'; import { PgBossQueue } from './PgBossQueue.js'; // ?新增 import { logger } from '../logging/index.js'; import { config } from '../../config/env.js'; export class JobFactory { private static instance: JobQueue | null = null; static getInstance(): JobQueue { if (!this.instance) { this.instance = this.createQueue(); } return this.instance; } private static createQueue(): JobQueue { const queueType = config.queueType; logger.info('[JobFactory] 初始化任务队?, { queueType }); switch (queueType) { case 'pgboss': // ?新增 return this.createPgBossQueue(); case 'memory': return this.createMemoryQueue(); default: logger.warn(`[JobFactory] 未知队列类型: ${queueType}, 降级到内存`); return this.createMemoryQueue(); } } /** * 创建PgBoss队列 */ private static createPgBossQueue(): PgBossQueue { logger.info('[JobFactory] 使用PgBossQueue'); return new PgBossQueue(); } private static createMemoryQueue(): MemoryQueue { logger.info('[JobFactory] 使用MemoryQueue'); const queue = new MemoryQueue(); // 定期清理(避免内存泄漏) if (process.env.NODE_ENV !== 'test') { setInterval(() => { queue.cleanup(); }, 60 * 60 * 1000); } return queue; } static reset(): void { this.instance = null; } } ``` #### **任务3.3:更新导?* ```typescript // 文件:backend/src/common/jobs/index.ts export type { Job, JobStatus, JobHandler, JobQueue } from './types.js'; export { MemoryQueue } from './MemoryQueue.js'; export { PgBossQueue } from './PgBossQueue.js'; // ?新增 export { JobFactory } from './JobFactory.js'; import { JobFactory } from './JobFactory.js'; export const jobQueue = JobFactory.getInstance(); ``` --- ### 4.4 Phase 4:实现任务拆分机制(Day 4?天)?**新增** #### **任务4.1:创建任务拆分工具函?* ```typescript // 文件:backend/src/common/jobs/utils.ts (新建文件) import { logger } from '../logging/index.js'; /** * 将数组拆分成多个批次 * * @param items 要拆分的项目数组 * @param chunkSize 每批次大? * @returns 拆分后的二维数组 * * @example * splitIntoChunks([1,2,3,4,5], 2) // [[1,2], [3,4], [5]] */ export function splitIntoChunks(items: T[], chunkSize: number = 100): T[][] { if (chunkSize <= 0) { throw new Error('chunkSize must be greater than 0'); } const chunks: T[][] = []; for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)); } logger.debug('[TaskSplit] 任务拆分完成', { total: items.length, chunkSize, chunks: chunks.length }); return chunks; } /** * 估算处理时间 * * @param itemCount 项目数量 * @param timePerItem 每项处理时间(秒? * @returns 总时间(秒) */ export function estimateProcessingTime( itemCount: number, timePerItem: number ): number { return itemCount * timePerItem; } /** * 推荐批次大小 * * 根据单项处理时间和最大批次时间,计算最优批次大? * * @param totalItems 总项目数 * @param timePerItem 每项处理时间(秒? * @param maxChunkTime 单批次最大时间(秒,默认15分钟? * @returns 推荐的批次大? * * @example * recommendChunkSize(1000, 7.2, 900) // 返回 125(每?25项,15分钟? */ export function recommendChunkSize( totalItems: number, timePerItem: number, maxChunkTime: number = 900 // 15分钟 ): number { // 计算每批次最多能处理多少? const itemsPerChunk = Math.floor(maxChunkTime / timePerItem); // 限制范围:最?0项,最?000? const recommended = Math.max(10, Math.min(itemsPerChunk, 1000)); logger.info('[TaskSplit] 批次大小推荐', { totalItems, timePerItem: `${timePerItem}秒`, maxChunkTime: `${maxChunkTime}秒`, recommended, estimatedBatches: Math.ceil(totalItems / recommended), estimatedTimePerBatch: `${(recommended * timePerItem / 60).toFixed(1)}分钟` }); return recommended; } /** * 批次任务配置? */ export const CHUNK_STRATEGIES = { // ASL文献筛选:每批100篇,?2分钟 'asl:title-screening': { chunkSize: 100, timePerItem: 7.2, // ? estimatedTime: 720, // 12分钟 maxRetries: 3, description: 'ASL标题摘要筛选(双模型并行)' }, // ASL全文复筛:每?0篇,?5分钟 'asl:fulltext-screening': { chunkSize: 50, timePerItem: 18, // ? estimatedTime: 900, // 15分钟 maxRetries: 3, description: 'ASL全文复筛?2字段提取? }, // DC病历提取:每?0份,?分钟 'dc:medical-extraction': { chunkSize: 50, timePerItem: 10, // ? estimatedTime: 500, // 8分钟 maxRetries: 3, description: 'DC医疗记录结构化提? }, // 统计分析:每?000条,?分钟 'ssa:statistical-analysis': { chunkSize: 5000, timePerItem: 0.1, // ? estimatedTime: 500, // 8分钟 maxRetries: 2, description: 'SSA统计分析计算' } } as const; /** * 获取任务拆分策略 */ export function getChunkStrategy(taskType: keyof typeof CHUNK_STRATEGIES) { const strategy = CHUNK_STRATEGIES[taskType]; if (!strategy) { logger.warn('[TaskSplit] 未找到任务策略,使用默认配置', { taskType }); return { chunkSize: 100, timePerItem: 10, estimatedTime: 1000, maxRetries: 3, description: '默认任务策略' }; } return strategy; } ``` #### **任务4.2:更新导?* ```typescript // 文件:backend/src/common/jobs/index.ts export type { Job, JobStatus, JobHandler, JobQueue } from './types.js'; export { MemoryQueue } from './MemoryQueue.js'; export { PgBossQueue } from './PgBossQueue.js'; export { JobFactory } from './JobFactory.js'; export { // ?新增 splitIntoChunks, estimateProcessingTime, recommendChunkSize, getChunkStrategy, CHUNK_STRATEGIES } from './utils.js'; import { JobFactory } from './JobFactory.js'; export const jobQueue = JobFactory.getInstance(); ``` #### **任务4.3:单元测?* ```typescript // 文件:backend/tests/common/jobs/utils.test.ts(新建) import { describe, it, expect } from 'vitest'; import { splitIntoChunks, recommendChunkSize } from '../../../src/common/jobs/utils.js'; describe('Task Split Utils', () => { describe('splitIntoChunks', () => { it('should split array into chunks', () => { const items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; const chunks = splitIntoChunks(items, 3); expect(chunks).toEqual([ [1, 2, 3], [4, 5, 6], [7, 8, 9], [10] ]); }); it('should handle exact division', () => { const items = [1, 2, 3, 4, 5, 6]; const chunks = splitIntoChunks(items, 2); expect(chunks).toEqual([ [1, 2], [3, 4], [5, 6] ]); }); it('should handle empty array', () => { const chunks = splitIntoChunks([], 10); expect(chunks).toEqual([]); }); }); describe('recommendChunkSize', () => { it('should recommend chunk size for ASL screening', () => { const size = recommendChunkSize(1000, 7.2, 900); // 15分钟 expect(size).toBe(125); // 125 * 7.2 = 900? }); it('should not exceed max limit', () => { const size = recommendChunkSize(10000, 0.1, 900); expect(size).toBe(1000); // 最?000 }); it('should not go below min limit', () => { const size = recommendChunkSize(100, 100, 900); expect(size).toBe(10); // 最?0 }); }); }); ``` --- ### 4.5 Phase 5:实现断点续传机制(Day 5?天)?**新增** #### **任务5.1:更新Prisma Schema** ```prisma // 文件:prisma/schema.prisma model AslScreeningTask { // ... 现有字段 ... // ?新增字段:任务拆? totalBatches Int @default(1) @map("total_batches") processedBatches Int @default(0) @map("processed_batches") currentBatchIndex Int @default(0) @map("current_batch_index") // ?新增字段:断点续? currentIndex Int @default(0) @map("current_index") lastCheckpoint DateTime? @map("last_checkpoint") checkpointData Json? @map("checkpoint_data") } ``` ```bash # 生成迁移 cd backend npx prisma migrate dev --name add_task_split_checkpoint # 验证 npx prisma migrate status ``` #### **任务5.2:创建断点续传服?* ```typescript // 文件:backend/src/common/jobs/CheckpointService.ts(新建) import { prisma } from '../../config/database.js'; import { logger } from '../logging/index.js'; export interface CheckpointData { lastProcessedId?: string; lastProcessedIndex: number; batchProgress: number; metadata?: Record; } /** * 断点续传服务 * * 提供任务断点的保存、读取和恢复功能 */ export class CheckpointService { /** * 保存断点 * * @param taskId 任务ID * @param currentIndex 当前索引 * @param data 断点数据 */ static async saveCheckpoint( taskId: string, currentIndex: number, data?: Partial ): Promise { try { const checkpointData: CheckpointData = { lastProcessedIndex: currentIndex, batchProgress: 0, ...data }; await prisma.aslScreeningTask.update({ where: { id: taskId }, data: { currentIndex, lastCheckpoint: new Date(), checkpointData: checkpointData as any } }); logger.debug('[Checkpoint] 断点已保?, { taskId, currentIndex, checkpoint: checkpointData }); } catch (error) { logger.error('[Checkpoint] 保存断点失败', { taskId, currentIndex, error }); // 不抛出错误,避免影响主流? } } /** * 读取断点 * * @param taskId 任务ID * @returns 断点索引和数? */ static async loadCheckpoint(taskId: string): Promise<{ startIndex: number; data: CheckpointData | null; }> { try { const task = await prisma.aslScreeningTask.findUnique({ where: { id: taskId }, select: { currentIndex: true, lastCheckpoint: true, checkpointData: true } }); if (!task) { return { startIndex: 0, data: null }; } const startIndex = task.currentIndex || 0; const data = task.checkpointData as CheckpointData | null; if (startIndex > 0) { logger.info('[Checkpoint] 断点恢复', { taskId, startIndex, lastCheckpoint: task.lastCheckpoint, data }); } return { startIndex, data }; } catch (error) { logger.error('[Checkpoint] 读取断点失败', { taskId, error }); return { startIndex: 0, data: null }; } } /** * 清除断点 * * @param taskId 任务ID */ static async clearCheckpoint(taskId: string): Promise { try { await prisma.aslScreeningTask.update({ where: { id: taskId }, data: { currentIndex: 0, lastCheckpoint: null, checkpointData: null } }); logger.debug('[Checkpoint] 断点已清?, { taskId }); } catch (error) { logger.error('[Checkpoint] 清除断点失败', { taskId, error }); } } /** * 批量保存进度(包含断点) * * @param taskId 任务ID * @param updates 更新数据 */ static async updateProgress( taskId: string, updates: { processedItems?: number; successItems?: number; failedItems?: number; conflictItems?: number; currentIndex?: number; checkpointData?: Partial; } ): Promise { try { const data: any = {}; // 累加字段 if (updates.processedItems !== undefined) { data.processedItems = { increment: updates.processedItems }; } if (updates.successItems !== undefined) { data.successItems = { increment: updates.successItems }; } if (updates.failedItems !== undefined) { data.failedItems = { increment: updates.failedItems }; } if (updates.conflictItems !== undefined) { data.conflictItems = { increment: updates.conflictItems }; } // 断点字段 if (updates.currentIndex !== undefined) { data.currentIndex = updates.currentIndex; data.lastCheckpoint = new Date(); } if (updates.checkpointData) { data.checkpointData = updates.checkpointData; } await prisma.aslScreeningTask.update({ where: { id: taskId }, data }); logger.debug('[Checkpoint] 进度已更?, { taskId, updates }); } catch (error) { logger.error('[Checkpoint] 更新进度失败', { taskId, error }); } } } ``` #### **任务5.3:更新导?* ```typescript // 文件:backend/src/common/jobs/index.ts export { CheckpointService } from './CheckpointService.js'; // ?新增 ``` --- ### 4.6 Phase 6:改造业务代码(Day 6-7?天)?**已更?* #### **任务6.1:改造ASL筛选服务(?完整版:拆分+断点+队列?* ```typescript // 文件:backend/src/modules/asl/services/screeningService.ts import { prisma } from '../../../config/database.js'; import { logger } from '../../../common/logging/index.js'; import { jobQueue, splitIntoChunks, recommendChunkSize, CheckpointService // ?新增 } from '../../../common/jobs/index.js'; import { llmScreeningService } from './llmScreeningService.js'; /** * 启动筛选任务(改造后:使用队列) */ export async function startScreeningTask(projectId: string, userId: string) { try { logger.info('Starting screening task', { projectId, userId }); // 1. 检查项目是否存? const project = await prisma.aslScreeningProject.findFirst({ where: { id: projectId, userId }, }); if (!project) { throw new Error('Project not found'); } // 2. 获取该项目的所有文? const literatures = await prisma.aslLiterature.findMany({ where: { projectId }, }); if (literatures.length === 0) { throw new Error('No literatures found in project'); } logger.info('Found literatures for screening', { projectId, count: literatures.length }); // 3. 创建筛选任务(数据库记录) const task = await prisma.aslScreeningTask.create({ data: { projectId, taskType: 'title_abstract', status: 'pending', // ?初始状态改为pending totalItems: literatures.length, processedItems: 0, successItems: 0, failedItems: 0, conflictItems: 0, startedAt: new Date(), }, }); logger.info('Screening task created', { taskId: task.id }); // 4. ?推送到队列(异步处理,不阻塞请求) await jobQueue.push('asl:title-screening', { taskId: task.id, projectId, literatureIds: literatures.map(lit => lit.id), }); logger.info('Task pushed to queue', { taskId: task.id }); // 5. 立即返回任务ID(前端可以轮询进度) return task; } catch (error) { logger.error('Failed to start screening task', { error, projectId }); throw error; } } /** * ?注册队列Worker(在应用启动时调用) * * 这个函数需要在 backend/src/index.ts 中调? */ export function registerScreeningWorkers() { // 注册标题摘要筛选Worker jobQueue.process('asl:title-screening', async (job) => { const { taskId, projectId, literatureIds } = job.data; logger.info('开始处理标题摘要筛?, { taskId, total: literatureIds.length }); try { // 更新任务状态为running await prisma.aslScreeningTask.update({ where: { id: taskId }, data: { status: 'running' } }); // 获取项目的PICOS标准 const project = await prisma.aslScreeningProject.findUnique({ where: { id: projectId }, }); if (!project) { throw new Error('Project not found'); } const rawPicoCriteria = project.picoCriteria as any; const picoCriteria = { P: rawPicoCriteria?.P || rawPicoCriteria?.population || '', I: rawPicoCriteria?.I || rawPicoCriteria?.intervention || '', C: rawPicoCriteria?.C || rawPicoCriteria?.comparison || '', O: rawPicoCriteria?.O || rawPicoCriteria?.outcome || '', S: rawPicoCriteria?.S || rawPicoCriteria?.studyDesign || '', }; // 逐个处理文献 let successCount = 0; let failedCount = 0; let conflictCount = 0; for (let i = 0; i < literatureIds.length; i++) { const literatureId = literatureIds[i]; try { // 获取文献信息 const literature = await prisma.aslLiterature.findUnique({ where: { id: literatureId }, }); if (!literature) { failedCount++; continue; } // 调用LLM筛选服? const screeningResult = await llmScreeningService.screenSingleLiterature( literature.title || '', literature.abstract || '', picoCriteria, projectId ); // 判断是否冲突 const isConflict = screeningResult.deepseekDecision !== screeningResult.qwenDecision; // 保存筛选结? await prisma.aslScreeningResult.create({ data: { literatureId, projectId, taskId, taskType: 'title_abstract', deepseekDecision: screeningResult.deepseekDecision, deepseekReason: screeningResult.deepseekReason, deepseekRawResponse: screeningResult.deepseekRawResponse, qwenDecision: screeningResult.qwenDecision, qwenReason: screeningResult.qwenReason, qwenRawResponse: screeningResult.qwenRawResponse, finalDecision: screeningResult.finalDecision, hasConflict: isConflict, manualReviewRequired: isConflict, }, }); if (isConflict) { conflictCount++; } else { successCount++; } } catch (error) { logger.error('处理文献失败', { literatureId, error }); failedCount++; } // 更新进度(每处理10篇或最后一篇时更新? if ((i + 1) % 10 === 0 || i === literatureIds.length - 1) { await prisma.aslScreeningTask.update({ where: { id: taskId }, data: { processedItems: i + 1, successItems: successCount, failedItems: failedCount, conflictItems: conflictCount, } }); } } // 标记任务完成 await prisma.aslScreeningTask.update({ where: { id: taskId }, data: { status: 'completed', completedAt: new Date(), processedItems: literatureIds.length, successItems: successCount, failedItems: failedCount, conflictItems: conflictCount, } }); logger.info('标题摘要筛选完?, { taskId, total: literatureIds.length, success: successCount, failed: failedCount, conflict: conflictCount }); return { success: true, processed: literatureIds.length, successCount, failedCount, conflictCount, }; } catch (error) { // 任务失败,更新状? await prisma.aslScreeningTask.update({ where: { id: taskId }, data: { status: 'failed', completedAt: new Date(), } }); logger.error('标题摘要筛选失?, { taskId, error }); throw error; } }); logger.info('?ASL筛选Worker已注?); } ``` #### **任务4.2:更新应用入口(注册Workers?* ```typescript // 文件:backend/src/index.ts import Fastify from 'fastify'; import { logger } from './common/logging/index.js'; import { startCacheCleanupTask } from './common/cache/index.js'; // ?新增 import { registerScreeningWorkers } from './modules/asl/services/screeningService.js'; // ?新增 const app = Fastify({ logger: false }); // ... 注册路由?... // ?启动服务器后,注册队列Workers和定时任?app.listen({ port: 3001, host: '0.0.0.0' }, async (err, address) => { if (err) { logger.error('Failed to start server', { error: err }); process.exit(1); } logger.info(`Server listening on ${address}`); // ?启动缓存定时清理 if (process.env.CACHE_TYPE === 'postgres') { startCacheCleanupTask(); } // ?注册队列Workers if (process.env.QUEUE_TYPE === 'pgboss') { try { registerScreeningWorkers(); // ASL模块 // registerDataCleaningWorkers(); // DC模块(待实现? // registerStatisticalWorkers(); // SSA模块(待实现? } catch (error) { logger.error('Failed to register workers', { error }); } } }); // 优雅关闭 process.on('SIGTERM', async () => { logger.info('SIGTERM received, closing server gracefully...'); await app.close(); process.exit(0); }); ``` #### **任务4.3:DC模块改造(次优先)** ```typescript // 文件:backend/src/modules/dc/tool-b/services/MedicalRecordExtractionService.ts // (仅示例,实际根据需求实现) import { jobQueue } from '../../../../common/jobs/index.js'; export function registerMedicalExtractionWorkers() { jobQueue.process('dc:medical-extraction', async (job) => { const { taskId, recordIds } = job.data; // 批量提取病历 for (const recordId of recordIds) { await extractSingleRecord(recordId); // 更新进度 // ... } return { success: true }; }); } ``` --- ### 4.5 Phase 5:测试验证(Day 6?天) #### **任务5.1:单元测?* ```typescript // 文件:backend/tests/common/cache/PostgresCacheAdapter.test.ts import { describe, it, expect, beforeEach } from 'vitest'; import { PostgresCacheAdapter } from '../../../src/common/cache/PostgresCacheAdapter.js'; import { prisma } from '../../../src/config/database.js'; describe('PostgresCacheAdapter', () => { let cache: PostgresCacheAdapter; beforeEach(async () => { cache = new PostgresCacheAdapter(); // 清空测试数据 await prisma.appCache.deleteMany({}); }); it('should set and get cache', async () => { await cache.set('test:key', { value: 'hello' }, 60); const result = await cache.get('test:key'); expect(result).toEqual({ value: 'hello' }); }); it('should return null for expired cache', async () => { await cache.set('test:key', { value: 'hello' }, 1); // 1秒过? await new Promise(resolve => setTimeout(resolve, 1100)); // 等待1.1? const result = await cache.get('test:key'); expect(result).toBeNull(); }); it('should delete cache', async () => { await cache.set('test:key', { value: 'hello' }, 60); await cache.delete('test:key'); const result = await cache.get('test:key'); expect(result).toBeNull(); }); // ... 更多测试 ... }); ``` #### **任务5.2:集成测?* ```bash # 1. 启动本地Postgres docker start ai-clinical-postgres # 2. 设置环境变量 export CACHE_TYPE=postgres export QUEUE_TYPE=pgboss export DATABASE_URL=postgresql://postgres:123456@localhost:5432/aiclincial # 3. 运行迁移 cd backend npx prisma migrate deploy # 4. 启动应用 npm run dev # 应该看到日志?# [CacheFactory] 使用PostgresCacheAdapter # [JobFactory] 使用PgBossQueue # [PgBoss] 队列已启?# [PostgresCache] 定时清理任务已启?# ?ASL筛选Worker已注?``` #### **任务5.3:功能测?* ```bash # 测试1:缓存功?curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening # 观察日志?# [PostgresCache] 缓存命中 { key: 'asl:llm:...' } # 测试2:队列功?# 提交任务 ?观察任务状态变?# pending ?running ?completed # 测试3:实例重启恢?# 提交任务 ?等待处理?0% ?Ctrl+C停止 ?重新启动 # 任务应该自动恢复并继?``` --- ## 5. 优先级与依赖关系 ### 5.1 改造优先级矩阵(✨ 已更新) | 模块 | 优先?| 工作?| 业务价?| 风险 | 依赖 | 状?| |------|--------|--------|---------|------|------|------| | **环境准备** | P0 | 0.5?| - | ?| ?| ?待开?| | **PostgresCacheAdapter** | P0 | 0.5?| 降低LLM成本50% | ?| 环境准备 | ?待开?| | **PgBossQueue** | P0 | 2?| 长任务可靠?| ?| 环境准备 | ?待开?| | **任务拆分机制** | P0 | 1?| 任务成功?> 99% | ?| PgBossQueue | ?待开?| | **断点续传机制** | P0 | 1?| 容错性提?0?| ?| PgBossQueue | ?待开?| | **ASL筛选改?* | P0 | 1.5?| 用户核心功能 | ?| 任务拆分+断点 | ?待开?| | **DC提取改?* | P1 | 1?| 用户核心功能 | ?| 任务拆分+断点 | ?待开?| | **测试验证** | P0 | 1.5?| 保证质量 | ?| 所有改?| ?待开?| | **SAE部署** | P1 | 0.5?| 生产就绪 | ?| 测试验证 | ?待开?| **总工作量?* 9天(比V1.0增加2天,增加任务拆分和断点续传) ### 5.2 依赖关系图(?已更新) ``` Day 1: 环境准备 ────────────────? ?Day 1: PostgresCache ──────────? ?Day 2-3: PgBossQueue ───────────? ?Day 4: 任务拆分机制 ─────────────? ├─?Day 6-7: ASL筛选改?──?Day 5: 断点续传机制 ─────────────? ? ? ?Day 7: DC提取改?──────────────? ? ? ├─?Day 8-9: 测试 + 部署 ``` ### 5.3 可并行工作(?已更新) ``` Day 1(并行)?├─ 环境准备(上午) └─ PostgresCache实现(下午) Day 2-3(串行)?└─ PgBossQueue实现(必须等环境准备完成? Day 4-5(可并行): ├─ 任务拆分机制(工具函数) └─ 断点续传机制(数据库字段? Day 6-7(串行)?├─ ASL筛选改造(使用拆分+断点?└─ DC提取改造(复用拆分+断点? Day 8-9(并行)?├─ 测试验证 └─ 文档完善 ``` --- ## 6. 测试验证方案(✨ 已更新) ### 6.1 测试清单 #### **功能测试(基础?* ```bash ?缓存读写正常 ?缓存过期自动清理(每分钟1000条) ?缓存多实例共享(实例A写→实例B读) ?队列任务入队(pg-boss??队列任务处理(Worker??队列任务重试(模拟失败,3次重试) ?队列实例重启恢复 ``` #### **任务拆分测试** ?**新增** ```bash ?任务拆分工具函数正确? - splitIntoChunks([1..100], 30) ?4批(30+30+30+10? - recommendChunkSize(1000, 7.2, 900) ?125 ?拆分任务入队 - 1000篇文??10批次,每?00? - 验证数据库:totalBatches=10, processedBatches=0 ?批次并行处理 - 多个Worker同时处理不同批次 - 验证无冲突(SKIP LOCKED? ?批次失败重试 - 模拟?批失??自动重试 - 其他批次不受影响 ``` #### **断点续传测试** ?**新增** ```bash ?断点保存 - 处理到第50??保存断点 - 验证数据库:currentIndex=50, lastCheckpoint更新 ?断点恢复 - 任务中断(Ctrl+C? - 重启服务 ?从第50项继? - 验证:前50项不重复处理 ?批次级断? - 10批次任务,完成前3? - 实例重启 ?从第4批继? - 验证:processedBatches=3, currentBatchIndex=3 ``` #### **长时间任务测?* 🔴 **重点** ```bash ?1000篇文献筛选(?小时? - 拆分?0批,每批12分钟 - 成功?> 99% - 验证进度更新(每10篇) ?10000篇文献筛选(?0小时? - 拆分?00批,每批12分钟 - 10个Worker并行 ?2小时完成 - 成功?> 99.5% ?实例重启恢复(关键测试) - 启动任务 ?等待50% ?停止服务(Ctrl+C? - 重启服务 ?任务自动恢复 - 验证:从50%继续,不?%开? - 预期:总耗时 ?原计划时?× 1.05(误?%内) ``` #### **性能测试** ```bash ?缓存读取延迟 < 5ms(P99??缓存写入延迟 < 10ms(P99??队列吞吐?> 100任务/小时 ?断点保存延迟 < 20ms ?批次切换延迟 < 100ms ``` #### **故障测试(增强)** ```bash ?实例销毁(SAE缩容? - 正在处理任务 ?实例销? - 等待10分钟 ?新实例接? - 任务从断点恢? ?数据库连接断开 - 处理任务??断开连接 - 自动重连 ?继续处理 ?任务处理失败 - 模拟LLM超时 - 自动重试3? - 失败后标记为failed ?Postgres慢查? - 模拟数据库慢? 5秒) - 任务不失败,等待完成 ?并发冲突 - 2个Worker领取同一批次 - pg-boss SKIP LOCKED机制 - 验证:只?个Worker处理 ?发布更新(生产场景) - 15:00发布更新 - 正在执行?批任? - 实例重启 ?5批任务自动恢?``` ### 6.2 测试脚本 #### **测试1:完整流程(1000篇文献)** ```bash # 1. 准备测试数据 cd backend npm run test:seed -- --literatures=1000 # 2. 启动服务 npm run dev # 3. 提交任务 curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening \ -H "Content-Type: application/json" # 4. 观察日志 # [TaskSplit] 任务拆分完成 { total: 1000, chunkSize: 100, chunks: 10 } # [PgBoss] 任务入队 { type: 'asl:title-screening-batch', jobId: '1' } # ... # [PgBoss] 批次处理完成 { taskId, batchIndex: 0, progress: '1/10' } # 5. 查询进度 curl http://localhost:3001/api/v1/asl/tasks/:taskId # 预期响应?# { # "id": "task_123", # "status": "running", # "totalItems": 1000, # "processedItems": 350, # "totalBatches": 10, # "processedBatches": 3, # "progress": 0.35 # } ``` #### **测试2:断点恢复(关键测试?* ```bash # 1. 启动任务 curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening # 2. 等待处理?0% # 观察日志:processedItems: 500 # 3. 强制停止服务 # Ctrl+C ?kill -9 # 4. 重新启动服务 npm run dev # 5. 观察日志 # [Checkpoint] 断点恢复 { taskId, startIndex: 500 } # [PgBoss] 开始处理任?{ batchIndex: 5 } ?从第6批继? # 6. 验证最终结?# 总耗时应该约等?2小时 + 重启时间? 5分钟?# 不应该是 4小时(从头开始) ``` #### **测试3:并发处理(10000篇)** ```bash # 1. 准备大数据集 npm run test:seed -- --literatures=10000 # 2. 启动多个Worker实例(模拟SAE多实例) # Terminal 1 npm run dev -- --port=3001 # Terminal 2 npm run dev -- --port=3002 # Terminal 3 npm run dev -- --port=3003 # 3. 提交任务(任意一个实例) curl -X POST http://localhost:3001/api/v1/asl/projects/:projectId/screening # 4. 观察三个实例的日?# 应该看到?个实例同时处理不同批?# Worker1: 处理批次 0, 3, 6, 9, ... # Worker2: 处理批次 1, 4, 7, 10, ... # Worker3: 处理批次 2, 5, 8, 11, ... # 5. 验证完成时间 # 100批次 / 3个Worker ?33.3批轮?× 12分钟 ?6.6小时 # 单Worker需要:100?× 12分钟 = 20小时 # 加速比?0 / 6.6 ?3?``` ### 6.3 监控指标(✨ 已更新) ```typescript // 缓存监控 - cache_hit_rate: 命中?(目标 > 60%) - cache_total_count: 总数 - cache_expired_count: 过期数量 - cache_by_module: 各模块分? // 队列监控(基础?- queue_pending_count: 待处理任?- queue_processing_count: 处理中任?- queue_completed_count: 完成任务 - queue_failed_count: 失败任务 - queue_avg_duration: 平均耗时 // 任务拆分监控 ?新增 - task_total_batches: 总批次数 - task_processed_batches: 已完成批次数 - task_batch_success_rate: 批次成功?(目标 > 99%) - task_avg_batch_duration: 平均批次耗时 // 断点续传监控 ?新增 - checkpoint_save_count: 断点保存次数 - checkpoint_restore_count: 断点恢复次数 - checkpoint_save_duration: 保存耗时 (目标 < 20ms) - task_recovery_success_rate: 恢复成功?(目标 100%) ``` ### 6.4 成功标准(✨ 已更新) ```bash 基础功能??所有单元测试通过 ?所有集成测试通过 ?缓存命中?> 60% ?LLM API调用次数下降 > 40% 任务可靠?🔴 关键??1000篇文献筛选成功率 > 99% ?10000篇文献筛选成功率 > 99.5% ?实例重启后任务自动恢复成功率 100% ?断点恢复后不重复处理已完成项 ?批次失败只需重试单批(不重试全部? 性能指标??单批次处理时?< 15分钟 ?断点保存延迟 < 20ms ?10个Worker并行加速比 > 8? 生产验证??生产环境运行48小时无错??处理3个完整的1000篇文献筛选任??至少1次实例重启恢复测试成??无用户投诉任务丢??系统可用?> 99.9% ``` --- ## 7. 上线与回? ### 7.1 上线步骤 ```bash # Step 1: 数据库迁移(生产环境?npx prisma migrate deploy # Step 2: 更新SAE环境变量 CACHE_TYPE=postgres QUEUE_TYPE=pgboss # Step 3: 灰度发布?个实例) # 观察24小时,监控指标正? # Step 4: 全量发布?-3个实例) # 逐步扩容 # Step 5: 清理旧代?# 移除MemoryQueue相关代码(可选) ``` ### 7.2 回滚方案 ```bash # 如果出现问题,立即回滚: # 方案1:环境变量回滚(最快) CACHE_TYPE=memory QUEUE_TYPE=memory # 重启应用,降级到内存模式 # 方案2:代码回?git revert # 回滚到改造前版本 # 方案3:数据库回滚 npx prisma migrate down # 删除 app_cache 表(可选) ``` ### 7.3 风险预案 | 风险 | 概率 | 影响 | 预案 | |------|------|------|------| | Postgres性能不足 | ?| ?| 回滚到内存模?| | pg-boss连接失败 | ?| ?| 降级到同步处?| | 缓存数据过大 | ?| ?| 增加清理频率 | | 长任务卡?| ?| ?| 手动kill任务 | --- ## 8. 成功标准(✨ V2.0更新? ### 8.1 技术指? | 指标类别 | 指标 | 目标?| 衡量方法 | |---------|------|--------|---------| | **缓存** | 命中?| > 60% | 监控统计 | | | 持久?| ?实例重启不丢?| 重启测试 | | | 多实例共?| ?A写B能读 | 并发测试 | | **队列** | 任务持久?| ?实例销毁不丢失 | 销毁测?| | | 长任务可靠?| > 99% | 1000篇筛?| | | 超长任务可靠?| > 99.5% | 10000篇筛?| | **拆分** | 批次成功?| > 99% | 批次统计 | | | 批次耗时 | < 15分钟 | 监控统计 | | | 并行加速比 | > 8倍(10 Worker?| 对比测试 | | **断点** | 断点保存延迟 | < 20ms | 性能测试 | | | 恢复成功?| 100% | 重启测试 | | | 重复处理?| 0% | 数据验证 | ### 8.2 业务指标 | 指标 | 改造前 | 改造后 | 改进幅度 | |------|--------|--------|---------| | **LLM API成本** | 基线 | -40~60% | 节省¥X/?| | **任务成功?* | 10-30% | > 99% | 提升3-10?| | **用户重复提交** | 平均3?| < 1.1?| 减少70% | | **任务完成时间** | 不确定(可能失败?| 稳定可预?| 体验提升 | | **用户满意?* | 基线 | 显著提升 | 问卷调查 | ### 8.3 验收清单 ```bash Phase 1-3:基础设施 ??PostgresCacheAdapter实现并通过测试 ?PgBossQueue实现并通过测试 ?本地环境验证通过 Phase 4-5:高级特???任务拆分工具函数测试通过 ?断点续传服务测试通过 ?数据库Schema迁移成功 Phase 6-7:业务集???ASL筛选服务改造完??DC提取服务改造完??Worker注册成功 关键功能测试 🔴??1000篇文献筛选(2小时)成功率 > 99% ?实例重启恢复测试通过?次) ?断点续传测试通过(从50%恢复??批次并行处理测试通过 ?失败重试测试通过 生产环境验证 🔴??生产环境运行48小时无致命错??完成至少3个真实用户任务(1000???至少经历1次SAE实例重启,任务成功恢??缓存命中?> 60% ?LLM API调用量下?> 40% ?无用户投诉任务丢?``` --- ## 9. 附录 ### 9.1 参考文? - [Postgres-Only 全能架构解决方案](./08-Postgres-Only 全能架构解决方案.md) - [pg-boss 官方文档](https://github.com/timgit/pg-boss) - [Prisma 多Schema支持](https://www.prisma.io/docs/concepts/components/prisma-schema/multi-schema) ### 9.2 关键代码位置(✨ V2.0更新? ``` backend/src/ ├── common/cache/ # 缓存系统 ? ├── CacheAdapter.ts # ?已存?? ├── CacheFactory.ts # ⚠️ 需修改?10行) ? ├── MemoryCacheAdapter.ts # ?已存?? ├── RedisCacheAdapter.ts # 🔴 占位符(不用管) ? ├── PostgresCacheAdapter.ts # ?需新增(~300行) ? └── index.ts # ⚠️ 需修改(导出) ?├── common/jobs/ # 任务队列 ? ├── types.ts # ?已存?? ├── JobFactory.ts # ⚠️ 需修改?10行) ? ├── MemoryQueue.ts # ?已存?? ├── PgBossQueue.ts # ?需新增(~400行) ? ├── utils.ts # ?需新增(~200行)?? ├── CheckpointService.ts # ?需新增(~150行)?? └── index.ts # ⚠️ 需修改(导出) ?├── modules/asl/services/ # ASL业务?? ├── screeningService.ts # ⚠️ 需改造(~150行改动) ? └── llmScreeningService.ts # ?无需改动 ?├── modules/dc/tool-b/services/ # DC业务?? └── (类似ASL,按需改造) ?├── config/ ? └── env.ts # ⚠️ 需添加环境变量 ?└── index.ts # ⚠️ 需修改(注册Workers + 启动清理? prisma/ ├── schema.prisma # ⚠️ 需修改?AppCache +字段?└── migrations/ # 自动生成 tests/ # 测试文件 ├── common/cache/ ? └── PostgresCacheAdapter.test.ts # ?需新增 ├── common/jobs/ ? ├── PgBossQueue.test.ts # ?需新增 ? ├── utils.test.ts # ?需新增 ?? └── CheckpointService.test.ts # ?需新增 ?└── modules/asl/ └── screening-integration.test.ts # ?需新增 backend/.env # ⚠️ 需修改 ``` **文件状态说明:** - ?已存?- 无需改动 - ⚠️ 需修改 - 少量改动? 50行) - ?需新增 - 全新文件 - 🔴 占位?- 忽略(不影响本次改造) - ?V2.0新增 - 支持拆分+断点 **代码行数统计?* ``` 总新增代码:~1800?├─ PostgresCacheAdapter.ts: 300?├─ PgBossQueue.ts: 400?├─ utils.ts: 200??├─ CheckpointService.ts: 150??├─ screeningService.ts改? 200?├─ 测试代码: 400?└─ 其他(Factory、导出等? 150? 总修改代码:~100?├─ CacheFactory.ts: 10?├─ JobFactory.ts: 10?├─ index.ts: 20?├─ env.ts: 20?├─ schema.prisma: 40?└─ 各处导出: ?0?``` --- ## 10. V2.0 vs V1.0 对比 | 维度 | V1.0(原计划?| V2.0(当前版本) | 变化 | |------|--------------|-----------------|------| | **工作?* | 7?| 9?| +2?| | **代码行数** | ~1000?| ~1900?| +900?| | **核心策略** | 缓存+队列 | 缓存+队列+拆分+断点 | +2个策?| | **长任务支?* | < 4小时 | 任意时长(拆分后?| 质的提升 | | **任务成功?* | 85-90% | > 99% | 提升10%+ | | **实例重启恢复** | 从头开?| 断点续传 | 避免浪费 | | **并发能力** | 单实例串?| 多实例并?| 加速N?| | **生产就绪?* | 基本可用 | 完全就绪 | 企业?| **为什么增?天?** - Day 4:任务拆分机制(工具函数+测试?- Day 5:断点续传机制(服务+数据库) **增加的价值:** - ?长任务可靠性从85% ?99%+ - ?支持任意时长任务(通过拆分?- ?实例重启不浪费已处理结果 - ?多Worker并行,速度提升N?- ?符合Serverless最佳实? **结论?* 多花2天,换取质的飞跃?*非常值得**? --- ## 11. 快速开? ### 11.1 一键检查清? ```bash # 1. 检查代码结构(应该都存在) ls backend/src/common/cache/CacheAdapter.ts # ?ls backend/src/common/cache/MemoryCacheAdapter.ts # ?ls backend/src/common/jobs/types.ts # ?ls backend/src/common/jobs/MemoryQueue.ts # ? # 2. 检查需要新增的文件(应该不存在?ls backend/src/common/cache/PostgresCacheAdapter.ts # ?待新?ls backend/src/common/jobs/PgBossQueue.ts # ?待新?ls backend/src/common/jobs/utils.ts # ?待新?ls backend/src/common/jobs/CheckpointService.ts # ?待新? # 3. 检查依?cd backend npm list pg-boss # ?需安装 # 4. 检查数据库 psql -d aiclincial -c "\dt platform_schema.*" # 应该看到现有的业务表,但没有app_cache ``` ### 11.2 立即开? ```bash # Phase 1:环境准备(30分钟?cd backend npm install pg-boss --save # 修改 prisma/schema.prisma(添加AppCache?npx prisma migrate dev --name add_postgres_cache npx prisma generate # Phase 2:实现PostgresCacheAdapter?小时?# 创建 src/common/cache/PostgresCacheAdapter.ts # 修改 src/common/cache/CacheFactory.ts # 测试验证 # Phase 3:实现PgBossQueue?6小时?天) # 创建 src/common/jobs/PgBossQueue.ts # 修改 src/common/jobs/JobFactory.ts # 测试验证 # ... 按计划继?``` --- **🎯 现在就开始?** 建议?1. **先通读文档** - 理解整体架构?0分钟?2. **验证代码结构** - 确认真实文件?0分钟?3. **开始Phase 1** - 环境准备?0分钟?4. **逐步推进** - 每完成一个Phase就测? 有任何问题随时沟通!🚀