feat(ssa): Complete T-test end-to-end testing with 9 bug fixes - Phase 1 core 85% complete. R service: missing value auto-filter. Backend: error handling, variable matching, dynamic filename. Frontend: module activation, session isolation, error propagation. Full flow verified.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-02-19 20:57:00 +08:00
parent 8137e3cde2
commit 49b5c37cb1
86 changed files with 21207 additions and 252 deletions

View File

@@ -0,0 +1,697 @@
import axios from 'axios';
import FormData from 'form-data';
import { logger } from '../../../common/logging/index.js';
/**
* REDCap API 适配器
*
* 用途封装REDCap REST API调用提供统一的接口
* 主要功能:
* - exportRecords: 拉取数据(支持增量同步)
* - exportMetadata: 获取字段定义
* - importRecords: 回写数据Phase 2
*/
export class RedcapAdapter {
/**
* 构造函数
* @param baseUrl REDCap基础URLhttp://localhost:8080
* @param apiToken API Token32位字符串
* @param timeout 超时时间毫秒默认30秒
*/
constructor(baseUrl, apiToken, timeout = 30000) {
// 移除末尾斜杠
this.baseUrl = baseUrl.replace(/\/$/, '');
this.apiToken = apiToken;
this.timeout = timeout;
// 创建axios实例
this.client = axios.create({
timeout: this.timeout,
headers: {
'User-Agent': 'IIT-Manager-Agent/1.0'
}
});
logger.info('RedcapAdapter initialized', {
baseUrl: this.baseUrl,
timeout: this.timeout
});
}
/**
* 导出记录(支持增量同步)
*
* 用途从REDCap拉取数据
* 增量同步使用dateRangeBegin参数只拉取新数据
*
* @param options 导出选项
* @returns 记录数组
*/
async exportRecords(options = {}) {
const formData = new FormData();
// 基础参数
formData.append('token', this.apiToken);
formData.append('content', 'record');
formData.append('format', 'json');
formData.append('type', 'flat');
// 指定记录ID
if (options.records && options.records.length > 0) {
options.records.forEach((recordId, index) => {
formData.append(`records[${index}]`, recordId);
});
logger.debug('Exporting specific records', {
recordCount: options.records.length
});
}
// 指定字段
if (options.fields && options.fields.length > 0) {
options.fields.forEach((field, index) => {
formData.append(`fields[${index}]`, field);
});
logger.debug('Exporting specific fields', {
fieldCount: options.fields.length
});
}
// 时间过滤(增量同步关键)
if (options.dateRangeBegin) {
const dateStr = this.formatRedcapDate(options.dateRangeBegin);
formData.append('dateRangeBegin', dateStr);
logger.debug('Using incremental sync', {
dateRangeBegin: dateStr
});
}
if (options.dateRangeEnd) {
const dateStr = this.formatRedcapDate(options.dateRangeEnd);
formData.append('dateRangeEnd', dateStr);
}
// 指定事件(纵向研究)
if (options.events && options.events.length > 0) {
options.events.forEach((event, index) => {
formData.append(`events[${index}]`, event);
});
}
// 返回值格式raw原始代码或 label显示标签
// 默认使用 raw保证数据一致性
formData.append('rawOrLabel', options.rawOrLabel || 'raw');
// 导出计算字段(如:从出生日期自动计算的年龄)
// 默认启用,确保所有字段数据完整
if (options.exportCalculatedFields !== false) {
formData.append('exportCalculatedFields', 'true');
}
try {
const startTime = Date.now();
const response = await this.client.post(`${this.baseUrl}/api/`, formData, {
headers: formData.getHeaders()
});
const duration = Date.now() - startTime;
// 验证响应格式
if (!Array.isArray(response.data)) {
logger.error('Invalid REDCap API response format', {
responseType: typeof response.data
});
throw new Error('Invalid response format: expected array');
}
logger.info('REDCap API: exportRecords success', {
recordCount: response.data.length,
duration: `${duration}ms`
});
return response.data;
}
catch (error) {
logger.error('REDCap API: exportRecords failed', {
error: error.message,
baseUrl: this.baseUrl,
options
});
// 友好的错误信息
if (error.code === 'ECONNREFUSED') {
throw new Error(`Cannot connect to REDCap at ${this.baseUrl}`);
}
else if (error.response?.status === 403) {
throw new Error('Invalid API Token or insufficient permissions');
}
else if (error.response?.status === 404) {
throw new Error('REDCap API endpoint not found');
}
else {
throw new Error(`REDCap API error: ${error.message}`);
}
}
}
/**
* 导出元数据(字段定义)
*
* 用途:获取项目的表单结构和字段定义
* 场景:初始化项目时了解字段类型、验证规则等
*
* @returns 元数据数组
*/
async exportMetadata() {
const formData = new FormData();
formData.append('token', this.apiToken);
formData.append('content', 'metadata');
formData.append('format', 'json');
try {
const startTime = Date.now();
const response = await this.client.post(`${this.baseUrl}/api/`, formData, {
headers: formData.getHeaders()
});
const duration = Date.now() - startTime;
if (!Array.isArray(response.data)) {
throw new Error('Invalid response format: expected array');
}
logger.info('REDCap API: exportMetadata success', {
fieldCount: response.data.length,
duration: `${duration}ms`
});
return response.data;
}
catch (error) {
logger.error('REDCap API: exportMetadata failed', {
error: error.message
});
throw new Error(`REDCap API error: ${error.message}`);
}
}
/**
* 导入记录(回写数据)
*
* 用途将AI质控意见写回REDCapPhase 2功能
* 场景:
* - 创建Data Query
* - 更新字段值
* - 添加质控标记
*
* @param records 记录数组
* @returns 导入结果
*/
async importRecords(records) {
const formData = new FormData();
formData.append('token', this.apiToken);
formData.append('content', 'record');
formData.append('format', 'json');
formData.append('type', 'flat');
formData.append('overwriteBehavior', 'normal');
formData.append('data', JSON.stringify(records));
try {
const startTime = Date.now();
const response = await this.client.post(`${this.baseUrl}/api/`, formData, {
headers: formData.getHeaders()
});
const duration = Date.now() - startTime;
logger.info('REDCap API: importRecords success', {
recordCount: records.length,
duration: `${duration}ms`,
result: response.data
});
return {
count: response.data.count || records.length,
ids: response.data.ids || []
};
}
catch (error) {
logger.error('REDCap API: importRecords failed', {
error: error.message,
recordCount: records.length
});
throw new Error(`REDCap API error: ${error.message}`);
}
}
/**
* 格式化日期为REDCap格式
*
* REDCap日期格式YYYY-MM-DD HH:MM:SS
* 示例2026-01-02 14:30:00
*
* @param date Date对象
* @returns REDCap格式的日期字符串
*/
formatRedcapDate(date) {
// ISO: 2026-01-02T14:30:00.000Z
// REDCap: 2026-01-02 14:30:00
return date
.toISOString()
.replace('T', ' ')
.substring(0, 19);
}
/**
* 测试API连接
*
* 用途验证API Token是否有效连接是否正常
*
* @returns 连接是否成功
*/
async testConnection() {
try {
// 尝试导出元数据最轻量的API调用
await this.exportMetadata();
logger.info('REDCap connection test: SUCCESS');
return true;
}
catch (error) {
logger.error('REDCap connection test: FAILED', { error });
return false;
}
}
// ============================================================
// MVP 新增便捷方法
// ============================================================
/**
* 获取单条记录(便捷方法)
*
* 对于纵向研究项目,同一个 record_id 可能有多个事件(访视),
* 此方法会将所有事件的数据合并到一个对象中。
*
* @param recordId 记录ID
* @returns 合并后的记录对象,未找到则返回 null
*/
async getRecordById(recordId) {
try {
const records = await this.exportRecords({ records: [recordId] });
if (records.length === 0) {
logger.warn('REDCap: Record not found', { recordId });
return null;
}
// 如果只有一条记录,直接返回
if (records.length === 1) {
return records[0];
}
// 纵向研究:合并所有事件的数据
// 策略:非空值覆盖空值,保留所有事件名称
const mergedRecord = {
record_id: recordId,
_events: [], // 记录所有事件名称
_event_count: records.length,
};
for (const record of records) {
const eventName = record.redcap_event_name || record.event_id || 'unknown';
mergedRecord._events.push(eventName);
// 遍历该事件的所有字段
for (const [key, value] of Object.entries(record)) {
// 跳过元数据字段
if (key === 'redcap_event_name' || key === 'event_id' || key === 'redcap_repeat_instrument' || key === 'redcap_repeat_instance') {
continue;
}
// 如果当前字段为空,或新值非空,则更新
const currentValue = mergedRecord[key];
const isCurrentEmpty = currentValue === undefined || currentValue === null || currentValue === '';
const isNewEmpty = value === undefined || value === null || value === '';
if (isCurrentEmpty && !isNewEmpty) {
mergedRecord[key] = value;
}
else if (!isCurrentEmpty && !isNewEmpty && currentValue !== value) {
// 两个值都非空且不同,保留最新的(后面的事件可能是更新的数据)
mergedRecord[key] = value;
}
}
}
logger.info('REDCap: Merged multi-event record', {
recordId,
eventCount: records.length,
mergedFieldCount: Object.keys(mergedRecord).length,
});
return mergedRecord;
}
catch (error) {
logger.error('REDCap: getRecordById failed', { recordId, error: error.message });
throw error;
}
}
/**
* 获取记录的指定字段(用于质控)
*
* 对于纵向研究项目,会合并所有事件的数据
*
* @param recordId 记录ID
* @param fields 字段列表
* @returns 合并后包含指定字段的记录对象
*/
async getRecordFields(recordId, fields) {
try {
const records = await this.exportRecords({
records: [recordId],
fields: fields
});
if (records.length === 0) {
return null;
}
// 如果只有一条记录,直接返回
if (records.length === 1) {
return records[0];
}
// 纵向研究:合并所有事件的指定字段
const mergedRecord = {
record_id: recordId,
};
for (const record of records) {
for (const [key, value] of Object.entries(record)) {
if (key === 'redcap_event_name' || key === 'event_id') {
continue;
}
const currentValue = mergedRecord[key];
const isCurrentEmpty = currentValue === undefined || currentValue === null || currentValue === '';
const isNewEmpty = value === undefined || value === null || value === '';
if (isCurrentEmpty && !isNewEmpty) {
mergedRecord[key] = value;
}
}
}
return mergedRecord;
}
catch (error) {
logger.error('REDCap: getRecordFields failed', { recordId, fields, error: error.message });
throw error;
}
}
/**
* 获取所有记录的指定字段(批量质控用)
*
* @param fields 字段列表
* @returns 记录数组
*/
async getAllRecordsFields(fields) {
try {
return await this.exportRecords({ fields });
}
catch (error) {
logger.error('REDCap: getAllRecordsFields failed', { fields, error: error.message });
throw error;
}
}
/**
* 导出项目信息
*
* @returns 项目基本信息
*/
async exportProjectInfo() {
const formData = new FormData();
formData.append('token', this.apiToken);
formData.append('content', 'project');
formData.append('format', 'json');
try {
const response = await this.client.post(`${this.baseUrl}/api/`, formData, { headers: formData.getHeaders() });
logger.info('REDCap API: exportProjectInfo success', {
projectTitle: response.data.project_title
});
return response.data;
}
catch (error) {
logger.error('REDCap API: exportProjectInfo failed', { error: error.message });
throw new Error(`REDCap API error: ${error.message}`);
}
}
/**
* 导出表单/工具列表
*
* @returns 表单列表
*/
async exportInstruments() {
const formData = new FormData();
formData.append('token', this.apiToken);
formData.append('content', 'instrument');
formData.append('format', 'json');
try {
const response = await this.client.post(`${this.baseUrl}/api/`, formData, { headers: formData.getHeaders() });
logger.info('REDCap API: exportInstruments success', {
instrumentCount: response.data.length
});
return response.data;
}
catch (error) {
logger.error('REDCap API: exportInstruments failed', { error: error.message });
throw new Error(`REDCap API error: ${error.message}`);
}
}
/**
* 导出审计日志Logging/Audit Trail
*
* 用途:获取谁在什么时间录入/修改了数据
*
* @param options 查询选项
* @param options.record 指定记录ID可选
* @param options.beginTime 开始时间(可选)
* @param options.endTime 结束时间(可选)
* @returns 审计日志数组
*/
async exportLogging(options) {
const formData = new FormData();
formData.append('token', this.apiToken);
formData.append('content', 'log');
formData.append('format', 'json');
formData.append('logtype', 'record'); // 只获取记录相关的日志
if (options?.record) {
formData.append('record', options.record);
}
if (options?.beginTime) {
formData.append('beginTime', this.formatRedcapDate(options.beginTime));
}
if (options?.endTime) {
formData.append('endTime', this.formatRedcapDate(options.endTime));
}
try {
const startTime = Date.now();
const response = await this.client.post(`${this.baseUrl}/api/`, formData, { headers: formData.getHeaders() });
const duration = Date.now() - startTime;
logger.info('REDCap API: exportLogging success', {
logCount: response.data.length,
duration: `${duration}ms`
});
return response.data;
}
catch (error) {
logger.error('REDCap API: exportLogging failed', { error: error.message });
throw new Error(`REDCap API error: ${error.message}`);
}
}
/**
* 获取记录的审计信息(谁、什么时间录入/修改)
*
* @param recordId 记录ID
* @returns 该记录的所有操作日志
*/
async getRecordAuditInfo(recordId) {
const logs = await this.exportLogging({ record: recordId });
// 按时间排序(最早的在前)
logs.sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime());
let createdAt;
let createdBy;
let lastModifiedAt;
let lastModifiedBy;
if (logs.length > 0) {
// 第一条是创建记录
const firstLog = logs[0];
createdAt = firstLog.timestamp;
createdBy = firstLog.username;
// 最后一条是最近修改
const lastLog = logs[logs.length - 1];
lastModifiedAt = lastLog.timestamp;
lastModifiedBy = lastLog.username;
}
return {
recordId,
createdAt,
createdBy,
lastModifiedAt,
lastModifiedBy,
logs: logs.map(log => ({
timestamp: log.timestamp,
username: log.username,
action: log.action,
details: log.details,
})),
};
}
/**
* 解析元数据为字段信息映射(增强版)
*
* 包含Field Name、Field Label、字段类型、验证规则、选项值等
*
* @returns 字段名 -> 字段信息的映射
*/
async getFieldInfoMap() {
const metadata = await this.exportMetadata();
const fieldMap = new Map();
for (const field of metadata) {
// 解析选项值 (格式: "1, 男 | 2, 女")
let choicesParsed = null;
if (field.select_choices_or_calculations && field.field_type !== 'calc') {
choicesParsed = field.select_choices_or_calculations
.split('|')
.map((choice) => {
const parts = choice.trim().split(',');
if (parts.length >= 2) {
return {
value: parts[0].trim(),
label: parts.slice(1).join(',').trim()
};
}
return null;
})
.filter((c) => c !== null);
}
fieldMap.set(field.field_name, {
fieldName: field.field_name,
fieldLabel: field.field_label,
fieldType: field.field_type,
formName: field.form_name,
sectionHeader: field.section_header || null,
fieldNote: field.field_note || null,
validation: field.text_validation_type_or_show_slider_number || null,
validationMin: field.text_validation_min || null,
validationMax: field.text_validation_max || null,
choices: field.select_choices_or_calculations || null,
choicesParsed,
required: field.required_field === 'y',
branching: field.branching_logic || null,
});
}
logger.info('REDCap: Field info map created', {
fieldCount: fieldMap.size
});
return fieldMap;
}
/**
* 获取带中文标签的记录数据
*
* 将 Field Name 转换为 Field Label并保留原始值
*
* @param recordId 记录ID
* @returns 带中文标签的记录数据
*/
async getRecordWithLabels(recordId) {
// 1. 获取记录数据(合并多事件)
const record = await this.getRecordById(recordId);
if (!record) {
return null;
}
// 2. 获取字段信息映射
const fieldMap = await this.getFieldInfoMap();
// 3. 获取审计信息
let auditInfo;
try {
const audit = await this.getRecordAuditInfo(recordId);
auditInfo = {
createdAt: audit.createdAt,
createdBy: audit.createdBy,
lastModifiedAt: audit.lastModifiedAt,
lastModifiedBy: audit.lastModifiedBy,
};
}
catch (e) {
// 审计日志可能需要额外权限,忽略错误
logger.warn('REDCap: Failed to get audit info', { recordId });
}
// 4. 转换数据
const data = [];
for (const [fieldName, value] of Object.entries(record)) {
// 跳过内部元数据字段
if (fieldName.startsWith('_') || fieldName === 'redcap_event_name') {
continue;
}
const fieldInfo = fieldMap.get(fieldName);
if (!fieldInfo) {
continue;
}
// 转换选项值为显示值
let displayValue = String(value ?? '');
if (fieldInfo.choicesParsed && value !== '' && value !== null) {
const choice = fieldInfo.choicesParsed.find(c => c.value === String(value));
if (choice) {
displayValue = choice.label;
}
}
data.push({
fieldName,
fieldLabel: fieldInfo.fieldLabel,
value,
displayValue,
formName: fieldInfo.formName,
});
}
return {
recordId,
data,
auditInfo,
};
}
/**
* 获取记录总数(快速统计)
*
* @returns 唯一记录数
*/
async getRecordCount() {
const records = await this.exportRecords({ fields: ['record_id'] });
const uniqueIds = new Set(records.map(r => r.record_id));
return uniqueIds.size;
}
/**
* 获取所有记录(合并多事件数据)
*
* 对于纵向研究项目,将每个 record_id 的所有事件数据合并为一条记录
*
* @param options 可选参数
* @param options.fields 指定字段列表(可选,默认获取所有字段)
* @returns 合并后的记录数组
*/
async getAllRecordsMerged(options) {
try {
// 导出所有记录(或指定字段)
const rawRecords = options?.fields
? await this.exportRecords({ fields: options.fields })
: await this.exportRecords();
if (rawRecords.length === 0) {
return [];
}
// 按 record_id 分组
const recordGroups = new Map();
for (const record of rawRecords) {
const recordId = record.record_id;
if (!recordGroups.has(recordId)) {
recordGroups.set(recordId, []);
}
recordGroups.get(recordId).push(record);
}
// 合并每个 record_id 的所有事件
const mergedRecords = [];
for (const [recordId, events] of recordGroups) {
const merged = {
record_id: recordId,
_event_count: events.length,
_events: events.map(e => e.redcap_event_name || 'unknown'),
};
// 合并所有事件的字段
for (const event of events) {
for (const [key, value] of Object.entries(event)) {
// 跳过元数据字段
if (key === 'redcap_event_name' || key === 'event_id' ||
key === 'redcap_repeat_instrument' || key === 'redcap_repeat_instance') {
continue;
}
const currentValue = merged[key];
const isCurrentEmpty = currentValue === undefined || currentValue === null || currentValue === '';
const isNewEmpty = value === undefined || value === null || value === '';
if (isCurrentEmpty && !isNewEmpty) {
merged[key] = value;
}
else if (!isCurrentEmpty && !isNewEmpty && currentValue !== value) {
// 保留最新的值
merged[key] = value;
}
}
}
mergedRecords.push(merged);
}
// 按 record_id 排序
mergedRecords.sort((a, b) => {
const idA = parseInt(a.record_id) || 0;
const idB = parseInt(b.record_id) || 0;
return idA - idB;
});
logger.info('REDCap: getAllRecordsMerged success', {
rawRecordCount: rawRecords.length,
mergedRecordCount: mergedRecords.length,
});
return mergedRecords;
}
catch (error) {
logger.error('REDCap: getAllRecordsMerged failed', { error: error.message });
throw error;
}
}
}

