Files
AIclinicalresearch/extraction_service/test_pdf_table_extraction.py
HaHafeng dc6b292308 docs(asl): Complete Tool 3 extraction workbench V2.0 development plan (v1.5)
ASL Tool 3 Development Plan:
- Architecture blueprint v1.5 (6 rounds of architecture review, 13 red lines)
- M1/M2/M3 sprint checklists (Skeleton Pipeline / HITL Workbench / Dynamic Template Engine)
- Code patterns cookbook (9 chapters: Fan-out, Prompt engineering, ACL, SSE dual-track, etc.)
- Key patterns: Fan-out with Last Child Wins, Optimistic Locking, teamConcurrency throttling
- PKB ACL integration (anti-corruption layer), MinerU Cache-Aside, NOTIFY/LISTEN cross-pod SSE
- Data consistency snapshot for long-running extraction tasks

Platform capability:
- Add distributed Fan-out task pattern development guide (7 patterns + 10 anti-patterns)
- Add system-level async architecture risk analysis blueprint
- Add PDF table extraction engine design and usage guide (MinerU integration)
- Add table extraction source code (TableExtractionManager + MinerU engine)

Documentation updates:
- Update ASL module status with Tool 3 V2.0 plan readiness
- Update system status document (v6.2) with latest milestones
- Add V2.0 product requirements, prototypes, and data dictionary specs
- Add architecture review documents (4 rounds of review feedback)
- Add test PDF files for extraction validation

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-23 22:49:16 +08:00

629 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PDF 表格提取三方对比测试
对比方法:
1. pymupdf4llm — 本地 PDF→Markdown内置 find_tables()
2. MinerU Cloud API — VLM 云端解析
3. DeepSeek LLM — 先用 pymupdf 提取原始文本,再由 LLM 识别并结构化表格
测试目标8 篇医学 PDF 文献的表格提取准确率、效率、输出质量
"""
import os
import sys
import json
import time
import base64
import zipfile
import io
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional
# ── Configuration ─────────────────────────────────────
# Input corpus and per-method output directories, resolved relative to this file.
PDF_DIR = Path(__file__).parent.parent / "docs" / "03-业务模块" / "ASL-AI智能文献" / "05-测试文档" / "PDF"
OUTPUT_DIR = Path(__file__).parent / "test_output" / "pdf_table_extraction"
# SECURITY FIX: a live MinerU JWT and DeepSeek API key were previously
# hard-coded here as fallback defaults. Credentials must come from the
# environment only; the leaked secrets should be rotated immediately.
MINERU_API_TOKEN = os.environ.get("MINERU_API_TOKEN", "")
MINERU_API_BASE = "https://mineru.net/api/v4"
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
DEEPSEEK_API_BASE = "https://api.deepseek.com/v1"
# ── 辅助函数 ──────────────────────────────────────────
def ensure_output_dir():
    """Create the output directory tree: the root plus one folder per method."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    for method_dir in ("pymupdf4llm", "mineru", "deepseek"):
        (OUTPUT_DIR / method_dir).mkdir(exist_ok=True)
def get_pdf_files() -> List[Path]:
    """Return every PDF under PDF_DIR (sorted) and print a short listing."""
    candidates = sorted(PDF_DIR.glob("*.pdf"))
    print(f"\n📁 找到 {len(candidates)} 个 PDF 文件:")
    for idx, pdf in enumerate(candidates, 1):
        print(f" {idx}. {pdf.name} ({pdf.stat().st_size / 1024:.1f} KB)")
    return candidates
def count_tables_in_markdown(md_text: str) -> int:
    """Count Markdown tables: each contiguous run of pipe-delimited lines is one table."""
    total = 0
    inside = False
    for raw_line in md_text.split('\n'):
        row = raw_line.strip()
        is_table_row = row.startswith('|') and row.endswith('|')
        if is_table_row and not inside:
            total += 1  # a new run of table rows has just started
        inside = is_table_row
    return total
def extract_tables_from_markdown(md_text: str) -> List[str]:
    """Return each table as one text block (contiguous pipe-delimited lines joined by newlines)."""
    tables: List[str] = []
    buffer: List[str] = []
    for raw_line in md_text.split('\n'):
        row = raw_line.strip()
        if row.startswith('|') and row.endswith('|'):
            buffer.append(row)
        elif buffer:
            # First non-table line after a run: flush the accumulated block.
            tables.append('\n'.join(buffer))
            buffer = []
    if buffer:  # document ended while still inside a table
        tables.append('\n'.join(buffer))
    return tables
