Files
HaHafeng f9ed0c2528 feat(rvw): Complete V2.0 Week 3 - Statistical validation extension and UX improvements
Week 3 Development Summary:

- Implement negative sign normalization (6 Unicode variants)

- Enhance T-test validation with smart sample size extraction

- Enhance SE triangle and CI-P consistency validation with subrow support

- Add precise sub-cell highlighting for P-values in multi-line cells

- Add frontend issue type Chinese translations (6 new types)

- Add file format tips for PDF/DOC uploads

Technical improvements:

- Add _clean_statistical_text() in extractor.py

- Add _safe_float() wrapper in validator.py

- Add ForensicsReport.tsx component

- Update ISSUE_TYPE_LABELS translations

Documentation:

- Add 2026-02-18 development record

- Update RVW module status (v5.1)

- Update system status (v5.2)

Status: Week 3 complete, ready for Week 4 testing
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-18 18:26:16 +08:00

1259 lines
49 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据侦探模块 - 验证器
包含 L1 算术验证、L2 统计验证、L2.5 一致性取证。
L1 算术验证:
- n (%) 格式验证
- Sum/Total 校验
- 容错逻辑
L2 统计验证:
- T 检验 P 值逆向验证
- 卡方检验 P 值逆向验证
- CI vs P 值逻辑检查
L2.5 一致性取证(终审提权):
- SE 三角验证(回归系数 CI↔P 一致性)
- SD > Mean 检查(正值指标启发式规则)
"""
import re
import math
from typing import List, Optional, Tuple

from loguru import logger

# scipy powers the L2 statistical computations (t / chi-square / normal
# distributions). It is optional: without it the L1 arithmetic checks still
# run, and every scipy-dependent path is gated on SCIPY_AVAILABLE.
try:
    from scipy import stats
    SCIPY_AVAILABLE = True
except ImportError:
    SCIPY_AVAILABLE = False
    logger.warning("scipy 未安装L2 统计验证将受限")

from .types import (
    TableData,
    Issue,
    Severity,
    IssueType,
    CellLocation,
    ForensicsConfig,
)
from .config import (
    PERCENT_PATTERN,
    PVALUE_PATTERN,
    CI_PATTERN,
    MEAN_SD_PATTERN,
    MEAN_SD_PAREN_PATTERN,
    CI_PATTERNS,
    EFFECT_SIZE_PATTERN,
    CHI_SQUARE_PATTERN,
    DEFAULT_TOLERANCE_PERCENT,
    PVALUE_ERROR_THRESHOLD,
    PVALUE_WARNING_THRESHOLD,
    STAT_RELATIVE_TOLERANCE,
)
def _clean_number_string(text: str) -> str:
"""
清洗数值字符串中的特殊字符,防止 float() 崩溃
关键清洗负号归一化Word 会把 - 转成数学减号或破折号)
"""
if not text:
return ""
# 负号归一化(防止 float() 崩溃)
text = text.replace('\u2212', '-') # 数学减号 (Minus Sign)
text = text.replace('\u2013', '-') # En Dash
text = text.replace('\u2014', '-') # Em Dash
text = text.replace('\u2010', '-') # Hyphen
text = text.replace('\u2011', '-') # Non-Breaking Hyphen
# 零宽字符清理
text = text.replace('\u200b', '') # Zero-Width Space
text = text.replace('\u00a0', ' ') # Non-Breaking Space -> 普通空格
return text.strip()
def _safe_float(text: str) -> Optional[float]:
    """
    Safe float conversion that tolerates special characters.

    The input is first normalized via ``_clean_number_string`` (dash
    variants, zero-width spaces, etc.) before being handed to ``float``.

    Returns:
        The parsed float on success, ``None`` on any conversion failure.
    """
    try:
        return float(_clean_number_string(text))
    except (ValueError, TypeError):
        return None
class ArithmeticValidator:
    """
    L1 arithmetic self-consistency validator.

    Verifies that the numbers inside a table are internally consistent:
    - the percentage in an "n (%)" cell equals n/N
    - a Total/Sum row equals the sum of the data rows above it
    """

    def __init__(self, config: ForensicsConfig) -> None:
        # tolerance_percent: allowed absolute difference (in percentage
        # points) between the reported and recomputed percentage.
        self.config = config
        self.tolerance = config.tolerance_percent

    def validate(self, table: TableData) -> List[Issue]:
        """
        Validate the arithmetic consistency of one table.

        Args:
            table: the table to validate.

        Returns:
            The list of issues found (also appended to ``table.issues``).
        """
        # Skipped tables and empty tables produce no issues.
        if table.skipped or not table.data:
            return []
        issues: List[Issue] = []
        # 1. Verify "n (%)" cells.
        percent_issues = self._validate_percent_format(table)
        issues.extend(percent_issues)
        # 2. Verify Sum/Total rows.
        sum_issues = self._validate_sum_rows(table)
        issues.extend(sum_issues)
        # Record the issues on the table itself as well.
        table.issues.extend(issues)
        logger.debug(f"表格 {table.id} 算术验证完成: {len(issues)} 个问题")
        return issues

    def _validate_percent_format(self, table: TableData) -> List[Issue]:
        """
        Verify "n (%)" cells.

        Finds cells shaped like "45 (50.0%)" and checks that the reported
        percentage matches n/N. The total N is looked up in the header or
        the same row (see ``_find_total_n``).
        """
        issues: List[Issue] = []
        data = table.data
        if len(data) < 2:  # need at least a header plus one data row
            return issues
        # Identify candidate N columns from the header (e.g. "n", "N",
        # "Total", "合计").
        header = data[0]
        n_col_indices = self._find_n_columns(header)
        # row_idx/col_idx are 1-based display coordinates (header = row 1);
        # data[...] indexing below subtracts 1 to get 0-based positions.
        for row_idx, row in enumerate(data[1:], start=2):  # data rows start at display row 2
            for col_idx, cell in enumerate(row, start=1):
                # Look for the "n (%)" pattern.
                match = PERCENT_PATTERN.search(cell)
                if match:
                    n_value = float(match.group(1))
                    reported_percent = float(match.group(2))
                    # Try to locate the matching total N.
                    total_n = self._find_total_n(data, row_idx - 1, col_idx - 1, n_col_indices)
                    if total_n is not None and total_n > 0:
                        # Recompute the percentage.
                        calculated_percent = (n_value / total_n) * 100
                        # Flag only differences beyond the configured tolerance.
                        diff = abs(calculated_percent - reported_percent)
                        if diff > self.tolerance:
                            issues.append(Issue(
                                severity=Severity.ERROR,
                                type=IssueType.ARITHMETIC_PERCENT,
                                message=f"百分比计算错误: 报告值 {reported_percent}%,计算值 {calculated_percent:.1f}% (n={n_value}, N={total_n})",
                                location=CellLocation(
                                    table_id=table.id,
                                    row=row_idx,
                                    col=col_idx
                                ),
                                evidence={
                                    "n": n_value,
                                    "N": total_n,
                                    "reported_percent": reported_percent,
                                    "calculated_percent": round(calculated_percent, 2),
                                    "difference": round(diff, 2)
                                }
                            ))
        return issues

    def _find_n_columns(self, header: List[str]) -> List[int]:
        """
        Return the 0-based indices of header columns likely to hold N values.
        """
        n_keywords = ["n", "total", "合计", "总数", "all", "sum"]
        indices = []
        for idx, cell in enumerate(header):
            cell_lower = cell.lower().strip()
            for keyword in n_keywords:
                # Substring match, so e.g. "n (%)" and "Total N" both hit.
                if keyword in cell_lower:
                    indices.append(idx)
                    break
        return indices

    def _find_total_n(
        self,
        data: List[List[str]],
        row_idx: int,
        col_idx: int,
        n_col_indices: List[int]
    ) -> Optional[float]:
        """
        Locate the total N for a percentage cell.

        ``row_idx``/``col_idx`` are 0-based here (callers convert from the
        1-based display coordinates).

        Strategy:
        1. check the N column(s) in the same row;
        2. otherwise check the first data row of the same column
           (it often carries the group's Total N);
        3. summing sibling cells is noted as a possible heuristic but is
           not implemented — return None instead of guessing.
        """
        row = data[row_idx]
        # Strategy 1: an N column on the same row.
        for n_col in n_col_indices:
            if n_col < len(row):
                n_val = self._parse_number(row[n_col])
                if n_val is not None and n_val > 0:
                    return n_val
        # Strategy 2: first data row of the same column (may be the N value).
        if row_idx > 0:
            first_data_row = data[1] if len(data) > 1 else None
            if first_data_row and col_idx < len(first_data_row):
                # Check whether the first cell of this column is a plain
                # number (a Total N header row).
                n_val = self._parse_number(first_data_row[col_idx])
                if n_val is not None and n_val > 0:
                    return n_val
        # Strategy 3 (accumulating other cells in the row) would be
        # heuristic and possibly wrong, so we give up here.
        return None

    def _parse_number(self, text: str) -> Optional[float]:
        """
        Parse a number out of free text.

        Handles:
        - plain numbers "45"
        - thousands separators "1,234"
        - space-grouped digits "1 234"
        - negatives (including the special dash characters)
        """
        if not text:
            return None
        # Normalize special characters first (dash variants etc.).
        cleaned = _clean_number_string(text)
        # Drop common digit separators.
        cleaned = cleaned.replace(",", "").replace(" ", "")
        # Extract the leading numeric token (negatives supported).
        match = re.match(r"^(-?\d+(?:\.\d+)?)", cleaned)
        if match:
            return _safe_float(match.group(1))
        return None

    def _validate_sum_rows(self, table: TableData) -> List[Issue]:
        """
        Verify Sum/Total rows.

        Finds rows labeled "Total", "Sum", "合计" etc. and checks each of
        their numeric cells against the column-wise sum of the data rows
        above.
        """
        issues: List[Issue] = []
        data = table.data
        if len(data) < 3:  # need header + data row(s) + total row
            return issues
        # Labels that mark a totals row (checked against the first cell).
        total_keywords = ["total", "sum", "合计", "总计", "总和", "all"]
        for row_idx, row in enumerate(data[1:], start=2):  # skip header; row_idx is 1-based display row
            first_cell = row[0].lower().strip() if row else ""
            is_total_row = any(kw in first_cell for kw in total_keywords)
            if is_total_row:
                # Validate every numeric column of the totals row.
                for col_idx, cell in enumerate(row[1:], start=2):  # skip the label column
                    total_val = self._parse_number(cell)
                    if total_val is None:
                        continue
                    # Sum the same column over the rows above this one.
                    column_sum = 0.0
                    valid_sum = True
                    # Data rows live at data[1] .. data[row_idx - 2]
                    # (row_idx is 1-based; the current row is data[row_idx - 1]).
                    for prev_row_idx in range(1, row_idx - 1):
                        if col_idx - 1 < len(data[prev_row_idx]):
                            prev_cell = data[prev_row_idx][col_idx - 1]
                            prev_val = self._parse_number(prev_cell)
                            if prev_val is not None:
                                column_sum += prev_val
                            else:
                                # Any non-numeric cell makes the column sum
                                # unreliable — skip validation for it.
                                valid_sum = False
                                break
                    if valid_sum and column_sum > 0:
                        diff = abs(total_val - column_sum)
                        # Allow rounding slack of 0.5.
                        if diff > 0.5:
                            issues.append(Issue(
                                severity=Severity.ERROR,
                                type=IssueType.ARITHMETIC_SUM,
                                message=f"合计行计算错误: 报告值 {total_val},计算值 {column_sum}",
                                location=CellLocation(
                                    table_id=table.id,
                                    row=row_idx,
                                    col=col_idx
                                ),
                                evidence={
                                    "reported_total": total_val,
                                    "calculated_sum": column_sum,
                                    "difference": round(diff, 2)
                                }
                            ))
        return issues
class StatValidator:
    """
    L2 statistical review validator + L2.5 consistency forensics.

    Checks the plausibility of reported statistical results:
    - reverse verification of t-test P values (from Mean±SD and n)
    - reverse verification of chi-square P values (from reported χ²)
    - CI vs P value logical-consistency check
    - SE triangle validation (regression coefficient CI ↔ P consistency)
    - SD > Mean check (heuristic rule for positive-valued metrics)
    """

    def __init__(self, config: ForensicsConfig) -> None:
        self.config = config

    def validate(self, table: TableData, full_text: str) -> List[Issue]:
        """
        Validate the statistical consistency of one table.

        Args:
            table: the table to validate.
            full_text: full document text (reserved for method detection;
                not read by the current checks).

        Returns:
            The list of issues found (also appended to ``table.issues``).
        """
        if table.skipped or not table.data:
            return []
        # Only run in "L1_L2" mode.
        if self.config.check_level != "L1_L2":
            return []
        issues: List[Issue] = []
        # 1. CI vs P value logic check (baseline, scipy-free).
        ci_issues = self._validate_ci_pvalue_consistency(table)
        issues.extend(ci_issues)
        # 2. Reverse t-test verification (needs scipy).
        if SCIPY_AVAILABLE:
            ttest_issues = self._validate_ttest(table)
            issues.extend(ttest_issues)
        # 2.5. Reverse chi-square verification (needs scipy).
        if SCIPY_AVAILABLE:
            chi2_issues = self._validate_chi_square(table)
            issues.extend(chi2_issues)
        # 3. SE triangle validation (final-review escalation:
        #    regression coefficient CI ↔ P consistency).
        se_issues = self._validate_se_triangle(table)
        issues.extend(se_issues)
        # 4. SD > Mean check (final-review escalation: heuristic rule).
        sd_issues = self._validate_sd_greater_mean(table)
        issues.extend(sd_issues)
        # Record the issues on the table itself as well.
        table.issues.extend(issues)
        logger.debug(f"表格 {table.id} 统计验证完成: {len(issues)} 个问题")
        return issues

    def _validate_ci_pvalue_consistency(self, table: TableData) -> List[Issue]:
        """
        Check logical consistency between a 95% CI and its P value.

        Golden rule:
        - if the 95% CI crosses 1.0 (e.g. 0.8-1.2) → P must be ≥ 0.05
        - if the 95% CI excludes 1.0 (e.g. 1.1-1.5) → P must be < 0.05
        A violation is a logical contradiction in the data.

        Improvement: supports multi-line cells via a ``subrow`` index so
        the frontend can highlight the exact sub-cell.
        """
        issues: List[Issue] = []
        data = table.data
        if len(data) < 2:
            return issues
        header = data[0] if data else []
        pvalue_col_idx = self._find_pvalue_column(header)
        for row_idx, row in enumerate(data[1:], start=2):
            # P value column content (a cell may contain multiple lines).
            pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""
            pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []
            # First column content (used for issue descriptions).
            first_cell_lines = row[0].split("\n") if row else []
            # Search the whole row's text for CIs.
            row_text = " ".join(row)
            # Collect every CI in the row, in order.
            all_ci_results = []
            for pattern in CI_PATTERNS:
                for match in pattern.finditer(row_text):
                    ci_lower = _safe_float(match.group(1))
                    ci_upper = _safe_float(match.group(2))
                    if ci_lower is not None and ci_upper is not None and ci_lower < ci_upper:
                        all_ci_results.append((ci_lower, ci_upper))
            if not all_ci_results:
                # Fall back to the single-CI parser.
                ci_result = self._parse_ci(row_text)
                if ci_result:
                    all_ci_results.append(ci_result)
            if not all_ci_results:
                continue
            # Validate each P value line against its CI.
            for line_idx, pvalue_line in enumerate(pvalue_lines):
                pvalue = self._parse_pvalue_flexible(pvalue_line)
                if pvalue is None:
                    continue
                # Description of this sub-row.
                row_desc = first_cell_lines[line_idx] if line_idx < len(first_cell_lines) else f"{line_idx+1}"
                # Pair P value lines with CIs positionally; clamp to the
                # last CI if there are fewer CIs than P values.
                ci_idx = min(line_idx, len(all_ci_results) - 1)
                ci_lower, ci_upper = all_ci_results[ci_idx]
                # Logical consistency check.
                ci_crosses_one = ci_lower <= 1.0 <= ci_upper
                p_significant = pvalue < 0.05
                # subrow index (1-based) only when the cell has multiple lines.
                subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None
                # Contradiction cases:
                if ci_crosses_one and p_significant:
                    issues.append(Issue(
                        severity=Severity.ERROR,
                        type=IssueType.STAT_CI_PVALUE_CONFLICT,
                        message=f"CI 与 P 值逻辑矛盾 [{row_desc.strip()}]: 95% CI ({ci_lower}-{ci_upper}) 跨越 1.0,但 P={pvalue} < 0.05",
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=pvalue_col_idx + 1,
                            subrow=subrow_idx
                        ),
                        evidence={
                            "ci_lower": ci_lower,
                            "ci_upper": ci_upper,
                            "ci_crosses_one": ci_crosses_one,
                            "pvalue": pvalue,
                            "p_significant": p_significant
                        }
                    ))
                elif not ci_crosses_one and not p_significant:
                    issues.append(Issue(
                        severity=Severity.ERROR,
                        type=IssueType.STAT_CI_PVALUE_CONFLICT,
                        message=f"CI 与 P 值逻辑矛盾 [{row_desc.strip()}]: 95% CI ({ci_lower}-{ci_upper}) 不跨越 1.0,但 P={pvalue} ≥ 0.05",
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=pvalue_col_idx + 1,
                            subrow=subrow_idx
                        ),
                        evidence={
                            "ci_lower": ci_lower,
                            "ci_upper": ci_upper,
                            "ci_crosses_one": ci_crosses_one,
                            "pvalue": pvalue,
                            "p_significant": p_significant
                        }
                    ))
        return issues

    def _validate_ttest(self, table: TableData) -> List[Issue]:
        """
        Reverse t-test verification.

        Extracts M±SD and n from the table, recomputes t and P, and
        compares against the reported P value.

        Formula: t = (M1 - M2) / sqrt(SD1²/n1 + SD2²/n2)
        (pooled df = n1 + n2 - 2; two-sided P from the t distribution)

        Improvements:
        1. smart sample-size extraction (header, row, context)
        2. multiple Mean±SD formats supported
        3. multi-line cells located precisely via ``subrow``
        """
        issues: List[Issue] = []
        if not SCIPY_AVAILABLE:
            return issues
        data = table.data
        if len(data) < 2:
            return issues
        header = data[0] if data else []
        # Pre-extract group sample sizes from the header.
        n1, n2 = self._extract_sample_sizes_from_header(header)
        # Locate the P value column.
        pvalue_col_idx = self._find_pvalue_column(header)
        # Scan rows that contain group-comparison data.
        for row_idx, row in enumerate(data[1:], start=2):
            row_text = " ".join(row)
            # Find two groups of Mean±SD values in the same row.
            mean_sd_matches = list(MEAN_SD_PATTERN.finditer(row_text))
            # Fall back to the parenthesized format "M (SD)".
            if len(mean_sd_matches) < 2:
                mean_sd_matches = list(MEAN_SD_PAREN_PATTERN.finditer(row_text))
            if len(mean_sd_matches) < 2:
                continue
            # At least two Mean±SD groups found — attempt the recomputation.
            try:
                m1 = _safe_float(mean_sd_matches[0].group(1))
                sd1 = _safe_float(mean_sd_matches[0].group(2))
                m2 = _safe_float(mean_sd_matches[1].group(1))
                sd2 = _safe_float(mean_sd_matches[1].group(2))
                if None in (m1, sd1, m2, sd2):
                    continue
                # If the header had no sample sizes, try the row itself.
                local_n1, local_n2 = n1, n2
                if local_n1 is None or local_n2 is None:
                    local_n1, local_n2 = self._extract_sample_sizes_from_row(row, header)
                # Still no sample sizes — cannot recompute; skip.
                if local_n1 is None or local_n2 is None:
                    continue
                # Standard error and t statistic.
                se = math.sqrt(sd1**2/local_n1 + sd2**2/local_n2)
                if se == 0:
                    continue
                t_calc = abs(m1 - m2) / se
                df = local_n1 + local_n2 - 2
                # Two-sided P value from the t distribution.
                p_calc = 2 * (1 - stats.t.cdf(t_calc, df))
                # Reported P value(s) from the P column.
                pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""
                pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []
                if not pvalue_lines or not any(pvalue_lines):
                    # P column empty: try extracting a P value from the
                    # whole row text, then fall through to the single-P path.
                    pvalue = self._parse_pvalue_flexible(row_text)
                    if pvalue is None:
                        continue
                    pvalue_lines = [str(pvalue)]
                    subrow_idx = None
                    pvalue_col = pvalue_col_idx + 1
                else:
                    # Multi-line P cell: check each line as its own sub-row.
                    for line_idx, pvalue_line in enumerate(pvalue_lines):
                        pvalue = self._parse_pvalue_flexible(pvalue_line)
                        if pvalue is None:
                            continue
                        # 1-based subrow index only for genuinely multi-line cells.
                        subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None
                        pvalue_col = pvalue_col_idx + 1
                        # Compare reported vs recomputed P.
                        p_diff = abs(p_calc - pvalue)
                        # Sub-row description from the first column.
                        first_cell_lines = row[0].split("\n") if row else []
                        row_desc = first_cell_lines[line_idx] if line_idx < len(first_cell_lines) else row[0][:20] if row else ""
                        if p_diff > PVALUE_ERROR_THRESHOLD:
                            issues.append(Issue(
                                severity=Severity.ERROR,
                                type=IssueType.STAT_TTEST_PVALUE,
                                message=f"T 检验 P 值矛盾 [{row_desc.strip()}]: 报告 P={pvalue},计算 P={p_calc:.4f}(差异 {p_diff:.3f}",
                                location=CellLocation(
                                    table_id=table.id,
                                    row=row_idx,
                                    col=pvalue_col,
                                    subrow=subrow_idx
                                ),
                                evidence={
                                    "group1": {"mean": m1, "sd": sd1, "n": local_n1},
                                    "group2": {"mean": m2, "sd": sd2, "n": local_n2},
                                    "t_calculated": round(t_calc, 3),
                                    "df": df,
                                    "p_calculated": round(p_calc, 4),
                                    "p_reported": pvalue,
                                    "p_difference": round(p_diff, 4)
                                }
                            ))
                        elif p_diff > PVALUE_WARNING_THRESHOLD:
                            issues.append(Issue(
                                severity=Severity.WARNING,
                                type=IssueType.STAT_TTEST_PVALUE,
                                message=f"T 检验 P 值轻微偏差 [{row_desc.strip()}]: 报告 P={pvalue},计算 P={p_calc:.4f}",
                                location=CellLocation(
                                    table_id=table.id,
                                    row=row_idx,
                                    col=pvalue_col,
                                    subrow=subrow_idx
                                ),
                                evidence={
                                    "p_calculated": round(p_calc, 4),
                                    "p_reported": pvalue,
                                    "p_difference": round(p_diff, 4)
                                }
                            ))
                    continue  # every P value in this row has been handled
                # Single-P-value path (reached via the empty-P-column branch).
                pvalue = self._parse_pvalue_flexible(pvalue_lines[0]) if pvalue_lines else None
                if pvalue is None:
                    continue
                p_diff = abs(p_calc - pvalue)
                if p_diff > PVALUE_ERROR_THRESHOLD:
                    issues.append(Issue(
                        severity=Severity.ERROR,
                        type=IssueType.STAT_TTEST_PVALUE,
                        message=f"T 检验 P 值不一致: 报告 P={pvalue},计算 P={p_calc:.4f}(差异 {p_diff:.3f}",
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=pvalue_col_idx + 1
                        ),
                        evidence={
                            "group1": {"mean": m1, "sd": sd1, "n": local_n1},
                            "group2": {"mean": m2, "sd": sd2, "n": local_n2},
                            "t_calculated": round(t_calc, 3),
                            "df": df,
                            "p_calculated": round(p_calc, 4),
                            "p_reported": pvalue,
                            "p_difference": round(p_diff, 4)
                        }
                    ))
                elif p_diff > PVALUE_WARNING_THRESHOLD:
                    issues.append(Issue(
                        severity=Severity.WARNING,
                        type=IssueType.STAT_TTEST_PVALUE,
                        message=f"T 检验 P 值轻微偏差: 报告 P={pvalue},计算 P={p_calc:.4f}",
                        location=CellLocation(
                            table_id=table.id,
                            row=row_idx,
                            col=pvalue_col_idx + 1
                        ),
                        evidence={
                            "p_calculated": round(p_calc, 4),
                            "p_reported": pvalue,
                            "p_difference": round(p_diff, 4)
                        }
                    ))
            except (ValueError, TypeError, ZeroDivisionError) as e:
                logger.debug(f"T 检验验证失败: {e}")
                continue
        return issues

    def _extract_sample_sizes_from_header(self, header: List[str]) -> Tuple[Optional[int], Optional[int]]:
        """
        Extract group sample sizes from the header.

        Supported formats:
        - (n=50)
        - n=50
        - N=50
        - (50例)  [Chinese "50 cases"]
        - 对照组(n=48)  [label followed by (n=48)]

        Returns:
            (n1, n2) if at least two values are found, else (None, None).
        """
        n_pattern = re.compile(r"[(\[]?\s*[nN]\s*[=:]\s*(\d+)\s*[)\]]?")
        n_pattern_cn = re.compile(r"[(\[]?\s*(\d+)\s*例\s*[)\]]?")
        n_values = []
        for cell in header:
            # Prefer the "n=XX" style.
            match = n_pattern.search(cell)
            if match:
                try:
                    n_values.append(int(match.group(1)))
                except ValueError:
                    pass
                continue
            # Then try the Chinese "XX例" style.
            match = n_pattern_cn.search(cell)
            if match:
                try:
                    n_values.append(int(match.group(1)))
                except ValueError:
                    pass
        if len(n_values) >= 2:
            return n_values[0], n_values[1]
        return None, None

    def _extract_sample_sizes_from_row(
        self,
        row: List[str],
        header: List[str]
    ) -> Tuple[Optional[int], Optional[int]]:
        """
        Extract sample sizes from a data row.

        Strategy:
        1. look for "(n=XX)" markers in the row text;
        2. matching an n column to the Mean±SD column is a possible
           extension but is not implemented here.

        Returns:
            (n1, n2) if at least two values are found, else (None, None).
        """
        row_text = " ".join(row)
        n_pattern = re.compile(r"\(\s*[nN]\s*[=:]\s*(\d+)\s*\)")
        matches = n_pattern.findall(row_text)
        if len(matches) >= 2:
            try:
                return int(matches[0]), int(matches[1])
            except ValueError:
                pass
        return None, None

    def _validate_chi_square(self, table: TableData) -> List[Issue]:
        """
        Reverse chi-square verification.

        Recomputes the P value from the reported χ² statistic and an
        inferred degrees of freedom, then compares with the reported P.

        Method:
        - find χ²=X.XXX and the corresponding P value
        - assume df=1 (appropriate for the common 2x2 comparison)
        - compute P from the chi-square distribution
        - compare against the reported P value

        Special handling:
        - multi-paragraph cells (several data lines in one cell)
        - P columns without a "P=" prefix (bare numbers)

        Typical use cases:
        - medical baseline-characteristics tables (categorical comparisons)
        - any table reporting both χ² and P values
        """
        issues: List[Issue] = []
        if not SCIPY_AVAILABLE:
            return issues
        data = table.data
        if len(data) < 2:
            return issues
        # Identify the P value and statistic columns from the header.
        header = data[0]
        pvalue_col_idx = self._find_pvalue_column(header)
        chi2_col_idx = self._find_stat_column(header)
        for row_idx, row in enumerate(data[1:], start=2):
            # Statistic and P value cells for this row.
            stat_cell = row[chi2_col_idx] if chi2_col_idx < len(row) else ""
            pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""
            # Multi-line cells: split on newlines.
            stat_lines = stat_cell.split("\n") if stat_cell else []
            pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []
            # Pair χ² values and P values line by line.
            for line_idx in range(max(len(stat_lines), len(pvalue_lines))):
                stat_line = stat_lines[line_idx] if line_idx < len(stat_lines) else ""
                pvalue_line = pvalue_lines[line_idx] if line_idx < len(pvalue_lines) else ""
                # Find the χ² value.
                chi2_match = CHI_SQUARE_PATTERN.search(stat_line)
                if not chi2_match:
                    continue
                chi2_value = _safe_float(chi2_match.group(1))
                if chi2_value is None or chi2_value <= 0:
                    continue
                # Parse the P value (multiple formats supported).
                pvalue = self._parse_pvalue_flexible(pvalue_line)
                if pvalue is None:
                    continue
                # Default df=1 (most common 2x2 comparison scenario).
                df = 1
                try:
                    # P value from the chi-square distribution.
                    p_calc = 1 - stats.chi2.cdf(chi2_value, df)
                    # Compare reported vs recomputed P.
                    p_diff = abs(p_calc - pvalue)
                    # Check whether significance (at 0.05) agrees.
                    p_significant_reported = pvalue < 0.05
                    p_significant_calc = p_calc < 0.05
                    significance_mismatch = p_significant_reported != p_significant_calc
                    # Sub-row description from the first column.
                    first_cell_lines = row[0].split("\n") if row else []
                    sub_row_desc = first_cell_lines[line_idx] if line_idx < len(first_cell_lines) else f"子行 {line_idx + 1}"
                    # 1-based subrow index for precise frontend highlighting.
                    subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None
                    if significance_mismatch:
                        issues.append(Issue(
                            severity=Severity.ERROR,
                            type=IssueType.STAT_CHI2_PVALUE,
                            message=f"卡方检验 P 值矛盾 [{sub_row_desc.strip()}]: χ²={chi2_value}, 报告 P={pvalue}, 计算 P={p_calc:.4f},显著性不一致",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=pvalue_col_idx + 1,
                                subrow=subrow_idx
                            ),
                            evidence={
                                "chi2_value": chi2_value,
                                "df": df,
                                "p_calculated": round(p_calc, 4),
                                "p_reported": pvalue,
                                "p_difference": round(p_diff, 4),
                                "sub_row": sub_row_desc.strip(),
                                "significance_reported": "显著" if p_significant_reported else "不显著",
                                "significance_calculated": "显著" if p_significant_calc else "不显著"
                            }
                        ))
                    elif p_diff > PVALUE_ERROR_THRESHOLD:
                        issues.append(Issue(
                            severity=Severity.WARNING,
                            type=IssueType.STAT_CHI2_PVALUE,
                            message=f"卡方检验 P 值偏差 [{sub_row_desc.strip()}]: χ²={chi2_value}, 报告 P={pvalue}, 计算 P={p_calc:.4f},差异 {p_diff:.3f}",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=pvalue_col_idx + 1,
                                subrow=subrow_idx
                            ),
                            evidence={
                                "chi2_value": chi2_value,
                                "df": df,
                                "p_calculated": round(p_calc, 4),
                                "p_reported": pvalue,
                                "p_difference": round(p_diff, 4),
                                "sub_row": sub_row_desc.strip()
                            }
                        ))
                except (ValueError, ZeroDivisionError, TypeError) as e:
                    logger.debug(f"卡方检验验证失败: {e}")
                    continue
        return issues

    def _find_pvalue_column(self, header: List[str]) -> int:
        """Return the 0-based index of the P value column (default: last)."""
        p_keywords = ["p值", "pvalue", "p-value", "p 值", "sig"]
        for idx, cell in enumerate(header):
            cell_lower = cell.lower().strip()
            for kw in p_keywords:
                if kw in cell_lower:
                    return idx
        # Fallback: assume the last column.
        return len(header) - 1

    def _find_stat_column(self, header: List[str]) -> int:
        """Return the 0-based index of the statistic column (χ²/t/Z etc.)."""
        stat_keywords = ["统计", "stat", "χ", "chi", "t值", "z值"]
        for idx, cell in enumerate(header):
            cell_lower = cell.lower().strip()
            for kw in stat_keywords:
                if kw in cell_lower:
                    return idx
        # Fallback: assume the second-to-last column.
        return len(header) - 2

    def _parse_pvalue_flexible(self, text: str) -> Optional[float]:
        """
        Flexible P value parser.

        Supported formats:
        - P=0.05
        - P<0.001 (returns the bound, 0.001)
        - 0.05 (bare number)
        - 0.001 (full-width symbols)

        Returns:
            The parsed P value, or None if nothing parseable is found.
            Bare numbers are accepted only within [0, 1].
        """
        if not text:
            return None
        # Normalize special characters first (dash variants etc.).
        text = _clean_number_string(text)
        # Standard "P=..." style first.
        match = PVALUE_PATTERN.search(text)
        if match:
            val = _safe_float(match.group(1))
            if val is not None:
                return val
        # "<0.001" style: return the stated bound as the P value.
        less_than_match = re.search(r"[<]\s*(\d+\.?\d*)", text)
        if less_than_match:
            val = _safe_float(less_than_match.group(1))
            if val is not None:
                return val
        # Last resort: the whole text as a bare number.
        val = _safe_float(text)
        if val is not None and 0 <= val <= 1:  # sanity range for a P value
            return val
        return None

    def _validate_se_triangle(self, table: TableData) -> List[Issue]:
        """
        SE triangle validation (final-review escalation).

        For logistic regression, Cox regression and similar models.

        Method:
        - SE = (ln(CI_upper) - ln(CI_lower)) / 3.92   (3.92 = 2 × 1.96 for a 95% CI)
        - Z  = ln(OR) / SE
        - P_calculated = 2 * (1 - norm.cdf(|Z|))
        A reported P that disagrees badly with the CI-derived P is flagged.

        Improvement: supports multi-line cells via ``subrow`` indexing.
        """
        issues: List[Issue] = []
        data = table.data
        if not SCIPY_AVAILABLE:
            return issues
        header = data[0] if data else []
        pvalue_col_idx = self._find_pvalue_column(header)
        for row_idx, row in enumerate(data[1:], start=2):
            # P value column content (may span several lines).
            pvalue_cell = row[pvalue_col_idx] if pvalue_col_idx < len(row) else ""
            pvalue_lines = pvalue_cell.split("\n") if pvalue_cell else []
            # First column content (used for descriptions).
            first_cell_lines = row[0].split("\n") if row else []
            # Search the joined row text for OR/HR/RR values and CIs.
            row_text = " ".join(row)
            # All effect sizes (OR/HR/RR) in the row.
            effect_matches = list(EFFECT_SIZE_PATTERN.finditer(row_text))
            if not effect_matches:
                continue
            # All CIs in the row.
            ci_matches = []
            for pattern in CI_PATTERNS:
                ci_matches.extend(list(pattern.finditer(row_text)))
            if not ci_matches:
                continue
            # Check each P value line against an OR/CI pair.
            for line_idx, pvalue_line in enumerate(pvalue_lines):
                pvalue = self._parse_pvalue_flexible(pvalue_line)
                if pvalue is None:
                    continue
                # Description of this sub-row.
                row_desc = first_cell_lines[line_idx] if line_idx < len(first_cell_lines) else f"{line_idx+1}"
                # Use the first valid OR/CI combination for the check.
                for effect_match in effect_matches:
                    effect_size = _safe_float(effect_match.group(1))
                    if effect_size is None or effect_size <= 0:
                        continue
                    # Corresponding CI (first parseable CI in the row).
                    ci_result = self._parse_ci(row_text)
                    if ci_result is None:
                        continue
                    ci_lower, ci_upper = ci_result
                    # CI must be positive and properly ordered (log-scale math).
                    if ci_lower <= 0 or ci_upper <= 0 or ci_lower >= ci_upper:
                        continue
                    try:
                        # SE triangle computation on the log scale.
                        ln_effect = math.log(effect_size)
                        ln_ci_lower = math.log(ci_lower)
                        ln_ci_upper = math.log(ci_upper)
                        # SE = (ln(CI_upper) - ln(CI_lower)) / 3.92 (for 95% CI)
                        se = (ln_ci_upper - ln_ci_lower) / 3.92
                        if se <= 0:
                            continue
                        # Z = ln(OR) / SE
                        z = abs(ln_effect) / se
                        # P = 2 * (1 - norm.cdf(|Z|))
                        p_calc = 2 * (1 - stats.norm.cdf(z))
                        # Compare reported vs CI-derived P.
                        p_diff = abs(p_calc - pvalue)
                        # 1-based subrow index only for multi-line cells.
                        subrow_idx = line_idx + 1 if len(pvalue_lines) > 1 else None
                        if p_diff > PVALUE_ERROR_THRESHOLD:
                            issues.append(Issue(
                                severity=Severity.ERROR,
                                type=IssueType.STAT_SE_TRIANGLE,
                                message=f"SE 三角验证不一致 [{row_desc.strip()}]: OR={effect_size}, 报告 P={pvalue},由 CI 反推 P={p_calc:.4f}",
                                location=CellLocation(
                                    table_id=table.id,
                                    row=row_idx,
                                    col=pvalue_col_idx + 1,
                                    subrow=subrow_idx
                                ),
                                evidence={
                                    "effect_size": effect_size,
                                    "ci_lower": ci_lower,
                                    "ci_upper": ci_upper,
                                    "se_calculated": round(se, 4),
                                    "z_calculated": round(z, 3),
                                    "p_calculated": round(p_calc, 4),
                                    "p_reported": pvalue,
                                    "p_difference": round(p_diff, 4)
                                }
                            ))
                        elif p_diff > PVALUE_WARNING_THRESHOLD:
                            issues.append(Issue(
                                severity=Severity.WARNING,
                                type=IssueType.STAT_SE_TRIANGLE,
                                message=f"SE 三角验证轻微偏差 [{row_desc.strip()}]: OR={effect_size}, 报告 P={pvalue},计算 P={p_calc:.4f}",
                                location=CellLocation(
                                    table_id=table.id,
                                    row=row_idx,
                                    col=pvalue_col_idx + 1,
                                    subrow=subrow_idx
                                ),
                                evidence={
                                    "effect_size": effect_size,
                                    "p_calculated": round(p_calc, 4),
                                    "p_reported": pvalue,
                                    "p_difference": round(p_diff, 4)
                                }
                            ))
                        # One valid match is enough; stop scanning effect sizes.
                        break
                    except (ValueError, ZeroDivisionError, TypeError) as e:
                        logger.debug(f"SE 三角验证失败: {e}")
                        continue
        return issues

    def _validate_sd_greater_mean(self, table: TableData) -> List[Issue]:
        """
        SD > Mean heuristic check (final-review escalation).

        For positive-valued metrics (age, weight, blood pressure, lab
        values...), SD > Mean is usually implausible and may indicate a
        data problem.

        Known exceptions (hence WARNING for unrecognized metrics):
        - difference/change scores (can be negative)
        - strongly skewed distributions
        """
        issues: List[Issue] = []
        data = table.data
        # Need a header row to judge which columns are positive metrics.
        if len(data) < 2:
            return issues
        header = data[0]
        # Keywords identifying metrics that should normally be positive
        # (for these, SD > Mean is escalated to ERROR).
        positive_indicators = [
            "age", "年龄", "weight", "体重", "bmi", "height", "身高",
            "sbp", "dbp", "血压", "heart rate", "心率", "pulse", "脉搏",
            "wbc", "rbc", "hgb", "plt", "白细胞", "红细胞", "血红蛋白", "血小板",
            "creatinine", "肌酐", "bun", "尿素氮", "glucose", "血糖",
            "alt", "ast", "转氨酶", "bilirubin", "胆红素",
            "cost", "费用", "time", "时间", "duration", "持续"
        ]
        for row_idx, row in enumerate(data[1:], start=2):
            for col_idx, cell in enumerate(row, start=1):
                # Look for Mean±SD in the cell.
                match = MEAN_SD_PATTERN.search(cell)
                if not match:
                    # Fall back to the parenthesized "M (SD)" format.
                    match = MEAN_SD_PAREN_PATTERN.search(cell)
                    if not match:
                        continue
                mean_val = _safe_float(match.group(1))
                sd_val = _safe_float(match.group(2))
                if mean_val is None or sd_val is None:
                    continue
                # Only flag SD > Mean when the mean itself is positive.
                if mean_val > 0 and sd_val > mean_val:
                    # Decide whether this is a known positive metric, using
                    # the header cell and the row label as context.
                    context_text = ""
                    if col_idx - 1 < len(header):
                        context_text += header[col_idx - 1].lower()
                    if len(row) > 0:
                        context_text += " " + row[0].lower()
                    is_positive_indicator = any(kw in context_text for kw in positive_indicators)
                    # Coefficient of variation (CV) for the evidence payload.
                    cv = sd_val / mean_val if mean_val != 0 else 0
                    if is_positive_indicator:
                        # Known positive metric: SD > Mean is an error.
                        issues.append(Issue(
                            severity=Severity.ERROR,
                            type=IssueType.STAT_SD_GREATER_MEAN,
                            message=f"SD 大于 Mean 异常: {mean_val}±{sd_val}CV={cv:.1%},该指标通常为正值",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=col_idx
                            ),
                            evidence={
                                "mean": mean_val,
                                "sd": sd_val,
                                "cv": round(cv, 3),
                                "context": context_text[:50]
                            }
                        ))
                    else:
                        # Unrecognized metric: downgrade to a warning.
                        issues.append(Issue(
                            severity=Severity.WARNING,
                            type=IssueType.STAT_SD_GREATER_MEAN,
                            message=f"SD 大于 Mean: {mean_val}±{sd_val}CV={cv:.1%},建议核查数据分布",
                            location=CellLocation(
                                table_id=table.id,
                                row=row_idx,
                                col=col_idx
                            ),
                            evidence={
                                "mean": mean_val,
                                "sd": sd_val,
                                "cv": round(cv, 3)
                            }
                        ))
        return issues

    # ==================== helper methods ====================

    def _parse_ci(self, text: str) -> Optional[Tuple[float, float]]:
        """
        Parse a CI string, supporting multiple formats.

        Supported formats:
        - 2.5 (1.1-3.5)
        - 2.5 (1.1, 3.5)
        - 2.5 [1.1; 3.5]
        - 95% CI: 1.1-3.5
        - 95% CI 1.1 to 3.5

        Returns:
            (lower, upper) with lower < upper, or None if nothing matches.
        """
        for pattern in CI_PATTERNS:
            match = pattern.search(text)
            if match:
                try:
                    lower = _safe_float(match.group(1))
                    upper = _safe_float(match.group(2))
                    if lower is not None and upper is not None and lower < upper:
                        return lower, upper
                except IndexError:
                    # Pattern with fewer capture groups than expected — try the next.
                    continue
        # Fall back to the original single CI_PATTERN.
        match = CI_PATTERN.search(text)
        if match:
            lower = _safe_float(match.group(1))
            upper = _safe_float(match.group(2))
            if lower is not None and upper is not None and lower < upper:
                return lower, upper
        return None

    def _parse_pvalue(self, text: str) -> Optional[float]:
        """
        Parse a P value with the strict PVALUE_PATTERN only.

        Handles:
        - P=0.05
        - P<0.001
        - P>0.05
        - p值=0.05  [Chinese "P value = 0.05"]

        Note: superseded in most code paths by ``_parse_pvalue_flexible``.
        """
        match = PVALUE_PATTERN.search(text)
        if match:
            return _safe_float(match.group(1))
        return None