Files
AIclinicalresearch/docs/03-业务模块/SSA-智能统计分析/04-开发计划/03-后端开发指南.md
HaHafeng 8137e3cde2 feat(ssa): Complete SSA-Pro MVP development plan v1.3
Summary:

- Add PRD and architecture design V4 (Brain-Hand model)

- Complete 5 development guide documents

- Pass 3 rounds of team review (v1.0 -> v1.3)

- Add module status guide document

- Update system status document

Key Features:

- Brain-Hand architecture: Node.js + R Docker

- Statistical guardrails with auto degradation

- HITL workflow: PlanCard -> ExecutionTrace -> ResultCard

- Mixed data protocol: inline vs OSS

- Reproducible R code delivery

MVP Scope: 10 statistical tools

Status: Design 100%, ready for development
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-18 21:58:37 +08:00

24 KiB
Raw Blame History

SSA-Pro 后端开发指南

文档版本: v1.3
创建日期: 2026-02-18
最后更新: 2026-02-18纳入 V3.0 终极审查建议)
目标读者: Node.js 后端工程师


1. 模块目录结构

backend/src/modules/ssa/
├── index.ts                    # 模块入口,注册路由
├── routes/
│   ├── session.routes.ts       # 会话管理路由
│   └── analysis.routes.ts      # 分析执行路由
├── services/
│   ├── SessionService.ts       # 会话 CRUD
│   ├── PlannerService.ts       # AI 规划LLM 调用)
│   ├── CriticService.ts        # 结果解读(流式)
│   ├── ToolRetrievalService.ts # RAG 工具检索
│   ├── RClientService.ts       # R 服务调用
│   └── DataParserService.ts    # 数据解析 + Schema 提取
├── validators/
│   └── planSchema.ts           # 📌 Zod Schema 定义
├── dto/
│   ├── CreateSessionDto.ts
│   ├── UploadDataDto.ts
│   └── ExecuteAnalysisDto.ts
└── types/
    └── index.ts                # 类型定义

2. 数据库 SchemaPrisma

// schema.prisma - SSA 模块部分

// 分析会话
model SsaSession {
  id            String   @id @default(uuid())
  userId        String   @map("user_id")
  title         String?
  dataSchema    Json?    @map("data_schema")    // 数据结构LLM可见
  dataPayload   Json?    @map("data_payload")   // 真实数据仅R可见
  status        String   @default("active")
  createdAt     DateTime @default(now()) @map("created_at")
  updatedAt     DateTime @updatedAt @map("updated_at")
  
  messages      SsaMessage[]
  
  @@map("ssa_sessions")
  @@schema("ssa_schema")
}

// 消息记录
model SsaMessage {
  id            String   @id @default(uuid())
  sessionId     String   @map("session_id")
  role          String   // user | assistant | system
  contentType   String   @map("content_type")  // text | plan | result
  content       Json
  createdAt     DateTime @default(now()) @map("created_at")
  
  session       SsaSession @relation(fields: [sessionId], references: [id])
  
  @@map("ssa_messages")
  @@schema("ssa_schema")
}

// 工具库
model SsaTool {
  id            String   @id @default(uuid())
  toolCode      String   @unique @map("tool_code")
  name          String
  version       String   @default("1.0.0")
  description   String
  usageContext  String?  @map("usage_context")
  paramsSchema  Json     @map("params_schema")
  guardrails    Json?
  searchText    String   @map("search_text")
  embedding     Unsupported("vector(1024)")?
  isActive      Boolean  @default(true) @map("is_active")
  createdAt     DateTime @default(now()) @map("created_at")
  updatedAt     DateTime @updatedAt @map("updated_at")
  
  @@map("tools_library")
  @@schema("ssa_schema")
}

// 执行日志
model SsaExecutionLog {
  id            String   @id @default(uuid())
  sessionId     String   @map("session_id")
  messageId     String?  @map("message_id")
  toolCode      String   @map("tool_code")
  inputParams   Json     @map("input_params")
  outputStatus  String   @map("output_status")
  outputResult  Json?    @map("output_result")
  traceLog      String[] @map("trace_log")
  executionMs   Int?     @map("execution_ms")
  createdAt     DateTime @default(now()) @map("created_at")
  
  @@map("execution_logs")
  @@schema("ssa_schema")
}

