feat(dc/tool-c): Day 2 - Session管理与数据处理完成

核心功能:
- 数据库: 创建dc_tool_c_sessions表 (12字段, 3索引)
- 服务层: SessionService (383行), DataProcessService (303行)
- 控制器: SessionController (300行, 6个API端点)
- 路由: 新增6个Session管理路由
- 测试: 7个API测试全部通过 (100%)

技术亮点:
- 零落盘架构: Excel内存解析, OSS存储
- Session管理: 10分钟过期, 心跳延长机制
- 云原生规范: storage/logger/prisma全平台复用
- 完整测试: 上传/预览/完整数据/删除/心跳

文件清单:
- backend/prisma/schema.prisma (新增DcToolCSession模型)
- backend/prisma/migrations/create_tool_c_session.sql
- backend/scripts/create-tool-c-table.mjs
- backend/src/modules/dc/tool-c/services/ (SessionService, DataProcessService)
- backend/src/modules/dc/tool-c/controllers/SessionController.ts
- backend/src/modules/dc/tool-c/routes/index.ts
- backend/test-tool-c-day2.mjs
- docs/03-业务模块/DC-数据清洗整理/00-工具C当前状态与开发指南.md
- docs/03-业务模块/DC-数据清洗整理/06-开发记录/2025-12-06_工具C_Day2开发完成总结.md

代码统计: ~1900行
测试结果: 7/7 通过 (100%)
云原生规范: 完全符合
This commit is contained in:
2025-12-06 22:12:47 +08:00
parent 8be741cd52
commit 2348234013
13 changed files with 3466 additions and 0 deletions

View File

@@ -0,0 +1,171 @@
# 工具C (Tool C) - 科研数据编辑器
## 📁 项目结构
```
tool-c/
├── services/
│ └── PythonExecutorService.ts # Python代码执行服务
├── controllers/
│ └── TestController.ts # 测试控制器Day 1
├── routes/
│ └── index.ts # 路由定义
└── README.md # 本文件
```
## ⚙️ 环境变量配置
`backend/.env` 文件中添加以下配置:
```bash
# Python微服务地址
EXTRACTION_SERVICE_URL=http://localhost:8000
```
**说明**
- 默认值:`http://localhost:8000`
- Python微服务需要先启动才能使用工具C
- 启动命令:`cd extraction_service && .\venv\Scripts\activate && uvicorn main:app --host 0.0.0.0 --port 8000`
## 🚀 API端点Day 1 测试)
### 1. 测试Python服务健康检查
```
GET /api/v1/dc/tool-c/test/health
```
**响应**
```json
{
"success": true,
"message": "Python服务正常",
"healthy": true
}
```
### 2. 测试代码验证
```
POST /api/v1/dc/tool-c/test/validate
```
**请求体**
```json
{
"code": "df['age_group'] = df['age'] > 60"
}
```
**响应**
```json
{
"success": true,
"data": {
"valid": true,
"errors": [],
"warnings": []
}
}
```
### 3. 测试代码执行
```
POST /api/v1/dc/tool-c/test/execute
```
**请求体**
```json
{
"data": [
{"age": 25},
{"age": 65}
],
"code": "df['old'] = df['age'] > 60"
}
```
**响应**
```json
{
"success": true,
"data": {
"success": true,
"result_data": [
{"age": 25, "old": false},
{"age": 65, "old": true}
],
"output": "",
"error": null,
"execution_time": 0.004,
"result_shape": [2, 2]
}
}
```
## ✅ Day 1 完成情况
- [x] 创建Python微服务dc_executor.py
- [x] 添加AST安全检查
- [x] 实现Pandas代码执行
- [x] 创建FastAPI端点/api/dc/validate, /api/dc/execute
- [x] 创建Node.js服务PythonExecutorService.ts
- [x] 创建测试控制器和路由
- [x] 验证功能正常工作
## 📝 使用示例
### 启动Python微服务
```bash
cd extraction_service
.\venv\Scripts\activate
python main.py
```
### 启动Node.js后端
```bash
cd backend
npm run dev
```
### 测试API
```bash
# 1. 健康检查
curl http://localhost:3000/api/v1/dc/tool-c/test/health
# 2. 代码验证
curl -X POST http://localhost:3000/api/v1/dc/tool-c/test/validate \
-H "Content-Type: application/json" \
-d '{"code":"df[\"x\"] = 1"}'
# 3. 代码执行
curl -X POST http://localhost:3000/api/v1/dc/tool-c/test/execute \
-H "Content-Type: application/json" \
-d '{"data":[{"age":25},{"age":65}],"code":"df[\"old\"] = df[\"age\"] > 60"}'
```
## 🔐 安全特性
- **AST静态检查**拦截危险模块导入os, sys, subprocess等
- **超时保护**代码执行超时30秒自动终止
- **沙箱环境**:限制可用的内置函数
- **错误处理**:完整的异常捕获和错误信息
## 📚 技术栈
- **Python后端**: FastAPI + Pandas + AST
- **Node.js后端**: Fastify + Axios + TypeScript
- **通信方式**: HTTP REST API
- **数据格式**: JSON
## 🎯 下一步Day 2
- [ ] Session管理数据库 + OSS
- [ ] 数据处理服务
- [ ] AI代码生成服务LLMFactory集成
- [ ] 前端基础框架搭建