View File

@@ -0,0 +1,354 @@
/**
* HardRuleEngine - 硬规则质控引擎
*
* 功能:
* - 基于 JSON Logic 执行质控规则
* - 支持纳入标准、排除标准、变量范围检查
* - 返回结构化的质控结果
*
* 设计原则:
* - 零容忍:规则判断是确定性的,不依赖 AI 猜测
* - 可追溯:每条规则执行结果都有详细记录
* - 高性能:纯逻辑计算,无 LLM 调用
*/
import jsonLogic from 'json-logic-js';
import { PrismaClient } from '@prisma/client';
import { logger } from '../../../common/logging/index.js';
const prisma = new PrismaClient();
// ============================================================
// HardRuleEngine 实现
// ============================================================
export class HardRuleEngine {
constructor(projectId) {
this.rules = [];
this.fieldMappings = new Map();
this.projectId = projectId;
}
/**
* 初始化引擎(加载规则和字段映射)
*
* @param formName 可选,按表单名过滤规则(用于单表实时质控)
*/
async initialize(formName) {
// 1. 加载质控规则
const skill = await prisma.iitSkill.findFirst({
where: {
projectId: this.projectId,
skillType: 'qc_process',
isActive: true
}
});
if (!skill) {
throw new Error(`No active QC rules found for project: ${this.projectId}`);
}
const config = skill.config;
let allRules = config.rules || [];
// ⭐ 如果指定了 formName则只加载该表单相关的规则
// 规则通过 formName 或 field 字段来判断所属表单
if (formName) {
allRules = allRules.filter((rule) => {
// 优先使用规则中的 formName 字段
if (rule.formName) {
return rule.formName === formName;
}
// 如果规则没有 formName则默认包含兼容旧规则
// TODO: 后续可以通过 field_metadata 表来判断字段所属表单
return true;
});
}
this.rules = allRules;
logger.info('[HardRuleEngine] Rules loaded', {
projectId: this.projectId,
ruleCount: this.rules.length
});
// 2. 加载字段映射
const mappings = await prisma.iitFieldMapping.findMany({
where: { projectId: this.projectId }
});
for (const m of mappings) {
this.fieldMappings.set(m.aliasName, m.actualName);
}
logger.info('[HardRuleEngine] Field mappings loaded', {
mappingCount: this.fieldMappings.size
});
}
/**
* 执行质控检查
*
* @param recordId 记录ID
* @param data 记录数据REDCap 格式)
* @returns 质控结果
*/
execute(recordId, data) {
const startTime = Date.now();
const results = [];
const errors = [];
const warnings = [];
// 1. 数据预处理:应用字段映射
const normalizedData = this.normalizeData(data);
// 2. 执行每条规则
for (const rule of this.rules) {
const result = this.executeRule(rule, normalizedData);
results.push(result);
if (!result.passed) {
if (result.severity === 'error') {
errors.push(result);
}
else if (result.severity === 'warning') {
warnings.push(result);
}
}
}
// 3. 计算整体状态
let overallStatus = 'PASS';
if (errors.length > 0) {
overallStatus = 'FAIL';
}
else if (warnings.length > 0) {
overallStatus = 'WARNING';
}
const duration = Date.now() - startTime;
logger.info('[HardRuleEngine] QC completed', {
recordId,
overallStatus,
totalRules: this.rules.length,
errors: errors.length,
warnings: warnings.length,
duration: `${duration}ms`
});
return {
recordId,
projectId: this.projectId,
timestamp: new Date().toISOString(),
overallStatus,
summary: {
totalRules: this.rules.length,
passed: results.filter(r => r.passed).length,
failed: errors.length,
warnings: warnings.length
},
results,
errors,
warnings
};
}
/**
* 批量质控检查
*
* @param records 记录数组
* @returns 质控结果数组
*/
executeBatch(records) {
return records.map(r => this.execute(r.recordId, r.data));
}
/**
* 执行单条规则
*
* V2.1 优化:生成自包含的 LLM 友好消息
*/
executeRule(rule, data) {
try {
// 获取字段值
const fieldValue = this.getFieldValue(rule.field, data);
// 执行 JSON Logic
const passed = jsonLogic.apply(rule.logic, data);
// V2.1: 解析期望值(从 JSON Logic 中提取)
const expectedValue = this.extractExpectedValue(rule.logic);
const expectedCondition = this.describeLogic(rule.logic);
// V2.1: 构建自包含的 LLM 友好消息
const llmMessage = passed
? '通过'
: this.buildLlmMessage(rule, fieldValue, expectedValue);
// V2.1: 构建结构化证据
const evidence = {
value: fieldValue,
threshold: expectedValue,
unit: rule.metadata?.unit,
};
return {
ruleId: rule.id,
ruleName: rule.name,
field: rule.field,
passed,
message: passed ? '通过' : rule.message,
llmMessage,
severity: rule.severity,
category: rule.category,
actualValue: fieldValue,
expectedValue,
expectedCondition,
evidence,
};
}
catch (error) {
logger.error('[HardRuleEngine] Rule execution error', {
ruleId: rule.id,
error: error.message
});
return {
ruleId: rule.id,
ruleName: rule.name,
field: rule.field,
passed: false,
message: `规则执行出错: ${error.message}`,
llmMessage: `规则执行出错: ${error.message}`,
severity: 'error',
category: rule.category
};
}
}
/**
* V2.1: 从 JSON Logic 中提取期望值
*/
extractExpectedValue(logic) {
const operator = Object.keys(logic)[0];
const args = logic[operator];
switch (operator) {
case '>=':
case '<=':
case '>':
case '<':
case '==':
case '!=':
return String(args[1]);
case 'and':
// 对于 and 逻辑,尝试提取范围
const values = args.map((a) => this.extractExpectedValue(a)).filter(Boolean);
if (values.length === 2) {
return `${values[0]}-${values[1]}`;
}
return values.join(', ');
case '!!':
return '非空/必填';
default:
return '';
}
}
/**
* V2.1: 构建 LLM 友好的自包含消息
*
* 格式:当前 **{actualValue}** (标准: {expectedValue})
*/
buildLlmMessage(rule, actualValue, expectedValue) {
const fieldName = Array.isArray(rule.field) ? rule.field.join(', ') : rule.field;
const displayValue = actualValue !== undefined && actualValue !== null && actualValue !== ''
? `**${actualValue}**`
: '**空**';
// 根据规则类别生成不同的消息格式
switch (rule.category) {
case 'inclusion':
return `**${rule.name}**: 当前值 ${displayValue} (入排标准: ${expectedValue || rule.message})`;
case 'exclusion':
return `**${rule.name}**: 当前值 ${displayValue} 触发排除条件`;
case 'lab_values':
return `**${rule.name}**: 当前值 ${displayValue} (正常范围: ${expectedValue})`;
case 'logic_check':
return `**${rule.name}**: \`${fieldName}\` = ${displayValue} (要求: ${expectedValue || '非空'})`;
default:
return `**${rule.name}**: 当前值 ${displayValue} (标准: ${expectedValue || rule.message})`;
}
}
/**
* 获取字段值(支持映射)
*/
getFieldValue(field, data) {
if (Array.isArray(field)) {
return field.map(f => data[f]);
}
// 先尝试直接获取
if (data[field] !== undefined) {
return data[field];
}
// 再尝试通过映射获取
const mappedField = this.fieldMappings.get(field);
if (mappedField && data[mappedField] !== undefined) {
return data[mappedField];
}
return undefined;
}
/**
* 数据标准化(应用字段映射,转换类型)
*/
normalizeData(data) {
const normalized = { ...data };
// 1. 应用字段映射反向映射actualName -> aliasName
for (const [alias, actual] of this.fieldMappings.entries()) {
if (data[actual] !== undefined && normalized[alias] === undefined) {
normalized[alias] = data[actual];
}
}
// 2. 类型转换(字符串数字转数字)
for (const [key, value] of Object.entries(normalized)) {
if (typeof value === 'string' && value !== '' && !isNaN(Number(value))) {
normalized[key] = Number(value);
}
}
return normalized;
}
/**
* 描述 JSON Logic 表达式(用于报告)
*/
describeLogic(logic) {
const operator = Object.keys(logic)[0];
const args = logic[operator];
switch (operator) {
case '>=':
return `>= ${args[1]}`;
case '<=':
return `<= ${args[1]}`;
case '>':
return `> ${args[1]}`;
case '<':
return `< ${args[1]}`;
case '==':
return `= ${args[1]}`;
case '!=':
return `${args[1]}`;
case 'and':
return args.map((a) => this.describeLogic(a)).join(' 且 ');
case 'or':
return args.map((a) => this.describeLogic(a)).join(' 或 ');
case '!!':
return '非空';
default:
return JSON.stringify(logic);
}
}
/**
* 获取规则列表
*/
getRules() {
return this.rules;
}
/**
* 获取规则统计
*/
getRuleStats() {
const byCategory = {};
const bySeverity = {};
for (const rule of this.rules) {
byCategory[rule.category] = (byCategory[rule.category] || 0) + 1;
bySeverity[rule.severity] = (bySeverity[rule.severity] || 0) + 1;
}
return {
total: this.rules.length,
byCategory,
bySeverity
};
}
}
// ============================================================
// 工厂函数
// ============================================================
/**
* 创建并初始化 HardRuleEngine
*
* @param projectId 项目ID
* @param formName 可选,按表单名过滤规则(用于单表实时质控)
* 如果不传,则加载所有规则(用于全案批量质控)
*/
export async function createHardRuleEngine(projectId, formName) {
const engine = new HardRuleEngine(projectId);
await engine.initialize(formName);
return engine;
}

