""" 生成分类变量(分箱)操作 将连续数值变量转换为分类变量。 支持三种方法:自定义切点、等宽分箱、等频分箱。 """ import pandas as pd import numpy as np from typing import List, Optional, Literal, Union def apply_binning( df: pd.DataFrame, column: str, method: Literal['custom', 'equal_width', 'equal_freq'], new_column_name: str, bins: Optional[List[Union[int, float]]] = None, labels: Optional[List[Union[str, int]]] = None, num_bins: int = 3 ) -> pd.DataFrame: """ 应用分箱操作 Args: df: 输入数据框 column: 要分箱的列名 method: 分箱方法 - 'custom': 自定义切点 - 'equal_width': 等宽分箱 - 'equal_freq': 等频分箱 new_column_name: 新列名 bins: 自定义切点列表(仅method='custom'时使用),如 [18, 60] → <18, 18-60, >60 labels: 标签列表(可选) num_bins: 分组数量(仅method='equal_width'或'equal_freq'时使用) Returns: 分箱后的数据框 Examples: >>> df = pd.DataFrame({'年龄': [15, 25, 35, 45, 55, 65, 75]}) >>> result = apply_binning(df, '年龄', 'custom', '年龄分组', ... bins=[18, 60], labels=['青少年', '成年', '老年']) >>> result['年龄分组'].tolist() ['青少年', '成年', '成年', '成年', '成年', '老年', '老年'] """ if df.empty: return df # 验证列是否存在 if column not in df.columns: raise KeyError(f"列 '{column}' 不存在") # 验证数据类型 if not pd.api.types.is_numeric_dtype(df[column]): raise TypeError(f"列 '{column}' 不是数值类型,无法进行分箱") # 创建结果数据框 result = df.copy() # 根据方法进行分箱 if method == 'custom': # 自定义切点 if not bins or len(bins) < 2: raise ValueError('自定义切点至少需要2个值') # 验证切点是否升序 if bins != sorted(bins): raise ValueError('切点必须按升序排列') # 验证标签数量 if labels and len(labels) != len(bins) - 1: raise ValueError(f'标签数量({len(labels)})必须等于切点数量-1({len(bins)-1})') result[new_column_name] = pd.cut( result[column], bins=bins, labels=labels, right=False, include_lowest=True ) elif method == 'equal_width': # 等宽分箱 if num_bins < 2: raise ValueError('分组数量至少为2') result[new_column_name] = pd.cut( result[column], bins=num_bins, labels=labels, include_lowest=True ) elif method == 'equal_freq': # 等频分箱 if num_bins < 2: raise ValueError('分组数量至少为2') result[new_column_name] = pd.qcut( result[column], q=num_bins, labels=labels, duplicates='drop' # 处理重复边界值 ) else: raise ValueError(f"不支持的分箱方法: {method}") # 统计分布 print(f'分箱结果分布:') value_counts = result[new_column_name].value_counts().sort_index() for category, count in value_counts.items(): percentage = count / len(result) * 100 print(f' {category}: {count} 行 ({percentage:.1f}%)') # 缺失值统计 missing_count = result[new_column_name].isna().sum() if missing_count > 0: print(f'警告: {missing_count} 个值无法分箱(可能是缺失值或边界问题)') return result