Files
AIclinicalresearch/docs/02-通用能力层/Postgres-Only异步任务处理指南.md
HaHafeng 371fa53956 docs(asl): Upgrade Tool 3 architecture from Fan-out to Scatter+Aggregator (v2.0)
Architecture transformation:
- Replace Fan-out (Manager->Child->Last Child Wins) with Scatter+Aggregator pattern
- API layer directly dispatches N independent jobs (no Manager)
- Worker only writes its own Result row, never touches Task table (zero row-lock)
- Aggregator polls groupBy for completion + zombie cleanup (replaces Sweeper)
- Reduce red lines from 13 to 9, eliminate distributed complexity

Documents updated (10 files):
- 08-Tool3 main architecture doc: v2.0 rewrite (schema, Task 2.3/2.4, red lines, risks)
- 08d-Code patterns: rewrite sections 4.1-4.6 (API dispatch, SingleWorker, Aggregator)
- 08a-M1 sprint: rewrite M1-3 core (Worker+Aggregator), red lines, acceptance criteria
- 08b-M2 sprint: simplify SSE (NOTIFY/LISTEN downgraded to P2 optional)
- 08c-M3 sprint: milestone table wording update
- New: Scatter+Polling Aggregator pattern guide v1.1 (Level 2 cookbook)
- New: V2.0 architecture deep review and gap-fix report
- Updated: ASL module status, system status, capability layer index

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-24 22:11:09 +08:00

21 KiB
Raw Blame History

Postgres-Only 异步任务处理指南

文档版本: v1.12026-01-23 安全规范更新)
创建日期: 2025-12-22
最后更新: 2026-01-23
维护者: 平台架构团队
适用场景: 长时间任务(>30秒、大文件处理、后台Worker
参考实现: DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取

⚠️ 重要更新 v1.1:新增🛡️ 安全规范章节,包含幂等性、错误处理等强制规范


📋 概述

本文档基于 DC Tool C Excel解析功能 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。

核心价值

  1. 避免HTTP超时上传接口3秒返回解析在后台完成30-60秒
  2. 用户体验优秀:实时进度反馈,不需要傻等
  3. 符合云原生规范Platform-Only模式pg-boss队列
  4. 性能优化clean data缓存避免重复计算-99%耗时)

🏗️ 架构设计

三层架构