3. API 路由设计

3.1 路由注册

// index.ts
import { FastifyInstance } from 'fastify';
import sessionRoutes from './routes/session.routes';
import analysisRoutes from './routes/analysis.routes';

export default async function ssaModule(app: FastifyInstance) {
  // 注册认证中间件
  app.addHook('preHandler', app.authenticate);
  
  // 注册子路由
  app.register(sessionRoutes, { prefix: '/sessions' });
  app.register(analysisRoutes, { prefix: '/sessions' });
}

3.2 会话路由

// routes/session.routes.ts
import { FastifyInstance } from 'fastify';
import { SessionService } from '../services/SessionService';

export default async function sessionRoutes(app: FastifyInstance) {
  const sessionService = new SessionService();

  // 创建会话
  app.post('/', async (req, reply) => {
    const userId = req.user.id;
    const session = await sessionService.create(userId);
    return reply.send(session);
  });

  // 获取会话列表
  app.get('/', async (req, reply) => {
    const userId = req.user.id;
    const sessions = await sessionService.listByUser(userId);
    return reply.send(sessions);
  });

  // 获取单个会话(含消息历史)
  app.get('/:id', async (req, reply) => {
    const { id } = req.params as { id: string };
    const session = await sessionService.getById(id, req.user.id);
    return reply.send(session);
  });

  // 上传数据
  app.post('/:id/upload', async (req, reply) => {
    const { id } = req.params as { id: string };
    // 解析 Excel/CSV提取 Schema 和 Data
    const result = await sessionService.uploadData(id, req);
    return reply.send(result);
  });
}

3.3 分析路由

// routes/analysis.routes.ts
import { FastifyInstance } from 'fastify';
import { PlannerService } from '../services/PlannerService';
import { RClientService } from '../services/RClientService';
import { CriticService } from '../services/CriticService';

export default async function analysisRoutes(app: FastifyInstance) {
  const plannerService = new PlannerService();
  const rClientService = new RClientService();
  const criticService = new CriticService();

  // 生成分析计划(不执行)
  app.post('/:id/plan', async (req, reply) => {
    const { id } = req.params as { id: string };
    const { query } = req.body as { query: string };
    
    // 1. RAG 检索工具
    // 2. LLM 生成计划
    const plan = await plannerService.generatePlan(id, query);
    
    return reply.send({
      type: 'plan',
      plan
    });
  });

  // 确认执行
  app.post('/:id/execute', async (req, reply) => {
    const { id } = req.params as { id: string };
    const { plan } = req.body as { plan: object };
    
    // 1. 调用 R 服务执行
    const result = await rClientService.execute(id, plan);
    
    // 2. 保存执行日志
    // 3. 保存结果到消息
    
    return reply.send({
      type: 'result',
      result
    });
  });

  // 获取结果解读(流式)
  app.get('/:id/interpret/:messageId', async (req, reply) => {
    const { id, messageId } = req.params as { id: string; messageId: string };
    
    // 流式返回 Critic 解读
    reply.raw.setHeader('Content-Type', 'text/event-stream');
    
    await criticService.streamInterpret(id, messageId, reply.raw);
  });

  // 下载代码
  app.get('/:id/download-code/:messageId', async (req, reply) => {
    const { id, messageId } = req.params as { id: string; messageId: string };
    
    const code = await sessionService.getReproducibleCode(messageId);
    
    reply.header('Content-Type', 'text/plain');
    reply.header('Content-Disposition', 'attachment; filename="analysis.R"');
    return reply.send(code);
  });
}

4. 核心服务实现

4.1 RClientService调用 R 服务)

// services/RClientService.ts
import axios, { AxiosInstance } from 'axios';
import { prisma } from '@/common/db';
import { logger } from '@/common/logging';

export class RClientService {
  private client: AxiosInstance;
  
  constructor() {
    this.client = axios.create({
      baseURL: process.env.R_SERVICE_URL || 'http://localhost:8080',
      timeout: 120000,  // 📌 120s 超时(应对复杂计算)
      headers: { 'Content-Type': 'application/json' }
    });
  }

