Files
AIclinicalresearch/docs/02-通用能力层/Postgres-Only异步任务处理指南.md
HaHafeng 66255368b7 feat(admin): Add user management and upgrade to module permission system
Features - User Management (Phase 4.1):
- Database: Add user_modules table for fine-grained module permissions
- Database: Add 4 user permissions (view/create/edit/delete) to role_permissions
- Backend: UserService (780 lines) - CRUD with tenant isolation
- Backend: UserController + UserRoutes (648 lines) - 13 API endpoints
- Backend: Batch import users from Excel
- Frontend: UserListPage (412 lines) - list/filter/search/pagination
- Frontend: UserFormPage (341 lines) - create/edit with module config
- Frontend: UserDetailPage (393 lines) - details/tenant/module management
- Frontend: 3 modal components (592 lines) - import/assign/configure
- API: GET/POST/PUT/DELETE /api/admin/users/* endpoints

Architecture Upgrade - Module Permission System:
- Backend: Add getUserModules() method in auth.service
- Backend: Login API returns modules array in user object
- Frontend: AuthContext adds hasModule() method
- Frontend: Navigation filters modules based on user.modules
- Frontend: RouteGuard checks requiredModule instead of requiredVersion
- Frontend: Remove deprecated version-based permission system
- UX: Only show accessible modules in navigation (clean UI)
- UX: Smart redirect after login (avoid 403 for regular users)

Fixes:
- Fix UTF-8 encoding corruption in ~100 docs files
- Fix pageSize type conversion in userService (String to Number)
- Fix authUser undefined error in TopNavigation
- Fix login redirect logic with role-based access check
- Update Git commit guidelines v1.2 with UTF-8 safety rules

Database Changes:
- CREATE TABLE user_modules (user_id, tenant_id, module_code, is_enabled)
- ADD UNIQUE CONSTRAINT (user_id, tenant_id, module_code)
- INSERT 4 permissions + role assignments
- UPDATE PUBLIC tenant with 8 module subscriptions

Technical:
- Backend: 5 new files (~2400 lines)
- Frontend: 10 new files (~2500 lines)
- Docs: 1 development record + 2 status updates + 1 guideline update
- Total: ~4900 lines of code

Status: User management 100% complete, module permission system operational
2026-01-16 13:42:10 +08:00

16 KiB
Raw Blame History

Postgres-Only 异步任务处理指南

文档版本: v1.0
创建日期: 2025-12-22
维护者: 平台架构团队
适用场景: 长时间任务(>30秒、大文件处理、后台Worker
参考实现: DC Tool C Excel解析、ASL文献筛选、DC Tool B数据提取


📋 概述

本文档基于 DC Tool C Excel解析功能 的完整实践,总结 Postgres-Only 架构下异步任务处理的标准模式。

核心价值

  1. 避免HTTP超时上传接口3秒返回解析在后台完成30-60秒
  2. 用户体验优秀:实时进度反馈,不需要傻等
  3. 符合云原生规范Platform-Only模式pg-boss队列
  4. 性能优化clean data缓存避免重复计算-99%耗时)

🏗️ 架构设计

三层架构

┌─────────────────────────────────────────────────────────┐
│  前端层React + React Query                           │
│  - 上传文件(立即返回 sessionId + jobId                │
│  - 轮询状态useQuery + refetchInterval自动串行      │
│  - 监听 status='ready',加载数据                        │
└─────────────────────────────────────────────────────────┘
                        ↓ HTTP
┌─────────────────────────────────────────────────────────┐
│  后端层Fastify + Prisma                              │
│  - 快速上传到 OSS2-3秒                               │
│  - 创建 Session状态processing                      │
│  - 推送任务到 pg-boss立即返回                        │
│  - 提供状态查询 API                                      │
└─────────────────────────────────────────────────────────┘
                        ↓ pg-boss
┌─────────────────────────────────────────────────────────┐
│  Worker层pg-boss + Platform层                        │
│  - 从队列取任务(自动串行)                              │
│  - 执行耗时操作(解析、清洗、统计)                      │
│  - 保存结果clean data 到 OSS                        │
│  - 更新 Session填充元数据                           │
└─────────────────────────────────────────────────────────┘

🚀 完整实施步骤

步骤1数据库Schema设计

// 业务表只存业务信息,不存任务管理信息
model YourBusinessTable {
  id String @id
  userId String
  fileKey String  // OSS原始文件
  
  // ✅ 性能优化:保存处理结果
  cleanDataKey String?  // 清洗/处理后的数据(避免重复计算)
  
  // 数据元信息(异步填充)
  totalRows Int?
  totalCols Int?
  columns Json?
  
  // 时间戳
  createdAt DateTime
  updatedAt DateTime
  expiresAt DateTime
  
  @@schema("your_schema")
}

关键点

  • 不要添加 statusprogresserrorMessage 等任务管理字段
  • 这些字段由 pg-boss 的 job 表管理

步骤2Service层 - 快速上传+推送任务

// backend/src/modules/your-module/services/YourService.ts

import { storage } from '@/common/storage';
import { jobQueue } from '@/common/jobs';
import { prisma } from '@/config/database';

export class YourService {
  /**
   * 创建任务并推送到队列Postgres-Only架构
   * 
   * ✅ Platform-Only 模式:
   * - 立即上传文件到 OSS
   * - 创建业务记录元数据为null
   * - 推送任务到队列
   * - 立即返回(不阻塞请求)
   */
  async createTask(userId: string, fileName: string, fileBuffer: Buffer) {
    // 1. 验证文件
    if (fileBuffer.length > MAX_FILE_SIZE) {
      throw new Error('文件太大');
    }

    // 2. ⚡ 立即上传到 OSS2-3秒
    const fileKey = `path/${userId}/${Date.now()}-${fileName}`;
    await storage.upload(fileKey, fileBuffer);

    // 3. ⚡ 创建业务记录元数据为null等Worker填充
    const record = await prisma.yourTable.create({
      data: {
        userId,
        fileName,
        fileKey,
        // ⚠️ 处理结果字段为 null
        totalRows: null,
        columns: null,
        expiresAt: new Date(Date.now() + 10 * 60 * 1000),
      },
    });

    // 4. ⚡ 推送任务到 pg-bossPlatform-Only
    const job = await jobQueue.push('your_module_process', {
      recordId: record.id,
      fileKey,
      userId,
    });

    // 5. ⚡ 立即返回(总耗时<3秒
    return {
      ...record,
      jobId: job.id,  // ✅ 返回 jobId 供前端轮询
    };
  }
}

