Files
AIclinicalresearch/docs/02-通用能力层/Postgres-Only异步任务处理指南.md
HaHafeng decff0bb1f docs(deploy): Complete full system deployment to Aliyun SAE
Summary:
- Successfully deployed complete system to Aliyun SAE (2025-12-25)
- All services running: Python microservice + Node.js backend + Frontend Nginx + CLB
- Public access available at http://8.140.53.236/

Major Achievements:
1. Python microservice deployed (v1.0, internal IP: 172.17.173.66:8000)
2. Node.js backend deployed (v1.3, internal IP: 172.17.173.73:3001)
   - Fixed 4 critical issues: bash path, config directory, pino-pretty, ES Module
3. Frontend Nginx deployed (v1.0, internal IP: 172.17.173.72:80)
4. CLB load balancer configured (public IP: 8.140.53.236)

New Documentation (9 docs):
- 11-Node.js backend SAE deployment config checklist (21 env vars)
- 12-Node.js backend SAE deployment operation manual
- 13-Node.js backend image fix record (config directory)
- 14-Node.js backend pino-pretty fix
- 15-Node.js backend deployment success summary
- 16-Frontend Nginx deployment success summary
- 17-Complete deployment practical manual 2025 edition (1800 lines)
- 18-Deployment documentation usage guide
- 19-Daily update quick operation manual (670 lines)

Key Fixes:
- Environment variable name correction: EXTRACTION_SERVICE_URL (not PYTHON_SERVICE_URL)
- Dockerfile fix: added COPY config ./config
- Logger configuration: conditional pino-pretty for dev only
- Health check fix: ES Module compatibility (require -> import)

Updated Files:
- System status document updated with full deployment info
- Deployment progress overview updated with latest IPs
- All 3 Docker services' Dockerfiles and configs refined

Verification:
- All health checks passed
- Tool C 7 features working correctly
- Literature screening module functional
- Response time < 1 second

BREAKING CHANGE: Node.js backend internal IP changed from 172.17.173.71 to 172.17.173.73

Closes #deployment-milestone
2025-12-25 21:24:37 +08:00

