/**
 * SSA dual-channel architecture E2E tests (Phase 1~3).
 *
 * Tests executed by this script:
 *   T1. Database migration check — execution_mode column, ssa_agent_executions table and index
 *   T2. Session execution mode switching (raw SQL round trip)
 *   T3. R Docker /execute-code endpoint — successful execution
 *   T4. R Docker /execute-code endpoint — error handling
 *   T5. AgentPlannerService planning
 *   T6. AgentCoderService R code generation
 *   T7. AgentReviewerService review
 *   T8. Agent execution record CRUD
 *
 * Prerequisites:
 *   - PostgreSQL running (Docker Desktop)
 *   - R Docker running (optional; T3/T4 are skipped when it is unavailable)
 *   - DeepSeek API key configured in .env
 *   - At least one SSA session (with uploaded data)
 *
 * Run: npx tsx tests/e2e-dual-channel-test.ts
 */

import { prisma } from '../src/config/database.js';
import { logger } from '../src/common/logging/index.js';
import axios from 'axios';

const R_SERVICE_URL = process.env.R_SERVICE_URL || 'http://localhost:8082';
const BACKEND_URL = process.env.BACKEND_URL || 'http://localhost:3000';

/** Outcome of a single test case, collected for the final summary. */
interface TestResult {
  id: string;
  name: string;
  status: 'pass' | 'fail' | 'skip';
  duration: number;
  message?: string;
}

/** Row shape of the information_schema.columns query used by T1. */
interface ColumnRow {
  column_name: string;
  column_default: string | null;
}

/** Row shape of the execution_mode lookups used by T2. */
interface ExecutionModeRow {
  execution_mode: string;
}

const results: TestResult[] = [];

/**
 * Runs one test case, records its outcome in `results` and prints a status line.
 *
 * A test signals "skipped" by throwing an error whose message starts with
 * `SKIP:`; any other thrown value is recorded as a failure. This function
 * never rejects, so one failing test does not stop the following ones.
 *
 * @param id   Short test identifier (e.g. "T1").
 * @param name Human-readable test name.
 * @param fn   The test body.
 */
async function runTest(
  id: string,
  name: string,
  fn: () => Promise<void>,
): Promise<void> {
  const start = Date.now();
  try {
    await fn();
    const dur = Date.now() - start;
    results.push({ id, name, status: 'pass', duration: dur });
    console.log(` ✅ ${id} ${name} (${dur}ms)`);
  } catch (error: unknown) {
    const dur = Date.now() - start;
    // Non-Error throws (strings, plain objects) still get a readable message.
    const message = error instanceof Error ? error.message : String(error);
    if (message.startsWith('SKIP:')) {
      results.push({ id, name, status: 'skip', duration: dur, message });
      console.log(` ⏭️ ${id} ${name} — ${message}`);
    } else {
      results.push({ id, name, status: 'fail', duration: dur, message });
      console.log(` ❌ ${id} ${name} — ${message}`);
    }
  }
}

/**
 * Throws an `Error` carrying `message` when `condition` is false.
 *
 * @throws Error when the condition does not hold.
 */
function assert(condition: boolean, message: string): asserts condition {
  if (!condition) throw new Error(message);
}

/**
 * Probes the R service health endpoint and converts any failure into a
 * `SKIP:` error so that the calling test is reported as skipped.
 *
 * @throws Error with a `SKIP:` message when the health request fails.
 */
async function ensureRServiceAvailable(): Promise<void> {
  try {
    await axios.get(`${R_SERVICE_URL}/health`, { timeout: 5000 });
  } catch {
    throw new Error('SKIP: R Docker 服务不可用');
  }
}

// ═══════════════════════════════════════════
// T1: database migration check
// ═══════════════════════════════════════════

/**
 * Verifies that the migration created the `execution_mode` column (with a
 * default containing "qper"), the `ssa_agent_executions` table and the
 * `idx_ssa_agent_exec_session` index in `ssa_schema`.
 */