步骤3Worker层 - 后台处理

// backend/src/modules/your-module/workers/yourWorker.ts

import { jobQueue } from '@/common/jobs';
import { storage } from '@/common/storage';
import { prisma } from '@/config/database';
import { logger } from '@/common/logging';

interface YourJob {
  recordId: string;
  fileKey: string;
  userId: string;
}

/**
 * 注册 Worker 到队列
 */
export function registerYourWorker() {
  logger.info('[YourWorker] Registering worker');

  // ⚠️ 队列名称:只能用字母、数字、下划线、连字符
  jobQueue.process<YourJob>('your_module_process', async (job) => {
    const { recordId, fileKey } = job.data;

    logger.info('[YourWorker] Processing job', { jobId: job.id, recordId });

    try {
      // 1. 从 OSS 下载文件
      const buffer = await storage.download(fileKey);

      // 2. 执行耗时操作(解析、处理、计算)
      const result = await yourLongTimeProcess(buffer);
      const { processedData, totalRows, columns } = result;

      // 3. ✅ 保存处理结果到 OSS避免重复计算
      const cleanDataKey = `${fileKey}_clean.json`;
      const cleanDataBuffer = Buffer.from(JSON.stringify(processedData), 'utf-8');
      await storage.upload(cleanDataKey, cleanDataBuffer);

      logger.info('[YourWorker] Clean data saved', { 
        size: `${(cleanDataBuffer.length / 1024).toFixed(2)} KB` 
      });

      // 4. 更新业务记录(填充元数据)
      await prisma.yourTable.update({
        where: { id: recordId },
        data: {
          cleanDataKey,  // ✅ 保存 clean data 位置
          totalRows,
          columns,
          updatedAt: new Date(),
        },
      });

      logger.info('[YourWorker] ✅ Job completed', { jobId: job.id });

      return { success: true, recordId, totalRows };
    } catch (error: any) {
      logger.error('[YourWorker] ❌ Job failed', { 
        jobId: job.id, 
        error: error.message 
      });
      throw error;  // 让 pg-boss 处理重试
    }
  });

  logger.info('[YourWorker] ✅ Worker registered: your_module_process');
}

