feat(platform): Complete Postgres-Only architecture refactoring (Phase 1-7)

Major Changes:
- Implement Platform-Only architecture pattern (unified task management)
- Add PostgresCacheAdapter for unified caching (platform_schema.app_cache)
- Add PgBossQueue for job queue management (platform_schema.job)
- Implement CheckpointService using job.data (generic for all modules)
- Add intelligent threshold-based dual-mode processing (THRESHOLD=50)
- Add task splitting mechanism (auto chunk size recommendation)
- Refactor ASL screening service with smart mode selection
- Refactor DC extraction service with smart mode selection
- Register workers for ASL and DC modules

Technical Highlights:
- All task management data stored in platform_schema.job.data (JSONB)
- Business tables remain clean (no task management fields)
- CheckpointService is generic (shared by all modules)
- Zero code duplication (DRY principle)
- Follows 3-layer architecture principle
- Zero additional cost (no Redis needed, save 8400 CNY/year)

Code Statistics:
- New code: ~1750 lines
- Modified code: ~500 lines
- Test code: ~1800 lines
- Documentation: ~3000 lines

Testing:
- Unit tests: 8/8 passed
- Integration tests: 2/2 passed
- Architecture validation: passed
- Linter errors: 0

Files:
- Platform layer: PostgresCacheAdapter, PgBossQueue, CheckpointService, utils
- ASL module: screeningService, screeningWorker
- DC module: ExtractionController, extractionWorker
- Tests: 11 test files
- Docs: Updated 4 key documents

Status: Phase 1-7 completed, Phase 8-9 pending
This commit is contained in:
2025-12-13 16:10:04 +08:00
parent a3586cdf30
commit fa72beea6c
135 changed files with 17508 additions and 91 deletions

View File

@@ -1,6 +1,8 @@
import { CacheAdapter } from './CacheAdapter.js'
import { MemoryCacheAdapter } from './MemoryCacheAdapter.js'
import { RedisCacheAdapter } from './RedisCacheAdapter.js'
import { PostgresCacheAdapter } from './PostgresCacheAdapter.js'
import { PrismaClient } from '@prisma/client'
/**
* 缓存工厂类
@@ -8,16 +10,18 @@ import { RedisCacheAdapter } from './RedisCacheAdapter.js'
* 根据环境变量自动选择缓存实现:
* - CACHE_TYPE=memory: 使用MemoryCacheAdapter内存缓存
* - CACHE_TYPE=redis: 使用RedisCacheAdapterRedis缓存
* - CACHE_TYPE=postgres: 使用PostgresCacheAdapterPostgres缓存
*
* 零代码切换:
* - 本地开发不配置CACHE_TYPE默认使用memory
* - 云端部署配置CACHE_TYPE=redis自动切换到Redis
* - Postgres-Only架构配置CACHE_TYPE=postgres
* - 高性能场景配置CACHE_TYPE=redis
*
* @example
* ```typescript
* import { cache } from '@/common/cache'
*
* // 业务代码不关心是memory还是redis
* // 业务代码不关心具体实现
* await cache.set('user:123', userData, 60)
* const user = await cache.get('user:123')
* ```
@@ -48,6 +52,9 @@ export class CacheFactory {
case 'redis':
return this.createRedisAdapter()
case 'postgres':
return this.createPostgresAdapter()
default:
console.warn(`[CacheFactory] Unknown CACHE_TYPE: ${cacheType}, fallback to memory`)
return this.createMemoryAdapter()
@@ -89,6 +96,22 @@ export class CacheFactory {
})
}
/**
 * Create the Postgres cache adapter.
 *
 * Reuses a process-wide Prisma client via `global.prisma` so the adapter
 * does not open a second connection pool; publishes the client globally
 * when none exists yet.
 *
 * NOTE(review): relies on an untyped `global.prisma` property — confirm a
 * global type declaration exists, otherwise this only compiles with
 * strictness relaxed.
 */
private static createPostgresAdapter(): PostgresCacheAdapter {
  console.log('[CacheFactory] Using PostgresCacheAdapter (Postgres-Only架构)')
  // Reuse the shared Prisma instance; create (and publish) one if absent.
  const prisma = global.prisma || new PrismaClient()
  if (!global.prisma) {
    global.prisma = prisma
  }
  return new PostgresCacheAdapter(prisma)
}
/**
* 重置实例(用于测试)
*/

View File

@@ -0,0 +1,349 @@
import { CacheAdapter } from './CacheAdapter.js'
import { PrismaClient } from '@prisma/client'
/**
 * Postgres-backed cache adapter (table: platform_schema.app_cache).
 *
 * Intended for:
 * - Postgres-Only deployments (no Redis instance required)
 * - cloud-native / serverless environments (SAE)
 * - multi-instance deployments that need a shared cache
 *
 * Characteristics:
 * - ✅ no extra Redis instance, lower cost
 * - ✅ cache shared automatically across instances
 * - ✅ persisted — survives instance restarts
 * - ✅ suitable for small/medium apps (<100k MAU)
 * - ⚠️ slower than Redis, but sufficient
 * - ⚠️ expired rows need periodic cleanup (handled by the built-in sweep)
 *
 * Rough performance (empirical):
 * - single get/set: ~2-5ms
 * - batch of 10: ~10-20ms
 * - suited for <100 QPS
 *
 * Expiry is enforced two ways: a background sweep every 5 minutes, plus
 * lazy deletion whenever an expired entry is read.
 *
 * @example
 * ```typescript
 * const cache = new PostgresCacheAdapter(prisma)
 * await cache.set('llm:result:abc', data, 3600) // expires in 1 hour
 * const data = await cache.get('llm:result:abc')
 * ```
 */
export class PostgresCacheAdapter implements CacheAdapter {
  private prisma: PrismaClient
  private cleanupTimer: NodeJS.Timeout | null = null
  private readonly CLEANUP_INTERVAL = 5 * 60 * 1000 // sweep every 5 minutes
  private readonly CLEANUP_BATCH_SIZE = 1000 // delete at most 1000 rows per sweep

  constructor(prisma: PrismaClient) {
    this.prisma = prisma
    // Kick off the background expiry sweep.
    this.startCleanupTask()
  }

