""" 计算列 - 预写函数 基于公式计算新列,支持数学运算和常用函数 """ import pandas as pd import numpy as np import re from typing import Dict, Any def normalize_formula(formula: str) -> str: """ 规范化公式字符串:将全角字符转换为半角字符 解决用户使用中文输入法导致的全角字符问题 Args: formula: 原始公式字符串 Returns: 规范化后的公式字符串 示例: normalize_formula("体重/(身高/100)**2") -> "体重/(身高/100)**2" """ # 全角到半角的映射 full_to_half = { # 括号 '(': '(', ')': ')', '【': '[', '】': ']', '{': '{', '}': '}', # 运算符 '+': '+', '-': '-', '*': '*', '/': '/', '%': '%', # 数字 '0': '0', '1': '1', '2': '2', '3': '3', '4': '4', '5': '5', '6': '6', '7': '7', '8': '8', '9': '9', # 其他符号 '.': '.', ',': ',', ':': ':', ';': ';', '=': '=', '<': '<', '>': '>', ' ': ' ', # 全角空格 } result = formula for full_char, half_char in full_to_half.items(): result = result.replace(full_char, half_char) return result # 允许的函数(安全白名单) ALLOWED_FUNCTIONS = { 'abs': abs, 'round': round, 'sqrt': np.sqrt, 'log': np.log, 'log10': np.log10, 'exp': np.exp, 'sin': np.sin, 'cos': np.cos, 'tan': np.tan, 'floor': np.floor, 'ceil': np.ceil, 'min': min, 'max': max, } def validate_formula(formula: str, available_columns: list) -> tuple[bool, str]: """ 验证公式安全性和正确性 Args: formula: 公式字符串 available_columns: 可用的列名列表 Returns: (is_valid, error_message) """ # 检查是否为空 if not formula or not formula.strip(): return False, '公式不能为空' # 检查危险操作 dangerous_patterns = [ r'__', # 双下划线(Python内部属性) r'import\s', # import语句 r'exec\s', # exec函数 r'eval\s', # eval函数 r'open\s*\(', # 文件操作 r'compile\s*\(', # 编译函数 r'globals\s*\(', # 全局变量 r'locals\s*\(', # 局部变量 r'__builtins__', # 内置函数 ] for pattern in dangerous_patterns: if re.search(pattern, formula, re.IGNORECASE): return False, f'公式包含不允许的操作: {pattern}' # ✨ 增强:检查是否只包含允许的字符(放宽限制,支持更多特殊字符) # 允许:英文字母、数字、下划线、中文、空格、运算符、括号(中英文)、逗号(中英文)、点、冒号、等号 allowed_chars = r'[a-zA-Z0-9_\u4e00-\u9fa5\s\+\-\*/\(\)\[\]\{\}\.,,:\*\*=()【】、。:;!?]' if not re.match(f'^{allowed_chars}+$', formula): # 找出不允许的字符 invalid_chars = set(re.findall(f'[^{allowed_chars}]', formula)) return False, f'公式包含不允许的字符: {", ".join(invalid_chars)}' return True, '' def compute_column( df: pd.DataFrame, new_column_name: str, formula: str, column_mapping: list = None # ⭐ 新增参数(兼容旧版本调用) ) -> pd.DataFrame: """ 基于公式计算新列 Args: df: 输入数据框 new_column_name: 新列名称 formula: 计算公式 - 支持列名引用(如:身高, 体重) - 支持运算符(+, -, *, /, **, %) - 支持函数(abs, round, sqrt, log, exp等) column_mapping: 列名映射(可选) - 用于处理带空格/特殊字符的列名 Returns: 添加了新列的数据框 示例: # BMI计算 compute_column(df, 'BMI', '体重 / (身高/100)**2') # 年龄平方根 compute_column(df, '年龄_sqrt', 'sqrt(年龄)') # 复杂公式 compute_column(df, '综合得分', '(FMA*0.6 + ADL*0.4) / 100') """ result = df.copy() print(f'计算新列: {new_column_name}') print(f'原始公式: {formula}') # ⭐ 步骤1:规范化公式(转换全角字符) formula = normalize_formula(formula) print(f'规范化后: {formula}') print(f'收到列名映射: {len(column_mapping or [])} 个列') print('') # ⭐ 步骤2:彻底解决方案 - 使用Node.js传来的column_mapping env = {} formula_safe = formula normalized_mapping = [] # ⭐ 在外面定义,供后续使用 if column_mapping and len(column_mapping) > 0: print('使用传入的列名映射(支持任意特殊字符):') # ⭐ 关键修复:对originalName也做normalize,确保匹配成功 # 原因:用户输入"身高(cm)",但列名是"身高(Cm)",normalize后才能匹配 for mapping in column_mapping: original_name = mapping.get('originalName', '') safe_name = mapping.get('safeName', '') # ⭐ 对列名也做normalize(全角→半角) normalized_name = normalize_formula(original_name) normalized_mapping.append({ 'originalName': original_name, # 保留原始名(用于访问DataFrame) 'normalizedName': normalized_name, # 标准化名(用于匹配) 'safeName': safe_name }) # ⭐ 关键:按标准化后的列名长度排序(从长到短),避免部分匹配 sorted_mapping = sorted( normalized_mapping, key=lambda x: len(x['normalizedName']), reverse=True ) # ⭐ 使用简单字符串replace(不用正则,彻底解决特殊字符问题) for mapping in sorted_mapping: normalized_name = mapping['normalizedName'] safe_name = mapping['safeName'] original_name = mapping['originalName'] if normalized_name and safe_name: # ⭐ 在标准化后的空间匹配(全角→半角后) if normalized_name in formula_safe: formula_safe = formula_safe.replace(normalized_name, safe_name) print(f' "{original_name}" (标准化: "{normalized_name}") -> {safe_name}') # ⭐ 准备执行环境:使用safeName作为变量名 print('') print('准备执行环境:') for mapping in normalized_mapping: original_name = mapping['originalName'] # 使用原始列名访问DataFrame safe_name = mapping['safeName'] if original_name and safe_name and original_name in result.columns: # 尝试转换为数值类型 try: numeric_col = pd.to_numeric(result[original_name], errors='coerce') if not numeric_col.isna().all(): env[safe_name] = numeric_col print(f' {safe_name} = DataFrame["{original_name}"] (数值)') else: env[safe_name] = result[original_name] print(f' {safe_name} = DataFrame["{original_name}"]') except Exception: env[safe_name] = result[original_name] print(f' {safe_name} = DataFrame["{original_name}"]') else: # ⭐ Fallback:兼容旧版本(没有column_mapping时) print('⚠️ 未传入列名映射,使用自动生成(可能不支持特殊字符):') for i, col in enumerate(result.columns): safe_var = f'col_{i}' # 简单替换(尽力而为) if col in formula_safe: formula_safe = formula_safe.replace(col, safe_var) print(f' "{col}" -> {safe_var}') # 准备执行环境 try: numeric_col = pd.to_numeric(result[col], errors='coerce') if not numeric_col.isna().all(): env[safe_var] = numeric_col else: env[safe_var] = result[col] except Exception: env[safe_var] = result[col] # 验证公式(使用转换后的安全公式) # 注意:validate_formula现在检查的是别名后的公式,所以会失败 # 我们跳过验证,或者只做基本的安全检查 print('') print(f'最终公式: {formula_safe}') # 基本安全检查(不依赖列名) dangerous_patterns = ['__', 'import', 'exec', 'eval', 'open', 'compile', 'globals', 'locals', '__builtins__'] for pattern in dangerous_patterns: if pattern in formula_safe.lower(): raise ValueError(f'公式包含不允许的操作: {pattern}') # 2. 添加允许的函数到执行环境 env.update(ALLOWED_FUNCTIONS) # 3. 添加numpy(用于数学运算) env['np'] = np print('') print(f'准备执行公式: {formula_safe}') print('') try: # ✨ 使用转换后的安全公式执行计算 computed_values = eval(formula_safe, {"__builtins__": {}}, env) # ✨ 优化:将新列插入到第一个引用列的旁边 # 找到公式中引用的第一个列 first_ref_col = None if normalized_mapping and len(normalized_mapping) > 0: # 使用传入的映射查找 for mapping in normalized_mapping: safe_name = mapping['safeName'] original_name = mapping['originalName'] if safe_name in formula_safe and original_name in result.columns: first_ref_col = original_name break else: # Fallback:遍历所有列查找 for i, col in enumerate(result.columns): safe_var = f'col_{i}' if safe_var in formula_safe: first_ref_col = col break if first_ref_col: ref_col_index = result.columns.get_loc(first_ref_col) result.insert(ref_col_index + 1, new_column_name, computed_values) print(f'计算成功!新列插入在 "{first_ref_col}" 旁边') else: # 如果找不到引用列,添加到最后 result[new_column_name] = computed_values print(f'计算成功!新列添加到最后') print(f'新列类型: {result[new_column_name].dtype}') print(f'新列前5个值:') # 安全打印(避免NaN/inf导致序列化错误) for idx, val in result[new_column_name].head().items(): if pd.isna(val): print(f' [{idx}] None (NaN)') elif np.isinf(val): print(f' [{idx}] None (inf)') else: print(f' [{idx}] {val}') print('') # 统计结果 if pd.api.types.is_numeric_dtype(result[new_column_name]): col_data = result[new_column_name] # 统计缺失值和无效值 nan_count = col_data.isna().sum() inf_count = np.isinf(col_data.replace([np.nan], 0)).sum() print(f'统计信息:') # 只对有效值计算统计量 valid_data = col_data.dropna().replace([np.inf, -np.inf], np.nan).dropna() if len(valid_data) > 0: print(f' 最小值: {valid_data.min():.2f}') print(f' 最大值: {valid_data.max():.2f}') print(f' 平均值: {valid_data.mean():.2f}') else: print(f' 没有有效的数值') if nan_count > 0: print(f' 缺失值(NaN): {nan_count} 个') if inf_count > 0: print(f' 无穷大值(inf): {inf_count} 个') else: print(f'非数值类型,跳过统计') return result except NameError as e: # 列名不存在 missing_col = str(e).split("'")[1] raise ValueError(f'列 "{missing_col}" 不存在,请检查公式中的列名') except ZeroDivisionError: raise ValueError('除零错误:公式中存在除以0的情况') except Exception as e: raise ValueError(f'计算失败: {str(e)}') def get_formula_examples() -> list[Dict[str, str]]: """ 获取公式示例 Returns: 示例列表 """ return [ { 'name': 'BMI计算', 'formula': '体重 / (身高/100)**2', 'description': '体重指数(需要身高(cm)和体重(kg)列)' }, { 'name': '年龄分组', 'formula': 'round(年龄 / 10) * 10', 'description': '按10岁为一组(20, 30, 40...)' }, { 'name': '综合得分', 'formula': '(FMA得分 * 0.6 + ADL得分 * 0.4)', 'description': '加权平均分' }, { 'name': '变化率', 'formula': '(随访值 - 基线值) / 基线值 * 100', 'description': '计算变化百分比' }, { 'name': '对数转换', 'formula': 'log(值 + 1)', 'description': '对数变换(处理偏态分布)' }, ]