def save_result(method: str, filename: str, content: str):
    """Persist one extraction result as <OUTPUT_DIR>/<method>/<sanitized stem>.md."""
    stem = Path(filename).stem
    sanitized = re.sub(r'[^\w\-.]', '_', stem)  # keep the name filesystem-safe
    target = OUTPUT_DIR / method / f"{sanitized}.md"
    target.write_text(content, encoding='utf-8')
# ══════════════════════════════════════════════════════
# 方法 1: pymupdf4llm
# ══════════════════════════════════════════════════════
def test_pymupdf4llm(pdf_files: List[Path]) -> Dict[str, Any]:
    """Benchmark local table extraction with pymupdf4llm over every PDF.

    Converts each PDF to Markdown, counts the tables found, saves the
    Markdown output, and records per-file timing/success metadata.
    """
    import pymupdf4llm
    print("\n" + "=" * 70)
    print("📋 方法 1: pymupdf4llm 本地提取")
    print("=" * 70)
    per_file: Dict[str, Any] = {}
    batch_started = time.time()
    for pdf_path in pdf_files:
        name = pdf_path.name
        print(f"\n 处理: {name} ...", end=" ", flush=True)
        file_started = time.time()
        try:
            markdown = pymupdf4llm.to_markdown(
                str(pdf_path),
                page_chunks=False,
                show_progress=False,
            )
            duration = time.time() - file_started
            found = extract_tables_from_markdown(markdown)
            save_result("pymupdf4llm", name, markdown)
            per_file[name] = {
                "success": True,
                "time_sec": round(duration, 2),
                "table_count": len(found),
                "total_chars": len(markdown),
                "tables_preview": [t[:200] for t in found[:5]],
            }
            print(f"{len(found)} 个表格, {duration:.1f}s")
        except Exception as exc:
            duration = time.time() - file_started
            per_file[name] = {
                "success": False,
                "time_sec": round(duration, 2),
                "error": str(exc),
                "table_count": 0,
            }
            print(f"❌ 失败: {exc}")
    total = time.time() - batch_started
    print(f"\n 总耗时: {total:.1f}s")
    return {"method": "pymupdf4llm", "total_time": round(total, 2), "files": per_file}