async function t1_dbMigration(): Promise<void> {
  // The ssa_sessions table must have an execution_mode column.
  const colCheck = (await prisma.$queryRaw`
    SELECT column_name, column_default
    FROM information_schema.columns
    WHERE table_schema = 'ssa_schema'
      AND table_name = 'ssa_sessions'
      AND column_name = 'execution_mode'
  `) as ColumnRow[];
  assert(colCheck.length === 1, 'execution_mode 列不存在于 ssa_sessions 表');
  assert(
    colCheck[0]?.column_default?.includes('qper') === true,
    `execution_mode 默认值应为 qper,实际为 ${colCheck[0]?.column_default}`,
  );

  // The ssa_agent_executions table must exist.
  const tableCheck = (await prisma.$queryRaw`
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'ssa_schema'
      AND table_name = 'ssa_agent_executions'
  `) as unknown[];
  assert(tableCheck.length === 1, 'ssa_agent_executions 表不存在');

  // The session index must exist.
  const idxCheck = (await prisma.$queryRaw`
    SELECT indexname
    FROM pg_indexes
    WHERE schemaname = 'ssa_schema'
      AND tablename = 'ssa_agent_executions'
      AND indexname = 'idx_ssa_agent_exec_session'
  `) as unknown[];
  assert(idxCheck.length === 1, 'idx_ssa_agent_exec_session 索引不存在');
}

// ═══════════════════════════════════════════
// T2: session execution mode switching
// ═══════════════════════════════════════════

/**
 * Switches the most recent active session to `agent` and back to `qper`,
 * checking the stored value after each write.
 *
 * The session is a real row, so the mode it had before the test is written
 * back in a `finally` block even when an assertion fails half-way.
 *
 * @throws Error with a `SKIP:` message when no active session exists.
 */
async function t2_executionModeSwitch(): Promise<void> {
  // Pick an existing session.
  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active' },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) throw new Error('SKIP: 无可用 session,请先创建一个 SSA 会话');
  const sessionId = session.id;

  const readMode = async (): Promise<ExecutionModeRow[]> =>
    (await prisma.$queryRaw`
      SELECT execution_mode FROM ssa_schema.ssa_sessions WHERE id = ${sessionId}
    `) as ExecutionModeRow[];

  // The mode is written as an SQL literal rather than a bound parameter so the
  // statement behaves the same whatever the column type is (a text parameter
  // may not be implicitly cast if the column is an enum — not visible here).
  const writeMode = async (mode: 'qper' | 'agent'): Promise<void> => {
    if (mode === 'agent') {
      await prisma.$executeRaw`
        UPDATE ssa_schema.ssa_sessions SET execution_mode = 'agent' WHERE id = ${sessionId}
      `;
    } else {
      await prisma.$executeRaw`
        UPDATE ssa_schema.ssa_sessions SET execution_mode = 'qper' WHERE id = ${sessionId}
      `;
    }
  };

  // Read the current mode (expected to be the default, qper).
  const current = await readMode();
  assert(current.length === 1, 'Session 不存在');
  const originalMode = current[0]?.execution_mode;
  console.log(` 当前 mode: ${originalMode}`);

  try {
    // Switch to agent.
    await writeMode('agent');
    const after = await readMode();
    assert(after[0]?.execution_mode === 'agent', '切换到 agent 失败');

    // Switch back to qper.
    await writeMode('qper');
    const restored = await readMode();
    assert(restored[0]?.execution_mode === 'qper', '切回 qper 失败');
  } finally {
    // Leave the session as it was found; any value other than 'agent' falls
    // back to 'qper', which is also where the round trip above ends.
    await writeMode(originalMode === 'agent' ? 'agent' : 'qper');
  }
}

// ═══════════════════════════════════════════
// T3: R Docker /execute-code endpoint
// ═══════════════════════════════════════════

/**
 * Posts a small R script to `/api/v1/execute-code` and checks that the
 * response reports success with exactly two report blocks, the first of
 * type `markdown`.
 *
 * @throws Error with a `SKIP:` message when the R service is unreachable.
 */