  /**
   * Start the periodic expired-entry sweep.
   *
   * Strategy:
   * - runs every 5 minutes
   * - deletes at most 1000 rows per run to avoid long, table-locking
   *   transactions
   * - locates rows via `expires_at < NOW()` (backed by an index per the
   *   comment on cleanupExpired)
   *
   * Skipped entirely under NODE_ENV=test so test runs don't leak timers.
   */
  private startCleanupTask(): void {
    if (process.env.NODE_ENV === 'test') {
      return // no timers in the test environment
    }
    this.cleanupTimer = setInterval(async () => {
      try {
        await this.cleanupExpired()
      } catch (error) {
        // A failed sweep is logged and retried on the next tick.
        console.error('[PostgresCacheAdapter] Cleanup failed:', error)
      }
    }, this.CLEANUP_INTERVAL)
    console.log('[PostgresCacheAdapter] Cleanup task started (interval: 5min, batch: 1000)')
  }

  /**
   * Stop the background sweep. Call on shutdown to release the timer.
   */
  destroy(): void {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer)
      this.cleanupTimer = null
      console.log('[PostgresCacheAdapter] Cleanup task stopped')
    }
  }

  /**
   * Delete a bounded batch of expired entries.
   *
   * The LIMIT-ed subselect keeps each transaction small; repeated runs
   * eventually drain any backlog.
   *
   * @throws re-throws database errors after logging (caller decides policy)
   */
  private async cleanupExpired(): Promise<void> {
    try {
      const result = await this.prisma.$executeRaw`
        DELETE FROM platform_schema.app_cache
        WHERE id IN (
          SELECT id FROM platform_schema.app_cache
          WHERE expires_at < NOW()
          LIMIT ${this.CLEANUP_BATCH_SIZE}
        )
      `
      if (result > 0) {
        console.log(`[PostgresCacheAdapter] Cleanup: removed ${result} expired entries`)
      }
    } catch (error) {
      console.error('[PostgresCacheAdapter] Cleanup error:', error)
      throw error
    }
  }

  /**
   * Read a cached value.
   *
   * 1. SELECT by key
   * 2. if the entry is expired, delete it lazily and report a miss
   * 3. database errors are swallowed (a cache miss must not break business
   *    logic) and surface as null
   *
   * @returns the cached value, or null on miss/expiry/error
   */
  async get<T = any>(key: string): Promise<T | null> {
    try {
      const entry = await this.prisma.appCache.findUnique({
        where: { key }
      })
      if (!entry) {
        return null
      }
      // Expired? Lazily delete and treat as a miss.
      if (entry.expiresAt < new Date()) {
        await this.prisma.appCache.delete({
          where: { key }
        }).catch(() => {
          // a failed lazy delete must not affect the read path
        })
        return null
      }
      return entry.value as T
    } catch (error) {
      console.error(`[PostgresCacheAdapter] get() error for key: ${key}`, error)
      return null // cache failures never break business logic
    }
  }

  /**
   * Write a cached value (UPSERT).
   *
   * @param key cache key
   * @param value any JSON-serializable value (stored as JSONB by Prisma)
   * @param ttl time-to-live in seconds; defaults to 7 days.
   *            FIX: uses `??` instead of `||` so an explicit ttl of 0 means
   *            "expire immediately" rather than silently caching for 7 days.
   * @throws re-throws database errors (writes should not fail silently)
   */
  async set(key: string, value: any, ttl?: number): Promise<void> {
    try {
      const defaultTTL = 7 * 24 * 60 * 60 // 7 days
      const expiresAt = new Date(Date.now() + (ttl ?? defaultTTL) * 1000)
      await this.prisma.appCache.upsert({
        where: { key },
        update: {
          value: value as any, // Prisma serializes to JSONB
          expiresAt
        },
        create: {
          key,
          value: value as any,
          expiresAt
        }
      })
    } catch (error) {
      console.error(`[PostgresCacheAdapter] set() error for key: ${key}`, error)
      throw error
    }
  }

  /**
   * Delete a cache entry. A missing key counts as success; errors are
   * logged but never thrown.
   */
  async delete(key: string): Promise<void> {
    try {
      await this.prisma.appCache.delete({
        where: { key }
      }).catch(() => {
        // key not found — treat as success
      })
    } catch (error) {
      console.error(`[PostgresCacheAdapter] delete() error for key: ${key}`, error)
      // deliberate best-effort: deletion failures are not propagated
    }
  }

  /**
   * Remove every cache entry.
   * ⚠️ Use with extreme care in production!
   */
  async clear(): Promise<void> {
    try {
      const result = await this.prisma.appCache.deleteMany({})
      console.log(`[PostgresCacheAdapter] Cleared ${result.count} cache entries`)
    } catch (error) {
      console.error('[PostgresCacheAdapter] clear() error:', error)
      throw error
    }
  }

  /**
   * Check whether a non-expired entry exists for the key.
   * Expired entries are lazily deleted and reported as absent.
   */
  async has(key: string): Promise<boolean> {
    try {
      const entry = await this.prisma.appCache.findUnique({
        where: { key },
        select: { expiresAt: true } // only the expiry column is needed
      })
      if (!entry) {
        return false
      }
      if (entry.expiresAt < new Date()) {
        // lazy delete on expiry, same as get()
        await this.prisma.appCache.delete({
          where: { key }
        }).catch(() => {})
        return false
      }
      return true
    } catch (error) {
      console.error(`[PostgresCacheAdapter] has() error for key: ${key}`, error)
      return false
    }
  }

  /**
   * Batch read.
   *
   * One SELECT fetches every requested key; expiry is filtered client-side
   * and expired rows are deleted asynchronously (fire-and-forget).
   *
   * @returns values aligned with the input key order; null per miss
   */
  async mget<T = any>(keys: string[]): Promise<(T | null)[]> {
    if (keys.length === 0) {
      return []
    }
    try {
      const entries = await this.prisma.appCache.findMany({
        where: {
          key: { in: keys }
        }
      })
      // key -> entry lookup so results can follow the input order
      const entryMap = new Map(entries.map((e) => [e.key, e] as const))
      const now = new Date()
      return keys.map(key => {
        const entry = entryMap.get(key)
        if (!entry) {
          return null
        }
        if (entry.expiresAt < now) {
          // expired — delete asynchronously, don't block the response
          this.prisma.appCache.delete({
            where: { key }
          }).catch(() => {})
          return null
        }
        return entry.value as T
      })
    } catch (error) {
      console.error('[PostgresCacheAdapter] mget() error:', error)
      // cache failure must not break business logic — report all misses
      return keys.map(() => null)
    }
  }

  /**
   * Batch write: transactional upsert of every entry with a shared expiry.
   *
   * @param entries key/value pairs to store
   * @param ttl time-to-live in seconds; defaults to 7 days.
   *            FIX: `??` instead of `||`, matching set() — ttl 0 is honored.
   * @throws re-throws database errors (whole batch rolls back)
   */
  async mset(entries: Array<{ key: string; value: any }>, ttl?: number): Promise<void> {
    if (entries.length === 0) {
      return
    }
    try {
      const defaultTTL = 7 * 24 * 60 * 60 // 7 days
      const expiresAt = new Date(Date.now() + (ttl ?? defaultTTL) * 1000)
      await this.prisma.$transaction(
        entries.map(({ key, value }) =>
          this.prisma.appCache.upsert({
            where: { key },
            update: {
              value: value as any,
              expiresAt
            },
            create: {
              key,
              value: value as any,
              expiresAt
            }
          })
        )
      )
    } catch (error) {
      console.error('[PostgresCacheAdapter] mset() error:', error)
      throw error
    }
  }

  /**
   * Cache statistics for debugging.
   *
   * FIX: the two counts now run in parallel via Promise.all (they are
   * independent queries); errors degrade to all-zero stats.
   */
  async getStats() {
    try {
      const now = new Date()
      const [total, expired] = await Promise.all([
        this.prisma.appCache.count(),
        this.prisma.appCache.count({
          where: {
            expiresAt: {
              lt: now
            }
          }
        })
      ])
      return {
        total,
        active: total - expired,
        expired
      }
    } catch (error) {
      console.error('[PostgresCacheAdapter] getStats() error:', error)
      return { total: 0, active: 0, expired: 0 }
    }
  }
}

