Files
AIclinicalresearch/docs/02-通用能力层/Postgres-Only异步任务处理指南.md
HaHafeng 61cdc97eeb feat(platform): Fix pg-boss queue conflict and add safety standards
Summary:
- Fix pg-boss queue conflict (duplicate key violation on queue_pkey)
- Add global error listener to prevent process crash
- Reduce connection pool from 10 to 4
- Add graceful shutdown handling (SIGTERM/SIGINT)
- Fix researchWorker recursive call bug in catch block
- Make screeningWorker idempotent using upsert

Security Standards (v1.1):
- Prohibit recursive retry in Worker catch blocks
- Prohibit payload bloat (only store fileKey/ID in job.data)
- Require Worker idempotency (upsert + unique constraint)
- Recommend task-specific expireInSeconds settings
- Document graceful shutdown pattern

New Features:
- PKB signed URL endpoint for document preview/download
- pg_bigm installation guide for Docker
- Dockerfile.postgres-with-extensions for pgvector + pg_bigm

Documentation:
- Update Postgres-Only async task processing guide (v1.1)
- Add troubleshooting SQL queries
- Update safety checklist

Tested: Local verification passed
2026-01-23 22:07:26 +08:00

825 lines
21 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Postgres-Only 异步任务处理指南
> **文档版本:** v1.12026-01-23 安全规范更新)
> **创建日期:** 2025-12-22
> **最后更新:** 2026-01-23
> **维护者:** 平台架构团队
> **适用场景:** 长时间任务(>30秒、大文件处理、后台Worker
> **参考实现:** DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取
>
> ⚠️ **重要更新 v1.1**:新增[🛡️ 安全规范](#-安全规范强制)章节,包含幂等性、错误处理等强制规范
---
## 📋 概述
本文档基于 **DC Tool C Excel解析功能** 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。
### 核心价值
1.**避免HTTP超时**上传接口3秒返回解析在后台完成30-60秒
2.**用户体验优秀**:实时进度反馈,不需要傻等
3.**符合云原生规范**Platform-Only模式pg-boss队列
4.**性能优化**clean data缓存避免重复计算-99%耗时)
---
## 🏗️ 架构设计
### 三层架构
```
┌─────────────────────────────────────────────────────────┐
│ 前端层React + React Query
│ - 上传文件(立即返回 sessionId + jobId
│ - 轮询状态useQuery + refetchInterval自动串行
│ - 监听 status='ready',加载数据 │
└─────────────────────────────────────────────────────────┘
↓ HTTP
┌─────────────────────────────────────────────────────────┐
│ 后端层Fastify + Prisma
│ - 快速上传到 OSS2-3秒
│ - 创建 Session状态processing
│ - 推送任务到 pg-boss立即返回
│ - 提供状态查询 API │
└─────────────────────────────────────────────────────────┘
↓ pg-boss
┌─────────────────────────────────────────────────────────┐
│ Worker层pg-boss + Platform层
│ - 从队列取任务(自动串行) │
│ - 执行耗时操作(解析、清洗、统计) │
│ - 保存结果clean data 到 OSS
│ - 更新 Session填充元数据
└─────────────────────────────────────────────────────────┘
```
---
## 🚀 完整实施步骤
### 步骤1数据库Schema设计
```prisma
// 业务表只存业务信息,不存任务管理信息
model YourBusinessTable {
id String @id
userId String
fileKey String // OSS原始文件
// ✅ 性能优化:保存处理结果
cleanDataKey String? // 清洗/处理后的数据(避免重复计算)
// 数据元信息(异步填充)
totalRows Int?
totalCols Int?
columns Json?
// 时间戳
createdAt DateTime
updatedAt DateTime
expiresAt DateTime
@@schema("your_schema")
}
```
**关键点**
- ❌ 不要添加 `status``progress``errorMessage` 等任务管理字段
- ✅ 这些字段由 pg-boss 的 `job` 表管理
---
### 步骤2Service层 - 快速上传+推送任务
```typescript
// backend/src/modules/your-module/services/YourService.ts
import { storage } from '@/common/storage';
import { jobQueue } from '@/common/jobs';
import { prisma } from '@/config/database';
export class YourService {
/**
* 创建任务并推送到队列Postgres-Only架构
*
* ✅ Platform-Only 模式:
* - 立即上传文件到 OSS
* - 创建业务记录元数据为null
* - 推送任务到队列
* - 立即返回(不阻塞请求)
*/
async createTask(userId: string, fileName: string, fileBuffer: Buffer) {
// 1. 验证文件
if (fileBuffer.length > MAX_FILE_SIZE) {
throw new Error('文件太大');
}
// 2. ⚡ 立即上传到 OSS2-3秒
const fileKey = `path/${userId}/${Date.now()}-${fileName}`;
await storage.upload(fileKey, fileBuffer);
// 3. ⚡ 创建业务记录元数据为null等Worker填充
const record = await prisma.yourTable.create({
data: {
userId,
fileName,
fileKey,
// ⚠️ 处理结果字段为 null
totalRows: null,
columns: null,
expiresAt: new Date(Date.now() + 10 * 60 * 1000),
},
});
// 4. ⚡ 推送任务到 pg-bossPlatform-Only
const job = await jobQueue.push('your_module_process', {
recordId: record.id,
fileKey,
userId,
});
// 5. ⚡ 立即返回(总耗时<3秒
return {
...record,
jobId: job.id, // ✅ 返回 jobId 供前端轮询
};
}
}
```
---
### 步骤3Worker层 - 后台处理
```typescript
// backend/src/modules/your-module/workers/yourWorker.ts
import { jobQueue } from '@/common/jobs';
import { storage } from '@/common/storage';
import { prisma } from '@/config/database';
import { logger } from '@/common/logging';
interface YourJob {
recordId: string;
fileKey: string;
userId: string;
}
/**
* 注册 Worker 到队列
*/
export function registerYourWorker() {
logger.info('[YourWorker] Registering worker');
// ⚠️ 队列名称:只能用字母、数字、下划线、连字符
jobQueue.process<YourJob>('your_module_process', async (job) => {
const { recordId, fileKey } = job.data;
logger.info('[YourWorker] Processing job', { jobId: job.id, recordId });
try {
// 1. 从 OSS 下载文件
const buffer = await storage.download(fileKey);
// 2. 执行耗时操作(解析、处理、计算)
const result = await yourLongTimeProcess(buffer);
const { processedData, totalRows, columns } = result;
// 3. ✅ 保存处理结果到 OSS避免重复计算
const cleanDataKey = `${fileKey}_clean.json`;
const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8');
await storage.upload(cleanDataKey, cleanDataBuffer);
logger.info('[YourWorker] Clean data saved', {
size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB`
});
// 4. 更新业务记录(填充元数据)
await prisma.yourTable.update({
where: { id: recordId },
data: {
cleanDataKey, // ✅ 保存 clean data 位置
totalRows,
columns,
updatedAt: new Date(),
},
});
logger.info('[YourWorker] ✅ Job completed', { jobId: job.id });
return { success: true, recordId, totalRows };
} catch (error: any) {
logger.error('[YourWorker] ❌ Job failed', {
jobId: job.id,
error: error.message
});
throw error; // 让 pg-boss 处理重试
}
});
logger.info('[YourWorker] ✅ Worker registered: your_module_process');
}
```
---
### 步骤4Controller层 - 状态查询API
```typescript
// backend/src/modules/your-module/controllers/YourController.ts
import { jobQueue } from '@/common/jobs';
export class YourController {
/**
* 获取任务状态Platform-Only模式
*
* GET /api/v1/your-module/tasks/:id/status
* Query: jobId (可选)
*/
async getTaskStatus(request, reply) {
const { id: recordId } = request.params;
const { jobId } = request.query;
// 1. 查询业务记录
const record = await prisma.yourTable.findUnique({
where: { id: recordId }
});
if (!record) {
return reply.code(404).send({ success: false, error: '记录不存在' });
}
// 2. 判断状态
// - 如果 totalRows 不为 null说明处理完成
// - 否则查询 job 状态
if (record.totalRows !== null) {
return reply.send({
success: true,
data: {
recordId,
status: 'ready', // ✅ 处理完成
progress: 100,
record,
},
});
}
// 3. 处理中,查询 pg-boss
if (!jobId) {
return reply.send({
success: true,
data: {
recordId,
status: 'processing',
progress: 50,
},
});
}
// 4. 从 pg-boss 查询 job 状态
const job = await jobQueue.getJob(jobId);
const status = job?.status === 'completed' ? 'ready' :
job?.status === 'failed' ? 'error' : 'processing';
const progress = status === 'ready' ? 100 :
status === 'error' ? 0 : 70;
return reply.send({
success: true,
data: {
recordId,
jobId,
status,
progress,
record,
},
});
}
}
```
---
### 步骤5前端 - React Query 轮询
```typescript
// frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts
import { useQuery } from '@tanstack/react-query';
import * as api from '../api';
/**
* 任务状态轮询 Hook
*
* 特点:
* - 自动串行轮询React Query 内置防并发)
* - 自动清理(组件卸载时停止)
* - 条件停止(完成/失败时自动停止)
*/
export function useTaskStatus({
recordId,
jobId,
enabled = true,
}) {
const { data, isLoading, error } = useQuery({
queryKey: ['taskStatus', recordId, jobId],
queryFn: () => api.getTaskStatus(recordId, jobId),
enabled: enabled && !!recordId && !!jobId,
refetchInterval: (query) => {
const status = query.state.data?.data?.status;
// ✅ 完成或失败时停止轮询
if (status === 'ready' || status === 'error') {
return false;
}
// ✅ 处理中时每2秒轮询自动串行
return 2000;
},
staleTime: 0, // 始终视为过时,确保轮询
retry: 1,
});
const statusInfo = data?.data;
const status = statusInfo?.status || 'processing';
const progress = statusInfo?.progress || 0;
return {
status,
progress,
isReady: status === 'ready',
isError: status === 'error',
isLoading,
error,
};
}
```
---
### 步骤6前端组件 - 使用Hook
```typescript
// frontend-v2/src/modules/your-module/pages/YourPage.tsx
import { useTaskStatus } from '../hooks/useTaskStatus';
const YourPage = () => {
const [pollingInfo, setPollingInfo] = useState<{
recordId: string;
jobId: string;
} | null>(null);
// ✅ 使用 React Query Hook 自动轮询
const { status, progress, isReady } = useTaskStatus({
recordId: pollingInfo?.recordId || null,
jobId: pollingInfo?.jobId || null,
enabled: !!pollingInfo,
});
// ✅ 监听状态变化
useEffect(() => {
if (isReady && pollingInfo) {
console.log('✅ 处理完成,加载数据');
// 停止轮询
setPollingInfo(null);
// 加载数据
loadData(pollingInfo.recordId);
}
}, [isReady, pollingInfo]);
// 上传文件
const handleUpload = async (file) => {
const result = await api.uploadFile(file);
const { recordId, jobId } = result.data;
// ✅ 启动轮询设置状态React Query自动开始
setPollingInfo({ recordId, jobId });
};
return (
<div>
{/* 进度条 */}
{pollingInfo && (
<div className="progress-bar">
<div style={{ width: `${progress}%` }} />
<span>{progress}%</span>
</div>
)}
{/* 上传按钮 */}
<button onClick={() => handleUpload(file)}></button>
</div>
);
};
```
---
## 🎯 关键技术点
### 1. 队列名称规范
**错误**
```typescript
'asl:screening:batch' // 包含冒号pg-boss不支持
'dc.toolc.parse' // 包含点号,不推荐
```
**正确**
```typescript
'asl_screening_batch' // 下划线
'dc_toolc_parse_excel' // 下划线
```
---
### 2. Worker注册时机
```typescript
// backend/src/index.ts
await jobQueue.start(); // ← 必须先启动队列
registerYourWorker(); // ← 再注册 Worker
registerOtherWorker();
// ✅ 等待3秒确保异步注册完成
await new Promise(resolve => setTimeout(resolve, 3000));
logger.info('✅ All workers registered');
```
---
### 3. clean data 缓存机制
**目的**避免重复计算性能提升99%
```typescript
// Worker 保存 clean data
const cleanDataKey = `${fileKey}_clean.json`;
await storage.upload(cleanDataKey, JSON.stringify(processedData));
await prisma.update({
where: { id },
data: {
cleanDataKey, // ← 记录位置
totalRows,
columns,
}
});
// Service 读取数据(优先 clean data
async getFullData(recordId) {
const record = await prisma.findUnique({ where: { id: recordId } });
// ✅ 优先读取 clean data<1秒
if (record.cleanDataKey) {
const buffer = await storage.download(record.cleanDataKey);
return JSON.parse(buffer.toString('utf-8'));
}
// ⚠️ Fallback重新解析兼容旧数据
const buffer = await storage.download(record.fileKey);
return parseFile(buffer);
}
// ⚠️ 重要:操作后要同步更新 clean data
async saveProcessedData(recordId, newData) {
const record = await getRecord(recordId);
// 覆盖原文件
await storage.upload(record.fileKey, toExcel(newData));
// ✅ 同时更新 clean data
if (record.cleanDataKey) {
await storage.upload(record.cleanDataKey, JSON.stringify(newData));
}
// 更新元数据
await prisma.update({ where: { id: recordId }, data: { ... } });
}
```
---
### 4. React Query 轮询(推荐)
**优点**
- ✅ 自动串行(防并发风暴)
- ✅ 自动去重同一queryKey只有一个请求
- ✅ 自动清理(组件卸载时停止)
- ✅ 条件停止(动态控制)
**不要使用 setInterval**
```typescript
const pollInterval = setInterval(() => {
api.getStatus(); // 可能并发
}, 2000);
```
---
## 📊 性能对比
### DC Tool C 实际数据3339行×151列文件
| 指标 | 同步处理 | 异步处理 | 改善 |
|------|---------|---------|------|
| **上传耗时** | 47秒阻塞 | 3秒立即返回 | ✅ -94% |
| **HTTP超时** | ❌ 经常超时 | ✅ 不会超时 | ✅ 100% |
| **getPreviewData** | 43秒重复解析 | 0.5秒(缓存) | ✅ -99% |
| **getFullData** | 43秒重复解析 | 0.5秒(缓存) | ✅ -99% |
| **QuickAction操作** | 43秒 + Python | 0.5秒 + Python | ✅ -95% |
| **并发请求** | 15+个 | 1个串行 | ✅ -93% |
---
## 🛡️ 安全规范(强制)
> **更新日期**2026-01-23
> **来源**:内部逆向审查报告 + 生产问题修复
基于项目实际遇到的问题,以下规范 **必须遵守**
### 规范1禁止 Worker 递归死循环 ❌
**错误示例**
```typescript
// ❌ 禁止:在 catch 块中重试业务逻辑
jobQueue.process('your_task', async (job) => {
try {
await doSomething(job.data);
} catch (error) {
// ❌ 错误!这会导致死循环或重复执行
await doSomething(job.data);
throw error;
}
});
```
**正确做法**
```typescript
// ✅ 正确:直接 throw让 pg-boss 接管重试默认3次
jobQueue.process('your_task', async (job) => {
try {
await doSomething(job.data);
} catch (error) {
logger.error('Job failed', { jobId: job.id, error: error.message });
throw error; // ✅ pg-boss 会自动重试
}
});
```
---
### 规范2禁止 Payload 膨胀 ❌
**错误示例**
```typescript
// ❌ 禁止:在 job.data 中存大文件
await jobQueue.push('parse_excel', {
fileContent: base64EncodedFile, // ❌ 会导致 job 表膨胀
imageData: base64Image, // ❌ 拖慢数据库
});
```
**正确做法**
```typescript
// ✅ 正确:只存 fileKey 或数据库 ID
await jobQueue.push('parse_excel', {
sessionId: session.id, // ✅ 只存 ID
fileKey: 'path/to/file', // ✅ 只存 OSS 路径
userId: 'user-123',
});
```
---
### 规范3Worker 必须幂等 ⭐
**问题**:任务失败重试时,可能导致重复写入、重复扣费、重复发邮件。
**错误示例**
```typescript
// ❌ 非幂等:重试会创建多条记录
await prisma.screeningResult.create({
data: { projectId, literatureId, result }
});
```
**正确做法**
```typescript
// ✅ 方案1使用 upsert + 唯一约束
await prisma.screeningResult.upsert({
where: {
projectId_literatureId: { projectId, literatureId }
},
create: { projectId, literatureId, result },
update: { result }, // 重试时覆盖
});
// ✅ 方案2先检查状态再执行
const existing = await prisma.task.findUnique({ where: { id: taskId } });
if (existing?.status === 'completed') {
logger.info('Task already completed, skipping');
return;
}
await doWork();
```
**幂等性检查清单**
| 操作类型 | 幂等方案 |
|---------|---------|
| 创建记录 | 使用 `upsert` + 唯一约束 |
| 更新记录 | `update` 天然幂等 |
| 发送邮件 | 先检查 `notificationSent` 标志 |
| 扣费 | 使用幂等 key如订单号 |
| 调用外部API | 检查是否已成功 |
---
### 规范4合理设置任务过期时间
**默认配置**(当前):
```typescript
expireInSeconds: 6 * 60 * 60 // 6小时
```
**推荐配置**(按业务类型):
| 任务类型 | 过期时间 | 理由 |
|---------|---------|------|
| `asl_screening_batch` | 30分钟 | 单条文献筛选 |
| `dc_extraction_batch` | 1小时 | 批量数据提取 |
| `dc_toolc_parse_excel` | 30分钟 | Excel解析 |
| `rvw_review_task` | 20分钟 | 审稿任务 |
| `asl_research_execute` | 30分钟 | DeepSearch检索 |
---
### 规范5优雅关闭 ✅
**已在 `index.ts` 实现**
```typescript
// 进程退出时优雅关闭
process.on('SIGTERM', async () => {
await fastify.close(); // 停止接收新请求
await jobQueue.stop(); // 等待当前任务完成
await prisma.$disconnect(); // 关闭数据库
process.exit(0);
});
```
---
### 规范6全局错误监听 ✅
**已在 `PgBossQueue.ts` 实现**
```typescript
// 防止未捕获错误导致进程崩溃
this.boss.on('error', (err) => {
if (err.code === '23505' && err.constraint === 'queue_pkey') {
// 队列冲突,静默处理
console.log('Queue concurrency conflict auto-resolved');
} else {
console.error('PgBoss critical error:', err);
}
});
```
---
## ⚠️ 常见问题
### Q1: Worker 注册了但不工作?
**检查**
- 队列名称是否包含冒号(`:`)?改为下划线(`_`
- 环境变量 `QUEUE_TYPE=pgboss` 是否设置?
- Worker 注册是否在 `jobQueue.start()` 之后?
### Q2: 轮询风暴(多个并发请求)?
**解决**:使用 React Query不要用 setInterval
### Q3: 导出数据不对(是原始数据)?
**原因**`saveProcessedData` 没有更新 clean data
**解决**:同时更新 fileKey 和 cleanDataKey
---
## 📚 参考实现
| 模块 | Worker | 前端Hook | 文档 |
|------|--------|---------|------|
| **DC Tool C** | `parseExcelWorker.ts` | `useSessionStatus.ts` | 本指南基础 |
| **ASL 智能文献** | `screeningWorker.ts` | `useScreeningTask.ts` | [ASL模块状态](../03-业务模块/ASL-AI智能文献/00-模块当前状态与开发指南.md) |
| **DC Tool B** | `extractionWorker.ts` | - | [DC模块状态](../03-业务模块/DC-数据清洗整理/00-模块当前状态与开发指南.md) |
---
## ✅ 检查清单
### 基础配置检查
- [ ] 业务表只存业务信息(不包含 status 等字段)
- [ ] 队列名称使用下划线(不含冒号)
- [ ] 环境变量 `QUEUE_TYPE=pgboss` 已设置
- [ ] Worker 在 `jobQueue.start()` 之后注册
- [ ] 前端使用 React Query 轮询
- [ ] Service 优先读取 clean data
- [ ] saveProcessedData 同步更新 clean data
### 🛡️ 安全规范检查(强制)
- [ ] **幂等性**:使用 `upsert` 或先检查状态,确保重试安全
- [ ] **Payload**`job.data` 只存 ID 和 fileKey不存大文件
- [ ] **错误处理**catch 块中直接 `throw error`,不要重试业务逻辑
- [ ] **唯一约束**:数据库表有合适的唯一索引防止重复写入
- [ ] **过期时间**:根据业务类型设置合理的 `expireInSeconds`
---
## 📊 故障排查 SQL
```sql
-- 查看队列健康状况
SELECT
name AS queue_name,
state,
COUNT(*) AS count
FROM platform_schema.job
WHERE created_on > NOW() - INTERVAL '24 hours'
GROUP BY name, state
ORDER BY name, state;
-- 查看失败任务
SELECT id, name, data, output, created_on
FROM platform_schema.job
WHERE state = 'failed'
ORDER BY created_on DESC
LIMIT 10;
-- 查看卡住的任务processing 超过1小时
SELECT id, name, data, created_on, started_on
FROM platform_schema.job
WHERE state = 'active'
AND started_on < NOW() - INTERVAL '1 hour';
```
---
**维护者**: 平台架构团队
**最后更新**: 2026-01-23
**文档状态**: ✅ 已完成v1.1 安全规范更新)