import { Job, JobQueue, JobHandler } from './types.js' import { PgBoss } from 'pg-boss' import { randomUUID } from 'crypto' import { logger } from '../logging/index.js' /** * PgBoss队列适配器 * * 适用场景: * - Postgres-Only架构(无需Redis) * - 云原生Serverless环境(SAE) * - 多实例部署需要共享队列 * - 关键任务(需要持久化) * * 特点: * - ✅ 无需额外Redis实例,降低成本 * - ✅ 多实例自动负载均衡 * - ✅ 任务持久化,实例重启不丢失 * - ✅ 支持延迟任务、重试、优先级 * - ✅ 适合中小规模应用(<10万任务/天) * - ⚠️ 性能低于Redis队列(但足够) * * pg-boss特性: * - 基于Postgres SKIP LOCKED机制 * - 自动创建表:platform_schema.job 和 platform_schema.version * - 自动清理过期任务 * - 支持CRON定时任务 * * @example * ```typescript * const queue = new PgBossQueue(databaseUrl) * await queue.start() * * // 注册处理函数 * queue.process('asl:screening', async (job) => { * await processScreening(job.data) * }) * * // 创建任务 * const job = await queue.push('asl:screening', { projectId: 123 }) * ``` */ export class PgBossQueue implements JobQueue { private boss: PgBoss private jobs: Map = new Map() // 任务元数据缓存 private handlers: Map = new Map() private started: boolean = false constructor(connectionString: string, schema: string = 'platform_schema') { this.boss = new PgBoss({ connectionString, schema, // 使用platform_schema max: 5, // 🛡️ 限制连接数,避免挤占 Prisma 连接配额(RDS 限制 100) application_name: 'aiclinical-queue', // 调度配置 schedule: true, // 启用定时任务 // 维护配置 supervise: true, // 启用监控 maintenanceIntervalSeconds: 300, // 每5分钟运行维护任务 }) // 🛡️ 全局错误监听:防止未捕获错误导致进程崩溃 this.boss.on('error', (err: any) => { // 静默处理 duplicate key 错误(队列并发初始化时的正常现象) if (err.code === '23505' && err.constraint === 'queue_pkey') { console.log(`[PgBossQueue] ℹ️ Queue concurrency conflict auto-resolved: ${err.detail}`); } else { console.error('[PgBossQueue] ❌ Critical error:', err); // 记录到日志但不崩溃进程 } }); console.log('[PgBossQueue] Initialized with schema:', schema) } /** * 启动队列 * 必须在使用前调用 */ async start(): Promise { if (this.started) return try { await this.boss.start() this.started = true console.log('[PgBossQueue] Started successfully') // 重新注册所有handler for (const [type, handler] of this.handlers) { await this.registerBossHandler(type, handler) } } catch (error) { console.error('[PgBossQueue] Failed to start:', error) throw error } } /** * 停止队列 */ async stop(): Promise { if (!this.started) return try { await this.boss.stop() this.started = false console.log('[PgBossQueue] Stopped') } catch (error) { console.error('[PgBossQueue] Failed to stop:', error) throw error } } /** * 添加任务到队列 * * @param type 任务类型 * @param data 任务数据 * @returns Job对象 */ async push(type: string, data: T): Promise> { if (!this.started) { await this.start() } try { // 创建任务元数据 const jobId = randomUUID() const now = new Date() const job: Job = { id: jobId, type, data, status: 'pending', progress: 0, createdAt: now, updatedAt: now } // 存储元数据到缓存 this.jobs.set(jobId, job) // 📝 注意:队列已在 registerBossHandler() 中创建,这里不再重复创建 // 避免每次 push 都尝试 createQueue 导致重复定义 // 发送任务到pg-boss // ✅ 支持自定义 singletonKey 和 options(通过 data 中的特殊字段) // 特殊字段:__singletonKey, __singletonSeconds, __expireInSeconds const dataObj = data as any const singletonKey = dataObj.__singletonKey || dataObj.taskId || jobId const singletonSeconds = dataObj.__singletonSeconds || 3600 // 默认 1 小时 const expireInSeconds = dataObj.__expireInSeconds || 6 * 60 * 60 // 默认 6 小时 // 移除特殊字段,不传入 pg-boss const cleanData = { ...dataObj } delete cleanData.__singletonKey delete cleanData.__singletonSeconds delete cleanData.__expireInSeconds const bossJobId = await this.boss.send(type, { ...cleanData, __jobId: jobId, // 嵌入我们的jobId __createdAt: now.toISOString() }, { retryLimit: 3, retryDelay: 60, expireInSeconds, singletonKey, // ✅ 防止同一任务重复入队 singletonSeconds, }) console.log(`[PgBossQueue] Job pushed: ${jobId} -> pg-boss:${bossJobId} (type: ${type})`) return job } catch (error) { console.error(`[PgBossQueue] Failed to push job (type: ${type}):`, error) throw error } } /** * 注册任务处理函数 * * @param type 任务类型 * @param handler 处理函数 */ process(type: string, handler: JobHandler): void { this.handlers.set(type, handler) console.log(`[PgBossQueue] Registered handler for job type: ${type}`) // 如果已启动,立即注册到pg-boss if (this.started) { this.registerBossHandler(type, handler).catch(err => { console.error(`[PgBossQueue] Failed to register handler for ${type}:`, err) }) } } /** * 注册handler到pg-boss * (内部方法) */ private async registerBossHandler(type: string, handler: JobHandler): Promise { console.log(`[PgBossQueue] 🔧 开始注册 Handler: ${type}`); try { // pg-boss 9.x 需要显式创建队列(幂等操作) try { await this.boss.createQueue(type, { retryLimit: 3, retryDelay: 60, expireInSeconds: 6 * 60 * 60 // 6小时 }); console.log(`[PgBossQueue] ✅ Queue created: ${type}`); } catch (createError: any) { // 队列已存在时会报 duplicate key 错误,忽略 if (createError.code === '23505' || createError.message?.includes('already exists')) { console.log(`[PgBossQueue] ℹ️ Queue already exists: ${type}`); } else { throw createError; } } await this.boss.work>(type, { batchSize: 1, // 每次处理1个任务 pollingIntervalSeconds: 2 // 每2秒轮询一次(降低频率避免竞态) }, async (bossJobs) => { // pg-boss的work handler接收的是Job数组 const bossJob = bossJobs[0] if (!bossJob) return const { __jobId, __createdAt, ...data } = bossJob.data const jobId = __jobId || randomUUID() const bossJobId = bossJob.id // pg-boss 自己的 job ID // 获取或创建Job对象 let job = this.jobs.get(jobId) if (!job) { job = { id: jobId, type, data: data as T, status: 'processing', progress: 0, createdAt: new Date(__createdAt || Date.now()), updatedAt: new Date(), startedAt: new Date() } this.jobs.set(jobId, job) } else { job.status = 'processing' job.startedAt = new Date() job.updatedAt = new Date() } // ✅ 检查:是否已经处理过这个 job(防止重复处理) const existingJob = this.jobs.get(jobId) if (existingJob && existingJob.status === 'completed') { console.warn(`[PgBossQueue] ⚠️ Job already completed, skipping: ${jobId} (pg-boss: ${bossJobId})`) return // 跳过已完成的任务 } console.log(`[PgBossQueue] Processing job: ${jobId} (pg-boss: ${bossJobId}, type: ${type})`) try { // 执行用户提供的处理函数 const result = await handler(job) // 标记为完成(更新内存缓存) await this.completeJob(jobId, result) // ✅ 重要:pg-boss 12.x work handler 返回 void // pg-boss 会在 handler 成功返回后自动将任务标记为 completed console.log(`[PgBossQueue] ✅ Job handler finished successfully: ${jobId} (pg-boss: ${bossJobId})`) // 打印最终状态 const finalJob = this.jobs.get(jobId) console.log(`[PgBossQueue] Job final status: ${finalJob?.status || 'unknown'}`) } catch (error: any) { console.error(`[PgBossQueue] ❌ Job handler failed: ${jobId} (pg-boss: ${bossJobId})`, error.message) // 标记为失败(更新内存缓存) await this.failJob(jobId, error.message || String(error)) // 抛出错误让pg-boss处理重试 throw error } }) console.log(`[PgBossQueue] ✅ Handler registered to pg-boss: ${type}`); logger.info(`[PgBossQueue] Worker registration completed`, { type }); } catch (error: any) { console.error(`[PgBossQueue] ❌ Failed to register handler: ${type}`, error); logger.error(`[PgBossQueue] Handler registration failed`, { type, error: error.message }); throw error; } } /** * 获取任务信息 * * @param id 任务ID * @returns Job对象或null */ async getJob(id: string): Promise { // 先从缓存查找 const cachedJob = this.jobs.get(id) if (cachedJob) { return cachedJob } // ✅ 修复:从pg-boss数据库查询真实状态 try { // pg-boss v9 API: getJobById(queueName, id) // 使用通配符'*'来搜索所有队列中的job const bossJob = await (this.boss.getJobById as any)('*', id); if (!bossJob) { return null; } // 映射 pg-boss 状态到我们的Job对象(注意:pg-boss 使用驼峰命名) const status: any = (this as any).mapBossStateToJobStatus((bossJob.state || 'created') as any, null as any); return { id: bossJob.id, type: bossJob.name, data: bossJob.data, status, progress: 0, createdAt: new Date(bossJob.createdOn || bossJob.createdon || Date.now()), updatedAt: new Date(bossJob.completedOn || bossJob.startedOn || bossJob.createdOn || Date.now()), startedAt: bossJob.startedOn ? new Date(bossJob.startedOn) : (bossJob.startedon ? new Date(bossJob.startedon) : undefined), completedAt: bossJob.completedOn ? new Date(bossJob.completedOn) : (bossJob.completedon ? new Date(bossJob.completedon) : undefined), }; } catch (error: any) { console.error(`[PgBossQueue] Failed to get job ${id} from pg-boss:`, error); return null; } } /** * 映射 pg-boss 状态到我们的 Job 状态 */ private mapBossStateToJobStatus(state: string): 'pending' | 'processing' | 'completed' | 'failed' | 'cancelled' { switch (state) { case 'created': case 'retry': return 'pending'; case 'active': return 'processing'; case 'completed': return 'completed'; case 'expired': case 'cancelled': return 'cancelled'; case 'failed': return 'failed'; default: return 'pending'; } } /** * 更新任务进度 * * @param id 任务ID * @param progress 进度(0-100) */ async updateProgress(id: string, progress: number): Promise { const job = this.jobs.get(id) if (job) { job.progress = Math.min(100, Math.max(0, progress)) job.updatedAt = new Date() this.jobs.set(id, job) console.log(`[PgBossQueue] Job progress updated: ${id} -> ${progress}%`) } } /** * 标记任务为完成 * * @param id 任务ID * @param result 任务结果 */ async completeJob(id: string, result: any): Promise { const job = this.jobs.get(id) if (job) { job.status = 'completed' job.progress = 100 job.result = result job.completedAt = new Date() job.updatedAt = new Date() this.jobs.set(id, job) console.log(`[PgBossQueue] Job completed: ${id} (type: ${job.type})`) } } /** * 标记任务为失败 * * @param id 任务ID * @param error 错误信息 */ async failJob(id: string, error: string): Promise { const job = this.jobs.get(id) if (job) { job.status = 'failed' job.error = error job.completedAt = new Date() job.updatedAt = new Date() this.jobs.set(id, job) console.error(`[PgBossQueue] Job failed: ${id} (type: ${job.type})`, error) } } /** * 获取队列统计信息 */ async getStats() { const jobs = Array.from(this.jobs.values()) return { total: jobs.length, pending: jobs.filter(j => j.status === 'pending').length, processing: jobs.filter(j => j.status === 'processing').length, completed: jobs.filter(j => j.status === 'completed').length, failed: jobs.filter(j => j.status === 'failed').length } } /** * 清理已完成的任务(从缓存中) */ cleanup(olderThan: Date = new Date(Date.now() - 24 * 60 * 60 * 1000)) { let removed = 0 for (const [id, job] of this.jobs) { if ( (job.status === 'completed' || job.status === 'failed') && job.completedAt && job.completedAt < olderThan ) { this.jobs.delete(id) removed++ } } if (removed > 0) { console.log(`[PgBossQueue] Cleanup: removed ${removed} old jobs from cache`) } return removed } }