/** * Session控制器 * * API端点: * - POST /sessions/upload 上传Excel文件创建Session * - GET /sessions/:id 获取Session信息 * - GET /sessions/:id/preview 获取预览数据(⭐ 已改为全量加载) * - GET /sessions/:id/full 获取完整数据 * - DELETE /sessions/:id 删除Session * - POST /sessions/:id/heartbeat 更新心跳 * * @module SessionController */ import { FastifyRequest, FastifyReply } from 'fastify'; import { MultipartFile } from '@fastify/multipart'; import { logger } from '../../../../common/logging/index.js'; import { sessionService } from '../services/SessionService.js'; import { dataProcessService } from '../services/DataProcessService.js'; import { jobQueue } from '../../../../common/jobs/index.js'; import * as xlsx from 'xlsx'; import { activityService } from '../../../../common/services/activity.service.js'; /** * 获取用户ID(从JWT Token中获取) */ function getUserId(request: FastifyRequest): string { const userId = (request as any).user?.userId; if (!userId) { throw new Error('User not authenticated'); } return userId; } // ==================== 请求参数类型定义 ==================== interface SessionIdParams { id: string; } interface GetUniqueValuesQuery { column: string; } // ==================== 控制器 ==================== export class SessionController { /** * 上传Excel文件创建Session * * POST /api/v1/dc/tool-c/sessions/upload */ async upload(request: FastifyRequest, reply: FastifyReply) { try { logger.info('[SessionController] 收到文件上传请求'); // 1. 获取multipart数据 const data = await request.file(); if (!data) { return reply.code(400).send({ success: false, error: '未找到上传的文件', }); } const file = data as MultipartFile; const fileName = file.filename; logger.info(`[SessionController] 文件名: ${fileName}`); // 2. 读取文件到Buffer const fileBuffer = await file.toBuffer(); // 3. 验证文件 const validation = dataProcessService.validateFile(fileBuffer, fileName); if (!validation.valid) { return reply.code(400).send({ success: false, error: validation.error, }); } // 4. 获取用户ID const userId = getUserId(request); // 5. 创建Session(Postgres-Only架构 - 异步处理) const sessionResult = await sessionService.createSession( userId, fileName, fileBuffer ); logger.info(`[SessionController] Session创建成功: ${sessionResult.id}, jobId: ${sessionResult.jobId}`); // 埋点:DC 上传数据文件 try { const user = (request as any).user; if (user) { activityService.log( user.tenantId, user.tenantName || null, user.userId || user.id, user.name || null, 'DC', '智能数据清洗', 'USE', `upload:${fileName}, session:${sessionResult.id}`, ); } } catch { /* 埋点失败不影响主业务 */ } // 6. 返回Session信息 + jobId(用于前端轮询) return reply.code(201).send({ success: true, message: 'Session创建成功', data: { sessionId: sessionResult.id, jobId: sessionResult.jobId, // ✅ 返回 jobId 供前端轮询 fileName: sessionResult.fileName, fileSize: dataProcessService.formatFileSize(sessionResult.fileSize), totalRows: sessionResult.totalRows, totalCols: sessionResult.totalCols, columns: sessionResult.columns, expiresAt: sessionResult.expiresAt, createdAt: sessionResult.createdAt, }, }); } catch (error: any) { logger.error(`[SessionController] 文件上传失败: ${error.message}`); return reply.code(500).send({ success: false, error: error.message || '文件上传失败,请重试', }); } } /** * 获取Session信息(只含元数据) * * GET /api/v1/dc/tool-c/sessions/:id */ async getSession( request: FastifyRequest<{ Params: SessionIdParams }>, reply: FastifyReply ) { try { const { id } = request.params; logger.info(`[SessionController] 获取Session: ${id}`); const session = await sessionService.getSession(id); return reply.code(200).send({ success: true, data: { sessionId: session.id, fileName: session.fileName, fileSize: dataProcessService.formatFileSize(session.fileSize), totalRows: session.totalRows, totalCols: session.totalCols, columns: session.columns, encoding: session.encoding, expiresAt: session.expiresAt, createdAt: session.createdAt, updatedAt: session.updatedAt, }, }); } catch (error: any) { logger.error(`[SessionController] 获取Session失败: ${error.message}`); const statusCode = error.message.includes('不存在') || error.message.includes('过期') ? 404 : 500; return reply.code(statusCode).send({ success: false, error: error.message || '获取Session失败', }); } } /** * 获取预览数据(⭐ 已改为全量加载) * * GET /api/v1/dc/tool-c/sessions/:id/preview */ async getPreviewData( request: FastifyRequest<{ Params: SessionIdParams }>, reply: FastifyReply ) { try { const { id } = request.params; logger.info(`[SessionController] 获取预览数据: ${id}`); const result = await sessionService.getPreviewData(id); return reply.code(200).send({ success: true, data: { sessionId: result.id, fileName: result.fileName, totalRows: result.totalRows, totalCols: result.totalCols, columns: result.columns, previewRows: result.previewData.length, previewData: result.previewData, }, }); } catch (error: any) { logger.error(`[SessionController] 获取预览数据失败: ${error.message}`); const statusCode = error.message.includes('不存在') || error.message.includes('过期') ? 404 : 500; return reply.code(statusCode).send({ success: false, error: error.message || '获取预览数据失败', }); } } /** * 获取完整数据 * * GET /api/v1/dc/tool-c/sessions/:id/full */ async getFullData( request: FastifyRequest<{ Params: SessionIdParams }>, reply: FastifyReply ) { try { const { id } = request.params; logger.info(`[SessionController] 获取完整数据: ${id}`); const data = await sessionService.getFullData(id); return reply.code(200).send({ success: true, data: { sessionId: id, totalRows: data.length, data, }, }); } catch (error: any) { logger.error(`[SessionController] 获取完整数据失败: ${error.message}`); const statusCode = error.message.includes('不存在') || error.message.includes('过期') ? 404 : 500; return reply.code(statusCode).send({ success: false, error: error.message || '获取完整数据失败', }); } } /** * 删除Session * * DELETE /api/v1/dc/tool-c/sessions/:id */ async deleteSession( request: FastifyRequest<{ Params: SessionIdParams }>, reply: FastifyReply ) { try { const { id } = request.params; logger.info(`[SessionController] 删除Session: ${id}`); await sessionService.deleteSession(id); return reply.code(200).send({ success: true, message: 'Session删除成功', }); } catch (error: any) { logger.error(`[SessionController] 删除Session失败: ${error.message}`); return reply.code(500).send({ success: false, error: error.message || '删除Session失败', }); } } /** * 更新心跳(延长过期时间) * * POST /api/v1/dc/tool-c/sessions/:id/heartbeat */ async updateHeartbeat( request: FastifyRequest<{ Params: SessionIdParams }>, reply: FastifyReply ) { try { const { id } = request.params; logger.info(`[SessionController] 更新心跳: ${id}`); const newExpiresAt = await sessionService.updateHeartbeat(id); return reply.code(200).send({ success: true, message: '心跳更新成功', data: { sessionId: id, expiresAt: newExpiresAt, }, }); } catch (error: any) { logger.error(`[SessionController] 更新心跳失败: ${error.message}`); const statusCode = error.message.includes('不存在') ? 404 : 500; return reply.code(statusCode).send({ success: false, error: error.message || '更新心跳失败', }); } } /** * ✨ 导出Excel文件(新增) * * GET /api/v1/dc/tool-c/sessions/:id/export */ async exportData( request: FastifyRequest<{ Params: SessionIdParams }>, reply: FastifyReply ) { try { const { id } = request.params; logger.info(`[SessionController] 导出Excel: ${id}`); // 1. 获取Session信息 const session = await sessionService.getSession(id); // 2. 获取完整数据 const data = await sessionService.getFullData(id); // 3. 生成Excel const workbook = xlsx.utils.book_new(); const worksheet = xlsx.utils.json_to_sheet(data); // 设置列宽(自动调整) const colWidths = session.columns.map(col => { const maxLength = Math.max( col.length, ...data.slice(0, 100).map(row => String(row[col] || '').length) ); return { wch: Math.min(maxLength + 2, 50) }; }); worksheet['!cols'] = colWidths; xlsx.utils.book_append_sheet(workbook, worksheet, 'Data'); // 4. 生成Buffer const buffer = xlsx.write(workbook, { type: 'buffer', bookType: 'xlsx', compression: true, }); // 5. 生成文件名(加上_cleaned后缀和时间戳) const timestamp = new Date().toISOString().replace(/[:.]/g, '-').slice(0, 19); const baseFileName = session.fileName.replace(/\.[^/.]+$/, ''); // 去除扩展名 const exportFileName = `${baseFileName}_cleaned_${timestamp}.xlsx`; logger.info(`[SessionController] 导出成功: ${exportFileName}, 大小: ${(buffer.length / 1024).toFixed(2)}KB`); // 6. 返回文件 reply.header('Content-Type', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'); reply.header('Content-Disposition', `attachment; filename="${encodeURIComponent(exportFileName)}"`); reply.header('Content-Length', buffer.length); return reply.send(buffer); } catch (error: any) { logger.error(`[SessionController] 导出Excel失败: ${error.message}`); const statusCode = error.message.includes('不存在') || error.message.includes('过期') ? 404 : 500; return reply.code(statusCode).send({ success: false, error: error.message || '导出Excel失败', }); } } /** * ✨ 获取列的唯一值(用于数值映射) * * GET /api/v1/dc/tool-c/sessions/:id/unique-values?column=xxx */ async getUniqueValues( request: FastifyRequest<{ Params: SessionIdParams; Querystring: GetUniqueValuesQuery }>, reply: FastifyReply ) { try { const { id } = request.params; const { column } = request.query; if (!column) { return reply.code(400).send({ success: false, error: '缺少column参数', }); } logger.info(`[SessionController] 获取唯一值: session=${id}, column=${column}`); // 1. 获取完整数据 const data = await sessionService.getFullData(id); // 2. 提取唯一值(保留NA值,但清理字符串) const values = data.map((row) => row[column]); const cleanedValues = values.map((val) => { if (val === null || val === undefined || val === '') return null; // 如果是字符串,去除首尾空格 return typeof val === 'string' ? val.trim() : val; }); // 3. 去重(✨ 保留null值,但用特殊标记表示) const uniqueSet = Array.from(new Set(cleanedValues)); const hasNA = uniqueSet.includes(null); const naCount = cleanedValues.filter(v => v === null).length; // ✨ 统计NA数量 // 过滤掉null和空白,然后排序 const nonNAValues = uniqueSet .filter((v) => v !== null && v !== '' && v !== '(空白)') .sort(); // 排序,方便查看 // ✨ 如果有NA,添加到数组末尾(用特殊字符串标记) const uniqueValues = hasNA ? [...nonNAValues, '<空值/NA>'] : nonNAValues; logger.info(`[SessionController] 唯一值数量: ${uniqueValues.length} (含NA: ${hasNA}, NA数量: ${naCount})`); // 4. 返回结果 return reply.send({ success: true, data: { column, uniqueValues, count: uniqueValues.length, naCount: hasNA ? naCount : 0, // ✨ 返回NA数量 totalCount: data.length, // ✨ 总行数 }, }); } catch (error: any) { logger.error(`[SessionController] 获取唯一值失败: ${error.message}`); const statusCode = error.message.includes('不存在') || error.message.includes('过期') ? 404 : 500; return reply.code(statusCode).send({ success: false, error: error.message || '获取唯一值失败', }); } } /** * 获取Session状态(Postgres-Only架构) * * 查询任务状态: * - 从 pg-boss 查询 job 状态 * - 从 Session 表查询解析结果 * - 合并返回给前端 * * GET /api/v1/dc/tool-c/sessions/:id/status * Query: jobId (可选,首次上传时提供) */ async getSessionStatus( request: FastifyRequest<{ Params: SessionIdParams; Querystring: { jobId?: string } }>, reply: FastifyReply ) { try { const { id: sessionId } = request.params; const { jobId } = request.query; logger.info(`[SessionController] 获取Session状态: sessionId=${sessionId}, jobId=${jobId}`); // 1. 查询 Session 信息 const session = await sessionService.getSession(sessionId); // 2. 判断解析状态 // - 如果 totalRows 不为 null,说明解析已完成 // - 否则查询 job 状态 if (session.totalRows !== null && session.totalRows !== undefined) { // 解析已完成 logger.info(`[SessionController] Session已解析完成: ${sessionId}`); return reply.code(200).send({ success: true, data: { sessionId, status: 'ready', // ✅ 解析完成 progress: 100, session, }, }); } // 3. 解析中,查询 job 状态 if (!jobId) { // 没有 jobId,可能是旧数据或直接查询 logger.warn(`[SessionController] 没有jobId,Session可能处于pending状态`); return reply.code(200).send({ success: true, data: { sessionId, status: 'processing', // 处理中 progress: 50, // 估算进度 session: { ...session, totalRows: null, totalCols: null, columns: null, }, }, }); } // 4. 从 pg-boss 查询 job 状态 const job = await jobQueue.getJob(jobId); if (!job) { logger.warn(`[SessionController] Job不存在: ${jobId}`); return reply.code(200).send({ success: true, data: { sessionId, status: 'processing', progress: 50, session, }, }); } // 5. 映射 job 状态到前端状态 let status = 'processing'; let progress = 50; switch (job.status) { case 'completed': status = 'ready'; progress = 100; break; case 'failed': status = 'error'; progress = 0; break; case 'processing': status = 'processing'; progress = 70; // 处理中,估算70% break; default: status = 'processing'; progress = 30; // 队列中,估算30% } logger.info(`[SessionController] Job状态: ${job.status}, 前端状态: ${status}`); return reply.code(200).send({ success: true, data: { sessionId, jobId, status, progress, session, }, }); } catch (error: any) { logger.error(`[SessionController] 获取Session状态失败: ${error.message}`); const statusCode = error.message.includes('不存在') || error.message.includes('过期') ? 404 : 500; return reply.code(statusCode).send({ success: false, error: error.message || '获取Session状态失败', }); } } } // ==================== 导出单例实例 ==================== export const sessionController = new SessionController();