ASL Tool 3 Development Plan: - Architecture blueprint v1.5 (6 rounds of architecture review, 13 red lines) - M1/M2/M3 sprint checklists (Skeleton Pipeline / HITL Workbench / Dynamic Template Engine) - Code patterns cookbook (9 chapters: Fan-out, Prompt engineering, ACL, SSE dual-track, etc.) - Key patterns: Fan-out with Last Child Wins, Optimistic Locking, teamConcurrency throttling - PKB ACL integration (anti-corruption layer), MinerU Cache-Aside, NOTIFY/LISTEN cross-pod SSE - Data consistency snapshot for long-running extraction tasks Platform capability: - Add distributed Fan-out task pattern development guide (7 patterns + 10 anti-patterns) - Add system-level async architecture risk analysis blueprint - Add PDF table extraction engine design and usage guide (MinerU integration) - Add table extraction source code (TableExtractionManager + MinerU engine) Documentation updates: - Update ASL module status with Tool 3 V2.0 plan readiness - Update system status document (v6.2) with latest milestones - Add V2.0 product requirements, prototypes, and data dictionary specs - Add architecture review documents (4 rounds of review feedback) - Add test PDF files for extraction validation Co-authored-by: Cursor <cursoragent@cursor.com>
629 lines
24 KiB
Python
629 lines
24 KiB
Python
"""
|
||
PDF 表格提取三方对比测试
|
||
|
||
对比方法:
|
||
1. pymupdf4llm — 本地 PDF→Markdown,内置 find_tables()
|
||
2. MinerU Cloud API — VLM 云端解析
|
||
3. DeepSeek LLM — 先用 pymupdf 提取原始文本,再由 LLM 识别并结构化表格
|
||
|
||
测试目标:8 篇医学 PDF 文献的表格提取准确率、效率、输出质量
|
||
"""
|
||
|
||
import os
import sys
import json
import time
import base64
import zipfile
import io
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional

# ── Configuration ─────────────────────────────────────
PDF_DIR = Path(__file__).parent.parent / "docs" / "03-业务模块" / "ASL-AI智能文献" / "05-测试文档" / "PDF"
OUTPUT_DIR = Path(__file__).parent / "test_output" / "pdf_table_extraction"

# SECURITY: credentials must come from the environment only. The previous
# defaults embedded a live JWT (with personal email/phone) and a live
# DeepSeek key directly in source control — those keys should be revoked.
MINERU_API_TOKEN = os.environ.get("MINERU_API_TOKEN", "")
MINERU_API_BASE = "https://mineru.net/api/v4"

DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
DEEPSEEK_API_BASE = "https://api.deepseek.com/v1"

# ── Helper functions ──────────────────────────────────

def ensure_output_dir():
    """Create the output directory tree: one sub-folder per extraction method."""
    for method_dir in ("pymupdf4llm", "mineru", "deepseek"):
        (OUTPUT_DIR / method_dir).mkdir(parents=True, exist_ok=True)

def get_pdf_files() -> List[Path]:
    """Collect all PDFs under PDF_DIR (sorted by name) and print a listing."""
    found = sorted(PDF_DIR.glob("*.pdf"))
    print(f"\n📁 找到 {len(found)} 个 PDF 文件:")
    for idx, path in enumerate(found, 1):
        size_kb = path.stat().st_size / 1024
        print(f" {idx}. {path.name} ({size_kb:.1f} KB)")
    return found

def count_tables_in_markdown(md_text: str) -> int:
    """Count Markdown tables in *md_text*.

    A table is a maximal run of consecutive lines that, once stripped,
    both start and end with '|'; each such run counts once.
    """
    def _is_table_row(line: str) -> bool:
        s = line.strip()
        return s.startswith('|') and s.endswith('|')

    total = 0
    prev_was_row = False
    for line in md_text.split('\n'):
        is_row = _is_table_row(line)
        # A new table begins at each non-row → row transition.
        if is_row and not prev_was_row:
            total += 1
        prev_was_row = is_row
    return total