View File

@@ -0,0 +1,299 @@
/**
* Session控制器
*
* API端点
* - POST /sessions/upload 上传Excel文件创建Session
* - GET /sessions/:id 获取Session信息
* - GET /sessions/:id/preview 获取预览数据前100行
* - GET /sessions/:id/full 获取完整数据
* - DELETE /sessions/:id 删除Session
* - POST /sessions/:id/heartbeat 更新心跳
*
* @module SessionController
*/
import { FastifyRequest, FastifyReply } from 'fastify';
import { MultipartFile } from '@fastify/multipart';
import { logger } from '../../../../common/logging/index.js';
import { sessionService } from '../services/SessionService.js';
import { dataProcessService } from '../services/DataProcessService.js';
// ==================== 请求参数类型定义 ====================
interface SessionIdParams {
id: string;
}
// ==================== 控制器 ====================
export class SessionController {
/**
* 上传Excel文件创建Session
*
* POST /api/v1/dc/tool-c/sessions/upload
*/
async upload(request: FastifyRequest, reply: FastifyReply) {
try {
logger.info('[SessionController] 收到文件上传请求');
// 1. 获取multipart数据
const data = await request.file();
if (!data) {
return reply.code(400).send({
success: false,
error: '未找到上传的文件',
});
}
const file = data as MultipartFile;
const fileName = file.filename;
logger.info(`[SessionController] 文件名: ${fileName}`);
// 2. 读取文件到Buffer
const fileBuffer = await file.toBuffer();
// 3. 验证文件
const validation = dataProcessService.validateFile(fileBuffer, fileName);
if (!validation.valid) {
return reply.code(400).send({
success: false,
error: validation.error,
});
}
// 4. 获取用户ID从请求中提取实际部署时从JWT获取
// TODO: 从JWT token中获取userId
const userId = (request as any).userId || 'test-user-001';
// 5. 创建Session
const session = await sessionService.createSession(
userId,
fileName,
fileBuffer
);
logger.info(`[SessionController] Session创建成功: ${session.id}`);
// 6. 返回Session信息
return reply.code(201).send({
success: true,
message: 'Session创建成功',
data: {
sessionId: session.id,
fileName: session.fileName,
fileSize: dataProcessService.formatFileSize(session.fileSize),
totalRows: session.totalRows,
totalCols: session.totalCols,
columns: session.columns,
expiresAt: session.expiresAt,
createdAt: session.createdAt,
},
});
} catch (error: any) {
logger.error(`[SessionController] 文件上传失败: ${error.message}`);
return reply.code(500).send({
success: false,
error: error.message || '文件上传失败,请重试',
});
}
}
/**
* 获取Session信息只含元数据
*
* GET /api/v1/dc/tool-c/sessions/:id
*/
async getSession(
request: FastifyRequest<{ Params: SessionIdParams }>,
reply: FastifyReply
) {
try {
const { id } = request.params;
logger.info(`[SessionController] 获取Session: ${id}`);
const session = await sessionService.getSession(id);
return reply.code(200).send({
success: true,
data: {
sessionId: session.id,
fileName: session.fileName,
fileSize: dataProcessService.formatFileSize(session.fileSize),
totalRows: session.totalRows,
totalCols: session.totalCols,
columns: session.columns,
encoding: session.encoding,
expiresAt: session.expiresAt,
createdAt: session.createdAt,
updatedAt: session.updatedAt,
},
});
} catch (error: any) {
logger.error(`[SessionController] 获取Session失败: ${error.message}`);
const statusCode = error.message.includes('不存在') || error.message.includes('过期')
? 404
: 500;
return reply.code(statusCode).send({
success: false,
error: error.message || '获取Session失败',
});
}
}
/**
* 获取预览数据前100行
*
* GET /api/v1/dc/tool-c/sessions/:id/preview
*/
async getPreviewData(
request: FastifyRequest<{ Params: SessionIdParams }>,
reply: FastifyReply
) {
try {
const { id } = request.params;
logger.info(`[SessionController] 获取预览数据: ${id}`);
const result = await sessionService.getPreviewData(id);
return reply.code(200).send({
success: true,
data: {
sessionId: result.id,
fileName: result.fileName,
totalRows: result.totalRows,
totalCols: result.totalCols,
columns: result.columns,
previewRows: result.previewData.length,
previewData: result.previewData,
},
});
} catch (error: any) {
logger.error(`[SessionController] 获取预览数据失败: ${error.message}`);
const statusCode = error.message.includes('不存在') || error.message.includes('过期')
? 404
: 500;
return reply.code(statusCode).send({
success: false,
error: error.message || '获取预览数据失败',
});
}
}
/**
* 获取完整数据
*
* GET /api/v1/dc/tool-c/sessions/:id/full
*/
async getFullData(
request: FastifyRequest<{ Params: SessionIdParams }>,
reply: FastifyReply
) {
try {
const { id } = request.params;
logger.info(`[SessionController] 获取完整数据: ${id}`);
const data = await sessionService.getFullData(id);
return reply.code(200).send({
success: true,
data: {
sessionId: id,
totalRows: data.length,
data,
},
});
} catch (error: any) {
logger.error(`[SessionController] 获取完整数据失败: ${error.message}`);
const statusCode = error.message.includes('不存在') || error.message.includes('过期')
? 404
: 500;
return reply.code(statusCode).send({
success: false,
error: error.message || '获取完整数据失败',
});
}
}
/**
* 删除Session
*
* DELETE /api/v1/dc/tool-c/sessions/:id
*/
async deleteSession(
request: FastifyRequest<{ Params: SessionIdParams }>,
reply: FastifyReply
) {
try {
const { id } = request.params;
logger.info(`[SessionController] 删除Session: ${id}`);
await sessionService.deleteSession(id);
return reply.code(200).send({
success: true,
message: 'Session删除成功',
});
} catch (error: any) {
logger.error(`[SessionController] 删除Session失败: ${error.message}`);
return reply.code(500).send({
success: false,
error: error.message || '删除Session失败',
});
}
}
/**
* 更新心跳(延长过期时间)
*
* POST /api/v1/dc/tool-c/sessions/:id/heartbeat
*/
async updateHeartbeat(
request: FastifyRequest<{ Params: SessionIdParams }>,
reply: FastifyReply
) {
try {
const { id } = request.params;
logger.info(`[SessionController] 更新心跳: ${id}`);
const newExpiresAt = await sessionService.updateHeartbeat(id);
return reply.code(200).send({
success: true,
message: '心跳更新成功',
data: {
sessionId: id,
expiresAt: newExpiresAt,
},
});
} catch (error: any) {
logger.error(`[SessionController] 更新心跳失败: ${error.message}`);
const statusCode = error.message.includes('不存在')
? 404
: 500;
return reply.code(statusCode).send({
success: false,
error: error.message || '更新心跳失败',
});
}
}
}
// ==================== 导出单例实例 ====================
export const sessionController = new SessionController();

