# **Postgres-Only 队列架构治理与异步决策指南** **文档版本:** v1.2 (Review版) **面向对象:** 后端开发团队 (2人) **核心目标:** 解决并发冲突,规避异步陷阱,建立低成本运维标准 **适用架构:** Node.js \+ PostgreSQL \+ pg-boss (无 Redis) ## **1\. 现状与痛点分析** ### **1.1 当前架构背景** 我们采用了极简的 **Postgres-Only** 架构,利用 pg-boss 实现异步任务队列。这对于我们 2 人团队非常有利,因为: * **运维成本低**:不需要维护 Redis 或 RabbitMQ。 * **事务一致性**:任务数据与业务数据在同一个数据库,天然支持事务。 * **部署简单**:一个 Docker 容器搞定所有状态存储。 ### **1.2 遇到的技术问题** 在开发环境(Nodemon 热重载)或生产环境(多实例部署)启动时,频发以下错误: error: duplicate key value violates unique constraint "queue\_pkey" Key (name)=(asl\_research\_execute) already exists. **根本原因:** 典型的竞争条件 (Race Condition)。多个进程同时尝试初始化队列,触发数据库唯一约束。 ## **2\. 核心技术方案:健壮的单例模式** 为了解决报错并防止资源泄露,我们需要对 pg-boss 进行**防御性封装**。 ### **2.1 标准代码实现 (backend/services/queueService.js)** **⚠️ 重大更新**:增加了连接池限制 (max: 2),防止搞挂 RDS。 import PgBoss from 'pg-boss'; import { logger } from '@/common/logging'; class QueueService { constructor() { this.boss \= null; this.isReady \= false; this.isStarting \= false; } /\*\* \* 核心:初始化与错误监听 \*/ async init(connectionString) { if (this.boss || this.isStarting) return; this.isStarting \= true; try { this.boss \= new PgBoss({ connectionString, application\_name: 'ai\_clinical\_queue', retentionDays: 7, maxTries: 3, // 🛡️ \[逆向防御\]:限制连接池大小 // pg-boss 默认至少需要 2-4 个连接。 // 我们显式限制为 4,防止挤占 Prisma 的连接配额 (RDS通常限制 100\) max: 4, }); // 🛡️ \[逆向防御\]:监听全局错误,防止进程崩溃 this.boss.on('error', (err) \=\> { if (this.\_isDuplicateKeyError(err)) { logger.warn(\`\[Queue\] Concurrency conflict auto-resolved: ${err.detail}\`); } else { logger.error('\[Queue\] PgBoss critical error:', err); // TODO: 这里可以接入飞书/钉钉 Webhook 告警 } }); await this.boss.start(); this.isReady \= true; this.isStarting \= false; logger.info('✅ Queue Service started successfully'); } catch (err) { this.isStarting \= false; logger.error('❌ Failed to start Queue Service:', err); throw err; } } /\*\* \* 封装:安全地发布任务 \*/ async publish(queueName, data, options \= {}) { await this.\_ensureReady(); await this.\_ensureQueueExists(queueName); try { return await this.boss.send(queueName, data, options); } catch (err) { logger.error(\`❌ Failed to publish to ${queueName}:\`, err); // 🛡️ \[逆向防御\]:这里是否需要抛出错误取决于业务? // 建议抛出,让上层业务感知到任务提交失败 throw err; } } /\*\* \* 封装:Worker 注册 \*/ async subscribe(queueName, handler) { await this.\_ensureReady(); await this.\_ensureQueueExists(queueName); // 🛡️ \[逆向防御\]:包裹 handler,捕获未处理的异常,防止 Worker 僵死 const safeHandler \= async (job) \=\> { try { logger.info(\`🔄 Processing job ${job.id} \[${queueName}\]\`); return await handler(job); } catch (err) { logger.error(\`❌ Job ${job.id} failed:\`, err); throw err; // 抛出给 pg-boss 进行重试 } }; await this.boss.work(queueName, safeHandler); logger.info(\`👷 Worker registered: ${queueName}\`); } async shutdown() { if (this.boss) { await this.boss.stop(); this.boss \= null; this.isReady \= false; } } // \--- 私有辅助方法 \--- async \_ensureQueueExists(queueName) { try { await this.boss.createQueue(queueName); } catch (err) { if (\!this.\_isDuplicateKeyError(err)) throw err; } } \_isDuplicateKeyError(err) { return err.code \=== '23505' && err.constraint \=== 'queue\_pkey'; } async \_ensureReady() { if (\!this.isReady && \!this.isStarting) { await this.init(process.env.DATABASE\_URL); } if (\!this.isReady) throw new Error('QueueService not initialized'); } } export const queueService \= new QueueService(); ## **3\. 架构决策:什么时候该用异步?** 作为 2 人团队,维护异步队列有显著的隐性成本。原则:**默认同步,按需异步。** ### **3.1 坚决【不使用】异步的场景** * **短时间 AI 交互 (\< 20秒)**:用 SSE 流式响应。 * **简单 CRUD**:直接 await。 * **强实时反馈**:如 REDCap Webhook。 ### **3.2 【必须】使用异步的场景** * **大文件解析 (DC 模块)**:防止 HTTP Timeout。 * **长时外部 API (ASL 模块)**:DeepSearch 检索。 * **高并发削峰**:批量导入。 ## **4\. 最佳实践:智能混合模式 (Smart Hybrid Strategy)** ### **4.1 ⚠️ 逆向风险:代码分裂 (Code Divergence)** **风险**:如果同步逻辑写一份,异步 Worker 里又复制粘贴一份。一旦业务修改,很容易“改了同步忘了异步”,导致 Bug。 **解法**:**业务逻辑必须原子化**。 ### **4.2 正确的代码范例** // backend/src/modules/dc/services/coreLogic.ts // 1\. 核心逻辑剥离:这是一个纯函数,不关心它是被 HTTP 调用的还是被 Worker 调用的 export async function extractDataCore(fileBuffer: Buffer) { // ...复杂的解析逻辑... return result; } // backend/src/modules/dc/services/extractionService.ts import { extractDataCore } from './coreLogic'; async function processData(data: any\[\]) { // 🟢 场景 A:同步处理 if (data.length \< 50\) { // 直接调用核心逻辑 return await extractDataCore(data); } // 🟠 场景 B:异步队列 else { // 仅仅是发布任务,任务载荷里只存必要参数 const jobId \= await queueService.publish('dc\_process\_batch', data); return { status: 'queued', jobId }; } } // backend/src/workers/dcWorker.ts import { extractDataCore } from '../modules/dc/services/coreLogic'; // Worker 也调用同一个核心逻辑 queueService.subscribe('dc\_process\_batch', async (job) \=\> { return await extractDataCore(job.data); }); ## **5\. 开发规范** ### **5.1 命名规范** * ✅ **推荐**:模块\_动作 (如 asl\_screening\_task) * ❌ **禁止**:冒号 : 或点号 . ### **5.2 Postgres-Only 特有技巧** 利用 **事务一致性**。在 Postgres-Only 架构中,尽量让任务发布与业务数据写入在同一个事务中(如果 ORM 支持),确保不丢任务。 ## **6\. 故障排查与监控 (SQL)** 因为没有 Redis GUI,使用 SQL 监控: \-- 查看失败任务 SELECT id, name, data, output, created\_on FROM platform\_schema.job WHERE state \= 'failed' LIMIT 10;