"""
Data-forensics module - validators.

Contains the L1 arithmetic checks, the L2 statistical checks and the L2.5
consistency forensics.

L1 arithmetic validation:
- n (%) format validation
- Sum/Total validation
- tolerance handling

L2 statistical validation:
- reverse verification of t-test P values
- reverse verification of chi-square P values
- CI vs. P value logic check

L2.5 consistency forensics:
- SE triangle check (regression coefficient CI <-> P consistency)
- SD > Mean check (heuristic rule for positive-valued measures)
"""

import re
import math
from typing import List, Optional, Tuple

from loguru import logger

# scipy provides the distributions used by the L2 statistical checks
try:
    from scipy import stats
    SCIPY_AVAILABLE = True
except ImportError:
    SCIPY_AVAILABLE = False
    logger.warning("scipy 未安装,L2 统计验证将受限")

from .types import (
    TableData,
    Issue,
    Severity,
    IssueType,
    CellLocation,
    ForensicsConfig,
)
from .config import (
    PERCENT_PATTERN,
    PVALUE_PATTERN,
    CI_PATTERN,
    MEAN_SD_PATTERN,
    MEAN_SD_PAREN_PATTERN,
    CI_PATTERNS,
    EFFECT_SIZE_PATTERN,
    CHI_SQUARE_PATTERN,
    DEFAULT_TOLERANCE_PERCENT,
    PVALUE_ERROR_THRESHOLD,
    PVALUE_WARNING_THRESHOLD,
    STAT_RELATIVE_TOLERANCE,
)


# Single-pass character normalisation applied before numeric parsing.
_NUMBER_CHAR_TABLE = str.maketrans({
    "\u2212": "-",   # Minus Sign
    "\u2013": "-",   # En Dash
    "\u2014": "-",   # Em Dash
    "\u2010": "-",   # Hyphen
    "\u2011": "-",   # Non-Breaking Hyphen
    "\u200b": None,  # Zero-Width Space (removed)
    "\u00a0": " ",   # Non-Breaking Space -> plain space
})

# Leading (optionally negative, optionally decimal) number of a cleaned cell.
_LEADING_NUMBER_RE = re.compile(r"^(-?\d+(?:\.\d+)?)")

# A lone "n" that is not part of a longer ASCII word (text is lower-cased).
_STANDALONE_N_RE = re.compile(r"(?<![a-z])n(?![a-z])")

# "<0.001" with an ASCII or full-width (U+FF1C) less-than sign.
_LESS_THAN_PVALUE_RE = re.compile(r"[<\uFF1C]\s*(\d+\.?\d*)")

# "n=50", "(n=50)", "N: 50" in a header cell; ASCII or full-width brackets
# and colon.  The look-behind keeps "mean=5" from being read as "n=5".
_HEADER_N_RE = re.compile(
    r"[(\[\uFF08]?\s*(?<![A-Za-z])[nN]\s*[=:\uFF1A]\s*(\d+)\s*[)\]\uFF09]?"
)

# Chinese case-count notation such as "(50例)".
_HEADER_N_CN_RE = re.compile(r"[(\[\uFF08]?\s*(\d+)\s*例\s*[)\]\uFF09]?")

# Parenthesised "(n=50)" inside a data row.
_ROW_N_RE = re.compile(r"[(\uFF08]\s*[nN]\s*[=:\uFF1A]\s*(\d+)\s*[)\uFF09]")


def _clean_number_string(text: str) -> str:
    """
    Normalise special characters in a numeric string so ``float()`` can parse it.

    Word processors tend to replace "-" with a mathematical minus sign or a
    dash; those are mapped back to an ASCII hyphen.  Zero-width spaces are
    removed and non-breaking spaces become plain spaces.

    Returns:
        The cleaned, stripped string; ``""`` for empty or ``None`` input.
    """
    if not text:
        return ""
    return text.translate(_NUMBER_CHAR_TABLE).strip()


def _safe_float(text: str) -> Optional[float]:
    """
    Convert *text* to ``float`` after cleaning special characters.

    Returns:
        The parsed float, or ``None`` when the text is not a number.
    """
    try:
        return float(_clean_number_string(text))
    except (ValueError, TypeError, AttributeError):
        # AttributeError: a non-string value slipped through; treat as unparsable.
        return None


