Files
AIclinicalresearch/extraction_service/forensics/extractor.py
HaHafeng e785969e54 feat(rvw): Implement RVW V2.0 Data Forensics Module - Day 6 StatValidator
Summary:
- Implement L2 Statistical Validator (CI-P consistency, T-test reverse)
- Implement L2.5 Consistency Forensics (SE Triangle, SD>Mean check)
- Add error/warning severity classification with tolerance thresholds
- Support 5+ CI formats parsing (parentheses, brackets, 95% CI prefix)
- Complete Python forensics service (types, config, validator, extractor)

V2.0 Development Progress (Week 2 Day 6):
- Day 1-5: Python service setup, Word table extraction, L1 arithmetic validator
- Day 6: L2 StatValidator + L2.5 consistency forensics (promoted from V2.1)

Test Results:
- Unit tests: 4/4 passed (CI-P, SE Triangle, SD>Mean, T-test)
- Real document tests: 5/5 successful, 2 reasonable WARNINGs

Status: Day 6 completed, ready for Day 7 (Skills Framework)
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-17 22:15:27 +08:00

341 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据侦探模块 - Word 表格提取器
使用 python-docx 解析 Word 文档,提取表格数据并生成 HTML 片段。
功能:
- 解析 Word DOM 结构
- 处理合并单元格Forward Fill 策略)
- 关联表格 Caption向前回溯
- 生成 HTML 片段(含 data-coord 属性)
"""
from docx import Document
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from loguru import logger
from typing import List, Optional, Tuple
import re
from .types import TableData, Issue, Severity, IssueType, CellLocation, ForensicsConfig
from .config import (
MAX_TABLE_ROWS,
MAX_TABLES_PER_DOC,
BASELINE_KEYWORDS,
OUTCOME_KEYWORDS,
)
class DocxTableExtractor:
    """
    Word table extractor.

    Extracts every table from a .docx file, resolves merged cells with a
    forward-fill strategy, associates each table with a caption found in
    the immediately preceding paragraphs, and renders each table as an
    HTML fragment carrying ``data-coord`` attributes so the front end can
    highlight individual cells.
    """

    # Compiled once at class load instead of per call.  Matches captions
    # such as "Table 1. xxx", "Table 1: xxx", "表 1 xxx", and the
    # full-width Chinese separators "表1xxx" / "表1。xxx".
    _CAPTION_PATTERN = re.compile(
        r"^(Table|表)\s*\d+[\.::。\s]",
        re.IGNORECASE
    )

    def __init__(self, config: ForensicsConfig) -> None:
        self.config = config
        # Cached row cap; tables larger than this are skipped, not parsed.
        self.max_table_rows = config.max_table_rows

    def extract(self, file_path: str) -> Tuple[List[TableData], str]:
        """
        Extract all tables from a Word document.

        Args:
            file_path: path to the .docx file.

        Returns:
            (tables, full_text): the extracted tables and the document's
            concatenated paragraph text (tables excluded).

        Raises:
            ValueError: if the document cannot be opened.
        """
        logger.info(f"开始提取表格: {file_path}")
        try:
            doc = Document(file_path)
        except Exception as e:
            logger.error(f"无法打开 Word 文档: {e}")
            raise ValueError(f"无法打开 Word 文档: {e}")
        tables: List[TableData] = []
        full_text_parts: List[str] = []
        # Collect all paragraph text (used downstream for method detection).
        for para in doc.paragraphs:
            full_text_parts.append(para.text)
        # Walk the body XML in document order so each table can be paired
        # with the paragraphs that immediately precede it (caption candidates).
        table_index = 0
        prev_paragraphs: List[str] = []
        for element in doc.element.body:
            # Paragraph element
            if element.tag.endswith('p'):
                para = Paragraph(element, doc)
                prev_paragraphs.append(para.text.strip())
                # Keep only the 3 most recent paragraphs for caption matching.
                if len(prev_paragraphs) > 3:
                    prev_paragraphs.pop(0)
            # Table element
            elif element.tag.endswith('tbl'):
                if table_index >= MAX_TABLES_PER_DOC:
                    logger.warning(f"表格数量超过限制 ({MAX_TABLES_PER_DOC}),跳过剩余表格")
                    break
                # Wrap the raw XML element in a python-docx Table proxy.
                table = Table(element, doc)
                caption = self._find_caption(prev_paragraphs)
                table_data = self._extract_table(
                    table=table,
                    table_id=f"tbl_{table_index}",
                    caption=caption
                )
                tables.append(table_data)
                table_index += 1
                # Reset so a caption is never reused for the next table.
                prev_paragraphs = []
        full_text = "\n".join(full_text_parts)
        logger.info(f"提取完成: {len(tables)} 个表格, {len(full_text)} 字符")
        return tables, full_text

    def _find_caption(self, prev_paragraphs: List[str]) -> Optional[str]:
        """
        Search the preceding paragraphs for a table caption.

        Matched patterns:
            - "Table 1. xxx" / "Table 1: xxx"
            - "表 1 xxx" plus full-width separators ("表1xxx", "表1。xxx")

        Returns:
            The matching paragraph closest to the table, or None.
        """
        # Scan backwards so the paragraph nearest the table wins.
        for para in reversed(prev_paragraphs):
            if para and self._CAPTION_PATTERN.match(para):
                return para
        return None

    def _extract_table(
        self,
        table: Table,
        table_id: str,
        caption: Optional[str]
    ) -> TableData:
        """
        Extract a single table.

        Args:
            table: python-docx Table object.
            table_id: stable identifier assigned by ``extract``.
            caption: caption text, if one was found.

        Returns:
            A populated TableData; oversized tables are returned as a
            skipped placeholder carrying a WARNING issue instead of data.
        """
        rows = table.rows
        row_count = len(rows)
        col_count = len(rows[0].cells) if rows else 0
        # Enforce the row cap before doing any cell work.
        if row_count > self.max_table_rows:
            logger.warning(f"表格 {table_id} 行数 ({row_count}) 超过限制 ({self.max_table_rows}),跳过")
            return TableData(
                id=table_id,
                caption=caption,
                type=self._detect_table_type(caption),
                row_count=row_count,
                col_count=col_count,
                html=f"<p class='warning'>表格行数 ({row_count}) 超过限制 ({self.max_table_rows}),已跳过</p>",
                data=[],
                issues=[
                    Issue(
                        severity=Severity.WARNING,
                        type=IssueType.TABLE_SKIPPED,
                        message=f"表格行数 ({row_count}) 超过限制 ({self.max_table_rows})",
                        location=CellLocation(table_id=table_id, row=1, col=1),
                        evidence={"row_count": row_count, "max_rows": self.max_table_rows}
                    )
                ],
                skipped=True,
                skip_reason=f"行数超限: {row_count} > {self.max_table_rows}"
            )
        # Raw cell matrix with merged cells forward-filled.
        data = self._extract_with_merge_handling(table)
        html = self._generate_html(table_id, caption, data)
        table_type = self._detect_table_type(caption)
        return TableData(
            id=table_id,
            caption=caption,
            type=table_type,
            row_count=len(data),
            col_count=len(data[0]) if data else 0,
            html=html,
            data=data,
            issues=[],
            skipped=False,
            skip_reason=None
        )

    def _extract_with_merge_handling(self, table: Table) -> List[List[str]]:
        """
        Extract the cell matrix, forward-filling merged cells.

        Horizontal merges are detected by identity of the underlying
        ``_tc`` XML element (python-docx re-presents a merged cell once
        per grid column it spans) and the text is copied across the span.
        Vertical merges need no explicit handling here: python-docx also
        re-presents the spanning cell in each continuation row's
        ``row.cells``, so its text is re-extracted row by row.
        """
        rows = table.rows
        if not rows:
            return []
        # Grid dimensions, taken from the first row.
        num_rows = len(rows)
        num_cols = len(rows[0].cells)
        data: List[List[str]] = [["" for _ in range(num_cols)] for _ in range(num_rows)]
        # Marks cells already written by an earlier merge span.
        processed = [[False for _ in range(num_cols)] for _ in range(num_rows)]
        for row_idx, row in enumerate(rows):
            col_idx = 0
            for cell in row.cells:
                # Skip columns already filled by a preceding merge span.
                while col_idx < num_cols and processed[row_idx][col_idx]:
                    col_idx += 1
                if col_idx >= num_cols:
                    break
                cell_text = self._get_cell_text(cell)
                # Width of the horizontal merge starting at this column.
                merge_width = 1
                # Always 1: vertical spans are handled implicitly (see docstring).
                merge_height = 1
                for next_col in range(col_idx + 1, num_cols):
                    if next_col >= len(row.cells):
                        # No more cells in this row; stop scanning.
                        break
                    if row.cells[next_col]._tc is cell._tc:
                        merge_width += 1
                    else:
                        break
                # Forward-fill the merge span (clamped to the grid).
                for r in range(row_idx, min(row_idx + merge_height, num_rows)):
                    for c in range(col_idx, min(col_idx + merge_width, num_cols)):
                        data[r][c] = cell_text
                        processed[r][c] = True
                col_idx += merge_width
        return data

    def _get_cell_text(self, cell: _Cell) -> str:
        """Return the cell's text with its paragraphs joined by spaces."""
        texts = [p.text.strip() for p in cell.paragraphs]
        return " ".join(texts).strip()

    def _generate_html(
        self,
        table_id: str,
        caption: Optional[str],
        data: List[List[str]]
    ) -> str:
        """
        Render the cell matrix as an HTML fragment.

        Each cell carries a ``data-coord`` attribute ("R<row>C<col>",
        1-based) used by the front end for highlighting.  The first data
        row is assumed to be the header row.
        """
        if not data:
            return f"<table id='{table_id}' class='forensics-table'><tr><td>空表格</td></tr></table>"
        html_parts = [f"<table id='{table_id}' class='forensics-table'>"]
        if caption:
            html_parts.append(f"  <caption>{self._escape_html(caption)}</caption>")
        # Header row (row 1).
        html_parts.append("  <thead>")
        html_parts.append("    <tr>")
        for col_idx, cell in enumerate(data[0], start=1):
            coord = f"R1C{col_idx}"
            html_parts.append(
                f'      <th data-coord="{coord}">{self._escape_html(cell)}</th>'
            )
        html_parts.append("    </tr>")
        html_parts.append("  </thead>")
        # Body rows (rows 2..n, coordinates stay aligned with the source table).
        html_parts.append("  <tbody>")
        for row_idx, row in enumerate(data[1:], start=2):
            html_parts.append("    <tr>")
            for col_idx, cell in enumerate(row, start=1):
                coord = f"R{row_idx}C{col_idx}"
                html_parts.append(
                    f'      <td data-coord="{coord}">{self._escape_html(cell)}</td>'
                )
            html_parts.append("    </tr>")
        html_parts.append("  </tbody>")
        html_parts.append("</table>")
        return "\n".join(html_parts)

    def _escape_html(self, text: str) -> str:
        """Escape HTML special characters (ampersand first)."""
        return (
            text
            .replace("&", "&amp;")
            .replace("<", "&lt;")
            .replace(">", "&gt;")
            .replace('"', "&quot;")
            .replace("'", "&#39;")
        )

    def _detect_table_type(self, caption: Optional[str]) -> str:
        """
        Classify a table by keywords in its caption.

        Returns:
            "BASELINE", "OUTCOME", or "OTHER" (also when caption is None).
        """
        if not caption:
            return "OTHER"
        caption_lower = caption.lower()
        for keyword in BASELINE_KEYWORDS:
            if keyword in caption_lower:
                return "BASELINE"
        for keyword in OUTCOME_KEYWORDS:
            if keyword in caption_lower:
                return "OUTCOME"
        return "OTHER"