Files
AIclinicalresearch/extraction_service/main.py
HaHafeng f01981bf78 feat(dc/tool-c): 完成AI代码生成服务(Day 3 MVP)
核心功能:
- 新增AICodeService(550行):AI代码生成核心服务
- 新增AIController(257行):4个API端点
- 新增dc_tool_c_ai_history表:存储对话历史
- 实现自我修正机制:最多3次智能重试
- 集成LLMFactory:复用通用能力层
- 10个Few-shot示例:覆盖Level 1-4场景

技术优化:
- 修复NaN序列化问题(Python端转None)
- 修复数据传递问题(从Session获取真实数据)
- 优化System Prompt(明确环境信息)
- 调整Few-shot示例(移除import语句)

测试结果:
- 通过率:9/11(81.8%) 达到MVP标准
- 成功场景:缺失值处理、编码、分箱、BMI、筛选、填补、统计、分类
- 待优化:数值清洗、智能去重(已记录技术债务TD-C-006)

API端点:
- POST /api/v1/dc/tool-c/ai/generate(生成代码)
- POST /api/v1/dc/tool-c/ai/execute(执行代码)
- POST /api/v1/dc/tool-c/ai/process(生成并执行,一步到位)
- GET /api/v1/dc/tool-c/ai/history/:sessionId(对话历史)

文档更新:
- 新增Day 3开发完成总结(770行)
- 新增复杂场景优化技术债务(TD-C-006)
- 更新工具C当前状态文档
- 更新技术债务清单