def extract_tables_from_markdown(md_text: str) -> List[str]:
    """Extract every Markdown table from *md_text* as a list of text blocks.

    Consecutive pipe-delimited lines (stripped lines that start and end
    with '|') are grouped into one block; each block is returned joined
    with newlines, in document order.
    """
    tables: List[str] = []
    buffer: List[str] = []

    for raw_line in md_text.split('\n'):
        row = raw_line.strip()
        if row.startswith('|') and row.endswith('|'):
            buffer.append(row)
        elif buffer:
            # Non-table line ends the current run; flush it.
            tables.append('\n'.join(buffer))
            buffer = []

    # Flush a table that runs to the very end of the text.
    if buffer:
        tables.append('\n'.join(buffer))

    return tables

def save_result(method: str, filename: str, content: str):
    """Write one extraction result to <OUTPUT_DIR>/<method>/<sanitized stem>.md."""
    stem = Path(filename).stem
    # Replace anything outside [word chars, '-', '.'] to keep the name fs-safe.
    sanitized = re.sub(r'[^\w\-.]', '_', stem)
    target = OUTPUT_DIR / method / f"{sanitized}.md"
    target.write_text(content, encoding='utf-8')

# ══════════════════════════════════════════════════════
# Method 1: pymupdf4llm
# ══════════════════════════════════════════════════════

def test_pymupdf4llm(pdf_files: List[Path]) -> Dict[str, Any]:
    """Benchmark table extraction with pymupdf4llm (local PDF→Markdown).

    Converts each PDF to Markdown locally, counts the pipe-delimited
    tables in the output, and saves the full Markdown per file.

    Args:
        pdf_files: PDF paths to process.

    Returns:
        Dict with keys ``method``, ``total_time`` (seconds, rounded to 2),
        and ``files`` — a per-filename dict of success flag, timing,
        table count, and a preview of the first tables.
    """
    import pymupdf4llm

    print("\n" + "=" * 70)
    print("📋 方法 1: pymupdf4llm 本地提取")
    print("=" * 70)

    results = {}
    total_start = time.time()

    for pdf_path in pdf_files:
        name = pdf_path.name
        print(f"\n 处理: {name} ...", end=" ", flush=True)
        start = time.time()

        try:
            # Full-document conversion; page_chunks=False yields one string.
            md_text = pymupdf4llm.to_markdown(
                str(pdf_path),
                page_chunks=False,
                show_progress=False,
            )
            elapsed = time.time() - start
            tables = extract_tables_from_markdown(md_text)
            table_count = len(tables)

            save_result("pymupdf4llm", name, md_text)

            results[name] = {
                "success": True,
                "time_sec": round(elapsed, 2),
                "table_count": table_count,
                "total_chars": len(md_text),
                # Only the first 5 tables, truncated to 200 chars each.
                "tables_preview": [t[:200] for t in tables[:5]],
            }
            print(f"✅ {table_count} 个表格, {elapsed:.1f}s")

        except Exception as e:
            # Per-file failure is recorded but does not abort the batch.
            elapsed = time.time() - start
            results[name] = {
                "success": False,
                "time_sec": round(elapsed, 2),
                "error": str(e),
                "table_count": 0,
            }
            print(f"❌ 失败: {e}")

    total_elapsed = time.time() - total_start
    print(f"\n 总耗时: {total_elapsed:.1f}s")
    return {"method": "pymupdf4llm", "total_time": round(total_elapsed, 2), "files": results}

# ══════════════════════════════════════════════════════
# Method 2: MinerU Cloud API
# ══════════════════════════════════════════════════════

