# Postgres-Only 异步任务处理指南 > **文档版本:** v1.1(2026-01-23 安全规范更新) > **创建日期:** 2025-12-22 > **最后更新:** 2026-01-23 > **维护者:** 平台架构团队 > **适用场景:** 长时间任务(>30秒)、大文件处理、后台Worker > **参考实现:** DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取 > > ⚠️ **重要更新 v1.1**:新增[🛡️ 安全规范](#-安全规范强制)章节,包含幂等性、错误处理等强制规范 --- ## 📋 概述 本文档基于 **DC Tool C Excel解析功能** 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。 ### 核心价值 1. ✅ **避免HTTP超时**:上传接口3秒返回,解析在后台完成(30-60秒) 2. ✅ **用户体验优秀**:实时进度反馈,不需要傻等 3. ✅ **符合云原生规范**:Platform-Only模式,pg-boss队列 4. ✅ **性能优化**:clean data缓存,避免重复计算(-99%耗时) --- ## 🏗️ 架构设计 ### 三层架构 ``` ┌─────────────────────────────────────────────────────────┐ │ 前端层(React + React Query) │ │ - 上传文件(立即返回 sessionId + jobId) │ │ - 轮询状态(useQuery + refetchInterval,自动串行) │ │ - 监听 status='ready',加载数据 │ └─────────────────────────────────────────────────────────┘ ↓ HTTP ┌─────────────────────────────────────────────────────────┐ │ 后端层(Fastify + Prisma) │ │ - 快速上传到 OSS(2-3秒) │ │ - 创建 Session(状态:processing) │ │ - 推送任务到 pg-boss(立即返回) │ │ - 提供状态查询 API │ └─────────────────────────────────────────────────────────┘ ↓ pg-boss ┌─────────────────────────────────────────────────────────┐ │ Worker层(pg-boss + Platform层) │ │ - 从队列取任务(自动串行) │ │ - 执行耗时操作(解析、清洗、统计) │ │ - 保存结果(clean data 到 OSS) │ │ - 更新 Session(填充元数据) │ └─────────────────────────────────────────────────────────┘ ``` --- ## 🚀 完整实施步骤 ### 步骤1:数据库Schema设计 ```prisma // 业务表只存业务信息,不存任务管理信息 model YourBusinessTable { id String @id userId String fileKey String // OSS原始文件 // ✅ 性能优化:保存处理结果 cleanDataKey String? // 清洗/处理后的数据(避免重复计算) // 数据元信息(异步填充) totalRows Int? totalCols Int? columns Json? // 时间戳 createdAt DateTime updatedAt DateTime expiresAt DateTime @@schema("your_schema") } ``` **关键点**: - ❌ 不要添加 `status`、`progress`、`errorMessage` 等任务管理字段 - ✅ 这些字段由 pg-boss 的 `job` 表管理 --- ### 步骤2:Service层 - 快速上传+推送任务 ```typescript // backend/src/modules/your-module/services/YourService.ts import { storage } from '@/common/storage'; import { jobQueue } from '@/common/jobs'; import { prisma } from '@/config/database'; export class YourService { /** * 创建任务并推送到队列(Postgres-Only架构) * * ✅ Platform-Only 模式: * - 立即上传文件到 OSS * - 创建业务记录(元数据为null) * - 推送任务到队列 * - 立即返回(不阻塞请求) */ async createTask(userId: string, fileName: string, fileBuffer: Buffer) { // 1. 验证文件 if (fileBuffer.length > MAX_FILE_SIZE) { throw new Error('文件太大'); } // 2. ⚡ 立即上传到 OSS(2-3秒) const fileKey = `path/${userId}/${Date.now()}-${fileName}`; await storage.upload(fileKey, fileBuffer); // 3. ⚡ 创建业务记录(元数据为null,等Worker填充) const record = await prisma.yourTable.create({ data: { userId, fileName, fileKey, // ⚠️ 处理结果字段为 null totalRows: null, columns: null, expiresAt: new Date(Date.now() + 10 * 60 * 1000), }, }); // 4. ⚡ 推送任务到 pg-boss(Platform-Only) const job = await jobQueue.push('your_module_process', { recordId: record.id, fileKey, userId, }); // 5. ⚡ 立即返回(总耗时<3秒) return { ...record, jobId: job.id, // ✅ 返回 jobId 供前端轮询 }; } } ``` --- ### 步骤3:Worker层 - 后台处理 ```typescript // backend/src/modules/your-module/workers/yourWorker.ts import { jobQueue } from '@/common/jobs'; import { storage } from '@/common/storage'; import { prisma } from '@/config/database'; import { logger } from '@/common/logging'; interface YourJob { recordId: string; fileKey: string; userId: string; } /** * 注册 Worker 到队列 */ export function registerYourWorker() { logger.info('[YourWorker] Registering worker'); // ⚠️ 队列名称:只能用字母、数字、下划线、连字符 jobQueue.process('your_module_process', async (job) => { const { recordId, fileKey } = job.data; logger.info('[YourWorker] Processing job', { jobId: job.id, recordId }); try { // 1. 从 OSS 下载文件 const buffer = await storage.download(fileKey); // 2. 执行耗时操作(解析、处理、计算) const result = await yourLongTimeProcess(buffer); const { processedData, totalRows, columns } = result; // 3. ✅ 保存处理结果到 OSS(避免重复计算) const cleanDataKey = `${fileKey}_clean.json`; const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8'); await storage.upload(cleanDataKey, cleanDataBuffer); logger.info('[YourWorker] Clean data saved', { size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB` }); // 4. 更新业务记录(填充元数据) await prisma.yourTable.update({ where: { id: recordId }, data: { cleanDataKey, // ✅ 保存 clean data 位置 totalRows, columns, updatedAt: new Date(), }, }); logger.info('[YourWorker] ✅ Job completed', { jobId: job.id }); return { success: true, recordId, totalRows }; } catch (error: any) { logger.error('[YourWorker] ❌ Job failed', { jobId: job.id, error: error.message }); throw error; // 让 pg-boss 处理重试 } }); logger.info('[YourWorker] ✅ Worker registered: your_module_process'); } ``` --- ### 步骤4:Controller层 - 状态查询API ```typescript // backend/src/modules/your-module/controllers/YourController.ts import { jobQueue } from '@/common/jobs'; export class YourController { /** * 获取任务状态(Platform-Only模式) * * GET /api/v1/your-module/tasks/:id/status * Query: jobId (可选) */ async getTaskStatus(request, reply) { const { id: recordId } = request.params; const { jobId } = request.query; // 1. 查询业务记录 const record = await prisma.yourTable.findUnique({ where: { id: recordId } }); if (!record) { return reply.code(404).send({ success: false, error: '记录不存在' }); } // 2. 判断状态 // - 如果 totalRows 不为 null,说明处理完成 // - 否则查询 job 状态 if (record.totalRows !== null) { return reply.send({ success: true, data: { recordId, status: 'ready', // ✅ 处理完成 progress: 100, record, }, }); } // 3. 处理中,查询 pg-boss if (!jobId) { return reply.send({ success: true, data: { recordId, status: 'processing', progress: 50, }, }); } // 4. 从 pg-boss 查询 job 状态 const job = await jobQueue.getJob(jobId); const status = job?.status === 'completed' ? 'ready' : job?.status === 'failed' ? 'error' : 'processing'; const progress = status === 'ready' ? 100 : status === 'error' ? 0 : 70; return reply.send({ success: true, data: { recordId, jobId, status, progress, record, }, }); } } ``` --- ### 步骤5:前端 - React Query 轮询 ```typescript // frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts import { useQuery } from '@tanstack/react-query'; import * as api from '../api'; /** * 任务状态轮询 Hook * * 特点: * - 自动串行轮询(React Query 内置防并发) * - 自动清理(组件卸载时停止) * - 条件停止(完成/失败时自动停止) */ export function useTaskStatus({ recordId, jobId, enabled = true, }) { const { data, isLoading, error } = useQuery({ queryKey: ['taskStatus', recordId, jobId], queryFn: () => api.getTaskStatus(recordId, jobId), enabled: enabled && !!recordId && !!jobId, refetchInterval: (query) => { const status = query.state.data?.data?.status; // ✅ 完成或失败时停止轮询 if (status === 'ready' || status === 'error') { return false; } // ✅ 处理中时每2秒轮询(自动串行) return 2000; }, staleTime: 0, // 始终视为过时,确保轮询 retry: 1, }); const statusInfo = data?.data; const status = statusInfo?.status || 'processing'; const progress = statusInfo?.progress || 0; return { status, progress, isReady: status === 'ready', isError: status === 'error', isLoading, error, }; } ``` --- ### 步骤6:前端组件 - 使用Hook ```typescript // frontend-v2/src/modules/your-module/pages/YourPage.tsx import { useTaskStatus } from '../hooks/useTaskStatus'; const YourPage = () => { const [pollingInfo, setPollingInfo] = useState<{ recordId: string; jobId: string; } | null>(null); // ✅ 使用 React Query Hook 自动轮询 const { status, progress, isReady } = useTaskStatus({ recordId: pollingInfo?.recordId || null, jobId: pollingInfo?.jobId || null, enabled: !!pollingInfo, }); // ✅ 监听状态变化 useEffect(() => { if (isReady && pollingInfo) { console.log('✅ 处理完成,加载数据'); // 停止轮询 setPollingInfo(null); // 加载数据 loadData(pollingInfo.recordId); } }, [isReady, pollingInfo]); // 上传文件 const handleUpload = async (file) => { const result = await api.uploadFile(file); const { recordId, jobId } = result.data; // ✅ 启动轮询(设置状态,React Query自动开始) setPollingInfo({ recordId, jobId }); }; return (
{/* 进度条 */} {pollingInfo && (
{progress}%
)} {/* 上传按钮 */}
); }; ``` --- ## 🎯 关键技术点 ### 1. 队列名称规范 **错误**: ```typescript ❌ 'asl:screening:batch' // 包含冒号,pg-boss不支持 ❌ 'dc.toolc.parse' // 包含点号,不推荐 ``` **正确**: ```typescript ✅ 'asl_screening_batch' // 下划线 ✅ 'dc_toolc_parse_excel' // 下划线 ``` --- ### 2. Worker注册时机 ```typescript // backend/src/index.ts await jobQueue.start(); // ← 必须先启动队列 registerYourWorker(); // ← 再注册 Worker registerOtherWorker(); // ✅ 等待3秒,确保异步注册完成 await new Promise(resolve => setTimeout(resolve, 3000)); logger.info('✅ All workers registered'); ``` --- ### 3. clean data 缓存机制 **目的**:避免重复计算(性能提升99%) ```typescript // Worker 保存 clean data const cleanDataKey = `${fileKey}_clean.json`; await storage.upload(cleanDataKey, JSON.stringify(processedData)); await prisma.update({ where: { id }, data: { cleanDataKey, // ← 记录位置 totalRows, columns, } }); // Service 读取数据(优先 clean data) async getFullData(recordId) { const record = await prisma.findUnique({ where: { id: recordId } }); // ✅ 优先读取 clean data(<1秒) if (record.cleanDataKey) { const buffer = await storage.download(record.cleanDataKey); return JSON.parse(buffer.toString('utf-8')); } // ⚠️ Fallback:重新解析(兼容旧数据) const buffer = await storage.download(record.fileKey); return parseFile(buffer); } // ⚠️ 重要:操作后要同步更新 clean data async saveProcessedData(recordId, newData) { const record = await getRecord(recordId); // 覆盖原文件 await storage.upload(record.fileKey, toExcel(newData)); // ✅ 同时更新 clean data if (record.cleanDataKey) { await storage.upload(record.cleanDataKey, JSON.stringify(newData)); } // 更新元数据 await prisma.update({ where: { id: recordId }, data: { ... } }); } ``` --- ### 4. React Query 轮询(推荐) **优点**: - ✅ 自动串行(防并发风暴) - ✅ 自动去重(同一queryKey只有一个请求) - ✅ 自动清理(组件卸载时停止) - ✅ 条件停止(动态控制) **不要使用 setInterval**: ```typescript ❌ const pollInterval = setInterval(() => { api.getStatus(); // 可能并发 }, 2000); ``` --- ## 📊 性能对比 ### DC Tool C 实际数据(3339行×151列文件) | 指标 | 同步处理 | 异步处理 | 改善 | |------|---------|---------|------| | **上传耗时** | 47秒(阻塞) | 3秒(立即返回) | ✅ -94% | | **HTTP超时** | ❌ 经常超时 | ✅ 不会超时 | ✅ 100% | | **getPreviewData** | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% | | **getFullData** | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% | | **QuickAction操作** | 43秒 + Python | 0.5秒 + Python | ✅ -95% | | **并发请求** | 15+个 | 1个(串行) | ✅ -93% | --- ## 🛡️ 安全规范(强制) > **更新日期**:2026-01-23 > **来源**:内部逆向审查报告 + 生产问题修复 基于项目实际遇到的问题,以下规范 **必须遵守**: ### 规范1:禁止 Worker 递归死循环 ❌ **错误示例**: ```typescript // ❌ 禁止:在 catch 块中重试业务逻辑 jobQueue.process('your_task', async (job) => { try { await doSomething(job.data); } catch (error) { // ❌ 错误!这会导致死循环或重复执行 await doSomething(job.data); throw error; } }); ``` **正确做法**: ```typescript // ✅ 正确:直接 throw,让 pg-boss 接管重试(默认3次) jobQueue.process('your_task', async (job) => { try { await doSomething(job.data); } catch (error) { logger.error('Job failed', { jobId: job.id, error: error.message }); throw error; // ✅ pg-boss 会自动重试 } }); ``` --- ### 规范2:禁止 Payload 膨胀 ❌ **错误示例**: ```typescript // ❌ 禁止:在 job.data 中存大文件 await jobQueue.push('parse_excel', { fileContent: base64EncodedFile, // ❌ 会导致 job 表膨胀 imageData: base64Image, // ❌ 拖慢数据库 }); ``` **正确做法**: ```typescript // ✅ 正确:只存 fileKey 或数据库 ID await jobQueue.push('parse_excel', { sessionId: session.id, // ✅ 只存 ID fileKey: 'path/to/file', // ✅ 只存 OSS 路径 userId: 'user-123', }); ``` --- ### 规范3:Worker 必须幂等 ⭐ **问题**:任务失败重试时,可能导致重复写入、重复扣费、重复发邮件。 **错误示例**: ```typescript // ❌ 非幂等:重试会创建多条记录 await prisma.screeningResult.create({ data: { projectId, literatureId, result } }); ``` **正确做法**: ```typescript // ✅ 方案1:使用 upsert + 唯一约束 await prisma.screeningResult.upsert({ where: { projectId_literatureId: { projectId, literatureId } }, create: { projectId, literatureId, result }, update: { result }, // 重试时覆盖 }); // ✅ 方案2:先检查状态再执行 const existing = await prisma.task.findUnique({ where: { id: taskId } }); if (existing?.status === 'completed') { logger.info('Task already completed, skipping'); return; } await doWork(); ``` **幂等性检查清单**: | 操作类型 | 幂等方案 | |---------|---------| | 创建记录 | 使用 `upsert` + 唯一约束 | | 更新记录 | `update` 天然幂等 | | 发送邮件 | 先检查 `notificationSent` 标志 | | 扣费 | 使用幂等 key(如订单号) | | 调用外部API | 检查是否已成功 | --- ### 规范4:合理设置任务过期时间 **默认配置**(当前): ```typescript expireInSeconds: 6 * 60 * 60 // 6小时 ``` **推荐配置**(按业务类型): | 任务类型 | 过期时间 | 理由 | |---------|---------|------| | `asl_screening_batch` | 30分钟 | 单条文献筛选 | | `dc_extraction_batch` | 1小时 | 批量数据提取 | | `dc_toolc_parse_excel` | 30分钟 | Excel解析 | | `rvw_review_task` | 20分钟 | 审稿任务 | | `asl_research_execute` | 30分钟 | DeepSearch检索 | --- ### 规范5:优雅关闭 ✅ **已在 `index.ts` 实现**: ```typescript // 进程退出时优雅关闭 process.on('SIGTERM', async () => { await fastify.close(); // 停止接收新请求 await jobQueue.stop(); // 等待当前任务完成 await prisma.$disconnect(); // 关闭数据库 process.exit(0); }); ``` --- ### 规范6:全局错误监听 ✅ **已在 `PgBossQueue.ts` 实现**: ```typescript // 防止未捕获错误导致进程崩溃 this.boss.on('error', (err) => { if (err.code === '23505' && err.constraint === 'queue_pkey') { // 队列冲突,静默处理 console.log('Queue concurrency conflict auto-resolved'); } else { console.error('PgBoss critical error:', err); } }); ``` --- ## ⚠️ 常见问题 ### Q1: Worker 注册了但不工作? **检查**: - 队列名称是否包含冒号(`:`)?改为下划线(`_`) - 环境变量 `QUEUE_TYPE=pgboss` 是否设置? - Worker 注册是否在 `jobQueue.start()` 之后? ### Q2: 轮询风暴(多个并发请求)? **解决**:使用 React Query,不要用 setInterval ### Q3: 导出数据不对(是原始数据)? **原因**:`saveProcessedData` 没有更新 clean data **解决**:同时更新 fileKey 和 cleanDataKey --- ## 📚 参考实现 | 模块 | Worker | 前端Hook | 文档 | |------|--------|---------|------| | **DC Tool C** | `parseExcelWorker.ts` | `useSessionStatus.ts` | 本指南基础 | | **ASL 智能文献** | `screeningWorker.ts` | `useScreeningTask.ts` | [ASL模块状态](../03-业务模块/ASL-AI智能文献/00-模块当前状态与开发指南.md) | | **DC Tool B** | `extractionWorker.ts` | - | [DC模块状态](../03-业务模块/DC-数据清洗整理/00-模块当前状态与开发指南.md) | | **🆕ASL 工具 3** | 散装派发 + Aggregator | `useTaskStatus.ts` | [散装派发与轮询收口指南](./散装派发与轮询收口任务模式指南.md)(Level 2 批量模式) | --- ## ✅ 检查清单 ### 基础配置检查 - [ ] 业务表只存业务信息(不包含 status 等字段) - [ ] 队列名称使用下划线(不含冒号) - [ ] 环境变量 `QUEUE_TYPE=pgboss` 已设置 - [ ] Worker 在 `jobQueue.start()` 之后注册 - [ ] 前端使用 React Query 轮询 - [ ] Service 优先读取 clean data - [ ] saveProcessedData 同步更新 clean data ### 🛡️ 安全规范检查(强制) - [ ] **幂等性**:使用 `upsert` 或先检查状态,确保重试安全 - [ ] **Payload**:`job.data` 只存 ID 和 fileKey,不存大文件 - [ ] **错误处理**:catch 块中直接 `throw error`,不要重试业务逻辑 - [ ] **唯一约束**:数据库表有合适的唯一索引防止重复写入 - [ ] **过期时间**:根据业务类型设置合理的 `expireInSeconds` --- ## 📊 故障排查 SQL ```sql -- 查看队列健康状况 SELECT name AS queue_name, state, COUNT(*) AS count FROM platform_schema.job WHERE created_on > NOW() - INTERVAL '24 hours' GROUP BY name, state ORDER BY name, state; -- 查看失败任务 SELECT id, name, data, output, created_on FROM platform_schema.job WHERE state = 'failed' ORDER BY created_on DESC LIMIT 10; -- 查看卡住的任务(processing 超过1小时) SELECT id, name, data, created_on, started_on FROM platform_schema.job WHERE state = 'active' AND started_on < NOW() - INTERVAL '1 hour'; ``` --- **维护者**: 平台架构团队 **最后更新**: 2026-01-23 **文档状态**: ✅ 已完成(v1.1 安全规范更新)