async function t3_rExecuteCode(): Promise<void> {
  // Check that the R service is reachable first.
  await ensureRServiceAvailable();

  // One R statement per line: R cannot parse several statements on a single
  // line without separators.
  const code = `
blocks <- list()
blocks[[1]] <- make_markdown_block("## 测试结果\\n双通道 E2E 测试通过", title = "测试")
blocks[[2]] <- make_kv_block(items = list("状态" = "成功", "时间" = as.character(Sys.time())), title = "概况")
list(status = "success", report_blocks = blocks)
`;

  // The HTTP timeout (35 s) is slightly longer than the timeout value sent in
  // the request body (30).
  const response = await axios.post(
    `${R_SERVICE_URL}/api/v1/execute-code`,
    { code, session_id: 'e2e-test', timeout: 30 },
    { timeout: 35000 },
  );

  assert(response.data?.status === 'success', `R 执行状态不是 success: ${response.data?.status}`);
  assert(
    Array.isArray(response.data?.result?.report_blocks),
    'report_blocks 不是数组',
  );
  assert(
    response.data.result.report_blocks.length === 2,
    `预期 2 个 block,实际 ${response.data.result.report_blocks.length}`,
  );

  const markdownBlock = response.data.result.report_blocks[0];
  assert(markdownBlock.type === 'markdown', `Block 0 类型应为 markdown,实际 ${markdownBlock.type}`);

  console.log(` R 执行耗时: ${response.data.duration_ms}ms, blocks: ${response.data.result.report_blocks.length}`);
}

// ═══════════════════════════════════════════
// T4: R Docker error handling
// ═══════════════════════════════════════════

/**
 * Posts R code that calls `stop()` and checks that the response body has
 * status `error` and a message containing the original error text.
 *
 * @throws Error with a `SKIP:` message when the R service is unreachable.
 */
async function t4_rExecuteCodeError(): Promise<void> {
  await ensureRServiceAvailable();

  const response = await axios.post(
    `${R_SERVICE_URL}/api/v1/execute-code`,
    {
      code: 'stop("这是一个故意的错误")',
      session_id: 'e2e-test-error',
      timeout: 10,
    },
    { timeout: 15000 },
  );

  assert(response.data?.status === 'error', '错误代码应返回 error 状态');
  assert(
    response.data?.message?.includes('故意的错误'),
    `错误消息应包含原始错误: ${response.data?.message}`,
  );

  console.log(` 错误捕获正确: "${response.data.message}"`);
}

// ═══════════════════════════════════════════
// T5: AgentPlannerService
// ═══════════════════════════════════════════

/**
 * Asks AgentPlannerService for a plan on the most recent active session that
 * has a data key, and checks that the plan has a title, at least one step and
 * a non-empty `rawText`.
 *
 * @throws Error with a `SKIP:` message when no such session exists.
 */
async function t5_agentPlanner(): Promise<void> {
  const { agentPlannerService } = await import('../src/modules/ssa/services/AgentPlannerService.js');

  // Pick a session that has uploaded data.
  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active', dataOssKey: { not: null } },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) throw new Error('SKIP: 无有数据的 session');

  const plan = await agentPlannerService.generatePlan(
    session.id,
    '帮我做一个基线特征表,比较两组的差异',
    [],
  );

  assert(!!plan.title, '计划标题为空');
  assert(plan.steps.length > 0, '计划步骤为空');
  assert(!!plan.rawText, 'rawText 为空');

  console.log(` 计划标题: ${plan.title}`);
  console.log(` 设计类型: ${plan.designType}`);
  console.log(` 步骤数: ${plan.steps.length}`);
  for (const step of plan.steps) {
    console.log(` ${step.order}. ${step.method}: ${step.description}`);
  }
}

// ═══════════════════════════════════════════
// T6: AgentCoderService
// ═══════════════════════════════════════════

/**
 * Feeds a hand-written plan to AgentCoderService and checks that the
 * generated code is longer than 50 characters and mentions either
 * `load_input_data` or `df`.
 *
 * @throws Error with a `SKIP:` message when no session with data exists.
 */