View File

@@ -0,0 +1,130 @@
/**
* 工具C测试控制器
*
* 用于Day 1验证Python服务集成
*
* @module TestController
*/
import { FastifyRequest, FastifyReply } from 'fastify';
import { logger } from '../../../../common/logging/index.js';
import { pythonExecutorService } from '../services/PythonExecutorService.js';
// ==================== 请求体类型定义 ====================
interface ValidateRequest {
code: string;
}
interface ExecuteRequest {
data: Record<string, any>[];
code: string;
}
// ==================== 控制器 ====================
export class TestController {
/**
* 测试Python服务健康检查
*
* GET /api/dc/tool-c/test/health
*/
async testHealth(request: FastifyRequest, reply: FastifyReply) {
try {
logger.info('[Test] 测试Python服务健康检查');
const isHealthy = await pythonExecutorService.healthCheck();
return reply.code(200).send({
success: true,
message: isHealthy ? 'Python服务正常' : 'Python服务异常',
healthy: isHealthy,
});
} catch (error: any) {
logger.error(`[Test] 健康检查失败: ${error.message}`);
return reply.code(500).send({
success: false,
error: error.message,
});
}
}
/**
* 测试代码验证
*
* POST /api/dc/tool-c/test/validate
*/
async testValidate(
request: FastifyRequest<{ Body: ValidateRequest }>,
reply: FastifyReply
) {
try {
const { code } = request.body;
if (!code) {
return reply.code(400).send({
success: false,
error: '缺少必需参数: code',
});
}
logger.info(`[Test] 测试代码验证,长度: ${code.length} 字符`);
const result = await pythonExecutorService.validateCode(code);
return reply.code(200).send({
success: true,
data: result,
});
} catch (error: any) {
logger.error(`[Test] 代码验证失败: ${error.message}`);
return reply.code(500).send({
success: false,
error: error.message,
});
}
}
/**
* 测试代码执行
*
* POST /api/dc/tool-c/test/execute
*/
async testExecute(
request: FastifyRequest<{ Body: ExecuteRequest }>,
reply: FastifyReply
) {
try {
const { data, code } = request.body;
if (!data || !code) {
return reply.code(400).send({
success: false,
error: '缺少必需参数: data 或 code',
});
}
logger.info(
`[Test] 测试代码执行: 数据行数=${data.length}, 代码长度=${code.length} 字符`
);
const result = await pythonExecutorService.executeCode(data, code);
return reply.code(200).send({
success: result.success,
data: result,
});
} catch (error: any) {
logger.error(`[Test] 代码执行失败: ${error.message}`);
return reply.code(500).send({
success: false,
error: error.message,
});
}
}
}
// ==================== 导出单例实例 ====================
export const testController = new TestController();

