Summary: - Migrate PostgreSQL to pgvector/pgvector:pg15 Docker image - Successfully install and verify pgvector 0.8.1 extension - Create comprehensive Dify-to-pgvector migration plan - Update PKB module documentation with pgvector status - Update system documentation with pgvector integration Key changes: - docker-compose.yml: Switch to pgvector/pgvector:pg15 image - Add EkbDocument and EkbChunk data model design - Design R-C-R-G hybrid retrieval architecture - Add clinical data JSONB fields (pico, studyDesign, regimen, safety, criteria, endpoints) - Create detailed 10-day implementation roadmap Documentation updates: - PKB module status: pgvector RAG infrastructure ready - System status: pgvector 0.8.1 integrated - New: Dify replacement development plan (01-Dify替换为pgvector开发计划.md) - New: Enterprise medical knowledge base solution V2 Tested: PostgreSQL with pgvector verified, frontend and backend functionality confirmed
16 KiB
16 KiB
Postgres-Only 异步任务处理指南
文档版本: v1.0
创建日期: 2025-12-22
维护者: 平台架构团队
适用场景: 长时间任务(>30秒)、大文件处理、后台Worker
参考实现: DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取
📋 概述
本文档基于 DC Tool C Excel解析功能 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。
核心价值
- ✅ 避免HTTP超时:上传接口3秒返回,解析在后台完成(30-60秒)
- ✅ 用户体验优秀:实时进度反馈,不需要傻等
- ✅ 符合云原生规范:Platform-Only模式,pg-boss队列
- ✅ 性能优化:clean data缓存,避免重复计算(-99%耗时)
🏗️ 架构设计
三层架构
┌─────────────────────────────────────────────────────────┐
│ 前端层(React + React Query) │
│ - 上传文件(立即返回 sessionId + jobId) │
│ - 轮询状态(useQuery + refetchInterval,自动串行) │
│ - 监听 status='ready',加载数据 │
└─────────────────────────────────────────────────────────┘
↓ HTTP
┌─────────────────────────────────────────────────────────┐
│ 后端层(Fastify + Prisma) │
│ - 快速上传到 OSS(2-3秒) │
│ - 创建 Session(状态:processing) │
│ - 推送任务到 pg-boss(立即返回) │
│ - 提供状态查询 API │
└─────────────────────────────────────────────────────────┘
↓ pg-boss
┌─────────────────────────────────────────────────────────┐
│ Worker层(pg-boss + Platform层) │
│ - 从队列取任务(自动串行) │
│ - 执行耗时操作(解析、清洗、统计) │
│ - 保存结果(clean data 到 OSS) │
│ - 更新 Session(填充元数据) │
└─────────────────────────────────────────────────────────┘
🚀 完整实施步骤
步骤1:数据库Schema设计
// 业务表只存业务信息,不存任务管理信息
model YourBusinessTable {
id String @id
userId String
fileKey String // OSS原始文件
// ✅ 性能优化:保存处理结果
cleanDataKey String? // 清洗/处理后的数据(避免重复计算)
// 数据元信息(异步填充)
totalRows Int?
totalCols Int?
columns Json?
// 时间戳
createdAt DateTime
updatedAt DateTime
expiresAt DateTime
@@schema("your_schema")
}
关键点:
- ❌ 不要添加
status、progress、errorMessage等任务管理字段 - ✅ 这些字段由 pg-boss 的
job表管理
步骤2:Service层 - 快速上传+推送任务
// backend/src/modules/your-module/services/YourService.ts
import { storage } from '@/common/storage';
import { jobQueue } from '@/common/jobs';
import { prisma } from '@/config/database';
export class YourService {
/**
* 创建任务并推送到队列(Postgres-Only架构)
*
* ✅ Platform-Only 模式:
* - 立即上传文件到 OSS
* - 创建业务记录(元数据为null)
* - 推送任务到队列
* - 立即返回(不阻塞请求)
*/
async createTask(userId: string, fileName: string, fileBuffer: Buffer) {
// 1. 验证文件
if (fileBuffer.length > MAX_FILE_SIZE) {
throw new Error('文件太大');
}
// 2. ⚡ 立即上传到 OSS(2-3秒)
const fileKey = `path/${userId}/${Date.now()}-${fileName}`;
await storage.upload(fileKey, fileBuffer);
// 3. ⚡ 创建业务记录(元数据为null,等Worker填充)
const record = await prisma.yourTable.create({
data: {
userId,
fileName,
fileKey,
// ⚠️ 处理结果字段为 null
totalRows: null,
columns: null,
expiresAt: new Date(Date.now() + 10 * 60 * 1000),
},
});
// 4. ⚡ 推送任务到 pg-boss(Platform-Only)
const job = await jobQueue.push('your_module_process', {
recordId: record.id,
fileKey,
userId,
});
// 5. ⚡ 立即返回(总耗时<3秒)
return {
...record,
jobId: job.id, // ✅ 返回 jobId 供前端轮询
};
}
}
步骤3:Worker层 - 后台处理
// backend/src/modules/your-module/workers/yourWorker.ts
import { jobQueue } from '@/common/jobs';
import { storage } from '@/common/storage';
import { prisma } from '@/config/database';
import { logger } from '@/common/logging';
interface YourJob {
recordId: string;
fileKey: string;
userId: string;
}
/**
* 注册 Worker 到队列
*/
export function registerYourWorker() {
logger.info('[YourWorker] Registering worker');
// ⚠️ 队列名称:只能用字母、数字、下划线、连字符
jobQueue.process<YourJob>('your_module_process', async (job) => {
const { recordId, fileKey } = job.data;
logger.info('[YourWorker] Processing job', { jobId: job.id, recordId });
try {
// 1. 从 OSS 下载文件
const buffer = await storage.download(fileKey);
// 2. 执行耗时操作(解析、处理、计算)
const result = await yourLongTimeProcess(buffer);
const { processedData, totalRows, columns } = result;
// 3. ✅ 保存处理结果到 OSS(避免重复计算)
const cleanDataKey = `${fileKey}_clean.json`;
const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8');
await storage.upload(cleanDataKey, cleanDataBuffer);
logger.info('[YourWorker] Clean data saved', {
size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB`
});
// 4. 更新业务记录(填充元数据)
await prisma.yourTable.update({
where: { id: recordId },
data: {
cleanDataKey, // ✅ 保存 clean data 位置
totalRows,
columns,
updatedAt: new Date(),
},
});
logger.info('[YourWorker] ✅ Job completed', { jobId: job.id });
return { success: true, recordId, totalRows };
} catch (error: any) {
logger.error('[YourWorker] ❌ Job failed', {
jobId: job.id,
error: error.message
});
throw error; // 让 pg-boss 处理重试
}
});
logger.info('[YourWorker] ✅ Worker registered: your_module_process');
}
步骤4:Controller层 - 状态查询API
// backend/src/modules/your-module/controllers/YourController.ts
import { jobQueue } from '@/common/jobs';
export class YourController {
/**
* 获取任务状态(Platform-Only模式)
*
* GET /api/v1/your-module/tasks/:id/status
* Query: jobId (可选)
*/
async getTaskStatus(request, reply) {
const { id: recordId } = request.params;
const { jobId } = request.query;
// 1. 查询业务记录
const record = await prisma.yourTable.findUnique({
where: { id: recordId }
});
if (!record) {
return reply.code(404).send({ success: false, error: '记录不存在' });
}
// 2. 判断状态
// - 如果 totalRows 不为 null,说明处理完成
// - 否则查询 job 状态
if (record.totalRows !== null) {
return reply.send({
success: true,
data: {
recordId,
status: 'ready', // ✅ 处理完成
progress: 100,
record,
},
});
}
// 3. 处理中,查询 pg-boss
if (!jobId) {
return reply.send({
success: true,
data: {
recordId,
status: 'processing',
progress: 50,
},
});
}
// 4. 从 pg-boss 查询 job 状态
const job = await jobQueue.getJob(jobId);
const status = job?.status === 'completed' ? 'ready' :
job?.status === 'failed' ? 'error' : 'processing';
const progress = status === 'ready' ? 100 :
status === 'error' ? 0 : 70;
return reply.send({
success: true,
data: {
recordId,
jobId,
status,
progress,
record,
},
});
}
}
步骤5:前端 - React Query 轮询
// frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts
import { useQuery } from '@tanstack/react-query';
import * as api from '../api';
/**
* 任务状态轮询 Hook
*
* 特点:
* - 自动串行轮询(React Query 内置防并发)
* - 自动清理(组件卸载时停止)
* - 条件停止(完成/失败时自动停止)
*/
export function useTaskStatus({
recordId,
jobId,
enabled = true,
}) {
const { data, isLoading, error } = useQuery({
queryKey: ['taskStatus', recordId, jobId],
queryFn: () => api.getTaskStatus(recordId, jobId),
enabled: enabled && !!recordId && !!jobId,
refetchInterval: (query) => {
const status = query.state.data?.data?.status;
// ✅ 完成或失败时停止轮询
if (status === 'ready' || status === 'error') {
return false;
}
// ✅ 处理中时每2秒轮询(自动串行)
return 2000;
},
staleTime: 0, // 始终视为过时,确保轮询
retry: 1,
});
const statusInfo = data?.data;
const status = statusInfo?.status || 'processing';
const progress = statusInfo?.progress || 0;
return {
status,
progress,
isReady: status === 'ready',
isError: status === 'error',
isLoading,
error,
};
}
步骤6:前端组件 - 使用Hook
// frontend-v2/src/modules/your-module/pages/YourPage.tsx
import { useTaskStatus } from '../hooks/useTaskStatus';
const YourPage = () => {
const [pollingInfo, setPollingInfo] = useState<{
recordId: string;
jobId: string;
} | null>(null);
// ✅ 使用 React Query Hook 自动轮询
const { status, progress, isReady } = useTaskStatus({
recordId: pollingInfo?.recordId || null,
jobId: pollingInfo?.jobId || null,
enabled: !!pollingInfo,
});
// ✅ 监听状态变化
useEffect(() => {
if (isReady && pollingInfo) {
console.log('✅ 处理完成,加载数据');
// 停止轮询
setPollingInfo(null);
// 加载数据
loadData(pollingInfo.recordId);
}
}, [isReady, pollingInfo]);
// 上传文件
const handleUpload = async (file) => {
const result = await api.uploadFile(file);
const { recordId, jobId } = result.data;
// ✅ 启动轮询(设置状态,React Query自动开始)
setPollingInfo({ recordId, jobId });
};
return (
<div>
{/* 进度条 */}
{pollingInfo && (
<div className="progress-bar">
<div style={{ width: `${progress}%` }} />
<span>{progress}%</span>
</div>
)}
{/* 上传按钮 */}
<button onClick={() => handleUpload(file)}>上传</button>
</div>
);
};
🎯 关键技术点
1. 队列名称规范
错误:
❌ 'asl:screening:batch' // 包含冒号,pg-boss不支持
❌ 'dc.toolc.parse' // 包含点号,不推荐
正确:
✅ 'asl_screening_batch' // 下划线
✅ 'dc_toolc_parse_excel' // 下划线
2. Worker注册时机
// backend/src/index.ts
await jobQueue.start(); // ← 必须先启动队列
registerYourWorker(); // ← 再注册 Worker
registerOtherWorker();
// ✅ 等待3秒,确保异步注册完成
await new Promise(resolve => setTimeout(resolve, 3000));
logger.info('✅ All workers registered');
3. clean data 缓存机制
目的:避免重复计算(性能提升99%)
// Worker 保存 clean data
const cleanDataKey = `${fileKey}_clean.json`;
await storage.upload(cleanDataKey, JSON.stringify(processedData));
await prisma.update({
where: { id },
data: {
cleanDataKey, // ← 记录位置
totalRows,
columns,
}
});
// Service 读取数据(优先 clean data)
async getFullData(recordId) {
const record = await prisma.findUnique({ where: { id: recordId } });
// ✅ 优先读取 clean data(<1秒)
if (record.cleanDataKey) {
const buffer = await storage.download(record.cleanDataKey);
return JSON.parse(buffer.toString('utf-8'));
}
// ⚠️ Fallback:重新解析(兼容旧数据)
const buffer = await storage.download(record.fileKey);
return parseFile(buffer);
}
// ⚠️ 重要:操作后要同步更新 clean data
async saveProcessedData(recordId, newData) {
const record = await getRecord(recordId);
// 覆盖原文件
await storage.upload(record.fileKey, toExcel(newData));
// ✅ 同时更新 clean data
if (record.cleanDataKey) {
await storage.upload(record.cleanDataKey, JSON.stringify(newData));
}
// 更新元数据
await prisma.update({ where: { id: recordId }, data: { ... } });
}
4. React Query 轮询(推荐)
优点:
- ✅ 自动串行(防并发风暴)
- ✅ 自动去重(同一queryKey只有一个请求)
- ✅ 自动清理(组件卸载时停止)
- ✅ 条件停止(动态控制)
不要使用 setInterval:
❌ const pollInterval = setInterval(() => {
api.getStatus(); // 可能并发
}, 2000);
📊 性能对比
DC Tool C 实际数据(3339行×151列文件)
| 指标 | 同步处理 | 异步处理 | 改善 |
|---|---|---|---|
| 上传耗时 | 47秒(阻塞) | 3秒(立即返回) | ✅ -94% |
| HTTP超时 | ❌ 经常超时 | ✅ 不会超时 | ✅ 100% |
| getPreviewData | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% |
| getFullData | 43秒(重复解析) | 0.5秒(缓存) | ✅ -99% |
| QuickAction操作 | 43秒 + Python | 0.5秒 + Python | ✅ -95% |
| 并发请求 | 15+个 | 1个(串行) | ✅ -93% |
⚠️ 常见问题
Q1: Worker 注册了但不工作?
检查:
- 队列名称是否包含冒号(
:)?改为下划线(_) - 环境变量
QUEUE_TYPE=pgboss是否设置? - Worker 注册是否在
jobQueue.start()之后?
Q2: 轮询风暴(多个并发请求)?
解决:使用 React Query,不要用 setInterval
Q3: 导出数据不对(是原始数据)?
原因:saveProcessedData 没有更新 clean data
解决:同时更新 fileKey 和 cleanDataKey
📚 参考实现
| 模块 | Worker | 前端Hook | 文档 |
|---|---|---|---|
| DC Tool C | parseExcelWorker.ts |
useSessionStatus.ts |
本指南基础 |
| ASL 智能文献 | screeningWorker.ts |
useScreeningTask.ts |
ASL模块状态 |
| DC Tool B | extractionWorker.ts |
- | DC模块状态 |
✅ 检查清单
在实施异步任务前,请确认:
- 业务表只存业务信息(不包含 status 等字段)
- 队列名称使用下划线(不含冒号)
- 环境变量
QUEUE_TYPE=pgboss已设置 - Worker 在
jobQueue.start()之后注册 - 前端使用 React Query 轮询
- Service 优先读取 clean data
- saveProcessedData 同步更新 clean data
维护者: 平台架构团队
最后更新: 2025-12-22
文档状态: ✅ 已完成