""" 计算列 - 预写函数 基于公式计算新列,支持数学运算和常用函数 """ import pandas as pd import numpy as np import re from typing import Dict, Any # 允许的函数(安全白名单) ALLOWED_FUNCTIONS = { 'abs': abs, 'round': round, 'sqrt': np.sqrt, 'log': np.log, 'log10': np.log10, 'exp': np.exp, 'sin': np.sin, 'cos': np.cos, 'tan': np.tan, 'floor': np.floor, 'ceil': np.ceil, 'min': min, 'max': max, } def validate_formula(formula: str, available_columns: list) -> tuple[bool, str]: """ 验证公式安全性和正确性 Args: formula: 公式字符串 available_columns: 可用的列名列表 Returns: (is_valid, error_message) """ # 检查是否为空 if not formula or not formula.strip(): return False, '公式不能为空' # 检查危险操作 dangerous_patterns = [ r'__', # 双下划线(Python内部属性) r'import\s', # import语句 r'exec\s', # exec函数 r'eval\s', # eval函数 r'open\s*\(', # 文件操作 r'compile\s*\(', # 编译函数 r'globals\s*\(', # 全局变量 r'locals\s*\(', # 局部变量 r'__builtins__', # 内置函数 ] for pattern in dangerous_patterns: if re.search(pattern, formula, re.IGNORECASE): return False, f'公式包含不允许的操作: {pattern}' # ✨ 增强:检查是否只包含允许的字符(放宽限制,支持更多特殊字符) # 允许:英文字母、数字、下划线、中文、空格、运算符、括号(中英文)、逗号(中英文)、点、冒号、等号 allowed_chars = r'[a-zA-Z0-9_\u4e00-\u9fa5\s\+\-\*/\(\)\[\]\{\}\.,,:\*\*=()【】、。:;!?]' if not re.match(f'^{allowed_chars}+$', formula): # 找出不允许的字符 invalid_chars = set(re.findall(f'[^{allowed_chars}]', formula)) return False, f'公式包含不允许的字符: {", ".join(invalid_chars)}' return True, '' def compute_column( df: pd.DataFrame, new_column_name: str, formula: str ) -> pd.DataFrame: """ 基于公式计算新列 Args: df: 输入数据框 new_column_name: 新列名称 formula: 计算公式 - 支持列名引用(如:身高, 体重) - 支持运算符(+, -, *, /, **, %) - 支持函数(abs, round, sqrt, log, exp等) Returns: 添加了新列的数据框 示例: # BMI计算 compute_column(df, 'BMI', '体重 / (身高/100)**2') # 年龄平方根 compute_column(df, '年龄_sqrt', 'sqrt(年龄)') # 复杂公式 compute_column(df, '综合得分', '(FMA*0.6 + ADL*0.4) / 100') """ result = df.copy() print(f'计算新列: {new_column_name}') print(f'公式: {formula}') print('') # 验证公式 is_valid, error_msg = validate_formula(formula, list(result.columns)) if not is_valid: raise ValueError(f'公式验证失败: {error_msg}') # 准备执行环境 # 1. 添加数据框的列作为变量(自动转换数值类型) env = {} # ✨ 增强:处理列名中的特殊字符 # 创建列名映射:将公式中的列名替换为安全的变量名 col_mapping = {} formula_safe = formula for i, col in enumerate(result.columns): # 为每个列创建一个安全的变量名 safe_var = f'col_{i}' col_mapping[col] = safe_var # 在公式中替换列名(完整匹配,避免部分替换) # 使用正则表达式确保只替换完整的列名 import re # 转义列名中的特殊字符 col_escaped = re.escape(col) # 替换公式中的列名(前后必须是边界) formula_safe = re.sub(rf'\b{col_escaped}\b', safe_var, formula_safe) # 尝试将列转换为数值类型 try: # 如果列可以转换为数值,就转换 numeric_col = pd.to_numeric(result[col], errors='coerce') # 如果转换后不全是NaN,说明是数值列 if not numeric_col.isna().all(): env[safe_var] = numeric_col print(f' 列 "{col}" -> {safe_var} (数值类型)') else: # 否则保持原样 env[safe_var] = result[col] print(f' 列 "{col}" -> {safe_var}') except Exception: # 转换失败,保持原样 env[safe_var] = result[col] print(f' 列 "{col}" -> {safe_var}') # 2. 添加允许的函数 env.update(ALLOWED_FUNCTIONS) # 3. 添加numpy(用于数学运算) env['np'] = np print(f' 使用安全公式: {formula_safe}') print('') try: # ✨ 使用转换后的安全公式执行计算 computed_values = eval(formula_safe, {"__builtins__": {}}, env) # ✨ 优化:将新列插入到第一个引用列的旁边 # 找到公式中引用的第一个列 first_ref_col = None for col in result.columns: safe_var = col_mapping.get(col) if safe_var and safe_var in formula_safe: first_ref_col = col break if first_ref_col: ref_col_index = result.columns.get_loc(first_ref_col) result.insert(ref_col_index + 1, new_column_name, computed_values) print(f'计算成功!新列插入在 {first_ref_col} 旁边') else: # 如果找不到引用列,添加到最后 result[new_column_name] = computed_values print(f'计算成功!') print(f'新列类型: {result[new_column_name].dtype}') print(f'新列前5个值:') # 安全打印(避免NaN/inf导致序列化错误) for idx, val in result[new_column_name].head().items(): if pd.isna(val): print(f' [{idx}] None (NaN)') elif np.isinf(val): print(f' [{idx}] None (inf)') else: print(f' [{idx}] {val}') print('') # 统计结果 if pd.api.types.is_numeric_dtype(result[new_column_name]): col_data = result[new_column_name] # 统计缺失值和无效值 nan_count = col_data.isna().sum() inf_count = np.isinf(col_data.replace([np.nan], 0)).sum() print(f'统计信息:') # 只对有效值计算统计量 valid_data = col_data.dropna().replace([np.inf, -np.inf], np.nan).dropna() if len(valid_data) > 0: print(f' 最小值: {valid_data.min():.2f}') print(f' 最大值: {valid_data.max():.2f}') print(f' 平均值: {valid_data.mean():.2f}') else: print(f' 没有有效的数值') if nan_count > 0: print(f' 缺失值(NaN): {nan_count} 个') if inf_count > 0: print(f' 无穷大值(inf): {inf_count} 个') else: print(f'非数值类型,跳过统计') return result except NameError as e: # 列名不存在 missing_col = str(e).split("'")[1] raise ValueError(f'列 "{missing_col}" 不存在,请检查公式中的列名') except ZeroDivisionError: raise ValueError('除零错误:公式中存在除以0的情况') except Exception as e: raise ValueError(f'计算失败: {str(e)}') def get_formula_examples() -> list[Dict[str, str]]: """ 获取公式示例 Returns: 示例列表 """ return [ { 'name': 'BMI计算', 'formula': '体重 / (身高/100)**2', 'description': '体重指数(需要身高(cm)和体重(kg)列)' }, { 'name': '年龄分组', 'formula': 'round(年龄 / 10) * 10', 'description': '按10岁为一组(20, 30, 40...)' }, { 'name': '综合得分', 'formula': '(FMA得分 * 0.6 + ADL得分 * 0.4)', 'description': '加权平均分' }, { 'name': '变化率', 'formula': '(随访值 - 基线值) / 基线值 * 100', 'description': '计算变化百分比' }, { 'name': '对数转换', 'formula': 'log(值 + 1)', 'description': '对数变换(处理偏态分布)' }, ]