# Week 3 development summary (from commit history):
# - Negative sign normalization (6 Unicode variants); smarter T-test sample
#   size extraction; SE-triangle and CI-P consistency validation with subrow
#   support; precise sub-cell highlighting for P-values in multi-line cells;
#   frontend issue-type translations; file-format tips for PDF/DOC uploads.
# - Added _clean_statistical_text() here, _safe_float() in validator.py,
#   ForensicsReport.tsx on the frontend; ISSUE_TYPE_LABELS updated.
"""
|
||
数据侦探模块 - Word 表格提取器
|
||
|
||
使用 python-docx 解析 Word 文档,提取表格数据并生成 HTML 片段。
|
||
|
||
功能:
|
||
- 解析 Word DOM 结构
|
||
- 处理合并单元格(Forward Fill 策略)
|
||
- 关联表格 Caption(向前回溯)
|
||
- 生成 HTML 片段(含 data-coord 属性)
|
||
"""
|
||
|
||
import re
from typing import List, Optional, Tuple

from docx import Document
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from loguru import logger

from .types import TableData, Issue, Severity, IssueType, CellLocation, ForensicsConfig
from .config import (
    MAX_TABLE_ROWS,
    MAX_TABLES_PER_DOC,
    BASELINE_KEYWORDS,
    OUTCOME_KEYWORDS,
)


class DocxTableExtractor:
    """
    Word table extractor.

    Extracts all tables from a .docx file, resolves merged cells, and
    renders each table as an HTML fragment with coordinate metadata.
    """

    def __init__(self, config: ForensicsConfig):
        # Keep the full config object; mirror the frequently-read row cap
        # onto its own attribute for convenience.
        self.config = config
        self.max_table_rows = config.max_table_rows