步骤4Controller层 - 状态查询API

// backend/src/modules/your-module/controllers/YourController.ts

import { jobQueue } from '@/common/jobs';

export class YourController {
  /**
   * 获取任务状态Platform-Only模式
   * 
   * GET /api/v1/your-module/tasks/:id/status
   * Query: jobId (可选)
   */
  async getTaskStatus(request, reply) {
    const { id: recordId } = request.params;
    const { jobId } = request.query;

    // 1. 查询业务记录
    const record = await prisma.yourTable.findUnique({
      where: { id: recordId }
    });

    if (!record) {
      return reply.code(404).send({ success: false, error: '记录不存在' });
    }

    // 2. 判断状态
    //    - 如果 totalRows 不为 null说明处理完成
    //    - 否则查询 job 状态
    if (record.totalRows !== null) {
      return reply.send({
        success: true,
        data: {
          recordId,
          status: 'ready',  // ✅ 处理完成
          progress: 100,
          record,
        },
      });
    }

    // 3. 处理中,查询 pg-boss
    if (!jobId) {
      return reply.send({
        success: true,
        data: {
          recordId,
          status: 'processing',
          progress: 50,
        },
      });
    }

    // 4. 从 pg-boss 查询 job 状态
    const job = await jobQueue.getJob(jobId);

    const status = job?.status === 'completed' ? 'ready' :
                  job?.status === 'failed' ? 'error' : 'processing';

    const progress = status === 'ready' ? 100 :
                    status === 'error' ? 0 : 70;

    return reply.send({
      success: true,
      data: {
        recordId,
        jobId,
        status,
        progress,
        record,
      },
    });
  }
}

步骤5前端 - React Query 轮询

// frontend-v2/src/modules/your-module/hooks/useTaskStatus.ts

import { useQuery } from '@tanstack/react-query';
import * as api from '../api';

/**
 * 任务状态轮询 Hook
 * 
 * 特点:
 * - 自动串行轮询React Query 内置防并发)
 * - 自动清理(组件卸载时停止)
 * - 条件停止(完成/失败时自动停止)
 */
export function useTaskStatus({
  recordId,
  jobId,
  enabled = true,
}) {
  const { data, isLoading, error } = useQuery({
    queryKey: ['taskStatus', recordId, jobId],
    queryFn: () => api.getTaskStatus(recordId, jobId),
    enabled: enabled && !!recordId && !!jobId,
    refetchInterval: (query) => {
      const status = query.state.data?.data?.status;
      
      // ✅ 完成或失败时停止轮询
      if (status === 'ready' || status === 'error') {
        return false;
      }
      
      // ✅ 处理中时每2秒轮询自动串行
      return 2000;
    },
    staleTime: 0,  // 始终视为过时,确保轮询
    retry: 1,
  });

  const statusInfo = data?.data;
  const status = statusInfo?.status || 'processing';
  const progress = statusInfo?.progress || 0;

  return {
    status,
    progress,
    isReady: status === 'ready',
    isError: status === 'error',
    isLoading,
    error,
  };
}

步骤6前端组件 - 使用Hook

// frontend-v2/src/modules/your-module/pages/YourPage.tsx

import { useTaskStatus } from '../hooks/useTaskStatus';

const YourPage = () => {
  const [pollingInfo, setPollingInfo] = useState<{
    recordId: string;
    jobId: string;
  } | null>(null);

  // ✅ 使用 React Query Hook 自动轮询
  const { status, progress, isReady } = useTaskStatus({
    recordId: pollingInfo?.recordId || null,
    jobId: pollingInfo?.jobId || null,
    enabled: !!pollingInfo,
  });

  // ✅ 监听状态变化
  useEffect(() => {
    if (isReady && pollingInfo) {
      console.log('✅ 处理完成,加载数据');
      
      // 停止轮询
      setPollingInfo(null);
      
      // 加载数据
      loadData(pollingInfo.recordId);
    }
  }, [isReady, pollingInfo]);

  // 上传文件
  const handleUpload = async (file) => {
    const result = await api.uploadFile(file);
    const { recordId, jobId } = result.data;
    
    // ✅ 启动轮询设置状态React Query自动开始
    setPollingInfo({ recordId, jobId });
  };

  return (
    <div>
      {/* 进度条 */}
      {pollingInfo && (
        <div className="progress-bar">
          <div style={{ width: `${progress}%` }} />
          <span>{progress}%</span>
        </div>
      )}
      
      {/* 上传按钮 */}
      <button onClick={() => handleUpload(file)}>上传</button>
    </div>
  );
};