593 lines
16 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Postgres-Only 异步任务处理指南
> **文档版本:** v1.0
> **创建日期:** 2025-12-22
> **维护者:** 平台架构团队
> **适用场景:** 长时间任务(>30秒、大文件处理、后台Worker
> **参考实现:** DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取
---
## 📋 概述
本文档基于 **DC Tool C Excel解析功能** 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。
### 核心价值
1.**避免HTTP超时**上传接口3秒返回解析在后台完成30-60秒
2.**用户体验优秀**:实时进度反馈,不需要傻等
3.**符合云原生规范**Platform-Only模式pg-boss队列
4.**性能优化**clean data缓存避免重复计算-99%耗时)
---
## 🏗️ 架构设计
### 三层架构
```
┌─────────────────────────────────────────────────────────┐
│ 前端层React + React Query
│ - 上传文件(立即返回 sessionId + jobId
│ - 轮询状态useQuery + refetchInterval自动串行
│ - 监听 status='ready',加载数据 │
└─────────────────────────────────────────────────────────┘
↓ HTTP
┌─────────────────────────────────────────────────────────┐
│ 后端层Fastify + Prisma
│ - 快速上传到 OSS2-3秒
│ - 创建 Session状态processing
│ - 推送任务到 pg-boss立即返回
│ - 提供状态查询 API │
└─────────────────────────────────────────────────────────┘
↓ pg-boss
┌─────────────────────────────────────────────────────────┐
│ Worker层pg-boss + Platform层
│ - 从队列取任务(自动串行) │
│ - 执行耗时操作(解析、清洗、统计) │
│ - 保存结果clean data 到 OSS
│ - 更新 Session填充元数据
└─────────────────────────────────────────────────────────┘
```
---
## 🚀 完整实施步骤
### 步骤1数据库Schema设计
```prisma
// 业务表只存业务信息,不存任务管理信息
model YourBusinessTable {
id String @id
userId String
fileKey String // OSS原始文件
// ✅ 性能优化:保存处理结果
cleanDataKey String? // 清洗/处理后的数据(避免重复计算)
// 数据元信息(异步填充)
totalRows Int?
totalCols Int?
columns Json?
// 时间戳
createdAt DateTime
updatedAt DateTime
expiresAt DateTime
@@schema("your_schema")
}
```
**关键点**
- ❌ 不要添加 `status``progress``errorMessage` 等任务管理字段
- ✅ 这些字段由 pg-boss 的 `job` 表管理
---
### 步骤2Service层 - 快速上传+推送任务
```typescript
// backend/src/modules/your-module/services/YourService.ts
import { storage } from '@/common/storage';
import { jobQueue } from '@/common/jobs';
import { prisma } from '@/config/database';
export class YourService {
/**
* 创建任务并推送到队列Postgres-Only架构
*
* ✅ Platform-Only 模式:
* - 立即上传文件到 OSS
* - 创建业务记录元数据为null
* - 推送任务到队列
* - 立即返回(不阻塞请求)
*/
async createTask(userId: string, fileName: string, fileBuffer: Buffer) {
// 1. 验证文件
if (fileBuffer.length > MAX_FILE_SIZE) {
throw new Error('文件太大');
}
// 2. ⚡ 立即上传到 OSS2-3秒
const fileKey = `path/${userId}/${Date.now()}-${fileName}`;
await storage.upload(fileKey, fileBuffer);
// 3. ⚡ 创建业务记录元数据为null等Worker填充
const record = await prisma.yourTable.create({
data: {
userId,
fileName,
fileKey,
// ⚠️ 处理结果字段为 null
totalRows: null,
columns: null,
expiresAt: new Date(Date.now() + 10 * 60 * 1000),
},
});
// 4. ⚡ 推送任务到 pg-bossPlatform-Only
const job = await jobQueue.push('your_module_process', {
recordId: record.id,
fileKey,
userId,
});
// 5. ⚡ 立即返回(总耗时<3秒
return {
...record,
jobId: job.id, // ✅ 返回 jobId 供前端轮询
};
}
}
```
---
### 步骤3Worker层 - 后台处理
```typescript
// backend/src/modules/your-module/workers/yourWorker.ts
import { jobQueue } from '@/common/jobs';
import { storage } from '@/common/storage';
import { prisma } from '@/config/database';
import { logger } from '@/common/logging';
interface YourJob {
recordId: string;
fileKey: string;
userId: string;
}
/**
* 注册 Worker 到队列
*/
export function registerYourWorker() {
logger.info('[YourWorker] Registering worker');
// ⚠️ 队列名称:只能用字母、数字、下划线、连字符
jobQueue.process<YourJob>('your_module_process', async (job) => {
const { recordId, fileKey } = job.data;
logger.info('[YourWorker] Processing job', { jobId: job.id, recordId });
try {
// 1. 从 OSS 下载文件
const buffer = await storage.download(fileKey);
// 2. 执行耗时操作(解析、处理、计算)
const result = await yourLongTimeProcess(buffer);
const { processedData, totalRows, columns } = result;
// 3. ✅ 保存处理结果到 OSS避免重复计算
const cleanDataKey = `${fileKey}_clean.json`;
const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8');
await storage.upload(cleanDataKey, cleanDataBuffer);
logger.info('[YourWorker] Clean data saved', {
size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB`
});
// 4. 更新业务记录(填充元数据)
await prisma.yourTable.update({
where: { id: recordId },
data: {
cleanDataKey, // ✅ 保存 clean data 位置
totalRows,
columns,
updatedAt: new Date(),
},
});
logger.info('[YourWorker] ✅ Job completed', { jobId: job.id });
return { success: true, recordId, totalRows };
} catch (error: any) {
logger.error('[YourWorker] ❌ Job failed', {
jobId: job.id,
error: error.message
});
throw error; // 让 pg-boss 处理重试
}
});
logger.info('[YourWorker] ✅ Worker registered: your_module_process');
}
```
---
### 步骤4Controller层 - 状态查询API
```typescript
// backend/src/modules/your-module/controllers/YourController.ts
import { jobQueue } from '@/common/jobs';
export class YourController {
/**
* 获取任务状态Platform-Only模式
*
* GET /api/v1/your-module/tasks/:id/status
* Query: jobId (可选)
*/
async getTaskStatus(request, reply) {
const { id: recordId } = request.params;
const { jobId } = request.query;
// 1. 查询业务记录
const record = await prisma.yourTable.findUnique({
where: { id: recordId }
});
if (!record) {
return reply.code(404).send({ success: false, error: '记录不存在' });
}
// 2. 判断状态
// - 如果 totalRows 不为 null说明处理完成
// - 否则查询 job 状态
if (record.totalRows !== null) {
return reply.send({
success: true,
data: {
recordId,
status: 'ready', // ✅ 处理完成
progress: 100,
record,
},
});
}
// 3. 处理中,查询 pg-boss
if (!jobId) {
return reply.send({
success: true,
data: {
recordId,
status: 'processing',
progress: 50,
},
});
}
// 4. 从 pg-boss 查询 job 状态
const job = await jobQueue.getJob(jobId);
const status = job?.status === 'completed' ? 'ready' :
job?.status === 'failed' ? 'error' : 'processing';
const progress = status === 'ready' ? 100 :
status === 'error' ? 0 : 70;
return reply.send({
success: true,
data: {
recordId,
jobId,
status,
progress,
record,
},
});
}
}
```
---
### 步骤5前端 - React Query 轮询
```typescript
// frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts
import { useQuery } from '@tanstack/react-query';
import * as api from '../api';
/**
* 任务状态轮询 Hook
*
* 特点:
* - 自动串行轮询React Query 内置防并发)
* - 自动清理(组件卸载时停止)
* - 条件停止(完成/失败时自动停止)
*/
export function useTaskStatus({
recordId,
jobId,
enabled = true,
}) {
const { data, isLoading, error } = useQuery({
queryKey: ['taskStatus', recordId, jobId],
queryFn: () => api.getTaskStatus(recordId, jobId),
enabled: enabled && !!recordId && !!jobId,
refetchInterval: (query) => {
const status = query.state.data?.data?.status;
// ✅ 完成或失败时停止轮询
if (status === 'ready' || status === 'error') {
return false;
}
// ✅ 处理中时每2秒轮询自动串行
return 2000;
},
staleTime: 0, // 始终视为过时,确保轮询
retry: 1,
});
const statusInfo = data?.data;
const status = statusInfo?.status || 'processing';
const progress = statusInfo?.progress || 0;
return {
status,
progress,
isReady: status === 'ready',
isError: status === 'error',
isLoading,
error,
};
}
```
---
### 步骤6前端组件 - 使用Hook
```typescript
// frontend-v2/src/modules/your-module/pages/YourPage.tsx
import { useTaskStatus } from '../hooks/useTaskStatus';
const YourPage = () => {
const [pollingInfo, setPollingInfo] = useState<{
recordId: string;
jobId: string;
} | null>(null);
// ✅ 使用 React Query Hook 自动轮询
const { status, progress, isReady } = useTaskStatus({
recordId: pollingInfo?.recordId || null,
jobId: pollingInfo?.jobId || null,
enabled: !!pollingInfo,
});
// ✅ 监听状态变化
useEffect(() => {
if (isReady && pollingInfo) {
console.log('✅ 处理完成,加载数据');
// 停止轮询
setPollingInfo(null);
// 加载数据
loadData(pollingInfo.recordId);
}
}, [isReady, pollingInfo]);
// 上传文件
const handleUpload = async (file) => {
const result = await api.uploadFile(file);
const { recordId, jobId } = result.data;
// ✅ 启动轮询设置状态React Query自动开始
setPollingInfo({ recordId, jobId });
};
return (
<div>
{/* 进度条 */}
{pollingInfo && (
<div className="progress-bar">
<div style={{ width: `${progress}%` }} />
<span>{progress}%</span>
</div>
)}
{/* 上传按钮 */}
<button onClick={() => handleUpload(file)}></button>
</div>
);
};
```
---
## 🎯 关键技术点
### 1. 队列名称规范
**错误**
```typescript
'asl:screening:batch' // 包含冒号pg-boss不支持
'dc.toolc.parse' // 包含点号,不推荐
```
**正确**
```typescript
'asl_screening_batch' // 下划线
'dc_toolc_parse_excel' // 下划线
```
---
### 2. Worker注册时机
```typescript
// backend/src/index.ts
await jobQueue.start(); // ← 必须先启动队列
registerYourWorker(); // ← 再注册 Worker
registerOtherWorker();
// ✅ 等待3秒确保异步注册完成
await new Promise(resolve => setTimeout(resolve, 3000));
logger.info('✅ All workers registered');
```
---
### 3. clean data 缓存机制
**目的**避免重复计算性能提升99%
```typescript
// Worker 保存 clean data
const cleanDataKey = `${fileKey}_clean.json`;
await storage.upload(cleanDataKey, JSON.stringify(processedData));
await prisma.update({
where: { id },
data: {
cleanDataKey, // ← 记录位置
totalRows,
columns,
}
});
// Service 读取数据(优先 clean data
async getFullData(recordId) {
const record = await prisma.findUnique({ where: { id: recordId } });
// ✅ 优先读取 clean data<1秒
if (record.cleanDataKey) {
const buffer = await storage.download(record.cleanDataKey);
return JSON.parse(buffer.toString('utf-8'));
}
// ⚠️ Fallback重新解析兼容旧数据
const buffer = await storage.download(record.fileKey);
return parseFile(buffer);
}
// ⚠️ 重要:操作后要同步更新 clean data
async saveProcessedData(recordId, newData) {
const record = await getRecord(recordId);
// 覆盖原文件
await storage.upload(record.fileKey, toExcel(newData));
// ✅ 同时更新 clean data
if (record.cleanDataKey) {
await storage.upload(record.cleanDataKey, JSON.stringify(newData));
}
// 更新元数据
await prisma.update({ where: { id: recordId }, data: { ... } });
}
```
---
### 4. React Query 轮询(推荐)
**优点**
- ✅ 自动串行(防并发风暴)
- ✅ 自动去重同一queryKey只有一个请求
- ✅ 自动清理(组件卸载时停止)
- ✅ 条件停止(动态控制)
**不要使用 setInterval**
```typescript
const pollInterval = setInterval(() => {
api.getStatus(); // 可能并发
}, 2000);
```
---
## 📊 性能对比
### DC Tool C 实际数据3339行×151列文件
| 指标 | 同步处理 | 异步处理 | 改善 |
|------|---------|---------|------|
| **上传耗时** | 47秒阻塞 | 3秒立即返回 | ✅ -94% |
| **HTTP超时** | ❌ 经常超时 | ✅ 不会超时 | ✅ 100% |
| **getPreviewData** | 43秒重复解析 | 0.5秒(缓存) | ✅ -99% |
| **getFullData** | 43秒重复解析 | 0.5秒(缓存) | ✅ -99% |
| **QuickAction操作** | 43秒 + Python | 0.5秒 + Python | ✅ -95% |
| **并发请求** | 15+个 | 1个串行 | ✅ -93% |
---
## ⚠️ 常见问题
### Q1: Worker 注册了但不工作?
**检查**
- 队列名称是否包含冒号(`:`)?改为下划线(`_`
- 环境变量 `QUEUE_TYPE=pgboss` 是否设置?
- Worker 注册是否在 `jobQueue.start()` 之后?
### Q2: 轮询风暴(多个并发请求)?
**解决**:使用 React Query不要用 setInterval
### Q3: 导出数据不对(是原始数据)?
**原因**`saveProcessedData` 没有更新 clean data
**解决**:同时更新 fileKey 和 cleanDataKey
---
## 📚 参考实现
| 模块 | Worker | 前端Hook | 文档 |
|------|--------|---------|------|
| **DC Tool C** | `parseExcelWorker.ts` | `useSessionStatus.ts` | 本指南基础 |
| **ASL 智能文献** | `screeningWorker.ts` | `useScreeningTask.ts` | [ASL模块状态](../03-业务模块/ASL-AI智能文献/00-模块当前状态与开发指南.md) |
| **DC Tool B** | `extractionWorker.ts` | - | [DC模块状态](../03-业务模块/DC-数据清洗整理/00-模块当前状态与开发指南.md) |
---
## ✅ 检查清单
在实施异步任务前,请确认:
- [ ] 业务表只存业务信息(不包含 status 等字段)
- [ ] 队列名称使用下划线(不含冒号)
- [ ] 环境变量 `QUEUE_TYPE=pgboss` 已设置
- [ ] Worker 在 `jobQueue.start()` 之后注册
- [ ] 前端使用 React Query 轮询
- [ ] Service 优先读取 clean data
- [ ] saveProcessedData 同步更新 clean data
---
**维护者**: 平台架构团队
**最后更新**: 2025-12-22
**文档状态**: ✅ 已完成