  async execute(sessionId: string, plan: {
    tool_code: string;
    params: Record<string, any>;
    guardrails: Record<string, boolean>;
  }) {
    const startTime = Date.now();
    
    // 1. 获取会话的真实数据
    const session = await prisma.ssaSession.findUniqueOrThrow({
      where: { id: sessionId }
    });
    
    // 🆕 2. 构造 R 服务请求(混合数据协议)
    const dataSource = this.buildDataSource(session);
    const requestBody = {
      data_source: dataSource,  // 🆕 统一数据源字段
      params: plan.params,
      guardrails: plan.guardrails
    };
  
  /**
   * 🆕 根据数据大小选择传输方式
   * - < 2MB: inline JSON
   * - >= 2MB: OSS key
   */
  private buildDataSource(session: any): { type: string; data?: any; oss_key?: string } {
    const payload = session.dataPayload;
    const payloadSize = JSON.stringify(payload).length;
    
    const SIZE_THRESHOLD = 2 * 1024 * 1024;  // 2MB
    
    if (payloadSize < SIZE_THRESHOLD) {
      // 小数据:直接内联
      return {
        type: 'inline',
        data: payload
      };
    } else {
      // 大数据:上传 OSS传递 key
      // 注意:此处假设 session 创建时已上传 OSS
      const ossKey = session.dataOssKey || `sessions/${session.id}/data.json`;
      return {
        type: 'oss',
        oss_key: ossKey
      };
    }
  }
    
    // 3. 调用 R 服务
    try {
      const response = await this.client.post(
        `/api/v1/skills/${plan.tool_code}`,
        requestBody
      );
      
      const executionMs = Date.now() - startTime;
      
      // 4. 记录执行日志(不含真实数据)
      await prisma.ssaExecutionLog.create({
        data: {
          sessionId,
          toolCode: plan.tool_code,
          inputParams: plan.params,  // 只记录参数,不记录数据
          outputStatus: response.data.status,
          outputResult: response.data.results,
          traceLog: response.data.trace_log || [],
          executionMs
        }
      });
      
      return response.data;
      
    } catch (error: any) {
      logger.error('R service call failed', { sessionId, toolCode: plan.tool_code, error });
      
      // 🆕 502/504 特殊处理R 服务崩溃或超时)
      const statusCode = error.response?.status;
      if (statusCode === 502 || statusCode === 504) {
        throw new Error('统计服务繁忙或数据异常,请稍后重试');
      }
      
      // 🆕 提取 R 服务返回的用户友好提示
      const userHint = error.response?.data?.user_hint;
      if (userHint) {
        throw new Error(userHint);
      }
      
      throw new Error(`R service error: ${error.message}`);
    }
  }
  
  async healthCheck(): Promise<boolean> {
    try {
      const res = await this.client.get('/health');
      return res.data.status === 'ok';
    } catch {
      return false;
    }
  }
}

4.2 ToolRetrievalServiceRAG 检索)

// services/ToolRetrievalService.ts
import { VectorSearchService } from '@/common/rag';
import { LLMFactory } from '@/common/llm/adapters/LLMFactory';
import { prisma } from '@/common/db';

export class ToolRetrievalService {
  private vectorSearch: VectorSearchService;
  
  constructor() {
    this.vectorSearch = new VectorSearchService({
      schema: 'ssa_schema',
      table: 'tools_library',
      embeddingColumn: 'embedding',
      textColumn: 'search_text'
    });
  }

  async retrieveTools(query: string, dataSchema: object, topK = 5) {
    // 1. Query Rewrite可选提升召回
    const rewriter = LLMFactory.getAdapter('deepseek-v3');
    const rewritePrompt = `
将用户的统计分析需求改写为更适合检索统计工具的查询:
用户需求: ${query}
数据结构: ${JSON.stringify(dataSchema)}

输出改写后的查询(一句话):
    `.trim();
    
    const rewrittenQuery = await rewriter.chat([
      { role: 'user', content: rewritePrompt }
    ]);
    
    // 2. 向量检索
    const vectorResults = await this.vectorSearch.search(rewrittenQuery, topK);
    
    // 3. 关键词检索 (pg_bigm)
    const keywordResults = await prisma.$queryRaw`
      SELECT id, tool_code, name, description, params_schema, guardrails
      FROM ssa_schema.tools_library
      WHERE search_text LIKE '%' || ${query} || '%'
      AND is_active = true
      LIMIT 5
    `;
    
    // 4. RRF 融合
    const merged = this.rrfMerge(vectorResults, keywordResults);
    
    // 5. Rerank可选
    // const reranked = await this.rerank(merged, query);
    
    return merged.slice(0, topK);
  }