View File

@@ -0,0 +1,61 @@
/**
* 工具C路由定义
*
* @module routes
*/
import { FastifyInstance } from 'fastify';
import { testController } from '../controllers/TestController.js';
import { sessionController } from '../controllers/SessionController.js';
export async function toolCRoutes(fastify: FastifyInstance) {
// ==================== 测试路由Day 1 ====================
// 测试Python服务健康检查
fastify.get('/test/health', {
handler: testController.testHealth.bind(testController),
});
// 测试代码验证
fastify.post('/test/validate', {
handler: testController.testValidate.bind(testController),
});
// 测试代码执行
fastify.post('/test/execute', {
handler: testController.testExecute.bind(testController),
});
// ==================== Session管理路由Day 2 ====================
// 上传Excel文件创建Session
fastify.post('/sessions/upload', {
handler: sessionController.upload.bind(sessionController),
});
// 获取Session信息元数据
fastify.get('/sessions/:id', {
handler: sessionController.getSession.bind(sessionController),
});
// 获取预览数据前100行
fastify.get('/sessions/:id/preview', {
handler: sessionController.getPreviewData.bind(sessionController),
});
// 获取完整数据
fastify.get('/sessions/:id/full', {
handler: sessionController.getFullData.bind(sessionController),
});
// 删除Session
fastify.delete('/sessions/:id', {
handler: sessionController.deleteSession.bind(sessionController),
});
// 更新心跳延长10分钟
fastify.post('/sessions/:id/heartbeat', {
handler: sessionController.updateHeartbeat.bind(sessionController),
});
}

View File

