Summary: - Fix pg-boss queue conflict (duplicate key violation on queue_pkey) - Add global error listener to prevent process crash - Reduce connection pool from 10 to 4 - Add graceful shutdown handling (SIGTERM/SIGINT) - Fix researchWorker recursive call bug in catch block - Make screeningWorker idempotent using upsert Security Standards (v1.1): - Prohibit recursive retry in Worker catch blocks - Prohibit payload bloat (only store fileKey/ID in job.data) - Require Worker idempotency (upsert + unique constraint) - Recommend task-specific expireInSeconds settings - Document graceful shutdown pattern New Features: - PKB signed URL endpoint for document preview/download - pg_bigm installation guide for Docker - Dockerfile.postgres-with-extensions for pgvector + pg_bigm Documentation: - Update Postgres-Only async task processing guide (v1.1) - Add troubleshooting SQL queries - Update safety checklist Tested: Local verification passed
21 KiB
21 KiB
Postgres-Only 异步任务处理指南
文档版本: v1.1(2026-01-23 安全规范更新)
创建日期: 2025-12-22
最后更新: 2026-01-23
维护者: 平台架构团队
适用场景: 长时间任务(>30秒)、大文件处理、后台Worker
参考实现: DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取⚠️ 重要更新 v1.1:新增🛡️ 安全规范章节,包含幂等性、错误处理等强制规范
📋 概述
本文档基于 DC Tool C Excel解析功能 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。
核心价值
- ✅ 避免HTTP超时:上传接口3秒返回,解析在后台完成(30-60秒)
- ✅ 用户体验优秀:实时进度反馈,不需要傻等
- ✅ 符合云原生规范:Platform-Only模式,pg-boss队列
- ✅ 性能优化:clean data缓存,避免重复计算(-99%耗时)
🏗️ 架构设计
三层架构
┌─────────────────────────────────────────────────────────┐
│ 前端层(React + React Query) │
│ - 上传文件(立即返回 sessionId + jobId) │
│ - 轮询状态(useQuery + refetchInterval,自动串行) │
│ - 监听 status='ready',加载数据 │
└─────────────────────────────────────────────────────────┘
↓ HTTP
┌─────────────────────────────────────────────────────────┐
│ 后端层(Fastify + Prisma) │
│ - 快速上传到 OSS(2-3秒) │
│ - 创建 Session(状态:processing) │
│ - 推送任务到 pg-boss(立即返回) │
│ - 提供状态查询 API │
└─────────────────────────────────────────────────────────┘
↓ pg-boss
┌─────────────────────────────────────────────────────────┐
│ Worker层(pg-boss + Platform层) │
│ - 从队列取任务(自动串行) │
│ - 执行耗时操作(解析、清洗、统计) │
│ - 保存结果(clean data 到 OSS) │
│ - 更新 Session(填充元数据) │
└─────────────────────────────────────────────────────────┘
🚀 完整实施步骤
步骤1:数据库Schema设计
// 业务表只存业务信息,不存任务管理信息
model YourBusinessTable {
id String @id
userId String
fileKey String // OSS原始文件
// ✅ 性能优化:保存处理结果
cleanDataKey String? // 清洗/处理后的数据(避免重复计算)
// 数据元信息(异步填充)
totalRows Int?
totalCols Int?
columns Json?
// 时间戳
createdAt DateTime
updatedAt DateTime
expiresAt DateTime
@@schema("your_schema")
}
关键点:
- ❌ 不要添加
status、progress、errorMessage等任务管理字段 - ✅ 这些字段由 pg-boss 的
job表管理
步骤2:Service层 - 快速上传+推送任务
// backend/src/modules/your-module/services/YourService.ts
import { storage } from '@/common/storage';
import { jobQueue } from '@/common/jobs';
import { prisma } from '@/config/database';
export class YourService {
/**
* 创建任务并推送到队列(Postgres-Only架构)
*
* ✅ Platform-Only 模式:
* - 立即上传文件到 OSS
* - 创建业务记录(元数据为null)
* - 推送任务到队列
* - 立即返回(不阻塞请求)
*/
async createTask(userId: string, fileName: string, fileBuffer: Buffer) {
// 1. 验证文件
if (fileBuffer.length > MAX_FILE_SIZE) {
throw new Error('文件太大');
}
// 2. ⚡ 立即上传到 OSS(2-3秒)
const fileKey = `path/${userId}/${Date.now()}-${fileName}`;
await storage.upload(fileKey, fileBuffer);
// 3. ⚡ 创建业务记录(元数据为null,等Worker填充)
const record = await prisma.yourTable.create({
data: {
userId,
fileName,
fileKey,
// ⚠️ 处理结果字段为 null
totalRows: null,
columns: null,
expiresAt: new Date(Date.now() + 10 * 60 * 1000),
},
});
// 4. ⚡ 推送任务到 pg-boss(Platform-Only)
const job = await jobQueue.push('your_module_process', {
recordId: record.id,
fileKey,
userId,
});
// 5. ⚡ 立即返回(总耗时<3秒)
return {
...record,
jobId: job.id, // ✅ 返回 jobId 供前端轮询
};
}
}
步骤3:Worker层 - 后台处理
// backend/src/modules/your-module/workers/yourWorker.ts
import { jobQueue } from '@/common/jobs';
import { storage } from '@/common/storage';
import { prisma } from '@/config/database';
import { logger } from '@/common/logging';
interface YourJob {
recordId: string;
fileKey: string;
userId: string;
}
/**
* 注册 Worker 到队列
*/
export function registerYourWorker() {
logger.info('[YourWorker] Registering worker');
// ⚠️ 队列名称:只能用字母、数字、下划线、连字符
jobQueue.process<YourJob>('your_module_process', async (job) => {
const { recordId, fileKey } = job.data;
logger.info('[YourWorker] Processing job', { jobId: job.id, recordId });
try {
// 1. 从 OSS 下载文件
const buffer = await storage.download(fileKey);
// 2. 执行耗时操作(解析、处理、计算)
const result = await yourLongTimeProcess(buffer);
const { processedData, totalRows, columns } = result;
// 3. ✅ 保存处理结果到 OSS(避免重复计算)
const cleanDataKey = `${fileKey}_clean.json`;
const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8');
await storage.upload(cleanDataKey, cleanDataBuffer);
logger.info('[YourWorker] Clean data saved', {
size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB`
});
// 4. 更新业务记录(填充元数据)
await prisma.yourTable.update({
where: { id: recordId },
data: {
cleanDataKey, // ✅ 保存 clean data 位置
totalRows,
columns,
updatedAt: new Date(),
},
});
logger.info('[YourWorker] ✅ Job completed', { jobId: job.id });
return { success: true, recordId, totalRows };
} catch (error: any) {
logger.error('[YourWorker] ❌ Job failed', {
jobId: job.id,
error: error.message
});
throw error; // 让 pg-boss 处理重试
}
});
logger.info('[YourWorker] ✅ Worker registered: your_module_process');
}
步骤4:Controller层 - 状态查询API
// backend/src/modules/your-module/controllers/YourController.ts
import { jobQueue } from '@/common/jobs';
export class YourController {
/**
* 获取任务状态(Platform-Only模式)
*
* GET /api/v1/your-module/tasks/:id/status
* Query: jobId (可选)
*/
async getTaskStatus(request, reply) {
const { id: recordId } = request.params;
const { jobId } = request.query;
// 1. 查询业务记录
const record = await prisma.yourTable.findUnique({
where: { id: recordId }
});
if (!record) {
return reply.code(404).send({ success: false, error: '记录不存在' });
}
// 2. 判断状态
// - 如果 totalRows 不为 null,说明处理完成
// - 否则查询 job 状态
if (record.totalRows !== null) {
return reply.send({
success: true,
data: {
recordId,
status: 'ready', // ✅ 处理完成
progress: 100,
record,
},
});
}
// 3. 处理中,查询 pg-boss
if (!jobId) {
return reply.send({
success: true,
data: {
recordId,
status: 'processing',
progress: 50,
},
});
}
// 4. 从 pg-boss 查询 job 状态
const job = await jobQueue.getJob(jobId);
const status = job?.status === 'completed' ? 'ready' :
job?.status === 'failed' ? 'error' : 'processing';
const progress = status === 'ready' ? 100 :
status === 'error' ? 0 : 70;
return reply.send({
success: true,
data: {
recordId,
jobId,
status,
progress,
record,
},
});
}
}
步骤5:前端 - React Query 轮询
// frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts
import { useQuery } from '@tanstack/react-query';
import * as api from '../api';
/**
* 任务状态轮询 Hook
*
* 特点:
* - 自动串行轮询(React Query 内置防并发)
* - 自动清理(组件卸载时停止)
* - 条件停止(完成/失败时自动停止)
*/
export function useTaskStatus({
recordId,
jobId,
enabled = true,
}) {
const { data, isLoading, error } = useQuery({
queryKey: ['taskStatus', recordId, jobId],
queryFn: () => api.getTaskStatus(recordId, jobId),
enabled: enabled && !!recordId && !!jobId,
refetchInterval: (query) => {
const status = query.state.data?.data?.status;
// ✅ 完成或失败时停止轮询
if (status === 'ready' || status === 'error') {
return false;
}
// ✅ 处理中时每2秒轮询(自动串行)
return 2000;
},
staleTime: 0, // 始终视为过时,确保轮询
retry: 1,
});
const statusInfo = data?.data;
const status = statusInfo?.status || 'processing';
const progress = statusInfo?.progress || 0;
return {
status,
progress,
isReady: status === 'ready',
isError: status === 'error',
isLoading,
error,
};
}
步骤6:前端组件 - 使用Hook
// frontend-v2/src/modules/your-module/pages/YourPage.tsx
import { useTaskStatus } from '../hooks/useTaskStatus';
const YourPage = () => {
const [pollingInfo, setPollingInfo] = useState<{
recordId: string;
jobId: string;
} | null>(null);
// ✅ 使用 React Query Hook 自动轮询
const { status, progress, isReady } = useTaskStatus({
recordId: pollingInfo?.recordId || null,
jobId: pollingInfo?.jobId || null,
enabled: !!pollingInfo,
});
// ✅ 监听状态变化
useEffect(() => {
if (isReady && pollingInfo) {
console.log('✅ 处理完成,加载数据');
// 停止轮询
setPollingInfo(null);
// 加载数据
loadData(pollingInfo.recordId);
}
}, [isReady, pollingInfo]);
// 上传文件
const handleUpload = async (file) => {
const result = await api.uploadFile(file);
const { recordId, jobId } = result.data;
// ✅ 启动轮询(设置状态,React Query自动开始)
setPollingInfo({ recordId, jobId });
};
return (
<div>
{/* 进度条 */}
{pollingInfo && (
<div className="progress-bar">
<div style={{ width: `${progress}%` }} />
<span>{progress}%</span>
</div>
)}
{/* 上传按钮 */}
<button onClick={() => handleUpload(file)}>上传</button>
</div>
);
};
🎯 关键技术点
1. 队列名称规范
错误:
❌ 'asl:screening:batch' // 包含冒号,pg-boss不支持
❌ 'dc.toolc.parse' // 包含点号,不推荐
正确:
✅ 'asl_screening_batch' // 下划线
✅ 'dc_toolc_parse_excel' // 下划线
2. Worker注册时机
// backend/src/index.ts
await jobQueue.start(); // ← 必须先启动队列
registerYourWorker(); // ← 再注册 Worker
registerOtherWorker();
// ✅ 等待3秒,确保异步注册完成
await new Promise(resolve => setTimeout(resolve, 3000));
logger.info('✅ All workers registered');
3. clean data 缓存机制
目的:避免重复计算(性能提升99%)
// Worker 保存 clean data
const cleanDataKey = `${fileKey}_clean.json`;
await storage.upload(cleanDataKey, JSON.stringify(processedData));
await prisma.update({
where: { id },
data: {
cleanDataKey, // ← 记录位置
totalRows,
columns,
}
});
// Service 读取数据(优先 clean data)
async getFullData(recordId) {
const record = await prisma.findUnique({ where: { id: recordId } });
// ✅ 优先读取 clean data(<1秒)
if (record.cleanDataKey) {
const buffer = await storage.download(record.cleanDataKey);
return JSON.parse(buffer.toString('utf-8'));
}
// ⚠️ Fallback:重新解析(兼容旧数据)
const buffer = await storage.download(record.fileKey);
return parseFile(buffer);
}
// ⚠️ 重要:操作后要同步更新 clean data
async saveProcessedData(recordId, newData) {
const record = await getRecord(recordId);
// 覆盖原文件
await storage.upload(record.fileKey, toExcel(newData));
// ✅ 同时更新 clean data
if (record.cleanDataKey) {
await storage.upload(record.cleanDataKey, JSON.stringify(newData));
}
// 更新元数据
await prisma.update({ where: { id: recordId }, data: { ... } });
}
4. React Query 轮询(推荐)
优点:
- ✅ 自动串行(防并发风暴)
- ✅ 自动去重(同一queryKey只有一个请求)
- ✅ 自动清理(组件卸载时停止)
- ✅ 条件停止(动态控制)
不要使用 setInterval:
❌ const pollInterval = setInterval(() => {
api.getStatus(); // 可能并发
}, 2000);
📊 性能对比
DC Tool C 实际数据(3339行×151列文件)
| 指标 | 同步处理 | 异步处理 | 改善 |
|---|---|---|---|
| 上传耗时 | 47秒(阻塞) | 3秒(立即返回) | ✅ -94% |
| HTTP超时 | ❌ 经常超时 | ✅ 不会超时 | ✅ 100% |
| getPreviewData | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% |
| getFullData | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% |
| QuickAction操作 | 43秒 + Python | 0.5秒 + Python | ✅ -95% |
| 并发请求 | 15+个 | 1个(串行) | ✅ -93% |
🛡️ 安全规范(强制)
更新日期:2026-01-23
来源:内部逆向审查报告 + 生产问题修复
基于项目实际遇到的问题,以下规范 必须遵守:
规范1:禁止 Worker 递归死循环 ❌
错误示例:
// ❌ 禁止:在 catch 块中重试业务逻辑
jobQueue.process('your_task', async (job) => {
try {
await doSomething(job.data);
} catch (error) {
// ❌ 错误!这会导致死循环或重复执行
await doSomething(job.data);
throw error;
}
});
正确做法:
// ✅ 正确:直接 throw,让 pg-boss 接管重试(默认3次)
jobQueue.process('your_task', async (job) => {
try {
await doSomething(job.data);
} catch (error) {
logger.error('Job failed', { jobId: job.id, error: error.message });
throw error; // ✅ pg-boss 会自动重试
}
});
规范2:禁止 Payload 膨胀 ❌
错误示例:
// ❌ 禁止:在 job.data 中存大文件
await jobQueue.push('parse_excel', {
fileContent: base64EncodedFile, // ❌ 会导致 job 表膨胀
imageData: base64Image, // ❌ 拖慢数据库
});
正确做法:
// ✅ 正确:只存 fileKey 或数据库 ID
await jobQueue.push('parse_excel', {
sessionId: session.id, // ✅ 只存 ID
fileKey: 'path/to/file', // ✅ 只存 OSS 路径
userId: 'user-123',
});
规范3:Worker 必须幂等 ⭐
问题:任务失败重试时,可能导致重复写入、重复扣费、重复发邮件。
错误示例:
// ❌ 非幂等:重试会创建多条记录
await prisma.screeningResult.create({
data: { projectId, literatureId, result }
});
正确做法:
// ✅ 方案1:使用 upsert + 唯一约束
await prisma.screeningResult.upsert({
where: {
projectId_literatureId: { projectId, literatureId }
},
create: { projectId, literatureId, result },
update: { result }, // 重试时覆盖
});
// ✅ 方案2:先检查状态再执行
const existing = await prisma.task.findUnique({ where: { id: taskId } });
if (existing?.status === 'completed') {
logger.info('Task already completed, skipping');
return;
}
await doWork();
幂等性检查清单:
| 操作类型 | 幂等方案 |
|---|---|
| 创建记录 | 使用 upsert + 唯一约束 |
| 更新记录 | update 天然幂等 |
| 发送邮件 | 先检查 notificationSent 标志 |
| 扣费 | 使用幂等 key(如订单号) |
| 调用外部API | 检查是否已成功 |
规范4:合理设置任务过期时间
默认配置(当前):
expireInSeconds: 6 * 60 * 60 // 6小时
推荐配置(按业务类型):
| 任务类型 | 过期时间 | 理由 |
|---|---|---|
asl_screening_batch |
30分钟 | 单条文献筛选 |
dc_extraction_batch |
1小时 | 批量数据提取 |
dc_toolc_parse_excel |
30分钟 | Excel解析 |
rvw_review_task |
20分钟 | 审稿任务 |
asl_research_execute |
30分钟 | DeepSearch检索 |
规范5:优雅关闭 ✅
已在 index.ts 实现:
// 进程退出时优雅关闭
process.on('SIGTERM', async () => {
await fastify.close(); // 停止接收新请求
await jobQueue.stop(); // 等待当前任务完成
await prisma.$disconnect(); // 关闭数据库
process.exit(0);
});
规范6:全局错误监听 ✅
已在 PgBossQueue.ts 实现:
// 防止未捕获错误导致进程崩溃
this.boss.on('error', (err) => {
if (err.code === '23505' && err.constraint === 'queue_pkey') {
// 队列冲突,静默处理
console.log('Queue concurrency conflict auto-resolved');
} else {
console.error('PgBoss critical error:', err);
}
});
⚠️ 常见问题
Q1: Worker 注册了但不工作?
检查:
- 队列名称是否包含冒号(
:)?改为下划线(_) - 环境变量
QUEUE_TYPE=pgboss是否设置? - Worker 注册是否在
jobQueue.start()之后?
Q2: 轮询风暴(多个并发请求)?
解决:使用 React Query,不要用 setInterval
Q3: 导出数据不对(是原始数据)?
原因:saveProcessedData 没有更新 clean data
解决:同时更新 fileKey 和 cleanDataKey
📚 参考实现
| 模块 | Worker | 前端Hook | 文档 |
|---|---|---|---|
| DC Tool C | parseExcelWorker.ts |
useSessionStatus.ts |
本指南基础 |
| ASL 智能文献 | screeningWorker.ts |
useScreeningTask.ts |
ASL模块状态 |
| DC Tool B | extractionWorker.ts |
- | DC模块状态 |
✅ 检查清单
基础配置检查
- 业务表只存业务信息(不包含 status 等字段)
- 队列名称使用下划线(不含冒号)
- 环境变量
QUEUE_TYPE=pgboss已设置 - Worker 在
jobQueue.start()之后注册 - 前端使用 React Query 轮询
- Service 优先读取 clean data
- saveProcessedData 同步更新 clean data
🛡️ 安全规范检查(强制)
- 幂等性:使用
upsert或先检查状态,确保重试安全 - Payload:
job.data只存 ID 和 fileKey,不存大文件 - 错误处理:catch 块中直接
throw error,不要重试业务逻辑 - 唯一约束:数据库表有合适的唯一索引防止重复写入
- 过期时间:根据业务类型设置合理的
expireInSeconds
📊 故障排查 SQL
-- 查看队列健康状况
SELECT
name AS queue_name,
state,
COUNT(*) AS count
FROM platform_schema.job
WHERE created_on > NOW() - INTERVAL '24 hours'
GROUP BY name, state
ORDER BY name, state;
-- 查看失败任务
SELECT id, name, data, output, created_on
FROM platform_schema.job
WHERE state = 'failed'
ORDER BY created_on DESC
LIMIT 10;
-- 查看卡住的任务(processing 超过1小时)
SELECT id, name, data, created_on, started_on
FROM platform_schema.job
WHERE state = 'active'
AND started_on < NOW() - INTERVAL '1 hour';
维护者: 平台架构团队
最后更新: 2026-01-23
文档状态: ✅ 已完成(v1.1 安全规范更新)