/**
 * QcAggregator — V3.1 asynchronous, debounced aggregation engine.
 *
 * Responsibilities:
 *   1. Aggregate qc_field_status into qc_event_status (event level).
 *   2. Aggregate qc_event_status into record_summary (record level).
 *   3. Expose two entry points: per-record (subject) and whole-project.
 *
 * Design principles:
 *   - Each level is aggregated by a single SQL INSERT ... ON CONFLICT
 *     statement; there is no application-level loop over rows.
 *   - The execution phase only writes qc_field_status; aggregation is
 *     deferred and done in bulk.
 *   - Per-record debounce: singletonKey = aggregate_${projectId}_${recordId}
 */

import { PrismaClient, Prisma } from '@prisma/client';
import { logger } from '../../../common/logging/index.js';
import { HealthScoreEngine } from './HealthScoreEngine.js';

const prisma = new PrismaClient();

/** Outcome of one aggregation run. */
export interface AggregateResult {
  /** Affected-row count reported for the qc_event_status upsert. */
  eventStatusRows: number;
  /** Affected-row count reported for the record_summary upsert. */
  recordSummaryRows: number;
  /** Wall-clock duration of the run in milliseconds. */
  durationMs: number;
}

/**
 * Whole-project aggregation: field_status → event_status → record_summary.
 *
 * Intended to be run once after executeBatch completes. After both SQL
 * steps succeed, the project health score is recalculated; a failure in
 * that step is logged as a warning and does not fail the aggregation.
 *
 * @param projectId - Project whose QC rows are aggregated.
 * @returns Row counts of both upserts and the elapsed time.
 * @throws Propagates any error raised by the two SQL aggregation steps.
 */
export async function aggregateDeferred(
  projectId: string,
): Promise<AggregateResult> {
  const start = Date.now();

  // Order matters: record_summary is derived from the event rows written
  // by the first step.
  const eventRows = await aggregateEventStatus(projectId);
  const recordRows = await aggregateRecordSummary(projectId);

  // HealthScoreEngine is triggered only by the whole-project aggregation.
  try {
    const hsEngine = new HealthScoreEngine(projectId);
    const hsResult = await hsEngine.calculate();
    logger.info('[QcAggregator] HealthScore refreshed', {
      projectId,
      healthScore: hsResult.healthScore,
      healthGrade: hsResult.healthGrade,
    });
  } catch (err: unknown) {
    // Narrow before reading `.message`: a thrown null/undefined would make
    // an unguarded property access throw from inside this handler and turn
    // a non-fatal failure into a fatal one.
    logger.warn('[QcAggregator] HealthScoreEngine failed (non-fatal)', {
      projectId,
      error: err instanceof Error ? err.message : String(err),
    });
  }

  const result: AggregateResult = {
    eventStatusRows: eventRows,
    recordSummaryRows: recordRows,
    durationMs: Date.now() - start,
  };

  logger.info('[QcAggregator] aggregateDeferred done', {
    projectId,
    ...result,
  });

  return result;
}

/**
 * Single-record aggregation: recomputes only the event_status and
 * record_summary rows belonging to one record.
 *
 * Intended to be triggered (debounced via pg-boss) after executeSingle
 * completes. Unlike {@link aggregateDeferred}, this does not refresh the
 * health score.
 *
 * @param projectId - Project the record belongs to.
 * @param recordId - Record (subject) to re-aggregate. NOTE: an empty string
 *   is treated like an omitted filter by the SQL helpers, which then
 *   aggregate the whole project.
 * @returns Row counts of both upserts and the elapsed time.
 * @throws Propagates any error raised by the two SQL aggregation steps.
 */
export async function aggregateForRecord(
  projectId: string,
  recordId: string,
): Promise<AggregateResult> {
  const start = Date.now();

  const eventRows = await aggregateEventStatus(projectId, recordId);
  const recordRows = await aggregateRecordSummary(projectId, recordId);

  const result: AggregateResult = {
    eventStatusRows: eventRows,
    recordSummaryRows: recordRows,
    durationMs: Date.now() - start,
  };

  logger.info('[QcAggregator] aggregateForRecord done', {
    projectId,
    recordId,
    ...result,
  });

  return result;
}

// ============================================================
// Step 1: qc_field_status → qc_event_status
// ============================================================

