Files
AIclinicalresearch/extraction_service/operations/compute.py
HaHafeng 75ceeb0653 hotfix(dc/tool-c): Fix compute formula validation and binning NaN serialization
Critical fixes:
1. Compute column: Add Chinese comma support in formula validation
   - Problem: Formula with Chinese comma failed validation
   - Fix: Add Chinese comma character to allowed_chars regex
   - Example: Support formulas like 'col1(kg)+ col2,col3'

2. Binning operation: Fix NaN serialization error
   - Problem: 'Out of range float values are not JSON compliant: nan'
   - Fix: Enhanced NaN/inf handling in binning endpoint
   - Added np.inf/-np.inf replacement before JSON serialization
   - Added manual JSON serialization with NaN->null conversion

3. Enhanced all operation endpoints for consistency
   - Updated conditional, dropna endpoints with same NaN/inf handling
   - Ensures all operations return JSON-compliant data

Modified files:
- extraction_service/operations/compute.py: Add Chinese comma to regex
- extraction_service/main.py: Enhanced NaN handling in binning/conditional/dropna

Status: Hotfix complete, ready for testing
2025-12-09 08:45:27 +08:00

270 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
计算列 - 预写函数
基于公式计算新列,支持数学运算和常用函数
"""
import pandas as pd
import numpy as np
import re
from typing import Dict, Any
# 允许的函数(安全白名单)
ALLOWED_FUNCTIONS = {
'abs': abs,
'round': round,
'sqrt': np.sqrt,
'log': np.log,
'log10': np.log10,
'exp': np.exp,
'sin': np.sin,
'cos': np.cos,
'tan': np.tan,
'floor': np.floor,
'ceil': np.ceil,
'min': min,
'max': max,
}
def validate_formula(formula: str, available_columns: list) -> tuple[bool, str]:
"""
验证公式安全性和正确性
Args:
formula: 公式字符串
available_columns: 可用的列名列表
Returns:
(is_valid, error_message)
"""
# 检查是否为空
if not formula or not formula.strip():
return False, '公式不能为空'
# 检查危险操作
dangerous_patterns = [
r'__', # 双下划线Python内部属性
r'import\s', # import语句
r'exec\s', # exec函数
r'eval\s', # eval函数
r'open\s*\(', # 文件操作
r'compile\s*\(', # 编译函数
r'globals\s*\(', # 全局变量
r'locals\s*\(', # 局部变量
r'__builtins__', # 内置函数
]
for pattern in dangerous_patterns:
if re.search(pattern, formula, re.IGNORECASE):
return False, f'公式包含不允许的操作: {pattern}'
# ✨ 增强:检查是否只包含允许的字符(放宽限制,支持更多特殊字符)
# 允许:英文字母、数字、下划线、中文、空格、运算符、括号(中英文)、逗号(中英文)、点、冒号、等号
allowed_chars = r'[a-zA-Z0-9_\u4e00-\u9fa5\s\+\-\*/\(\)\[\]\{\}\.,:\*\*=()【】、。:;!?]'
if not re.match(f'^{allowed_chars}+$', formula):
# 找出不允许的字符
invalid_chars = set(re.findall(f'[^{allowed_chars}]', formula))
return False, f'公式包含不允许的字符: {", ".join(invalid_chars)}'
return True, ''
def compute_column(
df: pd.DataFrame,
new_column_name: str,
formula: str
) -> pd.DataFrame:
"""
基于公式计算新列
Args:
df: 输入数据框
new_column_name: 新列名称
formula: 计算公式
- 支持列名引用(如:身高, 体重)
- 支持运算符(+, -, *, /, **, %
- 支持函数abs, round, sqrt, log, exp等
Returns:
添加了新列的数据框
示例:
# BMI计算
compute_column(df, 'BMI', '体重 / (身高/100)**2')
# 年龄平方根
compute_column(df, '年龄_sqrt', 'sqrt(年龄)')
# 复杂公式
compute_column(df, '综合得分', '(FMA*0.6 + ADL*0.4) / 100')
"""
result = df.copy()
print(f'计算新列: {new_column_name}')
print(f'公式: {formula}')
print('')
# 验证公式
is_valid, error_msg = validate_formula(formula, list(result.columns))
if not is_valid:
raise ValueError(f'公式验证失败: {error_msg}')
# 准备执行环境
# 1. 添加数据框的列作为变量(自动转换数值类型)
env = {}
# ✨ 增强:处理列名中的特殊字符
# 创建列名映射:将公式中的列名替换为安全的变量名
col_mapping = {}
formula_safe = formula
for i, col in enumerate(result.columns):
# 为每个列创建一个安全的变量名
safe_var = f'col_{i}'
col_mapping[col] = safe_var
# 在公式中替换列名(完整匹配,避免部分替换)
# 使用正则表达式确保只替换完整的列名
import re
# 转义列名中的特殊字符
col_escaped = re.escape(col)
# 替换公式中的列名(前后必须是边界)
formula_safe = re.sub(rf'\b{col_escaped}\b', safe_var, formula_safe)
# 尝试将列转换为数值类型
try:
# 如果列可以转换为数值,就转换
numeric_col = pd.to_numeric(result[col], errors='coerce')
# 如果转换后不全是NaN说明是数值列
if not numeric_col.isna().all():
env[safe_var] = numeric_col
print(f'"{col}" -> {safe_var} (数值类型)')
else:
# 否则保持原样
env[safe_var] = result[col]
print(f'"{col}" -> {safe_var}')
except Exception:
# 转换失败,保持原样
env[safe_var] = result[col]
print(f'"{col}" -> {safe_var}')
# 2. 添加允许的函数
env.update(ALLOWED_FUNCTIONS)
# 3. 添加numpy用于数学运算
env['np'] = np
print(f' 使用安全公式: {formula_safe}')
print('')
try:
# ✨ 使用转换后的安全公式执行计算
computed_values = eval(formula_safe, {"__builtins__": {}}, env)
# ✨ 优化:将新列插入到第一个引用列的旁边
# 找到公式中引用的第一个列
first_ref_col = None
for col in result.columns:
safe_var = col_mapping.get(col)
if safe_var and safe_var in formula_safe:
first_ref_col = col
break
if first_ref_col:
ref_col_index = result.columns.get_loc(first_ref_col)
result.insert(ref_col_index + 1, new_column_name, computed_values)
print(f'计算成功!新列插入在 {first_ref_col} 旁边')
else:
# 如果找不到引用列,添加到最后
result[new_column_name] = computed_values
print(f'计算成功!')
print(f'新列类型: {result[new_column_name].dtype}')
print(f'新列前5个值:')
# 安全打印避免NaN/inf导致序列化错误
for idx, val in result[new_column_name].head().items():
if pd.isna(val):
print(f' [{idx}] None (NaN)')
elif np.isinf(val):
print(f' [{idx}] None (inf)')
else:
print(f' [{idx}] {val}')
print('')
# 统计结果
if pd.api.types.is_numeric_dtype(result[new_column_name]):
col_data = result[new_column_name]
# 统计缺失值和无效值
nan_count = col_data.isna().sum()
inf_count = np.isinf(col_data.replace([np.nan], 0)).sum()
print(f'统计信息:')
# 只对有效值计算统计量
valid_data = col_data.dropna().replace([np.inf, -np.inf], np.nan).dropna()
if len(valid_data) > 0:
print(f' 最小值: {valid_data.min():.2f}')
print(f' 最大值: {valid_data.max():.2f}')
print(f' 平均值: {valid_data.mean():.2f}')
else:
print(f' 没有有效的数值')
if nan_count > 0:
print(f' 缺失值(NaN): {nan_count}')
if inf_count > 0:
print(f' 无穷大值(inf): {inf_count}')
else:
print(f'非数值类型,跳过统计')
return result
except NameError as e:
# 列名不存在
missing_col = str(e).split("'")[1]
raise ValueError(f'"{missing_col}" 不存在,请检查公式中的列名')
except ZeroDivisionError:
raise ValueError('除零错误公式中存在除以0的情况')
except Exception as e:
raise ValueError(f'计算失败: {str(e)}')
def get_formula_examples() -> list[Dict[str, str]]:
"""
获取公式示例
Returns:
示例列表
"""
return [
{
'name': 'BMI计算',
'formula': '体重 / (身高/100)**2',
'description': '体重指数(需要身高(cm)和体重(kg)列)'
},
{
'name': '年龄分组',
'formula': 'round(年龄 / 10) * 10',
'description': '按10岁为一组20, 30, 40...'
},
{
'name': '综合得分',
'formula': '(FMA得分 * 0.6 + ADL得分 * 0.4)',
'description': '加权平均分'
},
{
'name': '变化率',
'formula': '(随访值 - 基线值) / 基线值 * 100',
'description': '计算变化百分比'
},
{
'name': '对数转换',
'formula': 'log(值 + 1)',
'description': '对数变换(处理偏态分布)'
},
]