@@ -0,0 +1,302 @@
/**
* 数据处理服务
*
* 功能:
* - Excel文件解析
* - 文件验证
* - 列类型推断(可选)
*
* @module DataProcessService
*/
import * as xlsx from 'xlsx';
import { logger } from '../../../../common/logging/index.js';
// ==================== 类型定义 ====================
interface ParsedExcelData {
data: any[];
columns: string[];
totalRows: number;
totalCols: number;
}
interface ValidationResult {
valid: boolean;
error?: string;
}
interface ColumnType {
name: string;
type: 'number' | 'string' | 'date' | 'boolean' | 'mixed';
sampleValues: any[];
}
// ==================== 配置常量 ====================
const MAX_FILE_SIZE = 10 * 1024 * 1024; // 10MB
const SUPPORTED_EXTENSIONS = ['.xlsx', '.xls', '.csv'];
// ==================== 数据处理服务 ====================
export class DataProcessService {
/**
* 解析Excel文件
*
* @param buffer - 文件Buffer
* @returns 解析后的数据
*/
parseExcel(buffer: Buffer): ParsedExcelData {
try {
logger.info('[DataProcessService] 开始解析Excel文件');
// 1. 读取Excel文件内存操作
const workbook = xlsx.read(buffer, { type: 'buffer' });
// 2. 获取第一个工作表
const sheetName = workbook.SheetNames[0];
if (!sheetName) {
throw new Error('Excel文件中没有工作表');
}
const sheet = workbook.Sheets[sheetName];
// 3. 转换为JSON格式
const data = xlsx.utils.sheet_to_json(sheet);
if (data.length === 0) {
throw new Error('Excel文件没有数据');
}
// 4. 提取元数据
const totalRows = data.length;
const columns = Object.keys(data[0] || {});
const totalCols = columns.length;
if (totalCols === 0) {
throw new Error('Excel文件没有列');
}
logger.info(
`[DataProcessService] Excel解析成功: ${totalRows}行 x ${totalCols}`,
{ columns }
);
return {
data,
columns,
totalRows,
totalCols,
};
} catch (error: any) {
logger.error(`[DataProcessService] Excel解析失败: ${error.message}`);
throw new Error(`Excel文件解析失败: ${error.message}`);
}
}
/**
* 验证文件
*
* @param buffer - 文件Buffer
* @param fileName - 文件名
* @returns 验证结果
*/
validateFile(buffer: Buffer, fileName: string): ValidationResult {
try {
logger.info('[DataProcessService] 验证文件', { fileName, size: buffer.length });
// 1. 检查文件大小
if (buffer.length === 0) {
return {
valid: false,
error: '文件为空',
};
}
if (buffer.length > MAX_FILE_SIZE) {
const sizeMB = (buffer.length / 1024 / 1024).toFixed(2);
return {
valid: false,
error: `文件大小超过限制最大10MB当前: ${sizeMB}MB`,
};
}
// 2. 检查文件扩展名
const ext = fileName.toLowerCase().substring(fileName.lastIndexOf('.'));
if (!SUPPORTED_EXTENSIONS.includes(ext)) {
return {
valid: false,
error: `不支持的文件格式: ${ext},仅支持 .xlsx, .xls, .csv`,
};
}
// 3. 尝试解析文件
try {
const parsed = this.parseExcel(buffer);
// 检查行数
if (parsed.totalRows > 50000) {
logger.warn('[DataProcessService] 文件行数较多,可能影响性能', {
rows: parsed.totalRows,
});
}
// 检查列数
if (parsed.totalCols > 100) {
logger.warn('[DataProcessService] 文件列数较多', {
cols: parsed.totalCols,
});
}
} catch (error: any) {
return {
valid: false,
error: `文件内容无法解析: ${error.message}`,
};
}
logger.info('[DataProcessService] 文件验证通过', { fileName });
return {
valid: true,
};
} catch (error: any) {
logger.error(`[DataProcessService] 文件验证失败: ${error.message}`);
return {
valid: false,
error: `文件验证失败: ${error.message}`,
};
}
}
/**
* 推断列类型可选功能Day 3优化
*
* @param data - 数据数组
* @returns 列类型信息
*/
inferColumnTypes(data: any[]): ColumnType[] {
try {
logger.info('[DataProcessService] 推断列类型');
if (data.length === 0) {
return [];
}
const columns = Object.keys(data[0] || {});
const columnTypes: ColumnType[] = [];
for (const columnName of columns) {
// 取前10行样本值
const sampleValues = data.slice(0, 10).map((row) => row[columnName]);
// 推断类型
const types = new Set(sampleValues.map((val) => this.getValueType(val)));
let inferredType: ColumnType['type'] = 'string';
if (types.size === 1) {
inferredType = Array.from(types)[0];
} else if (types.size > 1) {
inferredType = 'mixed';
}
columnTypes.push({
name: columnName,
type: inferredType,
sampleValues,
});
}
logger.info(`[DataProcessService] 列类型推断完成: ${columnTypes.length}`);
return columnTypes;
} catch (error: any) {
logger.error(`[DataProcessService] 列类型推断失败: ${error.message}`);
return [];
}
}
/**
* 获取值类型
*
* @param value - 值
* @returns 类型
*/
private getValueType(value: any): ColumnType['type'] {
if (value === null || value === undefined || value === '') {
return 'string';
}
if (typeof value === 'number') {
return 'number';
}
if (typeof value === 'boolean') {
return 'boolean';
}
if (value instanceof Date) {
return 'date';
}
// 尝试解析为日期
const datePattern = /^\d{4}-\d{2}-\d{2}$/;
if (typeof value === 'string' && datePattern.test(value)) {
return 'date';
}
// 尝试解析为数字
if (typeof value === 'string' && !isNaN(Number(value))) {
return 'number';
}
return 'string';
}
/**
* 格式化文件大小
*
* @param bytes - 字节数
* @returns 格式化后的大小
*/
formatFileSize(bytes: number): string {
if (bytes < 1024) {
return `${bytes} B`;
} else if (bytes < 1024 * 1024) {
return `${(bytes / 1024).toFixed(2)} KB`;
} else {
return `${(bytes / 1024 / 1024).toFixed(2)} MB`;
}
}
/**
* 生成文件摘要信息
*
* @param buffer - 文件Buffer
* @param fileName - 文件名
* @returns 文件摘要
*/
generateFileSummary(buffer: Buffer, fileName: string) {
try {
const parsed = this.parseExcel(buffer);
const columnTypes = this.inferColumnTypes(parsed.data);
return {
fileName,
fileSize: this.formatFileSize(buffer.length),
totalRows: parsed.totalRows,
totalCols: parsed.totalCols,
columns: parsed.columns,
columnTypes,
sampleData: parsed.data.slice(0, 5), // 前5行样本
};
} catch (error: any) {
logger.error(`[DataProcessService] 生成文件摘要失败: ${error.message}`);
throw error;
}
}
}
// ==================== 导出单例实例 ====================
export const dataProcessService = new DataProcessService();

View File

