/**
 * SSA DataProfile service (Phase 2A)
 *
 * Calls the Python Tool C to generate a data profile, which is then used by
 * the LLM to produce an analysis plan.
 *
 * Timing: when the user uploads data (timing A)
 * Output: DataProfile JSON, stored in SsaSession.dataProfile
 */

import axios, { AxiosInstance } from 'axios';
import { logger } from '../../../common/logging/index.js';
import { prisma } from '../../../config/database.js';
import { storage } from '../../../common/storage/index.js';

/** Full data profile: one entry per column plus dataset-level totals. */
export interface DataProfile {
  columns: ColumnProfile[];
  summary: DataSummary;
}

/** Profile of a single column; which optional fields are present depends on `type`. */
export interface ColumnProfile {
  name: string;
  type: 'numeric' | 'categorical' | 'datetime' | 'text';
  missingCount: number;
  missingRate: number;
  uniqueCount: number;
  totalCount: number;
  // Numeric columns
  mean?: number;
  std?: number;
  median?: number;
  min?: number;
  max?: number;
  q1?: number;
  q3?: number;
  iqr?: number;
  outlierCount?: number;
  outlierRate?: number;
  skewness?: number;
  kurtosis?: number;
  // Categorical columns
  topValues?: Array<{ value: string; count: number; percentage: number }>;
  totalLevels?: number;
  modeValue?: string;
  modeCount?: number;
  // Datetime columns
  minDate?: string;
  maxDate?: string;
  dateRange?: string;
  // Phase Q: marker for non-analysis columns (produced by the Python DataProfiler)
  isIdLike?: boolean;
}

/** Dataset-level counts and missing-value totals. */
export interface DataSummary {
  totalRows: number;
  totalColumns: number;
  numericColumns: number;
  categoricalColumns: number;
  datetimeColumns: number;
  textColumns: number;
  overallMissingRate: number;
  totalMissingCells: number;
}

/** Data quality assessment returned alongside the profile. */
export interface QualityScore {
  score: number;
  grade: 'A' | 'B' | 'C' | 'D';
  gradeDescription: string;
  issues: string[];
  recommendations: string[];
}

/** Outcome of a profile request; `error` is set when `success` is false. */
export interface DataProfileResult {
  success: boolean;
  profile?: DataProfile;
  quality?: QualityScore;
  executionTime?: number;
  error?: string;
}

/** Extracts a loggable message from whatever value reached a `catch` clause. */
function toErrorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    const { message } = error as { message: unknown };
    if (typeof message === 'string') {
      return message;
    }
  }
  return String(error);
}