View File

@@ -0,0 +1,494 @@
/**
* SkillRunner - 规则调度器
*
* 功能:
* - 根据触发类型加载和执行 Skills
* - 协调 HardRuleEngine 和 SoftRuleEngine
* - 实现漏斗式执行策略Blocking → Hard → Soft
* - 聚合质控结果
*
* 设计原则:
* - 可插拔:通过 Skill 配置动态加载规则
* - 成本控制:阻断性检查优先,失败则跳过 AI 检查
* - 统一入口:所有触发类型使用相同的执行逻辑
*/
import { PrismaClient } from '@prisma/client';
import { logger } from '../../../common/logging/index.js';
import { createHardRuleEngine } from './HardRuleEngine.js';
import { createSoftRuleEngine } from './SoftRuleEngine.js';
import { RedcapAdapter } from '../adapters/RedcapAdapter.js';
const prisma = new PrismaClient();
// ============================================================
// SkillRunner 实现
// ============================================================
export class SkillRunner {
constructor(projectId) {
this.projectId = projectId;
}
/**
* 初始化 REDCap 适配器
*/
async initRedcapAdapter() {
if (this.redcapAdapter) {
return this.redcapAdapter;
}
const project = await prisma.iitProject.findUnique({
where: { id: this.projectId },
select: { redcapUrl: true, redcapApiToken: true },
});
if (!project) {
throw new Error(`项目不存在: ${this.projectId}`);
}
this.redcapAdapter = new RedcapAdapter(project.redcapUrl, project.redcapApiToken);
return this.redcapAdapter;
}
/**
* 按触发类型执行 Skills
*
* @param triggerType 触发类型
* @param options 执行选项
* @returns 执行结果
*/
async runByTrigger(triggerType, options) {
const startTime = Date.now();
logger.info('[SkillRunner] Starting execution', {
projectId: this.projectId,
triggerType,
options,
});
// 1. 加载启用的 Skills
const skills = await this.loadSkills(triggerType, options?.formName);
if (skills.length === 0) {
logger.warn('[SkillRunner] No active skills found', {
projectId: this.projectId,
triggerType,
});
return [];
}
// 2. 按优先级排序priority 越小越优先blocking 级别最优先)
skills.sort((a, b) => {
if (a.level === 'blocking' && b.level !== 'blocking')
return -1;
if (a.level !== 'blocking' && b.level === 'blocking')
return 1;
return a.priority - b.priority;
});
// 3. 获取要处理的记录
const records = await this.getRecordsToProcess(options?.recordId);
// 4. 对每条记录执行所有 Skills
const results = [];
for (const record of records) {
const result = await this.executeSkillsForRecord(record.recordId, record.data, skills, triggerType, options);
results.push(result);
// 保存质控日志
await this.saveQcLog(result);
}
const totalTime = Date.now() - startTime;
logger.info('[SkillRunner] Execution completed', {
projectId: this.projectId,
triggerType,
recordCount: records.length,
totalTimeMs: totalTime,
});
return results;
}
/**
* 加载 Skills
*/
async loadSkills(triggerType, formName) {
const where = {
projectId: this.projectId,
isActive: true,
};
// 根据触发类型过滤
if (triggerType === 'webhook') {
where.triggerType = 'webhook';
}
else if (triggerType === 'cron') {
where.triggerType = { in: ['cron', 'webhook'] }; // Cron 也执行 webhook 规则
}
// manual 执行所有规则
const skills = await prisma.iitSkill.findMany({
where,
select: {
id: true,
skillType: true,
name: true,
ruleType: true,
level: true,
priority: true,
config: true,
requiredTags: true,
},
});
// 如果指定了 formName过滤相关的 Skills
if (formName) {
return skills.filter(skill => {
const config = skill.config;
// 检查规则中是否有与该表单相关的规则
if (config?.rules) {
return config.rules.some((rule) => !rule.formName || rule.formName === formName);
}
return true; // 没有 formName 限制的规则默认包含
});
}
return skills;
}
/**
* 获取要处理的记录
*/
async getRecordsToProcess(recordId) {
const adapter = await this.initRedcapAdapter();
if (recordId) {
// 单记录处理
const data = await adapter.getRecordById(recordId);
if (!data) {
throw new Error(`记录不存在: ${recordId}`);
}
return [{ recordId, data }];
}
// 全量处理 - 获取所有记录
const records = await adapter.exportRecords();
const grouped = new Map();
// 按 record_id 分组合并数据
for (const record of records) {
const id = record.record_id || record.recordId;
if (!id)
continue;
if (!grouped.has(id)) {
grouped.set(id, {});
}
Object.assign(grouped.get(id), record);
}
return Array.from(grouped.entries()).map(([recordId, data]) => ({
recordId,
data,
}));
}
/**
* 对单条记录执行所有 Skills
*/
async executeSkillsForRecord(recordId, data, skills, triggerType, options) {
const startTime = Date.now();
const skillResults = [];
const allIssues = [];
const criticalIssues = [];
const warningIssues = [];
let blockedByLevel1 = false;
// 漏斗式执行
for (const skill of skills) {
const ruleType = skill.ruleType;
// 如果已被阻断且当前不是 blocking 级别,跳过 LLM 检查
if (blockedByLevel1 && ruleType === 'LLM_CHECK') {
logger.debug('[SkillRunner] Skipping LLM check due to blocking failure', {
skillId: skill.id,
recordId,
});
continue;
}
// 如果选项要求跳过软规则
if (options?.skipSoftRules && ruleType === 'LLM_CHECK') {
continue;
}
// 执行 Skill
const result = await this.executeSkill(skill, recordId, data);
skillResults.push(result);
// 收集问题
for (const issue of result.issues) {
allIssues.push(issue);
if (issue.severity === 'critical') {
criticalIssues.push(issue);
}
else if (issue.severity === 'warning') {
warningIssues.push(issue);
}
}
// 检查是否触发阻断
if (skill.level === 'blocking' && result.status === 'FAIL') {
blockedByLevel1 = true;
logger.info('[SkillRunner] Blocking check failed, skipping AI checks', {
skillId: skill.id,
recordId,
});
}
}
// 计算整体状态
let overallStatus = 'PASS';
if (criticalIssues.length > 0) {
overallStatus = 'FAIL';
}
else if (skillResults.some(r => r.status === 'UNCERTAIN')) {
overallStatus = 'UNCERTAIN';
}
else if (warningIssues.length > 0) {
overallStatus = 'WARNING';
}
const executionTimeMs = Date.now() - startTime;
return {
projectId: this.projectId,
recordId,
triggerType,
timestamp: new Date().toISOString(),
overallStatus,
summary: {
totalSkills: skillResults.length,
passed: skillResults.filter(r => r.status === 'PASS').length,
failed: skillResults.filter(r => r.status === 'FAIL').length,
warnings: skillResults.filter(r => r.status === 'WARNING').length,
uncertain: skillResults.filter(r => r.status === 'UNCERTAIN').length,
blockedByLevel1,
},
skillResults,
allIssues,
criticalIssues,
warningIssues,
executionTimeMs,
};
}
/**
* 执行单个 Skill
*/
async executeSkill(skill, recordId, data) {
const startTime = Date.now();
const ruleType = skill.ruleType;
const config = skill.config;
const issues = [];
let status = 'PASS';
try {
if (ruleType === 'HARD_RULE') {
// 使用 HardRuleEngine
const engine = await createHardRuleEngine(this.projectId);
// 临时注入规则(如果 config 中有)
if (config?.rules) {
const result = this.executeHardRulesDirectly(config.rules, recordId, data);
issues.push(...result.issues);
status = result.status;
}
}
else if (ruleType === 'LLM_CHECK') {
// 使用 SoftRuleEngine
const engine = createSoftRuleEngine(this.projectId, {
model: config?.model || 'deepseek-v3',
});
const checks = (config?.checks || []).map((check) => ({
id: check.id,
name: check.name || check.desc,
description: check.desc,
promptTemplate: check.promptTemplate || check.prompt,
requiredTags: check.requiredTags || skill.requiredTags || [],
category: check.category || 'medical_logic',
severity: check.severity || 'warning',
}));
if (checks.length > 0) {
const result = await engine.execute(recordId, data, checks);
for (const checkResult of result.results) {
if (checkResult.status !== 'PASS') {
issues.push({
ruleId: checkResult.checkId,
ruleName: checkResult.checkName,
message: checkResult.reason,
severity: checkResult.severity,
evidence: checkResult.evidence,
confidence: checkResult.confidence,
});
}
}
if (result.overallStatus === 'FAIL') {
status = 'FAIL';
}
else if (result.overallStatus === 'UNCERTAIN') {
status = 'UNCERTAIN';
}
}
}
else if (ruleType === 'HYBRID') {
// 混合模式:先执行硬规则,再执行软规则
// TODO: 实现混合逻辑
logger.warn('[SkillRunner] Hybrid rules not yet implemented', {
skillId: skill.id,
});
}
}
catch (error) {
logger.error('[SkillRunner] Skill execution error', {
skillId: skill.id,
error: error.message,
});
status = 'UNCERTAIN';
issues.push({
ruleId: 'EXECUTION_ERROR',
ruleName: '执行错误',
message: `Skill 执行出错: ${error.message}`,
severity: 'warning',
});
}
const executionTimeMs = Date.now() - startTime;
return {
skillId: skill.id,
skillName: skill.name,
skillType: skill.skillType,
ruleType,
status,
issues,
executionTimeMs,
};
}
/**
* 直接执行硬规则(不通过 HardRuleEngine 初始化)
*
* V2.1 优化:添加 expectedValue, llmMessage, evidence 字段
*/
executeHardRulesDirectly(rules, recordId, data) {
const issues = [];
let hasFail = false;
let hasWarning = false;
// 动态导入 json-logic-js
const jsonLogic = require('json-logic-js');
for (const rule of rules) {
try {
const passed = jsonLogic.apply(rule.logic, data);
if (!passed) {
const severity = rule.severity === 'error' ? 'critical' : 'warning';
const actualValue = this.getFieldValue(rule.field, data);
// V2.1: 提取期望值
const expectedValue = this.extractExpectedValue(rule.logic);
// V2.1: 构建自包含的 LLM 友好消息
const llmMessage = this.buildLlmMessage(rule, actualValue, expectedValue);
issues.push({
ruleId: rule.id,
ruleName: rule.name,
field: rule.field,
message: rule.message,
llmMessage, // V2.1: 自包含消息
severity,
actualValue,
expectedValue, // V2.1: 期望值
evidence: {
value: actualValue,
threshold: expectedValue,
unit: rule.metadata?.unit,
},
});
if (severity === 'critical') {
hasFail = true;
}
else {
hasWarning = true;
}
}
}
catch (error) {
logger.warn('[SkillRunner] Rule execution error', {
ruleId: rule.id,
error: error.message,
});
}
}
let status = 'PASS';
if (hasFail) {
status = 'FAIL';
}
else if (hasWarning) {
status = 'WARNING';
}
return { status, issues };
}
/**
* V2.1: 从 JSON Logic 中提取期望值
*/
extractExpectedValue(logic) {
const operator = Object.keys(logic)[0];
const args = logic[operator];
switch (operator) {
case '>=':
case '<=':
case '>':
case '<':
case '==':
case '!=':
return String(args[1]);
case 'and':
// 对于 and 逻辑,尝试提取范围
if (Array.isArray(args)) {
const values = args.map((a) => this.extractExpectedValue(a)).filter(Boolean);
if (values.length === 2) {
return `${values[0]}-${values[1]}`;
}
return values.join(', ');
}
return '';
case '!!':
return '非空/必填';
default:
return '';
}
}
/**
* V2.1: 构建 LLM 友好的自包含消息
*/
buildLlmMessage(rule, actualValue, expectedValue) {
const displayValue = actualValue !== undefined && actualValue !== null && actualValue !== ''
? `**${actualValue}**`
: '**空**';
if (expectedValue) {
return `**${rule.name}**: 当前值 ${displayValue} (标准: ${expectedValue})`;
}
return `**${rule.name}**: 当前值 ${displayValue}`;
}
/**
* 获取字段值
*/
getFieldValue(field, data) {
if (Array.isArray(field)) {
return field.map(f => data[f]);
}
return data[field];
}
/**
* 保存质控日志
*/
async saveQcLog(result) {
try {
// 将结果保存到 iit_qc_logs 表
// 将 summary 合并到 issues 数组中(作为元数据记录)
const issuesWithSummary = {
items: result.allIssues,
summary: result.summary,
};
await prisma.iitQcLog.create({
data: {
projectId: result.projectId,
recordId: result.recordId,
qcType: 'holistic',
formName: null,
status: result.overallStatus,
issues: JSON.parse(JSON.stringify(issuesWithSummary)), // 转换为 JSON 兼容格式
ruleVersion: 'v3.0', // CRA 智能质控引擎版本
rulesEvaluated: result.summary.totalSkills || 0,
rulesPassed: result.summary.passed || 0,
rulesFailed: result.summary.failed || 0,
rulesSkipped: result.summary.skipped || 0,
createdAt: new Date(result.timestamp),
},
});
}
catch (error) {
logger.error('[SkillRunner] Failed to save QC log', {
recordId: result.recordId,
error: error.message,
});
}
}
}
// ============================================================
// 工厂函数
// ============================================================
/**
* 创建 SkillRunner 实例
*
* @param projectId 项目ID
* @returns SkillRunner 实例
*/
export function createSkillRunner(projectId) {
return new SkillRunner(projectId);
}

