""" 生成分类变量(分箱)操作 将连续数值变量转换为分类变量。 支持三种方法:自定义切点、等宽分箱、等频分箱。 """ import pandas as pd import numpy as np from typing import List, Optional, Literal, Union def apply_binning( df: pd.DataFrame, column: str, method: Literal['custom', 'equal_width', 'equal_freq'], new_column_name: str, bins: Optional[List[Union[int, float]]] = None, labels: Optional[List[Union[str, int]]] = None, num_bins: int = 3, na_handling: Literal['keep', 'label', 'assign'] = 'keep', na_label: Optional[str] = None, na_assign_to: Optional[int] = None ) -> pd.DataFrame: """ 应用分箱操作 Args: df: 输入数据框 column: 要分箱的列名 method: 分箱方法 - 'custom': 自定义切点 - 'equal_width': 等宽分箱 - 'equal_freq': 等频分箱 new_column_name: 新列名 bins: 自定义切点列表(仅method='custom'时使用),如 [18, 60] → <18, 18-60, >60 labels: 标签列表(可选) num_bins: 分组数量(仅method='equal_width'或'equal_freq'时使用) na_handling: NA值处理方式 - 'keep': 保持为NA(默认) - 'label': 标记为指定标签 - 'assign': 分配到指定组 na_label: 当na_handling='label'时,NA的标签(如"缺失") na_assign_to: 当na_handling='assign'时,NA分配到的组索引 Returns: 分箱后的数据框 Examples: >>> df = pd.DataFrame({'年龄': [15, 25, 35, 45, 55, 65, 75, None]}) >>> result = apply_binning(df, '年龄', 'custom', '年龄分组', ... bins=[18, 60], labels=['青少年', '成年', '老年'], ... na_handling='label', na_label='缺失') >>> result['年龄分组'].tolist() ['青少年', '成年', '成年', '成年', '成年', '老年', '老年', '缺失'] """ if df.empty: return df # 验证列是否存在 if column not in df.columns: raise KeyError(f"列 '{column}' 不存在") # 创建结果数据框 result = df.copy() # ✨ 记录原始NA的位置(在分箱前) original_na_mask = result[column].isna() original_na_count = original_na_mask.sum() # 验证并转换数据类型 if not pd.api.types.is_numeric_dtype(result[column]): # 尝试将字符串转换为数值 try: result[column] = pd.to_numeric(result[column], errors='coerce') print(f"警告: 列 '{column}' 已自动转换为数值类型") except Exception as e: raise TypeError(f"列 '{column}' 不是数值类型且无法转换,无法进行分箱") # 检查是否有有效的数值 if result[column].isna().all(): raise ValueError(f"列 '{column}' 中没有有效的数值,无法进行分箱") # 根据方法进行分箱 if method == 'custom': # 自定义切点(用户输入的是中间切点,需要自动添加边界) if not bins or len(bins) < 1: raise ValueError('自定义切点至少需要1个值') # 验证切点是否升序 if bins != sorted(bins): raise ValueError('切点必须按升序排列') # 自动添加左右边界 # 重要:始终添加边界,确保切点数+1=区间数 min_val = result[column].min() max_val = result[column].max() print(f'用户输入切点: {bins}') print(f'数据范围: [{min_val:.2f}, {max_val:.2f}]') # 构建完整的边界数组:始终添加左右边界 # 左边界:取min(用户第一个切点, 数据最小值) - 0.001 # 右边界:取max(用户最后一个切点, 数据最大值) + 0.001 left_bound = min(bins[0], min_val) - 0.001 right_bound = max(bins[-1], max_val) + 0.001 full_bins = [left_bound] + bins + [right_bound] print(f'完整边界: {[f"{b:.1f}" for b in full_bins]}') print(f'将生成 {len(full_bins) - 1} 个区间 = {len(bins) + 1} 个区间') # 验证标签数量(区间数 = 边界数 - 1) expected_label_count = len(full_bins) - 1 if labels and len(labels) != expected_label_count: raise ValueError(f'标签数量({len(labels)})必须等于区间数量({expected_label_count})') result[new_column_name] = pd.cut( result[column], bins=full_bins, labels=labels, right=False, include_lowest=True ) elif method == 'equal_width': # 等宽分箱 if num_bins < 2: raise ValueError('分组数量至少为2') result[new_column_name] = pd.cut( result[column], bins=num_bins, labels=labels, include_lowest=True ) elif method == 'equal_freq': # 等频分箱 if num_bins < 2: raise ValueError('分组数量至少为2') result[new_column_name] = pd.qcut( result[column], q=num_bins, labels=labels, duplicates='drop' # 处理重复边界值 ) else: raise ValueError(f"不支持的分箱方法: {method}") # ✨ 重要:将Categorical类型转换为object类型,避免"nan"字符串问题 result[new_column_name] = result[new_column_name].astype('object') # ✨ 优化:将新列移到原列旁边 original_col_index = result.columns.get_loc(column) cols = list(result.columns) # 移除新列(当前在最后) cols.remove(new_column_name) # 插入到原列旁边 cols.insert(original_col_index + 1, new_column_name) result = result[cols] # ✨ 处理NA值(使用分箱前记录的NA位置) if original_na_count > 0: if na_handling == 'keep': # 保持为NA(显式设置为None,避免显示为"nan"字符串) result.loc[original_na_mask, new_column_name] = None print(f'📊 NA处理:保持为NA({original_na_count}个)', flush=True) elif na_handling == 'label': # 标记为指定标签 label_to_use = na_label if na_label else '空值/NA' result.loc[original_na_mask, new_column_name] = label_to_use print(f'📊 NA处理:标记为 "{label_to_use}"({original_na_count}个)', flush=True) elif na_handling == 'assign': # 分配到指定组(通过labels) if labels and na_assign_to is not None and 0 <= na_assign_to < len(labels): result.loc[original_na_mask, new_column_name] = labels[na_assign_to] print(f'📊 NA处理:分配到组 "{labels[na_assign_to]}"({original_na_count}个)', flush=True) else: print(f'⚠️ 警告:na_assign_to无效,NA保持为空', flush=True) # 统计分布 print(f'分箱结果分布:') value_counts = result[new_column_name].value_counts().sort_index() for category, count in value_counts.items(): percentage = count / len(result) * 100 print(f' {category}: {count} 行 ({percentage:.1f}%)') # 缺失值统计 missing_count = result[new_column_name].isna().sum() if missing_count > 0: print(f'警告: {missing_count} 个值无法分箱(可能是缺失值或边界问题)') return result