  private rrfMerge(vectorResults: any[], keywordResults: any[], k = 60) {
    const scores = new Map<string, number>();
    
    vectorResults.forEach((item, idx) => {
      const rrf = 1 / (k + idx + 1);
      scores.set(item.id, (scores.get(item.id) || 0) + rrf);
    });
    
    keywordResults.forEach((item, idx) => {
      const rrf = 1 / (k + idx + 1);
      scores.set(item.id, (scores.get(item.id) || 0) + rrf);
    });
    
    // 合并并排序
    const allItems = [...vectorResults, ...keywordResults];
    const unique = [...new Map(allItems.map(i => [i.id, i])).values()];
    
    return unique.sort((a, b) => 
      (scores.get(b.id) || 0) - (scores.get(a.id) || 0)
    );
  }
}

4.3 PlannerServiceAI 规划 + JSON 容错)

// services/PlannerService.ts
import { LLMFactory } from '@/common/llm/adapters/LLMFactory';
import { PromptService } from '@/common/prompts';
import { ToolRetrievalService } from './ToolRetrievalService';
import { prisma } from '@/common/db';
import { jsonrepair } from 'jsonrepair';  // 📌 JSON 修复库
import { planSchema } from '../validators/planSchema';  // 📌 Zod Schema

export class PlannerService {
  private retrieval: ToolRetrievalService;
  
  constructor() {
    this.retrieval = new ToolRetrievalService();
  }

  async generatePlan(sessionId: string, userQuery: string) {
    // 1. 获取会话的数据 Schema不含真实数据
    const session = await prisma.ssaSession.findUniqueOrThrow({
      where: { id: sessionId },
      select: { dataSchema: true }
    });
    
    // 2. RAG 检索候选工具
    const candidateTools = await this.retrieval.retrieveTools(
      userQuery,
      session.dataSchema,
      5
    );
    
    // 3. 获取 Planner Prompt
    const promptTemplate = await PromptService.get('SSA_PLANNER');
    
    // 4. 构造 Prompt
    const systemPrompt = promptTemplate
      .replace('{{data_schema_json}}', JSON.stringify(session.dataSchema, null, 2))
      .replace('{{candidate_tools_json}}', JSON.stringify(candidateTools, null, 2));
    
    // 5. 调用 LLM
    const llm = LLMFactory.getAdapter('deepseek-v3');
    const response = await llm.chat([
      { role: 'system', content: systemPrompt },
      { role: 'user', content: userQuery }
    ]);
    
    // 6. 📌 解析 + 修复 + 校验 JSON
    const plan = this.parseAndValidateJson(response, candidateTools);
    
    // 7. 保存用户消息和计划消息
    await prisma.ssaMessage.createMany({
      data: [
        {
          sessionId,
          role: 'user',
          contentType: 'text',
          content: { text: userQuery }
        },
        {
          sessionId,
          role: 'assistant',
          contentType: 'plan',
          content: plan
        }
      ]
    });
    
    return plan;
  }

  // 📌 增强的 JSON 解析(含修复和校验)
  private parseAndValidateJson(text: string, candidateTools: any[]): object {
    // Step 1: 提取 JSON 块
    const jsonMatch = text.match(/```json\n?([\s\S]*?)\n?```/) || 
                      text.match(/\{[\s\S]*\}/);
    
    if (!jsonMatch) {
      throw new Error('LLM response does not contain valid JSON');
    }
    
    let jsonStr = jsonMatch[1] || jsonMatch[0];
    
    // Step 2: 使用 jsonrepair 修复常见问题(末尾逗号、缺少引号等)
    try {
      jsonStr = jsonrepair(jsonStr);
    } catch (repairError) {
      // 修复失败,继续尝试原始解析
    }
    
    // Step 3: 解析 JSON
    let parsed: any;
    try {
      parsed = JSON.parse(jsonStr);
    } catch (parseError) {
      throw new Error(`JSON parse failed: ${parseError.message}`);
    }
    
    // Step 4: 使用 Zod 校验结构
    const validatedPlan = planSchema.safeParse(parsed);
    
    if (!validatedPlan.success) {
      throw new Error(`Plan validation failed: ${validatedPlan.error.message}`);
    }
    
    // Step 5: 校验 tool_code 是否在候选列表中
    const validToolCodes = candidateTools.map(t => t.tool_code);
    if (!validToolCodes.includes(validatedPlan.data.tool_code)) {
      throw new Error(`Invalid tool_code: ${validatedPlan.data.tool_code}`);
    }
    
    return validatedPlan.data;
  }
}