/** Quotes a single CSV field when it contains a delimiter, quote, or line break. */
function escapeCsvField(value: unknown): string {
  if (value === null || value === undefined) {
    return '';
  }
  // Nested objects would otherwise be rendered as "[object Object]".
  const text = typeof value === 'object' ? JSON.stringify(value) : String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

export class DataProfileService {
  private readonly client: AxiosInstance;

  constructor() {
    const baseURL = process.env.EXTRACTION_SERVICE_URL || 'http://localhost:8000';
    this.client = axios.create({
      baseURL,
      timeout: 60000,
      headers: { 'Content-Type': 'application/json' }
    });
  }

  /**
   * Generates a data profile for an SSA session from in-memory rows.
   *
   * Never throws: failures are logged and reported through the returned result.
   *
   * @param sessionId SSA session ID
   * @param data Row objects (JSON format)
   * @returns The profile result; on success the profile is also saved to the session
   */
  async generateProfile(sessionId: string, data: Record<string, any>[]): Promise<DataProfileResult> {
    const startTime = Date.now();

    try {
      logger.info('[SSA:DataProfile] Generating profile', {
        sessionId,
        rowCount: data.length,
        columnCount: data.length > 0 ? Object.keys(data[0]).length : 0
      });

      const result = await this.requestAndStoreProfile(sessionId, '/api/ssa/data-profile', { data });

      const executionMs = Date.now() - startTime;
      logger.info('[SSA:DataProfile] Profile generated successfully', {
        sessionId,
        executionMs,
        summary: result.profile?.summary
      });

      return result;
    } catch (error: unknown) {
      return this.buildTimedFailure('[SSA:DataProfile] Profile generation failed', sessionId, error, startTime);
    }
  }

  /**
   * Generates a profile directly from CSV text (the Python service parses it with pandas).
   *
   * Never throws: failures are logged and reported through the returned result.
   *
   * @param sessionId SSA session ID
   * @param csvContent CSV file content
   * @returns The profile result; on success the profile is also saved to the session
   */
  async generateProfileFromCSV(sessionId: string, csvContent: string): Promise<DataProfileResult> {
    const startTime = Date.now();

    try {
      logger.info('[SSA:DataProfile] Generating profile from CSV', {
        sessionId,
        contentLength: csvContent.length
      });

      // Send the raw CSV to the Python service and let pandas parse it.
      const result = await this.requestAndStoreProfile(sessionId, '/api/ssa/data-profile-csv', {
        csv_content: csvContent
      });

      const executionMs = Date.now() - startTime;
      logger.info('[SSA:DataProfile] Profile generated from CSV successfully', {
        sessionId,
        executionMs,
        summary: result.profile?.summary
      });

      return result;
    } catch (error: unknown) {
      return this.buildTimedFailure('[SSA:DataProfile] CSV profile generation failed', sessionId, error, startTime);
    }
  }

  /**
   * Loads the session's data (inline payload or OSS object) and generates a profile.
   *
   * Returns the profile already stored on the session when there is one.
   *
   * @param sessionId SSA session ID
   * @returns The profile result
   */
  async generateProfileFromSession(sessionId: string): Promise<DataProfileResult> {
    try {
      const session = await prisma.ssaSession.findUnique({
        where: { id: sessionId }
      });

      if (!session) {
        throw new Error(`Session not found: ${sessionId}`);
      }

      // A profile already exists: return it without calling the Python service.
      if (session.dataProfile) {
        logger.info('[SSA:DataProfile] Using cached profile', { sessionId });
        return {
          success: true,
          profile: session.dataProfile as unknown as DataProfile
        };
      }

      if (session.dataPayload) {
        // Inline JSON rows: use the row-based endpoint directly.
        const data = session.dataPayload as unknown as Record<string, any>[];
        return await this.generateProfile(sessionId, data);
      }

      if (!session.dataOssKey) {
        throw new Error('No data available for session');
      }

      const buffer = await storage.download(session.dataOssKey);
      const ext = this.getFileExtensionFromOssKey(session.dataOssKey);

      // Dispatch on extension so Excel binaries are never decoded as UTF-8 text.
      if (ext === 'csv') {
        return await this.generateProfileFromCSV(sessionId, buffer.toString('utf-8'));
      }
      if (ext === 'xlsx' || ext === 'xls') {
        return await this.generateProfileFromExcel(sessionId, buffer);
      }

      // Legacy data with an unknown extension: sniff for JSON, otherwise treat as CSV.
      const content = buffer.toString('utf-8');
      const trimmedContent = content.trim();
      if (trimmedContent.startsWith('[') || trimmedContent.startsWith('{')) {
        // NOTE(review): a top-level JSON object (not an array) is forwarded as-is;
        // confirm the Python endpoint accepts that shape.
        const data = JSON.parse(content);
        return await this.generateProfile(sessionId, data);
      }
      return await this.generateProfileFromCSV(sessionId, content);
    } catch (error: unknown) {
      const message = toErrorMessage(error);
      logger.error('[SSA:DataProfile] Failed to generate profile from session', {
        sessionId,
        error: message
      });

      return {
        success: false,
        error: message
      };
    }
  }

  /** Returns the lowercase extension of an OSS key without the dot, or '' when there is none. */
  private getFileExtensionFromOssKey(ossKey: string): string {
    const match = ossKey.toLowerCase().match(/\.([a-z0-9]+)$/);
    return match?.[1] || '';
  }

  /**
   * Generates a profile from an Excel binary (xlsx/xls) using its first sheet.
   */
  private async generateProfileFromExcel(sessionId: string, buffer: Buffer): Promise<DataProfileResult> {
    try {
      const rows = await this.readExcelRows(buffer);
      return await this.generateProfile(sessionId, rows);
    } catch (error: unknown) {
      const message = toErrorMessage(error);
      logger.error('[SSA:DataProfile] Excel profile generation failed', {
        sessionId,
        error: message,
      });
      return {
        success: false,
        error: message || 'Failed to parse Excel file',
      };
    }
  }

  /** Parses the first sheet of an Excel workbook into row objects (empty cells become null). */
  private async readExcelRows(buffer: Buffer): Promise<Record<string, any>[]> {
    // Loaded lazily so the dependency is only required when an Excel file is processed.
    const xlsx = await import('xlsx');
    const workbook = xlsx.read(buffer, { type: 'buffer' });
    const firstSheetName = workbook.SheetNames[0];
    const firstSheet = workbook.Sheets[firstSheetName];
    return xlsx.utils.sheet_to_json(firstSheet, {
      defval: null,
      raw: false,
    }) as Record<string, any>[];
  }

  /**
   * Posts a profile request, validates the response, and saves the profile to the session.
   *
   * @throws Error when the service reports failure; transport errors from axios propagate
   */
  private async requestAndStoreProfile(
    sessionId: string,
    endpoint: string,
    payload: Record<string, unknown>
  ): Promise<DataProfileResult> {
    const response = await this.client.post(endpoint, {
      ...payload,
      max_unique_values: 20,
      include_quality_score: true
    });

    const body = response.data;
    if (!body?.success) {
      throw new Error(body?.error || 'Profile generation failed');
    }

    const result: DataProfileResult = {
      success: true,
      profile: body.profile,
      quality: body.quality,
      executionTime: body.execution_time
    };

    // Save to the database (best-effort; see saveProfileToSession).
    await this.saveProfileToSession(sessionId, result);
    return result;
  }

  /** Logs a failure and builds the failed result, with elapsed time converted to seconds. */
  private buildTimedFailure(
    logMessage: string,
    sessionId: string,
    error: unknown,
    startTime: number
  ): DataProfileResult {
    const executionMs = Date.now() - startTime;
    const message = toErrorMessage(error);

    logger.error(logMessage, {
      sessionId,
      error: message,
      executionMs
    });

    return {
      success: false,
      error: message,
      executionTime: executionMs / 1000
    };
  }

  /**
   * Saves the profile to the session.
   *
   * Deliberately best-effort: a failed write is logged but does not fail the
   * profile request that produced the result.
   */
  private async saveProfileToSession(sessionId: string, result: DataProfileResult): Promise<void> {
    try {
      await prisma.ssaSession.update({
        where: { id: sessionId },
        data: {
          dataProfile: result.profile as any
        }
      });

      logger.info('[SSA:DataProfile] Profile saved to session', { sessionId });
    } catch (error: unknown) {
      logger.error('[SSA:DataProfile] Failed to save profile', {
        sessionId,
        error: toErrorMessage(error)
      });
    }
  }

  /**
   * Returns the profile stored on the session, or null when the session does
   * not exist or has no profile yet.
   */
  async getCachedProfile(sessionId: string): Promise<DataProfile | null> {
    const session = await prisma.ssaSession.findUnique({
      where: { id: sessionId },
      select: { dataProfile: true }
    });

    // Normalize a missing session (undefined) to null to honor the return type.
    return (session?.dataProfile ?? null) as unknown as DataProfile | null;
  }

  /**
   * Builds a condensed Markdown summary of the profile for the LLM.
   * Used for prompt injection, to keep token consumption under control.
   *
   * @param profile The full data profile
   * @returns Markdown text: a dataset overview followed by one line per column
   */
  generateProfileSummaryForLLM(profile: DataProfile): string {
    const { summary, columns } = profile;

    const lines: string[] = [
      `## 数据概况`,
      `- 样本量: ${summary.totalRows} 行`,
      `- 变量数: ${summary.totalColumns} 列 (${summary.numericColumns} 数值, ${summary.categoricalColumns} 分类)`,
      `- 整体缺失率: ${summary.overallMissingRate}%`,
      '',
      `## 变量清单`
    ];

    for (const col of columns) {
      let desc = `- **${col.name}** [${col.type}]`;

      if (col.missingRate > 0) {
        desc += ` (缺失 ${col.missingRate}%)`;
      }

      if (col.type === 'numeric') {
        desc += `: 均值=${col.mean}, SD=${col.std}, 范围=[${col.min}, ${col.max}]`;
        if (col.outlierCount && col.outlierCount > 0) {
          desc += `, ${col.outlierCount}个异常值`;
        }
      } else if (col.type === 'categorical') {
        // Both fields are optional; fall back so the prompt never contains "undefined".
        const topValues = col.topValues ?? [];
        const totalLevels = col.totalLevels ?? topValues.length;
        const levels = topValues.slice(0, 5).map(v => v.value).join(', ');
        desc += `: ${totalLevels}个水平 (${levels}${totalLevels > 5 ? '...' : ''})`;
      }

      lines.push(desc);
    }

    return lines.join('\n');
  }

  /**
   * Phase I: fetches a detailed single-variable analysis from the Python
   * variable-detail endpoint.
   *
   * Never throws: failures are logged and returned as `{ success: false, error }`.
   *
   * @param sessionId SSA session ID
   * @param variableName Target variable name
   * @returns The service response body passed through unchanged, or a failure object
   */
  // The response shape is defined by the Python service and is not modelled here.
  async getVariableDetail(sessionId: string, variableName: string): Promise<any> {
    try {
      const csvContent = await this.loadCSVFromSession(sessionId);
      if (!csvContent) {
        return { success: false, error: 'No CSV data available for session' };
      }

      const response = await this.client.post('/api/ssa/variable-detail', {
        csv_content: csvContent,
        variable_name: variableName,
        max_bins: 30,
        max_qq_points: 200,
      });

      return response.data;
    } catch (error: unknown) {
      const message = toErrorMessage(error);
      logger.error('[SSA:DataProfile] Variable detail failed', {
        sessionId,
        variableName,
        error: message,
      });
      return { success: false, error: message };
    }
  }

  /**
   * Loads the session's data as a CSV string (reused by variable-detail).
   *
   * Excel objects are parsed and re-serialized as CSV rather than being decoded
   * as UTF-8 text; other OSS objects are returned as their UTF-8 text.
   *
   * @returns CSV text, or null when the session is missing or has no usable rows
   */
  private async loadCSVFromSession(sessionId: string): Promise<string | null> {
    const session = await prisma.ssaSession.findUnique({
      where: { id: sessionId }
    });
    if (!session) return null;

    if (session.dataOssKey) {
      const buffer = await storage.download(session.dataOssKey);
      const ext = this.getFileExtensionFromOssKey(session.dataOssKey);
      if (ext === 'xlsx' || ext === 'xls') {
        return this.rowsToCsv(await this.readExcelRows(buffer));
      }
      return buffer.toString('utf-8');
    }

    if (session.dataPayload) {
      const rows = session.dataPayload as unknown as Record<string, any>[];
      if (!Array.isArray(rows)) return null;
      return this.rowsToCsv(rows);
    }

    return null;
  }

  /**
   * Serializes row objects to CSV, taking the column set from the first row.
   *
   * @returns CSV text, or null when there are no rows
   */
  private rowsToCsv(rows: Record<string, any>[]): string | null {
    if (rows.length === 0) return null;

    const cols = Object.keys(rows[0]);
    const lines = [cols.map(escapeCsvField).join(',')];
    for (const row of rows) {
      lines.push(cols.map(c => escapeCsvField(row[c])).join(','));
    }
    return lines.join('\n');
  }
}

// Singleton export
export const dataProfileService = new DataProfileService();