""" 数据侦探模块 - Word 表格提取器 使用 python-docx 解析 Word 文档,提取表格数据并生成 HTML 片段。 功能: - 解析 Word DOM 结构 - 处理合并单元格(Forward Fill 策略) - 关联表格 Caption(向前回溯) - 生成 HTML 片段(含 data-coord 属性) """ from docx import Document from docx.table import Table, _Cell from docx.text.paragraph import Paragraph from loguru import logger from typing import List, Optional, Tuple import re from .types import TableData, Issue, Severity, IssueType, CellLocation, ForensicsConfig from .config import ( MAX_TABLE_ROWS, MAX_TABLES_PER_DOC, BASELINE_KEYWORDS, OUTCOME_KEYWORDS, ) class DocxTableExtractor: """ Word 表格提取器 提取 .docx 文件中的所有表格,处理合并单元格,生成 HTML 片段。 """ def __init__(self, config: ForensicsConfig): self.config = config self.max_table_rows = config.max_table_rows def extract(self, file_path: str) -> Tuple[List[TableData], str]: """ 提取 Word 文档中的所有表格 Args: file_path: .docx 文件路径 Returns: (tables, full_text): 表格列表和全文文本 """ logger.info(f"开始提取表格: {file_path}") try: doc = Document(file_path) except Exception as e: logger.error(f"无法打开 Word 文档: {e}") raise ValueError(f"无法打开 Word 文档: {e}") tables: List[TableData] = [] full_text_parts: List[str] = [] # 收集所有段落文本(用于方法检测) for para in doc.paragraphs: full_text_parts.append(para.text) # 遍历文档元素,关联表格和 Caption table_index = 0 prev_paragraphs: List[str] = [] for element in doc.element.body: # 段落元素 if element.tag.endswith('p'): para = Paragraph(element, doc) prev_paragraphs.append(para.text.strip()) # 只保留最近 3 个段落用于 Caption 匹配 if len(prev_paragraphs) > 3: prev_paragraphs.pop(0) # 表格元素 elif element.tag.endswith('tbl'): if table_index >= MAX_TABLES_PER_DOC: logger.warning(f"表格数量超过限制 ({MAX_TABLES_PER_DOC}),跳过剩余表格") break # 获取 python-docx Table 对象 table = Table(element, doc) # 提取 Caption caption = self._find_caption(prev_paragraphs) # 提取表格数据 table_data = self._extract_table( table=table, table_id=f"tbl_{table_index}", caption=caption ) tables.append(table_data) table_index += 1 # 清空前置段落 prev_paragraphs = [] full_text = "\n".join(full_text_parts) logger.info(f"提取完成: {len(tables)} 个表格, {len(full_text)} 字符") return tables, full_text def _find_caption(self, prev_paragraphs: List[str]) -> Optional[str]: """ 从前置段落中查找表格 Caption 匹配模式: - "Table 1. xxx" 或 "表 1 xxx" - "Table 1: xxx" """ caption_pattern = re.compile( r"^(Table|表)\s*\d+[\.:\s]", re.IGNORECASE ) # 从后向前查找 for para in reversed(prev_paragraphs): if para and caption_pattern.match(para): return para return None def _extract_table( self, table: Table, table_id: str, caption: Optional[str] ) -> TableData: """ 提取单个表格数据 Args: table: python-docx Table 对象 table_id: 表格 ID caption: 表格标题 Returns: TableData 对象 """ rows = table.rows row_count = len(rows) col_count = len(rows[0].cells) if rows else 0 # 检查是否超过行数限制 if row_count > self.max_table_rows: logger.warning(f"表格 {table_id} 行数 ({row_count}) 超过限制 ({self.max_table_rows}),跳过") return TableData( id=table_id, caption=caption, type=self._detect_table_type(caption), row_count=row_count, col_count=col_count, html=f"

表格行数 ({row_count}) 超过限制 ({self.max_table_rows}),已跳过

