# Postgres-Only 异步任务处理指南 > **文档版本?* v1.0 > **创建日期?* 2025-12-22 > **维护者:** 平台架构团队 > **适用场景?* 长时间任务(>30秒)、大文件处理、后台Worker > **参考实现:** DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取 --- ## 📋 概述 本文档基?**DC Tool C Excel解析功能** 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式? ### 核心价? 1. ?**避免HTTP超时**:上传接?秒返回,解析在后台完成(30-60秒) 2. ?**用户体验优秀**:实时进度反馈,不需要傻? 3. ?**符合云原生规?*:Platform-Only模式,pg-boss队列 4. ?**性能优化**:clean data缓存,避免重复计算(-99%耗时? --- ## 🏗?架构设计 ### 三层架构 ``` ┌─────────────────────────────────────────────────────────? ? 前端层(React + React Query? ? ? - 上传文件(立即返?sessionId + jobId? ? ? - 轮询状态(useQuery + refetchInterval,自动串行) ? ? - 监听 status='ready',加载数? ? └─────────────────────────────────────────────────────────? ?HTTP ┌─────────────────────────────────────────────────────────? ? 后端层(Fastify + Prisma? ? ? - 快速上传到 OSS?-3秒) ? ? - 创建 Session(状态:processing? ? ? - 推送任务到 pg-boss(立即返回) ? ? - 提供状态查?API ? └─────────────────────────────────────────────────────────? ?pg-boss ┌─────────────────────────────────────────────────────────? ? Worker层(pg-boss + Platform层) ? ? - 从队列取任务(自动串行) ? ? - 执行耗时操作(解析、清洗、统计) ? ? - 保存结果(clean data ?OSS? ? ? - 更新 Session(填充元数据? ? └─────────────────────────────────────────────────────────? ``` --- ## 🚀 完整实施步骤 ### 步骤1:数据库Schema设计 ```prisma // 业务表只存业务信息,不存任务管理信息 model YourBusinessTable { id String @id userId String fileKey String // OSS原始文件 // ?性能优化:保存处理结? cleanDataKey String? // 清洗/处理后的数据(避免重复计算) // 数据元信息(异步填充? totalRows Int? totalCols Int? columns Json? // 时间? createdAt DateTime updatedAt DateTime expiresAt DateTime @@schema("your_schema") } ``` **关键?*? - ?不要添加 `status`、`progress`、`errorMessage` 等任务管理字? - ?这些字段?pg-boss ?`job` 表管? --- ### 步骤2:Service?- 快速上?推送任? ```typescript // backend/src/modules/your-module/services/YourService.ts import { storage } from '@/common/storage'; import { jobQueue } from '@/common/jobs'; import { prisma } from '@/config/database'; export class YourService { /** * 创建任务并推送到队列(Postgres-Only架构? * * ?Platform-Only 模式? * - 立即上传文件?OSS * - 创建业务记录(元数据为null? * - 推送任务到队列 * - 立即返回(不阻塞请求? */ async createTask(userId: string, fileName: string, fileBuffer: Buffer) { // 1. 验证文件 if (fileBuffer.length > MAX_FILE_SIZE) { throw new Error('文件太大'); } // 2. ?立即上传?OSS?-3秒) const fileKey = `path/${userId}/${Date.now()}-${fileName}`; await storage.upload(fileKey, fileBuffer); // 3. ?创建业务记录(元数据为null,等Worker填充? const record = await prisma.yourTable.create({ data: { userId, fileName, fileKey, // ⚠️ 处理结果字段?null totalRows: null, columns: null, expiresAt: new Date(Date.now() + 10 * 60 * 1000), }, }); // 4. ?推送任务到 pg-boss(Platform-Only? const job = await jobQueue.push('your_module_process', { recordId: record.id, fileKey, userId, }); // 5. ?立即返回(总耗时<3秒) return { ...record, jobId: job.id, // ?返回 jobId 供前端轮? }; } } ``` --- ### 步骤3:Worker?- 后台处理 ```typescript // backend/src/modules/your-module/workers/yourWorker.ts import { jobQueue } from '@/common/jobs'; import { storage } from '@/common/storage'; import { prisma } from '@/config/database'; import { logger } from '@/common/logging'; interface YourJob { recordId: string; fileKey: string; userId: string; } /** * 注册 Worker 到队? */ export function registerYourWorker() { logger.info('[YourWorker] Registering worker'); // ⚠️ 队列名称:只能用字母、数字、下划线、连字符 jobQueue.process('your_module_process', async (job) => { const { recordId, fileKey } = job.data; logger.info('[YourWorker] Processing job', { jobId: job.id, recordId }); try { // 1. ?OSS 下载文件 const buffer = await storage.download(fileKey); // 2. 执行耗时操作(解析、处理、计算) const result = await yourLongTimeProcess(buffer); const { processedData, totalRows, columns } = result; // 3. ?保存处理结果?OSS(避免重复计算) const cleanDataKey = `${fileKey}_clean.json`; const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8'); await storage.upload(cleanDataKey, cleanDataBuffer); logger.info('[YourWorker] Clean data saved', { size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB` }); // 4. 更新业务记录(填充元数据? await prisma.yourTable.update({ where: { id: recordId }, data: { cleanDataKey, // ?保存 clean data 位置 totalRows, columns, updatedAt: new Date(), }, }); logger.info('[YourWorker] ?Job completed', { jobId: job.id }); return { success: true, recordId, totalRows }; } catch (error: any) { logger.error('[YourWorker] ?Job failed', { jobId: job.id, error: error.message }); throw error; // ?pg-boss 处理重试 } }); logger.info('[YourWorker] ?Worker registered: your_module_process'); } ``` --- ### 步骤4:Controller?- 状态查询API ```typescript // backend/src/modules/your-module/controllers/YourController.ts import { jobQueue } from '@/common/jobs'; export class YourController { /** * 获取任务状态(Platform-Only模式? * * GET /api/v1/your-module/tasks/:id/status * Query: jobId (可? */ async getTaskStatus(request, reply) { const { id: recordId } = request.params; const { jobId } = request.query; // 1. 查询业务记录 const record = await prisma.yourTable.findUnique({ where: { id: recordId } }); if (!record) { return reply.code(404).send({ success: false, error: '记录不存? }); } // 2. 判断状? // - 如果 totalRows 不为 null,说明处理完? // - 否则查询 job 状? if (record.totalRows !== null) { return reply.send({ success: true, data: { recordId, status: 'ready', // ?处理完成 progress: 100, record, }, }); } // 3. 处理中,查询 pg-boss if (!jobId) { return reply.send({ success: true, data: { recordId, status: 'processing', progress: 50, }, }); } // 4. ?pg-boss 查询 job 状? const job = await jobQueue.getJob(jobId); const status = job?.status === 'completed' ? 'ready' : job?.status === 'failed' ? 'error' : 'processing'; const progress = status === 'ready' ? 100 : status === 'error' ? 0 : 70; return reply.send({ success: true, data: { recordId, jobId, status, progress, record, }, }); } } ``` --- ### 步骤5:前?- React Query 轮询 ```typescript // frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts import { useQuery } from '@tanstack/react-query'; import * as api from '../api'; /** * 任务状态轮?Hook * * 特点? * - 自动串行轮询(React Query 内置防并发) * - 自动清理(组件卸载时停止? * - 条件停止(完?失败时自动停止) */ export function useTaskStatus({ recordId, jobId, enabled = true, }) { const { data, isLoading, error } = useQuery({ queryKey: ['taskStatus', recordId, jobId], queryFn: () => api.getTaskStatus(recordId, jobId), enabled: enabled && !!recordId && !!jobId, refetchInterval: (query) => { const status = query.state.data?.data?.status; // ?完成或失败时停止轮询 if (status === 'ready' || status === 'error') { return false; } // ?处理中时?秒轮询(自动串行? return 2000; }, staleTime: 0, // 始终视为过时,确保轮? retry: 1, }); const statusInfo = data?.data; const status = statusInfo?.status || 'processing'; const progress = statusInfo?.progress || 0; return { status, progress, isReady: status === 'ready', isError: status === 'error', isLoading, error, }; } ``` --- ### 步骤6:前端组?- 使用Hook ```typescript // frontend-v2/src/modules/your-module/pages/YourPage.tsx import { useTaskStatus } from '../hooks/useTaskStatus'; const YourPage = () => { const [pollingInfo, setPollingInfo] = useState<{ recordId: string; jobId: string; } | null>(null); // ?使用 React Query Hook 自动轮询 const { status, progress, isReady } = useTaskStatus({ recordId: pollingInfo?.recordId || null, jobId: pollingInfo?.jobId || null, enabled: !!pollingInfo, }); // ?监听状态变? useEffect(() => { if (isReady && pollingInfo) { console.log('?处理完成,加载数?); // 停止轮询 setPollingInfo(null); // 加载数据 loadData(pollingInfo.recordId); } }, [isReady, pollingInfo]); // 上传文件 const handleUpload = async (file) => { const result = await api.uploadFile(file); const { recordId, jobId } = result.data; // ?启动轮询(设置状态,React Query自动开始) setPollingInfo({ recordId, jobId }); }; return (
{/* 进度?*/} {pollingInfo && (
{progress}%
)} {/* 上传按钮 */}
); }; ``` --- ## 🎯 关键技术点 ### 1. 队列名称规范 **错误**? ```typescript ?'asl:screening:batch' // 包含冒号,pg-boss不支? ?'dc.toolc.parse' // 包含点号,不推荐 ``` **正确**? ```typescript ?'asl_screening_batch' // 下划? ?'dc_toolc_parse_excel' // 下划? ``` --- ### 2. Worker注册时机 ```typescript // backend/src/index.ts await jobQueue.start(); // ?必须先启动队? registerYourWorker(); // ?再注?Worker registerOtherWorker(); // ?等待3秒,确保异步注册完成 await new Promise(resolve => setTimeout(resolve, 3000)); logger.info('?All workers registered'); ``` --- ### 3. clean data 缓存机制 **目的**:避免重复计算(性能提升99%? ```typescript // Worker 保存 clean data const cleanDataKey = `${fileKey}_clean.json`; await storage.upload(cleanDataKey, JSON.stringify(processedData)); await prisma.update({ where: { id }, data: { cleanDataKey, // ?记录位置 totalRows, columns, } }); // Service 读取数据(优?clean data? async getFullData(recordId) { const record = await prisma.findUnique({ where: { id: recordId } }); // ?优先读取 clean data?1秒) if (record.cleanDataKey) { const buffer = await storage.download(record.cleanDataKey); return JSON.parse(buffer.toString('utf-8')); } // ⚠️ Fallback:重新解析(兼容旧数据) const buffer = await storage.download(record.fileKey); return parseFile(buffer); } // ⚠️ 重要:操作后要同步更?clean data async saveProcessedData(recordId, newData) { const record = await getRecord(recordId); // 覆盖原文? await storage.upload(record.fileKey, toExcel(newData)); // ?同时更新 clean data if (record.cleanDataKey) { await storage.upload(record.cleanDataKey, JSON.stringify(newData)); } // 更新元数? await prisma.update({ where: { id: recordId }, data: { ... } }); } ``` --- ### 4. React Query 轮询(推荐) **优点**? - ?自动串行(防并发风暴? - ?自动去重(同一queryKey只有一个请求) - ?自动清理(组件卸载时停止? - ?条件停止(动态控制) **不要使用 setInterval**? ```typescript ?const pollInterval = setInterval(() => { api.getStatus(); // 可能并发 }, 2000); ``` --- ## 📊 性能对比 ### DC Tool C 实际数据?339行?51列文件) | 指标 | 同步处理 | 异步处理 | 改善 | |------|---------|---------|------| | **上传耗时** | 47秒(阻塞?| 3秒(立即返回?| ?-94% | | **HTTP超时** | ?经常超时 | ?不会超时 | ?100% | | **getPreviewData** | 43秒(重复解析?| 0.5秒(缓存?| ?-99% | | **getFullData** | 43秒(重复解析?| 0.5秒(缓存?| ?-99% | | **QuickAction操作** | 43?+ Python | 0.5?+ Python | ?-95% | | **并发请求** | 15+?| 1个(串行?| ?-93% | --- ## ⚠️ 常见问题 ### Q1: Worker 注册了但不工作? **检?*? - 队列名称是否包含冒号(`:`)?改为下划线(`_`? - 环境变量 `QUEUE_TYPE=pgboss` 是否设置? - Worker 注册是否?`jobQueue.start()` 之后? ### Q2: 轮询风暴(多个并发请求)? **解决**:使?React Query,不要用 setInterval ### Q3: 导出数据不对(是原始数据)? **原因**:`saveProcessedData` 没有更新 clean data **解决**:同时更?fileKey ?cleanDataKey --- ## 📚 参考实? | 模块 | Worker | 前端Hook | 文档 | |------|--------|---------|------| | **DC Tool C** | `parseExcelWorker.ts` | `useSessionStatus.ts` | 本指南基础 | | **ASL 智能文献** | `screeningWorker.ts` | `useScreeningTask.ts` | [ASL模块状态](../03-业务模块/ASL-AI智能文献/00-模块当前状态与开发指?md) | | **DC Tool B** | `extractionWorker.ts` | - | [DC模块状态](../03-业务模块/DC-数据清洗整理/00-模块当前状态与开发指?md) | --- ## ?检查清? 在实施异步任务前,请确认? - [ ] 业务表只存业务信息(不包?status 等字段) - [ ] 队列名称使用下划线(不含冒号? - [ ] 环境变量 `QUEUE_TYPE=pgboss` 已设? - [ ] Worker ?`jobQueue.start()` 之后注册 - [ ] 前端使用 React Query 轮询 - [ ] Service 优先读取 clean data - [ ] saveProcessedData 同步更新 clean data --- **维护?*: 平台架构团队 **最后更?*: 2025-12-22 **文档状?*: ?已完?