feat(ssa): Implement dual-channel architecture Phase 1-3 (QPER + LLM Agent pipeline)

Completed:
- Phase 1: DB schema (execution_mode + ssa_agent_executions), ModeToggle component, Session PATCH API
- Phase 2: AgentPlannerService + AgentCoderService (streaming) + CodeRunnerService + R Docker /execute-code endpoint
- Phase 3: AgentCodePanel (3-step confirmation UI), SSE event handling (7 agent events), streaming code display
- Three-step confirmation pipeline: plan -> user confirm -> stream code -> user confirm -> execute R code -> results
- R Docker sandbox /execute-code endpoint with 120s timeout + block_helpers preloaded
- E2E dual-channel test script (8 tests)
- Updated R engine architecture doc (v1.5) and SSA module status doc (v4.0)

Technical details:
- AgentCoderService uses LLM streaming (chatStream) for real-time code generation feedback
- ReviewerAgent temporarily disabled, prioritizing Plan -> Code -> Execute flow
- CodeRunnerService wraps user code with auto data loading (df variable injection)
- Frontend handles agent_planning, agent_plan_ready, code_generating, code_generated, code_executing, code_result events
- ask_user mechanism used for plan and code confirmation steps

Files: 24 files (4 new services, 2 new components, 1 migration, 1 E2E test, 16 modified)
Made-with: Cursor
This commit is contained in:
2026-03-02 22:23:54 +08:00
parent 71d32d11ee
commit aadceb5cde
24 changed files with 2694 additions and 56 deletions

View File

@@ -0,0 +1,444 @@
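/**
 * SSA dual-channel architecture E2E tests
 *
 * Covers Phase 1–3:
 * T1. Database migration check — execution_mode column + ssa_agent_executions table
 * T2. Session execution mode switching (direct SQL against ssa_sessions)
 * T3. R Docker /execute-code endpoint — successful execution
 * T4. R Docker /execute-code endpoint — error handling
 * T5. AgentPlannerService planning
 * T6. AgentCoderService code generation
 * T7. AgentReviewerService review
 * T8. Agent execution record CRUD
 *
 * Prerequisites:
 * - PostgreSQL running (Docker Desktop)
 * - R Docker running (optional; T3 and T4 are skipped when it is unavailable)
 * - DeepSeek API key configured in .env
 * - At least one SSA session (with uploaded data)
 *
 * Run: npx tsx tests/e2e-dual-channel-test.ts
 */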
import { prisma } from '../src/config/database.js';
import { logger } from '../src/common/logging/index.js';
import axios from 'axios';
/**
 * Base URL of the R service used for the `/health` and `/api/v1/execute-code` calls.
 * Falls back to the local default when the environment variable is unset or empty.
 */
const R_SERVICE_URL: string = process.env['R_SERVICE_URL'] || 'http://localhost:8082';

/**
 * Base URL of the backend; same fallback rule as above.
 * Not referenced by the tests visible in this script.
 */
const BACKEND_URL: string = process.env['BACKEND_URL'] || 'http://localhost:3000';
/** Outcome of one test case, kept for the final summary. */
interface TestResult {
  /** Short identifier of the test case (e.g. `T1`). */
  id: string;
  /** Human-readable title of the test case. */
  name: string;
  /** `skip` is recorded when the test body throws a message starting with `SKIP:`. */
  status: 'pass' | 'fail' | 'skip';
  /** Wall-clock time of the test body in milliseconds. */
  duration: number;
  /** Skip or failure reason; not set for passing tests. */
  message?: string;
}

/** Collected results in execution order; `runTest` appends to this list. */
const results: TestResult[] = [];

/**
 * Runs a single test case, records its outcome in `results` and prints one
 * status line. Never rejects: every thrown value is converted into a `fail`
 * or `skip` entry.
 *
 * @param id Short identifier printed in front of the test name.
 * @param name Human-readable test title.
 * @param fn Test body; throwing marks the test as failed, throwing a message
 *   that starts with `SKIP:` marks it as skipped.
 */
async function runTest(
  id: string,
  name: string,
  fn: () => Promise<void>,
): Promise<void> {
  const start = Date.now();
  try {
    await fn();
    const dur = Date.now() - start;
    results.push({ id, name, status: 'pass', duration: dur });
    console.log(` ✅ ${id} ${name} (${dur}ms)`);
  } catch (error: unknown) {
    const dur = Date.now() - start;
    // Anything can be thrown (including null or a plain string), so derive the
    // message defensively instead of dereferencing `error.message` directly.
    const message = error instanceof Error ? error.message : String(error);
    if (message.startsWith('SKIP:')) {
      results.push({ id, name, status: 'skip', duration: dur, message });
      console.log(` ⏭️ ${id} ${name} — ${message}`);
    } else {
      results.push({ id, name, status: 'fail', duration: dur, message });
      console.log(` ❌ ${id} ${name} — ${message}`);
    }
  }
}
/**
 * Minimal assertion helper for the test bodies in this script.
 *
 * Declared as a TypeScript assertion function so that code following the call
 * is narrowed by the checked condition, and typed over `unknown` so that
 * optional-chained results (`boolean | undefined`) can be passed directly.
 *
 * @param condition Value expected to be truthy.
 * @param message Message of the error thrown when the condition is falsy.
 * @throws Error carrying `message` when `condition` is falsy.
 */
function assert(condition: unknown, message: string): asserts condition {
  if (!condition) {
    throw new Error(message);
  }
}
// ═══════════════════════════════════════════
// T1: 数据库迁移验证
// ═══════════════════════════════════════════
/**
 * T1 — verifies that the dual-channel schema migration is present.
 *
 * Looks up three things in the database catalog and fails on the first one
 * that is missing: the `execution_mode` column on `ssa_schema.ssa_sessions`
 * (whose default must contain `qper`), the `ssa_schema.ssa_agent_executions`
 * table, and the `idx_ssa_agent_exec_session` index on that table.
 *
 * @throws Error when any of the checks does not hold.
 */
async function t1_dbMigration() {
  // SQL bodies are kept flush-left so the statement text sent to the database is unchanged.

  // Column check: exactly one execution_mode column on ssa_sessions.
  const modeColumns = await prisma.$queryRaw<any[]>`
SELECT column_name, column_default
FROM information_schema.columns
WHERE table_schema = 'ssa_schema'
AND table_name = 'ssa_sessions'
AND column_name = 'execution_mode'
`;
  assert(modeColumns.length === 1, 'execution_mode 列不存在于 ssa_sessions 表');

  const modeColumn = modeColumns[0];
  assert(
    modeColumn.column_default?.includes('qper'),
    `execution_mode 默认值应为 qper实际为 ${modeColumn.column_default}`,
  );

  // Table check: ssa_agent_executions must exist in ssa_schema.
  const executionTables = await prisma.$queryRaw<any[]>`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'ssa_schema'
AND table_name = 'ssa_agent_executions'
`;
  assert(executionTables.length === 1, 'ssa_agent_executions 表不存在');

  // Index check: the session index on ssa_agent_executions must exist.
  const sessionIndexes = await prisma.$queryRaw<any[]>`
SELECT indexname
FROM pg_indexes
WHERE schemaname = 'ssa_schema'
AND tablename = 'ssa_agent_executions'
AND indexname = 'idx_ssa_agent_exec_session'
`;
  assert(sessionIndexes.length === 1, 'idx_ssa_agent_exec_session 索引不存在');
}
// ═══════════════════════════════════════════
// T2: Session execution mode CRUD
// ═══════════════════════════════════════════
/**
 * T2 — switches the execution mode of the most recently created active session
 * to `agent` and back to `qper` with raw SQL, checking the stored value after
 * each write.
 *
 * The session is real data, so whatever mode it had before the test is put
 * back in a `finally` block, including when an assertion fails half-way.
 *
 * @throws Error starting with `SKIP:` when no active session exists.
 * @throws Error when a mode write is not reflected by the following read.
 */
async function t2_executionModeSwitch() {
  // Pick the newest active session as the test subject.
  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active' },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) throw new Error('SKIP: 无可用 session请先创建一个 SSA 会话');
  const sessionId = session.id;

  // SQL bodies are kept flush-left so the statement text matches the original statements.
  const readMode = () => prisma.$queryRaw<any[]>`
SELECT execution_mode FROM ssa_schema.ssa_sessions WHERE id = ${sessionId}
`;

  // Mode values stay as SQL literals (not bind parameters), exactly as in the
  // original statements, so the database resolves the column type as before.
  const writeMode = (mode: 'agent' | 'qper') =>
    mode === 'agent'
      ? prisma.$executeRaw`
UPDATE ssa_schema.ssa_sessions SET execution_mode = 'agent' WHERE id = ${sessionId}
`
      : prisma.$executeRaw`
UPDATE ssa_schema.ssa_sessions SET execution_mode = 'qper' WHERE id = ${sessionId}
`;

  // Remember the mode the session had before the test touched it.
  const current = await readMode();
  assert(current.length === 1, 'Session 不存在');
  const originalMode = current[0].execution_mode;
  console.log(` 当前 mode: ${originalMode}`);

  // Last mode this test attempted to write; undefined until the first write.
  let lastWritten: 'agent' | 'qper' | undefined;
  try {
    // Switch to agent and confirm.
    lastWritten = 'agent';
    await writeMode('agent');
    const after = await readMode();
    assert(after[0].execution_mode === 'agent', '切换到 agent 失败');

    // Switch back to qper and confirm.
    lastWritten = 'qper';
    await writeMode('qper');
    const restored = await readMode();
    assert(restored[0].execution_mode === 'qper', '切回 qper 失败');
  } finally {
    // Any value other than 'agent' is restored as 'qper', which is also the
    // state the test ended in before this cleanup was added.
    const target = originalMode === 'agent' ? 'agent' : 'qper';
    if (lastWritten !== undefined && lastWritten !== target) {
      await writeMode(target);
    }
  }
}
// ═══════════════════════════════════════════
// T3: R Docker /execute-code 端点
// ═══════════════════════════════════════════
/**
 * T3 — posts a small R snippet to the R service's `/api/v1/execute-code`
 * endpoint and checks the shape of the successful response: status `success`,
 * a `result.report_blocks` array of length 2, and a first block of type
 * `markdown`.
 *
 * @throws Error starting with `SKIP:` when the `/health` probe fails.
 * @throws Error when the response does not match the expectations above.
 */
async function t3_rExecuteCode() {
  // Probe the health endpoint first; any failure there turns the test into a skip.
  await axios.get(`${R_SERVICE_URL}/health`, { timeout: 5000 }).catch(() => {
    throw new Error('SKIP: R Docker 服务不可用');
  });

  // R snippet kept flush-left so the submitted code text is unchanged.
  const rCode = `
blocks <- list()
blocks[[1]] <- make_markdown_block("## 测试结果\\n双通道 E2E 测试通过", title = "测试")
blocks[[2]] <- make_kv_block(items = list("状态" = "成功", "时间" = as.character(Sys.time())), title = "概况")
list(status = "success", report_blocks = blocks)
`;
  // The body-level `timeout` is handed to the service (presumably seconds — confirm
  // against the endpoint); the axios timeout below is in milliseconds.
  const payload = {
    code: rCode,
    session_id: 'e2e-test',
    timeout: 30,
  };
  const response = await axios.post(
    `${R_SERVICE_URL}/api/v1/execute-code`,
    payload,
    { timeout: 35000 },
  );
  const body = response.data;

  assert(body?.status === 'success', `R 执行状态不是 success: ${body?.status}`);
  assert(
    Array.isArray(body?.result?.report_blocks),
    'report_blocks 不是数组',
  );

  const reportBlocks = body.result.report_blocks;
  assert(
    reportBlocks.length === 2,
    `预期 2 个 block实际 ${reportBlocks.length}`,
  );

  const markdownBlock = reportBlocks[0];
  assert(markdownBlock.type === 'markdown', `Block 0 类型应为 markdown实际 ${markdownBlock.type}`);

  console.log(` R 执行耗时: ${body.duration_ms}ms, blocks: ${reportBlocks.length}`);
}
// ═══════════════════════════════════════════
// T4: R Docker 错误处理
// ═══════════════════════════════════════════
/**
 * T4 — submits R code that deliberately calls `stop(...)` and checks that the
 * service answers with status `error` and a message containing the original
 * error text.
 *
 * @throws Error starting with `SKIP:` when the `/health` probe fails.
 * @throws Error when the error is not reported as expected.
 */
async function t4_rExecuteCodeError() {
  // Same availability probe as T3: unreachable service means skip, not fail.
  await axios.get(`${R_SERVICE_URL}/health`, { timeout: 5000 }).catch(() => {
    throw new Error('SKIP: R Docker 服务不可用');
  });

  const failingRequest = {
    code: 'stop("这是一个故意的错误")',
    session_id: 'e2e-test-error',
    timeout: 10,
  };
  // NOTE(review): this expects the failure to come back in a normally resolved
  // response body; axios rejects on non-2xx statuses by default — confirm the
  // endpoint's status code for R errors.
  const response = await axios.post(
    `${R_SERVICE_URL}/api/v1/execute-code`,
    failingRequest,
    { timeout: 15000 },
  );
  const body = response.data;

  assert(body?.status === 'error', '错误代码应返回 error 状态');
  assert(
    body?.message?.includes('故意的错误'),
    `错误消息应包含原始错误: ${body?.message}`,
  );

  console.log(` 错误捕获正确: "${body.message}"`);
}
// ═══════════════════════════════════════════
// T5: AgentPlannerService 单元测试
// ═══════════════════════════════════════════
/**
 * T5 — asks `agentPlannerService.generatePlan` for a plan on the newest active
 * session that has a `dataOssKey`, and checks that the plan has a title, at
 * least one step and a non-empty `rawText`. The plan summary is printed.
 *
 * @throws Error starting with `SKIP:` when no such session exists.
 * @throws Error when the generated plan is missing any of the checked parts.
 */
async function t5_agentPlanner() {
  // Loaded lazily, inside the test, so a broken import only fails this case.
  const { agentPlannerService } = await import('../src/modules/ssa/services/AgentPlannerService.js');

  // Newest active session that has data attached.
  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active', dataOssKey: { not: null } },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) {
    throw new Error('SKIP: 无有数据的 session');
  }

  const userQuery = '帮我做一个基线特征表,比较两组的差异';
  const plan = await agentPlannerService.generatePlan(session.id, userQuery, []);

  assert(Boolean(plan.title), '计划标题为空');
  assert(plan.steps.length > 0, '计划步骤为空');
  assert(Boolean(plan.rawText), 'rawText 为空');

  console.log(` 计划标题: ${plan.title}`);
  console.log(` 设计类型: ${plan.designType}`);
  console.log(` 步骤数: ${plan.steps.length}`);
  for (const step of plan.steps) {
    console.log(` ${step.order}. ${step.method}: ${step.description}`);
  }
}
// ═══════════════════════════════════════════
// T6: AgentCoderService 单元测试
// ═══════════════════════════════════════════
/**
 * T6 — feeds a hand-written plan to `agentCoderService.generateCode` for the
 * newest active session that has a `dataOssKey`, and checks that the returned
 * code is longer than 50 characters and mentions `load_input_data` or `df`.
 * Code length, required packages and a one-line preview are printed.
 *
 * @throws Error starting with `SKIP:` when no such session exists.
 * @throws Error when the generated code fails either check.
 */
async function t6_agentCoder() {
  // Loaded lazily, inside the test, so a broken import only fails this case.
  const { agentCoderService } = await import('../src/modules/ssa/services/AgentCoderService.js');

  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active', dataOssKey: { not: null } },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) {
    throw new Error('SKIP: 无有数据的 session');
  }

  // Fixed single-step plan used as input for the coder.
  const mockPlan = {
    title: '基线特征表分析',
    designType: '横断面研究',
    variables: {
      outcome: [],
      predictors: [],
      grouping: 'group',
      confounders: [],
    },
    steps: [
      {
        order: 1,
        method: '描述性统计',
        description: '生成基线特征表',
        rationale: '了解数据分布',
      },
    ],
    assumptions: [],
    rawText: '基线特征表分析计划',
  };

  const generated = await agentCoderService.generateCode(session.id, mockPlan);
  const code = generated.code;

  assert(code.length > 50, `生成代码太短: ${code.length} chars`);

  // Loose check: either marker counts as evidence that the data is being loaded.
  const mentionsData = ['load_input_data', 'df'].some((marker) => code.includes(marker));
  assert(mentionsData, '代码中未包含数据加载');

  const preview = code.slice(0, 100).replace(/\n/g, ' ');
  console.log(` 代码长度: ${code.length} chars`);
  console.log(` 依赖包: ${generated.requiredPackages.join(', ') || '(无额外依赖)'}`);
  console.log(` 代码前 100 字符: ${preview}...`);
}
// ═══════════════════════════════════════════
// T7: AgentReviewerService 单元测试
// ═══════════════════════════════════════════
/**
 * T7 — runs `agentReviewerService.review` twice against a fixed plan: once
 * with an ordinary analysis snippet, once with a snippet that installs a
 * package and shells out.
 *
 * Only the shape of the first review is asserted (`passed` boolean, `score`
 * number, `comments` array). The second review is reported but not asserted;
 * a passing verdict there only prints a warning.
 *
 * @throws Error when the first review result has an unexpected shape.
 */
async function t7_agentReviewer() {
  // Loaded lazily, inside the test, so a broken import only fails this case.
  const { agentReviewerService } = await import('../src/modules/ssa/services/AgentReviewerService.js');

  const safePlan = {
    title: '基线分析',
    designType: '队列研究',
    variables: {
      outcome: ['death'],
      predictors: ['age', 'sex'],
      grouping: 'treatment',
      confounders: [],
    },
    steps: [
      { order: 1, method: '基线特征表', description: '描述统计', rationale: '基线比较' },
    ],
    assumptions: [],
    rawText: '',
  };

  // R snippets are kept flush-left so the text handed to the reviewer is unchanged.
  const safeCode = `
df <- load_input_data(input)
library(gtsummary)
tbl <- df %>%
tbl_summary(by = treatment, include = c(age, sex, death)) %>%
add_p()
blocks <- list()
blocks[[1]] <- make_markdown_block("## 基线特征表")
list(status = "success", report_blocks = blocks)
`;

  const review = await agentReviewerService.review(safePlan, safeCode);

  assert(typeof review.passed === 'boolean', 'passed 应为 boolean');
  assert(typeof review.score === 'number', 'score 应为 number');
  assert(Array.isArray(review.comments), 'comments 应为 array');

  console.log(` 审核通过: ${review.passed}`);
  console.log(` 评分: ${review.score}/100`);
  console.log(` 问题数: ${review.issues.length}`);
  for (const comment of review.comments) {
    console.log(` - ${comment}`);
  }

  // Second pass: code that should be rejected by the reviewer.
  const dangerCode = `
install.packages("hacker_pkg")
system("rm -rf /")
df <- load_input_data(input)
list(status = "success", report_blocks = list())
`;

  const dangerReview = await agentReviewerService.review(safePlan, dangerCode);
  console.log(` 危险代码审核通过: ${dangerReview.passed} (预期 false)`);
  console.log(` 危险代码问题数: ${dangerReview.issues.length}`);

  // Reported only: an accepted dangerous snippet does not fail the test.
  if (dangerReview.passed) {
    console.log(' ⚠️ 警告: 危险代码未被拦截Prompt 需要加强');
  }
}
// ═══════════════════════════════════════════
// T8: Agent Execution 记录 CRUD
// ═══════════════════════════════════════════
/**
 * T8 — creates an `ssaAgentExecution` row for the newest active session,
 * updates it, reads it back to check `status` and `durationMs`, and deletes it.
 *
 * The delete runs in a `finally` block so the test row is removed even when
 * the update or one of the checks fails.
 *
 * @throws Error starting with `SKIP:` when no active session exists.
 * @throws Error when the row cannot be created or the read-back values differ.
 */
async function t8_agentExecutionCrud() {
  const session = await prisma.ssaSession.findFirst({
    where: { status: 'active' },
    orderBy: { createdAt: 'desc' },
  });
  if (!session) throw new Error('SKIP: 无可用 session');

  // The model is reached through an `any` cast, as in the original code.
  // NOTE(review): presumably the generated client types may not include this
  // model yet — confirm and drop the cast once they do.
  const agentExecutions = (prisma as any).ssaAgentExecution;

  // Create
  const exec = await agentExecutions.create({
    data: {
      sessionId: session.id,
      query: 'E2E 测试查询',
      status: 'pending',
    },
  });
  // Checked before entering the try block: without an id there is nothing to clean up.
  assert(!!exec.id, '创建执行记录失败');
  console.log(` 创建记录: ${exec.id}`);

  try {
    // Update
    await agentExecutions.update({
      where: { id: exec.id },
      data: {
        status: 'completed',
        planText: '测试计划',
        generatedCode: 'print("hello")',
        durationMs: 1234,
      },
    });

    // Read back and verify
    const updated = await agentExecutions.findUnique({
      where: { id: exec.id },
    });
    assert(updated.status === 'completed', `状态应为 completed实际 ${updated.status}`);
    assert(updated.durationMs === 1234, `耗时应为 1234实际 ${updated.durationMs}`);
  } finally {
    // Delete (cleanup) — always attempted so a failed check does not leave the row behind.
    await agentExecutions.delete({ where: { id: exec.id } });
  }

  console.log(' CRUD 全流程通过');
}
// ═══════════════════════════════════════════
// Main
// ═══════════════════════════════════════════
/**
 * Entry point: prints the banner, runs T1–T8 one after another grouped by
 * phase, prints a summary with details for failed and skipped cases,
 * disconnects Prisma and exits with code 1 if any test failed, otherwise 0.
 */
async function main() {
  console.log('\n╔══════════════════════════════════════════════╗');
  console.log('║ SSA 双通道架构 E2E 测试 (Phase 1~3) ║');
  console.log('╚══════════════════════════════════════════════╝\n');

  type TestCase = readonly [string, string, () => Promise<void>];
  const phases: ReadonlyArray<{ heading: string; cases: readonly TestCase[] }> = [
    {
      heading: '📦 Phase 1: 基础设施',
      cases: [
        ['T1', '数据库迁移验证execution_mode + agent_executions', t1_dbMigration],
        ['T2', 'Session execution mode 切换', t2_executionModeSwitch],
        ['T3', 'R Docker /execute-code 正常执行', t3_rExecuteCode],
        ['T4', 'R Docker /execute-code 错误处理', t4_rExecuteCodeError],
      ],
    },
    {
      heading: '\n🤖 Phase 2: Agent 服务',
      cases: [
        ['T5', 'AgentPlannerService 规划能力', t5_agentPlanner],
        ['T6', 'AgentCoderService R 代码生成', t6_agentCoder],
        ['T7', 'AgentReviewerService 审核能力', t7_agentReviewer],
        ['T8', 'Agent Execution 记录 CRUD', t8_agentExecutionCrud],
      ],
    },
  ];

  // Strictly sequential: each case finishes before the next one starts.
  for (const phase of phases) {
    console.log(phase.heading);
    for (const [id, name, fn] of phase.cases) {
      await runTest(id, name, fn);
    }
  }

  // Summary
  console.log('\n' + '═'.repeat(50));
  const tally = { pass: 0, fail: 0, skip: 0 };
  let totalMs = 0;
  for (const r of results) {
    tally[r.status] += 1;
    totalMs += r.duration;
  }
  const passed = tally.pass;
  const failed = tally.fail;
  const skipped = tally.skip;
  console.log(`\n📊 测试结果: ${passed} 通过 / ${failed} 失败 / ${skipped} 跳过 (总耗时 ${totalMs}ms)`);

  // Prints one line per recorded result with the given status.
  const printDetails = (heading: string, status: 'fail' | 'skip'): void => {
    console.log(heading);
    for (const r of results) {
      if (r.status === status) {
        console.log(` ${r.id} ${r.name}: ${r.message}`);
      }
    }
  };
  if (failed > 0) {
    printDetails('\n❌ 失败详情:', 'fail');
  }
  if (skipped > 0) {
    printDetails('\n⏭ 跳过详情:', 'skip');
  }

  const verdict = failed === 0 ? '🎉 所有测试通过!' : '⚠️ 有测试失败,请检查。';
  console.log('\n' + verdict);

  await prisma.$disconnect();
  process.exit(failed > 0 ? 1 : 0);
}
// Script entry: run main() and, if it rejects, log the error, disconnect
// Prisma and exit with a non-zero code.
void (async () => {
  try {
    await main();
  } catch (err) {
    console.error('测试执行异常:', err);
    await prisma.$disconnect();
    process.exit(1);
  }
})();