52 KiB
52 KiB
SSA-Pro 后端开发指南
文档版本: v1.5
创建日期: 2026-02-18
最后更新: 2026-02-18(纳入专家配置体系 + 决策表匹配 + R代码库)
目标读者: Node.js 后端工程师
1. 模块目录结构(概念显性化)
backend/src/modules/ssa/
├── index.ts # 模块入口,注册路由
├── routes/
│ ├── session.routes.ts # 会话管理路由
│ ├── analysis.routes.ts # 分析执行路由
│ └── consult.routes.ts # 🆕 咨询模式路由
│ └── config.routes.ts # 🆕 配置中台路由
│
├── planner/ # 🆕 Planner 职责(大脑)
│ ├── DataParserService.ts # 数据解析 + Schema 提取
│ ├── DecisionTableService.ts # 🆕 决策表匹配 (Goal,Y,X,Design)
│ ├── ToolRetrievalService.ts # RAG 工具检索(辅助)
│ ├── PlannerService.ts # AI 规划(有数据)
│ ├── ConsultService.ts # 无数据咨询
│ ├── SAPGeneratorService.ts # SAP 文档生成
│ └── CriticService.ts # 结果解读(流式)
│
├── executor/ # 🆕 Executor 职责(四肢)
│ ├── RClientService.ts # R 服务调用
│ └── InterpretationService.ts # 🆕 结果解读(配置模板)
│
├── config/ # 🆕 配置中台
│ ├── DecisionTableLoader.ts # 🆕 统计决策表加载
│ ├── RCodeLibraryService.ts # 🆕 R 代码库管理
│ ├── ParamMappingService.ts # 🆕 参数映射配置
│ ├── GuardrailConfigService.ts # 🆕 护栏规则链
│ ├── ConfigValidatorService.ts # 配置校验
│ └── ConfigCacheService.ts # 配置缓存
│
├── validators/
│ └── planSchema.ts # 📌 Zod Schema 定义
├── dto/
│ ├── CreateSessionDto.ts
│ ├── UploadDataDto.ts
│ └── ExecuteAnalysisDto.ts
└── types/
└── index.ts # 类型定义
1.1 🆕 设计原则(五条核心原则)
| 原则 | 说明 |
|---|---|
| Planner/Executor 分离 | 不要把规划逻辑和执行逻辑混在一个 Class 里 |
| 支持无数据模式 | Planner 可以在没有数据的情况下独立工作 |
| 决策表优先 | 🆕 工具选择优先用决策表匹配,RAG 作为兜底 |
| 配置外置 | 工具定义、护栏规则从 Excel/数据库 加载,不硬编码 |
| 统一入口函数 | 🆕 所有 R 脚本统一 run_analysis() 入口 |
2. 数据库 Schema(Prisma)
// schema.prisma - SSA 模块部分
// 分析会话
model SsaSession {
id String @id @default(uuid())
userId String @map("user_id")
title String?
dataSchema Json? @map("data_schema") // 数据结构(LLM可见)
dataPayload Json? @map("data_payload") // 真实数据(仅R可见)
status String @default("active")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
messages SsaMessage[]
@@map("ssa_sessions")
@@schema("ssa_schema")
}
// 消息记录
model SsaMessage {
id String @id @default(uuid())
sessionId String @map("session_id")
role String // user | assistant | system
contentType String @map("content_type") // text | plan | result
content Json
createdAt DateTime @default(now()) @map("created_at")
session SsaSession @relation(fields: [sessionId], references: [id])
@@map("ssa_messages")
@@schema("ssa_schema")
}
// 工具库
model SsaTool {
id String @id @default(uuid())
toolCode String @unique @map("tool_code")
name String
version String @default("1.0.0")
description String
usageContext String? @map("usage_context")
paramsSchema Json @map("params_schema")
guardrails Json?
searchText String @map("search_text")
embedding Unsupported("vector(1024)")?
isActive Boolean @default(true) @map("is_active")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("tools_library")
@@schema("ssa_schema")
}
// 执行日志
model SsaExecutionLog {
id String @id @default(uuid())
sessionId String @map("session_id")
messageId String? @map("message_id")
toolCode String @map("tool_code")
inputParams Json @map("input_params")
outputStatus String @map("output_status")
outputResult Json? @map("output_result")
traceLog String[] @map("trace_log")
executionMs Int? @map("execution_ms")
createdAt DateTime @default(now()) @map("created_at")
@@map("execution_logs")
@@schema("ssa_schema")
}
// 🆕 统计决策表
model SsaDecisionTable {
id String @id @default(uuid())
goalType String @map("goal_type") // 分析目标:组间差异、相关性、分布描述
yType String @map("y_type") // 因变量类型:连续、分类、计数
xType String? @map("x_type") // 自变量类型:可选
designType String @map("design_type") // 设计类型:独立、配对、重复测量
toolCode String @map("tool_code") // 推荐工具
altToolCode String? @map("alt_tool_code") // 备选工具(降级)
priority Int @default(0) // 优先级
isActive Boolean @default(true) @map("is_active")
createdAt DateTime @default(now()) @map("created_at")
@@unique([goalType, yType, xType, designType])
@@map("decision_table")
@@schema("ssa_schema")
}
// 🆕 R 代码库
model SsaRCodeLibrary {
id String @id @default(uuid())
toolCode String @map("tool_code") // 关联工具代码
version String @default("1.0.0")
fileName String @map("file_name") // R 脚本文件名
codeContent String @map("code_content") // R 代码内容
entryFunc String @default("run_analysis") @map("entry_func") // 入口函数
description String?
dependencies String[] @default([]) // 依赖包列表
isActive Boolean @default(true) @map("is_active")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
@@map("r_code_library")
@@schema("ssa_schema")
}
// 🆕 参数映射配置
model SsaParamMapping {
id String @id @default(uuid())
toolCode String @map("tool_code")
jsonKey String @map("json_key") // 前端传入的 JSON Key
rParamName String @map("r_param_name") // R 函数参数名
dataType String @map("data_type") // string | number | boolean
isRequired Boolean @default(false) @map("is_required")
defaultValue String? @map("default_value")
validationRule String? @map("validation_rule") // 校验规则
description String?
@@unique([toolCode, jsonKey])
@@map("param_mapping")
@@schema("ssa_schema")
}
// 🆕 护栏规则配置
model SsaGuardrailConfig {
id String @id @default(uuid())
toolCode String @map("tool_code")
checkName String @map("check_name") // 检查名称:正态性、方差齐性
checkOrder Int @default(0) @map("check_order") // 执行顺序
checkCode String @map("check_code") // R 函数名
threshold String? // 阈值条件:p < 0.05
actionType String @map("action_type") // Block | Warn | Switch
actionTarget String? @map("action_target") // Switch 时的目标工具
isEnabled Boolean @default(true) @map("is_enabled")
@@map("guardrail_config")
@@schema("ssa_schema")
}
// 🆕 结果解读模板
model SsaInterpretation {
id String @id @default(uuid())
toolCode String @map("tool_code")
scenarioKey String @map("scenario_key") // 场景:significant | not_significant
template String // 解读模板(含占位符)
placeholders String[] @default([]) // 占位符列表
@@unique([toolCode, scenarioKey])
@@map("interpretation_templates")
@@schema("ssa_schema")
}
3. API 路由设计
3.1 路由注册
// index.ts
import { FastifyInstance } from 'fastify';
import sessionRoutes from './routes/session.routes';
import analysisRoutes from './routes/analysis.routes';
import consultRoutes from './routes/consult.routes'; // 🆕
import configRoutes from './routes/config.routes'; // 🆕
export default async function ssaModule(app: FastifyInstance) {
// 注册认证中间件
app.addHook('preHandler', app.authenticate);
// 注册子路由
app.register(sessionRoutes, { prefix: '/sessions' });
app.register(analysisRoutes, { prefix: '/sessions' });
app.register(consultRoutes, { prefix: '/consult' }); // 🆕 咨询模式
app.register(configRoutes, { prefix: '/config' }); // 🆕 配置中台
}
3.2 会话路由
// routes/session.routes.ts
import { FastifyInstance } from 'fastify';
import { SessionService } from '../services/SessionService';
export default async function sessionRoutes(app: FastifyInstance) {
const sessionService = new SessionService();
// 创建会话
app.post('/', async (req, reply) => {
const userId = req.user.id;
const session = await sessionService.create(userId);
return reply.send(session);
});
// 获取会话列表
app.get('/', async (req, reply) => {
const userId = req.user.id;
const sessions = await sessionService.listByUser(userId);
return reply.send(sessions);
});
// 获取单个会话(含消息历史)
app.get('/:id', async (req, reply) => {
const { id } = req.params as { id: string };
const session = await sessionService.getById(id, req.user.id);
return reply.send(session);
});
// 上传数据
app.post('/:id/upload', async (req, reply) => {
const { id } = req.params as { id: string };
// 解析 Excel/CSV,提取 Schema 和 Data
const result = await sessionService.uploadData(id, req);
return reply.send(result);
});
}
3.3 分析路由
// routes/analysis.routes.ts
import { FastifyInstance } from 'fastify';
import { PlannerService } from '../services/PlannerService';
import { RClientService } from '../services/RClientService';
import { CriticService } from '../services/CriticService';
export default async function analysisRoutes(app: FastifyInstance) {
const plannerService = new PlannerService();
const rClientService = new RClientService();
const criticService = new CriticService();
// 生成分析计划(不执行)
app.post('/:id/plan', async (req, reply) => {
const { id } = req.params as { id: string };
const { query } = req.body as { query: string };
// 1. RAG 检索工具
// 2. LLM 生成计划
const plan = await plannerService.generatePlan(id, query);
return reply.send({
type: 'plan',
plan
});
});
// 确认执行
app.post('/:id/execute', async (req, reply) => {
const { id } = req.params as { id: string };
const { plan } = req.body as { plan: object };
// 1. 调用 R 服务执行
const result = await rClientService.execute(id, plan);
// 2. 保存执行日志
// 3. 保存结果到消息
return reply.send({
type: 'result',
result
});
});
// 获取结果解读(流式)
app.get('/:id/interpret/:messageId', async (req, reply) => {
const { id, messageId } = req.params as { id: string; messageId: string };
// 流式返回 Critic 解读
reply.raw.setHeader('Content-Type', 'text/event-stream');
await criticService.streamInterpret(id, messageId, reply.raw);
});
// 下载代码
app.get('/:id/download-code/:messageId', async (req, reply) => {
const { id, messageId } = req.params as { id: string; messageId: string };
const code = await sessionService.getReproducibleCode(messageId);
reply.header('Content-Type', 'text/plain');
reply.header('Content-Disposition', 'attachment; filename="analysis.R"');
return reply.send(code);
});
}
4. 核心服务实现
4.1 RClientService(调用 R 服务)
// services/RClientService.ts
import axios, { AxiosInstance } from 'axios';
import { prisma } from '@/common/db';
import { logger } from '@/common/logging';
export class RClientService {
private client: AxiosInstance;
constructor() {
this.client = axios.create({
baseURL: process.env.R_SERVICE_URL || 'http://localhost:8080',
timeout: 120000, // 📌 120s 超时(应对复杂计算)
headers: { 'Content-Type': 'application/json' }
});
}
async execute(sessionId: string, plan: {
tool_code: string;
params: Record<string, any>;
guardrails: Record<string, boolean>;
}) {
const startTime = Date.now();
// 1. 获取会话的真实数据
const session = await prisma.ssaSession.findUniqueOrThrow({
where: { id: sessionId }
});
// 🆕 2. 构造 R 服务请求(混合数据协议)
const dataSource = this.buildDataSource(session);
const requestBody = {
data_source: dataSource, // 🆕 统一数据源字段
params: plan.params,
guardrails: plan.guardrails
};
/**
* 🆕 根据数据大小选择传输方式
* - < 2MB: inline JSON
* - >= 2MB: OSS key
*/
private buildDataSource(session: any): { type: string; data?: any; oss_key?: string } {
const payload = session.dataPayload;
const payloadSize = JSON.stringify(payload).length;
const SIZE_THRESHOLD = 2 * 1024 * 1024; // 2MB
if (payloadSize < SIZE_THRESHOLD) {
// 小数据:直接内联
return {
type: 'inline',
data: payload
};
} else {
// 大数据:上传 OSS,传递 key
// 注意:此处假设 session 创建时已上传 OSS
const ossKey = session.dataOssKey || `sessions/${session.id}/data.json`;
return {
type: 'oss',
oss_key: ossKey
};
}
}
// 3. 调用 R 服务
try {
const response = await this.client.post(
`/api/v1/skills/${plan.tool_code}`,
requestBody
);
const executionMs = Date.now() - startTime;
// 4. 记录执行日志(不含真实数据)
await prisma.ssaExecutionLog.create({
data: {
sessionId,
toolCode: plan.tool_code,
inputParams: plan.params, // 只记录参数,不记录数据
outputStatus: response.data.status,
outputResult: response.data.results,
traceLog: response.data.trace_log || [],
executionMs
}
});
return response.data;
} catch (error: any) {
logger.error('R service call failed', { sessionId, toolCode: plan.tool_code, error });
// 🆕 502/504 特殊处理(R 服务崩溃或超时)
const statusCode = error.response?.status;
if (statusCode === 502 || statusCode === 504) {
throw new Error('统计服务繁忙或数据异常,请稍后重试');
}
// 🆕 提取 R 服务返回的用户友好提示
const userHint = error.response?.data?.user_hint;
if (userHint) {
throw new Error(userHint);
}
throw new Error(`R service error: ${error.message}`);
}
}
async healthCheck(): Promise<boolean> {
try {
const res = await this.client.get('/health');
return res.data.status === 'ok';
} catch {
return false;
}
}
}
4.2 🆕 DecisionTableService(决策表匹配 - 优先)
// planner/DecisionTableService.ts
import { prisma } from '@/common/db';
import { logger } from '@/common/logging';
interface AnalysisIntent {
goalType: string; // 组间差异 | 相关性 | 分布描述 | 预测建模
yType: string; // 连续 | 分类 | 计数
xType?: string; // 连续 | 分类 | 无
designType: string; // 独立 | 配对 | 重复测量
}
export class DecisionTableService {
/**
* 🆕 根据分析意图从决策表精准匹配工具
* 优先级: 决策表匹配 > RAG 检索
*/
async matchTool(intent: AnalysisIntent): Promise<string | null> {
const result = await prisma.ssaDecisionTable.findFirst({
where: {
goalType: intent.goalType,
yType: intent.yType,
xType: intent.xType || null,
designType: intent.designType,
isActive: true
},
orderBy: { priority: 'desc' }
});
if (result) {
logger.info('Decision table matched', {
intent,
toolCode: result.toolCode
});
return result.toolCode;
}
return null;
}
/**
* 🆕 获取降级工具(护栏触发时使用)
*/
async getAlternativeTool(toolCode: string): Promise<string | null> {
const entry = await prisma.ssaDecisionTable.findFirst({
where: { toolCode, isActive: true }
});
return entry?.altToolCode || null;
}
/**
* 🆕 从 LLM 提取分析意图(结构化)
*/
async extractIntent(userQuery: string, dataSchema: object): Promise<AnalysisIntent> {
const llm = LLMFactory.getAdapter('deepseek-v3');
const prompt = `
分析用户的统计需求,提取以下四个维度:
用户需求: ${userQuery}
数据结构: ${JSON.stringify(dataSchema, null, 2)}
请返回 JSON 格式:
{
"goalType": "组间差异 | 相关性 | 分布描述 | 预测建模",
"yType": "连续 | 分类 | 计数",
"xType": "连续 | 分类 | 无",
"designType": "独立 | 配对 | 重复测量"
}
只返回 JSON,不要其他内容。
`.trim();
const response = await llm.chat([{ role: 'user', content: prompt }]);
try {
return JSON.parse(jsonrepair(response));
} catch {
// 兜底默认值
return {
goalType: '组间差异',
yType: '连续',
xType: '分类',
designType: '独立'
};
}
}
}
4.3 ToolRetrievalService(RAG 检索 - 兜底)
// planner/ToolRetrievalService.ts
import { VectorSearchService } from '@/common/rag';
import { LLMFactory } from '@/common/llm/adapters/LLMFactory';
import { prisma } from '@/common/db';
import { DecisionTableService } from './DecisionTableService';
export class ToolRetrievalService {
private vectorSearch: VectorSearchService;
private decisionTable: DecisionTableService;
constructor() {
this.vectorSearch = new VectorSearchService({
schema: 'ssa_schema',
table: 'tools_library',
embeddingColumn: 'embedding',
textColumn: 'search_text'
});
this.decisionTable = new DecisionTableService();
}
/**
* 🆕 工具选择策略:决策表优先,RAG 兜底
*/
async selectTool(query: string, dataSchema: object): Promise<any> {
// 1. 尝试决策表精准匹配
const intent = await this.decisionTable.extractIntent(query, dataSchema);
const matchedCode = await this.decisionTable.matchTool(intent);
if (matchedCode) {
const tool = await prisma.ssaTool.findUnique({
where: { toolCode: matchedCode }
});
if (tool) {
return { ...tool, matchMethod: 'decision_table' };
}
}
// 2. 决策表未命中,使用 RAG 检索
const ragResults = await this.retrieveTools(query, dataSchema, 1);
if (ragResults.length > 0) {
return { ...ragResults[0], matchMethod: 'rag' };
}
return null;
}
async retrieveTools(query: string, dataSchema: object, topK = 5) {
// 1. Query Rewrite(可选,提升召回)
const rewriter = LLMFactory.getAdapter('deepseek-v3');
const rewritePrompt = `
将用户的统计分析需求改写为更适合检索统计工具的查询:
用户需求: ${query}
数据结构: ${JSON.stringify(dataSchema)}
输出改写后的查询(一句话):
`.trim();
const rewrittenQuery = await rewriter.chat([
{ role: 'user', content: rewritePrompt }
]);
// 2. 向量检索
const vectorResults = await this.vectorSearch.search(rewrittenQuery, topK);
// 3. 关键词检索 (pg_bigm)
const keywordResults = await prisma.$queryRaw`
SELECT id, tool_code, name, description, params_schema, guardrails
FROM ssa_schema.tools_library
WHERE search_text LIKE '%' || ${query} || '%'
AND is_active = true
LIMIT 5
`;
// 4. RRF 融合
const merged = this.rrfMerge(vectorResults, keywordResults);
// 5. Rerank(可选)
// const reranked = await this.rerank(merged, query);
return merged.slice(0, topK);
}
private rrfMerge(vectorResults: any[], keywordResults: any[], k = 60) {
const scores = new Map<string, number>();
vectorResults.forEach((item, idx) => {
const rrf = 1 / (k + idx + 1);
scores.set(item.id, (scores.get(item.id) || 0) + rrf);
});
keywordResults.forEach((item, idx) => {
const rrf = 1 / (k + idx + 1);
scores.set(item.id, (scores.get(item.id) || 0) + rrf);
});
// 合并并排序
const allItems = [...vectorResults, ...keywordResults];
const unique = [...new Map(allItems.map(i => [i.id, i])).values()];
return unique.sort((a, b) =>
(scores.get(b.id) || 0) - (scores.get(a.id) || 0)
);
}
}
4.3 PlannerService(AI 规划 + JSON 容错)
// services/PlannerService.ts
import { LLMFactory } from '@/common/llm/adapters/LLMFactory';
import { PromptService } from '@/common/prompts';
import { ToolRetrievalService } from './ToolRetrievalService';
import { prisma } from '@/common/db';
import { jsonrepair } from 'jsonrepair'; // 📌 JSON 修复库
import { planSchema } from '../validators/planSchema'; // 📌 Zod Schema
export class PlannerService {
private retrieval: ToolRetrievalService;
constructor() {
this.retrieval = new ToolRetrievalService();
}
async generatePlan(sessionId: string, userQuery: string) {
// 1. 获取会话的数据 Schema(不含真实数据)
const session = await prisma.ssaSession.findUniqueOrThrow({
where: { id: sessionId },
select: { dataSchema: true }
});
// 2. RAG 检索候选工具
const candidateTools = await this.retrieval.retrieveTools(
userQuery,
session.dataSchema,
5
);
// 3. 获取 Planner Prompt
const promptTemplate = await PromptService.get('SSA_PLANNER');
// 4. 构造 Prompt
const systemPrompt = promptTemplate
.replace('{{data_schema_json}}', JSON.stringify(session.dataSchema, null, 2))
.replace('{{candidate_tools_json}}', JSON.stringify(candidateTools, null, 2));
// 5. 调用 LLM
const llm = LLMFactory.getAdapter('deepseek-v3');
const response = await llm.chat([
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userQuery }
]);
// 6. 📌 解析 + 修复 + 校验 JSON
const plan = this.parseAndValidateJson(response, candidateTools);
// 7. 保存用户消息和计划消息
await prisma.ssaMessage.createMany({
data: [
{
sessionId,
role: 'user',
contentType: 'text',
content: { text: userQuery }
},
{
sessionId,
role: 'assistant',
contentType: 'plan',
content: plan
}
]
});
return plan;
}
// 📌 增强的 JSON 解析(含修复和校验)
private parseAndValidateJson(text: string, candidateTools: any[]): object {
// Step 1: 提取 JSON 块
const jsonMatch = text.match(/```json\n?([\s\S]*?)\n?```/) ||
text.match(/\{[\s\S]*\}/);
if (!jsonMatch) {
throw new Error('LLM response does not contain valid JSON');
}
let jsonStr = jsonMatch[1] || jsonMatch[0];
// Step 2: 使用 jsonrepair 修复常见问题(末尾逗号、缺少引号等)
try {
jsonStr = jsonrepair(jsonStr);
} catch (repairError) {
// 修复失败,继续尝试原始解析
}
// Step 3: 解析 JSON
let parsed: any;
try {
parsed = JSON.parse(jsonStr);
} catch (parseError) {
throw new Error(`JSON parse failed: ${parseError.message}`);
}
// Step 4: 使用 Zod 校验结构
const validatedPlan = planSchema.safeParse(parsed);
if (!validatedPlan.success) {
throw new Error(`Plan validation failed: ${validatedPlan.error.message}`);
}
// Step 5: 校验 tool_code 是否在候选列表中
const validToolCodes = candidateTools.map(t => t.tool_code);
if (!validToolCodes.includes(validatedPlan.data.tool_code)) {
throw new Error(`Invalid tool_code: ${validatedPlan.data.tool_code}`);
}
return validatedPlan.data;
}
}
4.4 Zod Schema 定义
// validators/planSchema.ts
import { z } from 'zod';
export const planSchema = z.object({
tool_code: z.string().min(1),
reasoning: z.string().optional(),
params: z.record(z.any()),
guardrails: z.object({
check_normality: z.boolean().optional(),
check_homogeneity: z.boolean().optional(),
auto_fix: z.boolean().optional()
}).optional()
});
export type PlanType = z.infer<typeof planSchema>;
4.5 🆕 ConsultService(无数据咨询)
// planner/ConsultService.ts
import { LLMFactory } from '@/common/llm/adapters/LLMFactory';
import { PromptService } from '@/common/prompts';
import { ToolRetrievalService } from './ToolRetrievalService';
import { prisma } from '@/common/db';
export class ConsultService {
private retrieval: ToolRetrievalService;
constructor() {
this.retrieval = new ToolRetrievalService();
}
/**
* 🆕 无数据咨询对话
* - 用户只描述研究设计、变量类型等
* - 系统推理适合的统计方法
*/
async chat(sessionId: string, userMessage: string) {
// 1. 获取会话历史
const history = await prisma.ssaMessage.findMany({
where: { sessionId },
orderBy: { createdAt: 'asc' }
});
// 2. 获取咨询专用 Prompt
const systemPrompt = await PromptService.get('SSA_CONSULT');
// 3. 构造消息列表
const messages = [
{ role: 'system' as const, content: systemPrompt },
...history.map(m => ({
role: m.role as 'user' | 'assistant',
content: typeof m.content === 'string' ? m.content : JSON.stringify(m.content)
})),
{ role: 'user' as const, content: userMessage }
];
// 4. 调用 LLM
const llm = LLMFactory.getAdapter('deepseek-v3');
const response = await llm.chat(messages);
// 5. 保存消息
await prisma.ssaMessage.createMany({
data: [
{ sessionId, role: 'user', contentType: 'text', content: { text: userMessage } },
{ sessionId, role: 'assistant', contentType: 'text', content: { text: response } }
]
});
return response;
}
/**
* 🆕 生成 SAP 文档
* - 基于对话历史生成结构化的统计分析计划
*/
async generateSAP(sessionId: string): Promise<{
title: string;
sections: Array<{
heading: string;
content: string;
}>;
recommendedTools: string[];
}> {
const history = await prisma.ssaMessage.findMany({
where: { sessionId },
orderBy: { createdAt: 'asc' }
});
const sapPrompt = await PromptService.get('SSA_SAP_GENERATOR');
const llm = LLMFactory.getAdapter('deepseek-v3');
const response = await llm.chat([
{ role: 'system', content: sapPrompt },
{ role: 'user', content: `基于以下对话生成统计分析计划:\n${JSON.stringify(history)}` }
]);
// 解析 JSON 响应
const sap = JSON.parse(response);
return sap;
}
}
4.6 🆕 SAPGeneratorService(SAP 文档导出)
// planner/SAPGeneratorService.ts
import { Document, Packer, Paragraph, HeadingLevel, Table, TableRow, TableCell } from 'docx';
interface SAPDocument {
title: string;
sections: Array<{
heading: string;
content: string;
}>;
recommendedTools: string[];
}
export class SAPGeneratorService {
/**
* 🆕 生成 Word 文档
*/
async generateWord(sap: SAPDocument): Promise<Buffer> {
const doc = new Document({
sections: [{
children: [
new Paragraph({
text: sap.title,
heading: HeadingLevel.TITLE
}),
...sap.sections.flatMap(section => [
new Paragraph({
text: section.heading,
heading: HeadingLevel.HEADING_1
}),
new Paragraph({ text: section.content })
]),
new Paragraph({
text: '推荐统计方法',
heading: HeadingLevel.HEADING_1
}),
...sap.recommendedTools.map(tool =>
new Paragraph({ text: `• ${tool}` })
)
]
}]
});
return await Packer.toBuffer(doc);
}
/**
* 🆕 生成 Markdown
*/
generateMarkdown(sap: SAPDocument): string {
let md = `# ${sap.title}\n\n`;
for (const section of sap.sections) {
md += `## ${section.heading}\n\n${section.content}\n\n`;
}
md += `## 推荐统计方法\n\n`;
for (const tool of sap.recommendedTools) {
md += `- ${tool}\n`;
}
return md;
}
}
4.7 🆕 ConfigLoaderService(配置中台)
// config/ConfigLoaderService.ts
import * as XLSX from 'xlsx';
import { ConfigValidatorService } from './ConfigValidatorService';
import { logger } from '@/common/logging';
interface ToolConfig {
tool_code: string;
name: string;
description: string;
params_schema: Record<string, any>;
guardrails: Record<string, any>;
search_text: string;
}
interface GuardrailConfig {
guardrail_code: string;
description: string;
threshold: number;
auto_fix_action: string;
}
export class ConfigLoaderService {
private static instance: ConfigLoaderService;
private toolsCache: Map<string, ToolConfig> = new Map();
private guardrailsCache: Map<string, GuardrailConfig> = new Map();
private lastLoadTime: Date | null = null;
static getInstance() {
if (!this.instance) {
this.instance = new ConfigLoaderService();
}
return this.instance;
}
/**
* 🆕 从 Excel 加载配置
*/
async loadFromExcel(buffer: Buffer): Promise<{
tools: number;
guardrails: number;
errors: string[];
}> {
const workbook = XLSX.read(buffer, { type: 'buffer' });
const errors: string[] = [];
// Sheet 1: 工具定义
const toolsSheet = workbook.Sheets['Tools'];
if (toolsSheet) {
const toolsData = XLSX.utils.sheet_to_json<ToolConfig>(toolsSheet);
for (const tool of toolsData) {
// 校验
const validation = ConfigValidatorService.validateTool(tool);
if (validation.valid) {
this.toolsCache.set(tool.tool_code, tool);
} else {
errors.push(`Tool ${tool.tool_code}: ${validation.error}`);
}
}
}
// Sheet 2: 护栏规则
const guardrailsSheet = workbook.Sheets['Guardrails'];
if (guardrailsSheet) {
const guardrailsData = XLSX.utils.sheet_to_json<GuardrailConfig>(guardrailsSheet);
for (const gr of guardrailsData) {
const validation = ConfigValidatorService.validateGuardrail(gr);
if (validation.valid) {
this.guardrailsCache.set(gr.guardrail_code, gr);
} else {
errors.push(`Guardrail ${gr.guardrail_code}: ${validation.error}`);
}
}
}
this.lastLoadTime = new Date();
logger.info('Config loaded', {
tools: this.toolsCache.size,
guardrails: this.guardrailsCache.size
});
return {
tools: this.toolsCache.size,
guardrails: this.guardrailsCache.size,
errors
};
}
/**
* 🆕 热加载(清空缓存并重新加载)
*/
async reload(): Promise<void> {
// 从数据库或默认 Excel 重新加载
this.toolsCache.clear();
this.guardrailsCache.clear();
// ... 重新加载逻辑
logger.info('Config reloaded');
}
getTool(toolCode: string): ToolConfig | undefined {
return this.toolsCache.get(toolCode);
}
getAllTools(): ToolConfig[] {
return Array.from(this.toolsCache.values());
}
getGuardrail(code: string): GuardrailConfig | undefined {
return this.guardrailsCache.get(code);
}
}
4.8 🆕 ConfigValidatorService(配置校验)
// config/ConfigValidatorService.ts
interface ValidationResult {
valid: boolean;
error?: string;
}
export class ConfigValidatorService {
/**
* 🆕 校验工具配置
*/
static validateTool(tool: any): ValidationResult {
// 必填校验
if (!tool.tool_code) {
return { valid: false, error: 'tool_code is required' };
}
if (!tool.name) {
return { valid: false, error: 'name is required' };
}
// 格式校验
if (!/^ST_[A-Z_]+$/.test(tool.tool_code)) {
return { valid: false, error: 'tool_code must match ST_XXX pattern' };
}
// params_schema 校验
if (tool.params_schema) {
try {
if (typeof tool.params_schema === 'string') {
JSON.parse(tool.params_schema);
}
} catch {
return { valid: false, error: 'params_schema is not valid JSON' };
}
}
return { valid: true };
}
/**
* 🆕 校验护栏配置
*/
static validateGuardrail(gr: any): ValidationResult {
if (!gr.guardrail_code) {
return { valid: false, error: 'guardrail_code is required' };
}
if (typeof gr.threshold !== 'number') {
return { valid: false, error: 'threshold must be a number' };
}
if (gr.threshold < 0 || gr.threshold > 1) {
return { valid: false, error: 'threshold must be between 0 and 1' };
}
return { valid: true };
}
}
4.9 🆕 配置中台路由
// routes/config.routes.ts
import { FastifyInstance } from 'fastify';
import { ConfigLoaderService } from '../config/ConfigLoaderService';
export default async function configRoutes(app: FastifyInstance) {
const configService = ConfigLoaderService.getInstance();
// 导入 Excel 配置
app.post('/import', async (req, reply) => {
const data = await req.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
const buffer = await data.toBuffer();
const result = await configService.loadFromExcel(buffer);
return reply.send(result);
});
// 🆕 热加载配置(Admin API)
app.post('/reload', async (req, reply) => {
await configService.reload();
return reply.send({ success: true, timestamp: new Date().toISOString() });
});
// 获取工具列表
app.get('/tools', async (req, reply) => {
const tools = configService.getAllTools();
return reply.send(tools);
});
// 校验配置文件(不导入)
app.post('/validate', async (req, reply) => {
const data = await req.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
// 仅校验,不加载到缓存
// ...
return reply.send({ valid: true });
});
}
4.10 🆕 咨询模式路由
// routes/consult.routes.ts
import { FastifyInstance } from 'fastify';
import { ConsultService } from '../planner/ConsultService';
import { SAPGeneratorService } from '../planner/SAPGeneratorService';
import { prisma } from '@/common/db';
export default async function consultRoutes(app: FastifyInstance) {
const consultService = new ConsultService();
const sapGenerator = new SAPGeneratorService();
// 创建咨询会话(无数据)
app.post('/', async (req, reply) => {
const userId = req.user.id;
const session = await prisma.ssaSession.create({
data: {
userId,
title: '统计咨询',
status: 'consult' // 🆕 标记为咨询模式
}
});
return reply.send(session);
});
// 咨询对话
app.post('/:id/chat', async (req, reply) => {
const { id } = req.params as { id: string };
const { message } = req.body as { message: string };
const response = await consultService.chat(id, message);
return reply.send({ response });
});
// 生成 SAP 文档
app.post('/:id/generate-sap', async (req, reply) => {
const { id } = req.params as { id: string };
const sap = await consultService.generateSAP(id);
// 保存到会话
await prisma.ssaMessage.create({
data: {
sessionId: id,
role: 'assistant',
contentType: 'sap',
content: sap
}
});
return reply.send(sap);
});
// 下载 SAP(Word/Markdown)
app.get('/:id/download-sap', async (req, reply) => {
const { id } = req.params as { id: string };
const { format = 'word' } = req.query as { format?: 'word' | 'markdown' };
// 获取最新的 SAP
const sapMessage = await prisma.ssaMessage.findFirst({
where: { sessionId: id, contentType: 'sap' },
orderBy: { createdAt: 'desc' }
});
if (!sapMessage) {
return reply.status(404).send({ error: 'SAP not found' });
}
const sap = sapMessage.content as any;
if (format === 'markdown') {
const md = sapGenerator.generateMarkdown(sap);
reply.header('Content-Type', 'text/markdown');
reply.header('Content-Disposition', 'attachment; filename="SAP.md"');
return reply.send(md);
} else {
const buffer = await sapGenerator.generateWord(sap);
reply.header('Content-Type', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document');
reply.header('Content-Disposition', 'attachment; filename="SAP.docx"');
return reply.send(buffer);
}
});
}
5. Brain-Hand 数据隔离
核心原则:LLM 只看 Schema,R 服务处理真实数据
┌─────────────────────────────────────────────────────────────┐
│ 数据上传流程 │
│ │
│ Excel/CSV ──────┬────────────────────────────────────────│
│ │ │
│ ┌──────▼──────┐ │
│ │ 数据解析器 │ │
│ └──────┬──────┘ │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ │ │
│ dataSchema dataPayload │
│ (结构/类型/统计) (真实数据) │
│ │ │ │
│ ▼ ▼ │
│ LLM (Planner) R (Executor) │
│ │
└─────────────────────────────────────────────────────────────┘
5.1 数据解析实现
// services/DataParserService.ts
import * as XLSX from 'xlsx';
export class DataParserService {
static parse(buffer: Buffer, filename: string) {
const workbook = XLSX.read(buffer, { type: 'buffer' });
const sheetName = workbook.SheetNames[0];
const sheet = workbook.Sheets[sheetName];
// 转为 JSON 数组
const data = XLSX.utils.sheet_to_json(sheet);
// 提取 Schema
const schema = this.extractSchema(data);
return {
dataSchema: schema, // 给 LLM
dataPayload: data // 给 R
};
}
private static extractSchema(data: any[]) {
if (data.length === 0) return { columns: [], rowCount: 0 };
const columns = Object.keys(data[0]).map(colName => {
const values = data.map(row => row[colName]).filter(v => v != null);
const type = this.inferType(values);
return {
name: colName,
type,
...this.computeStats(values, type, data.length) // 📌 传入行数用于隐私保护
};
});
return {
rowCount: data.length,
columns
};
}
private static inferType(values: any[]): 'numeric' | 'categorical' | 'datetime' {
const sample = values.slice(0, 100);
const numericCount = sample.filter(v => typeof v === 'number' || !isNaN(Number(v))).length;
if (numericCount / sample.length > 0.9) return 'numeric';
return 'categorical';
}
private static computeStats(values: any[], type: string, rowCount: number) {
if (type === 'numeric') {
const nums = values.map(Number).filter(n => !isNaN(n));
let min = Math.min(...nums);
let max = Math.max(...nums);
// 📌 小样本隐私保护:N < 10 时模糊化极值
if (rowCount < 10) {
min = Math.floor(min / 10) * 10; // 向下取整到十位
max = Math.ceil(max / 10) * 10; // 向上取整到十位
}
return {
min,
max,
mean: nums.reduce((a, b) => a + b, 0) / nums.length,
missing: values.length - nums.length,
privacyProtected: rowCount < 10 // 📌 标记是否已模糊化
};
}
// categorical
const counts = new Map<string, number>();
values.forEach(v => {
const key = String(v);
counts.set(key, (counts.get(key) || 0) + 1);
});
// 🆕 分类变量隐私保护:
// 如果某个取值的计数 < 5 且总行数 > 10,则隐藏具体值
const uniqueValues: string[] = [];
let maskedCount = 0;
for (const [value, count] of counts.entries()) {
if (count < 5 && rowCount > 10) {
maskedCount++;
} else {
uniqueValues.push(value);
}
}
// 最多展示 10 个非敏感值
const safeValues = uniqueValues.slice(0, 10);
if (maskedCount > 0) {
safeValues.push(`[${maskedCount} 个稀有值已隐藏]`);
}
return {
uniqueValues: safeValues,
uniqueCount: counts.size,
missing: values.filter(v => v == null || v === '').length,
privacyProtected: maskedCount > 0 // 🆕 标记
};
}
}
6. Prompt 注册
-- 注册 Planner Prompt
INSERT INTO capability_schema.prompt_templates (code, name, content, model, temperature)
VALUES (
'SSA_PLANNER',
'SSA 统计规划器',
'你是一名资深的生物统计学家。你面前有一份数据摘要(Metadata)和一组可用的统计工具箱。
请根据用户的需求,选择最合适的一个工具,并生成详细的执行计划(SAP)。
### 数据摘要
{{data_schema_json}}
### 可用工具箱 (Candidates)
{{candidate_tools_json}}
### 决策规则 (Guardrails)
1. **类型匹配**:严格检查变量类型。不要把分类变量填入要求数值型的参数中。
2. **工具匹配**:如果用户要做 "预测",优先选 "回归" 类工具;如果做 "差异",选 "检验" 类工具。
3. **护栏配置**:对于 T 检验、ANOVA 等参数检验,必须开启 check_normality。
### 输出要求
请先在 <thinking> 标签中进行推理,分析变量类型和工具适用性。
然后输出纯 JSON,格式如下:
{
"tool_code": "选中工具的CODE",
"reasoning": "一句话解释为什么选这个工具",
"params": { ...根据工具定义的 params_schema 填写... },
"guardrails": { "check_normality": true, "auto_fix": true }
}',
'deepseek-v3',
0.3
);
7. 与主应用集成
// backend/src/index.ts
import ssaModule from './modules/ssa';
// 在 Fastify 注册
app.register(ssaModule, { prefix: '/api/v1/ssa' });
8. 环境变量
# .env
# R 服务配置
R_SERVICE_URL=http://ssa-r-service:8080 # SAE VPC 内网地址
R_SERVICE_TIMEOUT=120000 # 📌 超时 120s
# 📌 OSS 配置(必须使用 VPC 内网 Endpoint)
OSS_ENDPOINT=oss-cn-beijing-internal.aliyuncs.com # 内网地址
OSS_BUCKET=ssa-data-bucket
OSS_ACCESS_KEY_ID=your-access-key
OSS_ACCESS_KEY_SECRET=your-secret
# LLM 配置
LLM_DEFAULT_MODEL=deepseek-v3
重要:OSS Endpoint 必须使用
-internal后缀的 VPC 内网地址,否则 R 服务的网络隔离策略会导致文件下载失败。
9. 测试检查清单
| 测试场景 | 预期结果 |
|---|---|
| POST /sessions 创建会话 | 返回 sessionId |
| POST /sessions/:id/upload (CSV) | 返回 dataSchema |
| POST /sessions/:id/upload (N<10) | dataSchema.privacyProtected = true |
| POST /sessions/:id/plan (T检验意图) | 返回包含 tool_code 的 plan |
| POST /sessions/:id/plan (LLM 返回格式错误 JSON) | json-repair 修复成功 |
| POST /sessions/:id/plan (参数不合法) | Zod 校验失败,返回错误 |
| POST /sessions/:id/execute | R 服务返回 success |
| POST /sessions/:id/execute (超过 60s) | 不超时,等待 120s |
| GET /sessions/:id/download-code | 下载 .R 文件 |
| R 服务宕机时 execute | 返回友好错误 |
10. 依赖包清单
{
"dependencies": {
"jsonrepair": "^3.6.0",
"zod": "^3.22.4",
"xlsx": "^0.18.5",
"axios": "^1.6.0",
"docx": "^8.5.0" // 🆕 Word 文档生成(SAP 导出)
}
}
11. 🆕 配置中台 Excel 模板规范
核心理念:统计学专家通过 Excel + R 脚本配置系统行为,无需修改代码。
11.0 🆕 决策表 Excel (decision_table.xlsx)
用于 Planner 工具选择,四维精准匹配
| 列名 | 类型 | 必填 | 说明 |
|---|---|---|---|
| goal_type | string | ✅ | 分析目标:组间差异 / 相关性 / 分布描述 / 预测建模 |
| y_type | string | ✅ | 因变量类型:连续 / 分类 / 计数 |
| x_type | string | 自变量类型:连续 / 分类 / 无 | |
| design_type | string | ✅ | 设计类型:独立 / 配对 / 重复测量 |
| tool_code | string | ✅ | 推荐工具代码 |
| alt_tool_code | string | 备选工具(护栏降级) | |
| priority | number | 优先级(数字越大越优先) |
示例数据:
| goal_type | y_type | x_type | design_type | tool_code | alt_tool_code |
|---|---|---|---|---|---|
| 组间差异 | 连续 | 分类 | 独立 | ST_T_TEST_IND | ST_MANN_WHITNEY |
| 组间差异 | 连续 | 分类 | 配对 | ST_T_TEST_PAIRED | ST_WILCOXON |
| 相关性 | 连续 | 连续 | 独立 | ST_CORRELATION | ST_CORRELATION |
11.1 Sheet 1: Metadata(工具元数据)
| 列名 | 类型 | 必填 | 说明 |
|---|---|---|---|
| tool_code | string | ✅ | 工具代码,格式 ST_XXX |
| name | string | ✅ | 工具名称 |
| version | string | 版本号,默认 1.0.0 | |
| r_script_file | string | ✅ | 🆕 R 脚本文件名(如 t_test_ind.R) |
| description | string | ✅ | 工具描述 |
| usage_context | string | 适用场景 | |
| search_text | string | RAG 搜索关键词 |
11.2 Sheet 2: ParamMapping(参数映射)
🆕 JSON Key → R 参数名映射
| 列名 | 类型 | 必填 | 说明 |
|---|---|---|---|
| tool_code | string | ✅ | 工具代码 |
| json_key | string | ✅ | 前端传入的 JSON 字段名 |
| r_param_name | string | ✅ | R 函数参数名 |
| data_type | string | ✅ | 数据类型:string / number / boolean |
| is_required | boolean | 是否必填 | |
| default_value | string | 默认值 | |
| validation_rule | string | 校验规则(正则或条件) | |
| description | string | 参数说明 |
示例数据:
| tool_code | json_key | r_param_name | data_type | is_required |
|---|---|---|---|---|
| ST_T_TEST_IND | group_variable | group_var | string | TRUE |
| ST_T_TEST_IND | value_variable | value_var | string | TRUE |
| ST_T_TEST_IND | confidence_level | conf_level | number | FALSE |
11.3 Sheet 3: Guardrails(护栏规则链)
🆕 支持 Block / Warn / Switch 三种 Action
| 列名 | 类型 | 必填 | 说明 |
|---|---|---|---|
| tool_code | string | ✅ | 工具代码 |
| check_name | string | ✅ | 检查名称:正态性检验 / 方差齐性 / 样本量 |
| check_order | number | 执行顺序(数字越小越先) | |
| check_code | string | ✅ | R 函数名(如 check_normality) |
| threshold | string | 阈值条件:p < 0.05 | |
| action_type | string | ✅ | 🆕 Block / Warn / Switch |
| action_target | string | Switch 时的目标工具代码 | |
| is_enabled | boolean | 是否启用 |
Action 类型说明:
- Block: 阻止执行,返回错误
- Warn: 警告但继续执行
- Switch: 🆕 自动切换到备选方法
示例数据:
| tool_code | check_name | check_code | threshold | action_type | action_target |
|---|---|---|---|---|---|
| ST_T_TEST_IND | 正态性检验 | check_normality | p < 0.05 | Switch | ST_MANN_WHITNEY |
| ST_T_TEST_IND | 样本量检查 | check_sample_size | n < 3 | Block | |
| ST_ANOVA_ONE | 方差齐性 | check_homogeneity | p < 0.05 | Warn |
11.4 Sheet 4: OutputDef(输出字段定义)
| 列名 | 类型 | 必填 | 说明 |
|---|---|---|---|
| tool_code | string | ✅ | 工具代码 |
| field_name | string | ✅ | 字段名 |
| display_name | string | ✅ | 展示名称 |
| data_type | string | 数据类型 | |
| format_rule | string | 格式化规则(如 %.3f) |
11.5 Sheet 5: Interpretation(结果解读模板)
🆕 "填空题"式的论文级结论生成
| 列名 | 类型 | 必填 | 说明 |
|---|---|---|---|
| tool_code | string | ✅ | 工具代码 |
| scenario_key | string | ✅ | 场景:significant / not_significant / warning |
| template | text | ✅ | 解读模板(含占位符) |
| placeholders | text | 占位符列表(JSON 数组) |
示例模板:
场景: significant
模板: "采用 {method} 进行分析,结果表明两组之间存在统计学显著差异(t = {statistic}, p {p_value_fmt}, 95% CI [{ci_lower}, {ci_upper}])。{group1} 组均值为 {mean1} ± {sd1},{group2} 组均值为 {mean2} ± {sd2}。"
场景: not_significant
模板: "采用 {method} 进行分析,结果表明两组之间差异无统计学意义(t = {statistic}, p = {p_value_fmt})。"
11.6 🆕 R 脚本规范
专家上传的 R 脚本必须遵循以下规范
# 文件名: t_test_ind.R
# 工具代码: ST_T_TEST_IND
# 版本: 1.0.0
#' @title 独立样本 T 检验
#' @description 比较两组独立样本的均值差异
#' @param input List 包含 data_source, params, guardrails
#' @return List 包含 status, results, plots, trace_log, reproducible_code
# 📌 所有脚本必须使用统一入口函数
run_analysis <- function(input) {
# 1. 数据加载
df <- load_input_data(input)
# 2. 参数提取(根据 ParamMapping 配置)
group_var <- input$params$group_var
value_var <- input$params$value_var
# 3. 护栏检查(根据 Guardrails 配置)
# ... 护栏检查代码 ...
# 4. 核心计算
result <- t.test(...)
# 5. 返回标准格式
return(list(
status = "success",
results = list(...),
plots = list(...),
trace_log = logs,
reproducible_code = code
))
}
12. 🆕 SAP 文档规范
12.1 SAP 结构定义
interface SAPDocument {
title: string; // 统计分析计划标题
sections: Array<{
heading: string; // 章节标题
content: string; // 章节内容
}>;
recommendedTools: string[]; // 推荐的统计方法列表
metadata: {
generatedAt: string; // 生成时间
sessionId: string; // 关联会话
version: string; // 版本号
};
}
12.2 标准章节
- 研究背景 - 研究目的、设计类型
- 数据描述 - 样本量、变量类型、缺失情况
- 统计假设 - 原假设、备择假设
- 分析方法 - 具体统计方法及选择理由
- 结果解读指南 - 如何解读统计结果
- 注意事项 - 方法局限性、前提条件