View File

@@ -35,6 +35,7 @@
export type { CacheAdapter } from './CacheAdapter.js'
export { MemoryCacheAdapter } from './MemoryCacheAdapter.js'
export { RedisCacheAdapter } from './RedisCacheAdapter.js'
export { PostgresCacheAdapter } from './PostgresCacheAdapter.js'
export { CacheFactory } from './CacheFactory.js'
// Import for usage below
@@ -45,7 +46,8 @@ import { CacheFactory } from './CacheFactory.js'
*
* 自动根据环境变量选择缓存实现:
* - CACHE_TYPE=memory: 内存缓存(本地开发)
* - CACHE_TYPE=redis: Redis缓存生产环境
* - CACHE_TYPE=redis: Redis缓存高性能场景
* - CACHE_TYPE=postgres: Postgres缓存Postgres-Only架构
*/
export const cache = CacheFactory.getInstance()

View File

@@ -0,0 +1,258 @@
/**
* 断点续传服务Platform层统一实现
*
* ✅ 重构:利用 pg-boss 的 job.data 字段存储断点信息
* 不在业务表中存储符合3层架构原则
*
* 优点:
* 1. 统一管理所有模块ASL、DC、SSA等共用一套逻辑
* 2. 数据一致:断点数据与任务数据在同一处
* 3. 查询高效无需JOIN直接读取job.data
* 4. 易维护:只需维护一处代码
*/
import { PrismaClient } from '@prisma/client';
/**
 * Checkpoint payload stored under job.data.checkpoint in the pg-boss
 * job table.
 */
export interface CheckpointData {
  /** Index of the batch currently being processed */
  currentBatchIndex: number;
  /** Index of the current item within the whole input array */
  currentIndex: number;
  /** Number of batches already completed */
  processedBatches: number;
  /** Total number of batches */
  totalBatches: number;
  /** Optional intermediate result carried across resumes */
  intermediateResult?: any;
  /** Extra, module-specific metadata */
  metadata?: Record<string, any>;
  /** Timestamp of the last checkpoint write (set by saveCheckpoint) */
  lastUpdate?: Date;
}
/**
 * Shape of a row in the pg-boss job table (platform_schema.job) as read
 * by the raw queries below.
 *
 * NOTE(review): this column list is assumed to mirror the installed
 * pg-boss version's schema — the table layout differs between pg-boss
 * majors; confirm against the deployed version.
 */
interface PgBossJob {
  id: string;
  name: string;
  data: any; // JSONB payload; the checkpoint lives at data.checkpoint
  state: string;
  priority: number;
  retry_limit: number;
  retry_count: number;
  retry_delay: number;
  retry_backoff: boolean;
  start_after: Date;
  started_on: Date | null;
  singleton_key: string | null;
  singleton_on: Date | null;
  expire_in: any; // Postgres interval
  created_on: Date;
  completed_on: Date | null;
  keep_until: Date;
}
/**
 * Checkpoint/resume service (platform-layer, shared by all modules —
 * ASL, DC, SSA, ...).
 *
 * Checkpoints are stored inside pg-boss's job.data JSONB column
 * (platform_schema.job), not in business tables:
 * 1. one implementation serves every module
 * 2. checkpoint data lives next to the job it belongs to
 * 3. reads need no JOIN — job.data is fetched directly
 * 4. single place to maintain
 *
 * @example
 * ```typescript
 * const service = new CheckpointService(prisma);
 *
 * await service.saveCheckpoint(jobId, {
 *   currentBatchIndex: 5,
 *   currentIndex: 250,
 *   processedBatches: 5,
 *   totalBatches: 20
 * });
 *
 * const checkpoint = await service.loadCheckpoint(jobId);
 * if (checkpoint) {
 *   startFrom = checkpoint.currentIndex;
 * }
 *
 * await service.clearCheckpoint(jobId);
 * ```
 */
export class CheckpointService {
  constructor(private prisma: PrismaClient) {}