class ArithmeticValidator:
    """
    L1 arithmetic self-consistency validator.

    Checks that the numbers inside a table add up:
    - the percentage of an "n (%)" cell equals n / N
    - a Total/Sum row equals the sum of the rows above it
    """

    def __init__(self, config: ForensicsConfig):
        self.config = config
        self.tolerance = config.tolerance_percent

    def validate(self, table: TableData) -> List[Issue]:
        """
        Validate the arithmetic consistency of a table.

        Args:
            table: the table to validate; found issues are also appended to
                ``table.issues``.

        Returns:
            The list of issues found (empty for skipped or empty tables).
        """
        if table.skipped or not table.data:
            return []

        issues: List[Issue] = []

        # 1. n (%) cells
        issues.extend(self._validate_percent_format(table))

        # 2. Sum/Total rows
        issues.extend(self._validate_sum_rows(table))

        # Record the findings on the table itself as well
        table.issues.extend(issues)

        logger.debug(f"表格 {table.id} 算术验证完成: {len(issues)} 个问题")

        return issues

    def _validate_percent_format(self, table: TableData) -> List[Issue]:
        """
        Validate cells of the form "45 (50.0%)".

        The denominator N is looked up through ``_find_total_n``; cells for
        which no N can be determined are not checked.  Reported row/col
        positions are 1-based (the header is row 1).
        """
        issues: List[Issue] = []
        data = table.data

        if len(data) < 2:  # need at least a header and one data row
            return issues

        # Columns whose header suggests they hold N ("n", "Total", ...)
        header = data[0]
        n_col_indices = self._find_n_columns(header)

        for row_idx, row in enumerate(data[1:], start=2):
            for col_idx, cell in enumerate(row, start=1):
                match = PERCENT_PATTERN.search(cell)
                if not match:
                    continue

                # _safe_float tolerates Word-style minus signs etc.; a cell
                # that still cannot be parsed is skipped instead of aborting
                # the whole validation with a ValueError.
                n_value = _safe_float(match.group(1))
                reported_percent = _safe_float(match.group(2))
                if n_value is None or reported_percent is None:
                    continue

                total_n = self._find_total_n(
                    data, row_idx - 1, col_idx - 1, n_col_indices
                )
                if total_n is None or total_n <= 0:
                    continue

                calculated_percent = (n_value / total_n) * 100
                diff = abs(calculated_percent - reported_percent)
                if diff > self.tolerance:
                    issues.append(Issue(
                        severity=Severity.ERROR,
                        type=IssueType.ARITHMETIC_PERCENT,
                        message=f"百分比计算错误: 报告值 {reported_percent}%,计算值 {calculated_percent:.1f}% (n={n_value}, N={total_n})",
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=col_idx
                        ),
                        evidence={
                            "n": n_value,
                            "N": total_n,
                            "reported_percent": reported_percent,
                            "calculated_percent": round(calculated_percent, 2),
                            "difference": round(diff, 2)
                        }
                    ))

        return issues

    def _find_n_columns(self, header: List[str]) -> List[int]:
        """
        Return the indices of header cells that may denote an N column.

        "n" only counts as a standalone token ("N", "n=50", "Total (n)");
        a plain substring test would match almost every English header
        ("Control", "Intervention", ...).  The longer keywords keep their
        substring semantics.
        """
        substring_keywords = ("total", "合计", "总数", "all", "sum")
        indices = []

        for idx, cell in enumerate(header):
            cell_lower = cell.lower().strip()
            if _STANDALONE_N_RE.search(cell_lower) or any(
                keyword in cell_lower for keyword in substring_keywords
            ):
                indices.append(idx)

        return indices

    def _parse_plain_count(self, text: str) -> Optional[float]:
        """
        Parse *text* as a positive plain count usable as a denominator.

        Cells that are themselves in "n (%)" format are rejected: their
        leading number is a numerator, not a total.
        """
        if not text or PERCENT_PATTERN.search(text):
            return None
        value = self._parse_number(text)
        if value is not None and value > 0:
            return value
        return None

    def _find_total_n(
        self,
        data: List[List[str]],
        row_idx: int,
        col_idx: int,
        n_col_indices: List[int]
    ) -> Optional[float]:
        """
        Find the total N that belongs to the cell at (row_idx, col_idx).

        Indices are 0-based positions in *data* (row 0 is the header).

        Strategy:
        1. a plain count in one of the N columns of the same row
        2. an "(n=XX)" / "n=XX" annotation in the header of the same column
        3. a plain count in the first data row of the same column, used only
           for rows below that first data row

        Returns:
            The denominator, or ``None`` when it cannot be determined.
        """
        row = data[row_idx]

        # Strategy 1: N column in the same row
        for n_col in n_col_indices:
            if n_col < len(row):
                n_val = self._parse_plain_count(row[n_col])
                if n_val is not None:
                    return n_val

        # Strategy 2: sample size stated in the header of this column
        header = data[0] if data else []
        if col_idx < len(header):
            header_match = _HEADER_N_RE.search(header[col_idx])
            if header_match:
                header_n = _safe_float(header_match.group(1))
                if header_n is not None and header_n > 0:
                    return header_n

        # Strategy 3: first data row of the same column holds the total.
        # The first data row must not be used as its own denominator,
        # otherwise every "n (%)" cell in it would evaluate to 100%.
        if row_idx > 1:
            first_data_row = data[1]
            if col_idx < len(first_data_row):
                n_val = self._parse_plain_count(first_data_row[col_idx])
                if n_val is not None:
                    return n_val

        return None

    def _parse_number(self, text: str) -> Optional[float]:
        """
        Parse the leading number of *text*.

        Handles:
        - plain numbers "45"
        - thousands separators "1,234" and "1 234"
        - negative numbers, including typographic minus signs

        Returns:
            The number, or ``None`` if the text does not start with one.
        """
        if not text:
            return None

        # Normalise minus signs etc. first, then drop common separators
        cleaned = _clean_number_string(text)
        cleaned = cleaned.replace(",", "").replace(" ", "")

        match = _LEADING_NUMBER_RE.match(cleaned)
        if match:
            return _safe_float(match.group(1))

        return None

    def _validate_sum_rows(self, table: TableData) -> List[Issue]:
        """
        Validate Sum/Total rows.

        A row whose first cell contains "Total", "Sum", "合计", ... is compared
        column by column with the sum of all data rows above it.  A column is
        skipped when any cell above is not numeric or when the sum is not
        positive.  Differences up to 0.5 are tolerated.
        """
        issues: List[Issue] = []
        data = table.data

        if len(data) < 3:  # need header, data row(s) and a total row
            return issues

        total_keywords = ["total", "sum", "合计", "总计", "总和", "all"]

        for row_idx, row in enumerate(data[1:], start=2):  # skip the header
            first_cell = row[0].lower().strip() if row else ""
            if not any(kw in first_cell for kw in total_keywords):
                continue

            for col_idx, cell in enumerate(row[1:], start=2):  # skip label column
                total_val = self._parse_number(cell)
                if total_val is None:
                    continue

                # Sum data[1] .. data[row_idx - 2], i.e. every data row above
                # the total row (row_idx is the 1-based row number).
                column_sum = 0.0
                valid_sum = True

                for prev_row_idx in range(1, row_idx - 1):
                    prev_row = data[prev_row_idx]
                    if col_idx - 1 >= len(prev_row):
                        continue
                    prev_val = self._parse_number(prev_row[col_idx - 1])
                    if prev_val is None:
                        # A non-numeric cell makes the sum meaningless
                        valid_sum = False
                        break
                    column_sum += prev_val

                if valid_sum and column_sum > 0:
                    diff = abs(total_val - column_sum)
                    if diff > 0.5:  # tolerance for rounding
                        issues.append(Issue(
                            severity=Severity.ERROR,
                            type=IssueType.ARITHMETIC_SUM,
                            message=f"合计行计算错误: 报告值 {total_val},计算值 {column_sum}",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=col_idx
                            ),
                            evidence={
                                "reported_total": total_val,
                                "calculated_sum": column_sum,
                                "difference": round(diff, 2)
                            }
                        ))

        return issues


