""" 条件生成列 - 预写函数 支持复杂的IF-THEN-ELSE多条件逻辑 """ import pandas as pd from typing import List, Dict, Any, Union def apply_conditional_column( df: pd.DataFrame, new_column_name: str, rules: List[Dict[str, Any]], else_value: Any = None ) -> pd.DataFrame: """ 根据多条件规则生成新列 Args: df: 输入数据框 new_column_name: 新列名称 rules: 规则列表,每个规则包含: - conditions: 条件列表 - logic: 'and' 或 'or' - result: 满足条件时的结果值 else_value: 所有规则都不满足时的默认值 Returns: 添加了新列的数据框 示例: rules = [ { "conditions": [ {"column": "年龄", "operator": ">=", "value": 60} ], "logic": "and", "result": "老年" }, { "conditions": [ {"column": "年龄", "operator": ">=", "value": 18}, {"column": "年龄", "operator": "<", "value": 60} ], "logic": "and", "result": "成年" } ] """ result = df.copy() # 验证规则 if not rules or len(rules) == 0: raise ValueError('至少需要1条规则') # 验证所有引用的列是否存在 for rule in rules: for condition in rule.get('conditions', []): column = condition.get('column') if column not in result.columns: raise ValueError(f'列 "{column}" 不存在') # 初始化新列为else_value result[new_column_name] = else_value print(f'开始应用条件规则,共 {len(rules)} 条规则') # 按顺序应用每条规则 for rule_idx, rule in enumerate(rules, 1): conditions = rule.get('conditions', []) logic = rule.get('logic', 'and') result_value = rule.get('result') if not conditions: continue # 构建每个条件的mask masks = [] for condition in conditions: column = condition['column'] operator = condition['operator'] value = condition['value'] # 智能类型转换:对于数字比较运算符,尝试将列转换为数字 if operator in ('>', '<', '>=', '<='): # 尝试将列转换为数字类型 try: col_data = pd.to_numeric(result[column], errors='coerce') # 确保value也是数字 if not isinstance(value, (int, float)): value = float(value) except Exception: # 如果转换失败,使用原始数据 col_data = result[column] else: # 对于相等/不相等比较,使用原始数据 col_data = result[column] # 根据运算符生成mask if operator == '=': mask = col_data == value elif operator == '!=': mask = col_data != value elif operator == '>': mask = col_data > value elif operator == '<': mask = col_data < value elif operator == '>=': mask = col_data >= value elif operator == '<=': mask = col_data <= value elif operator == 'is_null': # ✨ 新增:为空 mask = result[column].isna() elif operator == 'not_null': # ✨ 新增:不为空 mask = result[column].notna() else: raise ValueError(f'不支持的运算符: {operator}') masks.append(mask) # 组合条件 if logic == 'and': final_mask = pd.concat(masks, axis=1).all(axis=1) elif logic == 'or': final_mask = pd.concat(masks, axis=1).any(axis=1) else: raise ValueError(f'不支持的逻辑运算符: {logic}') # 应用规则 matched_count = final_mask.sum() result.loc[final_mask, new_column_name] = result_value print(f' 规则{rule_idx}: 匹配 {matched_count} 行 → 值为 {result_value}') # ✨ 优化:将新列移到第一个引用列旁边 first_ref_col = rules[0]['conditions'][0]['column'] # 使用第一个规则的第一个条件列作为参考 original_col_index = result.columns.get_loc(first_ref_col) cols = list(result.columns) # 移除新列(当前在最后) cols.remove(new_column_name) # 插入到原列旁边 cols.insert(original_col_index + 1, new_column_name) result = result[cols] # 统计结果分布 print(f'\n结果分布:') value_counts = result[new_column_name].value_counts(dropna=False) for value, count in value_counts.items(): percentage = count / len(result) * 100 if pd.isna(value): print(f' (空值): {count} 行 ({percentage:.1f}%)') else: print(f' {value}: {count} 行 ({percentage:.1f}%)') return result def apply_simple_binning( df: pd.DataFrame, column: str, new_column_name: str, threshold: float, value_if_true: Any = 1, value_if_false: Any = 0 ) -> pd.DataFrame: """ 简单二分类(单一阈值判断) 这是条件生成列的简化版本,用于单一阈值判断 Args: df: 输入数据框 column: 用于判断的列 new_column_name: 新列名称 threshold: 阈值 value_if_true: >= threshold时的值 value_if_false: < threshold时的值 Returns: 添加了新列的数据框 示例: 住院患者暴露分组: 督脉针刺持续时间 >= 10 → 1 (暴露) 督脉针刺持续时间 < 10 → 0 (非暴露) """ result = df.copy() if column not in result.columns: raise ValueError(f'列 "{column}" 不存在') # 简单的阈值判断 result[new_column_name] = (result[column] >= threshold).astype(int) * value_if_true + \ (result[column] < threshold).astype(int) * value_if_false # 统计分布 print(f'简单二分类结果:') print(f' {column} >= {threshold}: {(result[column] >= threshold).sum()} 行 → {value_if_true}') print(f' {column} < {threshold}: {(result[column] < threshold).sum()} 行 → {value_if_false}') return result