def extract(self, file_path: str) -> Tuple[List[TableData], str]:
|
||
"""
|
||
提取 Word 文档中的所有表格
|
||
|
||
Args:
|
||
file_path: .docx 文件路径
|
||
|
||
Returns:
|
||
(tables, full_text): 表格列表和全文文本
|
||
"""
|
||
logger.info(f"开始提取表格: {file_path}")
|
||
|
||
try:
|
||
doc = Document(file_path)
|
||
except Exception as e:
|
||
logger.error(f"无法打开 Word 文档: {e}")
|
||
raise ValueError(f"无法打开 Word 文档: {e}")
|
||
|
||
tables: List[TableData] = []
|
||
full_text_parts: List[str] = []
|
||
|
||
# 收集所有段落文本(用于方法检测)
|
||
for para in doc.paragraphs:
|
||
full_text_parts.append(para.text)
|
||
|
||
# 遍历文档元素,关联表格和 Caption
|
||
table_index = 0
|
||
prev_paragraphs: List[str] = []
|
||
|
||
for element in doc.element.body:
|
||
# 段落元素
|
||
if element.tag.endswith('p'):
|
||
para = Paragraph(element, doc)
|
||
prev_paragraphs.append(para.text.strip())
|
||
# 只保留最近 3 个段落用于 Caption 匹配
|
||
if len(prev_paragraphs) > 3:
|
||
prev_paragraphs.pop(0)
|
||
|
||
# 表格元素
|
||
elif element.tag.endswith('tbl'):
|
||
if table_index >= MAX_TABLES_PER_DOC:
|
||
logger.warning(f"表格数量超过限制 ({MAX_TABLES_PER_DOC}),跳过剩余表格")
|
||
break
|
||
|
||
# 获取 python-docx Table 对象
|
||
table = Table(element, doc)
|
||
|
||
# 提取 Caption
|
||
caption = self._find_caption(prev_paragraphs)
|
||
|
||
# 提取表格数据
|
||
table_data = self._extract_table(
|
||
table=table,
|
||
table_id=f"tbl_{table_index}",
|
||
caption=caption
|
||
)
|
||
|
||
tables.append(table_data)
|
||
table_index += 1
|
||
|
||
# 清空前置段落
|
||
prev_paragraphs = []
|
||
|
||
full_text = "\n".join(full_text_parts)
|
||
|
||
logger.info(f"提取完成: {len(tables)} 个表格, {len(full_text)} 字符")
|
||
|
||
return tables, full_text
|
||
|
||
def _find_caption(self, prev_paragraphs: List[str]) -> Optional[str]:
|
||
"""
|
||
从前置段落中查找表格 Caption
|
||
|
||
匹配模式:
|
||
- "Table 1. xxx" 或 "表 1 xxx"
|
||
- "Table 1: xxx"
|
||
"""
|
||
caption_pattern = re.compile(
|
||
r"^(Table|表)\s*\d+[\.:\s]",
|
||
re.IGNORECASE
|
||
)
|
||
|
||
# 从后向前查找
|
||
for para in reversed(prev_paragraphs):
|
||
if para and caption_pattern.match(para):
|
||
return para
|
||
|
||
return None
|
||
|
||
def _extract_table(
|
||
self,
|
||
table: Table,
|
||
table_id: str,
|
||
caption: Optional[str]
|
||
) -> TableData:
|
||
"""
|
||
提取单个表格数据
|
||
|
||
Args:
|
||
table: python-docx Table 对象
|
||
table_id: 表格 ID
|
||
caption: 表格标题
|
||
|
||
Returns:
|
||
TableData 对象
|
||
"""
|
||
rows = table.rows
|
||
row_count = len(rows)
|
||
col_count = len(rows[0].cells) if rows else 0
|
||
|
||
# 检查是否超过行数限制
|
||
if row_count > self.max_table_rows:
|
||
logger.warning(f"表格 {table_id} 行数 ({row_count}) 超过限制 ({self.max_table_rows}),跳过")
|
||
return TableData(
|
||
id=table_id,
|
||
caption=caption,
|
||
type=self._detect_table_type(caption),
|
||
row_count=row_count,
|
||
col_count=col_count,
|
||
html=f"<p class='warning'>表格行数 ({row_count}) 超过限制 ({self.max_table_rows}),已跳过</p>",
|
||
data=[],
|
||
issues=[
|
||
Issue(
|
||
severity=Severity.WARNING,
|
||
type=IssueType.TABLE_SKIPPED,
|
||
message=f"表格行数 ({row_count}) 超过限制 ({self.max_table_rows})",
|
||
location=CellLocation(table_id=table_id, row=1, col=1),
|
||
evidence={"row_count": row_count, "max_rows": self.max_table_rows}
|
||
)
|
||
],
|
||
skipped=True,
|
||
skip_reason=f"行数超限: {row_count} > {self.max_table_rows}"
|
||
)
|
||
|
||
# 提取原始数据(处理合并单元格)
|
||
data = self._extract_with_merge_handling(table)
|
||
|
||
# 生成 HTML
|
||
html = self._generate_html(table_id, caption, data)
|
||
|
||
# 检测表格类型
|
||
table_type = self._detect_table_type(caption)
|
||
|
||
return TableData(
|
||
id=table_id,
|
||
caption=caption,
|
||
type=table_type,
|
||
row_count=len(data),
|
||
col_count=len(data[0]) if data else 0,
|
||
html=html,
|
||
data=data,
|
||
issues=[],
|
||
skipped=False,
|
||
skip_reason=None
|
||
)
|
||
|
||
def _extract_with_merge_handling(self, table: Table) -> List[List[str]]:
|
||
"""
|
||
提取表格数据,处理合并单元格
|
||
|
||
使用 Forward Fill 策略:
|
||
- 水平合并:将值复制到所有合并的单元格
|
||
- 垂直合并:将上方单元格的值填充到下方
|
||
"""
|
||
rows = table.rows
|
||
if not rows:
|
||
return []
|
||
|
||
# 首先获取表格的真实维度
|
||
num_rows = len(rows)
|
||
num_cols = len(rows[0].cells)
|
||
|
||
# 初始化数据矩阵
|
||
data: List[List[str]] = [["" for _ in range(num_cols)] for _ in range(num_rows)]
|
||
|
||
# 记录每个单元格是否已被处理(用于处理合并单元格)
|
||
processed = [[False for _ in range(num_cols)] for _ in range(num_rows)]
|
||
|
||
for row_idx, row in enumerate(rows):
|
||
col_idx = 0
|
||
for cell in row.cells:
|
||
# 跳过已处理的单元格(合并单元格的一部分)
|
||
while col_idx < num_cols and processed[row_idx][col_idx]:
|
||
col_idx += 1
|
||
|
||
if col_idx >= num_cols:
|
||
break
|
||
|
||
# 获取单元格文本(保留换行符用于 HTML 显示)
|
||
cell_text = self._get_cell_text(cell, use_newline=True)
|
||
|
||
# 检测合并范围
|
||
# python-docx 中合并单元格会重复出现同一个 cell 对象
|
||
# 我们通过比较 cell._tc 来检测
|
||
merge_width = 1
|
||
merge_height = 1
|
||
|
||
# 检测水平合并
|
||
for next_col in range(col_idx + 1, num_cols):
|
||
if next_col < len(row.cells):
|
||
next_cell = row.cells[next_col]
|
||
if next_cell._tc is cell._tc:
|
||
merge_width += 1
|
||
else:
|
||
break
|
||
|
||
# 填充数据
|
||
for r in range(row_idx, min(row_idx + merge_height, num_rows)):
|
||
for c in range(col_idx, min(col_idx + merge_width, num_cols)):
|
||
data[r][c] = cell_text
|
||
processed[r][c] = True
|
||
|
||
col_idx += merge_width
|
||
|
||
return data
|
||
|
||
    # Symbol-font character map. Word encodes Greek letters and similar
    # glyphs as <w:sym w:font="Symbol" w:char="F0xx"/>; this maps those
    # hex codes back to the Unicode characters used in statistical notation.
    # Keys are uppercase; lookups upper-case the code before matching.
    SYMBOL_CHAR_MAP = {
        'F063': 'χ',  # chi
        'F032': '²',  # superscript 2
        'F061': 'α',  # alpha
        'F062': 'β',  # beta
        'F067': 'γ',  # gamma
        'F064': 'δ',  # delta
        'F065': 'ε',  # epsilon
        'F06D': 'μ',  # mu
        'F073': 'σ',  # sigma
        'F070': 'π',  # pi
        'F0B2': '²',  # alternate encoding of superscript 2
    }