class StatValidator:
    """
    L2 statistical validator plus L2.5 consistency forensics.

    Checks the plausibility of reported test results:
    - reverse verification of t-test P values
    - reverse verification of chi-square P values
    - logical consistency between CI and P value
    - SE triangle check (regression coefficient CI <-> P consistency)
    - SD > Mean check (heuristic rule for positive-valued measures)
    """

    def __init__(self, config: ForensicsConfig):
        self.config = config

    def validate(self, table: TableData, full_text: str) -> List[Issue]:
        """
        Validate the statistical consistency of a table.

        Only runs when ``config.check_level`` is ``"L1_L2"``.

        Args:
            table: the table to validate; found issues are also appended to
                ``table.issues``.
            full_text: full document text (not used by the current checks).

        Returns:
            The list of issues found.
        """
        if table.skipped or not table.data:
            return []

        if self.config.check_level != "L1_L2":
            return []

        issues: List[Issue] = []

        # 1. CI vs. P value logic check
        issues.extend(self._validate_ci_pvalue_consistency(table))

        # 2. t-test reverse verification
        if SCIPY_AVAILABLE:
            issues.extend(self._validate_ttest(table))

        # 2.5. chi-square reverse verification
        if SCIPY_AVAILABLE:
            issues.extend(self._validate_chi_square(table))

        # 3. SE triangle check
        issues.extend(self._validate_se_triangle(table))

        # 4. SD > Mean heuristic
        issues.extend(self._validate_sd_greater_mean(table))

        # Record the findings on the table itself as well
        table.issues.extend(issues)

        logger.debug(f"表格 {table.id} 统计验证完成: {len(issues)} 个问题")

        return issues

    def _validate_ci_pvalue_consistency(self, table: TableData) -> List[Issue]:
        """
        Check that a 95% CI and its P value agree.

        Rule applied:
        - CI contains 1.0 (e.g. 0.8-1.2)       -> P must be >= 0.05
        - CI does not contain 1.0 (e.g. 1.1-1.5) -> P must be < 0.05

        A P cell may hold several lines; line *i* is paired with the *i*-th
        CI found in the row (the last CI is reused when there are fewer CIs
        than P lines) and reported with a 1-based ``subrow``.
        """
        issues: List[Issue] = []
        data = table.data

        if len(data) < 2:
            return issues

        header = data[0]
        pvalue_col_idx = self._find_pvalue_column(header)

        for row_idx, row in enumerate(data[1:], start=2):
            pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""
            pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []

            # First column supplies the per-line description
            first_cell_lines = row[0].split("\n") if row else []

            row_text = " ".join(row)

            # Collect every well-formed CI in the row
            all_ci_results = []
            for pattern in CI_PATTERNS:
                for match in pattern.finditer(row_text):
                    ci_lower = _safe_float(match.group(1))
                    ci_upper = _safe_float(match.group(2))
                    if ci_lower is not None and ci_upper is not None and ci_lower < ci_upper:
                        all_ci_results.append((ci_lower, ci_upper))

            if not all_ci_results:
                # Fall back to the single-CI parser (also tries CI_PATTERN)
                ci_result = self._parse_ci(row_text)
                if ci_result:
                    all_ci_results.append(ci_result)

            if not all_ci_results:
                continue

            for line_idx, pvalue_line in enumerate(pvalue_lines):
                pvalue = self._parse_pvalue_flexible(pvalue_line)
                if pvalue is None:
                    continue

                if line_idx < len(first_cell_lines):
                    row_desc = first_cell_lines[line_idx]
                else:
                    row_desc = f"第{line_idx+1}项"

                ci_idx = min(line_idx, len(all_ci_results) - 1)
                ci_lower, ci_upper = all_ci_results[ci_idx]

                ci_crosses_one = ci_lower <= 1.0 <= ci_upper
                p_significant = pvalue < 0.05

                # Consistent combinations are (crosses, not significant) and
                # (does not cross, significant); equality means conflict.
                if ci_crosses_one != p_significant:
                    continue

                if ci_crosses_one:
                    message = f"CI 与 P 值逻辑矛盾 [{row_desc.strip()}]: 95% CI ({ci_lower}-{ci_upper}) 跨越 1.0,但 P={pvalue} < 0.05"
                else:
                    message = f"CI 与 P 值逻辑矛盾 [{row_desc.strip()}]: 95% CI ({ci_lower}-{ci_upper}) 不跨越 1.0,但 P={pvalue} ≥ 0.05"

                subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None

                issues.append(Issue(
                    severity=Severity.ERROR,
                    type=IssueType.STAT_CI_PVALUE_CONFLICT,
                    message=message,
                    location=CellLocation(
                        table_id=table.id,
                        row=row_idx,
                        col=pvalue_col_idx + 1,
                        subrow=subrow_idx
                    ),
                    evidence={
                        "ci_lower": ci_lower,
                        "ci_upper": ci_upper,
                        "ci_crosses_one": ci_crosses_one,
                        "pvalue": pvalue,
                        "p_significant": p_significant
                    }
                ))

        return issues

    def _ttest_issue(
        self,
        location: CellLocation,
        label: str,
        error_word: str,
        p_calc: float,
        pvalue: float,
        base_evidence: dict,
    ) -> Optional[Issue]:
        """
        Build the issue (if any) for one reported vs. recomputed t-test P value.

        Args:
            location: where the reported P value sits.
            label: text inserted after the message title (" [desc]" or "").
            error_word: wording of the error title ("矛盾" or "不一致").
            p_calc: recomputed P value.
            pvalue: reported P value.
            base_evidence: group/t/df details attached to ERROR issues.

        Returns:
            An ERROR or WARNING issue, or ``None`` when the values agree.
        """
        p_diff = abs(p_calc - pvalue)

        if p_diff > PVALUE_ERROR_THRESHOLD:
            evidence = dict(base_evidence)
            evidence.update({
                "p_calculated": round(p_calc, 4),
                "p_reported": pvalue,
                "p_difference": round(p_diff, 4)
            })
            return Issue(
                severity=Severity.ERROR,
                type=IssueType.STAT_TTEST_PVALUE,
                message=f"T 检验 P 值{error_word}{label}: 报告 P={pvalue},计算 P={p_calc:.4f}(差异 {p_diff:.3f})",
                location=location,
                evidence=evidence
            )

        if p_diff > PVALUE_WARNING_THRESHOLD:
            return Issue(
                severity=Severity.WARNING,
                type=IssueType.STAT_TTEST_PVALUE,
                message=f"T 检验 P 值轻微偏差{label}: 报告 P={pvalue},计算 P={p_calc:.4f}",
                location=location,
                evidence={
                    "p_calculated": round(p_calc, 4),
                    "p_reported": pvalue,
                    "p_difference": round(p_diff, 4)
                }
            )

        return None

    def _validate_ttest(self, table: TableData) -> List[Issue]:
        """
        Reverse-verify t-test P values.

        For each row with two "Mean±SD" (or "Mean (SD)") groups and known
        sample sizes, recompute

            t = |M1 - M2| / sqrt(SD1²/n1 + SD2²/n2),  df = n1 + n2 - 2

        and the two-sided P value, then compare it with the reported one.

        Sample sizes come from the header (first two "n=XX" annotations) or,
        failing that, from "(n=XX)" annotations in the row.  Each non-empty
        line of the P cell is compared and located by a 1-based ``subrow``;
        when the P cell is empty the P value is searched in the whole row.
        """
        issues: List[Issue] = []

        if not SCIPY_AVAILABLE:
            return issues

        data = table.data
        if len(data) < 2:
            return issues

        header = data[0]

        n1, n2 = self._extract_sample_sizes_from_header(header)
        pvalue_col_idx = self._find_pvalue_column(header)

        for row_idx, row in enumerate(data[1:], start=2):
            row_text = " ".join(row)

            mean_sd_matches = list(MEAN_SD_PATTERN.finditer(row_text))
            if len(mean_sd_matches) < 2:
                # Try the parenthesised "Mean (SD)" notation instead
                mean_sd_matches = list(MEAN_SD_PAREN_PATTERN.finditer(row_text))
            if len(mean_sd_matches) < 2:
                continue

            try:
                m1 = _safe_float(mean_sd_matches[0].group(1))
                sd1 = _safe_float(mean_sd_matches[0].group(2))
                m2 = _safe_float(mean_sd_matches[1].group(1))
                sd2 = _safe_float(mean_sd_matches[1].group(2))

                if any(value is None for value in (m1, sd1, m2, sd2)):
                    continue

                # Header sample sizes win; otherwise look inside the row
                local_n1, local_n2 = n1, n2
                if local_n1 is None or local_n2 is None:
                    local_n1, local_n2 = self._extract_sample_sizes_from_row(row, header)
                if local_n1 is None or local_n2 is None:
                    continue

                se = math.sqrt(sd1**2 / local_n1 + sd2**2 / local_n2)
                if se == 0:
                    continue

                t_calc = abs(m1 - m2) / se
                df = local_n1 + local_n2 - 2
                if df <= 0:
                    # The t distribution is undefined; nothing to compare
                    continue

                # Survival function keeps precision for very small P values
                p_calc = float(2 * stats.t.sf(t_calc, df))

                base_evidence = {
                    "group1": {"mean": m1, "sd": sd1, "n": local_n1},
                    "group2": {"mean": m2, "sd": sd2, "n": local_n2},
                    "t_calculated": round(t_calc, 3),
                    "df": df,
                }

                pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""
                pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []

                if not any(pvalue_lines):
                    # Empty P cell: search the whole row.  Only values in
                    # [0, 1] are accepted so that e.g. "<60" is not taken
                    # for a P value.
                    pvalue = self._parse_pvalue_flexible(row_text)
                    if pvalue is None or not 0 <= pvalue <= 1:
                        continue

                    issue = self._ttest_issue(
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=pvalue_col_idx + 1
                        ),
                        label="",
                        error_word="不一致",
                        p_calc=p_calc,
                        pvalue=pvalue,
                        base_evidence=base_evidence,
                    )
                    if issue is not None:
                        issues.append(issue)
                    continue

                first_cell_lines = row[0].split("\n") if row else []

                for line_idx, pvalue_line in enumerate(pvalue_lines):
                    pvalue = self._parse_pvalue_flexible(pvalue_line)
                    if pvalue is None:
                        continue

                    subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None

                    if line_idx < len(first_cell_lines):
                        row_desc = first_cell_lines[line_idx]
                    else:
                        row_desc = row[0][:20] if row else ""

                    issue = self._ttest_issue(
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=pvalue_col_idx + 1,
                            subrow=subrow_idx
                        ),
                        label=f" [{row_desc.strip()}]",
                        error_word="矛盾",
                        p_calc=p_calc,
                        pvalue=pvalue,
                        base_evidence=base_evidence,
                    )
                    if issue is not None:
                        issues.append(issue)

            except (ValueError, TypeError, ZeroDivisionError) as e:
                logger.debug(f"T 检验验证失败: {e}")
                continue

        return issues

    def _extract_sample_sizes_from_header(self, header: List[str]) -> Tuple[Optional[int], Optional[int]]:
        """
        Extract the first two sample sizes from the header.

        Supported notations (ASCII or full-width brackets/colon):
        - (n=50)
        - n=50
        - N=50
        - (50例)
        - 对照组(n=48)

        Returns:
            ``(n1, n2)``, or ``(None, None)`` if fewer than two were found.
        """
        n_values = []

        for cell in header:
            # The "n=XX" notation takes precedence over the "XX例" notation
            match = _HEADER_N_RE.search(cell) or _HEADER_N_CN_RE.search(cell)
            if match:
                n_values.append(int(match.group(1)))

        if len(n_values) >= 2:
            return n_values[0], n_values[1]

        return None, None

    def _extract_sample_sizes_from_row(
        self,
        row: List[str],
        header: List[str]
    ) -> Tuple[Optional[int], Optional[int]]:
        """
        Extract the first two "(n=XX)" annotations from a data row.

        Args:
            row: the data row.
            header: the table header (currently unused).

        Returns:
            ``(n1, n2)``, or ``(None, None)`` if fewer than two were found.
        """
        matches = _ROW_N_RE.findall(" ".join(row))

        if len(matches) >= 2:
            return int(matches[0]), int(matches[1])

        return None, None

    def _validate_chi_square(self, table: TableData) -> List[Issue]:
        """
        Reverse-verify chi-square P values.

        For every line of the statistic cell that contains a χ² value, the
        P value is recomputed from the chi-square distribution and compared
        with the same line of the P cell.

        The degrees of freedom are not known from the table; df = 1 (the
        2x2 comparison case) is assumed.  A significance flip at 0.05 is an
        ERROR; a difference above PVALUE_ERROR_THRESHOLD without a flip is
        a WARNING.

        Multi-line cells are matched line by line and reported with a
        1-based ``subrow``.
        """
        issues: List[Issue] = []

        if not SCIPY_AVAILABLE:
            return issues

        data = table.data
        if len(data) < 2:
            return issues

        header = data[0]
        pvalue_col_idx = self._find_pvalue_column(header)
        chi2_col_idx = self._find_stat_column(header)

        for row_idx, row in enumerate(data[1:], start=2):
            stat_cell = row[chi2_col_idx] if chi2_col_idx < len(row) else ""
            pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""

            stat_lines = stat_cell.split("\n") if stat_cell else []
            pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []
            first_cell_lines = row[0].split("\n") if row else []

            for line_idx in range(max(len(stat_lines), len(pvalue_lines))):
                stat_line = stat_lines[line_idx] if line_idx < len(stat_lines) else ""
                pvalue_line = pvalue_lines[line_idx] if line_idx < len(pvalue_lines) else ""

                chi2_match = CHI_SQUARE_PATTERN.search(stat_line)
                if not chi2_match:
                    continue

                chi2_value = _safe_float(chi2_match.group(1))
                if chi2_value is None or chi2_value <= 0:
                    continue

                pvalue = self._parse_pvalue_flexible(pvalue_line)
                if pvalue is None:
                    continue

                # Assumed: df = 1, the common 2x2 comparison
                df = 1

                try:
                    # Survival function keeps precision for very small P values
                    p_calc = float(stats.chi2.sf(chi2_value, df))
                    p_diff = abs(p_calc - pvalue)

                    p_significant_reported = pvalue < 0.05
                    p_significant_calc = p_calc < 0.05
                    significance_mismatch = p_significant_reported != p_significant_calc

                    if line_idx < len(first_cell_lines):
                        sub_row_desc = first_cell_lines[line_idx]
                    else:
                        sub_row_desc = f"子行 {line_idx + 1}"

                    # 1-based sub-row index for precise highlighting
                    subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None

                    if significance_mismatch:
                        issues.append(Issue(
                            severity=Severity.ERROR,
                            type=IssueType.STAT_CHI2_PVALUE,
                            message=f"卡方检验 P 值矛盾 [{sub_row_desc.strip()}]: χ²={chi2_value}, 报告 P={pvalue}, 计算 P={p_calc:.4f},显著性不一致",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=pvalue_col_idx + 1,
                                subrow=subrow_idx
                            ),
                            evidence={
                                "chi2_value": chi2_value,
                                "df": df,
                                "p_calculated": round(p_calc, 4),
                                "p_reported": pvalue,
                                "p_difference": round(p_diff, 4),
                                "sub_row": sub_row_desc.strip(),
                                "significance_reported": "显著" if p_significant_reported else "不显著",
                                "significance_calculated": "显著" if p_significant_calc else "不显著"
                            }
                        ))
                    elif p_diff > PVALUE_ERROR_THRESHOLD:
                        issues.append(Issue(
                            severity=Severity.WARNING,
                            type=IssueType.STAT_CHI2_PVALUE,
                            message=f"卡方检验 P 值偏差 [{sub_row_desc.strip()}]: χ²={chi2_value}, 报告 P={pvalue}, 计算 P={p_calc:.4f},差异 {p_diff:.3f}",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=pvalue_col_idx + 1,
                                subrow=subrow_idx
                            ),
                            evidence={
                                "chi2_value": chi2_value,
                                "df": df,
                                "p_calculated": round(p_calc, 4),
                                "p_reported": pvalue,
                                "p_difference": round(p_diff, 4),
                                "sub_row": sub_row_desc.strip()
                            }
                        ))

                except (ValueError, ZeroDivisionError, TypeError) as e:
                    logger.debug(f"卡方检验验证失败: {e}")
                    continue

        return issues

    def _find_pvalue_column(self, header: List[str]) -> int:
        """
        Return the index of the P value column.

        A header cell matches when it contains one of the P keywords or is
        exactly "P"/"p".  Falls back to the last column.
        """
        p_keywords = ["p值", "pvalue", "p-value", "p 值", "sig"]

        for idx, cell in enumerate(header):
            cell_lower = cell.lower().strip()
            if cell_lower == "p" or any(kw in cell_lower for kw in p_keywords):
                return idx

        # Default: last column
        return len(header) - 1

    def _find_stat_column(self, header: List[str]) -> int:
        """
        Return the index of the test statistic column (χ²/t/Z ...).

        Falls back to the second-to-last column.
        """
        stat_keywords = ["统计", "stat", "χ", "chi", "t值", "z值"]

        for idx, cell in enumerate(header):
            cell_lower = cell.lower().strip()
            if any(kw in cell_lower for kw in stat_keywords):
                return idx

        # Default: second-to-last column
        return len(header) - 2

    def _parse_pvalue_flexible(self, text: str) -> Optional[float]:
        """
        Parse a P value written in one of several notations.

        Supported:
        - P=0.05
        - P<0.001
        - 0.05 (bare number, accepted only within [0, 1])
        - <0.001 (ASCII or full-width less-than sign; the bound is returned)

        Returns:
            The P value, or ``None`` if none could be parsed.
        """
        if not text:
            return None

        text = _clean_number_string(text)

        # Standard "P=..." notation first
        match = PVALUE_PATTERN.search(text)
        if match:
            val = _safe_float(match.group(1))
            if val is not None:
                return val

        # "<0.001" without a "P" prefix
        less_than_match = _LESS_THAN_PVALUE_RE.search(text)
        if less_than_match:
            val = _safe_float(less_than_match.group(1))
            if val is not None:
                return val

        # Bare number, range-checked
        val = _safe_float(text)
        if val is not None and 0 <= val <= 1:
            return val

        return None

    def _validate_se_triangle(self, table: TableData) -> List[Issue]:
        """
        SE triangle check for ratio measures (OR/HR/RR with a 95% CI).

        Principle:
        - SE = (ln(CI_upper) - ln(CI_lower)) / 3.92
        - Z  = |ln(effect)| / SE
        - P_calculated = 2 * (1 - Phi(Z))

        The first positive effect size and the first well-formed CI of the
        row are used; the recomputed P value is compared with every line of
        the P cell (located by a 1-based ``subrow``).
        """
        issues: List[Issue] = []
        data = table.data

        if not SCIPY_AVAILABLE:
            return issues

        header = data[0] if data else []
        pvalue_col_idx = self._find_pvalue_column(header)

        for row_idx, row in enumerate(data[1:], start=2):
            pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""
            pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []

            # First column supplies the per-line description
            first_cell_lines = row[0].split("\n") if row else []

            row_text = " ".join(row)

            effect_matches = list(EFFECT_SIZE_PATTERN.finditer(row_text))
            if not effect_matches:
                continue

            # Require a CI in one of the primary notations
            if not any(pattern.search(row_text) for pattern in CI_PATTERNS):
                continue

            ci_result = self._parse_ci(row_text)
            if ci_result is None:
                continue
            ci_lower, ci_upper = ci_result

            # Logarithms need strictly positive, ordered bounds
            if ci_lower <= 0 or ci_upper <= 0 or ci_lower >= ci_upper:
                continue

            # First effect size that is a positive number
            effect_size = None
            for effect_match in effect_matches:
                candidate = _safe_float(effect_match.group(1))
                if candidate is not None and candidate > 0:
                    effect_size = candidate
                    break
            if effect_size is None:
                continue

            try:
                # Width of a 95% CI on the log scale is 2 * 1.96 * SE
                se = (math.log(ci_upper) - math.log(ci_lower)) / 3.92
                if se <= 0:
                    continue
                z = abs(math.log(effect_size)) / se
                # Survival function keeps precision for very small P values
                p_calc = float(2 * stats.norm.sf(z))
            except (ValueError, ZeroDivisionError, TypeError) as e:
                logger.debug(f"SE 三角验证失败: {e}")
                continue

            for line_idx, pvalue_line in enumerate(pvalue_lines):
                pvalue = self._parse_pvalue_flexible(pvalue_line)
                if pvalue is None:
                    continue

                if line_idx < len(first_cell_lines):
                    row_desc = first_cell_lines[line_idx]
                else:
                    row_desc = f"第{line_idx+1}项"

                try:
                    p_diff = abs(p_calc - pvalue)
                    subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None

                    if p_diff > PVALUE_ERROR_THRESHOLD:
                        issues.append(Issue(
                            severity=Severity.ERROR,
                            type=IssueType.STAT_SE_TRIANGLE,
                            message=f"SE 三角验证不一致 [{row_desc.strip()}]: OR={effect_size}, 报告 P={pvalue},由 CI 反推 P={p_calc:.4f}",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=pvalue_col_idx + 1,
                                subrow=subrow_idx
                            ),
                            evidence={
                                "effect_size": effect_size,
                                "ci_lower": ci_lower,
                                "ci_upper": ci_upper,
                                "se_calculated": round(se, 4),
                                "z_calculated": round(z, 3),
                                "p_calculated": round(p_calc, 4),
                                "p_reported": pvalue,
                                "p_difference": round(p_diff, 4)
                            }
                        ))
                    elif p_diff > PVALUE_WARNING_THRESHOLD:
                        issues.append(Issue(
                            severity=Severity.WARNING,
                            type=IssueType.STAT_SE_TRIANGLE,
                            message=f"SE 三角验证轻微偏差 [{row_desc.strip()}]: OR={effect_size}, 报告 P={pvalue},计算 P={p_calc:.4f}",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=pvalue_col_idx + 1,
                                subrow=subrow_idx
                            ),
                            evidence={
                                "effect_size": effect_size,
                                "p_calculated": round(p_calc, 4),
                                "p_reported": pvalue,
                                "p_difference": round(p_diff, 4)
                            }
                        ))

                except (ValueError, ZeroDivisionError, TypeError) as e:
                    logger.debug(f"SE 三角验证失败: {e}")
                    continue

        return issues

    def _validate_sd_greater_mean(self, table: TableData) -> List[Issue]:
        """
        Heuristic SD > Mean check.

        For measures that can only be positive (age, weight, blood pressure,
        lab values, ...) an SD larger than the mean is usually implausible.
        Cells whose column header or row label contains a known
        positive-measure keyword are reported as ERROR, all others as
        WARNING (difference scores and skewed measures are legitimate
        exceptions).
        """
        issues: List[Issue] = []
        data = table.data

        if len(data) < 2:
            return issues

        header = data[0]

        # Keywords of measures that should not show SD > Mean
        positive_indicators = [
            "age", "年龄", "weight", "体重", "bmi", "height", "身高",
            "sbp", "dbp", "血压", "heart rate", "心率", "pulse", "脉搏",
            "wbc", "rbc", "hgb", "plt", "白细胞", "红细胞", "血红蛋白", "血小板",
            "creatinine", "肌酐", "bun", "尿素氮", "glucose", "血糖",
            "alt", "ast", "转氨酶", "bilirubin", "胆红素",
            "cost", "费用", "time", "时间", "duration", "持续"
        ]

        for row_idx, row in enumerate(data[1:], start=2):
            for col_idx, cell in enumerate(row, start=1):
                # "Mean±SD" first, then the parenthesised notation
                match = MEAN_SD_PATTERN.search(cell) or MEAN_SD_PAREN_PATTERN.search(cell)
                if not match:
                    continue

                mean_val = _safe_float(match.group(1))
                sd_val = _safe_float(match.group(2))
                if mean_val is None or sd_val is None:
                    continue

                # Only meaningful for a positive mean
                if not (mean_val > 0 and sd_val > mean_val):
                    continue

                # Context: column header plus row label
                context_text = ""
                if col_idx - 1 < len(header):
                    context_text += header[col_idx - 1].lower()
                if len(row) > 0:
                    context_text += " " + row[0].lower()

                is_positive_indicator = any(kw in context_text for kw in positive_indicators)

                # Coefficient of variation (mean_val > 0 is guaranteed here)
                cv = sd_val / mean_val

                if is_positive_indicator:
                    issues.append(Issue(
                        severity=Severity.ERROR,
                        type=IssueType.STAT_SD_GREATER_MEAN,
                        message=f"SD 大于 Mean 异常: {mean_val}±{sd_val},CV={cv:.1%},该指标通常为正值",
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=col_idx
                        ),
                        evidence={
                            "mean": mean_val,
                            "sd": sd_val,
                            "cv": round(cv, 3),
                            "context": context_text[:50]
                        }
                    ))
                else:
                    issues.append(Issue(
                        severity=Severity.WARNING,
                        type=IssueType.STAT_SD_GREATER_MEAN,
                        message=f"SD 大于 Mean: {mean_val}±{sd_val},CV={cv:.1%},建议核查数据分布",
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=col_idx
                        ),
                        evidence={
                            "mean": mean_val,
                            "sd": sd_val,
                            "cv": round(cv, 3)
                        }
                    ))

        return issues

    # ==================== Helper methods ====================

    def _parse_ci(self, text: str) -> Optional[Tuple[float, float]]:
        """
        Parse the first well-formed confidence interval in *text*.

        Notations are defined by ``CI_PATTERNS`` (e.g. "2.5 (1.1-3.5)",
        "2.5 (1.1, 3.5)", "2.5 [1.1; 3.5]", "95% CI: 1.1-3.5",
        "95% CI 1.1 to 3.5"), with ``CI_PATTERN`` as a fallback.

        Returns:
            ``(lower, upper)`` with ``lower < upper``, or ``None``.
        """
        for pattern in CI_PATTERNS:
            match = pattern.search(text)
            if not match:
                continue
            try:
                lower = _safe_float(match.group(1))
                upper = _safe_float(match.group(2))
            except IndexError:
                # Pattern without two capture groups
                continue
            if lower is not None and upper is not None and lower < upper:
                return lower, upper

        # Fall back to the original CI_PATTERN
        match = CI_PATTERN.search(text)
        if match:
            lower = _safe_float(match.group(1))
            upper = _safe_float(match.group(2))
            if lower is not None and upper is not None and lower < upper:
                return lower, upper

        return None

    def _parse_pvalue(self, text: str) -> Optional[float]:
        """
        Parse a P value in the standard notation matched by ``PVALUE_PATTERN``
        (e.g. "P=0.05", "P<0.001", "P>0.05", "p值=0.05").

        Returns:
            The P value, or ``None`` if the pattern does not match.
        """
        match = PVALUE_PATTERN.search(text)
        if match:
            return _safe_float(match.group(1))
        return None