🎯 关键技术点

1. 队列名称规范

错误

 'asl:screening:batch'  // 包含冒号pg-boss不支持
 'dc.toolc.parse'       // 包含点号,不推荐

正确

 'asl_screening_batch'  // 下划线
 'dc_toolc_parse_excel' // 下划线

2. Worker注册时机

// backend/src/index.ts

await jobQueue.start();  // ← 必须先启动队列

registerYourWorker();    // ← 再注册 Worker
registerOtherWorker();

// ✅ 等待3秒确保异步注册完成
await new Promise(resolve => setTimeout(resolve, 3000));

logger.info('✅ All workers registered');

3. clean data 缓存机制

目的避免重复计算性能提升99%

// Worker 保存 clean data
const cleanDataKey = `${fileKey}_clean.json`;
await storage.upload(cleanDataKey, JSON.stringify(processedData));

await prisma.update({
  where: { id },
  data: {
    cleanDataKey,  // ← 记录位置
    totalRows,
    columns,
  }
});

// Service 读取数据(优先 clean data
async getFullData(recordId) {
  const record = await prisma.findUnique({ where: { id: recordId } });
  
  // ✅ 优先读取 clean data<1秒
  if (record.cleanDataKey) {
    const buffer = await storage.download(record.cleanDataKey);
    return JSON.parse(buffer.toString('utf-8'));
  }
  
  // ⚠️ Fallback重新解析兼容旧数据
  const buffer = await storage.download(record.fileKey);
  return parseFile(buffer);
}

// ⚠️ 重要:操作后要同步更新 clean data
async saveProcessedData(recordId, newData) {
  const record = await getRecord(recordId);
  
  // 覆盖原文件
  await storage.upload(record.fileKey, toExcel(newData));
  
  // ✅ 同时更新 clean data
  if (record.cleanDataKey) {
    await storage.upload(record.cleanDataKey, JSON.stringify(newData));
  }
  
  // 更新元数据
  await prisma.update({ where: { id: recordId }, data: { ... } });
}

4. React Query 轮询(推荐)

优点

  • 自动串行(防并发风暴)
  • 自动去重同一queryKey只有一个请求
  • 自动清理(组件卸载时停止)
  • 条件停止(动态控制)

不要使用 setInterval

 const pollInterval = setInterval(() => {
  api.getStatus();  // 可能并发
}, 2000);

📊 性能对比

DC Tool C 实际数据3339行×151列文件

指标 同步处理 异步处理 改善
上传耗时 47秒阻塞 3秒立即返回 -94%
HTTP超时 经常超时 不会超时 100%
getPreviewData 43秒重复解析 0.5秒(缓存) -99%
getFullData 43秒重复解析 0.5秒(缓存) -99%
QuickAction操作 43秒 + Python 0.5秒 + Python -95%
并发请求 15+个 1个串行 -93%

⚠️ 常见问题

Q1: Worker 注册了但不工作?

检查

  • 队列名称是否包含冒号(:)?改为下划线(_
  • 环境变量 QUEUE_TYPE=pgboss 是否设置?
  • Worker 注册是否在 jobQueue.start() 之后?

Q2: 轮询风暴(多个并发请求)?

解决:使用 React Query不要用 setInterval

Q3: 导出数据不对(是原始数据)?

原因saveProcessedData 没有更新 clean data
解决:同时更新 fileKey 和 cleanDataKey


📚 参考实现

模块 Worker 前端Hook 文档
DC Tool C parseExcelWorker.ts useSessionStatus.ts 本指南基础
ASL 智能文献 screeningWorker.ts useScreeningTask.ts ASL模块状态
DC Tool B extractionWorker.ts - DC模块状态

检查清单

在实施异步任务前,请确认:

  • 业务表只存业务信息(不包含 status 等字段)
  • 队列名称使用下划线(不含冒号)
  • 环境变量 QUEUE_TYPE=pgboss 已设置
  • Worker 在 jobQueue.start() 之后注册
  • 前端使用 React Query 轮询
  • Service 优先读取 clean data
  • saveProcessedData 同步更新 clean data

维护者: 平台架构团队
最后更新: 2025-12-22
文档状态: 已完成