def _clean_statistical_text(self, text: str) -> str:
|
||
"""
|
||
清洗统计学文本中的特殊字符
|
||
|
||
关键清洗:
|
||
1. 负号归一化(最重要!防止 float() 崩溃)
|
||
2. 比较符归一化
|
||
3. 零宽字符清理
|
||
"""
|
||
if not text:
|
||
return ""
|
||
|
||
# 1. 负号归一化(极高危!)
|
||
# Word 会自动把连字符转成破折号或数学减号,导致 float() 报错
|
||
text = text.replace('\u2212', '-') # 数学减号 (Minus Sign)
|
||
text = text.replace('\u2013', '-') # En Dash
|
||
text = text.replace('\u2014', '-') # Em Dash
|
||
text = text.replace('\u2010', '-') # Hyphen
|
||
text = text.replace('\u2011', '-') # Non-Breaking Hyphen
|
||
text = text.replace('\u00ad', '-') # Soft Hyphen
|
||
|
||
# 2. 比较符归一化
|
||
text = text.replace('\u2264', '<=') # ≤
|
||
text = text.replace('\u2265', '>=') # ≥
|
||
text = text.replace('\u2260', '!=') # ≠
|
||
text = text.replace('\u2248', '~=') # ≈
|
||
|
||
# 3. 加减号归一化
|
||
# 保留 ± 原样,因为它在统计学中有特定含义(如 mean±SD)
|
||
# text = text.replace('\u00b1', '+/-') # ±
|
||
|
||
# 4. 乘号归一化
|
||
text = text.replace('\u00d7', 'x') # ×
|
||
text = text.replace('\u2217', '*') # ∗ (asterisk operator)
|
||
|
||
# 5. 零宽字符清理
|
||
text = text.replace('\u200b', '') # Zero-Width Space
|
||
text = text.replace('\u200c', '') # Zero-Width Non-Joiner
|
||
text = text.replace('\u200d', '') # Zero-Width Joiner
|
||
text = text.replace('\ufeff', '') # BOM / Zero-Width No-Break Space
|
||
text = text.replace('\u00a0', ' ') # Non-Breaking Space -> 普通空格
|
||
|
||
return text
|
||
|
||
def _get_cell_text(self, cell: _Cell, use_newline: bool = False) -> str:
|
||
"""
|
||
获取单元格文本(合并多个段落)
|
||
|
||
Args:
|
||
cell: Word 单元格对象
|
||
use_newline: 是否使用换行符连接段落(用于 HTML 显示)
|
||
|
||
注意:会处理 Word 的 <w:sym> 符号字符(如 χ² 等)
|
||
"""
|
||
paragraphs = cell.paragraphs
|
||
texts = []
|
||
|
||
for para in paragraphs:
|
||
# 使用增强的文本提取(处理符号字符)
|
||
para_text = self._extract_paragraph_text(para)
|
||
if para_text.strip():
|
||
texts.append(para_text.strip())
|
||
|
||
separator = "\n" if use_newline else " "
|
||
raw_text = separator.join(texts).strip()
|
||
|
||
# 清洗统计学特殊字符(负号归一化等)
|
||
return self._clean_statistical_text(raw_text)
|
||
|
||
def _extract_paragraph_text(self, para: Paragraph) -> str:
|
||
"""
|
||
从段落中提取完整文本,包括 <w:sym> 符号字符
|
||
|
||
Word 使用 <w:sym w:font="Symbol" w:char="F063"/> 表示 χ 等符号,
|
||
python-docx 的 paragraph.text 不会提取这些内容。
|
||
"""
|
||
from docx.oxml.ns import qn
|
||
|
||
text_parts = []
|
||
|
||
# 遍历段落中的所有 run 元素
|
||
for run in para._p.iter():
|
||
# 处理普通文本
|
||
if run.tag == qn('w:t'):
|
||
text_parts.append(run.text or '')
|
||
|
||
# 处理符号字符 <w:sym>
|
||
elif run.tag == qn('w:sym'):
|
||
font = run.get(qn('w:font'))
|
||
char_code = run.get(qn('w:char'))
|
||
|
||
if font == 'Symbol' and char_code:
|
||
# 查找映射
|
||
unicode_char = self.SYMBOL_CHAR_MAP.get(char_code.upper(), '')
|
||
if unicode_char:
|
||
text_parts.append(unicode_char)
|
||
else:
|
||
# 未知符号,记录警告
|
||
logger.debug(f"Unknown Symbol char: {char_code}")
|
||
text_parts.append(f'[SYM:{char_code}]')
|
||
|
||
return ''.join(text_parts)
|
||
|
||
def _generate_html(
|
||
self,
|
||
table_id: str,
|
||
caption: Optional[str],
|
||
data: List[List[str]]
|
||
) -> str:
|
||
"""
|
||
生成 HTML 片段,包含 data-coord 属性用于前端高亮
|
||
"""
|
||
if not data:
|
||
return f"<table id='{table_id}' class='forensics-table'><tr><td>空表格</td></tr></table>"
|
||
|
||
html_parts = [f"<table id='{table_id}' class='forensics-table'>"]
|
||
|
||
# 添加 Caption
|
||
if caption:
|
||
html_parts.append(f" <caption>{self._escape_html(caption)}</caption>")
|
||
|
||
# 添加表头(假设第一行是表头)
|
||
html_parts.append(" <thead>")
|
||
html_parts.append(" <tr>")
|
||
for col_idx, cell in enumerate(data[0], start=1):
|
||
coord = f"R1C{col_idx}"
|
||
html_parts.append(
|
||
f' <th data-coord="{coord}">{self._escape_html(cell)}</th>'
|
||
)
|
||
html_parts.append(" </tr>")
|
||
html_parts.append(" </thead>")
|
||
|
||
# 添加表体
|
||
html_parts.append(" <tbody>")
|
||
for row_idx, row in enumerate(data[1:], start=2):
|
||
html_parts.append(" <tr>")
|
||
for col_idx, cell in enumerate(row, start=1):
|
||
coord = f"R{row_idx}C{col_idx}"
|
||
# 为每个子行添加 span 标记,支持细粒度高亮
|
||
cell_html = self._escape_html_with_subrows(cell, coord)
|
||
html_parts.append(
|
||
f' <td data-coord="{coord}">{cell_html}</td>'
|
||
)
|
||
html_parts.append(" </tr>")
|
||
html_parts.append(" </tbody>")
|
||
|
||
html_parts.append("</table>")
|
||
|
||
return "\n".join(html_parts)
|
||
|
||
def _escape_html(self, text: str) -> str:
|
||
"""转义 HTML 特殊字符,并将换行符转换为 <br>"""
|
||
escaped = (
|
||
text
|
||
.replace("&", "&")
|
||
.replace("<", "<")
|
||
.replace(">", ">")
|
||
.replace('"', """)
|
||
.replace("'", "'")
|
||
)
|
||
# 将换行符转换为 <br> 标签,保留表格中的多行结构
|
||
return escaped.replace("\n", "<br>")
|
||
|
||
def _escape_html_with_subrows(self, text: str, coord: str) -> str:
|
||
"""
|
||
转义 HTML 并为每个子行添加 span 标记,支持细粒度高亮
|
||
|
||
例如:单元格内容 "0.017\n0.01\n<0.001" 会生成:
|
||
<span data-subcoord="R5C5S1">0.017</span><br>
|
||
<span data-subcoord="R5C5S2">0.01</span><br>
|
||
<span data-subcoord="R5C5S3"><0.001</span>
|
||
"""
|
||
lines = text.split("\n")
|
||
if len(lines) == 1:
|
||
# 单行内容,直接转义
|
||
return self._escape_single(text)
|
||
|
||
# 多行内容,为每行添加 span
|
||
result_parts = []
|
||
for idx, line in enumerate(lines, start=1):
|
||
escaped_line = self._escape_single(line)
|
||
subcoord = f"{coord}S{idx}"
|
||
result_parts.append(f'<span data-subcoord="{subcoord}">{escaped_line}</span>')
|
||
|
||
return "<br>".join(result_parts)
|
||
|
||
def _escape_single(self, text: str) -> str:
|
||
"""转义单行文本的 HTML 特殊字符"""
|
||
return (
|
||
text
|
||
.replace("&", "&")
|
||
.replace("<", "<")
|
||
.replace(">", ">")
|
||
.replace('"', """)
|
||
.replace("'", "'")
|
||
)
|
||
|
||
def _detect_table_type(self, caption: Optional[str]) -> str:
|
||
"""
|
||
检测表格类型
|
||
|
||
Returns:
|
||
BASELINE / OUTCOME / OTHER
|
||
"""
|
||
if not caption:
|
||
return "OTHER"
|
||
|
||
caption_lower = caption.lower()
|
||
|
||
for keyword in BASELINE_KEYWORDS:
|
||
if keyword in caption_lower:
|
||
return "BASELINE"
|
||
|
||
for keyword in OUTCOME_KEYWORDS:
|
||
if keyword in caption_lower:
|
||
return "OUTCOME"
|
||
|
||
return "OTHER"
|