# ══════════════════════════════════════════════════════
# 方法 2: MinerU Cloud API
# ══════════════════════════════════════════════════════
def test_mineru_api(pdf_files: List[Path]) -> Dict[str, Any]:
    """Benchmark table extraction via the MinerU cloud API (VLM model).

    Four-step workflow:
      1. Request presigned upload URLs for the whole batch.
      2. PUT each PDF body to its URL.
      3. Poll the batch status until every file is done/failed (10-min cap).
      4. Download each result zip and count the Markdown tables inside.

    Returns a dict with the method name, batch_id, total wall time, and
    per-file results (or an ``error`` key if the batch request itself failed).
    """
    import requests
    print("\n" + "=" * 70)
    print("📋 方法 2: MinerU Cloud API (VLM)")
    print("=" * 70)
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {MINERU_API_TOKEN}",
    }
    results = {}
    total_start = time.time()
    # Step 1: request batch upload URLs.
    print("\n Step 1: 请求上传 URL ...")
    files_payload = []
    for i, pdf_path in enumerate(pdf_files):
        # data_id must be URL/identifier-safe; replace anything outside [\w.-].
        safe_id = re.sub(r'[^\w\-.]', '_', pdf_path.stem)
        files_payload.append({
            "name": pdf_path.name,
            "data_id": safe_id,
        })
    req_body = {
        "files": files_payload,
        "enable_table": True,
        "enable_formula": False,
        "language": "ch",
        "model_version": "vlm",
    }
    try:
        resp = requests.post(
            f"{MINERU_API_BASE}/file-urls/batch",
            headers=headers,
            json=req_body,
            timeout=30,
        )
        resp_json = resp.json()
        print(f" 状态码: {resp.status_code}")
        print(f" 响应: code={resp_json.get('code')}, msg={resp_json.get('msg')}")
        # MinerU uses code == 0 for success; anything else aborts the run.
        if resp_json.get("code") != 0:
            print(f" ❌ 请求失败: {resp_json}")
            return {"method": "mineru_api", "error": resp_json, "files": {}}
        batch_id = resp_json["data"]["batch_id"]
        file_urls = resp_json["data"]["file_urls"]
        print(f" batch_id: {batch_id}")
        print(f" 获得 {len(file_urls)} 个上传 URL")
    except Exception as e:
        print(f" ❌ 请求异常: {e}")
        return {"method": "mineru_api", "error": str(e), "files": {}}
    # Step 2: upload each PDF to its presigned URL (failures are logged
    # but do not abort the batch; the poll step will report them).
    print("\n Step 2: 上传 PDF 文件 ...")
    for i, pdf_path in enumerate(pdf_files):
        print(f" 上传 [{i+1}/{len(pdf_files)}] {pdf_path.name} ...", end=" ", flush=True)
        try:
            with open(pdf_path, 'rb') as f:
                upload_resp = requests.put(file_urls[i], data=f, timeout=120)
            if upload_resp.status_code == 200:
                print("")
            else:
                print(f"⚠️ 状态码={upload_resp.status_code}")
        except Exception as e:
            print(f"{e}")
    # Step 3: poll the batch until all files reach a terminal state.
    print("\n Step 3: 等待解析完成 (轮询中) ...")
    max_wait = 600  # wait at most 10 minutes
    poll_interval = 10
    elapsed_wait = 0
    all_done = False
    while elapsed_wait < max_wait:
        time.sleep(poll_interval)
        elapsed_wait += poll_interval
        try:
            poll_resp = requests.get(
                f"{MINERU_API_BASE}/extract-results/batch/{batch_id}",
                headers=headers,
                timeout=30,
            )
            poll_json = poll_resp.json()
            if poll_json.get("code") != 0:
                print(f" [{elapsed_wait}s] 查询异常: {poll_json.get('msg')}")
                continue
            extract_results = poll_json.get("data", {}).get("extract_result", [])
            states = [r.get("state", "unknown") for r in extract_results]
            done_count = states.count("done")
            failed_count = states.count("failed")
            running_count = states.count("running")
            pending_count = states.count("pending")
            print(f" [{elapsed_wait}s] 完成={done_count}, 运行中={running_count}, 排队={pending_count}, 失败={failed_count}")
            # Stop polling once every file is in a terminal state.
            if done_count + failed_count == len(pdf_files):
                all_done = True
                break
        except Exception as e:
            print(f" [{elapsed_wait}s] 查询异常: {e}")
    if not all_done:
        print(f" ⚠️ 超时 ({max_wait}s),部分任务可能未完成")
    # Step 4: fetch the final batch state once more and collect per-file results.
    print("\n Step 4: 收集解析结果 ...")
    try:
        final_resp = requests.get(
            f"{MINERU_API_BASE}/extract-results/batch/{batch_id}",
            headers=headers,
            timeout=30,
        )
        final_json = final_resp.json()
        extract_results = final_json.get("data", {}).get("extract_result", [])
    except Exception as e:
        print(f" ❌ 获取最终结果失败: {e}")
        extract_results = []
    # NOTE(review): results are matched to PDFs by list position — assumes the
    # API returns extract_result in submission order; verify against API docs.
    for i, pdf_path in enumerate(pdf_files):
        name = pdf_path.name
        if i >= len(extract_results):
            results[name] = {"success": False, "error": "未返回结果", "table_count": 0}
            continue
        result_item = extract_results[i]
        state = result_item.get("state", "unknown")
        if state == "done":
            zip_url = result_item.get("full_zip_url", "")
            if zip_url:
                try:
                    md_content = download_and_extract_markdown(zip_url)
                    tables = extract_tables_from_markdown(md_content)
                    save_result("mineru", name, md_content)
                    results[name] = {
                        "success": True,
                        "state": state,
                        "table_count": len(tables),
                        "total_chars": len(md_content),
                        "tables_preview": [t[:200] for t in tables[:5]],
                    }
                    print(f" {name}: ✅ {len(tables)} 个表格")
                except Exception as e:
                    results[name] = {"success": False, "error": str(e), "table_count": 0}
                    print(f" {name}: ❌ 下载结果失败: {e}")
            else:
                results[name] = {"success": False, "error": "无下载链接", "table_count": 0}
        else:
            err_msg = result_item.get("err_msg", "")
            results[name] = {"success": False, "state": state, "error": err_msg, "table_count": 0}
            print(f" {name}: ❌ 状态={state}, {err_msg}")
    total_elapsed = time.time() - total_start
    print(f"\n 总耗时 (含上传+等待): {total_elapsed:.1f}s")
    return {"method": "mineru_api", "batch_id": batch_id, "total_time": round(total_elapsed, 2), "files": results}