@@ -0,0 +1,176 @@
/**
* Python代码执行服务
*
* 功能:
* - 调用Python微服务执行Pandas代码
* - AST安全验证
* - 超时控制
* - 错误处理
*
* @module PythonExecutorService
*/
import axios, { AxiosInstance } from 'axios';
import { logger } from '../../../../common/logging/index.js';
// ==================== 类型定义 ====================
interface ValidateCodeRequest {
code: string;
}
interface ValidateCodeResponse {
valid: boolean;
errors: string[];
warnings: string[];
}
interface ExecuteCodeRequest {
data: Record<string, any>[];
code: string;
}
interface ExecuteCodeResponse {
success: boolean;
result_data: Record<string, any>[] | null;
output: string;
error: string | null;
execution_time: number;
result_shape?: [number, number];
}
// ==================== 配置常量 ====================
const EXTRACTION_SERVICE_URL = process.env.EXTRACTION_SERVICE_URL || 'http://localhost:8000';
const DEFAULT_TIMEOUT = 30000; // 30秒超时
// ==================== Python执行器服务 ====================
export class PythonExecutorService {
private client: AxiosInstance;
constructor() {
// 创建axios实例
this.client = axios.create({
baseURL: EXTRACTION_SERVICE_URL,
timeout: DEFAULT_TIMEOUT,
headers: {
'Content-Type': 'application/json',
},
});
logger.info(`PythonExecutorService initialized: ${EXTRACTION_SERVICE_URL}`);
}
/**
* 验证Pandas代码安全性AST检查
*
* @param code - Pandas代码
* @returns 验证结果
*/
async validateCode(code: string): Promise<ValidateCodeResponse> {
try {
logger.info(`验证代码安全性,长度: ${code.length} 字符`);
const response = await this.client.post<ValidateCodeResponse>(
'/api/dc/validate',
{ code } as ValidateCodeRequest
);
logger.info(
`代码验证完成: valid=${response.data.valid}, ` +
`errors=${response.data.errors.length}, warnings=${response.data.warnings.length}`
);
return response.data;
} catch (error: any) {
logger.error(`代码验证失败: ${error.message}`);
if (axios.isAxiosError(error)) {
if (error.response) {
throw new Error(`验证失败 (${error.response.status}): ${error.response.data?.detail || error.message}`);
} else if (error.code === 'ECONNREFUSED') {
throw new Error('无法连接到Python微服务请确保服务已启动');
} else if (error.code === 'ECONNABORTED') {
throw new Error('代码验证超时');
}
}
throw new Error(`代码验证失败: ${error.message}`);
}
}
/**
* 执行Pandas代码
*
* @param data - JSON格式的数据数组对象
* @param code - Pandas代码操作df变量
* @returns 执行结果
*/
async executeCode(
data: Record<string, any>[],
code: string
): Promise<ExecuteCodeResponse> {
try {
logger.info(
`执行Pandas代码: 数据行数=${data.length}, 代码长度=${code.length} 字符`
);
const response = await this.client.post<ExecuteCodeResponse>(
'/api/dc/execute',
{ data, code } as ExecuteCodeRequest,
{ timeout: DEFAULT_TIMEOUT } // 执行可能较慢,使用完整超时
);
if (response.data.success) {
logger.info(
`代码执行成功: ` +
`结果shape=${JSON.stringify(response.data.result_shape)}, ` +
`耗时=${response.data.execution_time.toFixed(3)}`
);
} else {
logger.warn(`代码执行失败: ${response.data.error}`);
}
return response.data;
} catch (error: any) {
logger.error(`代码执行失败: ${error.message}`);
if (axios.isAxiosError(error)) {
if (error.response) {
throw new Error(`执行失败 (${error.response.status}): ${error.response.data?.detail || error.message}`);
} else if (error.code === 'ECONNREFUSED') {
throw new Error('无法连接到Python微服务请确保服务已启动');
} else if (error.code === 'ECONNABORTED') {
throw new Error('代码执行超时(>30秒');
}
}
throw new Error(`代码执行失败: ${error.message}`);
}
}
/**
* 健康检查测试Python服务连接
*
* @returns 服务是否正常
*/
async healthCheck(): Promise<boolean> {
try {
const response = await this.client.get('/api/health', { timeout: 5000 });
const isHealthy = response.status === 200 && response.data.status === 'healthy';
logger.info(`Python服务健康检查: ${isHealthy ? '正常' : '异常'}`);
return isHealthy;
} catch (error: any) {
logger.error(`Python服务健康检查失败: ${error.message}`);
return false;
}
}
}
// ==================== 导出单例实例 ====================
export const pythonExecutorService = new PythonExecutorService();

View File

