Files
AIclinicalresearch/docs/07-运维文档/01-PgBoss队列监控与维护.md
HaHafeng bbf98c4d5c fix(backend): Resolve PgBoss infinite loop issue and cleanup unused files
Backend fixes:
- Fix PgBoss task infinite loop on SAE (root cause: missing queue table constraints)
- Add singletonKey to prevent duplicate job enqueueing
- Add idempotency check in reviewWorker (skip completed tasks)
- Add optimistic locking in reviewService (atomic status update)

Frontend fixes:
- Add isSubmitting state to prevent duplicate submissions in RVW Dashboard
- Fix API baseURL in knowledgeBaseApi (relative path)

Cleanup (removed):
- Old frontend/ directory (migrated to frontend-v2)
- python-microservice/ (unused, replaced by extraction_service)
- Root package.json and node_modules (accidentally created)
- redcap-docker-dev/ (external dependency)
- Various temporary files and outdated docs in root

New documentation:
- docs/07-运维文档/01-PgBoss队列监控与维护.md
- docs/07-运维文档/02-故障预防检查清单.md
- docs/07-运维文档/03-数据库迁移注意事项.md

Database fix applied to RDS:
- Added PRIMARY KEY to platform_schema.queue
- Added 3 missing foreign key constraints

Tested: Local build passed, RDS constraints verified
2026-01-27 18:16:22 +08:00

8.4 KiB
Raw Permalink Blame History

PgBoss 队列监控与维护手册

文档版本v1.0
创建日期2026-01-27
基于故障2026-01-27 RVW 模块任务无限循环故障


📋 目录

  1. 故障背景
  2. 架构说明
  3. 日常监控 SQL
  4. 故障排查指南
  5. 清理操作
  6. 预防措施

故障背景

2026-01-27 故障复盘

现象RVW 审稿模块任务完成后继续无限循环执行,前端不显示结果

表层原因:数据库中残留 7 个重复的队列定义,导致单实例在一次事件循环中为同一 taskId 创建了 7 个 Job

根本原因数据库迁移时 platform_schema.queue 表的主键约束丢失

环境 queue 表主键 结果
本地 queue_pkey (name 唯一) createQueue() 重复调用会报错被忽略
RDS 无主键(迁移丢失) createQueue() 每次都插入新行

证据

-- 同一 taskId 被处理 7 次
task_id: bd19c3d3-80cc-42f7-85a4-d38b17319a1b
created_on: 2026-01-27 16:06:07.446015+08  (全部相同!)
job_count: 7

修复

  1. 清理 32 个重复队列定义
  2. 添加主键约束:ALTER TABLE platform_schema.queue ADD PRIMARY KEY (name);
  3. 代码四层防御(前端锁 + API幂等 + singletonKey + Worker检查

详见:03-数据库迁移注意事项


架构说明

PgBoss 表结构

表名 说明 Schema
queue 队列定义(每种任务类型一条记录) platform_schema
job 任务记录(旧版,可能未使用) platform_schema
job_common 任务记录(当前使用) platform_schema
schedule 定时任务配置 platform_schema
subscription 订阅配置 platform_schema
version pg-boss 版本信息 platform_schema

任务状态流转

created → active → completed
                 → failed → retry → active
                         → expired

日常监控 SQL

1. 检查重复队列定义(🔴 每日必查)

-- 如果返回结果,说明有重复队列定义需要清理
SELECT name, COUNT(*) as cnt 
FROM platform_schema.queue 
GROUP BY name 
HAVING COUNT(*) > 1
ORDER BY cnt DESC;

预期结果无返回0 行)

异常处理:参考 清理操作


2. 检查任务状态分布

-- 查看各队列的任务状态分布
SELECT name, state, COUNT(*) as count
FROM platform_schema.job_common 
GROUP BY name, state
ORDER BY name, state;

关注点

  • active 状态任务不应该长期存在
  • created 状态任务堆积说明 Worker 未启动或有问题
  • retry 状态任务过多说明有系统性错误

3. 检查同一 taskId 重复处理

-- 检查是否有任务被重复处理
SELECT 
  data->>'taskId' as task_id, 
  COUNT(*) as job_count,
  MIN(created_on) as first_run,
  MAX(completed_on) as last_run
FROM platform_schema.job_common 
WHERE name = 'rvw_review_task'
  AND created_on > NOW() - INTERVAL '24 hours'
GROUP BY data->>'taskId'
HAVING COUNT(*) > 1
ORDER BY job_count DESC;

预期结果:无返回(每个 taskId 只应处理一次)


4. 检查卡住的任务

-- 查找运行超过 30 分钟的活跃任务
SELECT 
  id,
  name,
  state,
  data->>'taskId' as task_id,
  created_on,
  started_on,
  EXTRACT(EPOCH FROM (NOW() - started_on))/60 as running_minutes
FROM platform_schema.job_common 
WHERE state = 'active'
  AND started_on < NOW() - INTERVAL '30 minutes'
ORDER BY started_on;

5. 队列健康检查汇总

