""" 缺失值填补操作 - 预写函数 支持:均值、中位数、众数、固定值、前向填充、后向填充、MICE多重插补 """ import pandas as pd import numpy as np from typing import Literal, Optional, List, Dict, Any, Union import sys import io from decimal import Decimal def detect_decimal_places(series: pd.Series) -> int: """ 检测数值列的小数位数 Args: series: 数值列 Returns: 小数位数(0表示整数,最大返回4) """ valid_values = series.dropna() if len(valid_values) == 0: return 2 # 默认2位小数 # 转换为数值 numeric_values = pd.to_numeric(valid_values, errors='coerce').dropna() if len(numeric_values) == 0: return 0 # 非数值列,返回0 max_decimals = 0 for val in numeric_values: # 检查是否是整数 if val == int(val): continue # 转换为字符串检测小数位 val_str = f"{val:.10f}".rstrip('0') if '.' in val_str: decimals = len(val_str.split('.')[-1]) max_decimals = max(max_decimals, decimals) # 限制最大4位小数 return min(max_decimals, 4) def get_column_missing_stats( df: pd.DataFrame, column: str ) -> Dict[str, Any]: """ 获取列的缺失值统计信息 Args: df: 输入数据框 column: 列名 Returns: { 'column': 列名, 'missing_count': 缺失数量, 'missing_rate': 缺失率(百分比), 'valid_count': 有效值数量, 'total_count': 总数量, 'data_type': 数据类型('numeric', 'categorical', 'mixed'), 'value_range': [min, max] or None, # 仅数值型 'mean': 均值 or None, # 仅数值型 'median': 中位数 or None, # 仅数值型 'mode': 众数 or None, 'std': 标准差 or None, # 仅数值型 'recommended_method': 推荐的填补方法 } """ print(f"[fillna] 获取列 '{column}' 的缺失值统计...", flush=True) if column not in df.columns: raise ValueError(f"列 '{column}' 不存在") col_data = df[column] total_count = len(col_data) missing_count = int(col_data.isna().sum()) valid_count = total_count - missing_count missing_rate = (missing_count / total_count * 100) if total_count > 0 else 0 # 判断数据类型 valid_data = col_data.dropna() numeric_col = pd.to_numeric(valid_data, errors='coerce') is_numeric = not numeric_col.isna().all() stats = { 'column': column, 'missing_count': missing_count, 'missing_rate': round(missing_rate, 2), 'valid_count': valid_count, 'total_count': total_count, 'data_type': 'numeric' if is_numeric else 'categorical', 'value_range': None, 'mean': None, 'median': None, 'mode': None, 'std': None, 'recommended_method': None } # 数值型统计 if is_numeric and valid_count > 0: numeric_valid = numeric_col.dropna() stats['value_range'] = [float(numeric_valid.min()), float(numeric_valid.max())] stats['mean'] = float(numeric_valid.mean()) stats['median'] = float(numeric_valid.median()) stats['std'] = float(numeric_valid.std()) # 判断推荐方法(基于偏度) if numeric_valid.std() > 0: skewness = numeric_valid.skew() if abs(skewness) < 0.5: stats['recommended_method'] = 'mean' # 正态分布 else: stats['recommended_method'] = 'median' # 偏态分布 else: stats['recommended_method'] = 'median' else: stats['recommended_method'] = 'mode' # 分类变量 # 众数(数值和分类都可以有) if valid_count > 0: mode_values = col_data.mode() if len(mode_values) > 0: stats['mode'] = mode_values.iloc[0] print(f"[fillna] 统计完成: 缺失{missing_count}个({missing_rate:.1f}%), 推荐方法: {stats['recommended_method']}", flush=True) return stats def fillna_simple( df: pd.DataFrame, column: str, new_column_name: str, method: Literal['mean', 'median', 'mode', 'constant', 'ffill', 'bfill'], fill_value: Any = None ) -> Dict[str, Any]: """ 简单填补缺失值(创建新列) Args: df: 输入数据框 column: 原始列名 new_column_name: 新列名(如"体重_填补") method: 填补方法 - 'mean': 均值填补 - 'median': 中位数填补 - 'mode': 众数填补 - 'constant': 固定值填补 - 'ffill': 前向填充(用前一个非缺失值) - 'bfill': 后向填充(用后一个非缺失值) fill_value: 固定值(method='constant'时必填) Returns: { 'success': True/False, 'result_data': 包含新列的数据框(JSON格式), 'stats': { 'original_column': 原列名, 'new_column': 新列名, 'method': 填补方法, 'missing_before': 填补前缺失数量, 'missing_after': 填补后缺失数量(前/后向填充可能仍有缺失), 'filled_count': 实际填补的数量, 'fill_value': 填补使用的值(如均值、中位数等), 'mean_before': 填补前均值(仅数值型), 'mean_after': 填补后均值(仅数值型), 'std_before': 填补前标准差(仅数值型), 'std_after': 填补后标准差(仅数值型) }, 'message': 操作说明 } """ print(f"[fillna_simple] 开始填补: 列='{column}', 方法={method}, 新列名='{new_column_name}'", flush=True) if column not in df.columns: raise ValueError(f"列 '{column}' 不存在") result = df.copy() col_data = result[column] # 统计填补前的信息 missing_before = int(col_data.isna().sum()) # 尝试转换为数值(用于统计) numeric_col = pd.to_numeric(col_data, errors='coerce') is_numeric = not numeric_col.dropna().empty mean_before = float(numeric_col.mean()) if is_numeric else None std_before = float(numeric_col.std()) if is_numeric else None # 复制原列数据 new_col_data = col_data.copy() # 执行填补 fill_value_used = None if method == 'mean': if not is_numeric: raise ValueError(f"均值填补只能用于数值列,列 '{column}' 不是数值类型") fill_value_used = float(numeric_col.mean()) new_col_data = new_col_data.fillna(fill_value_used) print(f"[fillna_simple] 使用均值填补: {fill_value_used}", flush=True) elif method == 'median': if not is_numeric: raise ValueError(f"中位数填补只能用于数值列,列 '{column}' 不是数值类型") fill_value_used = float(numeric_col.median()) new_col_data = new_col_data.fillna(fill_value_used) print(f"[fillna_simple] 使用中位数填补: {fill_value_used}", flush=True) elif method == 'mode': mode_values = col_data.mode() if len(mode_values) > 0: fill_value_used = mode_values.iloc[0] new_col_data = new_col_data.fillna(fill_value_used) print(f"[fillna_simple] 使用众数填补: {fill_value_used}", flush=True) else: raise ValueError(f"列 '{column}' 无有效值,无法计算众数") elif method == 'constant': if fill_value is None: raise ValueError("固定值填补需要提供 fill_value 参数") fill_value_used = fill_value new_col_data = new_col_data.fillna(fill_value_used) print(f"[fillna_simple] 使用固定值填补: {fill_value_used}", flush=True) elif method == 'ffill': new_col_data = new_col_data.fillna(method='ffill') fill_value_used = '前向填充' print(f"[fillna_simple] 使用前向填充", flush=True) elif method == 'bfill': new_col_data = new_col_data.fillna(method='bfill') fill_value_used = '后向填充' print(f"[fillna_simple] 使用后向填充", flush=True) else: raise ValueError(f"不支持的填补方法: {method}") # ⭐ 应用精度:根据原始数据的小数位数四舍五入 if is_numeric and method in ['mean', 'median']: decimal_places = detect_decimal_places(col_data) print(f"[fillna_simple] 检测到原始列小数位数: {decimal_places}位", flush=True) # 对填补的数值进行四舍五入 numeric_new_col = pd.to_numeric(new_col_data, errors='coerce') new_col_data = numeric_new_col.round(decimal_places) # 对fill_value_used也四舍五入(用于显示) if isinstance(fill_value_used, (int, float)): fill_value_used = round(fill_value_used, decimal_places) print(f"[fillna_simple] 填补值已四舍五入到 {decimal_places} 位小数", flush=True) # 计算填补后的统计信息 missing_after = int(new_col_data.isna().sum()) filled_count = missing_before - missing_after # 转换为数值计算均值和标准差(如果是数值型) numeric_new = pd.to_numeric(new_col_data, errors='coerce') mean_after = float(numeric_new.mean()) if is_numeric else None std_after = float(numeric_new.std()) if is_numeric else None # 插入新列到原列旁边 original_col_index = result.columns.get_loc(column) result.insert(original_col_index + 1, new_column_name, new_col_data) print(f"[fillna_simple] 填补完成: 填补了{filled_count}个缺失值,剩余{missing_after}个", flush=True) # 构建返回结果 stats = { 'original_column': column, 'new_column': new_column_name, 'method': method, 'missing_before': missing_before, 'missing_after': missing_after, 'filled_count': filled_count, 'fill_value': fill_value_used, 'mean_before': mean_before, 'mean_after': mean_after, 'std_before': std_before, 'std_after': std_after } message = f"成功填补列 '{column}',创建新列 '{new_column_name}',填补了 {filled_count} 个缺失值" if missing_after > 0: message += f",剩余 {missing_after} 个缺失值({method}方法的特性)" # 转换为JSON格式(处理NaN) result_json = result.replace({np.nan: None, np.inf: None, -np.inf: None}).to_dict('records') return { 'success': True, 'result_data': result_json, 'stats': stats, 'message': message } def fillna_mice( df: pd.DataFrame, columns: List[str], reference_columns: Optional[List[str]] = None, n_iterations: int = 10, random_state: int = 42 ) -> Dict[str, Any]: """ MICE多重插补(创建新列)⭐ 支持参考列 Args: df: 输入数据框 columns: 要填补的列名列表(如["体重(kg)", "收缩压(mmHg)"])- 会创建新列 reference_columns: 参考列名列表(用于预测,不创建新列)⭐ 新增 n_iterations: 迭代次数(默认10,范围5-50) random_state: 随机种子(默认42,确保结果可重复) Returns: { 'success': True/False, 'result_data': 包含所有新列的数据框(JSON格式), 'stats': { column: { 'original_column': 原列名, 'new_column': 新列名(原名_MICE), 'missing_before': 缺失数量, 'filled_count': 填补数量, 'mean_before': 填补前均值, 'mean_after': 填补后均值, 'std_before': 填补前标准差, 'std_after': 填补后标准差 } for column in columns }, 'message': 操作说明 } 实现细节: 1. 对所选列执行MICE填补 2. 为每列创建新列(命名:原列名_MICE) 3. 使用 df.insert() 将每个新列插入到其原列旁边 4. 返回包含所有新列的完整数据框 示例: target: 体重(kg)、收缩压(mmHg) reference: 年龄、身高、性别 MICE计算:使用5列(2个target + 3个reference) 新列:体重(kg)_MICE、收缩压(mmHg)_MICE(只创建2个) """ # 处理参考列默认值 if reference_columns is None: reference_columns = [] print(f"[fillna_mice] 开始MICE填补: 列={columns}, 参考列={reference_columns}, 迭代次数={n_iterations}", flush=True) try: from sklearn.experimental import enable_iterative_imputer from sklearn.impute import IterativeImputer except ImportError: raise ImportError("MICE功能需要安装 scikit-learn。请运行: pip install scikit-learn") # 验证列存在 for col in columns: if col not in df.columns: raise ValueError(f"列 '{col}' 不存在") result = df.copy() # 统计填补前的信息,并识别无法MICE填补的列 stats_dict = {} columns_to_skip = [] # 需要跳过的列(100%缺失或分类型) valid_numeric_columns = [] # 有效的数值列 skip_reasons = {} # 跳过原因 for col in columns: col_data = result[col] numeric_col = pd.to_numeric(col_data, errors='coerce') missing_before = int(col_data.isna().sum()) valid_count = len(col_data) - missing_before mean_before = float(numeric_col.mean()) if not numeric_col.dropna().empty else None std_before = float(numeric_col.std()) if not numeric_col.dropna().empty else None stats_dict[col] = { 'original_column': col, 'new_column': f"{col}_MICE", 'missing_before': missing_before, 'filled_count': 0, 'mean_before': mean_before, 'mean_after': None, 'std_before': std_before, 'std_after': None } # ⭐ 检查是否100%缺失 if valid_count == 0: print(f"[fillna_mice] ⚠️ 列 '{col}' 100%缺失,将跳过MICE填补", flush=True) columns_to_skip.append(col) skip_reasons[col] = "100%缺失" continue # ⭐ 检查是否为数值型(关键修复!) # 转换为数值后,检查有效值数量 numeric_valid_count = int(numeric_col.notna().sum()) if numeric_valid_count == 0: # 所有非缺失值都无法转为数值 = 分类列 print(f"[fillna_mice] ⚠️ 列 '{col}' 是分类变量(无法转为数值),MICE仅支持数值列", flush=True) print(f"[fillna_mice] 建议使用'众数填补'处理该列", flush=True) columns_to_skip.append(col) skip_reasons[col] = "分类变量" elif numeric_valid_count < valid_count * 0.5: # 超过50%的有效值无法转为数值 = 混合型,可能有问题 print(f"[fillna_mice] ⚠️ 列 '{col}' 数据类型混乱(仅{numeric_valid_count}/{valid_count}可转为数值)", flush=True) columns_to_skip.append(col) skip_reasons[col] = "数据类型混乱" else: # 有效的数值列 valid_numeric_columns.append(col) print(f"[fillna_mice] ✓ 列 '{col}' 检测为数值列,将进行MICE填补", flush=True) # 如果没有有效的数值列 if len(valid_numeric_columns) == 0: skip_summary = ", ".join([f"{col}({reason})" for col, reason in skip_reasons.items()]) raise ValueError( f"所选列均无法进行MICE填补:{skip_summary}。\n\n" f"💡 MICE多重插补仅适用于数值型列(如:年龄、体重、评分等)。\n" f" 对于分类变量(如:婚姻状况、性别、职业),请使用'众数填补'。" ) # ⭐ 处理参考列(用于预测,不创建新列) valid_reference_columns = [] skipped_reference_columns = [] if reference_columns: print(f"[fillna_mice] 开始处理参考列...", flush=True) for ref_col in reference_columns: if ref_col not in result.columns: print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 不存在,已跳过", flush=True) continue # 检查是否为数值型 ref_col_data = result[ref_col] numeric_col = pd.to_numeric(ref_col_data, errors='coerce') valid_count = int(ref_col_data.notna().sum()) numeric_valid_count = int(numeric_col.notna().sum()) if valid_count == 0: print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 100%缺失,已跳过", flush=True) skipped_reference_columns.append(ref_col) elif numeric_valid_count == 0: print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 是分类变量,已跳过", flush=True) skipped_reference_columns.append(ref_col) elif numeric_valid_count < valid_count * 0.5: print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 数据类型混乱,已跳过", flush=True) skipped_reference_columns.append(ref_col) else: valid_reference_columns.append(ref_col) print(f"[fillna_mice] ✓ 参考列 '{ref_col}' 检测为数值列,将用于MICE预测", flush=True) # ⭐ 合并target列和reference列进行MICE计算 all_mice_columns = valid_numeric_columns + valid_reference_columns print(f"[fillna_mice] MICE将使用 {len(all_mice_columns)} 列进行计算: {len(valid_numeric_columns)}个目标列 + {len(valid_reference_columns)}个参考列", flush=True) # 提取所有MICE计算需要的列 df_subset = result[all_mice_columns].copy() # 将所有列转换为数值 for col in all_mice_columns: df_subset[col] = pd.to_numeric(df_subset[col], errors='coerce') # 检查是否至少有一列有缺失值 total_missing = df_subset.isna().sum().sum() if len(columns_to_skip) > 0: skip_details = [f"{col}({skip_reasons[col]})" for col in columns_to_skip] skip_msg = f"(跳过了{len(columns_to_skip)}列: {', '.join(skip_details)})" print(f"[fillna_mice] {skip_msg}", flush=True) if total_missing == 0: print("[fillna_mice] 警告: 数值列均无缺失值,跳过MICE填补", flush=True) # 为所有列创建副本列(包括跳过的列) final_data = pd.DataFrame() for col in result.columns: final_data[col] = result[col] if col in columns: final_data[f"{col}_MICE"] = result[col].copy() result_json = final_data.replace({np.nan: None, np.inf: None, -np.inf: None}).to_dict('records') return { 'success': True, 'result_data': result_json, 'stats': stats_dict, 'message': "所选列均无缺失值,已创建副本列" } print(f"[fillna_mice] 总共有 {total_missing} 个缺失值需要填补(在{len(valid_numeric_columns)}个数值列中)", flush=True) # 执行MICE填补 print(f"[fillna_mice] 正在执行MICE算法(可能需要一些时间)...", flush=True) imputer = IterativeImputer( max_iter=n_iterations, random_state=random_state, verbose=0 ) try: imputed_array = imputer.fit_transform(df_subset) # ⭐ 修复:使用all_mice_columns(包含target列和reference列) df_imputed = pd.DataFrame(imputed_array, columns=all_mice_columns, index=df_subset.index) print(f"[fillna_mice] MICE填补完成", flush=True) # ⭐ 修复:重建DataFrame,处理有效列和跳过的列 new_columns_data = {} # 处理有效的数值列(已填补的) for col in valid_numeric_columns: new_col_name = f"{col}_MICE" new_col_data = df_imputed[col].copy() # ⭐ 应用精度:根据原始数据的小数位数四舍五入 decimal_places = detect_decimal_places(result[col]) new_col_data = new_col_data.round(decimal_places) print(f"[fillna_mice] 列 '{col}': 四舍五入到 {decimal_places} 位小数", flush=True) # 计算填补后的统计信息 missing_after = int(new_col_data.isna().sum()) filled_count = stats_dict[col]['missing_before'] - missing_after mean_after = float(new_col_data.mean()) std_after = float(new_col_data.std()) # 更新统计信息 stats_dict[col]['filled_count'] = filled_count stats_dict[col]['mean_after'] = mean_after stats_dict[col]['std_after'] = std_after # 暂存新列数据 new_columns_data[col] = new_col_data print(f"[fillna_mice] 列 '{col}': 填补了 {filled_count} 个缺失值", flush=True) # 处理跳过的列,创建原样的MICE列 for col in columns_to_skip: new_columns_data[col] = result[col].copy() # 保持原样 stats_dict[col]['filled_count'] = 0 stats_dict[col]['mean_after'] = None stats_dict[col]['std_after'] = None reason = skip_reasons.get(col, "未知原因") print(f"[fillna_mice] 列 '{col}': {reason},已创建原样副本列", flush=True) # ⭐ 重建DataFrame:按原始列顺序,仅为选中的列后跟其MICE列 final_data = pd.DataFrame() for col in result.columns: final_data[col] = result[col] # 只为用户选择的列(columns)插入MICE列 if col in columns: # 关键修复:检查是否为用户选择的列 if col in new_columns_data: final_data[f"{col}_MICE"] = new_columns_data[col] result = final_data print(f"[fillna_mice] 所有新列已插入到原列旁边,最终列数: {len(result.columns)}", flush=True) print(f"[fillna_mice] 原始列数: {len(result.columns) - len(columns)}, 新增MICE列数: {len(columns)}", flush=True) # 转换为JSON格式 result_json = result.replace({np.nan: None, np.inf: None, -np.inf: None}).to_dict('records') total_filled = sum(s['filled_count'] for s in stats_dict.values()) # 构建消息 message_parts = [] message_parts.append(f"MICE填补完成,共填补 {total_filled} 个缺失值") message_parts.append(f"创建了 {len(valid_numeric_columns)} 个新列") if len(valid_reference_columns) > 0: message_parts.append(f"使用了 {len(valid_reference_columns)} 个参考列进行预测") if len(columns_to_skip) > 0: skip_summary = ", ".join([f"{col}({skip_reasons[col]})" for col in columns_to_skip]) message_parts.append(f"跳过{len(columns_to_skip)}列:{skip_summary}(请使用众数填补)") message = ",".join(message_parts) return { 'success': True, 'result_data': result_json, 'stats': stats_dict, 'message': message } except Exception as e: print(f"[fillna_mice] MICE填补失败: {str(e)}", flush=True) raise ValueError(f"MICE填补失败: {str(e)}")