def test_mineru_api(pdf_files: List[Path]) -> Dict[str, Any]:
    """Benchmark table extraction via the MinerU Cloud API (VLM model).

    Four-step workflow: (1) request pre-signed batch upload URLs,
    (2) PUT each PDF to its URL, (3) poll the batch status until all
    tasks are done/failed or a 10-minute cap elapses, (4) download each
    result zip and count tables in the extracted Markdown.

    Args:
        pdf_files: PDF paths to process.

    Returns:
        Dict with ``method``, ``batch_id``, ``total_time``, and per-file
        ``files`` results; on an early request failure, a dict with an
        ``error`` key and empty ``files`` instead.
    """
    import requests

    print("\n" + "=" * 70)
    print("📋 方法 2: MinerU Cloud API (VLM)")
    print("=" * 70)

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {MINERU_API_TOKEN}",
    }

    results = {}
    total_start = time.time()

    # Step 1: request batch upload URLs.
    print("\n Step 1: 请求上传 URL ...")
    files_payload = []
    for i, pdf_path in enumerate(pdf_files):
        # data_id must be filesystem/URL-safe; sanitize the stem.
        safe_id = re.sub(r'[^\w\-.]', '_', pdf_path.stem)
        files_payload.append({
            "name": pdf_path.name,
            "data_id": safe_id,
        })

    req_body = {
        "files": files_payload,
        "enable_table": True,
        "enable_formula": False,
        "language": "ch",
        "model_version": "vlm",
    }

    try:
        resp = requests.post(
            f"{MINERU_API_BASE}/file-urls/batch",
            headers=headers,
            json=req_body,
            timeout=30,
        )
        resp_json = resp.json()
        print(f" 状态码: {resp.status_code}")
        print(f" 响应: code={resp_json.get('code')}, msg={resp_json.get('msg')}")

        # MinerU signals success with code == 0 in the JSON body.
        if resp_json.get("code") != 0:
            print(f" ❌ 请求失败: {resp_json}")
            return {"method": "mineru_api", "error": resp_json, "files": {}}

        batch_id = resp_json["data"]["batch_id"]
        file_urls = resp_json["data"]["file_urls"]
        print(f" batch_id: {batch_id}")
        print(f" 获得 {len(file_urls)} 个上传 URL")

    except Exception as e:
        print(f" ❌ 请求异常: {e}")
        return {"method": "mineru_api", "error": str(e), "files": {}}

    # Step 2: upload the PDFs to the pre-signed URLs.
    # NOTE(review): assumes file_urls[i] corresponds to pdf_files[i]
    # (same order as the request payload) — confirm against MinerU docs.
    print("\n Step 2: 上传 PDF 文件 ...")
    for i, pdf_path in enumerate(pdf_files):
        print(f" 上传 [{i+1}/{len(pdf_files)}] {pdf_path.name} ...", end=" ", flush=True)
        try:
            with open(pdf_path, 'rb') as f:
                upload_resp = requests.put(file_urls[i], data=f, timeout=120)
            if upload_resp.status_code == 200:
                print("✅")
            else:
                print(f"⚠️ 状态码={upload_resp.status_code}")
        except Exception as e:
            print(f"❌ {e}")

    # Step 3: poll until every task is done/failed, or the cap elapses.
    print("\n Step 3: 等待解析完成 (轮询中) ...")
    max_wait = 600  # wait at most 10 minutes
    poll_interval = 10
    elapsed_wait = 0
    all_done = False

    while elapsed_wait < max_wait:
        time.sleep(poll_interval)
        elapsed_wait += poll_interval

        try:
            poll_resp = requests.get(
                f"{MINERU_API_BASE}/extract-results/batch/{batch_id}",
                headers=headers,
                timeout=30,
            )
            poll_json = poll_resp.json()

            if poll_json.get("code") != 0:
                print(f" [{elapsed_wait}s] 查询异常: {poll_json.get('msg')}")
                continue

            extract_results = poll_json.get("data", {}).get("extract_result", [])
            states = [r.get("state", "unknown") for r in extract_results]
            done_count = states.count("done")
            failed_count = states.count("failed")
            running_count = states.count("running")
            pending_count = states.count("pending")

            print(f" [{elapsed_wait}s] 完成={done_count}, 运行中={running_count}, 排队={pending_count}, 失败={failed_count}")

            # Terminal condition: every submitted file is done or failed.
            if done_count + failed_count == len(pdf_files):
                all_done = True
                break

        except Exception as e:
            # Transient polling errors are logged and retried next tick.
            print(f" [{elapsed_wait}s] 查询异常: {e}")

    if not all_done:
        print(f" ⚠️ 超时 ({max_wait}s),部分任务可能未完成")

    # Step 4: fetch the final status snapshot and collect each result.
    print("\n Step 4: 收集解析结果 ...")
    try:
        final_resp = requests.get(
            f"{MINERU_API_BASE}/extract-results/batch/{batch_id}",
            headers=headers,
            timeout=30,
        )
        final_json = final_resp.json()
        extract_results = final_json.get("data", {}).get("extract_result", [])
    except Exception as e:
        print(f" ❌ 获取最终结果失败: {e}")
        extract_results = []

    # NOTE(review): assumes extract_results preserves submission order so
    # index i matches pdf_files[i] — verify; matching on data_id would be safer.
    for i, pdf_path in enumerate(pdf_files):
        name = pdf_path.name
        if i >= len(extract_results):
            results[name] = {"success": False, "error": "未返回结果", "table_count": 0}
            continue

        result_item = extract_results[i]
        state = result_item.get("state", "unknown")

        if state == "done":
            zip_url = result_item.get("full_zip_url", "")
            if zip_url:
                try:
                    md_content = download_and_extract_markdown(zip_url)
                    tables = extract_tables_from_markdown(md_content)
                    save_result("mineru", name, md_content)

                    results[name] = {
                        "success": True,
                        "state": state,
                        "table_count": len(tables),
                        "total_chars": len(md_content),
                        "tables_preview": [t[:200] for t in tables[:5]],
                    }
                    print(f" {name}: ✅ {len(tables)} 个表格")
                except Exception as e:
                    results[name] = {"success": False, "error": str(e), "table_count": 0}
                    print(f" {name}: ❌ 下载结果失败: {e}")
            else:
                results[name] = {"success": False, "error": "无下载链接", "table_count": 0}
        else:
            err_msg = result_item.get("err_msg", "")
            results[name] = {"success": False, "state": state, "error": err_msg, "table_count": 0}
            print(f" {name}: ❌ 状态={state}, {err_msg}")

    total_elapsed = time.time() - total_start
    print(f"\n 总耗时 (含上传+等待): {total_elapsed:.1f}s")
    return {"method": "mineru_api", "batch_id": batch_id, "total_time": round(total_elapsed, 2), "files": results}

