Files
AIclinicalresearch/docs/02-通用能力层/技术方案文档_Postgres-Only队列架构优化方案 (1).md
HaHafeng 61cdc97eeb feat(platform): Fix pg-boss queue conflict and add safety standards
Summary:
- Fix pg-boss queue conflict (duplicate key violation on queue_pkey)
- Add global error listener to prevent process crash
- Reduce connection pool from 10 to 4
- Add graceful shutdown handling (SIGTERM/SIGINT)
- Fix researchWorker recursive call bug in catch block
- Make screeningWorker idempotent using upsert

Security Standards (v1.1):
- Prohibit recursive retry in Worker catch blocks
- Prohibit payload bloat (only store fileKey/ID in job.data)
- Require Worker idempotency (upsert + unique constraint)
- Recommend task-specific expireInSeconds settings
- Document graceful shutdown pattern

New Features:
- PKB signed URL endpoint for document preview/download
- pg_bigm installation guide for Docker
- Dockerfile.postgres-with-extensions for pgvector + pg_bigm

Documentation:
- Update Postgres-Only async task processing guide (v1.1)
- Add troubleshooting SQL queries
- Update safety checklist

Tested: Local verification passed
2026-01-23 22:07:26 +08:00

230 lines
7.5 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# **Postgres-Only 队列架构治理与异步决策指南**
**文档版本:** v1.2 (Review版) **面向对象:** 后端开发团队 (2人) **核心目标:** 解决并发冲突,规避异步陷阱,建立低成本运维标准
**适用架构:** Node.js \+ PostgreSQL \+ pg-boss (无 Redis)
## **1\. 现状与痛点分析**
### **1.1 当前架构背景**
我们采用了极简的 **Postgres-Only** 架构,利用 pg-boss 实现异步任务队列。这对于我们 2 人团队非常有利,因为:
* **运维成本低**:不需要维护 Redis 或 RabbitMQ。
* **事务一致性**:任务数据与业务数据在同一个数据库,天然支持事务。
* **部署简单**:一个 Docker 容器搞定所有状态存储。
### **1.2 遇到的技术问题**
在开发环境Nodemon 热重载)或生产环境(多实例部署)启动时,频发以下错误:
error: duplicate key value violates unique constraint "queue\_pkey"
Key (name)=(asl\_research\_execute) already exists.
**根本原因:** 典型的竞争条件 (Race Condition)。多个进程同时尝试初始化队列,触发数据库唯一约束。
## **2\. 核心技术方案:健壮的单例模式**
为了解决报错并防止资源泄露,我们需要对 pg-boss 进行**防御性封装**。
### **2.1 标准代码实现 (backend/services/queueService.js)**
**⚠️ 重大更新**:增加了连接池限制 (max: 2),防止搞挂 RDS。
import PgBoss from 'pg-boss';
import { logger } from '@/common/logging';
class QueueService {
constructor() {
this.boss \= null;
this.isReady \= false;
this.isStarting \= false;
}
/\*\*
\* 核心:初始化与错误监听
\*/
async init(connectionString) {
if (this.boss || this.isStarting) return;
this.isStarting \= true;
try {
this.boss \= new PgBoss({
connectionString,
application\_name: 'ai\_clinical\_queue',
retentionDays: 7,
maxTries: 3,
// 🛡️ \[逆向防御\]:限制连接池大小
// pg-boss 默认至少需要 2-4 个连接。
// 我们显式限制为 4防止挤占 Prisma 的连接配额 (RDS通常限制 100\)
max: 4,
});
// 🛡️ \[逆向防御\]:监听全局错误,防止进程崩溃
this.boss.on('error', (err) \=\> {
if (this.\_isDuplicateKeyError(err)) {
logger.warn(\`\[Queue\] Concurrency conflict auto-resolved: ${err.detail}\`);
} else {
logger.error('\[Queue\] PgBoss critical error:', err);
// TODO: 这里可以接入飞书/钉钉 Webhook 告警
}
});
await this.boss.start();
this.isReady \= true;
this.isStarting \= false;
logger.info('✅ Queue Service started successfully');
} catch (err) {
this.isStarting \= false;
logger.error('❌ Failed to start Queue Service:', err);
throw err;
}
}
/\*\*
\* 封装:安全地发布任务
\*/
async publish(queueName, data, options \= {}) {
await this.\_ensureReady();
await this.\_ensureQueueExists(queueName);
try {
return await this.boss.send(queueName, data, options);
} catch (err) {
logger.error(\`❌ Failed to publish to ${queueName}:\`, err);
// 🛡️ \[逆向防御\]:这里是否需要抛出错误取决于业务?
// 建议抛出,让上层业务感知到任务提交失败
throw err;
}
}
/\*\*
\* 封装Worker 注册
\*/
async subscribe(queueName, handler) {
await this.\_ensureReady();
await this.\_ensureQueueExists(queueName);
// 🛡️ \[逆向防御\]:包裹 handler捕获未处理的异常防止 Worker 僵死
const safeHandler \= async (job) \=\> {
try {
logger.info(\`🔄 Processing job ${job.id} \[${queueName}\]\`);
return await handler(job);
} catch (err) {
logger.error(\`❌ Job ${job.id} failed:\`, err);
throw err; // 抛出给 pg-boss 进行重试
}
};
await this.boss.work(queueName, safeHandler);
logger.info(\`👷 Worker registered: ${queueName}\`);
}
async shutdown() {
if (this.boss) {
await this.boss.stop();
this.boss \= null;
this.isReady \= false;
}
}
// \--- 私有辅助方法 \---
async \_ensureQueueExists(queueName) {
try {
await this.boss.createQueue(queueName);
} catch (err) {
if (\!this.\_isDuplicateKeyError(err)) throw err;
}
}
\_isDuplicateKeyError(err) {
return err.code \=== '23505' && err.constraint \=== 'queue\_pkey';
}
async \_ensureReady() {
if (\!this.isReady && \!this.isStarting) {
await this.init(process.env.DATABASE\_URL);
}
if (\!this.isReady) throw new Error('QueueService not initialized');
}
}
export const queueService \= new QueueService();
## **3\. 架构决策:什么时候该用异步?**
作为 2 人团队,维护异步队列有显著的隐性成本。原则:**默认同步,按需异步。**
### **3.1 坚决【不使用】异步的场景**
* **短时间 AI 交互 (\< 20秒)**:用 SSE 流式响应。
* **简单 CRUD**:直接 await。
* **强实时反馈**:如 REDCap Webhook。
### **3.2 【必须】使用异步的场景**
* **大文件解析 (DC 模块)**:防止 HTTP Timeout。
* **长时外部 API (ASL 模块)**DeepSearch 检索。
* **高并发削峰**:批量导入。
## **4\. 最佳实践:智能混合模式 (Smart Hybrid Strategy)**
### **4.1 ⚠️ 逆向风险:代码分裂 (Code Divergence)**
**风险**:如果同步逻辑写一份,异步 Worker 里又复制粘贴一份。一旦业务修改,很容易“改了同步忘了异步”,导致 Bug。 **解法****业务逻辑必须原子化**。
### **4.2 正确的代码范例**
// backend/src/modules/dc/services/coreLogic.ts
// 1\. 核心逻辑剥离:这是一个纯函数,不关心它是被 HTTP 调用的还是被 Worker 调用的
export async function extractDataCore(fileBuffer: Buffer) {
// ...复杂的解析逻辑...
return result;
}
// backend/src/modules/dc/services/extractionService.ts
import { extractDataCore } from './coreLogic';
async function processData(data: any\[\]) {
// 🟢 场景 A同步处理
if (data.length \< 50\) {
// 直接调用核心逻辑
return await extractDataCore(data);
}
// 🟠 场景 B异步队列
else {
// 仅仅是发布任务,任务载荷里只存必要参数
const jobId \= await queueService.publish('dc\_process\_batch', data);
return { status: 'queued', jobId };
}
}
// backend/src/workers/dcWorker.ts
import { extractDataCore } from '../modules/dc/services/coreLogic';
// Worker 也调用同一个核心逻辑
queueService.subscribe('dc\_process\_batch', async (job) \=\> {
return await extractDataCore(job.data);
});
## **5\. 开发规范**
### **5.1 命名规范**
***推荐**:模块\_动作 (如 asl\_screening\_task)
***禁止**:冒号 : 或点号 .
### **5.2 Postgres-Only 特有技巧**
利用 **事务一致性**。在 Postgres-Only 架构中,尽量让任务发布与业务数据写入在同一个事务中(如果 ORM 支持),确保不丢任务。
## **6\. 故障排查与监控 (SQL)**
因为没有 Redis GUI使用 SQL 监控:
\-- 查看失败任务
SELECT id, name, data, output, created\_on FROM platform\_schema.job WHERE state \= 'failed' LIMIT 10;