def download_and_extract_markdown(zip_url: str) -> str:
    """Fetch a MinerU result zip and return its Markdown content.

    Prefers the bundled .md file; if none exists, falls back to
    reconstructing table text from the content_list .json entries.
    """
    import requests
    resp = requests.get(zip_url, timeout=120)
    resp.raise_for_status()
    md_content = ""
    with zipfile.ZipFile(io.BytesIO(resp.content)) as archive:
        for entry in archive.namelist():
            if entry.endswith('.md'):
                md_content += archive.read(entry).decode('utf-8', errors='replace')
                break  # the archive normally contains exactly one .md file
    if not md_content:
        # Fallback: rebuild table text from the content_list JSON dump.
        with zipfile.ZipFile(io.BytesIO(resp.content)) as archive:
            for entry in archive.namelist():
                if entry.endswith('.json') and 'content_list' in entry:
                    parsed = json.loads(archive.read(entry).decode('utf-8'))
                    fragments = [
                        item.get("text", "")
                        for item in parsed
                        if item.get("type") == "table"
                    ]
                    md_content = '\n\n'.join(fragments)
    return md_content
# ══════════════════════════════════════════════════════
# 方法 3: DeepSeek LLM 直接提取
# ══════════════════════════════════════════════════════
def test_deepseek_llm(pdf_files: List[Path]) -> Dict[str, Any]:
    """Benchmark table extraction via DeepSeek: pymupdf raw text -> LLM tables.

    Step A extracts per-page plain text locally with pymupdf (fitz);
    Step B sends that text to the deepseek-chat model, which is prompted
    to reconstruct every table as Markdown. Token usage is recorded.
    """
    import requests
    import fitz  # pymupdf
    print("\n" + "=" * 70)
    print("📋 方法 3: DeepSeek LLM 直接提取")
    print("=" * 70)
    results = {}
    total_start = time.time()
    for pdf_path in pdf_files:
        name = pdf_path.name
        print(f"\n 处理: {name} ...", flush=True)
        start = time.time()
        try:
            # Step A: extract raw per-page text with pymupdf.
            doc = fitz.open(str(pdf_path))
            page_texts = []
            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text()
                if text.strip():
                    page_texts.append(f"=== 第 {page_num + 1} 页 ===\n{text}")
            doc.close()
            raw_text = '\n\n'.join(page_texts)
            # Truncate long documents to stay within the DeepSeek context window.
            if len(raw_text) > 30000:
                raw_text = raw_text[:30000] + "\n\n... [文本已截断] ..."
            # Step B: ask DeepSeek to reconstruct the tables as Markdown.
            print(f" 原始文本: {len(raw_text)} 字符, 调用 DeepSeek ...", end=" ", flush=True)
            prompt = """你是一位医学文献数据提取专家。请从以下 PDF 文献的原始文本中,精确识别并提取所有数据表格。
要求:
1. 将每个表格转换为标准 Markdown 表格格式
2. 保留表格标题(如 Table 1, Table 2 等)
3. 保留所有数值数据,不要修改任何数字
4. 如果有合并单元格,尽量用文字说明
5. 每个表格之间用空行分隔
6. 如果没有找到表格,请说明"未发现表格"
以下是 PDF 文献的原始提取文本:
"""
            api_resp = requests.post(
                f"{DEEPSEEK_API_BASE}/chat/completions",
                headers={
                    "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": "deepseek-chat",
                    "messages": [
                        {"role": "system", "content": "你是医学文献表格提取专家,擅长从论文原始文本中精确还原数据表格。输出使用 Markdown 表格格式。"},
                        {"role": "user", "content": prompt + raw_text},
                    ],
                    # Low temperature: extraction should be deterministic/faithful.
                    "temperature": 0.1,
                    "max_tokens": 8000,
                },
                timeout=120,
            )
            elapsed = time.time() - start
            if api_resp.status_code != 200:
                raise Exception(f"API 返回 {api_resp.status_code}: {api_resp.text[:300]}")
            resp_json = api_resp.json()
            llm_output = resp_json["choices"][0]["message"]["content"]
            tables = extract_tables_from_markdown(llm_output)
            usage = resp_json.get("usage", {})
            save_result("deepseek", name, llm_output)
            results[name] = {
                "success": True,
                "time_sec": round(elapsed, 2),
                "table_count": len(tables),
                "total_chars": len(llm_output),
                "input_tokens": usage.get("prompt_tokens", 0),
                "output_tokens": usage.get("completion_tokens", 0),
                "tables_preview": [t[:200] for t in tables[:5]],
            }
            print(f"{len(tables)} 个表格, {elapsed:.1f}s, tokens={usage.get('total_tokens', '?')}")
        except Exception as e:
            elapsed = time.time() - start
            results[name] = {
                "success": False,
                "time_sec": round(elapsed, 2),
                "error": str(e),
                "table_count": 0,
            }
            print(f"❌ 失败: {e}")
    total_elapsed = time.time() - total_start
    print(f"\n 总耗时: {total_elapsed:.1f}s")
    return {"method": "deepseek_llm", "total_time": round(total_elapsed, 2), "files": results}