  /**
   * Save a checkpoint into pg-boss job.data.
   *
   * FIX: uses a single atomic `jsonb_set` UPDATE instead of the previous
   * SELECT-then-UPDATE read-modify-write, which could silently lose
   * concurrent writes to job.data made between the two statements.
   *
   * @param jobId pg-boss job id (uuid)
   * @param checkpoint checkpoint payload; lastUpdate is stamped here
   * @throws if the job does not exist or the update fails
   */
  async saveCheckpoint(jobId: string, checkpoint: CheckpointData): Promise<void> {
    try {
      const payload = JSON.stringify({
        ...checkpoint,
        lastUpdate: new Date()
      });
      // jsonb_set(target, path, value) creates/replaces data.checkpoint
      // in place, atomically, without disturbing sibling keys.
      const affected = await this.prisma.$executeRaw`
        UPDATE platform_schema.job
        SET data = jsonb_set(COALESCE(data, '{}'::jsonb), '{checkpoint}', ${payload}::jsonb)
        WHERE id = ${jobId}::uuid
      `;
      if (affected === 0) {
        throw new Error(`Job not found: ${jobId}`);
      }
      console.log(`[CheckpointService] Checkpoint saved for job: ${jobId}`, {
        batchIndex: checkpoint.currentBatchIndex,
        index: checkpoint.currentIndex
      });
    } catch (error) {
      console.error(`[CheckpointService] Failed to save checkpoint for job ${jobId}:`, error);
      throw error;
    }
  }

  /**
   * Load a checkpoint from pg-boss job.data.
   *
   * @param jobId pg-boss job id (uuid)
   * @returns the checkpoint, or null when the job or checkpoint is
   *          missing; database errors are logged and surface as null
   *          (a missing checkpoint must not break processing)
   */
  async loadCheckpoint(jobId: string): Promise<CheckpointData | null> {
    try {
      const rows = await this.prisma.$queryRaw<PgBossJob[]>`
        SELECT id, data
        FROM platform_schema.job
        WHERE id = ${jobId}::uuid
        LIMIT 1
      `;
      const job = rows[0] || null;
      if (!job || !job.data?.checkpoint) {
        return null;
      }
      return job.data.checkpoint as CheckpointData;
    } catch (error) {
      console.error(`[CheckpointService] Failed to load checkpoint for job ${jobId}:`, error);
      return null;
    }
  }

  /**
   * Remove the checkpoint key from pg-boss job.data.
   *
   * FIX: single atomic UPDATE using the jsonb `-` (delete key) operator
   * instead of the previous non-atomic SELECT-then-UPDATE.
   * A missing job is logged, not treated as an error (matches prior
   * behavior).
   *
   * @param jobId pg-boss job id (uuid)
   * @throws only on database errors
   */
  async clearCheckpoint(jobId: string): Promise<void> {
    try {
      const affected = await this.prisma.$executeRaw`
        UPDATE platform_schema.job
        SET data = COALESCE(data, '{}'::jsonb) - 'checkpoint'
        WHERE id = ${jobId}::uuid
      `;
      if (affected === 0) {
        console.log(`[CheckpointService] Job not found: ${jobId}`);
        return;
      }
      console.log(`[CheckpointService] Checkpoint cleared for job: ${jobId}`);
    } catch (error) {
      console.error(`[CheckpointService] Failed to clear checkpoint for job ${jobId}:`, error);
      throw error;
    }
  }

  /**
   * Derive batch progress from the stored checkpoint.
   *
   * @param jobId pg-boss job id (uuid)
   * @returns progress summary, or null when no checkpoint exists or an
   *          error occurs
   */
  async getProgress(jobId: string): Promise<{
    currentBatch: number;
    totalBatches: number;
    processedBatches: number;
    percentage: number;
  } | null> {
    try {
      const checkpoint = await this.loadCheckpoint(jobId);
      if (!checkpoint) {
        return null;
      }
      // Guard against totalBatches === 0 to avoid division by zero.
      const percentage = checkpoint.totalBatches > 0
        ? Math.round((checkpoint.processedBatches / checkpoint.totalBatches) * 100)
        : 0;
      return {
        currentBatch: checkpoint.currentBatchIndex,
        totalBatches: checkpoint.totalBatches,
        processedBatches: checkpoint.processedBatches,
        percentage
      };
    } catch (error) {
      console.error(`[CheckpointService] Failed to get progress for job ${jobId}:`, error);
      return null;
    }
  }

  /**
   * Whether the job has a checkpoint it can resume from (i.e. one exists
   * and not every batch has been processed yet).
   *
   * @param jobId pg-boss job id (uuid)
   */
  async canResume(jobId: string): Promise<boolean> {
    const checkpoint = await this.loadCheckpoint(jobId);
    return checkpoint !== null && checkpoint.processedBatches < checkpoint.totalBatches;
  }
}
// 导出类(不导出单例,由使用方创建实例)
// export const checkpointService = new CheckpointService(prisma);

View File