async function t6_agentCoder(): Promise<void> {
  const { agentCoderService } = await import('../src/modules/ssa/services/AgentCoderService.js');

  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active', dataOssKey: { not: null } },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) throw new Error('SKIP: 无有数据的 session');

  const mockPlan = {
    title: '基线特征表分析',
    designType: '横断面研究',
    variables: {
      outcome: [],
      predictors: [],
      grouping: 'group',
      confounders: [],
    },
    steps: [
      { order: 1, method: '描述性统计', description: '生成基线特征表', rationale: '了解数据分布' },
    ],
    assumptions: [],
    rawText: '基线特征表分析计划',
  };

  const generated = await agentCoderService.generateCode(session.id, mockPlan);

  assert(generated.code.length > 50, `生成代码太短: ${generated.code.length} chars`);
  assert(
    generated.code.includes('load_input_data') || generated.code.includes('df'),
    '代码中未包含数据加载',
  );

  console.log(` 代码长度: ${generated.code.length} chars`);
  console.log(` 依赖包: ${generated.requiredPackages.join(', ') || '(无额外依赖)'}`);
  console.log(` 代码前 100 字符: ${generated.code.slice(0, 100).replace(/\n/g, ' ')}...`);
}

// ═══════════════════════════════════════════
// T7: AgentReviewerService
// ═══════════════════════════════════════════

/**
 * Reviews one benign and one dangerous R snippet with AgentReviewerService.
 *
 * Only the shape of the first review result is asserted. The dangerous
 * snippet's verdict is logged and, if it passed, a warning is printed —
 * the test does not fail on it.
 */
async function t7_agentReviewer(): Promise<void> {
  const { agentReviewerService } = await import('../src/modules/ssa/services/AgentReviewerService.js');

  const safePlan = {
    title: '基线分析',
    designType: '队列研究',
    variables: {
      outcome: ['death'],
      predictors: ['age', 'sex'],
      grouping: 'treatment',
      confounders: [],
    },
    steps: [{ order: 1, method: '基线特征表', description: '描述统计', rationale: '基线比较' }],
    assumptions: [],
    rawText: '',
  };

  // One R statement per line so the reviewer sees syntactically valid R.
  const safeCode = `
df <- load_input_data(input)
library(gtsummary)
tbl <- df %>%
  tbl_summary(by = treatment, include = c(age, sex, death)) %>%
  add_p()
blocks <- list()
blocks[[1]] <- make_markdown_block("## 基线特征表")
list(status = "success", report_blocks = blocks)
`;

  const review = await agentReviewerService.review(safePlan, safeCode);

  assert(typeof review.passed === 'boolean', 'passed 应为 boolean');
  assert(typeof review.score === 'number', 'score 应为 number');
  assert(Array.isArray(review.comments), 'comments 应为 array');

  console.log(` 审核通过: ${review.passed}`);
  console.log(` 评分: ${review.score}/100`);
  console.log(` 问题数: ${review.issues.length}`);
  for (const comment of review.comments) {
    console.log(` - ${comment}`);
  }

  // Review a snippet that installs a package and runs a shell command.
  const dangerCode = `
install.packages("hacker_pkg")
system("rm -rf /")
df <- load_input_data(input)
list(status = "success", report_blocks = list())
`;

  const dangerReview = await agentReviewerService.review(safePlan, dangerCode);
  console.log(` 危险代码审核通过: ${dangerReview.passed} (预期 false)`);
  console.log(` 危险代码问题数: ${dangerReview.issues.length}`);
  if (dangerReview.passed) {
    console.log(' ⚠️ 警告: 危险代码未被拦截,Prompt 需要加强');
  }
}

// ═══════════════════════════════════════════
// T8: agent execution record CRUD
// ═══════════════════════════════════════════

/**
 * Creates, updates, re-reads and deletes one `ssaAgentExecution` record for
 * the most recent active session.
 *
 * The record is deleted in a `finally` block so that a failed assertion does
 * not leave a test row behind.
 *
 * @throws Error with a `SKIP:` message when no active session exists.
 */