def download_and_extract_markdown(zip_url: str) -> str:
    """Download a MinerU result zip and return its Markdown content.

    Prefers the bundled ``.md`` file (the archive normally contains
    exactly one); if none exists, falls back to reassembling table text
    from the ``content_list`` JSON entries of type ``"table"``.

    Args:
        zip_url: Signed download URL returned by the batch-results API.

    Returns:
        The Markdown text, or an empty string if the archive contains
        neither a ``.md`` file nor a usable ``content_list`` JSON.

    Raises:
        requests.HTTPError: If the download returns an error status.
        zipfile.BadZipFile: If the payload is not a valid zip archive.
    """
    import requests

    resp = requests.get(zip_url, timeout=120)
    resp.raise_for_status()

    # Open the archive once and reuse it for both the .md lookup and the
    # JSON fallback (the original code constructed a second ZipFile over
    # the same bytes).
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for name in zf.namelist():
            if name.endswith('.md'):
                # Usually there is only one .md file; take the first.
                return zf.read(name).decode('utf-8', errors='replace')

        # Fallback: no .md — rebuild table text from content_list JSON.
        for name in zf.namelist():
            if name.endswith('.json') and 'content_list' in name:
                raw = json.loads(zf.read(name).decode('utf-8'))
                return '\n\n'.join(
                    item.get("text", "")
                    for item in raw
                    if item.get("type") == "table"
                )

    return ""