# ══════════════════════════════════════════════════════
# 综合对比报告
# ══════════════════════════════════════════════════════
def generate_report(all_results: List[Dict[str, Any]], pdf_files: List[Path]):
    """Build, save, and print the three-way comparison report.

    Args:
        all_results: result dicts produced by the three test_* runners.
        pdf_files: the PDFs that were tested (drives the per-file section).

    Returns:
        The full Markdown report text (also written to OUTPUT_DIR).
    """
    print("\n" + "=" * 70)
    print("📊 综合对比报告")
    print("=" * 70)
    report_lines = []
    report_lines.append(f"# PDF 表格提取三方对比测试报告\n")
    report_lines.append(f"**测试时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    report_lines.append(f"**测试文件**: {len(pdf_files)} 个医学 PDF 文献\n")
    # Section 1: overall summary table.
    report_lines.append("\n## 1. 总体对比\n")
    report_lines.append("| 指标 | pymupdf4llm | MinerU API (VLM) | DeepSeek LLM |")
    report_lines.append("|------|-------------|------------------|--------------|")
    # FIX: col1/col2/col3 were previously unbound (NameError) whenever one of
    # the three methods was missing from all_results; default them first.
    col1 = col2 = col3 = "N/A"
    for r in all_results:
        method = r["method"]
        files = r.get("files", {})
        success_count = sum(1 for v in files.values() if v.get("success"))
        total_tables = sum(v.get("table_count", 0) for v in files.values())
        total_time = r.get("total_time", 0)
        if method == "pymupdf4llm":
            col1 = f"{success_count}/{len(pdf_files)} 成功, {total_tables} 表格, {total_time:.0f}s"
        elif method == "mineru_api":
            col2 = f"{success_count}/{len(pdf_files)} 成功, {total_tables} 表格, {total_time:.0f}s"
        elif method == "deepseek_llm":
            col3 = f"{success_count}/{len(pdf_files)} 成功, {total_tables} 表格, {total_time:.0f}s"
    report_lines.append(f"| 成功/表格/耗时 | {col1} | {col2} | {col3} |")
    # Section 2: per-file comparison, one row per PDF.
    report_lines.append("\n## 2. 逐文件对比\n")
    report_lines.append("| 文件 | pymupdf4llm | MinerU API | DeepSeek LLM |")
    report_lines.append("|------|-------------|------------|--------------|")
    for pdf_path in pdf_files:
        name = pdf_path.name
        # Truncate long filenames so the table stays readable.
        short_name = name[:40] + "..." if len(name) > 43 else name
        cells = [short_name]
        for r in all_results:
            finfo = r.get("files", {}).get(name, {})
            if finfo.get("success"):
                tc = finfo.get("table_count", 0)
                ts = finfo.get("time_sec", 0)
                if ts:
                    cells.append(f"{tc} 表格 ({ts:.1f}s)")
                else:
                    cells.append(f"{tc} 表格")
            else:
                err = finfo.get("error", "失败")[:30]
                cells.append(f"{err}")
        report_lines.append(f"| {' | '.join(cells)} |")
    # Section 3: per-method details.
    report_lines.append("\n## 3. 方法详情\n")
    for r in all_results:
        method = r["method"]
        report_lines.append(f"\n### {method}\n")
        report_lines.append(f"- 总耗时: {r.get('total_time', 0):.1f}s")
        if r.get("batch_id"):
            report_lines.append(f"- MinerU batch_id: {r['batch_id']}")
        report_lines.append("")
        for name, info in r.get("files", {}).items():
            report_lines.append(f"**{name}**:")
            if info.get("success"):
                report_lines.append(f" - 表格数: {info['table_count']}")
                report_lines.append(f" - 字符数: {info.get('total_chars', 'N/A')}")
                if info.get("time_sec"):
                    report_lines.append(f" - 耗时: {info['time_sec']}s")
                if info.get("input_tokens"):
                    report_lines.append(f" - Token: 输入={info['input_tokens']}, 输出={info['output_tokens']}")
            else:
                report_lines.append(f" - 状态: 失败")
                report_lines.append(f" - 错误: {info.get('error', 'N/A')}")
            report_lines.append("")
    report_text = '\n'.join(report_lines)
    report_path = OUTPUT_DIR / "comparison_report.md"
    report_path.write_text(report_text, encoding='utf-8')
    print(f"\n📄 报告已保存: {report_path}")
    print(report_text)
    return report_text
# ══════════════════════════════════════════════════════
# 主函数:支持单独运行每个方法
# ══════════════════════════════════════════════════════
def main():
    """
    Usage:
        python test_pdf_table_extraction.py           # run all three methods
        python test_pdf_table_extraction.py pymupdf   # pymupdf4llm only
        python test_pdf_table_extraction.py mineru    # MinerU API only
        python test_pdf_table_extraction.py deepseek  # DeepSeek LLM only
        python test_pdf_table_extraction.py report    # report from saved results only
    """
    ensure_output_dir()
    pdf_files = get_pdf_files()
    if not pdf_files:
        print("❌ 未找到 PDF 文件,请检查路径")
        return
    mode = sys.argv[1] if len(sys.argv) > 1 else "all"
    all_results = []
    json_path = OUTPUT_DIR / "raw_results.json"
    # Load previously saved results so individual methods can run incrementally.
    existing = {}
    if json_path.exists():
        try:
            existing = json.loads(json_path.read_text(encoding='utf-8'))
        except (OSError, ValueError):
            # FIX: was a bare `except:` that swallowed everything (including
            # KeyboardInterrupt). Only an unreadable or corrupt cache file is
            # a legitimate reason to fall back to an empty result set.
            pass
    if mode in ("all", "pymupdf"):
        r1 = test_pymupdf4llm(pdf_files)
        existing["pymupdf4llm"] = r1
        all_results.append(r1)
    if mode in ("all", "mineru"):
        r2 = test_mineru_api(pdf_files)
        existing["mineru_api"] = r2
        all_results.append(r2)
    if mode in ("all", "deepseek"):
        r3 = test_deepseek_llm(pdf_files)
        existing["deepseek_llm"] = r3
        all_results.append(r3)
    # Persist raw JSON results (default=str handles Path and datetime objects).
    json_path.write_text(json.dumps(existing, ensure_ascii=False, indent=2, default=str), encoding='utf-8')
    print(f"\n💾 原始结果已保存: {json_path}")
    # Generate the comparison report only when all three methods have results.
    if mode in ("all", "report"):
        report_results = []
        for key in ["pymupdf4llm", "mineru_api", "deepseek_llm"]:
            if key in existing:
                report_results.append(existing[key])
        if len(report_results) == 3:
            generate_report(report_results, pdf_files)
        else:
            print(f"\n⚠️ 需要全部三个方法的结果才能生成对比报告 (当前: {list(existing.keys())})")
            if report_results:
                # Still print a partial summary of whatever is available.
                print("\n--- 已有结果摘要 ---")
                for r in report_results:
                    m = r["method"]
                    files = r.get("files", {})
                    success = sum(1 for v in files.values() if v.get("success"))
                    tables = sum(v.get("table_count", 0) for v in files.values())
                    print(f" {m}: {success}/{len(files)} 成功, {tables} 个表格, {r.get('total_time', 0):.0f}s")
if __name__ == "__main__":
main()