fix(backend): Resolve PgBoss infinite loop issue and clean up unused files
Backend fixes:
- Fix PgBoss task infinite loop on SAE (root cause: missing queue table constraints)
- Add singletonKey to prevent duplicate job enqueueing
- Add idempotency check in reviewWorker (skip completed tasks)
- Add optimistic locking in reviewService (atomic status update)

Frontend fixes:
- Add isSubmitting state to prevent duplicate submissions in RVW Dashboard
- Fix API baseURL in knowledgeBaseApi (relative path)

Cleanup (removed):
- Old frontend/ directory (migrated to frontend-v2)
- python-microservice/ (unused, replaced by extraction_service)
- Root package.json and node_modules (accidentally created)
- redcap-docker-dev/ (external dependency)
- Various temporary files and outdated docs in root

New documentation:
- docs/07-运维文档/01-PgBoss队列监控与维护.md
- docs/07-运维文档/02-故障预防检查清单.md
- docs/07-运维文档/03-数据库迁移注意事项.md

Database fix applied to RDS:
- Added PRIMARY KEY to platform_schema.queue
- Added 3 missing foreign key constraints

Tested: Local build passed, RDS constraints verified
This commit is contained in:
Binary file not shown.
@@ -50,7 +50,7 @@ export class PgBossQueue implements JobQueue {
|
||||
this.boss = new PgBoss({
|
||||
connectionString,
|
||||
schema, // 使用platform_schema
|
||||
max: 4, // 🛡️ 限制连接数,避免挤占 Prisma 连接配额(RDS 限制 100)
|
||||
max: 5, // 🛡️ 限制连接数,避免挤占 Prisma 连接配额(RDS 限制 100)
|
||||
application_name: 'aiclinical-queue',
|
||||
|
||||
// 调度配置
|
||||
@@ -143,21 +143,13 @@ export class PgBossQueue implements JobQueue {
|
||||
// 存储元数据到缓存
|
||||
this.jobs.set(jobId, job)
|
||||
|
||||
// 确保队列存在(幂等操作)
|
||||
try {
|
||||
await this.boss.createQueue(type, {
|
||||
retryLimit: 3,
|
||||
retryDelay: 60,
|
||||
expireInSeconds: 6 * 60 * 60 // 6小时
|
||||
});
|
||||
} catch (error: any) {
|
||||
// 队列已存在时会报错,忽略
|
||||
if (!error.message?.includes('already exists')) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
// 📝 注意:队列已在 registerBossHandler() 中创建,这里不再重复创建
|
||||
// 避免每次 push 都尝试 createQueue 导致重复定义
|
||||
|
||||
// 发送任务到pg-boss
|
||||
// ✅ 使用 singletonKey 防止同一任务被重复入队
|
||||
const singletonKey = (data as any).taskId || jobId
|
||||
|
||||
const bossJobId = await this.boss.send(type, {
|
||||
...data,
|
||||
__jobId: jobId, // 嵌入我们的jobId
|
||||
@@ -165,7 +157,9 @@ export class PgBossQueue implements JobQueue {
|
||||
}, {
|
||||
retryLimit: 3,
|
||||
retryDelay: 60,
|
||||
expireInSeconds: 6 * 60 * 60 // 6小时过期(更适合长批次任务)
|
||||
expireInSeconds: 6 * 60 * 60, // 6小时过期(更适合长批次任务)
|
||||
singletonKey, // ✅ 防止同一任务重复入队
|
||||
singletonSeconds: 3600, // 1小时内不允许重复
|
||||
})
|
||||
|
||||
console.log(`[PgBossQueue] Job pushed: ${jobId} -> pg-boss:${bossJobId} (type: ${type})`)
|
||||
@@ -222,7 +216,7 @@ export class PgBossQueue implements JobQueue {
|
||||
|
||||
await this.boss.work<Record<string, any>>(type, {
|
||||
batchSize: 1, // 每次处理1个任务
|
||||
pollingIntervalSeconds: 1 // 每秒轮询一次
|
||||
pollingIntervalSeconds: 2 // 每2秒轮询一次(降低频率避免竞态)
|
||||
}, async (bossJobs) => {
|
||||
// pg-boss的work handler接收的是Job数组
|
||||
const bossJob = bossJobs[0]
|
||||
@@ -230,6 +224,7 @@ export class PgBossQueue implements JobQueue {
|
||||
|
||||
const { __jobId, __createdAt, ...data } = bossJob.data
|
||||
const jobId = __jobId || randomUUID()
|
||||
const bossJobId = bossJob.id // pg-boss 自己的 job ID
|
||||
|
||||
// 获取或创建Job对象
|
||||
let job = this.jobs.get(jobId)
|
||||
@@ -251,18 +246,33 @@ export class PgBossQueue implements JobQueue {
|
||||
job.updatedAt = new Date()
|
||||
}
|
||||
|
||||
console.log(`[PgBossQueue] Processing job: ${jobId} (type: ${type})`)
|
||||
// ✅ 检查:是否已经处理过这个 job(防止重复处理)
|
||||
const existingJob = this.jobs.get(jobId)
|
||||
if (existingJob && existingJob.status === 'completed') {
|
||||
console.warn(`[PgBossQueue] ⚠️ Job already completed, skipping: ${jobId} (pg-boss: ${bossJobId})`)
|
||||
return // 跳过已完成的任务
|
||||
}
|
||||
|
||||
console.log(`[PgBossQueue] Processing job: ${jobId} (pg-boss: ${bossJobId}, type: ${type})`)
|
||||
|
||||
try {
|
||||
// 执行用户提供的处理函数
|
||||
const result = await handler(job)
|
||||
|
||||
// 标记为完成
|
||||
// 标记为完成(更新内存缓存)
|
||||
await this.completeJob(jobId, result)
|
||||
|
||||
return result
|
||||
// ✅ 重要:pg-boss 12.x work handler 返回 void
|
||||
// pg-boss 会在 handler 成功返回后自动将任务标记为 completed
|
||||
console.log(`[PgBossQueue] ✅ Job handler finished successfully: ${jobId} (pg-boss: ${bossJobId})`)
|
||||
|
||||
// 打印最终状态
|
||||
const finalJob = this.jobs.get(jobId)
|
||||
console.log(`[PgBossQueue] Job final status: ${finalJob?.status || 'unknown'}`)
|
||||
} catch (error: any) {
|
||||
// 标记为失败
|
||||
console.error(`[PgBossQueue] ❌ Job handler failed: ${jobId} (pg-boss: ${bossJobId})`, error.message)
|
||||
|
||||
// 标记为失败(更新内存缓存)
|
||||
await this.failJob(jobId, error.message || String(error))
|
||||
|
||||
// 抛出错误让pg-boss处理重试
|
||||
|
||||
@@ -62,4 +62,4 @@ export const storage = StorageFactory.getInstance()
|
||||
* - 系统Logo
|
||||
* - RAG引用的图片
|
||||
*/
|
||||
export const staticStorage = StorageFactory.getStaticInstance()
|
||||
export const staticStorage = StorageFactory.getStaticInstance()
|
||||
@@ -156,9 +156,14 @@ export async function runReview(params: RunReviewParams): Promise<{ jobId: strin
|
||||
throw new Error('文档尚未提取完成,请稍后再试');
|
||||
}
|
||||
|
||||
// 更新任务状态为reviewing
|
||||
await prisma.reviewTask.update({
|
||||
where: { id: taskId },
|
||||
// 🔒 使用原子操作实现幂等性(防止并发请求重复入队)
|
||||
// updateMany 只会更新匹配条件的记录,返回更新数量
|
||||
const updateResult = await prisma.reviewTask.updateMany({
|
||||
where: {
|
||||
id: taskId,
|
||||
// 只有当状态是 pending/completed/failed 时才允许启动
|
||||
status: { in: ['pending', 'completed', 'failed'] }
|
||||
},
|
||||
data: {
|
||||
status: 'reviewing',
|
||||
selectedAgents: agents,
|
||||
@@ -171,6 +176,14 @@ export async function runReview(params: RunReviewParams): Promise<{ jobId: strin
|
||||
},
|
||||
});
|
||||
|
||||
// 🔒 幂等性检查:如果没有更新任何记录,说明任务已经在运行中
|
||||
if (updateResult.count === 0) {
|
||||
logger.warn('[RVW] ⚠️ 任务已在运行中,拒绝重复启动', { taskId, currentStatus: task.status });
|
||||
throw new Error('任务已在运行中,请勿重复提交');
|
||||
}
|
||||
|
||||
logger.info('[RVW] ✅ 任务状态已原子更新为 reviewing', { taskId });
|
||||
|
||||
// ✅ 推送任务到 pg-boss 队列(Platform-Only架构)
|
||||
const job = await jobQueue.push('rvw_review_task', {
|
||||
taskId,
|
||||
|
||||
@@ -63,6 +63,29 @@ export function registerReviewWorker() {
|
||||
console.log(` 文本长度: ${extractedText.length} 字符`);
|
||||
|
||||
try {
|
||||
// ✅ 检查任务是否已经完成(防止重复处理)
|
||||
const existingTask = await prisma.reviewTask.findUnique({
|
||||
where: { id: taskId },
|
||||
select: { status: true, completedAt: true, overallScore: true },
|
||||
});
|
||||
|
||||
if (existingTask?.status === 'completed' && existingTask.completedAt) {
|
||||
logger.warn('[reviewWorker] ⚠️ Task already completed, skipping', {
|
||||
jobId: job.id,
|
||||
taskId,
|
||||
completedAt: existingTask.completedAt,
|
||||
overallScore: existingTask.overallScore,
|
||||
});
|
||||
console.log(`\n⚠️ 任务已完成,跳过重复处理`);
|
||||
console.log(` Task ID: ${taskId}`);
|
||||
console.log(` 完成时间: ${existingTask.completedAt}`);
|
||||
console.log(` 得分: ${existingTask.overallScore}`);
|
||||
return {
|
||||
taskId,
|
||||
skipped: true,
|
||||
reason: 'Task already completed',
|
||||
};
|
||||
}
|
||||
// ========================================
|
||||
// 1. 运行选中的智能体
|
||||
// ========================================
|
||||
|
||||
Reference in New Issue
Block a user