Files
AIclinicalresearch/backend/src/modules/dc/tool-b/controllers/ExtractionController.ts
HaHafeng d595037316 feat(admin): Complete tenant management and module access control system
Major Features:
- Tenant management CRUD (list, create, edit, delete, module configuration)
- Dynamic module management system (modules table with 8 modules)
- Multi-tenant module permission merging (ModuleService)
- Module access control middleware (requireModule)
- User module permission API (GET /api/v1/auth/me/modules)
- Frontend module permission filtering (HomePage + TopNavigation)

Module Integration:
- RVW module integrated with PromptService (editorial + methodology)
- All modules (RVW/PKB/ASL/DC) added authenticate + requireModule middleware
- Fixed ReviewTask foreign key constraint (cross-schema issue)
- Removed all MOCK_USER_ID, unified to request.user?.userId

Prompt Management Enhancements:
- Module names displayed in Chinese (RVW -> 智能审稿)
- Enhanced version history with view content and rollback features
- List page shows both activeVersion and draftVersion columns

Database Changes:
- Added platform_schema.modules table
- Modified tenant_modules table (added index and UUID)
- Removed ReviewTask foreign key to public.users (cross-schema fix)
- Seeded 8 modules: RVW, PKB, ASL, DC, IIT, AIA, SSA, ST

Documentation Updates:
- Updated ADMIN module development status
- Updated TODO checklist (89% progress)
- Updated Prompt management plan (Phase 3.5.5 completed)
- Added module authentication specification

Files Changed: 80+
Status: All features tested and verified locally
Next: User management module development
2026-01-13 07:34:30 +08:00

694 lines
21 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
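/**
* DC module - extraction controller
*
* API endpoints
* - POST /api/v1/dc/tool-b/health-check - health check
* - GET /api/v1/dc/tool-b/templates - list templates
* - POST /api/v1/dc/tool-b/tasks - create an extraction task
* - GET /api/v1/dc/tool-b/tasks/:taskId/progress - query task progress
* - GET /api/v1/dc/tool-b/tasks/:taskId/items - fetch verification grid data
* - POST /api/v1/dc/tool-b/items/:itemId/resolve - resolve a conflict
*
* Also implemented by this controller (relative paths as given in the method comments):
* - POST /upload - file upload
* - GET /tasks/:taskId/export - export results
*
* Platform capabilities reused:
* - ✅ logger: logging
* - ✅ prisma: database access
* - ✅ storage: file operations
* - ✅ jobQueue: asynchronous jobs
*/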
import { FastifyRequest, FastifyReply } from 'fastify';
import { healthCheckService } from '../services/HealthCheckService.js';
import { templateService } from '../services/TemplateService.js';
import { dualModelExtractionService } from '../services/DualModelExtractionService.js';
import { conflictDetectionService } from '../services/ConflictDetectionService.js';
import { storage } from '../../../../common/storage/index.js';
import { logger } from '../../../../common/logging/index.js';
import { prisma } from '../../../../config/database.js';
import { jobQueue } from '../../../../common/jobs/index.js';
import { splitIntoChunks, recommendChunkSize } from '../../../../common/jobs/utils.js';
import * as xlsx from 'xlsx';
/**
 * Read the current user's ID from the request.
 *
 * Only `request.user.userId` is inspected here; the original note says the
 * value comes from the JWT token (presumably attached by upstream
 * authentication — not visible in this file).
 *
 * @param request - Incoming Fastify request.
 * @returns The user ID stored on `request.user`.
 * @throws Error with message 'User not authenticated' when no user ID is present.
 */
function getUserId(request: FastifyRequest): string {
  const authenticatedUser = (request as any).user;
  const id = authenticatedUser?.userId;
  if (id) {
    return id;
  }
  throw new Error('User not authenticated');
}
export class ExtractionController {
/**
 * File upload.
 * POST /upload
 *
 * Reads the uploaded file into memory, parses it with `xlsx` to report the
 * column names (keys of the first data row of the first sheet) and the row
 * count, then stores the raw bytes through `storage.upload`.
 *
 * Responses:
 * - 400 when the request carries no file.
 * - 200 with `{ fileKey, url, filename, size, totalRows, columns }` on success.
 * - 500 with the stringified error on any failure.
 */
async uploadFile(request: FastifyRequest, reply: FastifyReply) {
  try {
    const data = await request.file();
    if (!data) {
      return reply.code(400).send({
        success: false,
        error: 'No file uploaded'
      });
    }
    const userId = getUserId(request);
    const buffer = await data.toBuffer();
    const originalFilename = data.filename;
    const timestamp = Date.now();
    // The filename is client-controlled. Keep only its last path segment so
    // that '/' or '\' (e.g. "../../x.xlsx") cannot move the object outside
    // the per-user prefix of the storage key.
    const safeFilename = String(originalFilename ?? '').split(/[\\/]/).pop() || 'upload';
    const fileKey = `dc/tool-b/${userId}/${timestamp}_${safeFilename}`;
    logger.info('[API] File upload request', {
      filename: originalFilename,
      size: buffer.length,
      userId
    });
    // Parse the Excel file to obtain column names and row count
    const workbook = xlsx.read(buffer, { type: 'buffer' });
    const sheetName = workbook.SheetNames[0];
    const worksheet = workbook.Sheets[sheetName];
    const jsonData = xlsx.utils.sheet_to_json<Record<string, any>>(worksheet);
    // Column names are the keys of the first data row
    const columns = jsonData.length > 0 ? Object.keys(jsonData[0]) : [];
    const totalRows = jsonData.length;
    logger.info('[API] Excel parsed', { columns, totalRows });
    // Upload to storage
    const url = await storage.upload(fileKey, buffer);
    logger.info('[API] File uploaded successfully', { fileKey, url });
    return reply.code(200).send({
      success: true,
      data: {
        fileKey,
        url,
        // The response still echoes the name exactly as the client sent it.
        filename: originalFilename,
        size: buffer.length,
        totalRows,
        columns
      }
    });
  } catch (error) {
    logger.error('[API] File upload failed', { error });
    return reply.code(500).send({
      success: false,
      error: String(error)
    });
  }
}
/**
 * Health check.
 * POST /health-check
 *
 * Validates that `fileKey` and `columnName` are present and delegates to
 * `healthCheckService.check(fileKey, columnName, userId)`.
 *
 * Responses:
 * - 400 when `fileKey` or `columnName` is missing (including a missing body).
 * - 200 with the service result on success.
 * - 500 with the error message on failure; the stack is included only when
 *   NODE_ENV is 'development'.
 */
async healthCheck(request: FastifyRequest<{
  Body: {
    fileKey: string;
    columnName: string;
  }
}>, reply: FastifyReply) {
  try {
    // Optional chaining: a request without a body must reach the 400 branch
    // below instead of throwing during destructuring.
    const fileKey = request.body?.fileKey;
    const columnName = request.body?.columnName;
    const userId = getUserId(request);
    logger.info('[API] Health check request', { fileKey, columnName, userId });
    // Parameter validation
    if (!fileKey || !columnName) {
      logger.error('[API] Missing required parameters', { fileKey, columnName });
      return reply.code(400).send({
        success: false,
        error: 'Missing required parameters: fileKey or columnName'
      });
    }
    const result = await healthCheckService.check(fileKey, columnName, userId);
    logger.info('[API] Health check success', { status: result.status });
    return reply.code(200).send({
      success: true,
      data: result
    });
  } catch (error: any) {
    logger.error('[API] Health check failed', {
      error: error.message,
      stack: error.stack,
      fileKey: request.body?.fileKey,
      columnName: request.body?.columnName
    });
    return reply.code(500).send({
      success: false,
      error: error.message || String(error),
      details: process.env.NODE_ENV === 'development' ? error.stack : undefined
    });
  }
}
/**
 * List templates.
 * GET /templates
 *
 * Returns everything `templateService.getAllTemplates()` yields, wrapped as
 * `{ success: true, data: { templates } }`. Any failure is logged and
 * answered with a 500 carrying the stringified error.
 */
async getTemplates(request: FastifyRequest, reply: FastifyReply) {
  try {
    logger.info('[API] Get templates request');
    const templateList = await templateService.getAllTemplates();
    const payload = {
      success: true,
      data: { templates: templateList }
    };
    return reply.code(200).send(payload);
  } catch (err) {
    logger.error('[API] Get templates failed', { error: err });
    const failure = {
      success: false,
      error: String(err)
    };
    return reply.code(500).send(failure);
  }
}
/**
 * Create an extraction task.
 * POST /tasks
 *
 * Steps:
 * 1. Look up the template for `diseaseType` / `reportType`.
 * 2. Download the source Excel file and parse its first sheet.
 * 3. Create the task row and one item row per data row.
 * 4. Start processing: tasks with at least 50 items are split into batches
 *    and pushed to the job queue; smaller tasks are handed directly to
 *    `dualModelExtractionService.batchExtract` without awaiting it.
 *
 * Responses:
 * - 404 when the template or the source file cannot be found.
 * - 400 when the sheet has no data rows or `textColumn` is not present.
 * - 201 with `{ taskId, totalCount, status }` on success.
 * - 500 with the stringified error on any other failure.
 */
async createTask(request: FastifyRequest<{
  Body: {
    projectName: string;
    sourceFileKey: string;
    textColumn: string;
    diseaseType: string;
    reportType: string;
    modelA?: string;
    modelB?: string;
  }
}>, reply: FastifyReply) {
  try {
    logger.info('[API] ===== CREATE TASK START =====');
    const {
      projectName,
      sourceFileKey,
      textColumn,
      diseaseType,
      reportType,
      modelA = 'deepseek-v3',
      modelB = 'qwen-max'
    } = request.body;
    const userId = getUserId(request);
    logger.info('[API] Create task request', {
      userId,
      projectName,
      sourceFileKey,
      textColumn,
      diseaseType,
      reportType
    });
    // 1. Fetch the template
    logger.info('[API] Step 1: Getting template', { diseaseType, reportType });
    const template = await templateService.getTemplate(diseaseType, reportType);
    if (!template) {
      logger.error('[API] Template not found', { diseaseType, reportType });
      return reply.code(404).send({
        success: false,
        error: `Template not found: ${diseaseType}/${reportType}`
      });
    }
    logger.info('[API] Template found', { templateId: template.id });
    // 2. Read the Excel file that the items are created from
    // NOTE(review): sourceFileKey is not checked against the caller's own
    // upload prefix here — confirm whether that is enforced elsewhere.
    logger.info('[API] Step 2: Downloading Excel file', { sourceFileKey });
    const fileBuffer = await storage.download(sourceFileKey);
    if (!fileBuffer) {
      logger.error('[API] File not found in storage', { sourceFileKey });
      return reply.code(404).send({
        success: false,
        error: `File not found: ${sourceFileKey}`
      });
    }
    logger.info('[API] File downloaded', { size: fileBuffer.length });
    logger.info('[API] Step 3: Parsing Excel file');
    const workbook = xlsx.read(fileBuffer, { type: 'buffer' });
    const sheetName = workbook.SheetNames[0];
    const worksheet = workbook.Sheets[sheetName];
    const data = xlsx.utils.sheet_to_json<Record<string, any>>(worksheet);
    logger.info('[API] Excel parsed', { rowCount: data.length });
    // An empty sheet has no first row to inspect; answer with a client error
    // instead of crashing on data[0].
    if (data.length === 0) {
      logger.error('[API] Excel file contains no data rows', { sourceFileKey });
      return reply.code(400).send({
        success: false,
        error: 'Excel file contains no data rows'
      });
    }
    // sheet_to_json leaves blank cells out of the row object, so the column
    // may be absent from the first row yet present further down.
    const hasTextColumn = data.some(row =>
      Object.prototype.hasOwnProperty.call(row, textColumn)
    );
    if (!hasTextColumn) {
      logger.error('[API] Column not found', {
        textColumn,
        availableColumns: Object.keys(data[0])
      });
      return reply.code(400).send({
        success: false,
        error: `Column '${textColumn}' not found in Excel`
      });
    }
    // 3. Create the task
    logger.info('[API] Step 4: Creating task in database');
    const task = await prisma.dCExtractionTask.create({
      data: {
        userId,
        projectName,
        sourceFileKey,
        textColumn,
        diseaseType,
        reportType,
        targetFields: template.fields as any, // Prisma Json type
        modelA,
        modelB,
        totalCount: data.length,
        status: 'pending'
      }
    });
    logger.info('[API] Task created in database', { taskId: task.id });
    // 4. Create the items
    logger.info('[API] Step 5: Creating extraction items', { count: data.length });
    const itemsData = data.map((row, index) => ({
      taskId: task.id,
      rowIndex: index + 1,
      // `??` rather than `||` so a legitimate falsy cell value such as 0 is kept.
      originalText: String(row[textColumn] ?? '')
    }));
    await prisma.dCExtractionItem.createMany({
      data: itemsData
    });
    logger.info('[API] Items created', { count: itemsData.length });
    // 5. Choose the processing mode (Platform-Only architecture)
    const QUEUE_THRESHOLD = 50; // below 50 items: process directly; 50 or more: use the queue
    const useQueue = itemsData.length >= QUEUE_THRESHOLD;
    if (useQueue) {
      // ============================================
      // Mode A: queue mode (>= 50 items)
      // ============================================
      logger.info('[API] Using queue mode with task splitting', {
        totalItems: itemsData.length,
        threshold: QUEUE_THRESHOLD
      });
      // Re-read the created items because their IDs are needed
      const items = await prisma.dCExtractionItem.findMany({
        where: { taskId: task.id },
        orderBy: { rowIndex: 'asc' }
      });
      // Recommended batch size
      const chunkSize = recommendChunkSize('extraction', items.length);
      const chunks = splitIntoChunks(items, chunkSize);
      logger.info('[API] Task splitting completed', {
        totalItems: items.length,
        chunkSize,
        totalBatches: chunks.length
      });
      // Update the task status
      await prisma.dCExtractionTask.update({
        where: { id: task.id },
        data: {
          status: 'processing',
          startedAt: new Date()
        }
      });
      // Push one job per batch onto the queue
      const jobPromises = chunks.map(async (chunk, batchIndex) => {
        const itemIds = chunk.map(item => item.id);
        return await jobQueue.push('dc_extraction_batch', {
          // Business information
          taskId: task.id,
          itemIds,
          diseaseType,
          reportType,
          // Task-splitting information (stored in job.data)
          batchIndex,
          totalBatches: chunks.length,
          startIndex: batchIndex * chunkSize,
          endIndex: Math.min((batchIndex + 1) * chunkSize, items.length),
          // Progress tracking (initial values)
          processedCount: 0,
          cleanCount: 0,
          conflictCount: 0,
          failedCount: 0,
        });
      });
      await Promise.all(jobPromises);
      logger.info('[API] All batch jobs pushed to queue', {
        taskId: task.id,
        totalBatches: chunks.length,
        queueType: 'pg-boss'
      });
      console.log('\n🚀 数据提取任务已启动 (队列模式):');
      console.log(` 任务ID: ${task.id}`);
      console.log(` 总记录数: ${items.length}`);
      console.log(` 批次大小: ${chunkSize} 条/批`);
      console.log(` 总批次数: ${chunks.length}`);
      console.log(` 队列类型: pg-boss (持久化 + 断点续传)`);
    } else {
      // ============================================
      // Mode B: direct mode (< 50 items)
      // ============================================
      logger.info('[API] Using direct mode (small task)', {
        totalItems: itemsData.length,
        threshold: QUEUE_THRESHOLD
      });
      // Process directly without the queue. The promise is intentionally not
      // awaited so the HTTP response returns immediately; `void` marks that
      // intent and the trailing catch keeps rejections handled.
      void dualModelExtractionService.batchExtract(task.id)
        .then(() => {
          logger.info('[API] Batch extraction completed successfully', { taskId: task.id });
        })
        .catch(err => {
          logger.error('[API] Batch extraction failed', {
            error: err.message,
            stack: err.stack,
            taskId: task.id
          });
        });
      console.log('\n🚀 数据提取任务已启动 (直接模式):');
      console.log(` 任务ID: ${task.id}`);
      console.log(` 总记录数: ${itemsData.length}`);
      console.log(` 处理模式: 直接处理(快速模式)`);
    }
    logger.info('[API] Task created', { taskId: task.id, itemCount: data.length });
    // NOTE(review): the response always reports 'pending', even though queue
    // mode has already moved the task to 'processing' above.
    return reply.code(201).send({
      success: true,
      data: {
        taskId: task.id,
        totalCount: data.length,
        status: 'pending'
      }
    });
  } catch (error) {
    logger.error('[API] Create task failed', { error });
    return reply.code(500).send({
      success: false,
      error: String(error)
    });
  }
}
/**
 * Query task progress.
 * GET /tasks/:taskId/progress
 *
 * Loads the task by ID and returns its counters plus a rounded percentage
 * (`processedCount / totalCount`, or 0 when `totalCount` is not positive).
 *
 * Responses: 404 when the task does not exist, 200 with the progress
 * payload otherwise, 500 with the stringified error on failure.
 *
 * NOTE(review): the lookup is by task ID only; no comparison with the
 * requesting user is made here — confirm ownership is enforced elsewhere.
 */
async getTaskProgress(request: FastifyRequest<{
  Params: { taskId: string }
}>, reply: FastifyReply) {
  try {
    const taskId = request.params.taskId;
    logger.info('[API] Get task progress', { taskId });
    const record = await prisma.dCExtractionTask.findUnique({
      where: { id: taskId }
    });
    if (!record) {
      return reply.code(404).send({
        success: false,
        error: 'Task not found'
      });
    }
    // Percentage of processed items; guarded against division by zero.
    let percent = 0;
    if (record.totalCount > 0) {
      percent = Math.round((record.processedCount / record.totalCount) * 100);
    }
    const payload = {
      taskId: record.id,
      status: record.status,
      totalCount: record.totalCount,
      processedCount: record.processedCount,
      cleanCount: record.cleanCount,
      conflictCount: record.conflictCount,
      failedCount: record.failedCount,
      totalTokens: record.totalTokens,
      totalCost: record.totalCost,
      progress: percent
    };
    return reply.code(200).send({
      success: true,
      data: payload
    });
  } catch (err) {
    logger.error('[API] Get task progress failed', { error: err });
    return reply.code(500).send({
      success: false,
      error: String(err)
    });
  }
}
/**
 * Fetch verification grid data.
 * GET /tasks/:taskId/items
 *
 * Returns one page of a task's items ordered by `rowIndex`, optionally
 * filtered by `status`, together with pagination metadata.
 *
 * Query parameters:
 * - `page`  - 1-based page number; defaults to 1 when absent or not a positive integer.
 * - `limit` - page size; defaults to 50 when absent or not a positive integer.
 * - `status` - optional exact-match filter on the item status.
 *
 * Responses: 200 with `{ items, pagination }`, 500 with the stringified
 * error on failure.
 */
async getTaskItems(request: FastifyRequest<{
  Params: { taskId: string };
  Querystring: { page?: string; limit?: string; status?: string }
}>, reply: FastifyReply) {
  try {
    const { taskId } = request.params;
    // Parse with an explicit radix and reject NaN / zero / negative values:
    // a NaN `skip` makes the query fail, and a zero `limit` turns
    // `totalPages` into Infinity.
    const requestedPage = Number.parseInt(request.query.page ?? '', 10);
    const requestedLimit = Number.parseInt(request.query.limit ?? '', 10);
    const page = Number.isInteger(requestedPage) && requestedPage > 0 ? requestedPage : 1;
    const limit = Number.isInteger(requestedLimit) && requestedLimit > 0 ? requestedLimit : 50;
    const statusFilter = request.query.status;
    logger.info('[API] Get task items', { taskId, page, limit, statusFilter });
    const where: any = { taskId };
    if (statusFilter) {
      where.status = statusFilter;
    }
    // The page query and the total count are independent, so run them together.
    const [items, total] = await Promise.all([
      prisma.dCExtractionItem.findMany({
        where,
        skip: (page - 1) * limit,
        take: limit,
        orderBy: { rowIndex: 'asc' }
      }),
      prisma.dCExtractionItem.count({ where })
    ]);
    return reply.code(200).send({
      success: true,
      data: {
        items: items.map(item => ({
          id: item.id,
          rowIndex: item.rowIndex,
          originalText: item.originalText,
          resultA: item.resultA,
          resultB: item.resultB,
          status: item.status,
          conflictFields: item.conflictFields,
          finalResult: item.finalResult
        })),
        pagination: {
          page,
          limit,
          total,
          totalPages: Math.ceil(total / limit)
        }
      }
    });
  } catch (error) {
    logger.error('[API] Get task items failed', { error });
    return reply.code(500).send({
      success: false,
      error: String(error)
    });
  }
}
/**
 * Resolve a conflict.
 * POST /items/:itemId/resolve
 *
 * Writes `chosenValue` into the item's `finalResult` under `field`, removes
 * `field` from `conflictFields`, and marks the item 'resolved' once no
 * conflict fields remain (otherwise it stays 'conflict').
 *
 * Responses:
 * - 400 when `field` is missing/empty or `chosenValue` is absent.
 * - 404 when the item does not exist.
 * - 200 with `{ itemId, status, remainingConflicts }` on success.
 * - 500 with the stringified error on failure.
 */
async resolveConflict(request: FastifyRequest<{
  Params: { itemId: string };
  Body: {
    field: string;
    chosenValue: string;
  }
}>, reply: FastifyReply) {
  try {
    const { itemId } = request.params;
    // Optional chaining so a request without a body is rejected with 400
    // below rather than throwing during destructuring.
    const field = request.body?.field;
    const chosenValue = request.body?.chosenValue;
    logger.info('[API] Resolve conflict', { itemId, field });
    // Without this check an absent field would be stored under the literal
    // key "undefined" and the item could be marked resolved by mistake.
    // An empty-string chosenValue is allowed (the reviewer may pick a blank).
    if (typeof field !== 'string' || field === '' || chosenValue == null) {
      return reply.code(400).send({
        success: false,
        error: 'Missing required parameters: field or chosenValue'
      });
    }
    // Load the current record
    const item = await prisma.dCExtractionItem.findUnique({
      where: { id: itemId }
    });
    if (!item) {
      return reply.code(404).send({
        success: false,
        error: 'Item not found'
      });
    }
    // Update finalResult on a copy
    const finalResult = { ...(item.finalResult as Record<string, string> || {}) };
    finalResult[field] = chosenValue;
    // Drop the conflict field that has just been resolved
    const conflictFields = item.conflictFields.filter(f => f !== field);
    // Derive the new status
    const newStatus = conflictFields.length === 0 ? 'resolved' : 'conflict';
    await prisma.dCExtractionItem.update({
      where: { id: itemId },
      data: {
        finalResult,
        conflictFields,
        status: newStatus,
        resolvedAt: conflictFields.length === 0 ? new Date() : null
      }
    });
    logger.info('[API] Conflict resolved', { itemId, field, newStatus });
    return reply.code(200).send({
      success: true,
      data: {
        itemId,
        status: newStatus,
        remainingConflicts: conflictFields.length
      }
    });
  } catch (error) {
    logger.error('[API] Resolve conflict failed', { error });
    return reply.code(500).send({
      success: false,
      error: String(error)
    });
  }
}
/**
 * Export results.
 * GET /tasks/:taskId/export
 *
 * Builds an .xlsx workbook with one row per item (ordered by `rowIndex`).
 * Each row holds the row number, the original text, a status label, and one
 * column per template field in the order stored in the task's `targetFields`.
 * For every field the value from `finalResult` is preferred, then `resultA`,
 * then the placeholder '未提及'.
 *
 * Responses:
 * - 404 when the task does not exist.
 * - 200 with the workbook as an attachment named `<projectName>_结果.xlsx`.
 * - 500 with the stringified error on failure.
 */
async exportResults(request: FastifyRequest<{
  Params: { taskId: string };
}>, reply: FastifyReply) {
  try {
    const { taskId } = request.params;
    logger.info('[API] Export results request', { taskId });
    // Load the task together with all of its items
    const task = await prisma.dCExtractionTask.findUnique({
      where: { id: taskId },
      include: {
        items: {
          orderBy: { rowIndex: 'asc' }
        }
      }
    });
    if (!task) {
      return reply.code(404).send({
        success: false,
        error: 'Task not found'
      });
    }
    // Create the Excel workbook
    const workbook = xlsx.utils.book_new();
    // Column order comes from targetFields; tolerate a missing/non-array value
    // so the export degrades to the fixed columns instead of crashing.
    const targetFields = task.targetFields as { name: string; desc: string }[] | null;
    const fieldNames = Array.isArray(targetFields) ? targetFields.map(f => f.name) : [];
    // Only undefined, null and '' count as "no value": legitimate falsy
    // values such as 0 or false must survive into the export.
    const isBlank = (value: unknown): boolean =>
      value === undefined || value === null || value === '';
    // Build the data rows in template field order
    const rows = task.items.map(item => {
      const finalResult = (item.finalResult as Record<string, any> | null) ?? {};
      const resultA = (item.resultA as Record<string, any> | null) ?? {};
      const row: Record<string, any> = {
        '行号': item.rowIndex,
        '原文': item.originalText,
        '状态': item.status === 'resolved' ? '已解决' : item.status === 'clean' ? '一致' : '待裁决'
      };
      // Fall back per field: a finalResult that only contains the fields
      // resolved so far must not hide the values available in resultA.
      fieldNames.forEach(fieldName => {
        const preferred = finalResult[fieldName];
        const value = isBlank(preferred) ? resultA[fieldName] : preferred;
        row[fieldName] = isBlank(value) ? '未提及' : value;
      });
      return row;
    });
    // Create the worksheet
    const worksheet = xlsx.utils.json_to_sheet(rows);
    xlsx.utils.book_append_sheet(workbook, worksheet, '提取结果');
    // Serialize the workbook to a buffer
    const excelBuffer = xlsx.write(workbook, { type: 'buffer', bookType: 'xlsx' });
    logger.info('[API] Export results success', { taskId, rowCount: rows.length });
    // Return the file; the filename is URL-encoded so non-ASCII (Chinese)
    // characters are carried through the filename* parameter.
    const filename = `${task.projectName}_结果.xlsx`;
    const encodedFilename = encodeURIComponent(filename);
    return reply
      .code(200)
      .header('Content-Type', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
      .header('Content-Disposition', `attachment; filename*=UTF-8''${encodedFilename}`)
      .send(excelBuffer);
  } catch (error) {
    logger.error('[API] Export results failed', { error });
    return reply.code(500).send({
      success: false,
      error: String(error)
    });
  }
}
}
// Export a singleton instance of the controller
export const extractionController = new ExtractionController();