Files
AIclinicalresearch/extraction_service/main.py

509 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
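Document extraction microservice - main entry point.

Features:
- PDF text extraction (PyMuPDF)
- Docx text extraction (Mammoth)
- Txt text extraction (direct read)
- Language detection
- Health check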
"""
import os
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from loguru import logger
# Load environment variables from a .env file (if present) before any
# os.getenv() call below reads them.
load_dotenv()
# Configure logging: drop loguru's default sink and log to stdout with a
# custom format; the level comes from LOG_LEVEL (default "INFO").
logger.remove()
logger.add(
    sys.stdout,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level=os.getenv("LOG_LEVEL", "INFO")
)
# Create the FastAPI application.
app = FastAPI(
    title="文档提取微服务",
    description="提供PDF、Docx、Txt文档的文本提取服务",
    version="1.0.0",
)
# CORS configuration.
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# the FastAPI CORS documentation advises listing origins explicitly when
# credentials are allowed - confirm before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # production should restrict this to specific domains
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Temporary file directory (TEMP_DIR env var, default
# /tmp/extraction_service); created at import time if it does not exist.
TEMP_DIR = Path(os.getenv("TEMP_DIR", "/tmp/extraction_service"))
TEMP_DIR.mkdir(parents=True, exist_ok=True)
# 导入服务模块
from services.pdf_extractor import extract_pdf_pymupdf
from services.pdf_processor import extract_pdf, get_pdf_processing_strategy
from services.language_detector import detect_language, detect_language_detailed
from services.nougat_extractor import check_nougat_available, get_nougat_info
from services.file_utils import detect_file_type, cleanup_temp_file
from services.docx_extractor import extract_docx_mammoth, validate_docx_file
from services.txt_extractor import extract_txt, validate_txt_file
# ==================== API路由 ====================
@app.get("/")
async def root():
    """Return a small identification payload for the service root path.

    Returns:
        dict with the service name, its version and a fixed "running" status.
    """
    service_info = dict(
        service="文档提取微服务",
        version="1.0.0",
        status="running",
    )
    return service_info
@app.get("/api/health")
async def health_check():
    """
    Health check endpoint.

    Checks performed:
    - PyMuPDF can be imported (and which version it reports)
    - Nougat information, as returned by ``get_nougat_info``
    - The temporary directory exists and is writable

    Returns:
        dict with an overall ``status`` ("healthy" only when PyMuPDF is
        importable and the temp directory is writable, otherwise
        "degraded"), the individual ``checks`` and an ISO ``timestamp``.
        The Nougat information is reported but does not affect ``status``.
    """
    # Assume PyMuPDF is usable until the import (or version lookup) fails.
    pymupdf_status = {"available": True, "version": "unknown"}
    try:
        import fitz  # PyMuPDF
        pymupdf_status["version"] = fitz.__version__
    except Exception as exc:
        pymupdf_status = {"available": False, "version": "unknown"}
        logger.warning(f"PyMuPDF不可用: {exc}")

    # Nougat status is delegated to the extractor module.
    nougat_info = get_nougat_info()

    # The temp directory must both exist and be writable by this process.
    temp_dir_writable = TEMP_DIR.exists() and os.access(TEMP_DIR, os.W_OK)

    is_healthy = pymupdf_status["available"] and temp_dir_writable
    return {
        "status": "healthy" if is_healthy else "degraded",
        "checks": {
            "pymupdf": pymupdf_status,
            "nougat": nougat_info,
            "temp_dir": {
                "path": str(TEMP_DIR),
                "writable": temp_dir_writable,
            },
        },
        "timestamp": datetime.now().isoformat(),
    }
@app.post("/api/extract/pdf")
async def extract_pdf_endpoint(
    file: UploadFile = File(...),
    method: str = "auto"
):
    """
    PDF text extraction endpoint.

    Args:
        file: The uploaded PDF; its name must end with ``.pdf``
            (case-insensitive).
        method: Extraction method ('auto' | 'nougat' | 'pymupdf').
            - auto: pass ``force_method=None`` to ``extract_pdf`` (default)
            - nougat: force Nougat
            - pymupdf: force PyMuPDF

    Returns:
        JSONResponse wrapping the dict returned by ``extract_pdf`` with
        ``metadata.file_size`` and ``metadata.filename`` added, e.g.
        {
            "success": true,
            "method": "nougat" | "pymupdf",
            "reason": "...",
            "text": "extracted text",
            "metadata": {...}
        }

    Raises:
        HTTPException: 400 when the upload has no name ending in ``.pdf``;
            500 when extraction reports failure or any other error occurs.
    """
    temp_path = None
    try:
        # Validate the file type. The filename may be missing on an upload,
        # so treat that as "not a PDF" instead of crashing on None.lower().
        upload_name = file.filename or ""
        if not upload_name.lower().endswith('.pdf'):
            raise HTTPException(
                status_code=400,
                detail="文件格式错误只支持PDF文件"
            )

        # Save to a temporary file. The client-supplied filename is never
        # used in the on-disk path: it may contain path separators or "..",
        # and two concurrent uploads with the same name in one process would
        # otherwise overwrite (and later delete) each other's file.
        temp_path = TEMP_DIR / f"temp_{os.getpid()}_{uuid.uuid4().hex}.pdf"
        logger.info(f"开始处理PDF文件: {file.filename}, 方法={method}")

        content = await file.read()
        with open(temp_path, "wb") as f:
            f.write(content)

        file_size = len(content)
        logger.info(f"文件大小: {file_size / 1024:.2f} KB")

        # Extract the text; "auto" means no method is forced.
        force_method = None if method == "auto" else method
        result = extract_pdf(str(temp_path), force_method=force_method)

        if not result["success"]:
            raise HTTPException(
                status_code=500,
                detail=f"PDF提取失败: {result.get('error', 'Unknown error')}"
            )

        # Add file metadata (the original upload name, not the temp name).
        result["metadata"]["file_size"] = file_size
        result["metadata"]["filename"] = file.filename

        logger.info(f"PDF提取成功: {file.filename}, "
                    f"方法={result['method']}, "
                    f"原因={result.get('reason', 'N/A')}")
        return JSONResponse(content=result)
    except HTTPException:
        # Already a well-formed HTTP error; pass it through unchanged.
        raise
    except Exception as e:
        logger.error(f"PDF提取失败: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"处理失败: {str(e)}"
        ) from e
    finally:
        # Always remove the temporary file, on success or failure.
        if temp_path is not None:
            cleanup_temp_file(temp_path)
@app.post("/api/detect-language")
async def detect_language_endpoint(file: UploadFile = File(...)):
    """
    PDF language detection endpoint.

    Args:
        file: The uploaded PDF; its name must end with ``.pdf``
            (case-insensitive).

    Returns:
        JSONResponse wrapping the dict returned by
        ``detect_language_detailed`` with ``filename`` added, e.g.
        {
            "language": "chinese" | "english" | "mixed",
            "chinese_ratio": 0.65,
            "chinese_chars": 3500,
            "total_chars": 5000
        }

    Raises:
        HTTPException: 400 when the upload has no name ending in ``.pdf``;
            500 when detection fails.
    """
    temp_path = None
    try:
        # The filename may be missing on an upload; treat that as "not a
        # PDF" instead of crashing on None.lower().
        upload_name = file.filename or ""
        if not upload_name.lower().endswith('.pdf'):
            raise HTTPException(status_code=400, detail="只支持PDF文件")

        # Save to a temporary file. The client-supplied filename is never
        # used in the on-disk path: it may contain path separators or "..",
        # and concurrent uploads with the same name would collide.
        temp_path = TEMP_DIR / f"temp_{os.getpid()}_{uuid.uuid4().hex}.pdf"
        content = await file.read()
        with open(temp_path, "wb") as f:
            f.write(content)

        # Detect the language and report it under the original upload name.
        result = detect_language_detailed(str(temp_path))
        result["filename"] = file.filename
        return JSONResponse(content=result)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"语言检测失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"检测失败: {str(e)}") from e
    finally:
        # Always remove the temporary file, on success or failure.
        if temp_path is not None:
            cleanup_temp_file(temp_path)
@app.post("/api/pdf-strategy")
async def get_strategy_endpoint(file: UploadFile = File(...)):
    """
    Report the PDF processing strategy without running the extraction.

    Args:
        file: The uploaded PDF; its name must end with ``.pdf``
            (case-insensitive).

    Returns:
        JSONResponse wrapping the dict returned by
        ``get_pdf_processing_strategy`` with ``filename`` added, e.g.
        {
            "detected_language": "chinese" | "english",
            "recommended_method": "nougat" | "pymupdf",
            "reason": "...",
            "nougat_available": true
        }

    Raises:
        HTTPException: 400 when the upload has no name ending in ``.pdf``;
            500 when the strategy lookup fails.
    """
    temp_path = None
    try:
        # The filename may be missing on an upload; treat that as "not a
        # PDF" instead of crashing on None.lower().
        upload_name = file.filename or ""
        if not upload_name.lower().endswith('.pdf'):
            raise HTTPException(status_code=400, detail="只支持PDF文件")

        # Save to a temporary file. The client-supplied filename is never
        # used in the on-disk path: it may contain path separators or "..",
        # and concurrent uploads with the same name would collide.
        temp_path = TEMP_DIR / f"temp_{os.getpid()}_{uuid.uuid4().hex}.pdf"
        content = await file.read()
        with open(temp_path, "wb") as f:
            f.write(content)

        # Get the processing strategy for the saved file.
        result = get_pdf_processing_strategy(str(temp_path))
        result["filename"] = file.filename
        return JSONResponse(content=result)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"获取策略失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"失败: {str(e)}") from e
    finally:
        # Always remove the temporary file, on success or failure.
        if temp_path is not None:
            cleanup_temp_file(temp_path)
@app.post("/api/extract/docx")
async def extract_docx_endpoint(file: UploadFile = File(...)):
    """
    Docx document extraction endpoint.

    Args:
        file: The uploaded Docx file; its name must end with ``.docx``
            (case-insensitive).

    Returns:
        JSONResponse wrapping the dict returned by ``extract_docx_mammoth``
        with ``method`` and ``metadata.filename`` set, e.g.
        {
            "success": true,
            "method": "mammoth",
            "text": "extracted text",
            "metadata": {
                "char_count": <character count>,
                "has_tables": <whether tables are present>,
                "file_size": <file size in bytes>
            }
        }

    Raises:
        HTTPException: 400 when the upload has no name ending in ``.docx``;
            500 when extraction reports failure or any other error occurs.
    """
    temp_path = None
    try:
        # Validate the file type. The filename may be missing on an upload,
        # so treat that as "not a Docx" instead of crashing on None.lower().
        upload_name = file.filename or ""
        if not upload_name.lower().endswith('.docx'):
            raise HTTPException(
                status_code=400,
                detail="文件格式错误只支持Docx文件"
            )

        # Save to a temporary file. The client-supplied filename is never
        # used in the on-disk path: it may contain path separators or "..",
        # and concurrent uploads with the same name would collide.
        temp_path = TEMP_DIR / f"temp_{os.getpid()}_{uuid.uuid4().hex}.docx"
        logger.info(f"开始处理Docx文件: {file.filename}")

        content = await file.read()
        with open(temp_path, "wb") as f:
            f.write(content)

        file_size = len(content)
        logger.info(f"文件大小: {file_size / 1024:.2f} KB")

        # Extract the text.
        result = extract_docx_mammoth(str(temp_path))
        if not result["success"]:
            raise HTTPException(
                status_code=500,
                detail=f"Docx提取失败: {result.get('error', 'Unknown error')}"
            )

        # Add the method tag and file metadata. file_size is only filled in
        # when the extractor did not already provide it, so the documented
        # response shape holds without overriding the extractor's value.
        result["method"] = "mammoth"
        result["metadata"]["filename"] = file.filename
        result["metadata"].setdefault("file_size", file_size)

        logger.info(f"Docx提取成功: {file.filename}, "
                    f"字符数={result['metadata']['char_count']}")
        return JSONResponse(content=result)
    except HTTPException:
        # Already a well-formed HTTP error; pass it through unchanged.
        raise
    except Exception as e:
        logger.error(f"Docx提取失败: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"处理失败: {str(e)}"
        ) from e
    finally:
        # Always remove the temporary file, on success or failure.
        if temp_path is not None:
            cleanup_temp_file(temp_path)
@app.post("/api/extract/txt")
async def extract_txt_endpoint(file: UploadFile = File(...)):
    """
    Txt text file extraction endpoint.

    Args:
        file: The uploaded Txt file; its name must end with ``.txt``
            (case-insensitive).

    Returns:
        JSONResponse wrapping the dict returned by ``extract_txt`` with
        ``method`` and ``metadata.filename`` set, e.g.
        {
            "success": true,
            "method": "direct",
            "text": "text content",
            "encoding": "utf-8",
            "metadata": {
                "char_count": <character count>,
                "line_count": <line count>,
                "file_size": <file size in bytes>
            }
        }

    Raises:
        HTTPException: 400 when the upload has no name ending in ``.txt``;
            500 when extraction reports failure or any other error occurs.
    """
    temp_path = None
    try:
        # Validate the file type. The filename may be missing on an upload,
        # so treat that as "not a Txt" instead of crashing on None.lower().
        upload_name = file.filename or ""
        if not upload_name.lower().endswith('.txt'):
            raise HTTPException(
                status_code=400,
                detail="文件格式错误只支持Txt文件"
            )

        # Save to a temporary file. The client-supplied filename is never
        # used in the on-disk path: it may contain path separators or "..",
        # and concurrent uploads with the same name would collide.
        temp_path = TEMP_DIR / f"temp_{os.getpid()}_{uuid.uuid4().hex}.txt"
        logger.info(f"开始处理Txt文件: {file.filename}")

        content = await file.read()
        with open(temp_path, "wb") as f:
            f.write(content)

        file_size = len(content)
        logger.info(f"文件大小: {file_size / 1024:.2f} KB")

        # Extract the text.
        result = extract_txt(str(temp_path))
        if not result["success"]:
            raise HTTPException(
                status_code=500,
                detail=f"Txt提取失败: {result.get('error', 'Unknown error')}"
            )

        # Add the method tag and file metadata. file_size is only filled in
        # when the extractor did not already provide it, so the documented
        # response shape holds without overriding the extractor's value.
        result["method"] = "direct"
        result["metadata"]["filename"] = file.filename
        result["metadata"].setdefault("file_size", file_size)

        logger.info(f"Txt提取成功: {file.filename}, "
                    f"编码={result['encoding']}, "
                    f"字符数={result['metadata']['char_count']}")
        return JSONResponse(content=result)
    except HTTPException:
        # Already a well-formed HTTP error; pass it through unchanged.
        raise
    except Exception as e:
        logger.error(f"Txt提取失败: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"处理失败: {str(e)}"
        ) from e
    finally:
        # Always remove the temporary file, on success or failure.
        if temp_path is not None:
            cleanup_temp_file(temp_path)
@app.post("/api/extract")
async def extract_document(
    file: UploadFile = File(...),
    file_type: Optional[str] = None
):
    """
    Generic document extraction endpoint.

    Resolves the file type (explicit ``file_type`` or detection from the
    filename) and delegates to the matching type-specific endpoint function.

    Args:
        file: The uploaded file.
        file_type: Optional explicit type ('pdf' | 'docx' | 'txt'). Matching
            is case-insensitive and tolerates surrounding whitespace and a
            leading dot (e.g. "PDF" or ".pdf").

    Returns:
        Whatever the delegated endpoint returns.

    Raises:
        HTTPException: 400 for an unsupported type (or any 4xx/5xx raised by
            the delegated endpoint); 500 for any other error.
    """
    # NOTE(review): the delegated endpoints still validate the filename
    # extension themselves, so an explicit file_type cannot override a
    # mismatching extension - confirm whether that is intended.
    handlers = {
        'pdf': extract_pdf_endpoint,
        'docx': extract_docx_endpoint,
        'txt': extract_txt_endpoint,
    }
    try:
        if file_type:
            # Normalize caller-provided values so "PDF" / ".pdf" are accepted.
            file_type = file_type.strip().lower().lstrip(".")
        else:
            # Auto-detect the file type from the filename.
            file_type = detect_file_type(file.filename)
        logger.info(f"文件类型: {file_type}, 文件名: {file.filename}")

        # Dispatch to the type-specific handler.
        handler = handlers.get(file_type)
        if handler is None:
            raise HTTPException(
                status_code=400,
                detail=f"不支持的文件格式: {file_type}仅支持PDF、Docx、Txt"
            )
        return await handler(file)
    except HTTPException:
        # Already a well-formed HTTP error; pass it through unchanged.
        raise
    except Exception as e:
        logger.error(f"文档提取失败: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"处理失败: {str(e)}"
        ) from e
# ==================== 启动配置 ====================
if __name__ == "__main__":
    # Script entry point: start the service with uvicorn using settings
    # taken from the environment.
    import uvicorn

    port = int(os.getenv("SERVICE_PORT", 8000))
    host = os.getenv("SERVICE_HOST", "0.0.0.0")
    # NOTE(review): DEBUG defaults to "True", which turns on uvicorn's
    # auto-reload unless the variable is set otherwise - confirm this is
    # the intended default for deployments.
    debug = os.getenv("DEBUG", "True").lower() == "true"

    base_url = f"http://{host}:{port}"
    startup_messages = (
        "启动文档提取微服务...",
        f"地址: {base_url}",
        f"健康检查: {base_url}/api/health",
        f"调试模式: {debug}",
    )
    for message in startup_messages:
        logger.info(message)

    uvicorn.run("main:app", host=host, port=port, reload=debug, log_level="info")