/**
 * Upserts one qc_event_status row per (project_id, record_id, event_id)
 * group found in qc_field_status.
 *
 * The event status is the worst field status in the group: 'FAIL' if any
 * field failed, otherwise 'WARNING' if any field warned, otherwise 'PASS'.
 * The dN_issues columns count only FAIL rows of the matching rule category.
 * On conflict, created_at and triggered_by keep their existing values.
 *
 * @param projectId - Project to aggregate.
 * @param recordId - Optional record filter; when falsy (omitted or empty
 *   string) every record of the project is aggregated.
 * @returns The affected-row count reported by `$executeRaw`.
 */
async function aggregateEventStatus(
  projectId: string,
  recordId?: string,
): Promise<number> {
  const whereClause = recordId
    ? Prisma.sql`WHERE fs.project_id = ${projectId} AND fs.record_id = ${recordId}`
    : Prisma.sql`WHERE fs.project_id = ${projectId}`;

  return prisma.$executeRaw`
    INSERT INTO iit_schema.qc_event_status
      (id, project_id, record_id, event_id,
       status, fields_total, fields_passed, fields_failed, fields_warning,
       d1_issues, d2_issues, d3_issues, d5_issues, d6_issues, d7_issues,
       triggered_by, last_qc_at, created_at, updated_at)
    SELECT
      gen_random_uuid(),
      fs.project_id,
      fs.record_id,
      fs.event_id,
      CASE
        WHEN COUNT(*) FILTER (WHERE fs.status = 'FAIL') > 0 THEN 'FAIL'
        WHEN COUNT(*) FILTER (WHERE fs.status = 'WARNING') > 0 THEN 'WARNING'
        ELSE 'PASS'
      END,
      COUNT(*)::int,
      COUNT(*) FILTER (WHERE fs.status = 'PASS')::int,
      COUNT(*) FILTER (WHERE fs.status = 'FAIL')::int,
      COUNT(*) FILTER (WHERE fs.status = 'WARNING')::int,
      COUNT(*) FILTER (WHERE fs.rule_category = 'D1' AND fs.status = 'FAIL')::int,
      COUNT(*) FILTER (WHERE fs.rule_category = 'D2' AND fs.status = 'FAIL')::int,
      COUNT(*) FILTER (WHERE fs.rule_category = 'D3' AND fs.status = 'FAIL')::int,
      COUNT(*) FILTER (WHERE fs.rule_category = 'D5' AND fs.status = 'FAIL')::int,
      COUNT(*) FILTER (WHERE fs.rule_category = 'D6' AND fs.status = 'FAIL')::int,
      COUNT(*) FILTER (WHERE fs.rule_category = 'D7' AND fs.status = 'FAIL')::int,
      'aggregation',
      NOW(),
      NOW(),
      NOW()
    FROM iit_schema.qc_field_status fs
    ${whereClause}
    GROUP BY fs.project_id, fs.record_id, fs.event_id
    ON CONFLICT (project_id, record_id, event_id)
    DO UPDATE SET
      status = EXCLUDED.status,
      fields_total = EXCLUDED.fields_total,
      fields_passed = EXCLUDED.fields_passed,
      fields_failed = EXCLUDED.fields_failed,
      fields_warning = EXCLUDED.fields_warning,
      d1_issues = EXCLUDED.d1_issues,
      d2_issues = EXCLUDED.d2_issues,
      d3_issues = EXCLUDED.d3_issues,
      d5_issues = EXCLUDED.d5_issues,
      d6_issues = EXCLUDED.d6_issues,
      d7_issues = EXCLUDED.d7_issues,
      last_qc_at = NOW(),
      updated_at = NOW()
  `;
}

// ============================================================
// Step 2: qc_event_status → record_summary
// ============================================================

/**
 * Upserts one record_summary row per (project_id, record_id) group found
 * in qc_event_status.
 *
 * Event counts are tallied by status, field and dN_issues counters are
 * summed across the record's events, and latest_qc_status is the worst
 * event status (FAIL > WARNING > PASS). top_issues is a JSON array with one
 * `{ eventId, status, failedFields }` entry per FAIL/WARNING event, or `[]`
 * when there is none. latest_qc_at and last_updated_at are both set to the
 * newest last_qc_at among the record's events (NOW() if all are NULL).
 *
 * @param projectId - Project to aggregate.
 * @param recordId - Optional record filter; when falsy (omitted or empty
 *   string) every record of the project is aggregated.
 * @returns The affected-row count reported by `$executeRaw`.
 */