# ══════════════════════════════════════════════════════
# Method 3: DeepSeek LLM direct extraction
# ══════════════════════════════════════════════════════

def test_deepseek_llm(pdf_files: List[Path]) -> Dict[str, Any]:
    """Benchmark table extraction via DeepSeek LLM.

    Two-stage pipeline per file: (A) extract raw page text locally with
    pymupdf (fitz), truncated to 30k chars to fit the model context;
    (B) send the text to the DeepSeek chat API with a prompt asking it
    to reconstruct all tables as Markdown, then count the tables.

    Args:
        pdf_files: PDF paths to process.

    Returns:
        Dict with ``method``, ``total_time``, and per-file ``files``
        results (including prompt/completion token counts on success).
    """
    import requests
    import fitz  # pymupdf

    print("\n" + "=" * 70)
    print("📋 方法 3: DeepSeek LLM 直接提取")
    print("=" * 70)

    results = {}
    total_start = time.time()

    for pdf_path in pdf_files:
        name = pdf_path.name
        print(f"\n 处理: {name} ...", flush=True)
        start = time.time()

        try:
            # Step A: extract raw text per page with pymupdf.
            doc = fitz.open(str(pdf_path))
            page_texts = []
            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text()
                if text.strip():
                    # Prefix each page with a marker so the LLM can
                    # localize tables to pages.
                    page_texts.append(f"=== 第 {page_num + 1} 页 ===\n{text}")
            doc.close()

            raw_text = '\n\n'.join(page_texts)

            # Truncate to stay within the DeepSeek context window.
            if len(raw_text) > 30000:
                raw_text = raw_text[:30000] + "\n\n... [文本已截断] ..."

            # Step B: send to DeepSeek.
            print(f" 原始文本: {len(raw_text)} 字符, 调用 DeepSeek ...", end=" ", flush=True)

            prompt = """你是一位医学文献数据提取专家。请从以下 PDF 文献的原始文本中,精确识别并提取所有数据表格。

要求:
1. 将每个表格转换为标准 Markdown 表格格式
2. 保留表格标题(如 Table 1, Table 2 等)
3. 保留所有数值数据,不要修改任何数字
4. 如果有合并单元格,尽量用文字说明
5. 每个表格之间用空行分隔
6. 如果没有找到表格,请说明"未发现表格"

以下是 PDF 文献的原始提取文本:

"""
            # Low temperature for deterministic, faithful transcription.
            api_resp = requests.post(
                f"{DEEPSEEK_API_BASE}/chat/completions",
                headers={
                    "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": "deepseek-chat",
                    "messages": [
                        {"role": "system", "content": "你是医学文献表格提取专家,擅长从论文原始文本中精确还原数据表格。输出使用 Markdown 表格格式。"},
                        {"role": "user", "content": prompt + raw_text},
                    ],
                    "temperature": 0.1,
                    "max_tokens": 8000,
                },
                timeout=120,
            )

            elapsed = time.time() - start

            if api_resp.status_code != 200:
                raise Exception(f"API 返回 {api_resp.status_code}: {api_resp.text[:300]}")

            resp_json = api_resp.json()
            llm_output = resp_json["choices"][0]["message"]["content"]
            tables = extract_tables_from_markdown(llm_output)
            usage = resp_json.get("usage", {})

            save_result("deepseek", name, llm_output)

            results[name] = {
                "success": True,
                "time_sec": round(elapsed, 2),
                "table_count": len(tables),
                "total_chars": len(llm_output),
                "input_tokens": usage.get("prompt_tokens", 0),
                "output_tokens": usage.get("completion_tokens", 0),
                "tables_preview": [t[:200] for t in tables[:5]],
            }
            print(f"✅ {len(tables)} 个表格, {elapsed:.1f}s, tokens={usage.get('total_tokens', '?')}")

        except Exception as e:
            # Per-file failure is recorded but does not abort the batch.
            elapsed = time.time() - start
            results[name] = {
                "success": False,
                "time_sec": round(elapsed, 2),
                "error": str(e),
                "table_count": 0,
            }
            print(f"❌ 失败: {e}")

    total_elapsed = time.time() - total_start
    print(f"\n 总耗时: {total_elapsed:.1f}s")
    return {"method": "deepseek_llm", "total_time": round(total_elapsed, 2), "files": results}