", data=[], issues=[ Issue( severity=Severity.WARNING, type=IssueType.TABLE_SKIPPED, message=f"表格行数 ({row_count}) 超过限制 ({self.max_table_rows})", location=CellLocation(table_id=table_id, row=1, col=1), evidence={"row_count": row_count, "max_rows": self.max_table_rows} ) ], skipped=True, skip_reason=f"行数超限: {row_count} > {self.max_table_rows}" ) # 提取原始数据(处理合并单元格) data = self._extract_with_merge_handling(table) # 生成 HTML html = self._generate_html(table_id, caption, data) # 检测表格类型 table_type = self._detect_table_type(caption) return TableData( id=table_id, caption=caption, type=table_type, row_count=len(data), col_count=len(data[0]) if data else 0, html=html, data=data, issues=[], skipped=False, skip_reason=None ) def _extract_with_merge_handling(self, table: Table) -> List[List[str]]: """ 提取表格数据,处理合并单元格 使用 Forward Fill 策略: - 水平合并:将值复制到所有合并的单元格 - 垂直合并:将上方单元格的值填充到下方 """ rows = table.rows if not rows: return [] # 首先获取表格的真实维度 num_rows = len(rows) num_cols = len(rows[0].cells) # 初始化数据矩阵 data: List[List[str]] = [["" for _ in range(num_cols)] for _ in range(num_rows)] # 记录每个单元格是否已被处理(用于处理合并单元格) processed = [[False for _ in range(num_cols)] for _ in range(num_rows)] for row_idx, row in enumerate(rows): col_idx = 0 for cell in row.cells: # 跳过已处理的单元格(合并单元格的一部分) while col_idx < num_cols and processed[row_idx][col_idx]: col_idx += 1 if col_idx >= num_cols: break # 获取单元格文本(保留换行符用于 HTML 显示) cell_text = self._get_cell_text(cell, use_newline=True) # 检测合并范围 # python-docx 中合并单元格会重复出现同一个 cell 对象 # 我们通过比较 cell._tc 来检测 merge_width = 1 merge_height = 1 # 检测水平合并 for next_col in range(col_idx + 1, num_cols): if next_col < len(row.cells): next_cell = row.cells[next_col] if next_cell._tc is cell._tc: merge_width += 1 else: break # 填充数据 for r in range(row_idx, min(row_idx + merge_height, num_rows)): for c in range(col_idx, min(col_idx + merge_width, num_cols)): data[r][c] = cell_text processed[r][c] = True col_idx += merge_width return data # Symbol 字体字符映射表(Word 使用 Symbol 字体表示希腊字母等) SYMBOL_CHAR_MAP = { 'F063': 'χ', # chi 'F032': '²', # superscript 2 'F061': 'α', # alpha 'F062': 'β', # beta 'F067': 'γ', # gamma 'F064': 'δ', # delta 'F065': 'ε', # epsilon 'F06D': 'μ', # mu 'F073': 'σ', # sigma 'F070': 'π', # pi 'F0B2': '²', # another superscript 2 encoding } def _clean_statistical_text(self, text: str) -> str: """ 清洗统计学文本中的特殊字符 关键清洗: 1. 负号归一化(最重要!防止 float() 崩溃) 2. 比较符归一化 3. 零宽字符清理 """ if not text: return "" # 1. 负号归一化(极高危!) # Word 会自动把连字符转成破折号或数学减号,导致 float() 报错 text = text.replace('\u2212', '-') # 数学减号 (Minus Sign) text = text.replace('\u2013', '-') # En Dash text = text.replace('\u2014', '-') # Em Dash text = text.replace('\u2010', '-') # Hyphen text = text.replace('\u2011', '-') # Non-Breaking Hyphen text = text.replace('\u00ad', '-') # Soft Hyphen # 2. 比较符归一化 text = text.replace('\u2264', '<=') # ≤ text = text.replace('\u2265', '>=') # ≥ text = text.replace('\u2260', '!=') # ≠ text = text.replace('\u2248', '~=') # ≈ # 3. 加减号归一化 # 保留 ± 原样,因为它在统计学中有特定含义(如 mean±SD) # text = text.replace('\u00b1', '+/-') # ± # 4. 乘号归一化 text = text.replace('\u00d7', 'x') # × text = text.replace('\u2217', '*') # ∗ (asterisk operator) # 5. 零宽字符清理 text = text.replace('\u200b', '') # Zero-Width Space text = text.replace('\u200c', '') # Zero-Width Non-Joiner text = text.replace('\u200d', '') # Zero-Width Joiner text = text.replace('\ufeff', '') # BOM / Zero-Width No-Break Space text = text.replace('\u00a0', ' ') # Non-Breaking Space -> 普通空格 return text def _get_cell_text(self, cell: _Cell, use_newline: bool = False) -> str: """ 获取单元格文本(合并多个段落) Args: cell: Word 单元格对象 use_newline: 是否使用换行符连接段落(用于 HTML 显示) 注意:会处理 Word 的 符号字符(如 χ² 等) """ paragraphs = cell.paragraphs texts = [] for para in paragraphs: # 使用增强的文本提取(处理符号字符) para_text = self._extract_paragraph_text(para) if para_text.strip(): texts.append(para_text.strip()) separator = "\n" if use_newline else " " raw_text = separator.join(texts).strip() # 清洗统计学特殊字符(负号归一化等) return self._clean_statistical_text(raw_text) def _extract_paragraph_text(self, para: Paragraph) -> str: """ 从段落中提取完整文本,包括 符号字符 Word 使用 表示 χ 等符号, python-docx 的 paragraph.text 不会提取这些内容。 """ from docx.oxml.ns import qn text_parts = [] # 遍历段落中的所有 run 元素 for run in para._p.iter(): # 处理普通文本 if run.tag == qn('w:t'): text_parts.append(run.text or '') # 处理符号字符 elif run.tag == qn('w:sym'): font = run.get(qn('w:font')) char_code = run.get(qn('w:char')) if font == 'Symbol' and char_code: # 查找映射 unicode_char = self.SYMBOL_CHAR_MAP.get(char_code.upper(), '') if unicode_char: text_parts.append(unicode_char) else: # 未知符号,记录警告 logger.debug(f"Unknown Symbol char: {char_code}") text_parts.append(f'[SYM:{char_code}]') return ''.join(text_parts) def _generate_html( self, table_id: str, caption: Optional[str], data: List[List[str]] ) -> str: """ 生成 HTML 片段,包含 data-coord 属性用于前端高亮 """ if not data: return f"
空表格
" html_parts = [f""] # 添加 Caption if caption: html_parts.append(f" ") # 添加表头(假设第一行是表头) html_parts.append(" ") html_parts.append(" ") for col_idx, cell in enumerate(data[0], start=1): coord = f"R1C{col_idx}" html_parts.append( f' ' ) html_parts.append(" ") html_parts.append(" ") # 添加表体 html_parts.append(" ") for row_idx, row in enumerate(data[1:], start=2): html_parts.append(" ") for col_idx, cell in enumerate(row, start=1): coord = f"R{row_idx}C{col_idx}" # 为每个子行添加 span 标记,支持细粒度高亮 cell_html = self._escape_html_with_subrows(cell, coord) html_parts.append( f' ' ) html_parts.append(" ") html_parts.append(" ") html_parts.append("
{self._escape_html(caption)}
{self._escape_html(cell)}
{cell_html}
") return "\n".join(html_parts) def _escape_html(self, text: str) -> str: """转义 HTML 特殊字符,并将换行符转换为
""" escaped = ( text .replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) .replace("'", "'") ) # 将换行符转换为
标签,保留表格中的多行结构 return escaped.replace("\n", "
") def _escape_html_with_subrows(self, text: str, coord: str) -> str: """ 转义 HTML 并为每个子行添加 span 标记,支持细粒度高亮 例如:单元格内容 "0.017\n0.01\n<0.001" 会生成: 0.017
0.01
<0.001 """ lines = text.split("\n") if len(lines) == 1: # 单行内容,直接转义 return self._escape_single(text) # 多行内容,为每行添加 span result_parts = [] for idx, line in enumerate(lines, start=1): escaped_line = self._escape_single(line) subcoord = f"{coord}S{idx}" result_parts.append(f'{escaped_line}') return "
".join(result_parts) def _escape_single(self, text: str) -> str: """转义单行文本的 HTML 特殊字符""" return ( text .replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) .replace("'", "'") ) def _detect_table_type(self, caption: Optional[str]) -> str: """ 检测表格类型 Returns: BASELINE / OUTCOME / OTHER """ if not caption: return "OTHER" caption_lower = caption.lower() for keyword in BASELINE_KEYWORDS: if keyword in caption_lower: return "BASELINE" for keyword in OUTCOME_KEYWORDS: if keyword in caption_lower: return "OUTCOME" return "OTHER"