async function t8_agentExecutionCrud(): Promise<void> {
  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active' },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) throw new Error('SKIP: 无可用 session');

  // NOTE(review): the model is reached through an `any` cast, presumably
  // because the client typings do not expose it yet — confirm and drop the cast.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const agentExecutions = (prisma as any).ssaAgentExecution;

  // Create.
  const exec = await agentExecutions.create({
    data: {
      sessionId: session.id,
      query: 'E2E 测试查询',
      status: 'pending',
    },
  });

  try {
    assert(!!exec.id, '创建执行记录失败');
    console.log(` 创建记录: ${exec.id}`);

    // Update.
    await agentExecutions.update({
      where: { id: exec.id },
      data: {
        status: 'completed',
        planText: '测试计划',
        generatedCode: 'print("hello")',
        durationMs: 1234,
      },
    });

    const updated = await agentExecutions.findUnique({
      where: { id: exec.id },
    });
    assert(updated.status === 'completed', `状态应为 completed,实际 ${updated.status}`);
    assert(updated.durationMs === 1234, `耗时应为 1234,实际 ${updated.durationMs}`);
  } finally {
    // Delete (cleanup) — runs whether or not the assertions above passed.
    await agentExecutions.delete({ where: { id: exec.id } });
  }

  console.log(' CRUD 全流程通过');
}

// ═══════════════════════════════════════════
// Main
// ═══════════════════════════════════════════

/**
 * Runs T1–T8 in order, prints a summary, disconnects Prisma and exits with
 * status 1 when at least one test failed, 0 otherwise.
 */
async function main(): Promise<void> {
  console.log('\n╔══════════════════════════════════════════════╗');
  console.log('║ SSA 双通道架构 E2E 测试 (Phase 1~3) ║');
  console.log('╚══════════════════════════════════════════════╝\n');

  console.log('📦 Phase 1: 基础设施');
  await runTest('T1', '数据库迁移验证(execution_mode + agent_executions)', t1_dbMigration);
  await runTest('T2', 'Session execution mode 切换', t2_executionModeSwitch);
  await runTest('T3', 'R Docker /execute-code 正常执行', t3_rExecuteCode);
  await runTest('T4', 'R Docker /execute-code 错误处理', t4_rExecuteCodeError);

  console.log('\n🤖 Phase 2: Agent 服务');
  await runTest('T5', 'AgentPlannerService 规划能力', t5_agentPlanner);
  await runTest('T6', 'AgentCoderService R 代码生成', t6_agentCoder);
  await runTest('T7', 'AgentReviewerService 审核能力', t7_agentReviewer);
  await runTest('T8', 'Agent Execution 记录 CRUD', t8_agentExecutionCrud);

  // Summary.
  console.log('\n' + '═'.repeat(50));
  const failures = results.filter(r => r.status === 'fail');
  const skips = results.filter(r => r.status === 'skip');
  const passed = results.filter(r => r.status === 'pass').length;
  const failed = failures.length;
  const skipped = skips.length;
  const totalMs = results.reduce((sum, r) => sum + r.duration, 0);

  console.log(`\n📊 测试结果: ${passed} 通过 / ${failed} 失败 / ${skipped} 跳过 (总耗时 ${totalMs}ms)`);

  if (failed > 0) {
    console.log('\n❌ 失败详情:');
    for (const r of failures) {
      console.log(` ${r.id} ${r.name}: ${r.message}`);
    }
  }

  if (skipped > 0) {
    console.log('\n⏭️ 跳过详情:');
    for (const r of skips) {
      console.log(` ${r.id} ${r.name}: ${r.message}`);
    }
  }

  console.log('\n' + (failed === 0 ? '🎉 所有测试通过!' : '⚠️ 有测试失败,请检查。'));

  await prisma.$disconnect();
  process.exit(failed > 0 ? 1 : 0);
}

main().catch(async (err: unknown) => {
  console.error('测试执行异常:', err);
  try {
    await prisma.$disconnect();
  } finally {
    // Exit non-zero even if disconnecting itself fails.
    process.exit(1);
  }
});