# ══════════════════════════════════════════════════════
# Comparison report
# ══════════════════════════════════════════════════════

def generate_report(all_results: List[Dict[str, Any]], pdf_files: List[Path]):
    """Build the three-way comparison report, save it as Markdown, print it.

    Args:
        all_results: One result dict per method, as returned by the
            ``test_*`` functions (keys: ``method``, ``total_time``,
            ``files``, optionally ``batch_id``).
        pdf_files: The PDF paths that were tested (drives per-file rows).

    Returns:
        The full report text written to ``comparison_report.md``.
    """
    print("\n" + "=" * 70)
    print("📊 综合对比报告")
    print("=" * 70)

    report_lines = []
    report_lines.append(f"# PDF 表格提取三方对比测试报告\n")
    report_lines.append(f"**测试时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    report_lines.append(f"**测试文件**: {len(pdf_files)} 个医学 PDF 文献\n")

    # Summary table.
    report_lines.append("\n## 1. 总体对比\n")
    report_lines.append("| 指标 | pymupdf4llm | MinerU API (VLM) | DeepSeek LLM |")
    report_lines.append("|------|-------------|------------------|--------------|")

    # BUGFIX: pre-initialize the three columns. Previously they were only
    # assigned inside the matching branch, so a method missing from
    # all_results raised NameError when the summary row was assembled.
    col1 = col2 = col3 = "N/A"

    for r in all_results:
        method = r["method"]
        files = r.get("files", {})
        success_count = sum(1 for v in files.values() if v.get("success"))
        total_tables = sum(v.get("table_count", 0) for v in files.values())
        total_time = r.get("total_time", 0)

        summary = f"{success_count}/{len(pdf_files)} 成功, {total_tables} 表格, {total_time:.0f}s"
        if method == "pymupdf4llm":
            col1 = summary
        elif method == "mineru_api":
            col2 = summary
        elif method == "deepseek_llm":
            col3 = summary

    report_lines.append(f"| 成功/表格/耗时 | {col1} | {col2} | {col3} |")

    # Per-file comparison.
    report_lines.append("\n## 2. 逐文件对比\n")
    report_lines.append("| 文件 | pymupdf4llm | MinerU API | DeepSeek LLM |")
    report_lines.append("|------|-------------|------------|--------------|")

    for pdf_path in pdf_files:
        name = pdf_path.name
        # Keep the file column compact for readability.
        short_name = name[:40] + "..." if len(name) > 43 else name
        cells = [short_name]

        for r in all_results:
            finfo = r.get("files", {}).get(name, {})
            if finfo.get("success"):
                tc = finfo.get("table_count", 0)
                ts = finfo.get("time_sec", 0)
                if ts:
                    cells.append(f"{tc} 表格 ({ts:.1f}s)")
                else:
                    cells.append(f"{tc} 表格")
            else:
                err = finfo.get("error", "失败")[:30]
                cells.append(f"❌ {err}")

        report_lines.append(f"| {' | '.join(cells)} |")

    # Detailed per-method results.
    report_lines.append("\n## 3. 方法详情\n")
    for r in all_results:
        method = r["method"]
        report_lines.append(f"\n### {method}\n")
        report_lines.append(f"- 总耗时: {r.get('total_time', 0):.1f}s")
        if r.get("batch_id"):
            report_lines.append(f"- MinerU batch_id: {r['batch_id']}")
        report_lines.append("")

        for name, info in r.get("files", {}).items():
            report_lines.append(f"**{name}**:")
            if info.get("success"):
                report_lines.append(f" - 表格数: {info['table_count']}")
                report_lines.append(f" - 字符数: {info.get('total_chars', 'N/A')}")
                if info.get("time_sec"):
                    report_lines.append(f" - 耗时: {info['time_sec']}s")
                if info.get("input_tokens"):
                    report_lines.append(f" - Token: 输入={info['input_tokens']}, 输出={info['output_tokens']}")
            else:
                report_lines.append(f" - 状态: 失败")
                report_lines.append(f" - 错误: {info.get('error', 'N/A')}")
            report_lines.append("")

    report_text = '\n'.join(report_lines)
    report_path = OUTPUT_DIR / "comparison_report.md"
    report_path.write_text(report_text, encoding='utf-8')
    print(f"\n📄 报告已保存: {report_path}")
    print(report_text)

    return report_text

# ══════════════════════════════════════════════════════
# Entry point: each method can also be run individually
# ══════════════════════════════════════════════════════

def main():
    """
    Usage:
        python test_pdf_table_extraction.py           # run all three methods
        python test_pdf_table_extraction.py pymupdf   # pymupdf4llm only
        python test_pdf_table_extraction.py mineru    # MinerU API only
        python test_pdf_table_extraction.py deepseek  # DeepSeek LLM only
        python test_pdf_table_extraction.py report    # report from saved results
    """
    ensure_output_dir()
    pdf_files = get_pdf_files()

    if not pdf_files:
        print("❌ 未找到 PDF 文件,请检查路径")
        return

    mode = sys.argv[1] if len(sys.argv) > 1 else "all"
    all_results = []
    json_path = OUTPUT_DIR / "raw_results.json"

    # Load previously saved results so methods can be re-run incrementally.
    existing = {}
    if json_path.exists():
        try:
            existing = json.loads(json_path.read_text(encoding='utf-8'))
        except (OSError, ValueError):
            # BUGFIX: was a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit. A corrupt or unreadable cache
            # is non-fatal — start fresh.
            existing = {}

    if mode in ("all", "pymupdf"):
        r1 = test_pymupdf4llm(pdf_files)
        existing["pymupdf4llm"] = r1
        all_results.append(r1)

    if mode in ("all", "mineru"):
        r2 = test_mineru_api(pdf_files)
        existing["mineru_api"] = r2
        all_results.append(r2)

    if mode in ("all", "deepseek"):
        r3 = test_deepseek_llm(pdf_files)
        existing["deepseek_llm"] = r3
        all_results.append(r3)

    # Persist the merged raw results (default=str covers Path and similar).
    json_path.write_text(json.dumps(existing, ensure_ascii=False, indent=2, default=str), encoding='utf-8')
    print(f"\n💾 原始结果已保存: {json_path}")

    # Full comparison report requires results from all three methods.
    if mode in ("all", "report"):
        report_results = [
            existing[key]
            for key in ("pymupdf4llm", "mineru_api", "deepseek_llm")
            if key in existing
        ]
        if len(report_results) == 3:
            generate_report(report_results, pdf_files)
        else:
            print(f"\n⚠️ 需要全部三个方法的结果才能生成对比报告 (当前: {list(existing.keys())})")
            if report_results:
                # Print a partial summary of whatever results exist.
                print("\n--- 已有结果摘要 ---")
                for r in report_results:
                    m = r["method"]
                    files = r.get("files", {})
                    success = sum(1 for v in files.values() if v.get("success"))
                    tables = sum(v.get("table_count", 0) for v in files.values())
                    print(f" {m}: {success}/{len(files)} 成功, {tables} 个表格, {r.get('total_time', 0):.0f}s")


if __name__ == "__main__":
    main()