Fixed issues: - Remove simulateUpload function from DashboardPage Step 3 - Map department to description field when creating KB - Add upload modal in WorkspacePage knowledge assets tab - Fix DocumentUpload import path (../../stores to ../stores) Known issue: Dify API validation error during document upload (file uploaded but DB record failed, needs investigation) Testing: KB creation works, upload dialog opens correctly
618 lines
16 KiB
Markdown
618 lines
16 KiB
Markdown
# Postgres-Only 异步任务处理指南
|
||
|
||
> **文档版本:** v1.0
|
||
> **创建日期:** 2025-12-22
|
||
> **维护者:** 平台架构团队
|
||
> **适用场景:** 长时间任务(>30秒)、大文件处理、后台Worker
|
||
> **参考实现:** DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取
|
||
|
||
---
|
||
|
||
## 📋 概述
|
||
|
||
本文档基于 **DC Tool C Excel解析功能** 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。
|
||
|
||
### 核心价值
|
||
|
||
1. ✅ **避免HTTP超时**:上传接口3秒返回,解析在后台完成(30-60秒)
|
||
2. ✅ **用户体验优秀**:实时进度反馈,不需要傻等
|
||
3. ✅ **符合云原生规范**:Platform-Only模式,pg-boss队列
|
||
4. ✅ **性能优化**:clean data缓存,避免重复计算(-99%耗时)
|
||
|
||
---
|
||
|
||
## 🏗️ 架构设计
|
||
|
||
### 三层架构
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ 前端层(React + React Query) │
|
||
│ - 上传文件(立即返回 sessionId + jobId) │
|
||
│ - 轮询状态(useQuery + refetchInterval,自动串行) │
|
||
│ - 监听 status='ready',加载数据 │
|
||
└─────────────────────────────────────────────────────────┘
|
||
↓ HTTP
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ 后端层(Fastify + Prisma) │
|
||
│ - 快速上传到 OSS(2-3秒) │
|
||
│ - 创建 Session(状态:processing) │
|
||
│ - 推送任务到 pg-boss(立即返回) │
|
||
│ - 提供状态查询 API │
|
||
└─────────────────────────────────────────────────────────┘
|
||
↓ pg-boss
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ Worker层(pg-boss + Platform层) │
|
||
│ - 从队列取任务(自动串行) │
|
||
│ - 执行耗时操作(解析、清洗、统计) │
|
||
│ - 保存结果(clean data 到 OSS) │
|
||
│ - 更新 Session(填充元数据) │
|
||
└─────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
---
|
||
|
||
## 🚀 完整实施步骤
|
||
|
||
### 步骤1:数据库Schema设计
|
||
|
||
```prisma
|
||
// 业务表只存业务信息,不存任务管理信息
|
||
model YourBusinessTable {
|
||
id String @id
|
||
userId String
|
||
fileKey String // OSS原始文件
|
||
|
||
// ✅ 性能优化:保存处理结果
|
||
cleanDataKey String? // 清洗/处理后的数据(避免重复计算)
|
||
|
||
// 数据元信息(异步填充)
|
||
totalRows Int?
|
||
totalCols Int?
|
||
columns Json?
|
||
|
||
// 时间戳
|
||
createdAt DateTime
|
||
updatedAt DateTime
|
||
expiresAt DateTime
|
||
|
||
@@schema("your_schema")
|
||
}
|
||
```
|
||
|
||
**关键点**:
|
||
- ❌ 不要添加 `status`、`progress`、`errorMessage` 等任务管理字段
|
||
- ✅ 这些字段由 pg-boss 的 `job` 表管理
|
||
|
||
---
|
||
|
||
### 步骤2:Service层 - 快速上传+推送任务
|
||
|
||
```typescript
|
||
// backend/src/modules/your-module/services/YourService.ts
|
||
|
||
import { storage } from '@/common/storage';
|
||
import { jobQueue } from '@/common/jobs';
|
||
import { prisma } from '@/config/database';
|
||
|
||
export class YourService {
|
||
/**
|
||
* 创建任务并推送到队列(Postgres-Only架构)
|
||
*
|
||
* ✅ Platform-Only 模式:
|
||
* - 立即上传文件到 OSS
|
||
* - 创建业务记录(元数据为null)
|
||
* - 推送任务到队列
|
||
* - 立即返回(不阻塞请求)
|
||
*/
|
||
async createTask(userId: string, fileName: string, fileBuffer: Buffer) {
|
||
// 1. 验证文件
|
||
if (fileBuffer.length > MAX_FILE_SIZE) {
|
||
throw new Error('文件太大');
|
||
}
|
||
|
||
// 2. ⚡ 立即上传到 OSS(2-3秒)
|
||
const fileKey = `path/${userId}/${Date.now()}-${fileName}`;
|
||
await storage.upload(fileKey, fileBuffer);
|
||
|
||
// 3. ⚡ 创建业务记录(元数据为null,等Worker填充)
|
||
const record = await prisma.yourTable.create({
|
||
data: {
|
||
userId,
|
||
fileName,
|
||
fileKey,
|
||
// ⚠️ 处理结果字段为 null
|
||
totalRows: null,
|
||
columns: null,
|
||
expiresAt: new Date(Date.now() + 10 * 60 * 1000),
|
||
},
|
||
});
|
||
|
||
// 4. ⚡ 推送任务到 pg-boss(Platform-Only)
|
||
const job = await jobQueue.push('your_module_process', {
|
||
recordId: record.id,
|
||
fileKey,
|
||
userId,
|
||
});
|
||
|
||
// 5. ⚡ 立即返回(总耗时<3秒)
|
||
return {
|
||
...record,
|
||
jobId: job.id, // ✅ 返回 jobId 供前端轮询
|
||
};
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 步骤3:Worker层 - 后台处理
|
||
|
||
```typescript
|
||
// backend/src/modules/your-module/workers/yourWorker.ts
|
||
|
||
import { jobQueue } from '@/common/jobs';
|
||
import { storage } from '@/common/storage';
|
||
import { prisma } from '@/config/database';
|
||
import { logger } from '@/common/logging';
|
||
|
||
interface YourJob {
|
||
recordId: string;
|
||
fileKey: string;
|
||
userId: string;
|
||
}
|
||
|
||
/**
|
||
* 注册 Worker 到队列
|
||
*/
|
||
export function registerYourWorker() {
|
||
logger.info('[YourWorker] Registering worker');
|
||
|
||
// ⚠️ 队列名称:只能用字母、数字、下划线、连字符
|
||
jobQueue.process<YourJob>('your_module_process', async (job) => {
|
||
const { recordId, fileKey } = job.data;
|
||
|
||
logger.info('[YourWorker] Processing job', { jobId: job.id, recordId });
|
||
|
||
try {
|
||
// 1. 从 OSS 下载文件
|
||
const buffer = await storage.download(fileKey);
|
||
|
||
// 2. 执行耗时操作(解析、处理、计算)
|
||
const result = await yourLongTimeProcess(buffer);
|
||
const { processedData, totalRows, columns } = result;
|
||
|
||
// 3. ✅ 保存处理结果到 OSS(避免重复计算)
|
||
const cleanDataKey = `${fileKey}_clean.json`;
|
||
const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8');
|
||
await storage.upload(cleanDataKey, cleanDataBuffer);
|
||
|
||
logger.info('[YourWorker] Clean data saved', {
|
||
size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB`
|
||
});
|
||
|
||
// 4. 更新业务记录(填充元数据)
|
||
await prisma.yourTable.update({
|
||
where: { id: recordId },
|
||
data: {
|
||
cleanDataKey, // ✅ 保存 clean data 位置
|
||
totalRows,
|
||
columns,
|
||
updatedAt: new Date(),
|
||
},
|
||
});
|
||
|
||
logger.info('[YourWorker] ✅ Job completed', { jobId: job.id });
|
||
|
||
return { success: true, recordId, totalRows };
|
||
} catch (error: any) {
|
||
logger.error('[YourWorker] ❌ Job failed', {
|
||
jobId: job.id,
|
||
error: error.message
|
||
});
|
||
throw error; // 让 pg-boss 处理重试
|
||
}
|
||
});
|
||
|
||
logger.info('[YourWorker] ✅ Worker registered: your_module_process');
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 步骤4:Controller层 - 状态查询API
|
||
|
||
```typescript
|
||
// backend/src/modules/your-module/controllers/YourController.ts
|
||
|
||
import { jobQueue } from '@/common/jobs';
|
||
|
||
export class YourController {
|
||
/**
|
||
* 获取任务状态(Platform-Only模式)
|
||
*
|
||
* GET /api/v1/your-module/tasks/:id/status
|
||
* Query: jobId (可选)
|
||
*/
|
||
async getTaskStatus(request, reply) {
|
||
const { id: recordId } = request.params;
|
||
const { jobId } = request.query;
|
||
|
||
// 1. 查询业务记录
|
||
const record = await prisma.yourTable.findUnique({
|
||
where: { id: recordId }
|
||
});
|
||
|
||
if (!record) {
|
||
return reply.code(404).send({ success: false, error: '记录不存在' });
|
||
}
|
||
|
||
// 2. 判断状态
|
||
// - 如果 totalRows 不为 null,说明处理完成
|
||
// - 否则查询 job 状态
|
||
if (record.totalRows !== null) {
|
||
return reply.send({
|
||
success: true,
|
||
data: {
|
||
recordId,
|
||
status: 'ready', // ✅ 处理完成
|
||
progress: 100,
|
||
record,
|
||
},
|
||
});
|
||
}
|
||
|
||
// 3. 处理中,查询 pg-boss
|
||
if (!jobId) {
|
||
return reply.send({
|
||
success: true,
|
||
data: {
|
||
recordId,
|
||
status: 'processing',
|
||
progress: 50,
|
||
},
|
||
});
|
||
}
|
||
|
||
// 4. 从 pg-boss 查询 job 状态
|
||
const job = await jobQueue.getJob(jobId);
|
||
|
||
const status = job?.status === 'completed' ? 'ready' :
|
||
job?.status === 'failed' ? 'error' : 'processing';
|
||
|
||
const progress = status === 'ready' ? 100 :
|
||
status === 'error' ? 0 : 70;
|
||
|
||
return reply.send({
|
||
success: true,
|
||
data: {
|
||
recordId,
|
||
jobId,
|
||
status,
|
||
progress,
|
||
record,
|
||
},
|
||
});
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 步骤5:前端 - React Query 轮询
|
||
|
||
```typescript
|
||
// frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts
|
||
|
||
import { useQuery } from '@tanstack/react-query';
|
||
import * as api from '../api';
|
||
|
||
/**
|
||
* 任务状态轮询 Hook
|
||
*
|
||
* 特点:
|
||
* - 自动串行轮询(React Query 内置防并发)
|
||
* - 自动清理(组件卸载时停止)
|
||
* - 条件停止(完成/失败时自动停止)
|
||
*/
|
||
export function useTaskStatus({
|
||
recordId,
|
||
jobId,
|
||
enabled = true,
|
||
}) {
|
||
const { data, isLoading, error } = useQuery({
|
||
queryKey: ['taskStatus', recordId, jobId],
|
||
queryFn: () => api.getTaskStatus(recordId, jobId),
|
||
enabled: enabled && !!recordId && !!jobId,
|
||
refetchInterval: (query) => {
|
||
const status = query.state.data?.data?.status;
|
||
|
||
// ✅ 完成或失败时停止轮询
|
||
if (status === 'ready' || status === 'error') {
|
||
return false;
|
||
}
|
||
|
||
// ✅ 处理中时每2秒轮询(自动串行)
|
||
return 2000;
|
||
},
|
||
staleTime: 0, // 始终视为过时,确保轮询
|
||
retry: 1,
|
||
});
|
||
|
||
const statusInfo = data?.data;
|
||
const status = statusInfo?.status || 'processing';
|
||
const progress = statusInfo?.progress || 0;
|
||
|
||
return {
|
||
status,
|
||
progress,
|
||
isReady: status === 'ready',
|
||
isError: status === 'error',
|
||
isLoading,
|
||
error,
|
||
};
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 步骤6:前端组件 - 使用Hook
|
||
|
||
```typescript
|
||
// frontend-v2/src/modules/your-module/pages/YourPage.tsx
|
||
|
||
import { useTaskStatus } from '../hooks/useTaskStatus';
|
||
|
||
const YourPage = () => {
|
||
const [pollingInfo, setPollingInfo] = useState<{
|
||
recordId: string;
|
||
jobId: string;
|
||
} | null>(null);
|
||
|
||
// ✅ 使用 React Query Hook 自动轮询
|
||
const { status, progress, isReady } = useTaskStatus({
|
||
recordId: pollingInfo?.recordId || null,
|
||
jobId: pollingInfo?.jobId || null,
|
||
enabled: !!pollingInfo,
|
||
});
|
||
|
||
// ✅ 监听状态变化
|
||
useEffect(() => {
|
||
if (isReady && pollingInfo) {
|
||
console.log('✅ 处理完成,加载数据');
|
||
|
||
// 停止轮询
|
||
setPollingInfo(null);
|
||
|
||
// 加载数据
|
||
loadData(pollingInfo.recordId);
|
||
}
|
||
}, [isReady, pollingInfo]);
|
||
|
||
// 上传文件
|
||
const handleUpload = async (file) => {
|
||
const result = await api.uploadFile(file);
|
||
const { recordId, jobId } = result.data;
|
||
|
||
// ✅ 启动轮询(设置状态,React Query自动开始)
|
||
setPollingInfo({ recordId, jobId });
|
||
};
|
||
|
||
return (
|
||
<div>
|
||
{/* 进度条 */}
|
||
{pollingInfo && (
|
||
<div className="progress-bar">
|
||
<div style={{ width: `${progress}%` }} />
|
||
<span>{progress}%</span>
|
||
</div>
|
||
)}
|
||
|
||
{/* 上传按钮 */}
|
||
<button onClick={() => handleUpload(file)}>上传</button>
|
||
</div>
|
||
);
|
||
};
|
||
```
|
||
|
||
---
|
||
|
||
## 🎯 关键技术点
|
||
|
||
### 1. 队列名称规范
|
||
|
||
**错误**:
|
||
```typescript
|
||
❌ 'asl:screening:batch' // 包含冒号,pg-boss不支持
|
||
❌ 'dc.toolc.parse' // 包含点号,不推荐
|
||
```
|
||
|
||
**正确**:
|
||
```typescript
|
||
✅ 'asl_screening_batch' // 下划线
|
||
✅ 'dc_toolc_parse_excel' // 下划线
|
||
```
|
||
|
||
---
|
||
|
||
### 2. Worker注册时机
|
||
|
||
```typescript
|
||
// backend/src/index.ts
|
||
|
||
await jobQueue.start(); // ← 必须先启动队列
|
||
|
||
registerYourWorker(); // ← 再注册 Worker
|
||
registerOtherWorker();
|
||
|
||
// ✅ 等待3秒,确保异步注册完成
|
||
await new Promise(resolve => setTimeout(resolve, 3000));
|
||
|
||
logger.info('✅ All workers registered');
|
||
```
|
||
|
||
---
|
||
|
||
### 3. clean data 缓存机制
|
||
|
||
**目的**:避免重复计算(性能提升99%)
|
||
|
||
```typescript
|
||
// Worker 保存 clean data
|
||
const cleanDataKey = `${fileKey}_clean.json`;
|
||
await storage.upload(cleanDataKey, JSON.stringify(processedData));
|
||
|
||
await prisma.update({
|
||
where: { id },
|
||
data: {
|
||
cleanDataKey, // ← 记录位置
|
||
totalRows,
|
||
columns,
|
||
}
|
||
});
|
||
|
||
// Service 读取数据(优先 clean data)
|
||
async getFullData(recordId) {
|
||
const record = await prisma.findUnique({ where: { id: recordId } });
|
||
|
||
// ✅ 优先读取 clean data(<1秒)
|
||
if (record.cleanDataKey) {
|
||
const buffer = await storage.download(record.cleanDataKey);
|
||
return JSON.parse(buffer.toString('utf-8'));
|
||
}
|
||
|
||
// ⚠️ Fallback:重新解析(兼容旧数据)
|
||
const buffer = await storage.download(record.fileKey);
|
||
return parseFile(buffer);
|
||
}
|
||
|
||
// ⚠️ 重要:操作后要同步更新 clean data
|
||
async saveProcessedData(recordId, newData) {
|
||
const record = await getRecord(recordId);
|
||
|
||
// 覆盖原文件
|
||
await storage.upload(record.fileKey, toExcel(newData));
|
||
|
||
// ✅ 同时更新 clean data
|
||
if (record.cleanDataKey) {
|
||
await storage.upload(record.cleanDataKey, JSON.stringify(newData));
|
||
}
|
||
|
||
// 更新元数据
|
||
await prisma.update({ where: { id: recordId }, data: { ... } });
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
### 4. React Query 轮询(推荐)
|
||
|
||
**优点**:
|
||
- ✅ 自动串行(防并发风暴)
|
||
- ✅ 自动去重(同一queryKey只有一个请求)
|
||
- ✅ 自动清理(组件卸载时停止)
|
||
- ✅ 条件停止(动态控制)
|
||
|
||
**不要使用 setInterval**:
|
||
```typescript
|
||
❌ const pollInterval = setInterval(() => {
|
||
api.getStatus(); // 可能并发
|
||
}, 2000);
|
||
```
|
||
|
||
---
|
||
|
||
## 📊 性能对比
|
||
|
||
### DC Tool C 实际数据(3339行×151列文件)
|
||
|
||
| 指标 | 同步处理 | 异步处理 | 改善 |
|
||
|------|---------|---------|------|
|
||
| **上传耗时** | 47秒(阻塞) | 3秒(立即返回) | ✅ -94% |
|
||
| **HTTP超时** | ❌ 经常超时 | ✅ 不会超时 | ✅ 100% |
|
||
| **getPreviewData** | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% |
|
||
| **getFullData** | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% |
|
||
| **QuickAction操作** | 43秒 + Python | 0.5秒 + Python | ✅ -95% |
|
||
| **并发请求** | 15+个 | 1个(串行) | ✅ -93% |
|
||
|
||
---
|
||
|
||
## ⚠️ 常见问题
|
||
|
||
### Q1: Worker 注册了但不工作?
|
||
|
||
**检查**:
|
||
- 队列名称是否包含冒号(`:`)?改为下划线(`_`)
|
||
- 环境变量 `QUEUE_TYPE=pgboss` 是否设置?
|
||
- Worker 注册是否在 `jobQueue.start()` 之后?
|
||
|
||
### Q2: 轮询风暴(多个并发请求)?
|
||
|
||
**解决**:使用 React Query,不要用 setInterval
|
||
|
||
### Q3: 导出数据不对(是原始数据)?
|
||
|
||
**原因**:`saveProcessedData` 没有更新 clean data
|
||
**解决**:同时更新 fileKey 和 cleanDataKey
|
||
|
||
---
|
||
|
||
## 📚 参考实现
|
||
|
||
| 模块 | Worker | 前端Hook | 文档 |
|
||
|------|--------|---------|------|
|
||
| **DC Tool C** | `parseExcelWorker.ts` | `useSessionStatus.ts` | 本指南基础 |
|
||
| **ASL 智能文献** | `screeningWorker.ts` | `useScreeningTask.ts` | [ASL模块状态](../03-业务模块/ASL-AI智能文献/00-模块当前状态与开发指南.md) |
|
||
| **DC Tool B** | `extractionWorker.ts` | - | [DC模块状态](../03-业务模块/DC-数据清洗整理/00-模块当前状态与开发指南.md) |
|
||
|
||
---
|
||
|
||
## ✅ 检查清单
|
||
|
||
在实施异步任务前,请确认:
|
||
|
||
- [ ] 业务表只存业务信息(不包含 status 等字段)
|
||
- [ ] 队列名称使用下划线(不含冒号)
|
||
- [ ] 环境变量 `QUEUE_TYPE=pgboss` 已设置
|
||
- [ ] Worker 在 `jobQueue.start()` 之后注册
|
||
- [ ] 前端使用 React Query 轮询
|
||
- [ ] Service 优先读取 clean data
|
||
- [ ] saveProcessedData 同步更新 clean data
|
||
|
||
---
|
||
|
||
**维护者**: 平台架构团队
|
||
**最后更新**: 2025-12-22
|
||
**文档状态**: ✅ 已完成
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|