影响范围:
- backend/src/modules/dc/tool-c/*(新增2个文件,更新1个文件)
- backend/scripts/create-tool-c-ai-history-table.mjs(新增)
- backend/prisma/schema.prisma(新增DcToolCAiHistory模型)
- extraction_service/services/dc_executor.py(NaN序列化修复)
- docs/03-业务模块/DC-数据清洗整理/*(5份文档更新)

Breaking Changes: 无

总代码行数:+950行

Refs: #Tool-C-Day3
2025-12-07 16:21:32 +08:00

617 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
文档提取微服务 - 主入口
功能:
- PDF文本提取PyMuPDF
- Docx文本提取Mammoth
- Txt文本提取直接读取
- 语言检测
- 健康检查
"""
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Dict, Any
from loguru import logger
from pathlib import Path
import os
import sys
from datetime import datetime
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 配置日志
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level=os.getenv("LOG_LEVEL", "INFO")
)
# 创建FastAPI应用
app = FastAPI(
title="文档提取微服务",
description="提供PDF、Docx、Txt文档的文本提取服务",
version="1.0.0",
)
# CORS配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应该限制具体域名
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 临时文件目录
TEMP_DIR = Path(os.getenv("TEMP_DIR", "/tmp/extraction_service"))
TEMP_DIR.mkdir(parents=True, exist_ok=True)
# 导入服务模块
from services.pdf_extractor import extract_pdf_pymupdf
from services.pdf_processor import extract_pdf, get_pdf_processing_strategy
from services.language_detector import detect_language, detect_language_detailed
from services.nougat_extractor import check_nougat_available, get_nougat_info
from services.file_utils import detect_file_type, cleanup_temp_file
from services.docx_extractor import extract_docx_mammoth, validate_docx_file
from services.txt_extractor import extract_txt, validate_txt_file
from services.dc_executor import validate_code, execute_pandas_code
# ==================== Pydantic Models ====================
class ValidateCodeRequest(BaseModel):
"""代码验证请求模型"""
code: str
class ExecuteCodeRequest(BaseModel):
"""代码执行请求模型"""
data: List[Dict[str, Any]]
code: str
# ==================== API路由 ====================
@app.get("/")
async def root():
"""根路径"""
return {
"service": "文档提取微服务",
"version": "1.0.0",
"status": "running"
}
@app.get("/api/health")
async def health_check():
"""
健康检查接口
检查项:
- 服务是否运行
- PyMuPDF是否可用
- Nougat是否可用
- 临时目录是否可写
"""
try:
import fitz # PyMuPDF
pymupdf_version = fitz.__version__
pymupdf_available = True
except Exception as e:
pymupdf_version = "unknown"
pymupdf_available = False
logger.warning(f"PyMuPDF不可用: {str(e)}")
# 检查Nougat
nougat_info = get_nougat_info()
# 检查临时目录
temp_dir_writable = TEMP_DIR.exists() and os.access(TEMP_DIR, os.W_OK)
return {
"status": "healthy" if (pymupdf_available and temp_dir_writable) else "degraded",
"checks": {
"pymupdf": {
"available": pymupdf_available,
"version": pymupdf_version
},
"nougat": nougat_info,
"temp_dir": {
"path": str(TEMP_DIR),
"writable": temp_dir_writable
}
},
"timestamp": datetime.now().isoformat()
}
@app.post("/api/extract/pdf")
async def extract_pdf_endpoint(
file: UploadFile = File(...),
method: str = "auto"
):
"""
PDF文本提取接口智能选择方法
Args:
file: 上传的PDF文件
method: 提取方法 ('auto' | 'nougat' | 'pymupdf')
- auto: 自动选择(默认)
- nougat: 强制使用Nougat
- pymupdf: 强制使用PyMuPDF
Returns:
{
"success": true,
"method": "nougat" | "pymupdf",
"reason": "...",
"text": "提取的文本内容",
"metadata": {...}
}
"""
temp_path = None
try:
# 验证文件类型
if not file.filename.lower().endswith('.pdf'):
raise HTTPException(
status_code=400,
detail="文件格式错误只支持PDF文件"
)
# 保存临时文件
temp_path = TEMP_DIR / f"temp_{os.getpid()}_{file.filename}"
logger.info(f"开始处理PDF文件: {file.filename}, 方法={method}")
with open(temp_path, "wb") as f:
content = await file.read()
f.write(content)
file_size = len(content)
logger.info(f"文件大小: {file_size / 1024:.2f} KB")
# 提取文本(使用顺序降级策略)
force_method = None if method == "auto" else method
result = extract_pdf(str(temp_path), force_method=force_method)
if not result["success"]:
raise HTTPException(
status_code=500,
detail=f"PDF提取失败: {result.get('error', 'Unknown error')}"
)
# 添加文件元数据
result["metadata"]["file_size"] = file_size
result["metadata"]["filename"] = file.filename
logger.info(f"PDF提取成功: {file.filename}, "
f"方法={result['method']}, "
f"原因={result.get('reason', 'N/A')}")
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"PDF提取失败: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"处理失败: {str(e)}"
)
finally:
# 清理临时文件
if temp_path:
cleanup_temp_file(temp_path)
@app.post("/api/detect-language")
async def detect_language_endpoint(file: UploadFile = File(...)):
"""
PDF语言检测接口
Args:
file: 上传的PDF文件
Returns:
{
"language": "chinese" | "english" | "mixed",
"chinese_ratio": 0.65,
"chinese_chars": 3500,
"total_chars": 5000
}
"""
temp_path = None
try:
if not file.filename.lower().endswith('.pdf'):
raise HTTPException(status_code=400, detail="只支持PDF文件")
# 保存临时文件
temp_path = TEMP_DIR / f"temp_{os.getpid()}_{file.filename}"
with open(temp_path, "wb") as f:
content = await file.read()
f.write(content)
# 检测语言
result = detect_language_detailed(str(temp_path))
result["filename"] = file.filename
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"语言检测失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"检测失败: {str(e)}")
finally:
if temp_path:
cleanup_temp_file(temp_path)
@app.post("/api/pdf-strategy")
async def get_strategy_endpoint(file: UploadFile = File(...)):
"""
获取PDF处理策略不实际提取
Args:
file: 上传的PDF文件
Returns:
{
"detected_language": "chinese" | "english",
"recommended_method": "nougat" | "pymupdf",
"reason": "...",
"nougat_available": true
}
"""
temp_path = None
try:
if not file.filename.lower().endswith('.pdf'):
raise HTTPException(status_code=400, detail="只支持PDF文件")
temp_path = TEMP_DIR / f"temp_{os.getpid()}_{file.filename}"
with open(temp_path, "wb") as f:
content = await file.read()
f.write(content)
# 获取处理策略
result = get_pdf_processing_strategy(str(temp_path))
result["filename"] = file.filename
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"获取策略失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"失败: {str(e)}")
finally:
if temp_path:
cleanup_temp_file(temp_path)
@app.post("/api/extract/docx")
async def extract_docx_endpoint(file: UploadFile = File(...)):
"""
Docx文档提取接口
Args:
file: 上传的Docx文件
Returns:
{
"success": true,
"method": "mammoth",
"text": "提取的文本内容",
"metadata": {
"char_count": 字符数,
"has_tables": 是否包含表格,
"file_size": 文件大小
}
}
"""
temp_path = None
try:
# 验证文件类型
if not file.filename.lower().endswith('.docx'):
raise HTTPException(
status_code=400,
detail="文件格式错误只支持Docx文件"
)
# 保存临时文件
temp_path = TEMP_DIR / f"temp_{os.getpid()}_{file.filename}"
logger.info(f"开始处理Docx文件: {file.filename}")
with open(temp_path, "wb") as f:
content = await file.read()
f.write(content)
file_size = len(content)
logger.info(f"文件大小: {file_size / 1024:.2f} KB")
# 提取文本
result = extract_docx_mammoth(str(temp_path))
if not result["success"]:
raise HTTPException(
status_code=500,
detail=f"Docx提取失败: {result.get('error', 'Unknown error')}"
)
# 添加文件元数据
result["method"] = "mammoth"
result["metadata"]["filename"] = file.filename
logger.info(f"Docx提取成功: {file.filename}, "
f"字符数={result['metadata']['char_count']}")
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Docx提取失败: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"处理失败: {str(e)}"
)
finally:
if temp_path:
cleanup_temp_file(temp_path)
@app.post("/api/extract/txt")
async def extract_txt_endpoint(file: UploadFile = File(...)):
"""
Txt文本文件提取接口
Args:
file: 上传的Txt文件
Returns:
{
"success": true,
"method": "direct",
"text": "文本内容",
"encoding": "utf-8",
"metadata": {
"char_count": 字符数,
"line_count": 行数,
"file_size": 文件大小
}
}
"""
temp_path = None
try:
# 验证文件类型
if not file.filename.lower().endswith('.txt'):
raise HTTPException(
status_code=400,
detail="文件格式错误只支持Txt文件"
)
# 保存临时文件
temp_path = TEMP_DIR / f"temp_{os.getpid()}_{file.filename}"
logger.info(f"开始处理Txt文件: {file.filename}")
with open(temp_path, "wb") as f:
content = await file.read()
f.write(content)
file_size = len(content)
logger.info(f"文件大小: {file_size / 1024:.2f} KB")
# 提取文本
result = extract_txt(str(temp_path))
if not result["success"]:
raise HTTPException(
status_code=500,
detail=f"Txt提取失败: {result.get('error', 'Unknown error')}"
)
# 添加方法标识和文件名
result["method"] = "direct"
result["metadata"]["filename"] = file.filename
logger.info(f"Txt提取成功: {file.filename}, "
f"编码={result['encoding']}, "
f"字符数={result['metadata']['char_count']}")
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"Txt提取失败: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"处理失败: {str(e)}"
)
finally:
if temp_path:
cleanup_temp_file(temp_path)
@app.post("/api/extract")
async def extract_document(
file: UploadFile = File(...),
file_type: str = None
):
"""
通用文档提取接口
自动检测文件类型并调用相应的提取方法
Args:
file: 上传的文件
file_type: 可选,指定文件类型 ('pdf' | 'docx' | 'txt')
Returns:
提取结果
"""
try:
# 自动检测文件类型
if not file_type:
file_type = detect_file_type(file.filename)
logger.info(f"文件类型: {file_type}, 文件名: {file.filename}")
# 根据类型调用不同的处理函数
if file_type == 'pdf':
return await extract_pdf_endpoint(file)
elif file_type == 'docx':
return await extract_docx_endpoint(file)
elif file_type == 'txt':
return await extract_txt_endpoint(file)
else:
raise HTTPException(
status_code=400,
detail=f"不支持的文件格式: {file_type}仅支持PDF、Docx、Txt"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"文档提取失败: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"处理失败: {str(e)}"
)
# ==================== DC工具C - 代码执行接口 ====================
@app.post("/api/dc/validate")
async def validate_pandas_code(request: ValidateCodeRequest):
"""
DC工具C - Pandas代码安全验证接口
Args:
request: ValidateCodeRequest
- code: str # 待验证的Pandas代码
Returns:
{
"valid": bool,
"errors": List[str],
"warnings": List[str]
}
"""
try:
logger.info(f"开始验证Pandas代码长度: {len(request.code)} 字符")
# 执行AST安全检查
result = validate_code(request.code)
logger.info(
f"代码验证完成: valid={result['valid']}, "
f"errors={len(result['errors'])}, warnings={len(result['warnings'])}"
)
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"代码验证失败: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"验证失败: {str(e)}"
)
@app.post("/api/dc/execute")
async def execute_pandas_code_endpoint(request: ExecuteCodeRequest):
"""
DC工具C - Pandas代码执行接口
Args:
request: ExecuteCodeRequest
- data: List[Dict] # JSON格式的数据数组对象
- code: str # Pandas代码操作df变量
Returns:
{
"success": bool,
"result_data": List[Dict], # 执行后的数据
"output": str, # 打印输出
"error": str, # 错误信息(如果失败)
"execution_time": float, # 执行时间(秒)
"result_shape": [rows, cols] # 结果形状
}
"""
try:
logger.info(
f"开始执行Pandas代码: "
f"数据行数={len(request.data)}, 代码长度={len(request.code)} 字符"
)
# 执行代码
result = execute_pandas_code(request.data, request.code)
if result["success"]:
logger.info(
f"代码执行成功: "
f"结果shape={result.get('result_shape')}, "
f"耗时={result['execution_time']:.3f}"
)
else:
logger.warning(
f"代码执行失败: {result.get('error', 'Unknown error')}"
)
return JSONResponse(content=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"代码执行接口失败: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"处理失败: {str(e)}"
)
# ==================== 启动配置 ====================
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("SERVICE_PORT", 8000))
host = os.getenv("SERVICE_HOST", "0.0.0.0")
debug = os.getenv("DEBUG", "True").lower() == "true"
logger.info(f"启动文档提取微服务...")
logger.info(f"地址: http://{host}:{port}")
logger.info(f"健康检查: http://{host}:{port}/api/health")
logger.info(f"调试模式: {debug}")
uvicorn.run(
"main:app",
host=host,
port=port,
reload=debug,
log_level="info"
)