View File

@@ -0,0 +1,353 @@
/**
* SoftRuleEngine - 软规则质控引擎 (LLM 推理)
*
* 功能:
* - 调用 LLM 进行复杂的医学逻辑判断
* - 支持入排标准、AE 事件检测、方案偏离等场景
* - 返回带证据链的结构化结果
*
* 设计原则:
* - 智能推理:利用 LLM 处理模糊规则和复杂逻辑
* - 证据链:每个判断都附带推理过程和证据
* - 三态输出PASS / FAIL / UNCERTAIN需人工确认
*/
import { PrismaClient } from '@prisma/client';
import { LLMFactory } from '../../../common/llm/adapters/LLMFactory.js';
import { logger } from '../../../common/logging/index.js';
import { buildClinicalSlice } from '../services/PromptBuilder.js';
const prisma = new PrismaClient();
// ============================================================
// SoftRuleEngine 实现
// ============================================================
export class SoftRuleEngine {
constructor(projectId, config) {
this.projectId = projectId;
this.model = config?.model || 'deepseek-v3';
this.timeoutMs = config?.timeoutMs || 30000;
}
/**
* 执行软规则检查
*
* @param recordId 记录ID
* @param data 记录数据
* @param checks 要执行的检查列表
* @returns 检查结果
*/
async execute(recordId, data, checks) {
const startTime = Date.now();
const results = [];
const failedChecks = [];
const uncertainChecks = [];
logger.info('[SoftRuleEngine] Starting execution', {
projectId: this.projectId,
recordId,
checkCount: checks.length,
model: this.model,
});
// 逐个执行检查(可以改为并发,但需注意 Token 限制)
for (const check of checks) {
try {
const result = await this.executeCheck(recordId, data, check);
results.push(result);
if (result.status === 'FAIL') {
failedChecks.push(result);
}
else if (result.status === 'UNCERTAIN') {
uncertainChecks.push(result);
}
}
catch (error) {
logger.error('[SoftRuleEngine] Check execution failed', {
checkId: check.id,
error: error.message,
});
// 发生错误时标记为 UNCERTAIN
const errorResult = {
checkId: check.id,
checkName: check.name,
status: 'UNCERTAIN',
reason: `执行出错: ${error.message}`,
evidence: {},
confidence: 0,
severity: check.severity,
category: check.category,
};
results.push(errorResult);
uncertainChecks.push(errorResult);
}
}
// 计算整体状态
let overallStatus = 'PASS';
if (failedChecks.length > 0) {
overallStatus = 'FAIL';
}
else if (uncertainChecks.length > 0) {
overallStatus = 'UNCERTAIN';
}
const duration = Date.now() - startTime;
logger.info('[SoftRuleEngine] Execution completed', {
recordId,
overallStatus,
totalChecks: checks.length,
failed: failedChecks.length,
uncertain: uncertainChecks.length,
duration: `${duration}ms`,
});
return {
recordId,
projectId: this.projectId,
timestamp: new Date().toISOString(),
overallStatus,
summary: {
totalChecks: checks.length,
passed: results.filter(r => r.status === 'PASS').length,
failed: failedChecks.length,
uncertain: uncertainChecks.length,
},
results,
failedChecks,
uncertainChecks,
};
}
/**
* 执行单个检查
*/
async executeCheck(recordId, data, check) {
const startTime = Date.now();
// 1. 构建 Prompt
const prompt = this.buildCheckPrompt(recordId, data, check);
// 2. 调用 LLM
const llmAdapter = LLMFactory.getAdapter(this.model);
const response = await llmAdapter.chat([
{
role: 'system',
content: this.getSystemPrompt(),
},
{
role: 'user',
content: prompt,
},
]);
const rawResponse = response.content;
// 3. 解析响应
const parsed = this.parseResponse(rawResponse, check);
const duration = Date.now() - startTime;
logger.debug('[SoftRuleEngine] Check executed', {
checkId: check.id,
status: parsed.status,
confidence: parsed.confidence,
duration: `${duration}ms`,
});
return {
checkId: check.id,
checkName: check.name,
status: parsed.status,
reason: parsed.reason,
evidence: parsed.evidence,
confidence: parsed.confidence,
severity: check.severity,
category: check.category,
rawResponse,
};
}
/**
* 构建检查 Prompt
*/
buildCheckPrompt(recordId, data, check) {
// 使用 PromptBuilder 生成临床数据切片
const clinicalSlice = buildClinicalSlice({
task: check.name,
criteria: [check.description || check.name],
patientData: data,
tags: check.requiredTags,
instruction: '请根据以下数据进行判断。',
});
// 替换 Prompt 模板中的变量
let userPrompt = check.promptTemplate;
// 替换 {{variable}} 格式的占位符
userPrompt = userPrompt.replace(/\{\{(\w+)\}\}/g, (_, key) => {
return data[key] !== undefined ? String(data[key]) : `[${key}未提供]`;
});
// 替换 {{#tag}} 格式的数据标签
userPrompt = userPrompt.replace(/\{\{#(\w+)\}\}/g, (_, tag) => {
// 根据标签筛选相关字段
return JSON.stringify(data, null, 2);
});
return `${clinicalSlice}\n\n---\n\n## 检查任务\n\n${userPrompt}`;
}
/**
* 获取系统 Prompt
*/
getSystemPrompt() {
return `你是一个专业的临床研究数据监查员 (CRA),负责核查受试者数据的质量和合规性。
## 你的职责
1. 仔细分析提供的临床数据
2. 根据检查任务进行判断
3. 给出清晰的判断结果和理由
## 输出格式要求
请严格按照以下 JSON 格式输出:
\`\`\`json
{
"status": "PASS" | "FAIL" | "UNCERTAIN",
"reason": "判断理由的详细说明",
"evidence": {
"key_field_1": "相关数据值",
"key_field_2": "相关数据值"
},
"confidence": 0.95
}
\`\`\`
## 状态说明
- **PASS**: 检查通过,数据符合要求
- **FAIL**: 检查失败,发现问题
- **UNCERTAIN**: 数据不足或存在歧义,需要人工确认
## 置信度说明
- 0.9-1.0: 非常确定
- 0.7-0.9: 比较确定
- 0.5-0.7: 有一定把握
- <0.5: 不太确定,建议人工复核
请只输出 JSON不要有其他内容。`;
}
/**
* 解析 LLM 响应
*/
parseResponse(rawResponse, check) {
try {
// 尝试提取 JSON
const jsonMatch = rawResponse.match(/```json\s*([\s\S]*?)\s*```/);
const jsonStr = jsonMatch ? jsonMatch[1] : rawResponse;
const parsed = JSON.parse(jsonStr.trim());
// 验证状态值
const validStatuses = ['PASS', 'FAIL', 'UNCERTAIN'];
const status = validStatuses.includes(parsed.status?.toUpperCase())
? parsed.status.toUpperCase()
: 'UNCERTAIN';
return {
status: status,
reason: parsed.reason || '未提供理由',
evidence: parsed.evidence || {},
confidence: typeof parsed.confidence === 'number'
? Math.min(1, Math.max(0, parsed.confidence))
: 0.5,
};
}
catch (error) {
logger.warn('[SoftRuleEngine] Failed to parse LLM response', {
checkId: check.id,
rawResponse: rawResponse.substring(0, 500),
});
// 解析失败时尝试简单匹配
const lowerResponse = rawResponse.toLowerCase();
if (lowerResponse.includes('pass') || lowerResponse.includes('通过')) {
return {
status: 'PASS',
reason: rawResponse,
evidence: {},
confidence: 0.6,
};
}
else if (lowerResponse.includes('fail') || lowerResponse.includes('失败') || lowerResponse.includes('不符合')) {
return {
status: 'FAIL',
reason: rawResponse,
evidence: {},
confidence: 0.6,
};
}
return {
status: 'UNCERTAIN',
reason: `无法解析响应: ${rawResponse.substring(0, 200)}`,
evidence: {},
confidence: 0.3,
};
}
}
/**
* 批量执行检查
*
* @param records 记录列表
* @param checks 检查列表
* @returns 所有记录的检查结果
*/
async executeBatch(records, checks) {
const results = [];
for (const record of records) {
const result = await this.execute(record.recordId, record.data, checks);
results.push(result);
}
return results;
}
}
// ============================================================
// 工厂函数
// ============================================================
/**
* 创建 SoftRuleEngine 实例
*
* @param projectId 项目ID
* @param config 可选配置
* @returns SoftRuleEngine 实例
*/
export function createSoftRuleEngine(projectId, config) {
return new SoftRuleEngine(projectId, config);
}
// ============================================================
// 预置检查模板
// ============================================================
/**
* 入排标准检查模板
*/
export const INCLUSION_EXCLUSION_CHECKS = [
{
id: 'IE-001',
name: '年龄入组标准',
description: '检查受试者年龄是否符合入组标准',
promptTemplate: '请根据受试者数据,判断其年龄是否在研究方案规定的入组范围内。如果年龄字段缺失,请标记为 UNCERTAIN。',
requiredTags: ['#demographics'],
category: 'inclusion',
severity: 'critical',
},
{
id: 'IE-002',
name: '确诊时间入组标准',
description: '检查受试者确诊时间是否符合入组标准(通常要求确诊在一定时间内)',
promptTemplate: '请根据受试者的确诊日期和入组日期,判断确诊时间是否符合研究方案要求。',
requiredTags: ['#demographics', '#medical_history'],
category: 'inclusion',
severity: 'critical',
},
];
/**
* AE 事件检测模板
*/
export const AE_DETECTION_CHECKS = [
{
id: 'AE-001',
name: 'Lab 异常与 AE 一致性',
description: '检查实验室检查异常值是否已在 AE 表中报告',
promptTemplate: '请对比实验室检查数据和不良事件记录判断是否存在未报告的实验室异常Grade 3 及以上)。',
requiredTags: ['#lab', '#ae'],
category: 'ae_detection',
severity: 'critical',
},
];
/**
* 方案偏离检测模板
*/
export const PROTOCOL_DEVIATION_CHECKS = [
{
id: 'PD-001',
name: '访视超窗检测',
description: '检查访视是否在方案规定的时间窗口内',
promptTemplate: '请根据访视日期数据,判断各访视之间的时间间隔是否符合方案规定的访视窗口。',
requiredTags: ['#visits'],
category: 'protocol_deviation',
severity: 'warning',
},
];

View File

@@ -0,0 +1,296 @@
/**
* PromptBuilder - LLM 提示词构建器
*
* 功能:
* - 构建 XML 临床切片格式(对 LLM 更友好,减少幻觉)
* - 生成自然语言摘要
* - 支持按语义标签过滤数据
*
* 设计原则:
* - XML 格式比 JSON 更适合 LLM 理解
* - 语义化标签便于上下文切片
* - 明确的任务指令减少幻觉
*
* @see docs/03-业务模块/IIT Manager Agent/04-开发计划/07-质控系统UI与LLM格式优化计划.md
*/
// ============================================================
// PromptBuilder 实现
// ============================================================
export class PromptBuilder {
/**
* 构建 XML 临床切片格式
*
* 输出格式示例:
* ```xml
* <task>核查该患者是否符合研究入排标准</task>
*
* <protocol_criteria>
* 1. 年龄 16-35 岁。
* 2. 月经周期规律28±7天
* </protocol_criteria>
*
* <patient_slice tag="#demographics, #screening">
* - 出生日期2003-01-07当前年龄 22 岁)✅
* - 月经周期45 天 ⚠️ 超出范围
* </patient_slice>
*
* <instruction>
* 请一步步推理,对比患者数据与标准。如发现异常,说明具体哪条标准被违反。
* </instruction>
* ```
*/
static buildClinicalSlice(params) {
const { task, criteria = [], patientData, tags = [], instruction, fieldMetadata = {} } = params;
const parts = [];
// 1. 任务描述
parts.push(`<task>${task}</task>`);
parts.push('');
// 2. 研究方案标准(如果有)
if (criteria.length > 0) {
parts.push('<protocol_criteria>');
criteria.forEach((c, i) => {
parts.push(` ${i + 1}. ${c}`);
});
parts.push('</protocol_criteria>');
parts.push('');
}
// 3. 患者数据切片
const tagAttr = tags.length > 0 ? ` tag="${tags.join(', ')}"` : '';
parts.push(`<patient_slice${tagAttr}>`);
// 格式化患者数据
const formattedData = this.formatPatientData(patientData, fieldMetadata);
formattedData.forEach(line => {
parts.push(` ${line}`);
});
parts.push('</patient_slice>');
parts.push('');
// 4. 指令(如果有)
if (instruction) {
parts.push('<instruction>');
parts.push(instruction);
parts.push('</instruction>');
}
return parts.join('\n').trim();
}
/**
* 格式化患者数据为可读格式
*/
static formatPatientData(data, metadata) {
const lines = [];
for (const [key, value] of Object.entries(data)) {
// 跳过系统字段
if (key.startsWith('redcap_') || key === 'record_id')
continue;
const meta = metadata[key];
const label = meta?.label || key;
const formattedValue = this.formatFieldValue(value, meta);
const statusIcon = this.getStatusIcon(value, meta);
lines.push(`- ${label}: ${formattedValue}${statusIcon ? ' ' + statusIcon : ''}`);
}
return lines;
}
/**
* 格式化字段值
*/
static formatFieldValue(value, meta) {
if (value === null || value === undefined || value === '') {
return '未填写';
}
// 日期类型
if (meta?.type === 'date' && typeof value === 'string') {
return value;
}
// 选择类型
if (meta?.options && meta.options.length > 0) {
const option = meta.options.find(o => o.value === String(value));
return option ? option.label : String(value);
}
// 数值类型 - 添加范围信息
if (meta?.type === 'number' && meta.normalRange) {
const { min, max } = meta.normalRange;
const numValue = Number(value);
if (!isNaN(numValue)) {
const rangeInfo = [];
if (min !== undefined)
rangeInfo.push(`${min}`);
if (max !== undefined)
rangeInfo.push(`${max}`);
if (rangeInfo.length > 0) {
return `${value} (正常范围: ${rangeInfo.join(', ')})`;
}
}
}
return String(value);
}
/**
* 获取状态图标
*/
static getStatusIcon(value, meta) {
if (value === null || value === undefined || value === '') {
return '⚠️'; // 缺失
}
// 检查数值是否在范围内
if (meta?.normalRange) {
const numValue = Number(value);
if (!isNaN(numValue)) {
const { min, max } = meta.normalRange;
if ((min !== undefined && numValue < min) || (max !== undefined && numValue > max)) {
return '❌ 超出范围';
}
return '✅';
}
}
return '';
}
/**
* 构建质控结果摘要(自然语言)
*
* 输出示例:
* "项目 test0207 共有 13 条记录,质控通过率 0%。主要问题包括知情同意未签署13条、入排标准不符8条。"
*/
static buildQcSummary(params) {
const { projectName, totalRecords, passedRecords, failedRecords, warningRecords = 0, topIssues = [] } = params;
const passRate = totalRecords > 0
? ((passedRecords / totalRecords) * 100).toFixed(1)
: '0';
let summary = `项目【${projectName}】共有 ${totalRecords} 条记录,质控通过率 ${passRate}%。`;
if (failedRecords > 0) {
summary += `\n\n【质控统计】\n`;
summary += `- 通过: ${passedRecords}\n`;
summary += `- 失败: ${failedRecords}\n`;
if (warningRecords > 0) {
summary += `- 警告: ${warningRecords}\n`;
}
}
if (topIssues.length > 0) {
const issueText = topIssues
.slice(0, 3)
.map(i => `${i.issue}${i.count}条)`)
.join('、');
summary += `\n【主要问题】${issueText}`;
}
return summary;
}
/**
* 构建录入进度摘要(自然语言)
*/
static buildEnrollmentSummary(params) {
const { projectName, totalRecords, avgCompletionRate, recentEnrollments, byQcStatus } = params;
let summary = `项目【${projectName}】录入概况:\n\n`;
summary += `【基本统计】\n`;
summary += `- 总记录数: ${totalRecords}\n`;
summary += `- 平均完成率: ${avgCompletionRate.toFixed(1)}%\n`;
summary += `- 近一周新增: ${recentEnrollments}\n`;
summary += `\n【质控分布】\n`;
summary += `- 通过: ${byQcStatus.pass}\n`;
summary += `- 失败: ${byQcStatus.fail}\n`;
summary += `- 警告: ${byQcStatus.warning}\n`;
summary += `- 待质控: ${byQcStatus.pending}`;
return summary;
}
/**
* 构建记录详情的 XML 格式
*/
static buildRecordDetail(params) {
const { projectName, recordId, data, fieldMetadata = {}, qcResult } = params;
const parts = [];
// 1. 记录头部信息
parts.push(`<record id="${recordId}" project="${projectName}">`);
parts.push('');
// 2. 数据内容
parts.push(' <data>');
for (const [key, value] of Object.entries(data)) {
if (key.startsWith('redcap_') || key === 'record_id')
continue;
const meta = fieldMetadata[key];
const label = meta?.label || key;
const formattedValue = value ?? '未填写';
parts.push(` <field name="${key}" label="${label}">${formattedValue}</field>`);
}
parts.push(' </data>');
// 3. 质控结果(如果有)
if (qcResult) {
parts.push('');
parts.push(` <qc_result status="${qcResult.status}">`);
if (qcResult.errors && qcResult.errors.length > 0) {
parts.push(' <errors>');
qcResult.errors.forEach(e => {
parts.push(` <error field="${e.field}">${e.message}</error>`);
});
parts.push(' </errors>');
}
if (qcResult.warnings && qcResult.warnings.length > 0) {
parts.push(' <warnings>');
qcResult.warnings.forEach(w => {
parts.push(` <warning field="${w.field}">${w.message}</warning>`);
});
parts.push(' </warnings>');
}
parts.push(' </qc_result>');
}
parts.push('');
parts.push('</record>');
return parts.join('\n');
}
/**
* 构建质控问题列表的 XML 格式
* 用于批量质控结果展示
*/
static buildQcIssuesList(params) {
const { projectName, totalRecords, passedRecords, failedRecords, problemRecords } = params;
const parts = [];
// 1. 项目概览
parts.push(`<qc_overview project="${projectName}">`);
parts.push(` <stats>`);
parts.push(` <total>${totalRecords}</total>`);
parts.push(` <passed>${passedRecords}</passed>`);
parts.push(` <failed>${failedRecords}</failed>`);
parts.push(` <pass_rate>${((passedRecords / totalRecords) * 100).toFixed(1)}%</pass_rate>`);
parts.push(` </stats>`);
parts.push('');
// 2. 问题记录列表
if (problemRecords.length > 0) {
parts.push(' <problem_records>');
problemRecords.forEach(record => {
parts.push(` <record id="${record.recordId}" status="${record.status}">`);
record.issues.slice(0, 3).forEach(issue => {
const fieldAttr = issue.field ? ` field="${issue.field}"` : '';
parts.push(` <issue${fieldAttr}>${issue.message}</issue>`);
});
parts.push(' </record>');
});
parts.push(' </problem_records>');
}
parts.push('</qc_overview>');
return parts.join('\n');
}
/**
* 包装 XML 内容为 LLM 系统消息
*/
static wrapAsSystemMessage(xmlContent, dataSource) {
const sourceLabel = {
'REDCap': 'REDCap 系统实时数据',
'QC_TABLE': '质控日志表缓存数据',
'SUMMARY_TABLE': '录入汇总表缓存数据',
'QC_REPORT': '质控报告(预生成)'
}[dataSource];
return `${sourceLabel} - 以下是真实数据,必须使用】
${xmlContent}
⚠️ 重要提示:
1. 上述数据是从系统查询的真实数据
2. 你必须且只能使用上述数据中的内容回答用户
3. 如果某个字段为空或标记为"未填写",请如实告知
4. 绝对禁止编造任何不在上述数据中的信息
5. 如果数据中包含 <error> 标签,说明存在质控问题,需要重点说明`;
}
}
// 导出单例便捷方法
export const buildClinicalSlice = PromptBuilder.buildClinicalSlice;
export const buildQcSummary = PromptBuilder.buildQcSummary;
export const buildEnrollmentSummary = PromptBuilder.buildEnrollmentSummary;
export const buildRecordDetail = PromptBuilder.buildRecordDetail;
export const buildQcIssuesList = PromptBuilder.buildQcIssuesList;
export const wrapAsSystemMessage = PromptBuilder.wrapAsSystemMessage;
export default PromptBuilder;

View File

@@ -0,0 +1,138 @@
/**
* R 服务客户端
* 负责调用 R Docker 服务执行统计分析
*
* 遵循规范:
* - 使用统一日志服务 @/common/logging
* - 使用统一存储服务 @/common/storageOSS 存储规范)
*/
import axios, { AxiosInstance } from 'axios';
import { logger } from '../../../common/logging/index.js';
import { storage } from '../../../common/storage/index.js';
import { prisma } from '../../../config/database.js';
export class RClientService {
private client: AxiosInstance;
constructor() {
const baseURL = process.env.R_SERVICE_URL || 'http://localhost:8082';
this.client = axios.create({
baseURL,
timeout: 120000, // 120 秒超时
headers: {
'Content-Type': 'application/json'
}
});
}
async execute(sessionId: string, plan: any, session: any): Promise<any> {
const startTime = Date.now();
// 构建请求体(使用统一存储服务)
const requestBody = {
data_source: await this.buildDataSource(session),
params: plan.params,
guardrails: plan.guardrails || {
check_normality: true,
auto_fix: true
}
};
try {
logger.info('[SSA:RClient] Calling R service', {
sessionId,
toolCode: plan.tool_code,
endpoint: `/api/v1/skills/${plan.tool_code}`,
requestBody
});
const response = await this.client.post(
`/api/v1/skills/${plan.tool_code}`,
requestBody
);
const executionMs = Date.now() - startTime;
logger.info('[SSA:RClient] R service response', {
sessionId,
status: response.data?.status,
hasResults: !!response.data?.results,
executionMs
});
// 记录执行日志(失败不阻塞主流程)
try {
await prisma.ssaExecutionLog.create({
data: {
sessionId,
toolCode: plan.tool_code,
inputParams: plan.params,
outputStatus: response.data.status,
outputResult: response.data.results,
traceLog: response.data.trace_log || [],
executionMs
}
});
} catch (logError) {
logger.warn('[SSA:RClient] Failed to save execution log', { error: logError });
}
// 添加执行耗时到返回结果
return {
...response.data,
executionMs
};
} catch (error: any) {
logger.error('R service call failed', { sessionId, toolCode: plan.tool_code, error: error.message });
// 502/504 特殊处理R 服务崩溃或超时)
const statusCode = error.response?.status;
if (statusCode === 502 || statusCode === 504) {
throw new Error('统计服务繁忙或数据异常,请稍后重试');
}
// 提取 R 服务返回的用户友好提示
const userHint = error.response?.data?.user_hint;
if (userHint) {
throw new Error(userHint);
}
throw new Error(`R service error: ${error.message}`);
}
}
/**
* 构建数据源(仅支持 OSS
*
* 设计说明SSA 场景下用户必须上传数据文件,文件存入 OSS
* R 服务通过预签名 URL 从 OSS 下载数据。
*/
private async buildDataSource(session: any): Promise<{ type: string; oss_url: string }> {
const ossKey = session.dataOssKey;
if (!ossKey) {
logger.error('[SSA:RClient] No data uploaded', { sessionId: session.id });
throw new Error('请先上传数据文件');
}
logger.info('[SSA:RClient] Building OSS data source', { sessionId: session.id, ossKey });
const signedUrl = await storage.getUrl(ossKey);
return {
type: 'oss',
oss_url: signedUrl
};
}
async healthCheck(): Promise<boolean> {
try {
const res = await this.client.get('/health');
return res.data.status === 'ok';
} catch {
return false;
}
}
}

View File

@@ -0,0 +1,28 @@
/**
* SSA-Pro 智能统计分析模块入口
* @module ssa
*
* 遵循规范:
* - 使用 authenticate 中间件(模块认证规范)
* - 使用统一日志服务
*/
import { FastifyInstance } from 'fastify';
import { authenticate } from '../../common/auth/auth.middleware.js';
import sessionRoutes from './routes/session.routes.js';
import analysisRoutes from './routes/analysis.routes.js';
import consultRoutes from './routes/consult.routes.js';
import configRoutes from './routes/config.routes.js';
export async function ssaRoutes(app: FastifyInstance) {
// 注册认证中间件(遵循模块认证规范)
app.addHook('preHandler', authenticate);
// 注册子路由
app.register(sessionRoutes, { prefix: '/sessions' });
app.register(analysisRoutes, { prefix: '/sessions' });
app.register(consultRoutes, { prefix: '/consult' });
app.register(configRoutes, { prefix: '/config' });
}
export default ssaRoutes;

View File

@@ -0,0 +1,349 @@
/**
* SSA 分析执行路由
*
* 遵循规范:
* - 使用 getUserId模块认证规范
* - 使用 storageOSS 存储规范)
* - 使用 logger日志服务
*/
import { FastifyInstance, FastifyRequest } from 'fastify';
import { RClientService } from '../executor/RClientService.js';
import { prisma } from '../../../config/database.js';
import { storage } from '../../../common/storage/index.js';
import { logger } from '../../../common/logging/index.js';
function getUserId(request: FastifyRequest): string {
const userId = (request as any).user?.userId;
if (!userId) {
throw new Error('User not authenticated');
}
return userId;
}
function getTenantId(request: FastifyRequest): string {
return (request as any).user?.tenantId || 'default';
}
export default async function analysisRoutes(app: FastifyInstance) {
const rClient = new RClientService();
// 上传数据(遵循 OSS 存储规范)
app.post('/:id/upload', async (req, reply) => {
const { id } = req.params as { id: string };
const userId = getUserId(req);
const tenantId = getTenantId(req);
const data = await req.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
const buffer = await data.toBuffer();
const filename = data.filename;
// 生成存储 Key遵循 OSS 目录结构规范)
const uuid = crypto.randomUUID().replace(/-/g, '').substring(0, 16);
const ext = filename.split('.').pop()?.toLowerCase() || 'csv';
const storageKey = `tenants/${tenantId}/users/${userId}/ssa/${uuid}.${ext}`;
// 上传到存储服务
await storage.upload(storageKey, buffer);
logger.info('[SSA:Analysis] Data uploaded', { sessionId: id, storageKey });
// 更新会话
await prisma.ssaSession.update({
where: { id },
data: { dataOssKey: storageKey }
});
return reply.send({
success: true,
message: 'Data uploaded successfully',
sessionId: id,
storageKey
});
});
// 生成分析计划
app.post('/:id/plan', async (req, reply) => {
const { id } = req.params as { id: string };
const { query } = req.body as { query: string };
// 获取会话数据 schema
const session = await prisma.ssaSession.findUnique({
where: { id },
select: { dataSchema: true }
});
// TODO: 调用 PlannerService 根据 query 和 schema 推荐分析方法
// const plan = await plannerService.generatePlan(id, query, session?.dataSchema);
// 从 schema 中提取列名用于智能推荐
const schema = session?.dataSchema as any;
const columns = schema?.columns || [];
const columnNames = columns.map((c: any) => c.name.toLowerCase());
// 从用户查询中提取变量名(简单的关键词匹配)
const queryLower = query.toLowerCase();
// 找出查询中提到的所有列
const mentionedColumns = columns.filter((col: any) =>
queryLower.includes(col.name.toLowerCase())
);
logger.info('[SSA:Analysis] Mentioned columns', {
query,
mentionedColumns: mentionedColumns.map((c: any) => ({ name: c.name, type: c.type }))
});
// 从提到的列中分配变量(分类 → 分组,数值 → 值)
let groupVar = '';
let valueVar = '';
for (const col of mentionedColumns) {
if (col.type === 'categorical' && !groupVar) {
groupVar = col.name;
} else if (col.type === 'numeric' && !valueVar) {
valueVar = col.name;
}
}
// 如果查询中只提到了一种类型,从未提到的列中补充
if (!groupVar && mentionedColumns.length > 0) {
// 查询中没有分类变量,从其他分类变量中选一个
const otherCategorical = columns.find((c: any) =>
c.type === 'categorical' && !mentionedColumns.some((m: any) => m.name === c.name)
);
groupVar = otherCategorical?.name || columns.find((c: any) => c.type === 'categorical')?.name || '';
}
if (!valueVar && mentionedColumns.length > 0) {
// 查询中没有数值变量,从其他数值变量中选一个
const otherNumeric = columns.find((c: any) =>
c.type === 'numeric' && !mentionedColumns.some((m: any) => m.name === c.name)
);
valueVar = otherNumeric?.name || columns.find((c: any) => c.type === 'numeric')?.name || '';
}
// 如果完全没有匹配到,使用默认策略(但避免使用同一个变量)
if (!groupVar) {
groupVar = columns.find((c: any) => c.type === 'categorical' && c.name !== valueVar)?.name || columns[0]?.name || 'group';
}
if (!valueVar) {
valueVar = columns.find((c: any) => c.type === 'numeric' && c.name !== groupVar)?.name || columns[1]?.name || 'value';
}
// 最终检查:确保两个变量不相同
if (groupVar === valueVar && columns.length > 1) {
// 如果相同,重新选择
const otherCol = columns.find((c: any) => c.name !== groupVar);
if (otherCol) {
if (otherCol.type === 'numeric') {
valueVar = otherCol.name;
} else {
groupVar = otherCol.name;
}
}
}
logger.info('[SSA:Analysis] Variable matching', {
query,
matchedGroupVar: groupVar,
matchedValueVar: valueVar,
availableColumns: columnNames
});
// 返回前端期望的 AnalysisPlan 格式camelCase
const mockPlan = {
id: `plan_${Date.now()}`,
toolCode: 'ST_T_TEST_IND',
toolName: '独立样本 T 检验',
description: `根据您的数据特征和分析需求"${query}",推荐使用独立样本 T 检验,比较 ${groupVar} 分组下 ${valueVar} 的差异。`,
parameters: {
group_var: groupVar,
value_var: valueVar
},
guardrails: [
{ checkName: '正态性检验', checkCode: 'NORMALITY', actionType: 'Switch', actionTarget: 'WILCOXON' },
{ checkName: '样本量检查', checkCode: 'SAMPLE_SIZE', threshold: 'n >= 30', actionType: 'Warn' }
],
confidence: 0.85
};
logger.info('[SSA:Analysis] Plan generated', { sessionId: id, query, toolCode: mockPlan.toolCode, params: mockPlan.parameters });
return reply.send(mockPlan);
});
// 执行分析
app.post('/:id/execute', async (req, reply) => {
const { id } = req.params as { id: string };
const { plan } = req.body as { plan: any };
logger.info('[SSA:Analysis] Execute request', { sessionId: id, plan });
try {
// 验证 plan 参数
if (!plan || !plan.tool_code) {
logger.error('[SSA:Analysis] Invalid plan', { plan });
return reply.status(400).send({
error: 'Invalid plan: missing tool_code',
user_hint: '分析计划无效,请重新生成'
});
}
// 获取会话数据
const session = await prisma.ssaSession.findUnique({
where: { id }
});
if (!session) {
logger.error('[SSA:Analysis] Session not found', { sessionId: id });
return reply.status(404).send({ error: 'Session not found' });
}
if (!session.dataOssKey) {
logger.error('[SSA:Analysis] No data uploaded', { sessionId: id });
return reply.status(400).send({
error: 'No data uploaded',
user_hint: '请先上传数据文件'
});
}
logger.info('[SSA:Analysis] Calling R service', {
sessionId: id,
toolCode: plan.tool_code,
dataOssKey: session.dataOssKey
});
// 调用 R 服务
const result = await rClient.execute(id, plan, session);
logger.info('[SSA:Analysis] R service returned', {
sessionId: id,
status: result?.status,
hasResults: !!result?.results,
message: result?.message,
userHint: result?.user_hint
});
// 检查 R 服务是否返回错误
if (result?.status === 'error') {
logger.warn('[SSA:Analysis] R service returned error', {
sessionId: id,
errorCode: result.error_code,
message: result.message
});
// 保存错误消息(用于历史记录)
await prisma.ssaMessage.create({
data: {
sessionId: id,
role: 'assistant',
contentType: 'error',
content: result
}
});
// 返回业务错误(使用 422 表示数据不符合业务规则)
return reply.status(422).send({
status: 'error',
error: result.message || '分析执行失败',
error_code: result.error_code,
user_hint: result.user_hint || result.message
});
}
// 保存成功结果消息
await prisma.ssaMessage.create({
data: {
sessionId: id,
role: 'assistant',
contentType: 'result',
content: result
}
});
return reply.send(result);
} catch (error: any) {
logger.error('[SSA:Analysis] Execute failed', {
sessionId: id,
error: error.message,
stack: error.stack
});
return reply.status(500).send({
error: error.message,
user_hint: '分析执行失败,请检查 R 服务是否正常运行'
});
}
});
// 下载代码
app.get('/:id/download-code', async (req, reply) => {
const { id } = req.params as { id: string };
// 获取会话信息(用于文件名)
const session = await prisma.ssaSession.findUnique({
where: { id },
select: { title: true, createdAt: true }
});
// 获取最新的执行结果消息
const latestMessage = await prisma.ssaMessage.findFirst({
where: {
sessionId: id,
contentType: 'result'
},
orderBy: { createdAt: 'desc' }
});
let code = `# SSA-Pro 生成的 R 代码\n# Session: ${id}\n# 暂无可用代码\n`;
let toolName = 'analysis';
if (latestMessage?.content) {
const content = latestMessage.content as any;
// 从消息内容中提取 reproducible_code
const reproducibleCode = content.reproducible_code || content.reproducibleCode;
if (reproducibleCode) {
code = reproducibleCode;
}
// 提取工具名称
if (content.results?.method) {
toolName = content.results.method.replace(/\s+/g, '_').replace(/[^a-zA-Z0-9_]/g, '');
}
}
// 生成有意义的文件名工具名_数据文件名_月日_时分
const now = new Date();
const dateStr = `${String(now.getMonth() + 1).padStart(2, '0')}${String(now.getDate()).padStart(2, '0')}`;
const timeStr = `${String(now.getHours()).padStart(2, '0')}${String(now.getMinutes()).padStart(2, '0')}`;
// 从 session title 提取数据文件名(去除扩展名和特殊字符)
let dataName = 'data';
if (session?.title) {
dataName = session.title
.replace(/\.(csv|xlsx|xls)$/i, '')
.replace(/[^a-zA-Z0-9\u4e00-\u9fa5_-]/g, '_')
.substring(0, 20);
}
const filename = `${toolName}_${dataName}_${dateStr}_${timeStr}.R`;
logger.info('[SSA:Analysis] Download code', { sessionId: id, filename, hasCode: code.length > 50 });
reply.header('Content-Type', 'text/plain; charset=utf-8');
reply.header('Content-Disposition', `attachment; filename="${encodeURIComponent(filename)}"`);
return reply.send(code);
});
// 健康检查
app.get('/r-service/health', async (req, reply) => {
const healthy = await rClient.healthCheck();
return reply.send({
r_service: healthy ? 'ok' : 'unavailable'
});
});
}

View File

@@ -0,0 +1,130 @@
/**
* SSA 配置中台路由
*
* 遵循规范:
* - 管理员接口需要权限校验
* - 使用 logger日志服务
*/
import { FastifyInstance, FastifyRequest } from 'fastify';
import { prisma } from '../../../config/database.js';
import { logger } from '../../../common/logging/index.js';
function getUserId(request: FastifyRequest): string {
const userId = (request as any).user?.userId;
if (!userId) {
throw new Error('User not authenticated');
}
return userId;
}
export default async function configRoutes(app: FastifyInstance) {
// 导入决策表
app.post('/decision-table', async (req, reply) => {
const data = await req.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
// TODO: 解析 Excel 并导入决策表
return reply.send({
success: true,
message: 'Decision table imported successfully'
});
});
// 获取决策表
app.get('/decision-table', async (req, reply) => {
// TODO: 从数据库获取决策表
return reply.send([]);
});
// 上传 R 脚本
app.post('/r-scripts', async (req, reply) => {
const data = await req.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
// TODO: 保存 R 脚本到数据库
return reply.send({
success: true,
message: 'R script uploaded successfully'
});
});
// 获取脚本列表
app.get('/r-scripts', async (req, reply) => {
// TODO: 从数据库获取脚本列表
return reply.send([]);
});
// 导入工具配置
app.post('/tool-config', async (req, reply) => {
const data = await req.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
// TODO: 解析 Excel 并导入工具配置
return reply.send({
success: true,
message: 'Tool config imported successfully'
});
});
// 获取工具列表
app.get('/tools', async (req, reply) => {
// TODO: 从配置缓存获取工具列表
return reply.send([
{
tool_code: 'ST_T_TEST_IND',
name: '独立样本 T 检验',
description: '比较两组独立样本的均值差异',
category: '假设检验'
}
]);
});
// 获取参数映射
app.get('/tools/:code/params', async (req, reply) => {
const { code } = req.params as { code: string };
// TODO: 从数据库获取参数映射
return reply.send([]);
});
// 获取护栏规则
app.get('/tools/:code/guardrails', async (req, reply) => {
const { code } = req.params as { code: string };
// TODO: 从数据库获取护栏规则
return reply.send([]);
});
// 热加载配置
app.post('/reload', async (req, reply) => {
// TODO: 重新加载所有配置到缓存
return reply.send({
success: true,
timestamp: new Date().toISOString()
});
});
// 校验配置文件
app.post('/validate', async (req, reply) => {
const data = await req.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
// TODO: 仅校验,不导入
return reply.send({ valid: true });
});
}

View File

@@ -0,0 +1,186 @@
/**
* SSA 咨询模式路由
*
* 遵循规范:
* - 使用 StreamingService流式响应服务
* - 使用 LLMFactoryLLM 网关)
* - 使用 getUserId模块认证规范
*/
import { FastifyInstance, FastifyRequest } from 'fastify';
import { prisma } from '../../../config/database.js';
import { createStreamingService } from '../../../common/streaming/index.js';
import { LLMFactory } from '../../../common/llm/adapters/LLMFactory.js';
import { logger } from '../../../common/logging/index.js';
function getUserId(request: FastifyRequest): string {
const userId = (request as any).user?.userId;
if (!userId) {
throw new Error('User not authenticated');
}
return userId;
}
export default async function consultRoutes(app: FastifyInstance) {
// 创建咨询会话(无数据)
app.post('/', async (req, reply) => {
const userId = getUserId(req);
const session = await prisma.ssaSession.create({
data: {
userId,
title: '统计咨询',
status: 'consult'
}
});
return reply.send(session);
});
// 咨询对话(非流式)
app.post('/:id/chat', async (req, reply) => {
const { id } = req.params as { id: string };
const { message } = req.body as { message: string };
const userId = getUserId(req);
// 保存用户消息
await prisma.ssaMessage.create({
data: {
sessionId: id,
role: 'user',
contentType: 'text',
content: { text: message }
}
});
// TODO: 调用 ConsultService 生成回复
const response = `感谢您的咨询。根据您描述的研究设计,我建议考虑以下统计方法...`;
// 保存助手回复
await prisma.ssaMessage.create({
data: {
sessionId: id,
role: 'assistant',
contentType: 'text',
content: { text: response }
}
});
return reply.send({ response });
});
// 咨询对话(流式)- 使用 StreamingService
app.post('/:id/chat/stream', async (req, reply) => {
const { id } = req.params as { id: string };
const { message } = req.body as { message: string };
const userId = getUserId(req);
logger.info('[SSA:Consult] Stream chat started', { sessionId: id, userId });
// 保存用户消息
await prisma.ssaMessage.create({
data: {
sessionId: id,
role: 'user',
contentType: 'text',
content: { text: message }
}
});
// 获取历史消息
const history = await prisma.ssaMessage.findMany({
where: { sessionId: id },
orderBy: { createdAt: 'asc' },
take: 20
});
// 构建消息列表
const messages = [
{
role: 'system' as const,
content: `你是一个专业的生物统计咨询师。请根据用户的研究设计和需求提供统计分析建议并帮助用户制定统计分析计划SAP
要点:
1. 理解研究设计(实验/观察、独立/配对、随机/分层等)
2. 明确研究假设和主要终点
3. 推荐合适的统计方法
4. 提示统计前提条件和注意事项`
},
...history.map(m => ({
role: m.role as 'user' | 'assistant',
content: (m.content as any).text || ''
}))
];
// 使用 StreamingService 流式输出
const streamingService = createStreamingService(reply, {
model: 'deepseek-v3',
temperature: 0.7,
maxTokens: 4096,
enableDeepThinking: true,
userId,
conversationId: id,
});
await streamingService.streamGenerate(messages, {
onComplete: async (content, thinking) => {
// 保存助手回复
await prisma.ssaMessage.create({
data: {
sessionId: id,
role: 'assistant',
contentType: 'text',
content: { text: content, thinking }
}
});
logger.info('[SSA:Consult] Stream chat completed', { sessionId: id });
}
});
});
// 生成 SAP 文档
app.post('/:id/generate-sap', async (req, reply) => {
const { id } = req.params as { id: string };
// TODO: 调用 SAPGeneratorService
const sap = {
title: '统计分析计划',
sections: [
{ heading: '研究背景', content: '...' },
{ heading: '数据描述', content: '...' },
{ heading: '统计假设', content: '...' },
{ heading: '分析方法', content: '...' },
{ heading: '结果解读指南', content: '...' },
{ heading: '注意事项', content: '...' }
],
recommendedTools: ['ST_T_TEST_IND'],
metadata: {
generatedAt: new Date().toISOString(),
sessionId: id,
version: '1.0'
}
};
return reply.send(sap);
});
// 下载 SAP 文档
app.get('/:id/download-sap', async (req, reply) => {
const { id } = req.params as { id: string };
const { format } = req.query as { format?: string };
// TODO: 生成 Word 或 Markdown 格式
const content = `# 统计分析计划\n\n## 研究背景\n...`;
if (format === 'word') {
// TODO: 使用 docx 库生成 Word
reply.header('Content-Type', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document');
reply.header('Content-Disposition', `attachment; filename="SAP_${id}.docx"`);
} else {
reply.header('Content-Type', 'text/markdown');
reply.header('Content-Disposition', `attachment; filename="SAP_${id}.md"`);
}
return reply.send(content);
});
}

View File

@@ -0,0 +1,138 @@
/**
* SSA 会话管理路由
*
* 遵循规范:
* - 使用 getUserId模块认证规范
* - 使用 logger日志服务
* - 使用 storageOSS 存储规范)
*/
import { FastifyInstance, FastifyRequest } from 'fastify';
import crypto from 'crypto';
import { prisma } from '../../../config/database.js';
import { logger } from '../../../common/logging/index.js';
import { storage } from '../../../common/storage/index.js';
import { DataParserService } from '../services/DataParserService.js';
function getUserId(request: FastifyRequest): string {
const userId = (request as any).user?.userId;
if (!userId) {
throw new Error('User not authenticated');
}
return userId;
}
function getTenantId(request: FastifyRequest): string {
return (request as any).user?.tenantId || 'default';
}
export default async function sessionRoutes(app: FastifyInstance) {
// 创建会话(支持同时上传文件)
app.post('/', async (req, reply) => {
const userId = getUserId(req);
const tenantId = getTenantId(req);
// 检查是否有文件上传
const contentType = req.headers['content-type'] || '';
const isMultipart = contentType.includes('multipart/form-data');
let dataOssKey: string | null = null;
let dataSchema: any = null;
let title = '新分析会话';
if (isMultipart) {
// 处理文件上传
const data = await req.file();
if (data) {
const buffer = await data.toBuffer();
const filename = data.filename;
title = filename;
// 生成存储 Key遵循 OSS 目录结构规范)
const uuid = crypto.randomUUID().replace(/-/g, '').substring(0, 16);
const ext = filename.split('.').pop()?.toLowerCase() || 'csv';
dataOssKey = `tenants/${tenantId}/users/${userId}/ssa/${uuid}.${ext}`;
// 上传到 OSS
await storage.upload(dataOssKey, buffer);
logger.info('[SSA:Session] File uploaded to OSS', { dataOssKey, filename });
// 解析数据 schema
try {
const parser = new DataParserService();
dataSchema = await parser.parseSchema(buffer, ext);
logger.info('[SSA:Session] Data schema parsed', {
columns: dataSchema.columns.length,
rowCount: dataSchema.rowCount
});
} catch (parseError) {
logger.warn('[SSA:Session] Schema parsing failed, continuing without schema', { error: parseError });
}
}
}
// 创建会话
const session = await prisma.ssaSession.create({
data: {
userId,
title,
status: 'active',
dataOssKey,
dataSchema
}
});
logger.info('[SSA:Session] Session created', {
sessionId: session.id,
hasFile: !!dataOssKey
});
// 返回前端期望的格式
return reply.send({
sessionId: session.id,
schema: dataSchema || { columns: [], rowCount: 0 }
});
});
// 获取会话列表
app.get('/', async (req, reply) => {
const userId = getUserId(req);
const sessions = await prisma.ssaSession.findMany({
where: { userId },
orderBy: { createdAt: 'desc' },
take: 20
});
return reply.send(sessions);
});
// 获取会话详情
app.get('/:id', async (req, reply) => {
const { id } = req.params as { id: string };
const session = await prisma.ssaSession.findUnique({
where: { id },
include: { messages: true }
});
if (!session) {
return reply.status(404).send({ error: 'Session not found' });
}
return reply.send(session);
});
// 获取消息历史
app.get('/:id/messages', async (req, reply) => {
const { id } = req.params as { id: string };
const messages = await prisma.ssaMessage.findMany({
where: { sessionId: id },
orderBy: { createdAt: 'asc' }
});
return reply.send(messages);
});
}

View File

@@ -0,0 +1,189 @@
/**
* SSA 数据解析服务
*
* 功能:解析 CSV/Excel 文件,提取 schema 信息
*/
import { logger } from '../../../common/logging/index.js';
interface ColumnSchema {
name: string;
type: 'numeric' | 'categorical' | 'datetime' | 'text';
uniqueValues?: number;
nullCount?: number;
}
interface DataSchema {
columns: ColumnSchema[];
rowCount: number;
preview?: any[];
}
export class DataParserService {
/**
* 解析数据文件 schema
*/
async parseSchema(buffer: Buffer, ext: string): Promise<DataSchema> {
if (ext === 'csv') {
return this.parseCsvSchema(buffer);
} else if (ext === 'xlsx' || ext === 'xls') {
return this.parseExcelSchema(buffer);
}
throw new Error(`Unsupported file format: ${ext}`);
}
/**
* 解析 CSV 文件 schema
*/
private async parseCsvSchema(buffer: Buffer): Promise<DataSchema> {
const content = buffer.toString('utf-8');
const lines = content.trim().split('\n');
if (lines.length === 0) {
throw new Error('CSV file is empty');
}
// 解析表头
const headers = this.parseCsvLine(lines[0]);
// 解析数据行(最多读取前 1000 行用于类型推断)
const dataLines = lines.slice(1, Math.min(1001, lines.length));
const rows = dataLines.map(line => this.parseCsvLine(line));
// 推断列类型
const columns: ColumnSchema[] = headers.map((name, index) => {
const values = rows.map(row => row[index]).filter(v => v !== undefined && v !== '');
return {
name,
type: this.inferType(values),
uniqueValues: new Set(values).size,
nullCount: rows.length - values.length
};
});
return {
columns,
rowCount: lines.length - 1
};
}
/**
* 解析 Excel 文件 schema
*/
private async parseExcelSchema(buffer: Buffer): Promise<DataSchema> {
try {
const xlsx = await import('xlsx');
const workbook = xlsx.read(buffer, { type: 'buffer' });
const sheetName = workbook.SheetNames[0];
const sheet = workbook.Sheets[sheetName];
const data = xlsx.utils.sheet_to_json(sheet, { header: 1 }) as any[][];
if (data.length === 0) {
throw new Error('Excel file is empty');
}
const headers = data[0] as string[];
const rows = data.slice(1, Math.min(1001, data.length));
const columns: ColumnSchema[] = headers.map((name, index) => {
const values = rows.map(row => row[index]).filter(v => v !== undefined && v !== null && v !== '');
return {
name: String(name),
type: this.inferType(values.map(String)),
uniqueValues: new Set(values).size,
nullCount: rows.length - values.length
};
});
return {
columns,
rowCount: data.length - 1
};
} catch (error) {
logger.error('[DataParser] Excel parsing failed', { error });
throw new Error('Failed to parse Excel file');
}
}
/**
* 解析 CSV 行(处理引号和逗号)
*/
private parseCsvLine(line: string): string[] {
const result: string[] = [];
let current = '';
let inQuotes = false;
for (let i = 0; i < line.length; i++) {
const char = line[i];
if (char === '"') {
inQuotes = !inQuotes;
} else if (char === ',' && !inQuotes) {
result.push(current.trim());
current = '';
} else {
current += char;
}
}
result.push(current.trim());
return result;
}
/**
* 推断列类型
*
* 规则优先级:
* 1. 唯一值 <= 10 且 唯一值比例 < 20% → categorical即使是数字也视为分类
* 2. 90%+ 是数字 → numeric
* 3. 90%+ 是日期 → datetime
* 4. 唯一值比例 < 50% → categorical
* 5. 其他 → text
*/
private inferType(values: string[]): 'numeric' | 'categorical' | 'datetime' | 'text' {
if (values.length === 0) return 'text';
const sample = values.slice(0, 100);
const uniqueValues = new Set(sample);
const uniqueCount = uniqueValues.size;
const uniqueRatio = uniqueCount / sample.length;
// 规则1唯一值很少<=10且比例很低<20%)→ 分类变量
// 典型场景0/1, 是/否, A/B/C 等
if (uniqueCount <= 10 && uniqueRatio < 0.2) {
return 'categorical';
}
// 规则2检查是否为数值
const numericCount = sample.filter(v => !isNaN(Number(v)) && v !== '').length;
if (numericCount / sample.length > 0.9) {
// 即使是数字如果唯一值只有2-3个也视为分类变量二分类/三分类)
if (uniqueCount <= 3) {
return 'categorical';
}
return 'numeric';
}
// 规则3检查是否为日期
const datePatterns = [
/^\d{4}-\d{2}-\d{2}$/,
/^\d{2}\/\d{2}\/\d{4}$/,
/^\d{4}\/\d{2}\/\d{2}$/
];
const dateCount = sample.filter(v =>
datePatterns.some(p => p.test(v)) || !isNaN(Date.parse(v))
).length;
if (dateCount / sample.length > 0.9) {
return 'datetime';
}
// 规则4唯一值比例较低 → 分类变量
if (uniqueRatio < 0.5) {
return 'categorical';
}
return 'text';
}
}

View File

@@ -0,0 +1,111 @@
/**
* SSA 模块类型定义
*/
// 分析模式
export type SSAMode = 'analysis' | 'consult';
// 会话状态
export type SessionStatus = 'active' | 'consult' | 'completed' | 'error';
// 分析计划
export interface AnalysisPlan {
tool_code: string;
tool_name: string;
reasoning: string;
params: Record<string, any>;
guardrails: GuardrailConfig[];
confidence: number;
}
// 护栏配置
export interface GuardrailConfig {
name: string;
check_code: string;
threshold?: string;
action_type: 'Block' | 'Warn' | 'Switch';
action_target?: string;
status: 'pending' | 'passed' | 'failed' | 'switched';
}
// 执行结果
export interface ExecutionResult {
status: 'success' | 'error' | 'blocked';
message: string;
warnings?: string[];
results?: {
method: string;
statistic: number;
p_value: number;
p_value_fmt: string;
conf_int?: number[];
group_stats?: GroupStats[];
};
plots?: string[];
trace_log?: string[];
reproducible_code?: string;
}
// 分组统计
export interface GroupStats {
group: string;
n: number;
mean: number;
sd: number;
}
// 数据 Schema
export interface DataSchema {
rowCount: number;
columns: ColumnInfo[];
}
// 列信息
export interface ColumnInfo {
name: string;
type: 'numeric' | 'categorical' | 'datetime';
min?: number;
max?: number;
mean?: number;
missing?: number;
uniqueValues?: string[];
uniqueCount?: number;
privacyProtected?: boolean;
}
// SAP 文档
export interface SAPDocument {
title: string;
sections: Array<{
heading: string;
content: string;
}>;
recommendedTools: string[];
metadata: {
generatedAt: string;
sessionId: string;
version: string;
};
}
// 决策表条目
export interface DecisionTableEntry {
goalType: string;
yType: string;
xType?: string;
designType: string;
toolCode: string;
altToolCode?: string;
priority: number;
}
// 参数映射
export interface ParamMapping {
toolCode: string;
jsonKey: string;
rParamName: string;
dataType: 'string' | 'number' | 'boolean';
isRequired: boolean;
defaultValue?: string;
validationRule?: string;
}

View File

@@ -0,0 +1,42 @@
/**
* SSA 计划验证 Schema (Zod)
*/
import { z } from 'zod';
// 护栏配置 Schema
export const guardrailConfigSchema = z.object({
name: z.string(),
check_code: z.string().optional(),
threshold: z.string().optional(),
action_type: z.enum(['Block', 'Warn', 'Switch']).default('Warn'),
action_target: z.string().optional(),
status: z.enum(['pending', 'passed', 'failed', 'switched']).default('pending')
});
// 分析计划 Schema
export const analysisPlanSchema = z.object({
tool_code: z.string().regex(/^ST_[A-Z_]+$/, 'tool_code must match ST_XXX pattern'),
tool_name: z.string().min(1),
reasoning: z.string(),
params: z.record(z.any()),
guardrails: z.array(guardrailConfigSchema).optional(),
confidence: z.number().min(0).max(1).optional()
});
// 执行请求 Schema
export const executeRequestSchema = z.object({
plan: analysisPlanSchema,
confirm: z.boolean().default(true)
});
// 咨询消息 Schema
export const consultMessageSchema = z.object({
message: z.string().min(1, 'Message cannot be empty')
});
// 类型导出
export type GuardrailConfig = z.infer<typeof guardrailConfigSchema>;
export type AnalysisPlan = z.infer<typeof analysisPlanSchema>;
export type ExecuteRequest = z.infer<typeof executeRequestSchema>;
export type ConsultMessage = z.infer<typeof consultMessageSchema>;