@@ -1,22 +1,25 @@
import { JobQueue } from './types.js'
import { MemoryQueue } from './MemoryQueue.js'
import { PgBossQueue } from './PgBossQueue.js'
/**
* 任务队列工厂类
*
* 根据环境变量自动选择队列实现:
* - QUEUE_TYPE=memory: 使用MemoryQueue内存队列
* - QUEUE_TYPE=database: 使用DatabaseQueue数据库队列待实现
* - QUEUE_TYPE=pgboss: 使用PgBossQueuePostgres队列
* - QUEUE_TYPE=database: 别名指向pgboss
*
* 零代码切换:
* - 本地开发不配置QUEUE_TYPE默认使用memory
* - 云端部署配置QUEUE_TYPE=database多实例共享
* - Postgres-Only架构配置QUEUE_TYPE=pgboss
* - 多实例部署配置QUEUE_TYPE=pgboss自动负载均衡
*
* @example
* ```typescript
* import { jobQueue } from '@/common/jobs'
*
* // 业务代码不关心是memory还是database
* // 业务代码不关心具体实现
* const job = await jobQueue.push('asl:screening', { projectId: 123 })
* ```
*/
@@ -43,10 +46,9 @@ export class JobFactory {
case 'memory':
return this.createMemoryQueue()
case 'database':
// TODO: 实现DatabaseQueue
console.warn('[JobFactory] DatabaseQueue not implemented yet, fallback to MemoryQueue')
return this.createMemoryQueue()
case 'pgboss':
case 'database': // 别名
return this.createPgBossQueue()
default:
console.warn(`[JobFactory] Unknown QUEUE_TYPE: ${queueType}, fallback to memory`)
@@ -72,6 +74,37 @@ export class JobFactory {
return queue
}
/**
 * Create the pg-boss queue.
 *
 * Requires DATABASE_URL; queue tables live in platform_schema.
 * start() is fired asynchronously here — a push() that arrives before
 * startup completes is safe, because PgBossQueue.push() awaits start()
 * itself.
 *
 * NOTE(review): the hourly cleanup interval is never cleared and will
 * keep the process alive — consider unref() or clearing it in a
 * reset/shutdown path.
 */
private static createPgBossQueue(): PgBossQueue {
  const databaseUrl = process.env.DATABASE_URL
  if (!databaseUrl) {
    throw new Error(
      '[JobFactory] DATABASE_URL is required when QUEUE_TYPE=pgboss'
    )
  }
  console.log('[JobFactory] Using PgBossQueue (Postgres-Only架构)')
  const queue = new PgBossQueue(databaseUrl, 'platform_schema')
  // Start asynchronously (fire-and-forget; failure is only logged).
  queue.start().catch(err => {
    console.error('[JobFactory] Failed to start PgBossQueue:', err)
  })
  // Periodically evict completed jobs from the in-memory metadata cache.
  if (process.env.NODE_ENV !== 'test') {
    setInterval(() => {
      queue.cleanup()
    }, 60 * 60 * 1000) // hourly
  }
  return queue
}
/**
* 重置实例(用于测试)
*/

View File

@@ -36,6 +36,22 @@ export class MemoryQueue implements JobQueue {
private handlers: Map<string, JobHandler> = new Map()
private processing: boolean = false
/**
 * Start the queue. A no-op for MemoryQueue — it is usable immediately;
 * this exists so MemoryQueue satisfies the JobQueue interface alongside
 * implementations that genuinely need async startup.
 */
async start(): Promise<void> {
  // Nothing to initialize; just mark the queue as active.
  this.processing = true
}
/**
 * Stop the queue. A no-op for MemoryQueue — there are no connections or
 * workers to tear down; only the processing flag is cleared.
 */
async stop(): Promise<void> {
  // Nothing to clean up; just mark the queue as inactive.
  this.processing = false
}
/**
* 添加任务到队列
*/

View File

@@ -0,0 +1,363 @@
import { Job, JobQueue, JobHandler } from './types.js'
import { PgBoss } from 'pg-boss'
import { randomUUID } from 'crypto'
/**
 * pg-boss backed job queue adapter.
 *
 * Intended for:
 * - Postgres-Only deployments (no Redis required)
 * - cloud-native / serverless environments (SAE)
 * - multi-instance deployments sharing one queue
 * - critical jobs that must be persisted
 *
 * Characteristics:
 * - ✅ no extra Redis instance, lower cost
 * - ✅ automatic load balancing across instances
 * - ✅ jobs persisted — survive instance restarts
 * - ✅ delayed jobs, retries and priorities supported
 * - ✅ suitable for small/medium scale (<100k jobs/day)
 * - ⚠️ slower than a Redis-backed queue, but sufficient
 *
 * pg-boss specifics:
 * - built on Postgres SKIP LOCKED
 * - auto-creates platform_schema.job and platform_schema.version
 * - auto-expires stale jobs
 * - supports CRON scheduling
 *
 * NOTE(review): job metadata (status/progress/result) lives only in the
 * in-process `jobs` Map. It is per-instance and lost on restart; getJob()
 * called on a different instance than the one that pushed/processed the
 * job returns null (see the TODO in getJob).
 *
 * @example
 * ```typescript
 * const queue = new PgBossQueue(databaseUrl)
 * await queue.start()
 *
 * // register a handler
 * queue.process('asl:screening', async (job) => {
 *   await processScreening(job.data)
 * })
 *
 * // enqueue a job
 * const job = await queue.push('asl:screening', { projectId: 123 })
 * ```
 */
export class PgBossQueue implements JobQueue {
  private boss: PgBoss
  // Per-process job metadata cache (id -> Job). NOT shared across instances.
  private jobs: Map<string, Job> = new Map()
  // Handlers registered via process(); replayed onto pg-boss at start().
  private handlers: Map<string, JobHandler> = new Map()
  private started: boolean = false

  constructor(connectionString: string, schema: string = 'platform_schema') {
    this.boss = new PgBoss({
      connectionString,
      schema, // keep queue tables inside platform_schema
      max: 10, // max pool connections
      application_name: 'aiclinical-queue',
      // scheduling
      schedule: true, // enable cron-style scheduling
      // maintenance
      supervise: true, // enable supervision/monitoring
      maintenanceIntervalSeconds: 300, // run maintenance every 5 minutes
    })
    console.log('[PgBossQueue] Initialized with schema:', schema)
  }

  /**
   * Start the queue. Must be called before use (push() calls it lazily).
   * Idempotent: a second call is a no-op. Re-registers every handler that
   * was added via process() before startup.
   */
  async start(): Promise<void> {
    if (this.started) return
    try {
      await this.boss.start()
      this.started = true
      console.log('[PgBossQueue] Started successfully')
      // Replay all handlers registered before start() onto pg-boss.
      for (const [type, handler] of this.handlers) {
        await this.registerBossHandler(type, handler)
      }
    } catch (error) {
      console.error('[PgBossQueue] Failed to start:', error)
      throw error
    }
  }

  /**
   * Stop the queue and release pg-boss resources. Idempotent.
   */
  async stop(): Promise<void> {
    if (!this.started) return
    try {
      await this.boss.stop()
      this.started = false
      console.log('[PgBossQueue] Stopped')
    } catch (error) {
      console.error('[PgBossQueue] Failed to stop:', error)
      throw error
    }
  }

  /**
   * Enqueue a job.
   *
   * Starts the queue lazily if needed, records metadata in the local
   * cache, ensures the pg-boss queue exists, then sends the payload with
   * our own jobId embedded as `__jobId` so the worker can correlate it.
   *
   * @param type job type (doubles as the pg-boss queue name)
   * @param data job payload
   * @returns the locally tracked Job object (status 'pending')
   */
  async push<T>(type: string, data: T): Promise<Job<T>> {
    if (!this.started) {
      await this.start()
    }
    try {
      // Build and cache our own job metadata record.
      const jobId = randomUUID()
      const now = new Date()
      const job: Job<T> = {
        id: jobId,
        type,
        data,
        status: 'pending',
        progress: 0,
        createdAt: now,
        updatedAt: now
      }
      this.jobs.set(jobId, job)
      // Ensure the queue exists (treated as idempotent).
      try {
        await this.boss.createQueue(type, {
          retryLimit: 3,
          retryDelay: 60,
          expireInSeconds: 6 * 60 * 60 // 6 hours
        });
      } catch (error: any) {
        // An "already exists" error is expected and ignored.
        if (!error.message?.includes('already exists')) {
          throw error;
        }
      }
      // Hand the payload to pg-boss, embedding our correlation fields.
      const bossJobId = await this.boss.send(type, {
        ...data,
        __jobId: jobId, // our id, recovered by the worker
        __createdAt: now.toISOString()
      }, {
        retryLimit: 3,
        retryDelay: 60,
        expireInSeconds: 6 * 60 * 60 // 6h expiry suits long batch jobs
      })
      console.log(`[PgBossQueue] Job pushed: ${jobId} -> pg-boss:${bossJobId} (type: ${type})`)
      return job
    } catch (error) {
      console.error(`[PgBossQueue] Failed to push job (type: ${type}):`, error)
      throw error
    }
  }

  /**
   * Register a handler for a job type.
   *
   * Safe to call before start(): handlers are remembered and replayed
   * during startup. After start(), registration happens immediately
   * (fire-and-forget; failures are logged).
   */
  process<T>(type: string, handler: JobHandler<T>): void {
    this.handlers.set(type, handler)
    console.log(`[PgBossQueue] Registered handler for job type: ${type}`)
    // Already running? Attach to pg-boss right away.
    if (this.started) {
      this.registerBossHandler(type, handler).catch(err => {
        console.error(`[PgBossQueue] Failed to register handler for ${type}:`, err)
      })
    }
  }

  /**
   * Attach a handler to pg-boss (internal).
   *
   * Creates the queue explicitly (required by the pg-boss version in use
   * per the comment below), then subscribes with boss.work(). The worker
   * callback re-derives our Job object from the embedded `__jobId`,
   * invokes the user handler, and mirrors success/failure into the local
   * metadata cache. Errors are re-thrown so pg-boss drives retries.
   */
  private async registerBossHandler<T>(type: string, handler: JobHandler<T>): Promise<void> {
    // pg-boss 9.x requires queues to be created explicitly.
    await this.boss.createQueue(type, {
      retryLimit: 3,
      retryDelay: 60,
      expireInSeconds: 6 * 60 * 60 // 6 hours
    });
    console.log(`[PgBossQueue] Queue created: ${type}`);
    await this.boss.work<Record<string, any>>(type, {
      batchSize: 1, // one job per fetch
      pollingIntervalSeconds: 1 // poll every second
    }, async (bossJobs) => {
      // pg-boss hands the work callback an ARRAY of jobs.
      const bossJob = bossJobs[0]
      if (!bossJob) return
      // Strip our correlation fields back out of the payload.
      const { __jobId, __createdAt, ...data } = bossJob.data
      const jobId = __jobId || randomUUID()
      // Look up (or reconstruct, e.g. after a restart) the Job record.
      let job = this.jobs.get(jobId)
      if (!job) {
        job = {
          id: jobId,
          type,
          data: data as T,
          status: 'processing',
          progress: 0,
          createdAt: new Date(__createdAt || Date.now()),
          updatedAt: new Date(),
          startedAt: new Date()
        }
        this.jobs.set(jobId, job)
      } else {
        job.status = 'processing'
        job.startedAt = new Date()
        job.updatedAt = new Date()
      }
      console.log(`[PgBossQueue] Processing job: ${jobId} (type: ${type})`)
      try {
        // Run the user-supplied handler.
        const result = await handler(job)
        // Mirror success into the local cache.
        await this.completeJob(jobId, result)
        return result
      } catch (error: any) {
        // Mirror failure into the local cache...
        await this.failJob(jobId, error.message || String(error))
        // ...then re-throw so pg-boss applies its retry policy.
        throw error
      }
    })
    console.log(`[PgBossQueue] Handler registered to pg-boss: ${type}`)
  }

  /**
   * Look up a job's metadata.
   *
   * Only consults the in-process cache, so this returns null for jobs
   * pushed by another instance or before a restart.
   *
   * @returns the Job, or null when unknown to this instance
   */
  async getJob(id: string): Promise<Job | null> {
    // Check the local cache first.
    const cachedJob = this.jobs.get(id)
    if (cachedJob) {
      return cachedJob
    }
    // TODO: query pg-boss / persistent storage so lookups work across
    // instances; currently only cached jobs are visible.
    return null
  }

  /**
   * Update a job's progress (clamped to 0-100). Unknown ids are ignored.
   */
  async updateProgress(id: string, progress: number): Promise<void> {
    const job = this.jobs.get(id)
    if (job) {
      job.progress = Math.min(100, Math.max(0, progress))
      job.updatedAt = new Date()
      this.jobs.set(id, job)
      console.log(`[PgBossQueue] Job progress updated: ${id} -> ${progress}%`)
    }
  }

  /**
   * Mark a job completed in the local cache (status, 100% progress,
   * result, timestamps). Unknown ids are ignored.
   */
  async completeJob(id: string, result: any): Promise<void> {
    const job = this.jobs.get(id)
    if (job) {
      job.status = 'completed'
      job.progress = 100
      job.result = result
      job.completedAt = new Date()
      job.updatedAt = new Date()
      this.jobs.set(id, job)
      console.log(`[PgBossQueue] Job completed: ${id} (type: ${job.type})`)
    }
  }

  /**
   * Mark a job failed in the local cache with its error message.
   * Unknown ids are ignored.
   */
  async failJob(id: string, error: string): Promise<void> {
    const job = this.jobs.get(id)
    if (job) {
      job.status = 'failed'
      job.error = error
      job.completedAt = new Date()
      job.updatedAt = new Date()
      this.jobs.set(id, job)
      console.error(`[PgBossQueue] Job failed: ${id} (type: ${job.type})`, error)
    }
  }

  /**
   * Aggregate counts over the locally cached jobs (this instance only).
   */
  async getStats() {
    const jobs = Array.from(this.jobs.values())
    return {
      total: jobs.length,
      pending: jobs.filter(j => j.status === 'pending').length,
      processing: jobs.filter(j => j.status === 'processing').length,
      completed: jobs.filter(j => j.status === 'completed').length,
      failed: jobs.filter(j => j.status === 'failed').length
    }
  }

  /**
   * Evict finished (completed/failed) jobs older than `olderThan` from the
   * local metadata cache (default: 24 hours ago).
   *
   * @returns the number of evicted entries
   */
  cleanup(olderThan: Date = new Date(Date.now() - 24 * 60 * 60 * 1000)) {
    let removed = 0
    for (const [id, job] of this.jobs) {
      if (
        (job.status === 'completed' || job.status === 'failed') &&
        job.completedAt &&
        job.completedAt < olderThan
      ) {
        this.jobs.delete(id)
        removed++
      }
    }
    if (removed > 0) {
      console.log(`[PgBossQueue] Cleanup: removed ${removed} old jobs from cache`)
    }
    return removed
  }
}

View File

@@ -37,6 +37,7 @@
export type { Job, JobStatus, JobHandler, JobQueue } from './types.js'
export { MemoryQueue } from './MemoryQueue.js'
export { PgBossQueue } from './PgBossQueue.js'
export { JobFactory } from './JobFactory.js'
// Import for usage below
@@ -47,7 +48,8 @@ import { JobFactory } from './JobFactory.js'
*
* 自动根据环境变量选择队列实现:
* - QUEUE_TYPE=memory: 内存队列(本地开发)
* - QUEUE_TYPE=database: 数据库队列(生产环境,待实现
* - QUEUE_TYPE=pgboss: Postgres队列Postgres-Only架构
* - QUEUE_TYPE=database: 别名指向pgboss
*/
export const jobQueue = JobFactory.getInstance()

View File

@@ -56,6 +56,16 @@ export type JobHandler<T = any> = (job: Job<T>) => Promise<any>
* 任务队列接口
*/
export interface JobQueue {
/**
* 启动队列初始化连接和Worker
*/
start(): Promise<void>
/**
* 停止队列清理连接和Worker
*/
stop(): Promise<void>
/**
* 添加任务到队列
*/

View File

@@ -0,0 +1,282 @@
/**
* 任务拆分工具函数
*
* 用于将长时间任务拆分成多个小任务,避免:
* - SAE 30秒超时
* - pg-boss 24小时任务过期
* - 任务失败时重做所有工作
*
* 核心策略:
* - 文献筛选每批20-50篇
* - 数据提取每批10-20条
* - 统计分析:按数据集大小动态调整
*/
/**
 * Chunking strategy for one task type.
 */
export interface ChunkStrategy {
  /** Task type identifier (e.g. 'asl:screening:full-text') */
  type: string
  /** Number of items processed per batch */
  chunkSize: number
  /** Upper bound on batch count (prevents over-splitting) */
  maxChunks?: number
  /** Human-readable description */
  description: string
}
/**
 * Predefined chunking strategies, keyed by task type.
 *
 * Sizes are tuned from the business scenarios and performance tests
 * referenced in the surrounding code; the `description` strings are
 * user-facing and intentionally left in Chinese.
 */
export const CHUNK_STRATEGIES: Record<string, ChunkStrategy> = {
  // ASL module: literature screening
  'asl:screening:title-abstract': {
    type: 'asl:screening:title-abstract',
    chunkSize: 50, // 50 per batch (title/abstract LLM calls are fast)
    maxChunks: 100, // at most 100 batches (5000 items)
    description: '标题/摘要筛选 - 每批50篇'
  },
  'asl:screening:full-text': {
    type: 'asl:screening:full-text',
    chunkSize: 20, // 20 per batch (full-text screening is slower)
    maxChunks: 50, // at most 50 batches (1000 items)
    description: '全文筛选 - 每批20篇'
  },
  'asl:extraction': {
    type: 'asl:extraction',
    chunkSize: 30, // 30 per batch
    maxChunks: 50,
    description: '数据提取 - 每批30篇'
  },
  // DC module: data cleaning / extraction
  'dc:clean:batch': {
    type: 'dc:clean:batch',
    chunkSize: 100, // 100 rows per batch
    maxChunks: 100,
    description: '数据清洗 - 每批100行'
  },
  'dc:extract:medical-record': {
    type: 'dc:extract:medical-record',
    chunkSize: 10, // 10 records per batch (AI extraction is slow)
    maxChunks: 100,
    description: '病历提取 - 每批10份'
  },
  // SSA module: statistical analysis
  'ssa:analysis:batch': {
    type: 'ssa:analysis:batch',
    chunkSize: 1000, // 1000 data rows per batch
    maxChunks: 50,
    description: '统计分析 - 每批1000条'
  },
  // Fallback used for unrecognized task types
  'default': {
    type: 'default',
    chunkSize: 50,
    maxChunks: 100,
    description: '默认策略 - 每批50条'
  }
}
/**
 * Split an array into consecutive batches of at most `chunkSize` items.
 * The final batch may be shorter.
 *
 * @param items array to partition
 * @param chunkSize maximum size of each batch (must be > 0)
 * @returns the batches, in order; empty array for empty input
 * @throws Error when chunkSize is not positive
 *
 * @example
 * ```typescript
 * const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 * const batches = splitIntoChunks(ids, 3)
 * // => [[1,2,3], [4,5,6], [7,8,9], [10]]
 * ```
 */
export function splitIntoChunks<T>(items: T[], chunkSize: number): T[][] {
  if (chunkSize <= 0) {
    throw new Error('chunkSize must be positive')
  }
  if (items.length === 0) {
    return []
  }
  const batches: T[][] = []
  let cursor = 0
  while (cursor < items.length) {
    batches.push(items.slice(cursor, cursor + chunkSize))
    cursor += chunkSize
  }
  return batches
}
/**
 * Recommend a chunk size for a task type.
 *
 * Looks up the type's strategy (falling back to 'default'); when the
 * predicted number of chunks would exceed the strategy's maxChunks, the
 * chunk size is grown so the batch count stays within bounds.
 *
 * @param taskType task type key (e.g. 'asl:screening:title-abstract')
 * @param totalItems total number of items to process
 * @returns recommended chunk size (always positive)
 *
 * @example
 * ```typescript
 * const chunkSize = recommendChunkSize('asl:screening:title-abstract', 1000)
 * // => 50 (from CHUNK_STRATEGIES)
 * ```
 */
export function recommendChunkSize(taskType: string, totalItems: number): number {
  const strategy = CHUNK_STRATEGIES[taskType] || CHUNK_STRATEGIES['default']
  let chunkSize = strategy.chunkSize
  // FIX: guard degenerate input. The old code fell through to
  // `return totalItems` for totalItems <= 0, handing back 0 (or a
  // negative) as a chunk size — which splitIntoChunks rejects and which
  // made calculateSplitInfo produce NaN. Return the strategy's size so
  // the value is always a valid chunk size.
  if (totalItems <= 0) {
    return chunkSize
  }
  // Small inputs fit in a single batch.
  if (totalItems <= chunkSize) {
    return totalItems
  }
  // Too many predicted chunks? Grow the chunk size to respect maxChunks.
  if (strategy.maxChunks) {
    const predictedChunks = Math.ceil(totalItems / chunkSize)
    if (predictedChunks > strategy.maxChunks) {
      chunkSize = Math.ceil(totalItems / strategy.maxChunks)
      console.log(
        `[TaskSplit] Adjusted chunkSize to ${chunkSize} to limit chunks to ${strategy.maxChunks}`
      )
    }
  }
  return chunkSize
}
/**
 * Compute the full split plan for a task.
 *
 * @param taskType task type key
 * @param totalItems total number of items to process
 * @returns split plan: chunk size, batch count, strategy used, average
 *          and last-batch sizes (all zeroed for empty input)
 *
 * @example
 * ```typescript
 * const info = calculateSplitInfo('asl:screening:title-abstract', 1000)
 * // => { chunkSize: 50, totalChunks: 20, strategy: {...}, ... }
 * ```
 */
export function calculateSplitInfo(taskType: string, totalItems: number) {
  const strategy = CHUNK_STRATEGIES[taskType] || CHUNK_STRATEGIES['default']
  const chunkSize = recommendChunkSize(taskType, totalItems)
  // FIX: guard the chunkSize === 0 case (empty input). The old code
  // divided and took modulo by zero, producing NaN for totalChunks and
  // lastChunkSize.
  const totalChunks = chunkSize > 0 ? Math.ceil(totalItems / chunkSize) : 0
  return {
    taskType,
    totalItems,
    chunkSize,
    totalChunks,
    strategy,
    avgItemsPerChunk: totalChunks > 0 ? Math.round(totalItems / totalChunks) : 0,
    // A zero remainder means the last batch is full-sized.
    lastChunkSize: totalChunks > 0 ? (totalItems % chunkSize || chunkSize) : 0
  }
}
/**
 * Human-readable label for a batch (1-based display index; the final
 * batch is tagged as such).
 *
 * @param batchIndex zero-based batch index
 * @param totalBatches total number of batches
 * @returns description string
 *
 * @example
 * ```typescript
 * getBatchDescription(0, 20)  // "批次 1/20"
 * getBatchDescription(19, 20) // "批次 20/20最后一批"
 * ```
 */
export function getBatchDescription(batchIndex: number, totalBatches: number): string {
  const displayIndex = batchIndex + 1
  return displayIndex === totalBatches
    ? `批次 ${displayIndex}/${totalBatches}(最后一批)`
    : `批次 ${displayIndex}/${totalBatches}`
}
/**
 * Estimate how long one batch will take to run, in seconds.
 *
 * Based on empirical per-item timings; used by the frontend to display
 * an estimated completion time.
 *
 * @param taskType task type key
 * @param batchSize number of items in the batch
 * @returns estimated duration in whole seconds (rounded up)
 */
export function estimateBatchDuration(taskType: string, batchSize: number): number {
  // Average per-item processing time, in seconds.
  const SECONDS_PER_ITEM: Record<string, number> = {
    'asl:screening:title-abstract': 0.5, // 0.5s/item (includes LLM call)
    'asl:screening:full-text': 2, // 2s/item
    'asl:extraction': 3, // 3s/item
    'dc:clean:batch': 0.1, // 0.1s/row
    'dc:extract:medical-record': 5, // 5s/record
    'ssa:analysis:batch': 0.01, // 0.01s/row
    'default': 1 // 1s/item
  }
  const perItem = SECONDS_PER_ITEM[taskType] ?? SECONDS_PER_ITEM['default']
  return Math.ceil(batchSize * perItem)
}
/**
 * Assert that a batch index is within [0, totalBatches).
 *
 * @param batchIndex zero-based batch index to validate
 * @param totalBatches total number of batches
 * @throws Error when the index is out of range
 */
export function validateBatchIndex(batchIndex: number, totalBatches: number): void {
  const withinRange = batchIndex >= 0 && batchIndex < totalBatches
  if (!withinRange) {
    throw new Error(
      `Invalid batch index: ${batchIndex}. Must be between 0 and ${totalBatches - 1}`
    )
  }
}
/**
 * Extract the items belonging to one batch.
 *
 * @param items the full input array
 * @param batchIndex zero-based batch index
 * @param chunkSize batch size used for the split
 * @returns the slice for that batch (shorter for the final batch; empty
 *          when the index is past the end)
 *
 * @example
 * ```typescript
 * const ids = [1,2,3,4,5,6,7,8,9,10]
 * getBatchItems(ids, 0, 3) // [1,2,3]
 * getBatchItems(ids, 1, 3) // [4,5,6]
 * getBatchItems(ids, 3, 3) // [10]
 * ```
 */
export function getBatchItems<T>(
  items: T[],
  batchIndex: number,
  chunkSize: number
): T[] {
  const offset = batchIndex * chunkSize
  // slice() clamps to the array length, so no explicit Math.min is needed.
  return items.slice(offset, offset + chunkSize)
}