@@ -0,0 +1,382 @@
/**
* Session管理服务
*
* 功能:
* - 创建Session上传Excel到OSS
* - 获取Session信息
* - 获取预览/完整数据从OSS
* - 删除Session
* - 更新心跳
*
* @module SessionService
*/
import { storage } from '../../../../common/storage/index.js';
import { logger } from '../../../../common/logging/index.js';
import { prisma } from '../../../../config/database.js';
import * as xlsx from 'xlsx';
// ==================== 类型定义 ====================
interface SessionData {
id: string;
userId: string;
fileName: string;
fileKey: string;
totalRows: number;
totalCols: number;
columns: string[];
encoding: string | null;
fileSize: number;
createdAt: Date;
updatedAt: Date;
expiresAt: Date;
}
interface PreviewDataResponse extends SessionData {
previewData: any[];
}
// ==================== 配置常量 ====================
const MAX_FILE_SIZE = 10 * 1024 * 1024; // 10MB
const SESSION_EXPIRE_MINUTES = 10; // Session过期时间10分钟
const PREVIEW_ROWS = 100; // 预览行数
// ==================== Session管理服务 ====================
export class SessionService {
/**
* 创建Session
*
* @param userId - 用户ID
* @param fileName - 原始文件名
* @param fileBuffer - 文件Buffer
* @returns Session信息
*/
async createSession(
userId: string,
fileName: string,
fileBuffer: Buffer
): Promise<SessionData> {
try {
logger.info(`[SessionService] 创建Session: userId=${userId}, fileName=${fileName}`);
// 1. 验证文件大小
if (fileBuffer.length > MAX_FILE_SIZE) {
throw new Error(`文件大小超过限制最大10MB当前: ${(fileBuffer.length / 1024 / 1024).toFixed(2)}MB`);
}
// 2. 内存解析Excel不落盘符合云原生规范
logger.info('[SessionService] 解析Excel文件...');
let workbook: xlsx.WorkBook;
try {
workbook = xlsx.read(fileBuffer, { type: 'buffer' });
} catch (error: any) {
throw new Error(`Excel文件解析失败: ${error.message}`);
}
const sheetName = workbook.SheetNames[0];
if (!sheetName) {
throw new Error('Excel文件中没有工作表');
}
const sheet = workbook.Sheets[sheetName];
const data = xlsx.utils.sheet_to_json(sheet);
if (data.length === 0) {
throw new Error('Excel文件没有数据');
}
// 3. 提取元数据
const totalRows = data.length;
const totalCols = Object.keys(data[0] || {}).length;
const columns = Object.keys(data[0] || {});
logger.info(`[SessionService] 解析完成: ${totalRows}行 x ${totalCols}`);
// 4. 上传到OSS使用平台storage服务
const timestamp = Date.now();
const fileKey = `dc/tool-c/sessions/${userId}/${timestamp}-${fileName}`;
logger.info(`[SessionService] 上传到OSS: ${fileKey}`);
await storage.upload(fileKey, fileBuffer);
logger.info('[SessionService] OSS上传成功');
// 5. 保存Session到数据库只存元数据符合云原生规范
const expiresAt = new Date(Date.now() + SESSION_EXPIRE_MINUTES * 60 * 1000);
const session = await prisma.dcToolCSession.create({
data: {
userId,
fileName,
fileKey,
totalRows,
totalCols,
columns: columns, // Prisma会自动转换为JSONB
encoding: 'utf-8', // 默认utf-8后续可扩展检测
fileSize: fileBuffer.length,
expiresAt,
},
});
logger.info(`[SessionService] Session创建成功: ${session.id}`);
return this.formatSession(session);
} catch (error: any) {
logger.error(`[SessionService] 创建Session失败: ${error.message}`, { error });
throw error;
}
}
/**
* 获取Session信息只含元数据
*
* @param sessionId - Session ID
* @returns Session信息
*/
async getSession(sessionId: string): Promise<SessionData> {
try {
logger.info(`[SessionService] 获取Session: ${sessionId}`);
const session = await prisma.dcToolCSession.findUnique({
where: { id: sessionId },
});
if (!session) {
throw new Error('Session不存在');
}
// 检查是否过期
if (new Date() > session.expiresAt) {
logger.warn(`[SessionService] Session已过期: ${sessionId}`);
throw new Error('Session已过期请重新上传文件');
}
logger.info(`[SessionService] Session获取成功: ${sessionId}`);
return this.formatSession(session);
} catch (error: any) {
logger.error(`[SessionService] 获取Session失败: ${error.message}`, { sessionId });
throw error;
}
}
/**
* 获取预览数据前100行
*
* @param sessionId - Session ID
* @returns Session信息 + 预览数据
*/
async getPreviewData(sessionId: string): Promise<PreviewDataResponse> {
try {
logger.info(`[SessionService] 获取预览数据: ${sessionId}`);
// 1. 获取Session信息
const session = await this.getSession(sessionId);
// 2. 从OSS下载文件到内存
logger.info(`[SessionService] 从OSS下载文件: ${session.fileKey}`);
const buffer = await storage.download(session.fileKey);
// 3. 内存解析Excel不落盘
const workbook = xlsx.read(buffer, { type: 'buffer' });
const sheetName = workbook.SheetNames[0];
const sheet = workbook.Sheets[sheetName];
const data = xlsx.utils.sheet_to_json(sheet);
// 4. 返回前100行
const previewData = data.slice(0, PREVIEW_ROWS);
logger.info(`[SessionService] 预览数据获取成功: ${previewData.length}`);
return {
...session,
previewData,
};
} catch (error: any) {
logger.error(`[SessionService] 获取预览数据失败: ${error.message}`, { sessionId });
throw error;
}
}
/**
* 获取完整数据
*
* @param sessionId - Session ID
* @returns 完整数据数组
*/
async getFullData(sessionId: string): Promise<any[]> {
try {
logger.info(`[SessionService] 获取完整数据: ${sessionId}`);
// 1. 获取Session信息
const session = await this.getSession(sessionId);
// 2. 从OSS下载文件到内存
logger.info(`[SessionService] 从OSS下载文件: ${session.fileKey}`);
const buffer = await storage.download(session.fileKey);
// 3. 内存解析Excel
const workbook = xlsx.read(buffer, { type: 'buffer' });
const sheetName = workbook.SheetNames[0];
const sheet = workbook.Sheets[sheetName];
const data = xlsx.utils.sheet_to_json(sheet);
logger.info(`[SessionService] 完整数据获取成功: ${data.length}`);
return data;
} catch (error: any) {
logger.error(`[SessionService] 获取完整数据失败: ${error.message}`, { sessionId });
throw error;
}
}
/**
* 删除Session
*
* @param sessionId - Session ID
*/
async deleteSession(sessionId: string): Promise<void> {
try {
logger.info(`[SessionService] 删除Session: ${sessionId}`);
// 1. 获取Session信息
const session = await prisma.dcToolCSession.findUnique({
where: { id: sessionId },
});
if (!session) {
logger.warn(`[SessionService] Session不存在: ${sessionId}`);
return;
}
// 2. 删除OSS文件
try {
logger.info(`[SessionService] 删除OSS文件: ${session.fileKey}`);
await storage.delete(session.fileKey);
logger.info('[SessionService] OSS文件删除成功');
} catch (error: any) {
logger.warn(`[SessionService] OSS文件删除失败: ${error.message}`);
// 继续执行,删除数据库记录
}
// 3. 删除数据库记录
await prisma.dcToolCSession.delete({
where: { id: sessionId },
});
logger.info(`[SessionService] Session删除成功: ${sessionId}`);
} catch (error: any) {
logger.error(`[SessionService] 删除Session失败: ${error.message}`, { sessionId });
throw error;
}
}
/**
* 更新心跳(延长过期时间)
*
* @param sessionId - Session ID
* @returns 新的过期时间
*/
async updateHeartbeat(sessionId: string): Promise<Date> {
try {
logger.info(`[SessionService] 更新心跳: ${sessionId}`);
// 检查Session是否存在
const session = await prisma.dcToolCSession.findUnique({
where: { id: sessionId },
});
if (!session) {
throw new Error('Session不存在');
}
// 更新过期时间
const newExpiresAt = new Date(Date.now() + SESSION_EXPIRE_MINUTES * 60 * 1000);
await prisma.dcToolCSession.update({
where: { id: sessionId },
data: {
expiresAt: newExpiresAt,
updatedAt: new Date(),
},
});
logger.info(`[SessionService] 心跳更新成功: ${sessionId}, 新过期时间: ${newExpiresAt.toISOString()}`);
return newExpiresAt;
} catch (error: any) {
logger.error(`[SessionService] 更新心跳失败: ${error.message}`, { sessionId });
throw error;
}
}
/**
* 清理过期Session定时任务使用
*
* @returns 清理的Session数量
*/
async cleanExpiredSessions(): Promise<number> {
try {
logger.info('[SessionService] 开始清理过期Session...');
// 查询所有过期的Session
const expiredSessions = await prisma.dcToolCSession.findMany({
where: {
expiresAt: {
lt: new Date(),
},
},
});
logger.info(`[SessionService] 发现${expiredSessions.length}个过期Session`);
// 删除过期Session
let cleanedCount = 0;
for (const session of expiredSessions) {
try {
await this.deleteSession(session.id);
cleanedCount++;
} catch (error: any) {
logger.warn(`[SessionService] 清理Session失败: ${session.id}`, { error });
}
}
logger.info(`[SessionService] 清理完成: ${cleanedCount}/${expiredSessions.length}`);
return cleanedCount;
} catch (error: any) {
logger.error(`[SessionService] 清理过期Session失败: ${error.message}`, { error });
return 0;
}
}
/**
* 格式化Session数据
*
* @param session - Prisma Session对象
* @returns 格式化的Session数据
*/
private formatSession(session: any): SessionData {
return {
id: session.id,
userId: session.userId,
fileName: session.fileName,
fileKey: session.fileKey,
totalRows: session.totalRows,
totalCols: session.totalCols,
columns: session.columns as string[],
encoding: session.encoding,
fileSize: session.fileSize,
createdAt: session.createdAt,
updatedAt: session.updatedAt,
expiresAt: session.expiresAt,
};
}
}
// ==================== 导出单例实例 ====================
export const sessionService = new SessionService();