4.4 Zod Schema 定义

// validators/planSchema.ts
import { z } from 'zod';

export const planSchema = z.object({
  tool_code: z.string().min(1),
  reasoning: z.string().optional(),
  params: z.record(z.any()),
  guardrails: z.object({
    check_normality: z.boolean().optional(),
    check_homogeneity: z.boolean().optional(),
    auto_fix: z.boolean().optional()
  }).optional()
});

export type PlanType = z.infer<typeof planSchema>;

5. Brain-Hand 数据隔离

核心原则LLM 只看 SchemaR 服务处理真实数据

┌─────────────────────────────────────────────────────────────┐
│                     数据上传流程                              │
│                                                             │
│  Excel/CSV  ──────┬────────────────────────────────────────│
│                   │                                         │
│            ┌──────▼──────┐                                 │
│            │  数据解析器  │                                 │
│            └──────┬──────┘                                 │
│                   │                                         │
│         ┌─────────┴─────────┐                              │
│         │                   │                              │
│    dataSchema          dataPayload                         │
│   (结构/类型/统计)       (真实数据)                          │
│         │                   │                              │
│         ▼                   ▼                              │
│   LLM (Planner)        R (Executor)                        │
│                                                             │
└─────────────────────────────────────────────────────────────┘

5.1 数据解析实现

// services/DataParserService.ts
import * as XLSX from 'xlsx';

export class DataParserService {
  
  static parse(buffer: Buffer, filename: string) {
    const workbook = XLSX.read(buffer, { type: 'buffer' });
    const sheetName = workbook.SheetNames[0];
    const sheet = workbook.Sheets[sheetName];
    
    // 转为 JSON 数组
    const data = XLSX.utils.sheet_to_json(sheet);
    
    // 提取 Schema
    const schema = this.extractSchema(data);
    
    return {
      dataSchema: schema,    // 给 LLM
      dataPayload: data      // 给 R
    };
  }

  private static extractSchema(data: any[]) {
    if (data.length === 0) return { columns: [], rowCount: 0 };
    
    const columns = Object.keys(data[0]).map(colName => {
      const values = data.map(row => row[colName]).filter(v => v != null);
      const type = this.inferType(values);
      
      return {
        name: colName,
        type,
        ...this.computeStats(values, type, data.length)  // 📌 传入行数用于隐私保护
      };
    });
    
    return {
      rowCount: data.length,
      columns
    };
  }

  private static inferType(values: any[]): 'numeric' | 'categorical' | 'datetime' {
    const sample = values.slice(0, 100);
    const numericCount = sample.filter(v => typeof v === 'number' || !isNaN(Number(v))).length;
    
    if (numericCount / sample.length > 0.9) return 'numeric';
    return 'categorical';
  }