-- 一键健康检查
SELECT 
  'queue_duplicates' as check_type,
  CASE WHEN COUNT(*) > 0 THEN '❌ 异常' ELSE '✅ 正常' END as status,
  COUNT(*) as count
FROM (
  SELECT name FROM platform_schema.queue GROUP BY name HAVING COUNT(*) > 1
) t

UNION ALL

SELECT 
  'stuck_active_jobs',
  CASE WHEN COUNT(*) > 0 THEN '⚠️ 警告' ELSE '✅ 正常' END,
  COUNT(*)
FROM platform_schema.job_common 
WHERE state = 'active' AND started_on < NOW() - INTERVAL '30 minutes'

UNION ALL

SELECT 
  'duplicate_tasks_24h',
  CASE WHEN COUNT(*) > 0 THEN '❌ 异常' ELSE '✅ 正常' END,
  COUNT(*)
FROM (
  SELECT data->>'taskId' FROM platform_schema.job_common 
  WHERE created_on > NOW() - INTERVAL '24 hours'
  GROUP BY data->>'taskId' HAVING COUNT(*) > 1
) t;

故障排查指南

症状 1任务无限循环

现象:同一任务反复执行,日志显示不断 "Processing job"

排查步骤

  1. 检查重复队列定义

    SELECT name, COUNT(*) FROM platform_schema.queue GROUP BY name HAVING COUNT(*) > 1;
    
  2. 检查同一 taskId 的 Job 数量

    SELECT data->>'taskId', COUNT(*) FROM platform_schema.job_common 
    WHERE name = 'rvw_review_task' GROUP BY data->>'taskId' HAVING COUNT(*) > 1;
    
  3. 检查任务状态

    SELECT id, name, state, data->>'taskId', retry_count, created_on 
    FROM platform_schema.job_common 
    WHERE data->>'taskId' = '问题taskId' ORDER BY created_on;
    

修复:清理重复队列定义 + 更新任务状态


症状 2任务卡住不执行

现象:任务状态一直是 created,不变成 active

排查步骤

  1. 检查 Worker 是否注册

    # 查看 SAE 日志
    grep "Worker registered" /logs/app.log
    
  2. 检查 pg-boss 连接

    SELECT * FROM pg_stat_activity WHERE application_name = 'aiclinical-queue';
    
  3. 重启后端服务


症状 3任务状态不一致

现象pg-boss 显示 completed但业务表显示 pending

排查步骤

  1. 对比两边状态

    -- pg-boss 状态
    SELECT id, state, data->>'taskId' as task_id, completed_on 
    FROM platform_schema.job_common WHERE data->>'taskId' = 'xxx';
    
    -- 业务表状态
    SELECT id, status, "completedAt" FROM rvw_schema."ReviewTask" WHERE id = 'xxx';
    
  2. 手动同步状态(如确认已完成)

    UPDATE rvw_schema."ReviewTask" 
    SET status = 'completed', "completedAt" = NOW() 
    WHERE id = 'xxx';
    

清理操作

清理重复队列定义

-- 删除重复的队列定义,保留最新的一个
DELETE FROM platform_schema.queue a
USING platform_schema.queue b
WHERE a.name = b.name 
  AND a.created_on < b.created_on;

-- 验证
SELECT name, COUNT(*) FROM platform_schema.queue GROUP BY name;

清理卡住的任务

-- 将卡住的 active 任务标记为 failed
UPDATE platform_schema.job_common 
SET state = 'failed', completed_on = NOW()
WHERE state = 'active' 
  AND started_on < NOW() - INTERVAL '1 hour';

清理历史完成任务(释放空间)

-- 删除 7 天前的已完成任务(谨慎操作)
DELETE FROM platform_schema.job_common 
WHERE state = 'completed' 
  AND completed_on < NOW() - INTERVAL '7 days';

预防措施

代码层面(四层防御)

层级 措施 代码位置
前端 isSubmitting 防重复点击 Dashboard.tsx
API updateMany 乐观锁 reviewService.ts
队列 singletonKey 去重 PgBossQueue.ts
Worker 状态检查跳过已完成 reviewWorker.ts

运维层面

  1. 每日巡检:执行健康检查 SQL
  2. 部署后检查:确认队列定义无重复
  3. 告警配置:设置队列异常告警(待实现)

代码审查要点

  • 新增队列时,确保使用 singletonKey
  • Worker 处理前检查业务状态
  • 避免在 push() 中调用 createQueue()
  • 前端异步操作添加 loading 状态

📝 连接信息

# RDS 测试环境(仅供运维使用)
# 外网访问需先开启白名单
psql "postgresql://airesearch:Xibahe%40fengzhibo117@pgm-2zex1m2y3r23hdn5xo.pg.rds.aliyuncs.com:5432/ai_clinical_research_test"

# 通过本地 Docker 连接 RDS
docker exec ai-clinical-postgres psql "postgresql://airesearch:Xibahe%40fengzhibo117@pgm-2zex1m2y3r23hdn5xo.pg.rds.aliyuncs.com:5432/ai_clinical_research_test" -c "SQL语句"

📚 相关文档