┌─────────────────────────────────────────────────────────┐
│  前端层React + React Query                           │
│  - 上传文件(立即返回 sessionId + jobId                │
│  - 轮询状态useQuery + refetchInterval自动串行      │
│  - 监听 status='ready',加载数据                        │
└─────────────────────────────────────────────────────────┘
                        ↓ HTTP
┌─────────────────────────────────────────────────────────┐
│  后端层Fastify + Prisma                              │
│  - 快速上传到 OSS2-3秒                               │
│  - 创建 Session状态processing                      │
│  - 推送任务到 pg-boss立即返回                        │
│  - 提供状态查询 API                                      │
└─────────────────────────────────────────────────────────┘
                        ↓ pg-boss
┌─────────────────────────────────────────────────────────┐
│  Worker层pg-boss + Platform层                        │
│  - 从队列取任务(自动串行)                              │
│  - 执行耗时操作(解析、清洗、统计)                      │
│  - 保存结果clean data 到 OSS                        │
│  - 更新 Session填充元数据                           │
└─────────────────────────────────────────────────────────┘

🚀 完整实施步骤

步骤1数据库Schema设计

// 业务表只存业务信息,不存任务管理信息
model YourBusinessTable {
  id String @id
  userId String
  fileKey String  // OSS原始文件
  
  // ✅ 性能优化:保存处理结果
  cleanDataKey String?  // 清洗/处理后的数据(避免重复计算)
  
  // 数据元信息(异步填充)
  totalRows Int?
  totalCols Int?
  columns Json?
  
  // 时间戳
  createdAt DateTime
  updatedAt DateTime
  expiresAt DateTime
  
  @@schema("your_schema")
}

关键点

  • 不要添加 statusprogresserrorMessage 等任务管理字段
  • 这些字段由 pg-boss 的 job 表管理

步骤2Service层 - 快速上传+推送任务

// backend/src/modules/your-module/services/YourService.ts

import { storage } from '@/common/storage';
import { jobQueue } from '@/common/jobs';
import { prisma } from '@/config/database';

export class YourService {
  /**
   * 创建任务并推送到队列Postgres-Only架构
   * 
   * ✅ Platform-Only 模式:
   * - 立即上传文件到 OSS
   * - 创建业务记录元数据为null
   * - 推送任务到队列
   * - 立即返回(不阻塞请求)
   */
  async createTask(userId: string, fileName: string, fileBuffer: Buffer) {
    // 1. 验证文件
    if (fileBuffer.length > MAX_FILE_SIZE) {
      throw new Error('文件太大');
    }

    // 2. ⚡ 立即上传到 OSS2-3秒
    const fileKey = `path/${userId}/${Date.now()}-${fileName}`;
    await storage.upload(fileKey, fileBuffer);

    // 3. ⚡ 创建业务记录元数据为null等Worker填充
    const record = await prisma.yourTable.create({
      data: {
        userId,
        fileName,
        fileKey,
        // ⚠️ 处理结果字段为 null
        totalRows: null,
        columns: null,
        expiresAt: new Date(Date.now() + 10 * 60 * 1000),
      },
    });

    // 4. ⚡ 推送任务到 pg-bossPlatform-Only
    const job = await jobQueue.push('your_module_process', {
      recordId: record.id,
      fileKey,
      userId,
    });

    // 5. ⚡ 立即返回(总耗时<3秒
    return {
      ...record,
      jobId: job.id,  // ✅ 返回 jobId 供前端轮询
    };
  }
}

步骤3Worker层 - 后台处理

// backend/src/modules/your-module/workers/yourWorker.ts

import { jobQueue } from '@/common/jobs';
import { storage } from '@/common/storage';
import { prisma } from '@/config/database';
import { logger } from '@/common/logging';

interface YourJob {
  recordId: string;
  fileKey: string;
  userId: string;
}

/**
 * 注册 Worker 到队列
 */
export function registerYourWorker() {
  logger.info('[YourWorker] Registering worker');

  // ⚠️ 队列名称:只能用字母、数字、下划线、连字符
  jobQueue.process<YourJob>('your_module_process', async (job) => {
    const { recordId, fileKey } = job.data;

    logger.info('[YourWorker] Processing job', { jobId: job.id, recordId });

    try {
      // 1. 从 OSS 下载文件
      const buffer = await storage.download(fileKey);

      // 2. 执行耗时操作(解析、处理、计算)
      const result = await yourLongTimeProcess(buffer);
      const { processedData, totalRows, columns } = result;

      // 3. ✅ 保存处理结果到 OSS避免重复计算
      const cleanDataKey = `${fileKey}_clean.json`;
      const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8');
      await storage.upload(cleanDataKey, cleanDataBuffer);

      logger.info('[YourWorker] Clean data saved', { 
        size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB` 
      });

      // 4. 更新业务记录(填充元数据)
      await prisma.yourTable.update({
        where: { id: recordId },
        data: {
          cleanDataKey,  // ✅ 保存 clean data 位置
          totalRows,
          columns,
          updatedAt: new Date(),
        },
      });

      logger.info('[YourWorker] ✅ Job completed', { jobId: job.id });

      return { success: true, recordId, totalRows };
    } catch (error: any) {
      logger.error('[YourWorker] ❌ Job failed', { 
        jobId: job.id, 
        error: error.message 
      });
      throw error;  // 让 pg-boss 处理重试
    }
  });

  logger.info('[YourWorker] ✅ Worker registered: your_module_process');
}

步骤4Controller层 - 状态查询API

// backend/src/modules/your-module/controllers/YourController.ts

import { jobQueue } from '@/common/jobs';

export class YourController {
  /**
   * 获取任务状态Platform-Only模式
   * 
   * GET /api/v1/your-module/tasks/:id/status
   * Query: jobId (可选)
   */
  async getTaskStatus(request, reply) {
    const { id: recordId } = request.params;
    const { jobId } = request.query;

    // 1. 查询业务记录
    const record = await prisma.yourTable.findUnique({
      where: { id: recordId }
    });

    if (!record) {
      return reply.code(404).send({ success: false, error: '记录不存在' });
    }

    // 2. 判断状态
    //    - 如果 totalRows 不为 null说明处理完成
    //    - 否则查询 job 状态
    if (record.totalRows !== null) {
      return reply.send({
        success: true,
        data: {
          recordId,
          status: 'ready',  // ✅ 处理完成
          progress: 100,
          record,
        },
      });
    }

    // 3. 处理中,查询 pg-boss
    if (!jobId) {
      return reply.send({
        success: true,
        data: {
          recordId,
          status: 'processing',
          progress: 50,
        },
      });
    }

    // 4. 从 pg-boss 查询 job 状态
    const job = await jobQueue.getJob(jobId);

    const status = job?.status === 'completed' ? 'ready' :
                  job?.status === 'failed' ? 'error' : 'processing';

    const progress = status === 'ready' ? 100 :
                    status === 'error' ? 0 : 70;

    return reply.send({
      success: true,
      data: {
        recordId,
        jobId,
        status,
        progress,
        record,
      },
    });
  }
}

步骤5前端 - React Query 轮询

// frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts

import { useQuery } from '@tanstack/react-query';
import * as api from '../api';

/**
 * 任务状态轮询 Hook
 * 
 * 特点:
 * - 自动串行轮询React Query 内置防并发)
 * - 自动清理(组件卸载时停止)
 * - 条件停止(完成/失败时自动停止)
 */
export function useTaskStatus({
  recordId,
  jobId,
  enabled = true,
}) {
  const { data, isLoading, error } = useQuery({
    queryKey: ['taskStatus', recordId, jobId],
    queryFn: () => api.getTaskStatus(recordId, jobId),
    enabled: enabled && !!recordId && !!jobId,
    refetchInterval: (query) => {
      const status = query.state.data?.data?.status;
      
      // ✅ 完成或失败时停止轮询
      if (status === 'ready' || status === 'error') {
        return false;
      }
      
      // ✅ 处理中时每2秒轮询自动串行
      return 2000;
    },
    staleTime: 0,  // 始终视为过时,确保轮询
    retry: 1,
  });

  const statusInfo = data?.data;
  const status = statusInfo?.status || 'processing';
  const progress = statusInfo?.progress || 0;

  return {
    status,
    progress,
    isReady: status === 'ready',
    isError: status === 'error',
    isLoading,
    error,
  };
}

步骤6前端组件 - 使用Hook

// frontend-v2/src/modules/your-module/pages/YourPage.tsx

import { useTaskStatus } from '../hooks/useTaskStatus';

const YourPage = () => {
  const [pollingInfo, setPollingInfo] = useState<{
    recordId: string;
    jobId: string;
  } | null>(null);

  // ✅ 使用 React Query Hook 自动轮询
  const { status, progress, isReady } = useTaskStatus({
    recordId: pollingInfo?.recordId || null,
    jobId: pollingInfo?.jobId || null,
    enabled: !!pollingInfo,
  });

  // ✅ 监听状态变化
  useEffect(() => {
    if (isReady && pollingInfo) {
      console.log('✅ 处理完成,加载数据');
      
      // 停止轮询
      setPollingInfo(null);
      
      // 加载数据
      loadData(pollingInfo.recordId);
    }
  }, [isReady, pollingInfo]);

  // 上传文件
  const handleUpload = async (file) => {
    const result = await api.uploadFile(file);
    const { recordId, jobId } = result.data;
    
    // ✅ 启动轮询设置状态React Query自动开始
    setPollingInfo({ recordId, jobId });
  };

  return (
    <div>
      {/* 进度条 */}
      {pollingInfo && (
        <div className="progress-bar">
          <div style={{ width: `${progress}%` }} />
          <span>{progress}%</span>
        </div>
      )}
      
      {/* 上传按钮 */}
      <button onClick={() => handleUpload(file)}>上传</button>
    </div>
  );
};

🎯 关键技术点

1. 队列名称规范

错误

 'asl:screening:batch'  // 包含冒号pg-boss不支持
 'dc.toolc.parse'       // 包含点号,不推荐

正确

 'asl_screening_batch'  // 下划线
 'dc_toolc_parse_excel' // 下划线

2. Worker注册时机

// backend/src/index.ts

await jobQueue.start();  // ← 必须先启动队列

registerYourWorker();    // ← 再注册 Worker
registerOtherWorker();

// ✅ 等待3秒确保异步注册完成
await new Promise(resolve => setTimeout(resolve, 3000));

logger.info('✅ All workers registered');

3. clean data 缓存机制

目的避免重复计算性能提升99%

// Worker 保存 clean data
const cleanDataKey = `${fileKey}_clean.json`;
await storage.upload(cleanDataKey, JSON.stringify(processedData));

await prisma.update({
  where: { id },
  data: {
    cleanDataKey,  // ← 记录位置
    totalRows,
    columns,
  }
});

// Service 读取数据(优先 clean data
async getFullData(recordId) {
  const record = await prisma.findUnique({ where: { id: recordId } });
  
  // ✅ 优先读取 clean data<1秒
  if (record.cleanDataKey) {
    const buffer = await storage.download(record.cleanDataKey);
    return JSON.parse(buffer.toString('utf-8'));
  }
  
  // ⚠️ Fallback重新解析兼容旧数据
  const buffer = await storage.download(record.fileKey);
  return parseFile(buffer);
}

// ⚠️ 重要:操作后要同步更新 clean data
async saveProcessedData(recordId, newData) {
  const record = await getRecord(recordId);
  
  // 覆盖原文件
  await storage.upload(record.fileKey, toExcel(newData));
  
  // ✅ 同时更新 clean data
  if (record.cleanDataKey) {
    await storage.upload(record.cleanDataKey, JSON.stringify(newData));
  }
  
  // 更新元数据
  await prisma.update({ where: { id: recordId }, data: { ... } });
}

4. React Query 轮询(推荐)

优点

  • 自动串行(防并发风暴)
  • 自动去重同一queryKey只有一个请求
  • 自动清理(组件卸载时停止)
  • 条件停止(动态控制)

不要使用 setInterval

 const pollInterval = setInterval(() => {
  api.getStatus();  // 可能并发
}, 2000);

📊 性能对比

DC Tool C 实际数据3339行×151列文件

指标 同步处理 异步处理 改善
上传耗时 47秒阻塞 3秒立即返回 -94%
HTTP超时 经常超时 不会超时 100%
getPreviewData 43秒重复解析 0.5秒(缓存) -99%
getFullData 43秒重复解析 0.5秒(缓存) -99%
QuickAction操作 43秒 + Python 0.5秒 + Python -95%
并发请求 15+个 1个串行 -93%

🛡️ 安全规范(强制)

更新日期2026-01-23
来源:内部逆向审查报告 + 生产问题修复

基于项目实际遇到的问题,以下规范 必须遵守

规范1禁止 Worker 递归死循环

错误示例

// ❌ 禁止:在 catch 块中重试业务逻辑
jobQueue.process('your_task', async (job) => {
  try {
    await doSomething(job.data);
  } catch (error) {
    // ❌ 错误!这会导致死循环或重复执行
    await doSomething(job.data);  
    throw error;
  }
});

正确做法

// ✅ 正确:直接 throw让 pg-boss 接管重试默认3次
jobQueue.process('your_task', async (job) => {
  try {
    await doSomething(job.data);
  } catch (error) {
    logger.error('Job failed', { jobId: job.id, error: error.message });
    throw error;  // ✅ pg-boss 会自动重试
  }
});

规范2禁止 Payload 膨胀

错误示例

// ❌ 禁止:在 job.data 中存大文件
await jobQueue.push('parse_excel', {
  fileContent: base64EncodedFile,  // ❌ 会导致 job 表膨胀
  imageData: base64Image,          // ❌ 拖慢数据库
});

正确做法

// ✅ 正确:只存 fileKey 或数据库 ID
await jobQueue.push('parse_excel', {
  sessionId: session.id,      // ✅ 只存 ID
  fileKey: 'path/to/file',    // ✅ 只存 OSS 路径
  userId: 'user-123',
});

规范3Worker 必须幂等

问题:任务失败重试时,可能导致重复写入、重复扣费、重复发邮件。

错误示例

// ❌ 非幂等:重试会创建多条记录
await prisma.screeningResult.create({
  data: { projectId, literatureId, result }
});

正确做法

// ✅ 方案1使用 upsert + 唯一约束
await prisma.screeningResult.upsert({
  where: {
    projectId_literatureId: { projectId, literatureId }
  },
  create: { projectId, literatureId, result },
  update: { result },  // 重试时覆盖
});

// ✅ 方案2先检查状态再执行
const existing = await prisma.task.findUnique({ where: { id: taskId } });
if (existing?.status === 'completed') {
  logger.info('Task already completed, skipping');
  return;
}
await doWork();

幂等性检查清单

操作类型 幂等方案
创建记录 使用 upsert + 唯一约束
更新记录 update 天然幂等
发送邮件 先检查 notificationSent 标志
扣费 使用幂等 key如订单号
调用外部API 检查是否已成功

规范4合理设置任务过期时间

默认配置(当前):

expireInSeconds: 6 * 60 * 60  // 6小时

推荐配置(按业务类型):

任务类型 过期时间 理由
asl_screening_batch 30分钟 单条文献筛选
dc_extraction_batch 1小时 批量数据提取
dc_toolc_parse_excel 30分钟 Excel解析
rvw_review_task 20分钟 审稿任务
asl_research_execute 30分钟 DeepSearch检索

规范5优雅关闭

已在 index.ts 实现

// 进程退出时优雅关闭
process.on('SIGTERM', async () => {
  await fastify.close();      // 停止接收新请求
  await jobQueue.stop();      // 等待当前任务完成
  await prisma.$disconnect(); // 关闭数据库
  process.exit(0);
});

规范6全局错误监听

已在 PgBossQueue.ts 实现

// 防止未捕获错误导致进程崩溃
this.boss.on('error', (err) => {
  if (err.code === '23505' && err.constraint === 'queue_pkey') {
    // 队列冲突,静默处理
    console.log('Queue concurrency conflict auto-resolved');
  } else {
    console.error('PgBoss critical error:', err);
  }
});

⚠️ 常见问题

Q1: Worker 注册了但不工作?

检查

  • 队列名称是否包含冒号(:)?改为下划线(_
  • 环境变量 QUEUE_TYPE=pgboss 是否设置?
  • Worker 注册是否在 jobQueue.start() 之后?

Q2: 轮询风暴(多个并发请求)?

解决:使用 React Query不要用 setInterval

Q3: 导出数据不对(是原始数据)?

原因saveProcessedData 没有更新 clean data
解决:同时更新 fileKey 和 cleanDataKey


📚 参考实现

模块 Worker 前端Hook 文档
DC Tool C parseExcelWorker.ts useSessionStatus.ts 本指南基础
ASL 智能文献 screeningWorker.ts useScreeningTask.ts ASL模块状态
DC Tool B extractionWorker.ts - DC模块状态
🆕ASL 工具 3 散装派发 + Aggregator useTaskStatus.ts 散装派发与轮询收口指南Level 2 批量模式)

检查清单

基础配置检查

  • 业务表只存业务信息(不包含 status 等字段)
  • 队列名称使用下划线(不含冒号)
  • 环境变量 QUEUE_TYPE=pgboss 已设置
  • Worker 在 jobQueue.start() 之后注册
  • 前端使用 React Query 轮询
  • Service 优先读取 clean data
  • saveProcessedData 同步更新 clean data

🛡️ 安全规范检查(强制)

  • 幂等性:使用 upsert 或先检查状态,确保重试安全
  • Payloadjob.data 只存 ID 和 fileKey不存大文件
  • 错误处理catch 块中直接 throw error,不要重试业务逻辑
  • 唯一约束:数据库表有合适的唯一索引防止重复写入
  • 过期时间:根据业务类型设置合理的 expireInSeconds

📊 故障排查 SQL

-- 查看队列健康状况
SELECT 
  name AS queue_name,
  state,
  COUNT(*) AS count
FROM platform_schema.job
WHERE created_on > NOW() - INTERVAL '24 hours'
GROUP BY name, state
ORDER BY name, state;

-- 查看失败任务
SELECT id, name, data, output, created_on 
FROM platform_schema.job 
WHERE state = 'failed' 
ORDER BY created_on DESC 
LIMIT 10;

-- 查看卡住的任务processing 超过1小时
SELECT id, name, data, created_on, started_on
FROM platform_schema.job 
WHERE state = 'active' 
  AND started_on < NOW() - INTERVAL '1 hour';

维护者: 平台架构团队
最后更新: 2026-01-23
文档状态: 已完成v1.1 安全规范更新)