Files
AIclinicalresearch/extraction_service/forensics/api.py
HaHafeng f9ed0c2528 feat(rvw): Complete V2.0 Week 3 - Statistical validation extension and UX improvements
Week 3 Development Summary:

- Implement negative sign normalization (6 Unicode variants)

- Enhance T-test validation with smart sample size extraction

- Enhance SE triangle and CI-P consistency validation with subrow support

- Add precise sub-cell highlighting for P-values in multi-line cells

- Add frontend issue type Chinese translations (6 new types)

- Add file format tips for PDF/DOC uploads

Technical improvements:

- Add _clean_statistical_text() in extractor.py

- Add _safe_float() wrapper in validator.py

- Add ForensicsReport.tsx component

- Update ISSUE_TYPE_LABELS translations

Documentation:

- Add 2026-02-18 development record

- Update RVW module status (v5.1)

- Update system status (v5.2)

Status: Week 3 complete, ready for Week 4 testing
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-18 18:26:16 +08:00

222 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据侦探模块 - FastAPI 路由
提供 /api/v1/forensics/* 接口
API 端点:
- GET /api/v1/forensics/health - 健康检查
- POST /api/v1/forensics/analyze_docx - 分析 Word 文档
- GET /api/v1/forensics/supported_formats - 获取支持的格式
"""
from fastapi import APIRouter, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
from loguru import logger
from pathlib import Path
import os
import time
from .types import ForensicsConfig, ForensicsResult, Severity
from .config import (
validate_file_size,
validate_file_extension,
detect_methods,
MAX_FILE_SIZE_BYTES,
ALLOWED_EXTENSIONS,
)
from .extractor import DocxTableExtractor
from .validator import ArithmeticValidator, StatValidator
# Router for all forensics endpoints, mounted under /api/v1/forensics.
router = APIRouter(prefix="/api/v1/forensics", tags=["forensics"])
# Directory used to temporarily persist uploaded files during analysis;
# overridable via the TEMP_DIR environment variable. Created eagerly at
# import time so request handlers can assume it exists.
TEMP_DIR = Path(os.getenv("TEMP_DIR", "/tmp/extraction_service"))
TEMP_DIR.mkdir(parents=True, exist_ok=True)
@router.get("/health")
async def forensics_health():
"""
数据侦探模块健康检查
"""
try:
# 检查依赖
import docx
import pandas
import scipy
return {
"status": "healthy",
"module": "forensics",
"version": "2.0.0",
"dependencies": {
"python-docx": docx.__version__ if hasattr(docx, '__version__') else "unknown",
"pandas": pandas.__version__,
"scipy": scipy.__version__,
}
}
except ImportError as e:
return {
"status": "degraded",
"module": "forensics",
"error": f"Missing dependency: {e}"
}
@router.post("/analyze_docx")
async def analyze_docx(
file: UploadFile = File(...),
check_level: str = "L1_L2",
tolerance_percent: float = 0.1,
max_table_rows: int = 500
):
"""
分析 Word 文档表格数据
Args:
file: 上传的 .docx 文件
check_level: 验证级别 (L1 / L1_L2)
tolerance_percent: 百分比容错范围
max_table_rows: 单表最大行数
Returns:
ForensicsResult: 分析结果包含表格、HTML、问题列表
"""
temp_path = None
start_time = time.time()
try:
# 1. 验证文件扩展名
is_valid, error_msg = validate_file_extension(file.filename)
if not is_valid:
logger.warning(f"文件格式校验失败: {file.filename} - {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
# 2. 读取文件内容
content = await file.read()
file_size = len(content)
# 3. 验证文件大小
is_valid, error_msg = validate_file_size(file_size)
if not is_valid:
logger.warning(f"文件大小校验失败: {file.filename} - {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
logger.info(f"开始分析 Word 文档: {file.filename}, 大小: {file_size/1024:.1f}KB")
# 4. 保存临时文件
temp_path = TEMP_DIR / f"forensics_{os.getpid()}_{file.filename}"
with open(temp_path, "wb") as f:
f.write(content)
# 5. 创建配置
config = ForensicsConfig(
check_level=check_level,
tolerance_percent=tolerance_percent,
max_table_rows=max_table_rows
)
# 6. 提取表格
extractor = DocxTableExtractor(config)
tables, full_text = extractor.extract(str(temp_path))
# 7. 检测统计方法
methods_found = detect_methods(full_text)
logger.info(f"检测到统计方法: {methods_found}")
# 8. L1 算术验证
arithmetic_validator = ArithmeticValidator(config)
for table in tables:
if not table.skipped:
arithmetic_validator.validate(table)
# 9. L2 统计验证(如果启用)
if check_level == "L1_L2":
stat_validator = StatValidator(config)
for table in tables:
if not table.skipped:
stat_validator.validate(table, full_text)
# 10. 统计问题数量
total_issues = 0
error_count = 0
warning_count = 0
for table in tables:
for issue in table.issues:
total_issues += 1
if issue.severity == Severity.ERROR:
error_count += 1
elif issue.severity == Severity.WARNING:
warning_count += 1
execution_time_ms = int((time.time() - start_time) * 1000)
# 11. 构建结果
result = ForensicsResult(
success=True,
methods_found=methods_found,
tables=tables,
total_issues=total_issues,
error_count=error_count,
warning_count=warning_count,
execution_time_ms=execution_time_ms,
error=None,
fallback_available=True
)
logger.info(
f"分析完成: {file.filename}, "
f"表格: {len(tables)}, "
f"问题: {total_issues} (ERROR: {error_count}, WARNING: {warning_count}), "
f"耗时: {execution_time_ms}ms"
)
return JSONResponse(content=result.model_dump(by_alias=True))
except HTTPException:
raise
except Exception as e:
logger.error(f"分析失败: {file.filename} - {str(e)}")
execution_time_ms = int((time.time() - start_time) * 1000)
# 返回失败结果(支持降级)
result = ForensicsResult(
success=False,
methods_found=[],
tables=[],
total_issues=0,
error_count=0,
warning_count=0,
execution_time_ms=execution_time_ms,
error=str(e),
fallback_available=True
)
return JSONResponse(
status_code=500,
content=result.model_dump()
)
finally:
# 清理临时文件
if temp_path and temp_path.exists():
try:
os.remove(temp_path)
except Exception as e:
logger.warning(f"清理临时文件失败: {e}")
@router.get("/supported_formats")
async def supported_formats():
"""
获取支持的文件格式
"""
return {
"formats": list(ALLOWED_EXTENSIONS),
"max_file_size_mb": MAX_FILE_SIZE_BYTES / 1024 / 1024,
"note": "MVP 阶段仅支持 .docx 格式,.doc 文件请先用 Word 另存为 .docx"
}