async function aggregateRecordSummary(
  projectId: string,
  recordId?: string,
): Promise<number> {
  const whereClause = recordId
    ? Prisma.sql`WHERE es.project_id = ${projectId} AND es.record_id = ${recordId}`
    : Prisma.sql`WHERE es.project_id = ${projectId}`;

  return prisma.$executeRaw`
    INSERT INTO iit_schema.record_summary (
      id, project_id, record_id,
      last_updated_at, updated_at,
      events_total, events_passed, events_failed, events_warning,
      fields_total, fields_passed, fields_failed,
      d1_issues, d2_issues, d3_issues, d5_issues, d6_issues, d7_issues,
      top_issues, latest_qc_status, latest_qc_at
    )
    SELECT
      gen_random_uuid(),
      agg.project_id,
      agg.record_id,
      agg.last_qc_at,
      NOW(),
      agg.events_total,
      agg.events_passed,
      agg.events_failed,
      agg.events_warning,
      agg.fields_total,
      agg.fields_passed,
      agg.fields_failed,
      agg.d1_issues,
      agg.d2_issues,
      agg.d3_issues,
      agg.d5_issues,
      agg.d6_issues,
      agg.d7_issues,
      agg.top_issues,
      agg.worst_status,
      agg.last_qc_at
    FROM (
      SELECT
        es.project_id,
        es.record_id,
        COUNT(*)::int AS events_total,
        COUNT(*) FILTER (WHERE es.status = 'PASS')::int AS events_passed,
        COUNT(*) FILTER (WHERE es.status = 'FAIL')::int AS events_failed,
        COUNT(*) FILTER (WHERE es.status = 'WARNING')::int AS events_warning,
        COALESCE(SUM(es.fields_total), 0)::int AS fields_total,
        COALESCE(SUM(es.fields_passed), 0)::int AS fields_passed,
        COALESCE(SUM(es.fields_failed), 0)::int AS fields_failed,
        COALESCE(SUM(es.d1_issues), 0)::int AS d1_issues,
        COALESCE(SUM(es.d2_issues), 0)::int AS d2_issues,
        COALESCE(SUM(es.d3_issues), 0)::int AS d3_issues,
        COALESCE(SUM(es.d5_issues), 0)::int AS d5_issues,
        COALESCE(SUM(es.d6_issues), 0)::int AS d6_issues,
        COALESCE(SUM(es.d7_issues), 0)::int AS d7_issues,
        CASE
          WHEN COUNT(*) FILTER (WHERE es.status = 'FAIL') > 0 THEN 'FAIL'
          WHEN COUNT(*) FILTER (WHERE es.status = 'WARNING') > 0 THEN 'WARNING'
          ELSE 'PASS'
        END AS worst_status,
        COALESCE(
          jsonb_agg(
            jsonb_build_object(
              'eventId', es.event_id,
              'status', es.status,
              'failedFields', es.fields_failed
            )
          ) FILTER (WHERE es.status IN ('FAIL', 'WARNING')),
          '[]'::jsonb
        ) AS top_issues,
        COALESCE(MAX(es.last_qc_at), NOW()) AS last_qc_at
      FROM iit_schema.qc_event_status es
      ${whereClause}
      GROUP BY es.project_id, es.record_id
    ) agg
    ON CONFLICT (project_id, record_id)
    DO UPDATE SET
      events_total = EXCLUDED.events_total,
      events_passed = EXCLUDED.events_passed,
      events_failed = EXCLUDED.events_failed,
      events_warning = EXCLUDED.events_warning,
      fields_total = EXCLUDED.fields_total,
      fields_passed = EXCLUDED.fields_passed,
      fields_failed = EXCLUDED.fields_failed,
      d1_issues = EXCLUDED.d1_issues,
      d2_issues = EXCLUDED.d2_issues,
      d3_issues = EXCLUDED.d3_issues,
      d5_issues = EXCLUDED.d5_issues,
      d6_issues = EXCLUDED.d6_issues,
      d7_issues = EXCLUDED.d7_issues,
      top_issues = EXCLUDED.top_issues,
      latest_qc_status = EXCLUDED.latest_qc_status,
      latest_qc_at = EXCLUDED.latest_qc_at,
      last_updated_at = EXCLUDED.latest_qc_at,
      updated_at = NOW()
  `;
}