/**
 * R service client.
 * Responsible for calling the R Docker service to run statistical analyses.
 *
 * Conventions followed:
 * - Uses the unified logging service @/common/logging
 * - Uses the unified storage service @/common/storage (OSS storage convention)
 */
import axios, { AxiosInstance } from 'axios';
import { logger } from '../../../common/logging/index.js';
import { storage } from '../../../common/storage/index.js';
import { prisma } from '../../../config/database.js';

/**
 * Thin HTTP client around the R statistics service.
 *
 * The base URL is read from the `R_SERVICE_URL` environment variable and
 * falls back to `http://localhost:8082`.
 */
export class RClientService {
  private client: AxiosInstance;

  constructor() {
    const baseURL = process.env.R_SERVICE_URL || 'http://localhost:8082';
    this.client = axios.create({
      baseURL,
      timeout: 120000, // 120-second timeout
      headers: { 'Content-Type': 'application/json' }
    });
  }

  /**
   * Runs one analysis skill on the R service and records the outcome.
   *
   * The request body is built from the session's uploaded data (as a signed
   * OSS URL), `plan.params`, the original filename, and `plan.guardrails`
   * (defaulting to `{ check_normality: true, auto_fix: true }`). It is POSTed
   * to `/api/v1/skills/{plan.tool_code}`. After a successful call an
   * `ssaExecutionLog` row is written; a failure to write that row is only
   * logged as a warning and does not fail the call.
   *
   * @param sessionId - Session identifier, used for logging and the execution log row.
   * @param plan - Execution plan; `tool_code`, `params` and optional `guardrails` are read.
   * @param session - Session record; `dataOssKey`, `title` and `id` are read.
   * @returns The R service response body with an added `executionMs` field.
   * @throws Error with a user-facing message when no data file was uploaded,
   *   when the R service answers 502/504, when the R service supplies a
   *   `user_hint`, or a generic `R service error: ...` otherwise. Errors
   *   raised while building the data source (before the HTTP call) propagate
   *   unchanged.
   */
  // The R service returns untyped JSON, so the result is intentionally `any`.
  async execute(sessionId: string, plan: any, session: any): Promise<any> {
    const startTime = Date.now();

    // Derive the original filename from the session title or the OSS key.
    const originalFilename = this.extractFilename(session);

    // Build the request body (data is resolved via the unified storage service).
    const requestBody = {
      data_source: await this.buildDataSource(session),
      params: plan.params,
      original_filename: originalFilename,
      guardrails: plan.guardrails || { check_normality: true, auto_fix: true }
    };

    try {
      // The signed OSS URL grants access to the user's data, so it must not
      // be written to the logs; log a redacted copy of the request instead.
      logger.info('[SSA:RClient] Calling R service', {
        sessionId,
        toolCode: plan.tool_code,
        endpoint: `/api/v1/skills/${plan.tool_code}`,
        requestBody: {
          ...requestBody,
          data_source: { ...requestBody.data_source, oss_url: '[REDACTED]' }
        }
      });

      const response = await this.client.post(
        `/api/v1/skills/${plan.tool_code}`,
        requestBody
      );

      const executionMs = Date.now() - startTime;

      logger.info('[SSA:RClient] R service response', {
        sessionId,
        status: response.data?.status,
        hasResults: !!response.data?.results,
        executionMs
      });

      // Record the execution log (a failure here must not block the main flow).
      try {
        await prisma.ssaExecutionLog.create({
          data: {
            sessionId,
            toolCode: plan.tool_code,
            inputParams: plan.params,
            outputStatus: response.data.status,
            outputResult: response.data.results,
            traceLog: response.data.trace_log || [],
            executionMs
          }
        });
      } catch (logError) {
        logger.warn('[SSA:RClient] Failed to save execution log', { error: logError });
      }

      // Attach the elapsed time to the returned result.
      return { ...response.data, executionMs };
    } catch (error: any) {
      logger.error('R service call failed', {
        sessionId,
        toolCode: plan.tool_code,
        error: error.message
      });

      // Special handling for 502/504 (R service crashed or timed out upstream).
      const statusCode = error.response?.status;
      if (statusCode === 502 || statusCode === 504) {
        throw new Error('统计服务繁忙或数据异常,请稍后重试');
      }

      // Prefer the user-friendly hint returned by the R service, if any.
      const userHint = error.response?.data?.user_hint;
      if (userHint) {
        throw new Error(userHint);
      }

      throw new Error(`R service error: ${error.message}`);
    }
  }

  /**
   * Builds the data source descriptor (OSS only).
   *
   * Design note: in the SSA scenario the user must upload a data file, which
   * is stored in OSS; the R service then downloads it through the URL
   * obtained from the storage service.
   *
   * @param session - Session record; `dataOssKey` and `id` are read.
   * @returns `{ type: 'oss', oss_url }` where `oss_url` comes from `storage.getUrl`.
   * @throws Error when the session has no `dataOssKey`.
   */
  private async buildDataSource(session: any): Promise<{ type: string; oss_url: string }> {
    const ossKey = session.dataOssKey;

    if (!ossKey) {
      logger.error('[SSA:RClient] No data uploaded', { sessionId: session.id });
      throw new Error('请先上传数据文件');
    }

    logger.info('[SSA:RClient] Building OSS data source', { sessionId: session.id, ossKey });

    const signedUrl = await storage.getUrl(ossKey);

    return { type: 'oss', oss_url: signedUrl };
  }

  /**
   * Extracts the original filename from the session.
   *
   * Resolution order:
   * 1. `session.title`, with `.csv` appended when it does not already end in
   *    `.csv`, `.xlsx` or `.xls` (case-insensitive);
   * 2. the last `/`-separated segment of `session.dataOssKey`;
   * 3. the literal `data.csv`.
   *
   * @param session - Session record; `title` and `dataOssKey` are read.
   * @returns The filename to report to the R service.
   */
  private extractFilename(session: any): string {
    // Prefer the title (per the original note, it is set to the filename on upload).
    if (session.title) {
      // If the title has no recognised extension, append .csv.
      if (!session.title.match(/\.(csv|xlsx|xls)$/i)) {
        return `${session.title}.csv`;
      }
      return session.title;
    }

    // Otherwise fall back to the last path segment of the OSS key.
    if (session.dataOssKey) {
      const parts = session.dataOssKey.split('/');
      return parts[parts.length - 1] || 'data.csv';
    }

    return 'data.csv';
  }

  /**
   * Checks whether the R service reports itself healthy.
   *
   * @returns `true` only when `GET /health` succeeds and the response body's
   *   `status` equals `'ok'`; `false` on any error or other status value.
   */
  async healthCheck(): Promise<boolean> {
    try {
      const res = await this.client.get('/health');
      return res.data.status === 'ok';
    } catch {
      return false;
    }
  }
}