feat(ssa): finalize strict stepwise agent execution flow

Align Agent mode to strict stepwise generation and execution, add deterministic and safety hardening, and sync deployment/module documentation for Phase 5A.5/5B/5C rollout.

- implement strict stepwise execution path and dependency short-circuiting
- persist step-level errors/results and stream step_* progress events
- add agent plan params patch route and schema/migration support
- improve R sanitizer/security checks and step result rendering in workspace
- update SSA module guide and deployment change checklist

Made-with: Cursor
This commit is contained in:
2026-03-11 22:49:05 +08:00
parent d3b24bd8c3
commit 6edfad032f
19 changed files with 2105 additions and 158 deletions

View File

@@ -16,6 +16,7 @@ import configRoutes from './routes/config.routes.js';
import workflowRoutes from './routes/workflow.routes.js';
import blackboardRoutes from './routes/blackboard.routes.js';
import chatRoutes from './routes/chat.routes.js';
import agentExecutionRoutes from './routes/agent-execution.routes.js';
export async function ssaRoutes(app: FastifyInstance) {
// 注册认证中间件(遵循模块认证规范)
@@ -32,6 +33,8 @@ export async function ssaRoutes(app: FastifyInstance) {
app.register(blackboardRoutes, { prefix: '/sessions/:sessionId/blackboard' });
// Phase II: 统一对话入口
app.register(chatRoutes, { prefix: '/sessions' });
// Agent 计划参数编辑接口Phase 5A.5
app.register(agentExecutionRoutes, { prefix: '/agent-executions' });
}
export default ssaRoutes;

View File

@@ -0,0 +1,183 @@
import { FastifyInstance, FastifyRequest } from 'fastify';
import { z } from 'zod';
import { logger } from '../../../common/logging/index.js';
import { prisma } from '../../../config/database.js';
import toolParamConstraints from '../config/tool_param_constraints.json' with { type: 'json' };
type ParamRule = {
paramType: 'single' | 'multi';
requiredType: 'any' | 'numeric' | 'categorical';
hint?: string;
};
type ConstraintsMap = Record<string, Record<string, ParamRule>>;
const Constraints = toolParamConstraints as ConstraintsMap;
function getUserId(request: FastifyRequest): string {
const userId = (request as any).user?.userId;
if (!userId) throw new Error('User not authenticated');
return userId;
}
function normalizeColumnType(raw?: string): 'numeric' | 'categorical' | 'other' {
const t = (raw || '').toLowerCase();
if (['numeric', 'number', 'float', 'double', 'integer', 'int'].includes(t)) return 'numeric';
if (['categorical', 'category', 'factor', 'string', 'text'].includes(t)) return 'categorical';
return 'other';
}
export default async function agentExecutionRoutes(app: FastifyInstance) {
app.patch<{
Params: { executionId: string };
Body: { steps: Array<{ stepOrder: number; params: Record<string, unknown> }> };
}>(
'/:executionId/plan-params',
async (request, reply) => {
const userId = getUserId(request);
const { executionId } = request.params;
const BodySchema = z.object({
steps: z.array(
z.object({
stepOrder: z.number().int().positive(),
params: z.record(z.string(), z.unknown()),
}),
).min(1),
});
const parsed = BodySchema.safeParse(request.body);
if (!parsed.success) {
return reply.status(400).send({
success: false,
error: 'Invalid request body',
details: parsed.error.flatten(),
});
}
const execution = await (prisma as any).ssaAgentExecution.findUnique({
where: { id: executionId },
select: {
id: true,
status: true,
reviewResult: true,
sessionId: true,
session: { select: { userId: true, dataSchema: true } },
},
});
if (!execution) {
return reply.status(404).send({ success: false, error: 'Agent execution not found' });
}
if (execution.session?.userId !== userId) {
return reply.status(403).send({ success: false, error: 'No permission to modify this execution' });
}
if (execution.status !== 'plan_pending') {
return reply.status(409).send({
success: false,
error: `Cannot modify plan params when status is '${execution.status}'`,
});
}
const review = (execution.reviewResult || {}) as any;
const steps = Array.isArray(review.steps) ? review.steps : [];
if (steps.length === 0) {
return reply.status(400).send({ success: false, error: 'Execution has no structured plan steps' });
}
const schemaColumns = ((execution.session?.dataSchema as any)?.columns || []) as Array<any>;
const columnTypeMap = new Map<string, 'numeric' | 'categorical' | 'other'>(
schemaColumns.map((c: any) => [c.name, normalizeColumnType(c.type || c.inferred_type)]),
);
for (const patch of parsed.data.steps) {
const target = steps.find((s: any) => Number(s.order) === patch.stepOrder);
if (!target) {
return reply.status(400).send({
success: false,
error: `Step ${patch.stepOrder} does not exist in reviewResult.steps`,
});
}
const toolCode = String(target.toolCode || target.tool_code || '').trim();
const ruleSet = toolCode ? Constraints[toolCode] : undefined;
for (const [paramName, value] of Object.entries(patch.params)) {
if (typeof value === 'string' && value && columnTypeMap.size > 0 && !columnTypeMap.has(value)) {
return reply.status(400).send({
success: false,
error: `Variable '${value}' in step ${patch.stepOrder}.${paramName} does not exist in dataset`,
});
}
if (Array.isArray(value) && columnTypeMap.size > 0) {
for (const v of value) {
if (typeof v === 'string' && !columnTypeMap.has(v)) {
return reply.status(400).send({
success: false,
error: `Variable '${v}' in step ${patch.stepOrder}.${paramName} does not exist in dataset`,
});
}
}
}
const rule = ruleSet?.[paramName];
if (!rule || rule.requiredType === 'any') continue;
if (rule.paramType === 'single' && typeof value === 'string') {
const t = columnTypeMap.get(value);
if ((rule.requiredType === 'numeric' && t !== 'numeric')
|| (rule.requiredType === 'categorical' && t !== 'categorical')) {
return reply.status(400).send({
success: false,
error: `Step ${patch.stepOrder}.${paramName} requires ${rule.requiredType} variable`,
hint: rule.hint,
});
}
}
if (rule.paramType === 'multi' && Array.isArray(value)) {
for (const v of value) {
if (typeof v !== 'string') continue;
const t = columnTypeMap.get(v);
if ((rule.requiredType === 'numeric' && t !== 'numeric')
|| (rule.requiredType === 'categorical' && t !== 'categorical')) {
return reply.status(400).send({
success: false,
error: `Step ${patch.stepOrder}.${paramName} requires ${rule.requiredType} variables`,
hint: rule.hint,
});
}
}
}
}
}
// 写回 reviewResult.steps[].params
for (const patch of parsed.data.steps) {
const idx = steps.findIndex((s: any) => Number(s.order) === patch.stepOrder);
if (idx >= 0) {
const currentParams = (steps[idx].params || {}) as Record<string, unknown>;
steps[idx].params = { ...currentParams, ...patch.params };
}
}
review.steps = steps;
await (prisma as any).ssaAgentExecution.update({
where: { id: executionId },
data: {
reviewResult: review,
},
});
logger.info('[SSA:AgentExecution] plan params updated', {
executionId,
stepsUpdated: parsed.data.steps.length,
});
return reply.send({
success: true,
stepsUpdated: parsed.data.steps.length,
reviewResult: review,
});
},
);
}

View File

@@ -28,6 +28,12 @@ export interface GeneratedCode {
requiredPackages: string[];
}
export interface StepResultSummary {
stepOrder: number;
method: string;
highlights: string;
}
export class AgentCoderService {
/**
@@ -132,6 +138,63 @@ export class AgentCoderService {
return result;
}
/**
* 按步骤流式生成Phase 5B
*/
async generateStepCodeStream(
sessionId: string,
plan: AgentPlan,
step: AgentPlan['steps'][number],
previousResults: StepResultSummary[],
onProgress: (accumulated: string) => void,
errorFeedback?: string,
previousCode?: string,
): Promise<GeneratedCode> {
const dataContext = await this.buildDataContext(sessionId);
const systemPrompt = await this.buildSystemPrompt(dataContext);
const userMessage = errorFeedback
? this.buildStepRetryMessage(plan, step, previousResults, errorFeedback, previousCode)
: this.buildStepFirstMessage(plan, step, previousResults);
const messages: LLMMessage[] = [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userMessage },
];
logger.info('[AgentCoder] Generating step R code (stream)', {
sessionId,
planTitle: plan.title,
stepOrder: step.order,
stepMethod: step.method,
isRetry: !!errorFeedback,
});
const adapter = LLMFactory.getAdapter(MODEL as any);
let fullContent = '';
let lastSentLen = 0;
const CHUNK_SIZE = 150;
for await (const chunk of adapter.chatStream(messages, {
temperature: 0.2,
maxTokens: 8000,
})) {
if (chunk.content) {
fullContent += chunk.content;
if (fullContent.length - lastSentLen >= CHUNK_SIZE) {
onProgress(fullContent);
lastSentLen = fullContent.length;
}
}
}
if (fullContent.length > lastSentLen) {
onProgress(fullContent);
}
return this.parseCode(fullContent);
}
private async buildDataContext(sessionId: string): Promise<string> {
const blackboard = await sessionBlackboardService.get(sessionId);
if (!blackboard) return '(无数据上下文)';
@@ -302,7 +365,84 @@ ${errorFeedback}
4. 用 safe_test 模式包裹统计检验,处理 NA/NaN/Inf
5. 检查所有 library() 调用是否在预装包列表内
6. 保持 report_blocks 输出格式不变
7. **必须将修正后的完整代码放在 <r_code>...</r_code> 标签中**`;
7. **严禁使用未预装包**(尤其是 pROC、nortest、exact2x2如涉及 ROC/AUC请使用基础 R 实现,不要写 pROC::*
8. 若报错包含 "unexpected 'if'" 或语法错误,必须检查并修复所有单行拼接错误:表达式后面的 if/else 必须换行并带完整代码块
9. **必须将修正后的完整代码放在 <r_code>...</r_code> 标签中**`;
}
private buildStepFirstMessage(
plan: AgentPlan,
step: AgentPlan['steps'][number],
previousResults: StepResultSummary[],
): string {
const previous = previousResults.length > 0
? previousResults.map(r => `- Step ${r.stepOrder} (${r.method}): ${r.highlights}`).join('\n')
: '(无)';
return `请仅为当前步骤生成 R 代码(不要输出其他步骤代码)。
## 分析计划
- 标题:${plan.title}
- 研究设计:${plan.designType}
## 当前步骤(仅此一步)
- Step ${step.order}
- 方法:${step.method}
- 目标:${step.description}
- 理由:${step.rationale}
## 已完成步骤结果摘要
${previous}
## 关键约束
1. 只能生成当前步骤所需代码
2. 不要重复加载数据df 已存在)
3. 可直接使用前序步骤已生成的对象
4. 末尾仍需返回包含 report_blocks 的 list
5. 必须使用 <r_code>...</r_code> 包裹完整代码`;
}
private buildStepRetryMessage(
plan: AgentPlan,
step: AgentPlan['steps'][number],
previousResults: StepResultSummary[],
errorFeedback: string,
previousCode?: string,
): string {
const codeSection = previousCode
? `## 上次失败代码
<previous_code>
${previousCode}
</previous_code>`
: '';
const previous = previousResults.length > 0
? previousResults.map(r => `- Step ${r.stepOrder} (${r.method}): ${r.highlights}`).join('\n')
: '(无)';
return `当前步骤代码执行失败,请仅修复当前步骤并输出完整新代码。
${codeSection}
## 失败步骤
- 计划:${plan.title}
- Step ${step.order} / ${step.method}
- 目标:${step.description}
## 前序步骤结果摘要
${previous}
## 错误信息
<error>
${errorFeedback}
</error>
## 修复要求
1. 只修复当前步骤代码,不生成其他步骤代码
2. 检查并修复语法、对象存在性、类型转换、缺失值处理
3. 禁止未安装包,禁止 system/eval/source 等高危调用
4. 仍需返回包含 report_blocks 的 list
5. 必须用 <r_code>...</r_code> 输出完整代码`;
}
private parseCode(content: string): GeneratedCode {

View File

@@ -19,7 +19,7 @@ import { askUserService, type AskUserResponse } from './AskUserService.js';
import { toolOrchestratorService } from './ToolOrchestratorService.js';
import { executeGetDataOverview } from './tools/GetDataOverviewTool.js';
import { agentPlannerService } from './AgentPlannerService.js';
import { agentCoderService } from './AgentCoderService.js';
import { agentCoderService, type StepResultSummary } from './AgentCoderService.js';
import { codeRunnerService } from './CodeRunnerService.js';
import { prisma } from '../../../config/database.js';
import type { IntentType } from './SystemPromptService.js';
@@ -397,12 +397,12 @@ export class ChatHandlerService {
// ────────────────────────────────────────────
/**
* Agent 模式入口 — 三步确认式管线
* Agent 模式入口 — 三步确认式管线(严格分步模式)
*
* 状态机:
* 新请求 → agentGeneratePlan → plan_pending等用户确认
* 用户确认计划 → agentStreamCode → code_pending等用户确认
* 用户确认代码 → agentExecuteCode → completed
* 用户确认计划 → agentStreamCode(不生成代码,仅进入执行确认)→ code_pending等用户确认
* 用户确认代码 → agentExecuteCode(分步生成+分步执行)→ completed/error
*/
async handleAgentMode(
sessionId: string,
@@ -423,7 +423,7 @@ export class ChatHandlerService {
});
if (!activeExec) {
return await this.handleAgentChat(sessionId, conversationId, writer, placeholderMessageId, null);
return await this.handleAgentChat(sessionId, conversationId, writer, placeholderMessageId, null, false);
}
if (agentAction === 'confirm_plan' && activeExec.status === 'plan_pending') {
@@ -445,6 +445,7 @@ export class ChatHandlerService {
if (activeExec) {
const action = this.parseAgentAction(userContent);
const isInlineInstruction = !!metadata?.agentInlineInstruction;
if (activeExec.status === 'plan_pending' && action === 'confirm') {
return await this.agentStreamCode(activeExec, sessionId, conversationId, writer, placeholderMessageId);
}
@@ -454,6 +455,14 @@ export class ChatHandlerService {
if (action === 'cancel') {
return await this.agentCancel(activeExec, sessionId, conversationId, writer, placeholderMessageId);
}
// 方案 B在待确认阶段左侧输入默认视为“右侧工作区内联修改指令”
if (isInlineInstruction && activeExec.status === 'plan_pending') {
return await this.agentGeneratePlan(sessionId, conversationId, userContent, writer, placeholderMessageId);
}
if (isInlineInstruction && activeExec.status === 'code_pending') {
// 严格分步模式:执行前不生成代码。若用户继续输入修改指令,回到计划阶段重新规划更符合心智。
return await this.agentGeneratePlan(sessionId, conversationId, userContent, writer, placeholderMessageId);
}
}
// 3. 无挂起确认 — 检查是否是分析请求
@@ -559,7 +568,8 @@ export class ChatHandlerService {
sendEvent('agent_planning', { executionId: execution.id, message: '正在制定分析计划...' });
const conversationHistory = await conversationService.buildContext(sessionId, conversationId, 'analyze');
const plan = await agentPlannerService.generatePlan(sessionId, userContent, conversationHistory);
const rawPlan = await agentPlannerService.generatePlan(sessionId, userContent, conversationHistory);
const plan = this.normalizeAgentPlan(rawPlan);
// planText 存原始文本, reviewResult(JSON) 存结构化计划以便恢复
await (prisma as any).ssaAgentExecution.update({
@@ -584,7 +594,154 @@ export class ChatHandlerService {
return { messageId: placeholderMessageId, intent: 'analyze', success: true };
}
// ── Agent Step 2: 流式生成代码 → 等用户确认 ──
/**
* 将 Planner 输出归一化为可编辑计划:
* - 每步补齐 toolCode用于约束匹配
* - 每步补齐 params用于 5A.5 变量编辑)
*/
private normalizeAgentPlan(plan: any): any {
const normalized = JSON.parse(JSON.stringify(plan || {}));
if (!Array.isArray(normalized.steps)) {
normalized.steps = [];
return normalized;
}
normalized.steps = normalized.steps.map((step: any, idx: number) => {
const order = Number(step?.order) || (idx + 1);
const inferred = this.inferToolCodeAndParamsFromStep(normalized, step || {});
return {
...step,
order,
toolCode: step?.toolCode || step?.tool_code || inferred.toolCode,
params: { ...(inferred.params || {}), ...(step?.params || {}) },
};
});
return normalized;
}
private inferToolCodeAndParamsFromStep(
plan: {
variables?: {
outcome?: string[];
predictors?: string[];
grouping?: string | null;
confounders?: string[];
};
},
step: { method?: string; description?: string },
): { toolCode: string; params: Record<string, unknown> } {
const method = (step.method || '').toLowerCase();
const desc = (step.description || '').toLowerCase();
const text = `${method} ${desc}`;
const vars = plan.variables || {};
const outcome = Array.isArray(vars.outcome) ? vars.outcome[0] : undefined;
const predictors = Array.isArray(vars.predictors) ? vars.predictors : [];
const grouping = vars.grouping || undefined;
const confounders = Array.isArray(vars.confounders) ? vars.confounders : [];
if (text.includes('logistic') || text.includes('逻辑回归') || text.includes('二元回归')) {
return {
toolCode: 'ST_LOGISTIC_BINARY',
params: {
outcome_var: outcome || null,
predictors,
confounders,
},
};
}
if (text.includes('linear') || text.includes('线性回归')) {
return {
toolCode: 'ST_LINEAR_REG',
params: {
outcome_var: outcome || null,
predictors,
confounders,
},
};
}
if (text.includes('anova') || text.includes('方差分析')) {
return {
toolCode: 'ST_ANOVA_ONE',
params: {
group_var: grouping || null,
value_var: outcome || null,
},
};
}
if (text.includes('mann') || text.includes('wilcoxon秩和') || text.includes('秩和检验')) {
return {
toolCode: 'ST_MANN_WHITNEY',
params: {
group_var: grouping || null,
value_var: outcome || null,
},
};
}
if (text.includes('t检验') || text.includes('t test') || text.includes('t-test')) {
return {
toolCode: 'ST_T_TEST_IND',
params: {
group_var: grouping || null,
value_var: outcome || null,
},
};
}
if (text.includes('fisher')) {
return {
toolCode: 'ST_FISHER',
params: {
var1: grouping || null,
var2: outcome || null,
},
};
}
if (text.includes('卡方') || text.includes('chi-square') || text.includes('chisq')) {
return {
toolCode: 'ST_CHI_SQUARE',
params: {
var1: grouping || null,
var2: outcome || null,
},
};
}
if (text.includes('相关') || text.includes('correlation') || text.includes('pearson') || text.includes('spearman')) {
return {
toolCode: 'ST_CORRELATION',
params: {
var_x: predictors[0] || null,
var_y: outcome || null,
},
};
}
if (text.includes('基线') || text.includes('baseline')) {
return {
toolCode: 'ST_BASELINE_TABLE',
params: {
group_var: grouping || null,
analyze_vars: [...predictors, ...confounders].filter(Boolean),
},
};
}
return {
toolCode: 'ST_DESCRIPTIVE',
params: {
variables: [...predictors, ...confounders, ...(outcome ? [outcome] : [])].filter(Boolean),
group_var: grouping || null,
},
};
}
// ── Agent Step 2: 进入执行确认(严格分步:执行前不生成代码) ──
private async agentStreamCode(
execution: any,
@@ -602,30 +759,86 @@ export class ChatHandlerService {
data: { status: 'coding' },
});
sendEvent('code_generating', { executionId: execution.id, partialCode: '', message: '正在生成 R 代码...' });
const plan = execution.reviewResult as any;
const generated = await agentCoderService.generateCodeStream(
sessionId, plan,
(accumulated: string) => {
sendEvent('code_generating', { executionId: execution.id, partialCode: accumulated });
},
);
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: { generatedCode: generated.code, status: 'code_pending' },
data: { generatedCode: null, status: 'code_pending' },
});
sendEvent('code_generated', {
executionId: execution.id,
code: generated.code,
explanation: generated.explanation,
code: '',
explanation: '已进入执行确认。严格分步模式下,代码将在执行阶段逐步生成。',
});
// 固定文本引导语
const hintText = `R 代码已生成(${generated.code.split('\n').length} 行),👉 请在右侧工作区核对代码点击「确认并执行」。`;
const hintText = '已进入执行确认。当前为严格分步模式:执行前不生成代码点击「确认并执行代码」后将按步骤逐步生成并执行。';
await this.sendFixedHint(writer, placeholderMessageId, hintText);
return { messageId: placeholderMessageId, intent: 'analyze', success: true };
}
// ── Agent 补充分支:用户在 code_pending 阶段给出修改指令(兼容保留) ──
private async agentRegenerateCodeByInstruction(
execution: any,
sessionId: string,
_conversationId: string,
instruction: string,
writer: StreamWriter,
placeholderMessageId: string,
): Promise<HandleResult> {
const sendEvent = (type: string, data: Record<string, any>) => {
writer.write(`data: ${JSON.stringify({ type, ...data })}\n\n`);
};
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: { status: 'coding' },
});
sendEvent('code_retry', {
executionId: execution.id,
retryCount: (execution.retryCount || 0) + 1,
message: '收到修改指令Agent 正在重新生成第 1 步代码预览...',
previousError: instruction,
});
sendEvent('code_generating', { executionId: execution.id, partialCode: '' });
const plan = execution.reviewResult as any;
const steps = (plan?.steps || []).length > 0
? (plan.steps as Array<{ order: number; method: string; description: string; rationale: string }>)
: [{ order: 1, method: '综合分析', description: '执行完整分析', rationale: '默认单步' }];
const firstStep = steps[0];
const retry = await agentCoderService.generateStepCodeStream(
sessionId,
plan,
firstStep,
[],
(accumulated) => {
sendEvent('code_generating', {
executionId: execution.id,
partialCode: accumulated,
});
},
`用户修改要求:${instruction}`,
execution.generatedCode,
);
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: {
generatedCode: retry.code,
status: 'code_pending',
retryCount: (execution.retryCount || 0) + 1,
},
});
sendEvent('code_generated', {
executionId: execution.id,
code: retry.code,
explanation: retry.explanation,
});
const hintText = '已根据你的修改指令重生成第 1 步代码预览,后续步骤将在执行阶段逐步生成。👉 请在右侧工作区确认后执行。';
await this.sendFixedHint(writer, placeholderMessageId, hintText);
return { messageId: placeholderMessageId, intent: 'analyze', success: true };
@@ -636,7 +849,7 @@ export class ChatHandlerService {
private async agentExecuteCode(
execution: any,
sessionId: string,
conversationId: string,
_conversationId: string,
writer: StreamWriter,
placeholderMessageId: string,
): Promise<HandleResult> {
@@ -650,111 +863,344 @@ export class ChatHandlerService {
});
const plan = execution.reviewResult as any;
let currentCode = execution.generatedCode as string;
const steps = (plan?.steps || []).length > 0
? (plan.steps as Array<{ order: number; method: string; description: string; rationale: string }>)
: [{ order: 1, method: '综合分析', description: '执行完整分析', rationale: '默认单步' }];
let lastError: string | null = null;
const datasetHash = await this.getSessionDatasetHash(sessionId);
const baseSeed = this.deriveStableSeed(`${sessionId}:${execution.id}:${datasetHash}`);
const seedAudit = { baseSeed, datasetHash, steps: [] as Array<{ stepOrder: number; stepSeed: number }> };
const stepResults: Array<{
stepOrder: number;
method: string;
status: 'pending' | 'coding' | 'executing' | 'completed' | 'error' | 'skipped';
code?: string;
reportBlocks?: any[];
errorMessage?: string;
retryCount: number;
durationMs?: number;
}> = steps.map(s => ({
stepOrder: s.order,
method: s.method,
status: 'pending',
retryCount: 0,
}));
const previousResults: StepResultSummary[] = [];
let accumulatedCode = '';
for (let attempt = 0; attempt <= codeRunnerService.maxRetries; attempt++) {
sendEvent('code_executing', {
executionId: execution.id,
attempt: attempt + 1,
message: attempt === 0 ? '正在执行 R 代码...' : `${attempt + 1} 次重试执行...`,
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: { seedAudit, currentStep: 1, stepResults: stepResults as any },
});
for (let stepIndex = 0; stepIndex < steps.length; stepIndex++) {
const step = steps[stepIndex];
const stepOrder = step.order || (stepIndex + 1);
const stepSeed = this.deriveStepSeed(baseSeed, stepOrder);
const deterministicHeader = this.buildDeterministicHeader(stepSeed);
seedAudit.steps.push({ stepOrder, stepSeed });
let stepCode = '';
let stepCompleted = false;
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: { currentStep: stepOrder, seedAudit },
});
const execResult = await codeRunnerService.executeCode(sessionId, currentCode);
for (let attempt = 0; attempt <= codeRunnerService.maxRetries; attempt++) {
stepResults[stepIndex].retryCount = attempt;
stepResults[stepIndex].status = 'coding';
sendEvent('step_coding', {
executionId: execution.id,
stepOrder,
retryCount: attempt,
partialCode: '',
});
// 兼容旧前端事件
sendEvent('code_generating', {
executionId: execution.id,
partialCode: '',
});
if (execResult.success) {
const durationMs = execResult.durationMs || 0;
const gen = await agentCoderService.generateStepCodeStream(
sessionId,
plan,
step,
previousResults,
(partialCode) => {
sendEvent('step_coding', {
executionId: execution.id,
stepOrder,
retryCount: attempt,
partialCode,
});
// 兼容旧前端事件
sendEvent('code_generating', {
executionId: execution.id,
partialCode,
});
},
attempt > 0 ? lastError || '执行失败' : undefined,
attempt > 0 ? stepCode : undefined,
);
stepCode = gen.code;
sendEvent('step_code_ready', {
executionId: execution.id,
stepOrder,
code: stepCode,
});
stepResults[stepIndex].status = 'executing';
stepResults[stepIndex].code = stepCode;
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: {
executionResult: execResult as any,
reportBlocks: execResult.reportBlocks as any,
generatedCode: currentCode,
status: 'completed',
stepResults: stepResults as any,
generatedCode: stepCode,
retryCount: attempt,
durationMs,
},
});
sendEvent('code_result', {
sendEvent('step_executing', {
executionId: execution.id,
reportBlocks: execResult.reportBlocks,
code: currentCode,
durationMs,
stepOrder,
retryCount: attempt,
});
// 兼容旧前端事件
sendEvent('code_executing', {
executionId: execution.id,
attempt: attempt + 1,
message: `Step ${stepOrder} 正在执行 R 代码...`,
});
// 固定文本引导语(结果解读应在右侧工作区,不在对话区)
const blockCount = (execResult.reportBlocks || []).length;
const seconds = (durationMs / 1000).toFixed(1);
const hintText = `✅ 分析完成(${seconds}s共生成 ${blockCount} 个结果模块。👉 请在右侧工作区查看完整结果和图表。`;
await this.sendFixedHint(writer, placeholderMessageId, hintText);
const fullCode = `${deterministicHeader}\n${accumulatedCode}\n${stepCode}`;
const execResult = await codeRunnerService.executeCode(sessionId, fullCode, {
stepOrder,
stepSeed,
baseSeed,
datasetHash,
});
return { messageId: placeholderMessageId, intent: 'analyze', success: true };
}
if (execResult.success) {
const durationMs = execResult.durationMs || 0;
const reportBlocks = execResult.reportBlocks || [];
accumulatedCode = `${accumulatedCode}\n${stepCode}`.trim();
stepResults[stepIndex] = {
...stepResults[stepIndex],
status: 'completed',
reportBlocks,
durationMs,
};
previousResults.push({
stepOrder,
method: step.method,
highlights: this.summarizeStepHighlights(reportBlocks),
});
sendEvent('step_result', {
executionId: execution.id,
stepOrder,
reportBlocks,
durationMs,
});
stepCompleted = true;
break;
}
lastError = execResult.error || '执行失败';
const rawConsole = execResult.consoleOutput;
const consoleArr = Array.isArray(rawConsole) ? rawConsole : (rawConsole ? [String(rawConsole)] : []);
const consoleSnippet = consoleArr.slice(-20).join('\n');
if (attempt < codeRunnerService.maxRetries) {
const errorDetail = consoleSnippet
? `${lastError}\n\n--- R console output (last 20 lines) ---\n${consoleSnippet}`
: lastError;
lastError = execResult.error || '执行失败';
const errorCode = execResult.errorCode;
const errorType = execResult.errorType;
const errorClass = this.classifyExecutionError(errorCode, lastError);
const isFatal = errorClass === 'fatal';
const rawConsole = execResult.consoleOutput;
const consoleArr = Array.isArray(rawConsole) ? rawConsole : (rawConsole ? [String(rawConsole)] : []);
const consoleSnippet = consoleArr.slice(-20).join('\n');
stepResults[stepIndex].status = 'error';
stepResults[stepIndex].errorMessage = lastError;
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: {
stepResults: stepResults as any,
errorMessage: lastError,
},
});
sendEvent('step_error', {
executionId: execution.id,
stepOrder,
message: lastError,
errorCode,
errorType,
isFatal,
consoleOutput: consoleSnippet || undefined,
willRetry: attempt < codeRunnerService.maxRetries && !isFatal,
retryCount: attempt + 1,
});
// 兼容旧前端事件
sendEvent('code_error', {
executionId: execution.id,
message: lastError,
consoleOutput: consoleSnippet || undefined,
willRetry: true,
willRetry: attempt < codeRunnerService.maxRetries && !isFatal,
retryCount: attempt + 1,
});
sendEvent('code_retry', {
executionId: execution.id,
retryCount: attempt + 1,
message: `${attempt + 1} 次执行失败Agent 正在重新生成代码...`,
previousError: lastError,
});
if (isFatal) {
sendEvent('pipeline_aborted', {
executionId: execution.id,
stepOrder,
error: lastError,
errorCode,
});
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: {
status: 'error',
errorMessage: lastError,
stepResults: stepResults as any,
seedAudit,
retryCount: attempt,
},
});
return { messageId: placeholderMessageId, intent: 'analyze', success: true };
}
const retry = await agentCoderService.generateCodeStream(
sessionId,
plan,
(accumulated) => {
sendEvent('code_generating', {
if (attempt >= codeRunnerService.maxRetries) {
stepResults[stepIndex].status = 'skipped';
sendEvent('step_skipped', {
executionId: execution.id,
stepOrder,
reason: `重试 ${codeRunnerService.maxRetries + 1} 次仍失败,跳过该步骤`,
});
// 关键依赖短路:当前步骤失败后,后续步骤不再生成代码,直接标记跳过
for (let j = stepIndex + 1; j < steps.length; j++) {
const nextStepOrder = steps[j].order || (j + 1);
stepResults[j].status = 'skipped';
stepResults[j].errorMessage = `依赖步骤 ${stepOrder} 失败,自动跳过`;
sendEvent('step_skipped', {
executionId: execution.id,
partialCode: accumulated,
stepOrder: nextStepOrder,
reason: `依赖步骤 ${stepOrder} 失败,自动跳过`,
});
},
errorDetail,
currentCode,
);
currentCode = retry.code;
}
sendEvent('pipeline_aborted', {
executionId: execution.id,
stepOrder,
error: `步骤 ${stepOrder} 重试失败,后续步骤已短路跳过`,
errorCode: errorCode || 'E_DEPENDENCY',
});
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: {
stepResults: stepResults as any,
errorMessage: `步骤 ${stepOrder} 重试失败,后续步骤已短路跳过`,
},
});
stepCompleted = false;
break;
}
}
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: { generatedCode: currentCode, retryCount: attempt + 1 },
});
sendEvent('code_generated', { executionId: execution.id, code: currentCode });
if (!stepCompleted && stepResults[stepIndex].status === 'skipped') {
break;
}
if (!stepCompleted && stepResults[stepIndex].status !== 'skipped') {
break;
}
}
const allBlocks = stepResults
.filter(s => s.status === 'completed')
.flatMap(s => s.reportBlocks || []);
const totalDuration = stepResults.reduce((sum, s) => sum + (s.durationMs || 0), 0);
const hasCompletedStep = stepResults.some(s => s.status === 'completed');
const finalStatus = hasCompletedStep ? 'completed' : 'error';
await (prisma as any).ssaAgentExecution.update({
where: { id: execution.id },
data: { status: 'error', errorMessage: lastError, retryCount: codeRunnerService.maxRetries },
data: {
status: finalStatus,
errorMessage: finalStatus === 'error' ? (lastError || '执行失败') : null,
executionResult: { stepResults } as any,
stepResults: stepResults as any,
reportBlocks: allBlocks as any,
generatedCode: accumulatedCode,
currentStep: steps.length,
retryCount: stepResults.reduce((max, s) => Math.max(max, s.retryCount), 0),
durationMs: totalDuration,
seedAudit,
},
});
sendEvent('code_error', {
sendEvent('code_result', {
executionId: execution.id,
message: `经过 ${codeRunnerService.maxRetries + 1} 次尝试仍然失败: ${lastError}`,
willRetry: false,
reportBlocks: allBlocks,
code: accumulatedCode,
durationMs: totalDuration,
stepResults,
});
const blockCount = allBlocks.length;
const seconds = (totalDuration / 1000).toFixed(1);
const hintText = finalStatus === 'completed'
? `✅ 分析完成(${seconds}s共生成 ${blockCount} 个结果模块。👉 请在右侧工作区查看完整结果和图表。`
: `⚠️ 分析未完全完成,部分步骤失败或跳过。已输出 ${blockCount} 个可用结果模块,请在右侧工作区查看详情。`;
await this.sendFixedHint(writer, placeholderMessageId, hintText);
return { messageId: placeholderMessageId, intent: 'analyze', success: true };
}
private buildDeterministicHeader(stepSeed: number): string {
return [
'# --- 系统强制注入:保证累加执行确定性 ---',
`set.seed(${stepSeed})`,
"RNGkind('Mersenne-Twister', 'Inversion', 'Rejection')",
'options(warn = 1)',
'# --------------------------------------',
].join('\n');
}
private deriveStepSeed(baseSeed: number, stepOrder: number): number {
return ((baseSeed + stepOrder * 9973) % 2147483647) || 42;
}
private deriveStableSeed(input: string): number {
let hash = 2166136261;
for (let i = 0; i < input.length; i++) {
hash ^= input.charCodeAt(i);
hash += (hash << 1) + (hash << 4) + (hash << 7) + (hash << 8) + (hash << 24);
}
return Math.abs(hash >>> 0) % 2147483647 || 42;
}
private classifyExecutionError(errorCode?: string, errorMessage?: string): 'fatal' | 'retriable' {
const FATAL_CODES = new Set(['E_OOM', 'E_TIMEOUT', 'E005', 'E_SECURITY']);
if (errorCode && FATAL_CODES.has(errorCode)) return 'fatal';
const msg = (errorMessage || '').toLowerCase();
const fatalKeywords = ['cannot allocate vector', 'out of memory', 'killed', 'security violation', 'timed out'];
if (fatalKeywords.some(k => msg.includes(k))) return 'fatal';
return 'retriable';
}
private summarizeStepHighlights(reportBlocks: any[]): string {
if (!Array.isArray(reportBlocks) || reportBlocks.length === 0) {
return '无可用输出';
}
const first = reportBlocks.find(b => b?.title || b?.content) || reportBlocks[0];
const title = first?.title ? String(first.title) : '';
const content = first?.content ? String(first.content).replace(/\s+/g, ' ').slice(0, 120) : '';
return [title, content].filter(Boolean).join(' - ').slice(0, 180) || '步骤完成';
}
private async getSessionDatasetHash(sessionId: string): Promise<string> {
const session = await prisma.ssaSession.findUnique({
where: { id: sessionId },
select: { dataOssKey: true },
});
const source = session?.dataOssKey || 'no_data_key';
return this.deriveStableSeed(source).toString();
}
// ── Agent 取消 ──
private async agentCancel(

View File

@@ -29,6 +29,8 @@ export interface CodeExecutionResult {
consoleOutput?: string[];
durationMs?: number;
error?: string;
errorCode?: string;
errorType?: string;
}
export class CodeRunnerService {
@@ -49,6 +51,12 @@ export class CodeRunnerService {
async executeCode(
sessionId: string,
code: string,
metadata?: {
stepOrder?: number;
stepSeed?: number;
baseSeed?: number;
datasetHash?: string;
},
): Promise<CodeExecutionResult> {
const startTime = Date.now();
@@ -59,6 +67,7 @@ export class CodeRunnerService {
code: this.wrapCode(code, dataSource),
session_id: sessionId,
timeout: 120,
metadata,
};
logger.info('[CodeRunner] Executing R code', {
@@ -88,15 +97,21 @@ export class CodeRunnerService {
}
const errorMsg = response.data?.message || response.data?.user_hint || 'R 执行返回非成功状态';
const errorCode = response.data?.error_code;
const errorType = response.data?.error_type;
logger.warn('[CodeRunner] Execution failed (R returned error)', {
sessionId,
durationMs,
error: errorMsg,
errorCode,
errorType,
});
return {
success: false,
error: errorMsg,
errorCode,
errorType,
consoleOutput: response.data?.console_output,
durationMs,
};
@@ -108,6 +123,8 @@ export class CodeRunnerService {
return {
success: false,
error: 'R 统计服务超时或崩溃,请检查代码是否有死循环或内存溢出',
errorCode: 'E_TIMEOUT',
errorType: 'runtime',
durationMs,
};
}
@@ -126,6 +143,8 @@ export class CodeRunnerService {
return {
success: false,
error: errorMsg,
errorCode: error.response?.data?.error_code,
errorType: error.response?.data?.error_type,
durationMs,
};
}
@@ -141,6 +160,7 @@ export class CodeRunnerService {
*/
private wrapCode(userCode: string, dataSource: { type: string; oss_url?: string }): string {
const escapedUrl = (dataSource.oss_url || '').replace(/\\/g, '\\\\').replace(/"/g, '\\"');
const sanitizedUserCode = this.sanitizeUserCode(userCode);
return `
# === 自动注入:数据加载 ===
input <- list(
@@ -154,12 +174,60 @@ input <- list(
df <- load_input_data(input)
message(paste0("[Agent] Data loaded: ", nrow(df), " rows x ", ncol(df), " cols"))
# === pROC 兼容兜底(运行环境未安装 pROC 时启用)===
if (!requireNamespace("pROC", quietly = TRUE)) {
roc <- function(response, predictor, ...) {
resp <- as.numeric(response)
pred <- as.numeric(predictor)
keep <- is.finite(resp) & is.finite(pred)
resp <- resp[keep]
pred <- pred[keep]
if (length(resp) < 2 || length(unique(resp)) < 2) {
stop("ROC requires binary response with at least 2 classes")
}
positive <- max(resp, na.rm = TRUE)
y <- ifelse(resp == positive, 1, 0)
ord <- order(pred, decreasing = TRUE)
y <- y[ord]
tp <- cumsum(y == 1)
fp <- cumsum(y == 0)
P <- sum(y == 1)
N <- sum(y == 0)
tpr <- if (P > 0) tp / P else rep(0, length(tp))
fpr <- if (N > 0) fp / N else rep(0, length(fp))
x <- c(0, fpr, 1)
yv <- c(0, tpr, 1)
auc_val <- sum((x[-1] - x[-length(x)]) * (yv[-1] + yv[-length(yv)]) / 2, na.rm = TRUE)
structure(list(auc = auc_val, sensitivities = tpr, specificities = 1 - fpr), class = "simple_roc")
}
auc <- function(roc_obj, ...) {
if (!is.null(roc_obj$auc)) return(as.numeric(roc_obj$auc))
return(NA_real_)
}
}
# === 用户代码开始 ===
${userCode}
${sanitizedUserCode}
# === 用户代码结束 ===
`.trim();
}
private sanitizeUserCode(userCode: string): string {
let code = userCode.replace(/\r\n/g, '\n');
// 1) 未安装包 pROC 的兼容处理(保留执行而不是直接失败)
code = code.replace(/^\s*library\(\s*pROC\s*\)\s*$/gmi, '# removed: library(pROC) (not installed in runtime)');
code = code.replace(/\bpROC::roc\s*\(/g, 'roc(');
code = code.replace(/\bpROC::auc\s*\(/g, 'auc(');
// 2) 常见语法断裂修复:"... ) if (...)" -> 换行
code = code.replace(/\)\s+if\s*\(/g, ')\nif (');
// R 语言要求 "} else" 通常同一行,换行会导致 unexpected 'else'
code = code.replace(/\}\s+else\b/g, '} else');
return code;
}
/**
* 构建数据源(从 session 读取 OSS key → 预签名 URL
*/