  private static computeStats(values: any[], type: string, rowCount: number) {
    if (type === 'numeric') {
      const nums = values.map(Number).filter(n => !isNaN(n));
      let min = Math.min(...nums);
      let max = Math.max(...nums);
      
      // 📌 小样本隐私保护N < 10 时模糊化极值
      if (rowCount < 10) {
        min = Math.floor(min / 10) * 10;  // 向下取整到十位
        max = Math.ceil(max / 10) * 10;   // 向上取整到十位
      }
      
      return {
        min,
        max,
        mean: nums.reduce((a, b) => a + b, 0) / nums.length,
        missing: values.length - nums.length,
        privacyProtected: rowCount < 10  // 📌 标记是否已模糊化
      };
    }
    
    // categorical
    const counts = new Map<string, number>();
    values.forEach(v => {
      const key = String(v);
      counts.set(key, (counts.get(key) || 0) + 1);
    });
    
    // 🆕 分类变量隐私保护:
    // 如果某个取值的计数 < 5 且总行数 > 10则隐藏具体值
    const uniqueValues: string[] = [];
    let maskedCount = 0;
    
    for (const [value, count] of counts.entries()) {
      if (count < 5 && rowCount > 10) {
        maskedCount++;
      } else {
        uniqueValues.push(value);
      }
    }
    
    // 最多展示 10 个非敏感值
    const safeValues = uniqueValues.slice(0, 10);
    if (maskedCount > 0) {
      safeValues.push(`[${maskedCount} 个稀有值已隐藏]`);
    }
    
    return {
      uniqueValues: safeValues,
      uniqueCount: counts.size,
      missing: values.filter(v => v == null || v === '').length,
      privacyProtected: maskedCount > 0  // 🆕 标记
    };
  }
}

6. Prompt 注册

-- 注册 Planner Prompt
INSERT INTO capability_schema.prompt_templates (code, name, content, model, temperature)
VALUES (
  'SSA_PLANNER',
  'SSA 统计规划器',
  '你是一名资深的生物统计学家。你面前有一份数据摘要Metadata和一组可用的统计工具箱。
请根据用户的需求选择最合适的一个工具并生成详细的执行计划SAP
### 数据摘要
{{data_schema_json}}

### 可用工具箱 (Candidates)
{{candidate_tools_json}}

### 决策规则 (Guardrails)
1. **类型匹配**:严格检查变量类型。不要把分类变量填入要求数值型的参数中。
2. **工具匹配**:如果用户要做 "预测",优先选 "回归" 类工具;如果做 "差异",选 "检验" 类工具。
3. **护栏配置**:对于 T 检验、ANOVA 等参数检验,必须开启 check_normality。

### 输出要求
请先在 <thinking> 标签中进行推理,分析变量类型和工具适用性。
然后输出纯 JSON格式如下
{
  "tool_code": "选中工具的CODE",
  "reasoning": "一句话解释为什么选这个工具",
  "params": { ...根据工具定义的 params_schema 填写... },
  "guardrails": { "check_normality": true, "auto_fix": true }
}',
  'deepseek-v3',
  0.3
);

7. 与主应用集成

// backend/src/index.ts
import ssaModule from './modules/ssa';

// 在 Fastify 注册
app.register(ssaModule, { prefix: '/api/v1/ssa' });

8. 环境变量

# .env

# R 服务配置
R_SERVICE_URL=http://ssa-r-service:8080  # SAE VPC 内网地址
R_SERVICE_TIMEOUT=120000                  # 📌 超时 120s

# 📌 OSS 配置(必须使用 VPC 内网 Endpoint
OSS_ENDPOINT=oss-cn-beijing-internal.aliyuncs.com  # 内网地址
OSS_BUCKET=ssa-data-bucket
OSS_ACCESS_KEY_ID=your-access-key
OSS_ACCESS_KEY_SECRET=your-secret

# LLM 配置
LLM_DEFAULT_MODEL=deepseek-v3

重要OSS Endpoint 必须使用 -internal 后缀的 VPC 内网地址,否则 R 服务的网络隔离策略会导致文件下载失败。


9. 测试检查清单

测试场景 预期结果
POST /sessions 创建会话 返回 sessionId
POST /sessions/:id/upload (CSV) 返回 dataSchema
POST /sessions/:id/upload (N<10) dataSchema.privacyProtected = true
POST /sessions/:id/plan (T检验意图) 返回包含 tool_code 的 plan
POST /sessions/:id/plan (LLM 返回格式错误 JSON) json-repair 修复成功
POST /sessions/:id/plan (参数不合法) Zod 校验失败,返回错误
POST /sessions/:id/execute R 服务返回 success
POST /sessions/:id/execute (超过 60s) 不超时,等待 120s
GET /sessions/:id/download-code 下载 .R 文件
R 服务宕机时 execute 返回友好错误

10. 依赖包清单

{
  "dependencies": {
    "jsonrepair": "^3.6.0",
    "zod": "^3.22.4",
    "xlsx": "^0.18.5",
    "axios": "^1.6.0"
  }
}