""" 数值映射(重编码)操作 将分类变量的原始值映射为新值(如:男→1,女→2)。 """ import pandas as pd import numpy as np from typing import Dict, Any, Optional, Literal def apply_recode( df: pd.DataFrame, column: str, mapping: Dict[Any, Any], create_new_column: bool = True, new_column_name: Optional[str] = None, na_handling: Literal['keep', 'map', 'drop'] = 'keep', na_value: Any = None ) -> pd.DataFrame: """ 应用数值映射 Args: df: 输入数据框 column: 要重编码的列名 mapping: 映射字典,如 {'男': 1, '女': 2} create_new_column: 是否创建新列(True)或覆盖原列(False) new_column_name: 新列名(create_new_column=True时使用) na_handling: NA值处理方式 - 'keep': 保持为NA(默认) - 'map': 映射为指定值 - 'drop': 删除包含NA的行 na_value: 当na_handling='map'时,NA映射到的值 Returns: 重编码后的数据框 Examples: >>> df = pd.DataFrame({'性别': ['男', '女', '男', None]}) >>> mapping = {'男': 1, '女': 2} >>> result = apply_recode(df, '性别', mapping, True, '性别_编码', na_handling='map', na_value=0) >>> result['性别_编码'].tolist() [1, 2, 1, 0] """ if df.empty: return df # 验证列是否存在 if column not in df.columns: raise KeyError(f"列 '{column}' 不存在") if not mapping: raise ValueError('映射字典不能为空') # 确定目标列名 if create_new_column: target_column = new_column_name or f'{column}_编码' else: target_column = column # 创建结果数据框(避免修改原数据) result = df.copy() # ✨ 统计原始NA数量 original_na_count = result[column].isna().sum() # ✨ 优化:如果是创建新列,插入到原列旁边 if create_new_column: original_col_index = result.columns.get_loc(column) result.insert(original_col_index + 1, target_column, result[column].map(mapping)) else: # 覆盖原列 result[target_column] = result[column].map(mapping) # ✨ 处理NA值 if original_na_count > 0: na_mask = result[column].isna() if na_handling == 'keep': # 保持为NA(已经是NA,无需操作) print(f'📊 NA处理:保持为NA({original_na_count}个)') elif na_handling == 'map': # 映射为指定值 result.loc[na_mask, target_column] = na_value print(f'📊 NA处理:映射为 {na_value}({original_na_count}个)') elif na_handling == 'drop': # 删除包含NA的行 rows_before = len(result) result = result[~na_mask].copy() rows_after = len(result) print(f'📊 NA处理:删除包含NA的行(删除{rows_before - rows_after}行)') # 统计结果 mapped_count = result[target_column].notna().sum() unmapped_count = result[target_column].isna().sum() total_count = len(result) print(f'映射完成: {mapped_count} 个值成功映射') if unmapped_count > 0: print(f'警告: {unmapped_count} 个值未找到对应映射') # 找出未映射的唯一值 unmapped_mask = result[target_column].isna() unmapped_values = result.loc[unmapped_mask, column].unique() print(f'未映射的值: {list(unmapped_values)[:10]}') # 最多显示10个 # 映射成功率 success_rate = (mapped_count / total_count * 100) if total_count > 0 else 0 print(f'映射成功率: {success_rate:.1f}%') return result