Files
AIclinicalresearch/extraction_service/forensics/extractor.py
HaHafeng f9ed0c2528 feat(rvw): Complete V2.0 Week 3 - Statistical validation extension and UX improvements
Week 3 Development Summary:

- Implement negative sign normalization (6 Unicode variants)

- Enhance T-test validation with smart sample size extraction

- Enhance SE triangle and CI-P consistency validation with subrow support

- Add precise sub-cell highlighting for P-values in multi-line cells

- Add frontend issue type Chinese translations (6 new types)

- Add file format tips for PDF/DOC uploads

Technical improvements:

- Add _clean_statistical_text() in extractor.py

- Add _safe_float() wrapper in validator.py

- Add ForensicsReport.tsx component

- Update ISSUE_TYPE_LABELS translations

Documentation:

- Add 2026-02-18 development record

- Update RVW module status (v5.1)

- Update system status (v5.2)

Status: Week 3 complete, ready for Week 4 testing
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-18 18:26:16 +08:00

489 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据侦探模块 - Word 表格提取器
使用 python-docx 解析 Word 文档,提取表格数据并生成 HTML 片段。
功能:
- 解析 Word DOM 结构
- 处理合并单元格(Forward Fill 策略)
- 关联表格 Caption(向前回溯)
- 生成 HTML 片段(含 data-coord 属性)
"""
from docx import Document
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from loguru import logger
from typing import List, Optional, Tuple
import re
from .types import TableData, Issue, Severity, IssueType, CellLocation, ForensicsConfig
from .config import (
MAX_TABLE_ROWS,
MAX_TABLES_PER_DOC,
BASELINE_KEYWORDS,
OUTCOME_KEYWORDS,
)
class DocxTableExtractor:
    """
    Word table extractor.

    Extracts every table from a .docx file, resolves merged cells, and
    renders each table as an HTML fragment with cell coordinates.
    """
    def __init__(self, config: ForensicsConfig):
        # Full forensics configuration for this extraction run.
        self.config = config
        # Cached row limit; tables larger than this are skipped entirely.
        self.max_table_rows = config.max_table_rows
    def extract(self, file_path: str) -> Tuple[List[TableData], str]:
        """
        Extract all tables from a Word document.

        Args:
            file_path: path to the .docx file.

        Returns:
            (tables, full_text): the extracted tables and the concatenated
            top-level paragraph text (used later for method detection;
            text inside tables is not included).

        Raises:
            ValueError: if the document cannot be opened.
        """
        logger.info(f"开始提取表格: {file_path}")
        try:
            doc = Document(file_path)
        except Exception as e:
            logger.error(f"无法打开 Word 文档: {e}")
            raise ValueError(f"无法打开 Word 文档: {e}")
        tables: List[TableData] = []
        full_text_parts: List[str] = []
        # Collect all top-level paragraph text (used for method detection).
        for para in doc.paragraphs:
            full_text_parts.append(para.text)
        # Walk the document body in order so each table can be associated
        # with the paragraphs (potential captions) that precede it.
        table_index = 0
        prev_paragraphs: List[str] = []
        for element in doc.element.body:
            # Paragraph element (tag ends with 'p', e.g. w:p).
            if element.tag.endswith('p'):
                para = Paragraph(element, doc)
                prev_paragraphs.append(para.text.strip())
                # Keep only the 3 most recent paragraphs for caption matching.
                if len(prev_paragraphs) > 3:
                    prev_paragraphs.pop(0)
            # Table element (tag ends with 'tbl', e.g. w:tbl).
            elif element.tag.endswith('tbl'):
                if table_index >= MAX_TABLES_PER_DOC:
                    logger.warning(f"表格数量超过限制 ({MAX_TABLES_PER_DOC}),跳过剩余表格")
                    break
                # Wrap the raw XML element in a python-docx Table object.
                table = Table(element, doc)
                # Look for a caption in the paragraphs just before the table.
                caption = self._find_caption(prev_paragraphs)
                # Extract the table data.
                table_data = self._extract_table(
                    table=table,
                    table_id=f"tbl_{table_index}",
                    caption=caption
                )
                tables.append(table_data)
                table_index += 1
                # Reset so a caption is never reused for a later table.
                prev_paragraphs = []
        full_text = "\n".join(full_text_parts)
        logger.info(f"提取完成: {len(tables)} 个表格, {len(full_text)} 字符")
        return tables, full_text
def _find_caption(self, prev_paragraphs: List[str]) -> Optional[str]:
"""
从前置段落中查找表格 Caption
匹配模式:
- "Table 1. xxx""表 1 xxx"
- "Table 1: xxx"
"""
caption_pattern = re.compile(
r"^(Table|表)\s*\d+[\.:\s]",
re.IGNORECASE
)
# 从后向前查找
for para in reversed(prev_paragraphs):
if para and caption_pattern.match(para):
return para
return None
def _extract_table(
self,
table: Table,
table_id: str,
caption: Optional[str]
) -> TableData:
"""
提取单个表格数据
Args:
table: python-docx Table 对象
table_id: 表格 ID
caption: 表格标题
Returns:
TableData 对象
"""
rows = table.rows
row_count = len(rows)
col_count = len(rows[0].cells) if rows else 0
# 检查是否超过行数限制
if row_count > self.max_table_rows:
logger.warning(f"表格 {table_id} 行数 ({row_count}) 超过限制 ({self.max_table_rows}),跳过")
return TableData(
id=table_id,
caption=caption,
type=self._detect_table_type(caption),
row_count=row_count,
col_count=col_count,
html=f"<p class='warning'>表格行数 ({row_count}) 超过限制 ({self.max_table_rows}),已跳过</p>",
data=[],
issues=[
Issue(
severity=Severity.WARNING,
type=IssueType.TABLE_SKIPPED,
message=f"表格行数 ({row_count}) 超过限制 ({self.max_table_rows})",
location=CellLocation(table_id=table_id, row=1, col=1),
evidence={"row_count": row_count, "max_rows": self.max_table_rows}
)
],
skipped=True,
skip_reason=f"行数超限: {row_count} > {self.max_table_rows}"
)
# 提取原始数据(处理合并单元格)
data = self._extract_with_merge_handling(table)
# 生成 HTML
html = self._generate_html(table_id, caption, data)
# 检测表格类型
table_type = self._detect_table_type(caption)
return TableData(
id=table_id,
caption=caption,
type=table_type,
row_count=len(data),
col_count=len(data[0]) if data else 0,
html=html,
data=data,
issues=[],
skipped=False,
skip_reason=None
)
def _extract_with_merge_handling(self, table: Table) -> List[List[str]]:
"""
提取表格数据,处理合并单元格
使用 Forward Fill 策略:
- 水平合并:将值复制到所有合并的单元格
- 垂直合并:将上方单元格的值填充到下方
"""
rows = table.rows
if not rows:
return []
# 首先获取表格的真实维度
num_rows = len(rows)
num_cols = len(rows[0].cells)
# 初始化数据矩阵
data: List[List[str]] = [["" for _ in range(num_cols)] for _ in range(num_rows)]
# 记录每个单元格是否已被处理(用于处理合并单元格)
processed = [[False for _ in range(num_cols)] for _ in range(num_rows)]
for row_idx, row in enumerate(rows):
col_idx = 0
for cell in row.cells:
# 跳过已处理的单元格(合并单元格的一部分)
while col_idx < num_cols and processed[row_idx][col_idx]:
col_idx += 1
if col_idx >= num_cols:
break
# 获取单元格文本(保留换行符用于 HTML 显示)
cell_text = self._get_cell_text(cell, use_newline=True)
# 检测合并范围
# python-docx 中合并单元格会重复出现同一个 cell 对象
# 我们通过比较 cell._tc 来检测
merge_width = 1
merge_height = 1
# 检测水平合并
for next_col in range(col_idx + 1, num_cols):
if next_col < len(row.cells):
next_cell = row.cells[next_col]
if next_cell._tc is cell._tc:
merge_width += 1
else:
break
# 填充数据
for r in range(row_idx, min(row_idx + merge_height, num_rows)):
for c in range(col_idx, min(col_idx + merge_width, num_cols)):
data[r][c] = cell_text
processed[r][c] = True
col_idx += merge_width
return data
# Symbol 字体字符映射表Word 使用 Symbol 字体表示希腊字母等)
SYMBOL_CHAR_MAP = {
'F063': 'χ', # chi
'F032': '²', # superscript 2
'F061': 'α', # alpha
'F062': 'β', # beta
'F067': 'γ', # gamma
'F064': 'δ', # delta
'F065': 'ε', # epsilon
'F06D': 'μ', # mu
'F073': 'σ', # sigma
'F070': 'π', # pi
'F0B2': '²', # another superscript 2 encoding
}
def _clean_statistical_text(self, text: str) -> str:
"""
清洗统计学文本中的特殊字符
关键清洗:
1. 负号归一化(最重要!防止 float() 崩溃)
2. 比较符归一化
3. 零宽字符清理
"""
if not text:
return ""
# 1. 负号归一化(极高危!)
# Word 会自动把连字符转成破折号或数学减号,导致 float() 报错
text = text.replace('\u2212', '-') # 数学减号 (Minus Sign)
text = text.replace('\u2013', '-') # En Dash
text = text.replace('\u2014', '-') # Em Dash
text = text.replace('\u2010', '-') # Hyphen
text = text.replace('\u2011', '-') # Non-Breaking Hyphen
text = text.replace('\u00ad', '-') # Soft Hyphen
# 2. 比较符归一化
text = text.replace('\u2264', '<=') # ≤
text = text.replace('\u2265', '>=') # ≥
text = text.replace('\u2260', '!=') # ≠
text = text.replace('\u2248', '~=') # ≈
# 3. 加减号归一化
# 保留 ± 原样,因为它在统计学中有特定含义(如 mean±SD
# text = text.replace('\u00b1', '+/-') # ±
# 4. 乘号归一化
text = text.replace('\u00d7', 'x') # ×
text = text.replace('\u2217', '*') # (asterisk operator)
# 5. 零宽字符清理
text = text.replace('\u200b', '') # Zero-Width Space
text = text.replace('\u200c', '') # Zero-Width Non-Joiner
text = text.replace('\u200d', '') # Zero-Width Joiner
text = text.replace('\ufeff', '') # BOM / Zero-Width No-Break Space
text = text.replace('\u00a0', ' ') # Non-Breaking Space -> 普通空格
return text
def _get_cell_text(self, cell: _Cell, use_newline: bool = False) -> str:
"""
获取单元格文本(合并多个段落)
Args:
cell: Word 单元格对象
use_newline: 是否使用换行符连接段落(用于 HTML 显示)
注意:会处理 Word 的 <w:sym> 符号字符(如 χ² 等)
"""
paragraphs = cell.paragraphs
texts = []
for para in paragraphs:
# 使用增强的文本提取(处理符号字符)
para_text = self._extract_paragraph_text(para)
if para_text.strip():
texts.append(para_text.strip())
separator = "\n" if use_newline else " "
raw_text = separator.join(texts).strip()
# 清洗统计学特殊字符(负号归一化等)
return self._clean_statistical_text(raw_text)
    def _extract_paragraph_text(self, para: Paragraph) -> str:
        """
        Extract the full text of a paragraph, including <w:sym> symbol
        characters.

        Word encodes symbols such as χ as
        <w:sym w:font="Symbol" w:char="F063"/>; python-docx's
        ``paragraph.text`` silently drops these, so we walk the XML
        ourselves.
        """
        from docx.oxml.ns import qn
        text_parts = []
        # Iterate over every XML descendant of the paragraph element.
        for run in para._p.iter():
            # Plain text node (w:t); run.text may be None for empty nodes.
            if run.tag == qn('w:t'):
                text_parts.append(run.text or '')
            # Symbol character node <w:sym>.
            elif run.tag == qn('w:sym'):
                font = run.get(qn('w:font'))
                char_code = run.get(qn('w:char'))
                if font == 'Symbol' and char_code:
                    # Map the Symbol-font code to its Unicode equivalent.
                    unicode_char = self.SYMBOL_CHAR_MAP.get(char_code.upper(), '')
                    if unicode_char:
                        text_parts.append(unicode_char)
                    else:
                        # Unknown code: log it and keep a visible placeholder.
                        logger.debug(f"Unknown Symbol char: {char_code}")
                        text_parts.append(f'[SYM:{char_code}]')
                # NOTE(review): <w:sym> runs using fonts other than
                # 'Symbol' (e.g. Wingdings) are silently dropped here --
                # confirm that is acceptable.
        return ''.join(text_parts)
def _generate_html(
self,
table_id: str,
caption: Optional[str],
data: List[List[str]]
) -> str:
"""
生成 HTML 片段,包含 data-coord 属性用于前端高亮
"""
if not data:
return f"<table id='{table_id}' class='forensics-table'><tr><td>空表格</td></tr></table>"
html_parts = [f"<table id='{table_id}' class='forensics-table'>"]
# 添加 Caption
if caption:
html_parts.append(f" <caption>{self._escape_html(caption)}</caption>")
# 添加表头(假设第一行是表头)
html_parts.append(" <thead>")
html_parts.append(" <tr>")
for col_idx, cell in enumerate(data[0], start=1):
coord = f"R1C{col_idx}"
html_parts.append(
f' <th data-coord="{coord}">{self._escape_html(cell)}</th>'
)
html_parts.append(" </tr>")
html_parts.append(" </thead>")
# 添加表体
html_parts.append(" <tbody>")
for row_idx, row in enumerate(data[1:], start=2):
html_parts.append(" <tr>")
for col_idx, cell in enumerate(row, start=1):
coord = f"R{row_idx}C{col_idx}"
# 为每个子行添加 span 标记,支持细粒度高亮
cell_html = self._escape_html_with_subrows(cell, coord)
html_parts.append(
f' <td data-coord="{coord}">{cell_html}</td>'
)
html_parts.append(" </tr>")
html_parts.append(" </tbody>")
html_parts.append("</table>")
return "\n".join(html_parts)
def _escape_html(self, text: str) -> str:
"""转义 HTML 特殊字符,并将换行符转换为 <br>"""
escaped = (
text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&#39;")
)
# 将换行符转换为 <br> 标签,保留表格中的多行结构
return escaped.replace("\n", "<br>")
def _escape_html_with_subrows(self, text: str, coord: str) -> str:
"""
转义 HTML 并为每个子行添加 span 标记,支持细粒度高亮
例如:单元格内容 "0.017\n0.01\n<0.001" 会生成:
<span data-subcoord="R5C5S1">0.017</span><br>
<span data-subcoord="R5C5S2">0.01</span><br>
<span data-subcoord="R5C5S3">&lt;0.001</span>
"""
lines = text.split("\n")
if len(lines) == 1:
# 单行内容,直接转义
return self._escape_single(text)
# 多行内容,为每行添加 span
result_parts = []
for idx, line in enumerate(lines, start=1):
escaped_line = self._escape_single(line)
subcoord = f"{coord}S{idx}"
result_parts.append(f'<span data-subcoord="{subcoord}">{escaped_line}</span>')
return "<br>".join(result_parts)
def _escape_single(self, text: str) -> str:
"""转义单行文本的 HTML 特殊字符"""
return (
text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&#39;")
)
def _detect_table_type(self, caption: Optional[str]) -> str:
"""
检测表格类型
Returns:
BASELINE / OUTCOME / OTHER
"""
if not caption:
return "OTHER"
caption_lower = caption.lower()
for keyword in BASELINE_KEYWORDS:
if keyword in caption_lower:
return "BASELINE"
for keyword in OUTCOME_KEYWORDS:
if keyword in caption_lower:
return "OUTCOME"
return "OTHER"