Files
AIclinicalresearch/backend/scripts/test-ssa-qper-e2e.ts
HaHafeng 371e1c069c feat(ssa): Complete QPER architecture - Query, Planner, Execute, Reflection layers
Implement the full QPER intelligent analysis pipeline:

- Phase E+: Block-based standardization for all 7 R tools, DynamicReport renderer, Word export enhancement

- Phase Q: LLM intent parsing with dynamic Zod validation against real column names, ClarificationCard component, DataProfile is_id_like tagging

- Phase P: ConfigLoader with Zod schema validation and hot-reload API, DecisionTableService (4-dimension matching), FlowTemplateService with EPV protection, PlannedTrace audit output

- Phase R: ReflectionService with statistical slot injection, sensitivity analysis conflict rules, ConclusionReport with section reveal animation, conclusion caching API, graceful R error classification

End-to-end test: 40/40 passed across two complete analysis scenarios.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-21 18:15:53 +08:00

664 lines
25 KiB
TypeScript
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
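/**
 * SSA Q→P→E→R — full QPER pipeline end-to-end integration test
 *
 * Test pipeline:
 *   login → create session + upload CSV → data profile
 *   → Q layer (LLM intent) → P layer (plan)
 *   → E layer (R engine execution) → R layer (LLM conclusion generation)
 *   → conclusion API cache verification
 *
 * Dependencies: Node.js backend + PostgreSQL + Python extraction_service + R engine + LLM service
 * Run with: npx tsx scripts/test-ssa-qper-e2e.ts
 *
 * Test data: docs/03-业务模块/SSA-智能统计分析/05-测试文档/test.csv
 * Test user: 13800000001 / 123456
 */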
import { readFileSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
// Derive this script's file path and directory from import.meta.url.
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
// Base URL of the backend under test; every request path is appended to it.
const BASE_URL = 'http://localhost:3000';
// Credentials sent to the password-login endpoint by testLogin().
const TEST_PHONE = '13800000001';
const TEST_PASSWORD = '123456';
// CSV fixture uploaded by testCreateSession(), resolved relative to this script's directory.
const TEST_CSV_PATH = join(__dirname, '../../docs/03-业务模块/SSA-智能统计分析/05-测试文档/test.csv');
// ────────────────────────────────────────────
// 工具函数
// ────────────────────────────────────────────
// Running tallies: assert() bumps passed/failed, skip() bumps skipped; main() prints the totals.
let passed = 0;
let failed = 0;
let skipped = 0;
// Bearer token captured by testLogin() and read by authHeaders().
let token = '';
// Session ID captured by testCreateSession() and sent with the later workflow requests.
let sessionId = '';
/**
 * Records the outcome of one check and prints a single result line.
 *
 * A passing check increments `passed`; a failing one increments `failed`.
 * Pass and fail lines carry distinct markers so failures can be spotted when
 * scanning the log, and the optional detail is separated from the test name
 * instead of being glued onto it.
 *
 * @param condition - Result of the check; `true` counts as a pass.
 * @param testName - Human-readable label printed for the check.
 * @param detail - Optional extra context, printed only when the check fails.
 */
function assert(condition: boolean, testName: string, detail?: string) {
  if (condition) {
    console.log(`  ✅ ${testName}`);
    passed++;
    return;
  }
  const suffix = detail ? ` — ${detail}` : '';
  console.log(`  ❌ ${testName}${suffix}`);
  failed++;
}
/**
 * Reports a test that was not run and counts it in the `skipped` tally.
 *
 * @param testName - Label of the skipped test.
 * @param reason - Why the test could not be run.
 */
function skip(testName: string, reason: string) {
  const line = ` ⏭️ ${testName} — 跳过:${reason}`;
  console.log(line);
  skipped += 1;
}
/**
 * Prints a section banner: a blank line, a 60-character rule, the title, and
 * a closing rule.
 *
 * @param title - Heading text shown between the two rules.
 */
function section(title: string) {
  const rule = '─'.repeat(60);
  console.log(`\n${rule}`);
  console.log(`📋 ${title}`);
  console.log(rule);
}
/**
 * Builds request headers carrying the current bearer token.
 *
 * @param contentType - When given (and non-empty), also sets `Content-Type`.
 * @returns A header map with `Authorization` and, optionally, `Content-Type`.
 */
function authHeaders(contentType?: string): Record<string, string> {
  const bearer = `Bearer ${token}`;
  return contentType
    ? { 'Authorization': bearer, 'Content-Type': contentType }
    : { 'Authorization': bearer };
}
/**
 * Sends a POST request to the backend and returns status plus decoded body.
 *
 * @param path - Path appended to `BASE_URL`.
 * @param body - A string is sent as-is; anything else is JSON-serialized.
 * @param headers - Optional header override; defaults to authenticated JSON headers.
 * @returns `{ status, data }`, where `data` is the parsed JSON body, or the
 *   raw response text when the body is not valid JSON.
 */
async function apiPost(path: string, body: any, headers?: Record<string, string>): Promise<any> {
  const requestHeaders = headers || authHeaders('application/json');
  const payload = typeof body === 'string' ? body : JSON.stringify(body);
  const response = await fetch(`${BASE_URL}${path}`, {
    method: 'POST',
    headers: requestHeaders,
    body: payload,
  });
  const raw = await response.text();
  let data: any = raw;
  try {
    data = JSON.parse(raw);
  } catch {
    // Not JSON: fall back to the raw text already stored in `data`.
  }
  return { status: response.status, data };
}
/**
 * Sends an authenticated GET request to the backend.
 *
 * @param path - Path appended to `BASE_URL`.
 * @returns `{ status, data }`, where `data` is the parsed JSON body, or the
 *   raw response text when the body is not valid JSON.
 */
async function apiGet(path: string): Promise<any> {
  const response = await fetch(`${BASE_URL}${path}`, {
    method: 'GET',
    headers: authHeaders(),
  });
  const raw = await response.text();
  let data: any = raw;
  try {
    data = JSON.parse(raw);
  } catch {
    // Not JSON: fall back to the raw text already stored in `data`.
  }
  return { status: response.status, data };
}
// ────────────────────────────────────────────
// 测试 1: 登录获取 Token
// ────────────────────────────────────────────
/**
 * Test 1: logs in with the test credentials and stores the access token in
 * the module-level `token`.
 *
 * The token is looked up at three possible locations in the response body
 * (`data.tokens.accessToken`, `accessToken`, `token`), first truthy one wins.
 *
 * @returns `true` when a non-empty token is available afterwards, else `false`.
 */
async function testLogin(): Promise<boolean> {
  section('测试 1: 登录认证');
  const credentials = { phone: TEST_PHONE, password: TEST_PASSWORD };
  // The login call is unauthenticated, so only the content type is sent.
  const jsonOnly = { 'Content-Type': 'application/json' };
  try {
    const reply = await apiPost('/api/v1/auth/login/password', credentials, jsonOnly);
    const ok = reply.status === 200;
    assert(ok, `登录返回 200实际 ${reply.status}`);
    if (ok && reply.data) {
      const payload = reply.data;
      token = payload?.data?.tokens?.accessToken || payload?.accessToken || payload?.token || '';
      assert(token.length > 0, '获取到 JWT Token', `token 长度: ${token.length}`);
    }
  } catch (e: any) {
    assert(false, '登录请求失败', e.message);
  }
  if (token) {
    return true;
  }
  console.log('\n ⚠️ Token 获取失败,后续测试无法继续');
  return false;
}
// ────────────────────────────────────────────
// 测试 2: 创建会话 + 上传 test.csv
// ────────────────────────────────────────────
/**
 * Test 2: uploads the CSV fixture as multipart form data to create an SSA
 * session and stores the returned ID in the module-level `sessionId`.
 *
 * The HTTP status is asserted before the body is parsed, so a non-JSON error
 * response still reports its status instead of surfacing only as a parse
 * exception.
 *
 * @returns `true` when a session ID was obtained, else `false`.
 */
async function testCreateSession(): Promise<boolean> {
  section('测试 2: 创建会话 + 上传 test.csv');
  try {
    const csvBuffer = readFileSync(TEST_CSV_PATH);
    assert(csvBuffer.length > 0, `test.csv 读取成功(${csvBuffer.length} bytes`);
    const formData = new FormData();
    const blob = new Blob([csvBuffer], { type: 'text/csv' });
    formData.append('file', blob, 'test.csv');
    // Only Authorization is set here; no Content-Type is given alongside the FormData body.
    const res = await fetch(`${BASE_URL}/api/v1/ssa/sessions/`, {
      method: 'POST',
      headers: { 'Authorization': `Bearer ${token}` },
      body: formData,
    });
    // Record the status first: res.json() rejects on a non-JSON body and would
    // otherwise skip this assertion entirely.
    assert(res.status === 200, `创建会话返回 200实际 ${res.status}`);
    const data = await res.json();
    // Optional chaining guards against a JSON `null` body.
    if (data?.sessionId) {
      sessionId = data.sessionId;
      assert(true, `会话 ID: ${sessionId}`);
    } else {
      assert(false, '未返回 sessionId');
    }
    if (data?.schema) {
      assert(data.schema.columns?.length > 0, `Schema 解析成功(${data.schema.columns?.length} 列, ${data.schema.rowCount} 行)`);
    }
  } catch (e: any) {
    assert(false, '创建会话失败', e.message);
  }
  return !!sessionId;
}
// ────────────────────────────────────────────
// 测试 3: 数据画像Python DataProfiler
// ────────────────────────────────────────────
/**
 * Test 3: requests a data profile for the current session and checks that it
 * reports a positive row and column count.
 *
 * Row/column counts are read from either the snake_case fields
 * (`row_count`, `column_count`) or the camelCase ones (`totalRows`,
 * `totalColumns`).
 */
async function testDataProfile() {
  section('测试 3: 数据画像Python DataProfiler');
  try {
    const reply = await apiPost('/api/v1/ssa/workflow/profile', { sessionId });
    assert(reply.status === 200, `画像请求返回 200实际 ${reply.status}`);
    if (!reply.data?.success) {
      assert(false, '画像生成失败', reply.data?.error);
      return;
    }
    const profile = reply.data.profile;
    assert(!!profile, '画像数据非空');
    if (!profile) {
      return;
    }
    const rows = profile.row_count || profile.totalRows || 0;
    const cols = profile.column_count || profile.totalColumns || 0;
    assert(rows > 0, `行数: ${rows}`);
    assert(cols > 0, `列数: ${cols}`);
  } catch (e: any) {
    assert(false, '画像请求异常', e.message);
  }
}
// ────────────────────────────────────────────
// 测试 4: Q 层 — LLM 意图解析
// ────────────────────────────────────────────
/**
 * Test 4 (Q layer): sends a group-comparison query to the intent endpoint and
 * checks the parsed intent (goal `comparison`, confidence >= 0.7, no
 * clarification needed).
 *
 * @returns The parsed intent's `goal` on success, or `null` when the request
 *   failed or no intent was returned.
 */
async function testQLayer(): Promise<string | null> {
  section('测试 4: Q 层 — LLM 意图理解');
  const query = '比较 sex 不同组的 Yqol 有没有差别';
  console.log(` Query: "${query}"`);
  try {
    const startedAt = Date.now();
    const reply = await apiPost('/api/v1/ssa/workflow/intent', {
      sessionId,
      userQuery: query,
    });
    const elapsed = Date.now() - startedAt;
    assert(reply.status === 200, `返回 200实际 ${reply.status}`);
    if (!(reply.data?.success && reply.data.intent)) {
      assert(false, 'Intent 解析失败', reply.data?.error);
      return null;
    }
    const intent = reply.data.intent;
    console.log(` 耗时: ${elapsed}ms`);
    console.log(` Goal: ${intent.goal}, Confidence: ${intent.confidence}`);
    console.log(` Y: ${intent.outcome_var}, X: ${JSON.stringify(intent.predictor_vars)}`);
    console.log(` Design: ${intent.design}, needsClarification: ${intent.needsClarification}`);
    assert(intent.goal === 'comparison', `Goal = comparison实际 ${intent.goal}`);
    assert(intent.confidence >= 0.7, `高置信度 >= 0.7(实际 ${intent.confidence}`);
    assert(!intent.needsClarification, '无需追问');
    return intent.goal;
  } catch (e: any) {
    assert(false, 'Q 层请求异常', e.message);
  }
  return null;
}
// ────────────────────────────────────────────
// 测试 5: P 层 — 工作流规划
// ────────────────────────────────────────────
// Workflow ID captured by testPLayer() and consumed by testELayer().
let workflowId = '';
/**
 * Test 5 (P layer): requests a workflow plan for the group-comparison query,
 * stores the plan's `workflow_id` in the module-level `workflowId`, and
 * prints each planned step.
 *
 * @returns `true` when a plan was returned, `false` on failure or exception.
 */
async function testPLayer(): Promise<boolean> {
  section('测试 5: P 层 — 工作流规划');
  const query = '比较 sex 不同组的 Yqol 有没有差别';
  console.log(` Query: "${query}"`);
  try {
    const startedAt = Date.now();
    const reply = await apiPost('/api/v1/ssa/workflow/plan', {
      sessionId,
      userQuery: query,
    });
    const elapsed = Date.now() - startedAt;
    assert(reply.status === 200, `返回 200实际 ${reply.status}`);
    if (!(reply.data?.success && reply.data.plan)) {
      assert(false, '规划失败', reply.data?.error);
      return false;
    }
    const plan = reply.data.plan;
    console.log(` 耗时: ${elapsed}ms`);
    console.log(` 标题: ${plan.title}`);
    console.log(` 步骤数: ${plan.total_steps}`);
    workflowId = plan.workflow_id;
    assert(!!workflowId, `Workflow ID: ${workflowId}`);
    assert(plan.total_steps >= 2, `步骤数 >= 2实际 ${plan.total_steps}`);
    plan.steps?.forEach((step: any, i: number) => {
      // Optional suffixes: sensitivity-analysis marker and guardrail condition.
      let sensitivity = '';
      if (step.is_sensitivity) {
        sensitivity = ' [敏感性]';
      }
      let guardrail = '';
      if (step.switch_condition) {
        guardrail = ` | 护栏:${step.switch_condition}`;
      }
      console.log(` 步骤 ${i + 1}: ${step.tool_name} (${step.tool_code})${sensitivity}${guardrail}`);
    });
    const trace = plan.planned_trace;
    if (trace) {
      console.log(` PlannedTrace: Primary=${trace.primaryTool}, Fallback=${trace.fallbackTool || 'null'}`);
      assert(!!trace.primaryTool, 'PlannedTrace 包含 primaryTool');
    }
    return true;
  } catch (e: any) {
    assert(false, 'P 层请求异常', e.message);
  }
  return false;
}
// ────────────────────────────────────────────
// 测试 6: E 层 — R 引擎执行
// ────────────────────────────────────────────
/**
 * Test 6 (E layer): executes the workflow planned by testPLayer() and checks
 * both the execution result and the conclusion embedded in it.
 *
 * Skips (and returns `false`) when no `workflowId` is available. Otherwise it
 * asserts the overall status, prints one line per step, reports the aggregated
 * report blocks, and checks the conclusion's required fields.
 *
 * @returns `true` when the execution reported at least one successful step;
 *   `false` on skip, failure, or exception.
 */
async function testELayer(): Promise<boolean> {
section('测试 6: E 层 — R 引擎执行(含 R 层结论生成)');
if (!workflowId) {
skip('E 层执行', '无 workflowId');
return false;
}
console.log(` Workflow ID: ${workflowId}`);
console.log(` Session ID: ${sessionId}`);
try {
const start = Date.now();
const res = await apiPost(`/api/v1/ssa/workflow/${workflowId}/execute`, {
sessionId,
});
const elapsed = Date.now() - start;
assert(res.status === 200, `返回 200实际 ${res.status}`);
if (res.data?.success && res.data.result) {
const result = res.data.result;
console.log(` 耗时: ${elapsed}ms`);
console.log(` 状态: ${result.status}`);
console.log(` 总步骤: ${result.totalSteps}, 成功: ${result.successSteps}, 完成: ${result.completedSteps}`);
// 'completed' and 'partial' both count as acceptable overall statuses.
assert(
result.status === 'completed' || result.status === 'partial',
`执行状态正常(${result.status}`,
result.status === 'error' ? '全部步骤失败' : undefined,
);
assert(result.successSteps > 0, `至少 1 个步骤成功(实际 ${result.successSteps}`);
// Per-step check: print one summary line per step (no assertions here).
if (result.results && Array.isArray(result.results)) {
for (const step of result.results) {
const icon = step.status === 'success' || step.status === 'warning' ? '✅' : '❌';
const pVal = step.result?.p_value != null ? `, P=${step.result.p_value_fmt || step.result.p_value}` : '';
const blocks = step.reportBlocks?.length || 0;
const errMsg = step.error ? ` | 错误: ${step.error.userHint || step.error.message}` : '';
console.log(` ${icon} 步骤 ${step.stepOrder}: ${step.toolName} [${step.status}] (${step.executionMs}ms${pVal}, ${blocks} blocks${errMsg})`);
}
}
// Check the aggregated report blocks and list the distinct block types.
if (result.reportBlocks && result.reportBlocks.length > 0) {
assert(true, `聚合 reportBlocks: ${result.reportBlocks.length}`);
const types = result.reportBlocks.map((b: any) => b.type);
const uniqueTypes = [...new Set(types)];
console.log(` Block 类型分布: ${uniqueTypes.join(', ')}`);
}
// Check the R-layer conclusion; a missing conclusion is a failure.
if (result.conclusion) {
console.log('\n ── R 层结论验证 ──');
const c = result.conclusion;
assert(!!c.executive_summary, `executive_summary 非空(${c.executive_summary?.length || 0} 字)`);
assert(Array.isArray(c.key_findings) && c.key_findings.length > 0,
`key_findings 非空(${c.key_findings?.length || 0} 条)`);
assert(!!c.statistical_summary, 'statistical_summary 存在');
assert(Array.isArray(c.limitations) && c.limitations.length > 0,
`limitations 非空(${c.limitations?.length || 0} 条)`);
assert(!!c.generated_at, `generated_at: ${c.generated_at}`);
assert(!!c.source, `source: ${c.source}`);
// Print a truncated digest of the conclusion content.
console.log(` 结论来源: ${c.source === 'llm' ? 'AI 智能生成' : '规则引擎'}`);
console.log(` 摘要前 200 字: ${c.executive_summary?.substring(0, 200)}...`);
if (c.key_findings?.length > 0) {
console.log(' 主要发现:');
c.key_findings.slice(0, 3).forEach((f: string, i: number) => {
console.log(` ${i + 1}. ${f.substring(0, 120)}`);
});
}
if (c.statistical_summary) {
console.log(` 统计概览: ${c.statistical_summary.total_tests} 项检验, ${c.statistical_summary.significant_results} 项显著`);
console.log(` 使用方法: ${c.statistical_summary.methods_used?.join(', ')}`);
}
if (c.step_summaries?.length > 0) {
console.log(' 步骤摘要:');
c.step_summaries.forEach((s: any) => {
const sig = s.is_significant ? ' (显著*)' : '';
console.log(` 步骤${s.step_number} ${s.tool_name}: ${s.summary?.substring(0, 100)}${sig}`);
});
}
if (c.limitations?.length > 0) {
console.log(' 局限性:');
c.limitations.slice(0, 3).forEach((l: string, i: number) => {
console.log(` ${i + 1}. ${l.substring(0, 120)}`);
});
}
if (c.recommendations?.length > 0) {
console.log(' 建议:');
c.recommendations.slice(0, 2).forEach((r: string, i: number) => {
console.log(` ${i + 1}. ${r.substring(0, 120)}`);
});
}
// Verify the conclusion's workflow_id matches, but only when the field is present.
if (c.workflow_id) {
assert(c.workflow_id === workflowId, `conclusion.workflow_id 与 workflowId 一致`);
}
} else {
assert(false, 'R 层未返回 conclusion');
}
return result.successSteps > 0;
} else {
assert(false, '执行失败', res.data?.error);
}
} catch (e: any) {
assert(false, 'E 层请求异常', e.message);
}
return false;
}
// ────────────────────────────────────────────
// 测试 7: 结论 API 缓存验证
// ────────────────────────────────────────────
/**
 * Test 7: fetches the session's conclusion through the conclusion API, then
 * calls it a second time to look for a cache hit.
 *
 * Skips when there is no `sessionId`. A 404 on the first call is reported as
 * a skip; any other failure is recorded as a failed assertion. The cache-hit
 * check inspects the SECOND response's `source`, since that is the call
 * expected to be served from cache.
 */
async function testConclusionAPI() {
  section('测试 7: 结论 API + 缓存验证');
  if (!sessionId) {
    skip('结论 API', '无 sessionId');
    return;
  }
  try {
    const start = Date.now();
    const res = await apiGet(`/api/v1/ssa/workflow/sessions/${sessionId}/conclusion`);
    const elapsed = Date.now() - start;
    assert(res.status === 200, `返回 200实际 ${res.status}`);
    if (res.data?.success && res.data.conclusion) {
      const c = res.data.conclusion;
      console.log(` 耗时: ${elapsed}ms`);
      console.log(` 来源: ${res.data.source}`);
      assert(!!c.executive_summary, 'executive_summary 非空');
      assert(Array.isArray(c.key_findings), 'key_findings 是数组');
      assert(!!c.generated_at, `generated_at: ${c.generated_at}`);
      // Second call: verify caching.
      console.log('\n ── 缓存验证(二次调用) ──');
      const start2 = Date.now();
      const res2 = await apiGet(`/api/v1/ssa/workflow/sessions/${sessionId}/conclusion`);
      const elapsed2 = Date.now() - start2;
      assert(res2.status === 200, '二次调用返回 200');
      console.log(` 二次调用耗时: ${elapsed2}ms`);
      // Use res2 here: the original read the first response's source, which
      // says nothing about whether the repeat call was served from cache.
      if (elapsed2 < elapsed && res2.data?.source === 'cache') {
        assert(true, `缓存命中(${elapsed2}ms << ${elapsed}ms`);
      } else {
        // Informational only: timing and source do not prove a cache miss.
        console.log(` 首次 ${elapsed}ms, 二次 ${elapsed2}ms缓存效果取决于实现`);
      }
    } else if (res.status === 404) {
      skip('结论 API', '未找到已完成的 workflow可能是 E 层全部失败)');
    } else {
      assert(false, '获取结论失败', res.data?.error || JSON.stringify(res.data).substring(0, 200));
    }
  } catch (e: any) {
    assert(false, '结论 API 异常', e.message);
  }
}
// ────────────────────────────────────────────
// 测试 8: 第二条链路(相关分析 Q→P→E→R
// ────────────────────────────────────────────
/**
 * Test 8: runs a second complete plan → execute chain for a correlation query
 * (age vs bmi) on the same session.
 *
 * Asserts that planning and execution succeed and that at least one step
 * succeeded. The conclusion's summary is scanned for correlation wording; its
 * absence is only logged, and a missing conclusion is reported as a skip.
 */
async function testSecondScenario() {
  section('测试 8: 第二条完整链路(相关分析 age vs bmi');
  const query = '分析 age 和 bmi 的相关性';
  console.log(` Query: "${query}"`);
  try {
    // Q → P: plan
    const planReply = await apiPost('/api/v1/ssa/workflow/plan', {
      sessionId,
      userQuery: query,
    });
    assert(planReply.status === 200, 'Plan 返回 200');
    if (!planReply.data?.success || !planReply.data.plan) {
      assert(false, 'Plan 失败', planReply.data?.error);
      return;
    }
    const plan = planReply.data.plan;
    const wfId = plan.workflow_id;
    console.log(` Workflow: ${wfId}, 步骤数: ${plan.total_steps}`);
    plan.steps?.forEach((planned: any, idx: number) => {
      console.log(` 步骤 ${idx + 1}: ${planned.tool_name} (${planned.tool_code})`);
    });

    // P → E → R: execute
    const startedAt = Date.now();
    const execReply = await apiPost(`/api/v1/ssa/workflow/${wfId}/execute`, { sessionId });
    const elapsed = Date.now() - startedAt;
    assert(execReply.status === 200, 'Execute 返回 200');
    if (!(execReply.data?.success && execReply.data.result)) {
      assert(false, '执行失败', execReply.data?.error);
      return;
    }
    const result = execReply.data.result;
    console.log(` 执行耗时: ${elapsed}ms, 状态: ${result.status}, 成功步骤: ${result.successSteps}/${result.totalSteps}`);
    assert(result.successSteps > 0, `至少 1 步成功(实际 ${result.successSteps}`);
    for (const step of (result.results || [])) {
      const succeeded = step.status === 'success' || step.status === 'warning';
      const icon = succeeded ? '✅' : '❌';
      let pVal = '';
      if (step.result?.p_value != null) {
        pVal = `, P=${step.result.p_value_fmt || step.result.p_value}`;
      }
      console.log(` ${icon} 步骤 ${step.stepOrder}: ${step.toolName} [${step.status}] (${step.executionMs}ms${pVal})`);
    }

    // Verify the R-layer conclusion.
    if (!result.conclusion) {
      skip('R 层结论', '未返回 conclusion');
      return;
    }
    const c = result.conclusion;
    assert(!!c.executive_summary, `R 层结论存在(来源: ${c.source}`);
    console.log(` 结论摘要: ${c.executive_summary?.substring(0, 150)}...`);
    // A correlation analysis is expected to mention correlation in its summary.
    const keywords = ['相关', 'correlation', 'r =', 'r='];
    const mentionsCorrelation = keywords.some((kw) => c.executive_summary?.includes(kw));
    if (mentionsCorrelation) {
      assert(true, '结论中提到了相关性分析');
    } else {
      console.log(' 结论未明确提到"相关"(可能是 fallback 结论)');
    }
  } catch (e: any) {
    assert(false, '第二条链路异常', e.message);
  }
}
// ────────────────────────────────────────────
// 测试 9: 错误分类验证E_COLUMN_NOT_FOUND 等)
// ────────────────────────────────────────────
/**
 * Test 9: sends an intent query that names variables absent from the dataset
 * and checks that the backend handles it without crashing.
 *
 * Passes when the intent comes back with confidence below 0.7 or requests
 * clarification, or when no intent is returned at all. A confident intent is
 * only logged, not failed. Only a thrown exception counts as a failure.
 */
async function testErrorClassification() {
  section('测试 9: E 层错误分类验证(构造异常查询)');
  const query = '比较 NONEXISTENT_GROUP 不同组的 FAKE_OUTCOME';
  console.log(` 构造异常 Query: "${query}"`);
  console.log(' 此测试验证 LLM 面对不存在的变量名时的行为');
  try {
    const reply = await apiPost('/api/v1/ssa/workflow/intent', {
      sessionId,
      userQuery: query,
    });
    if (!(reply.data?.success && reply.data.intent)) {
      // A failed intent parse is acceptable: the bogus variables were rejected.
      console.log(` Intent 解析结果: ${reply.data?.error || '失败/降级'}`);
      assert(true, '异常输入被处理(未崩溃)');
      return;
    }
    const intent = reply.data.intent;
    console.log(` LLM 返回: goal=${intent.goal}, confidence=${intent.confidence}`);
    console.log(` Y=${intent.outcome_var}, X=${JSON.stringify(intent.predictor_vars)}`);
    // Either low confidence or a clarification request counts as detection.
    const flagged = intent.confidence < 0.7 || intent.needsClarification;
    if (flagged) {
      assert(true, `LLM 识别到异常confidence=${intent.confidence})或触发追问`);
    } else {
      console.log(' LLM 未识别到异常变量,可能猜测了现有变量作为替代');
    }
  } catch (e: any) {
    assert(false, '异常查询处理失败(不应崩溃)', e.message);
  }
}
// ────────────────────────────────────────────
// 运行所有测试
// ────────────────────────────────────────────
/**
 * Entry point: runs the pre-flight checks, executes every test in order,
 * prints the summary, and exits with code 1 if any assertion failed.
 *
 * Pre-flight checks exit immediately with code 1 when the CSV fixture cannot
 * be read or the backend health endpoint does not answer with 200. Login and
 * session creation are hard prerequisites; later layers degrade gracefully
 * (the E layer is skipped without a plan, the conclusion API without a
 * successful execution).
 */
async function main() {
  console.log('\n🧪 SSA QPER — 完整链路端到端集成测试Q→P→E→R\n');
  console.log('测试链路:登录 → 上传 CSV → 画像 → Q(Intent) → P(Plan) → E(Execute) → R(Conclusion)');
  console.log(`测试用户:${TEST_PHONE}`);
  console.log(`后端地址:${BASE_URL}`);
  console.log(`测试文件:${TEST_CSV_PATH}\n`);

  // Pre-flight: the CSV fixture must be readable.
  try {
    readFileSync(TEST_CSV_PATH);
  } catch {
    console.error('❌ test.csv 文件不存在,请检查路径');
    process.exit(1);
  }

  // Pre-flight: the backend must answer its health endpoint.
  // A rejected fetch (no connection) and a non-200 answer are reported
  // separately; previously the "unreachable" branch could never run because
  // the rejection was already swallowed by `.catch(() => null)`.
  const health = await fetch(`${BASE_URL}/health`).catch(() => null);
  if (!health) {
    console.error('❌ 后端服务不可达');
    process.exit(1);
  }
  if (health.status !== 200) {
    console.error('❌ 后端服务未启动');
    process.exit(1);
  }
  console.log('✅ 后端服务可达');

  // Run the tests sequentially; each one depends on state set by earlier ones.
  const loginOk = await testLogin();
  if (!loginOk) { console.log('\n⛔ 登录失败,终止'); process.exit(1); }
  const sessionOk = await testCreateSession();
  if (!sessionOk) { console.log('\n⛔ 会话创建失败,终止'); process.exit(1); }
  await testDataProfile();
  const goal = await testQLayer();
  if (!goal) { console.log('\n⚠ Q 层失败,继续后续测试...'); }
  const planOk = await testPLayer();
  if (!planOk) { console.log('\n⚠ P 层失败E/R 层将跳过'); }
  const execOk = planOk ? await testELayer() : false;
  if (execOk) {
    await testConclusionAPI();
  } else if (planOk) {
    console.log('\n⚠ E 层失败,跳过结论 API 测试');
  }
  await testSecondScenario();
  await testErrorClassification();

  // Summary
  console.log(`\n${'═'.repeat(60)}`);
  console.log(`📊 测试结果汇总:${passed} 通过 / ${failed} 失败 / ${skipped} 跳过 / ${passed + failed + skipped} 总计`);
  if (failed === 0) {
    console.log('🎉 全部通过QPER 四层端到端验证成功。');
  } else {
    console.log(`⚠️ 有 ${failed} 个测试失败,请检查上方输出。`);
  }
  console.log(`\n📝 测试会话 ID: ${sessionId}`);
  console.log('═'.repeat(60));
  process.exit(failed > 0 ? 1 : 0);
}
// Start the run; any rejection that escapes main() is logged and exits with code 1.
main().catch((fatal) => {
  console.error('💥 测试脚本异常:', fatal);
  process.exit(1);
});