Files
HaHafeng 200eab5c2e feat(dc-tool-c): Tool C UX重大改进 - 列头筛选/行号/滚动条/全量数据
新功能
- 列头筛选:Excel风格筛选功能(Community版本,中文本地化,显示唯一值及计数)
- 行号列:添加固定行号列(#列头,灰色背景,左侧固定)
- 全量数据加载:不再限制50行预览,Session加载全量数据
- 全量数据返回:所有快速操作(筛选/映射/分箱/条件/删NA/计算/Pivot)全量返回结果

 Bug修复
- 滚动条终极修复:修改MainLayout为固定高度(h-screen + overflow-hidden),整个浏览器窗口无滚动条,只有AG Grid内部滚动
- 计算列全角字符修复:自动转换中文括号等全角字符为半角
- 计算列特殊字符列名修复:完善列别名机制,支持任意特殊字符列名

 UI优化
- 删除'表格仅展示前50行'提示条,减少干扰
- 筛选对话框美化:白色背景,圆角,阴影
- 列头筛选图标优化:清晰可见,易于点击

 文档更新
- 工具C_功能按钮开发计划_V1.0.md:添加V1.5版本记录
- 工具C_MVP开发_TODO清单.md:添加Day 8 UX优化内容
- 00-工具C当前状态与开发指南.md:更新进度为98%
- 00-模块当前状态与开发指南.md:更新DC模块状态
- 00-系统当前状态与开发指南.md:更新系统整体状态

 影响范围
- Python微服务:无修改
- Node.js后端:5处代码修改(SessionService + QuickActionController + AICodeService)
- 前端:MainLayout + DataGrid + ag-grid-custom.css + index.tsx
- 完成度:Tool C整体完成度提升至98%

 代码统计
- 修改文件:~15个文件
- 新增行数:~200行
- 修改行数:~150行

Co-authored-by: AI Assistant <assistant@example.com>
2025-12-10 18:02:42 +08:00

606 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
缺失值填补操作 - 预写函数
支持均值、中位数、众数、固定值、前向填充、后向填充、MICE多重插补
"""
import pandas as pd
import numpy as np
from typing import Literal, Optional, List, Dict, Any, Union
import sys
import io
from decimal import Decimal
def detect_decimal_places(series: pd.Series) -> int:
"""
检测数值列的小数位数
Args:
series: 数值列
Returns:
小数位数0表示整数最大返回4
"""
valid_values = series.dropna()
if len(valid_values) == 0:
return 2 # 默认2位小数
# 转换为数值
numeric_values = pd.to_numeric(valid_values, errors='coerce').dropna()
if len(numeric_values) == 0:
return 0 # 非数值列返回0
max_decimals = 0
for val in numeric_values:
# 检查是否是整数
if val == int(val):
continue
# 转换为字符串检测小数位
val_str = f"{val:.10f}".rstrip('0')
if '.' in val_str:
decimals = len(val_str.split('.')[-1])
max_decimals = max(max_decimals, decimals)
# 限制最大4位小数
return min(max_decimals, 4)
def get_column_missing_stats(
df: pd.DataFrame,
column: str
) -> Dict[str, Any]:
"""
获取列的缺失值统计信息
Args:
df: 输入数据框
column: 列名
Returns:
{
'column': 列名,
'missing_count': 缺失数量,
'missing_rate': 缺失率(百分比),
'valid_count': 有效值数量,
'total_count': 总数量,
'data_type': 数据类型('numeric', 'categorical', 'mixed'),
'value_range': [min, max] or None, # 仅数值型
'mean': 均值 or None, # 仅数值型
'median': 中位数 or None, # 仅数值型
'mode': 众数 or None,
'std': 标准差 or None, # 仅数值型
'recommended_method': 推荐的填补方法
}
"""
print(f"[fillna] 获取列 '{column}' 的缺失值统计...", flush=True)
if column not in df.columns:
raise ValueError(f"'{column}' 不存在")
col_data = df[column]
total_count = len(col_data)
missing_count = int(col_data.isna().sum())
valid_count = total_count - missing_count
missing_rate = (missing_count / total_count * 100) if total_count > 0 else 0
# 判断数据类型
valid_data = col_data.dropna()
numeric_col = pd.to_numeric(valid_data, errors='coerce')
is_numeric = not numeric_col.isna().all()
stats = {
'column': column,
'missing_count': missing_count,
'missing_rate': round(missing_rate, 2),
'valid_count': valid_count,
'total_count': total_count,
'data_type': 'numeric' if is_numeric else 'categorical',
'value_range': None,
'mean': None,
'median': None,
'mode': None,
'std': None,
'recommended_method': None
}
# 数值型统计
if is_numeric and valid_count > 0:
numeric_valid = numeric_col.dropna()
stats['value_range'] = [float(numeric_valid.min()), float(numeric_valid.max())]
stats['mean'] = float(numeric_valid.mean())
stats['median'] = float(numeric_valid.median())
stats['std'] = float(numeric_valid.std())
# 判断推荐方法(基于偏度)
if numeric_valid.std() > 0:
skewness = numeric_valid.skew()
if abs(skewness) < 0.5:
stats['recommended_method'] = 'mean' # 正态分布
else:
stats['recommended_method'] = 'median' # 偏态分布
else:
stats['recommended_method'] = 'median'
else:
stats['recommended_method'] = 'mode' # 分类变量
# 众数(数值和分类都可以有)
if valid_count > 0:
mode_values = col_data.mode()
if len(mode_values) > 0:
stats['mode'] = mode_values.iloc[0]
print(f"[fillna] 统计完成: 缺失{missing_count}个({missing_rate:.1f}%), 推荐方法: {stats['recommended_method']}", flush=True)
return stats
def fillna_simple(
df: pd.DataFrame,
column: str,
new_column_name: str,
method: Literal['mean', 'median', 'mode', 'constant', 'ffill', 'bfill'],
fill_value: Any = None
) -> Dict[str, Any]:
"""
简单填补缺失值(创建新列)
Args:
df: 输入数据框
column: 原始列名
new_column_name: 新列名(如"体重_填补"
method: 填补方法
- 'mean': 均值填补
- 'median': 中位数填补
- 'mode': 众数填补
- 'constant': 固定值填补
- 'ffill': 前向填充(用前一个非缺失值)
- 'bfill': 后向填充(用后一个非缺失值)
fill_value: 固定值method='constant'时必填)
Returns:
{
'success': True/False,
'result_data': 包含新列的数据框JSON格式,
'stats': {
'original_column': 原列名,
'new_column': 新列名,
'method': 填补方法,
'missing_before': 填补前缺失数量,
'missing_after': 填补后缺失数量(前/后向填充可能仍有缺失),
'filled_count': 实际填补的数量,
'fill_value': 填补使用的值(如均值、中位数等),
'mean_before': 填补前均值(仅数值型),
'mean_after': 填补后均值(仅数值型),
'std_before': 填补前标准差(仅数值型),
'std_after': 填补后标准差(仅数值型)
},
'message': 操作说明
}
"""
print(f"[fillna_simple] 开始填补: 列='{column}', 方法={method}, 新列名='{new_column_name}'", flush=True)
if column not in df.columns:
raise ValueError(f"'{column}' 不存在")
result = df.copy()
col_data = result[column]
# 统计填补前的信息
missing_before = int(col_data.isna().sum())
# 尝试转换为数值(用于统计)
numeric_col = pd.to_numeric(col_data, errors='coerce')
is_numeric = not numeric_col.dropna().empty
mean_before = float(numeric_col.mean()) if is_numeric else None
std_before = float(numeric_col.std()) if is_numeric else None
# 复制原列数据
new_col_data = col_data.copy()
# 执行填补
fill_value_used = None
if method == 'mean':
if not is_numeric:
raise ValueError(f"均值填补只能用于数值列,列 '{column}' 不是数值类型")
fill_value_used = float(numeric_col.mean())
new_col_data = new_col_data.fillna(fill_value_used)
print(f"[fillna_simple] 使用均值填补: {fill_value_used}", flush=True)
elif method == 'median':
if not is_numeric:
raise ValueError(f"中位数填补只能用于数值列,列 '{column}' 不是数值类型")
fill_value_used = float(numeric_col.median())
new_col_data = new_col_data.fillna(fill_value_used)
print(f"[fillna_simple] 使用中位数填补: {fill_value_used}", flush=True)
elif method == 'mode':
mode_values = col_data.mode()
if len(mode_values) > 0:
fill_value_used = mode_values.iloc[0]
new_col_data = new_col_data.fillna(fill_value_used)
print(f"[fillna_simple] 使用众数填补: {fill_value_used}", flush=True)
else:
raise ValueError(f"'{column}' 无有效值,无法计算众数")
elif method == 'constant':
if fill_value is None:
raise ValueError("固定值填补需要提供 fill_value 参数")
fill_value_used = fill_value
new_col_data = new_col_data.fillna(fill_value_used)
print(f"[fillna_simple] 使用固定值填补: {fill_value_used}", flush=True)
elif method == 'ffill':
new_col_data = new_col_data.fillna(method='ffill')
fill_value_used = '前向填充'
print(f"[fillna_simple] 使用前向填充", flush=True)
elif method == 'bfill':
new_col_data = new_col_data.fillna(method='bfill')
fill_value_used = '后向填充'
print(f"[fillna_simple] 使用后向填充", flush=True)
else:
raise ValueError(f"不支持的填补方法: {method}")
# ⭐ 应用精度:根据原始数据的小数位数四舍五入
if is_numeric and method in ['mean', 'median']:
decimal_places = detect_decimal_places(col_data)
print(f"[fillna_simple] 检测到原始列小数位数: {decimal_places}", flush=True)
# 对填补的数值进行四舍五入
numeric_new_col = pd.to_numeric(new_col_data, errors='coerce')
new_col_data = numeric_new_col.round(decimal_places)
# 对fill_value_used也四舍五入用于显示
if isinstance(fill_value_used, (int, float)):
fill_value_used = round(fill_value_used, decimal_places)
print(f"[fillna_simple] 填补值已四舍五入到 {decimal_places} 位小数", flush=True)
# 计算填补后的统计信息
missing_after = int(new_col_data.isna().sum())
filled_count = missing_before - missing_after
# 转换为数值计算均值和标准差(如果是数值型)
numeric_new = pd.to_numeric(new_col_data, errors='coerce')
mean_after = float(numeric_new.mean()) if is_numeric else None
std_after = float(numeric_new.std()) if is_numeric else None
# 插入新列到原列旁边
original_col_index = result.columns.get_loc(column)
result.insert(original_col_index + 1, new_column_name, new_col_data)
print(f"[fillna_simple] 填补完成: 填补了{filled_count}个缺失值,剩余{missing_after}", flush=True)
# 构建返回结果
stats = {
'original_column': column,
'new_column': new_column_name,
'method': method,
'missing_before': missing_before,
'missing_after': missing_after,
'filled_count': filled_count,
'fill_value': fill_value_used,
'mean_before': mean_before,
'mean_after': mean_after,
'std_before': std_before,
'std_after': std_after
}
message = f"成功填补列 '{column}',创建新列 '{new_column_name}',填补了 {filled_count} 个缺失值"
if missing_after > 0:
message += f",剩余 {missing_after} 个缺失值({method}方法的特性)"
# 转换为JSON格式处理NaN
result_json = result.replace({np.nan: None, np.inf: None, -np.inf: None}).to_dict('records')
return {
'success': True,
'result_data': result_json,
'stats': stats,
'message': message
}
def fillna_mice(
df: pd.DataFrame,
columns: List[str],
reference_columns: Optional[List[str]] = None,
n_iterations: int = 10,
random_state: int = 42
) -> Dict[str, Any]:
"""
MICE多重插补创建新列⭐ 支持参考列
Args:
df: 输入数据框
columns: 要填补的列名列表(如["体重kg", "收缩压mmHg"]- 会创建新列
reference_columns: 参考列名列表(用于预测,不创建新列)⭐ 新增
n_iterations: 迭代次数默认10范围5-50
random_state: 随机种子默认42确保结果可重复
Returns:
{
'success': True/False,
'result_data': 包含所有新列的数据框JSON格式,
'stats': {
column: {
'original_column': 原列名,
'new_column': 新列名原名_MICE,
'missing_before': 缺失数量,
'filled_count': 填补数量,
'mean_before': 填补前均值,
'mean_after': 填补后均值,
'std_before': 填补前标准差,
'std_after': 填补后标准差
}
for column in columns
},
'message': 操作说明
}
实现细节:
1. 对所选列执行MICE填补
2. 为每列创建新列命名原列名_MICE
3. 使用 df.insert() 将每个新列插入到其原列旁边
4. 返回包含所有新列的完整数据框
示例:
target: 体重kg、收缩压mmHg
reference: 年龄、身高、性别
MICE计算使用5列2个target + 3个reference
新列体重kg_MICE、收缩压mmHg_MICE只创建2个
"""
# 处理参考列默认值
if reference_columns is None:
reference_columns = []
print(f"[fillna_mice] 开始MICE填补: 列={columns}, 参考列={reference_columns}, 迭代次数={n_iterations}", flush=True)
try:
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
except ImportError:
raise ImportError("MICE功能需要安装 scikit-learn。请运行: pip install scikit-learn")
# 验证列存在
for col in columns:
if col not in df.columns:
raise ValueError(f"'{col}' 不存在")
result = df.copy()
# 统计填补前的信息并识别无法MICE填补的列
stats_dict = {}
columns_to_skip = [] # 需要跳过的列100%缺失或分类型)
valid_numeric_columns = [] # 有效的数值列
skip_reasons = {} # 跳过原因
for col in columns:
col_data = result[col]
numeric_col = pd.to_numeric(col_data, errors='coerce')
missing_before = int(col_data.isna().sum())
valid_count = len(col_data) - missing_before
mean_before = float(numeric_col.mean()) if not numeric_col.dropna().empty else None
std_before = float(numeric_col.std()) if not numeric_col.dropna().empty else None
stats_dict[col] = {
'original_column': col,
'new_column': f"{col}_MICE",
'missing_before': missing_before,
'filled_count': 0,
'mean_before': mean_before,
'mean_after': None,
'std_before': std_before,
'std_after': None
}
# ⭐ 检查是否100%缺失
if valid_count == 0:
print(f"[fillna_mice] ⚠️ 列 '{col}' 100%缺失将跳过MICE填补", flush=True)
columns_to_skip.append(col)
skip_reasons[col] = "100%缺失"
continue
# ⭐ 检查是否为数值型(关键修复!)
# 转换为数值后,检查有效值数量
numeric_valid_count = int(numeric_col.notna().sum())
if numeric_valid_count == 0:
# 所有非缺失值都无法转为数值 = 分类列
print(f"[fillna_mice] ⚠️ 列 '{col}' 是分类变量无法转为数值MICE仅支持数值列", flush=True)
print(f"[fillna_mice] 建议使用'众数填补'处理该列", flush=True)
columns_to_skip.append(col)
skip_reasons[col] = "分类变量"
elif numeric_valid_count < valid_count * 0.5:
# 超过50%的有效值无法转为数值 = 混合型,可能有问题
print(f"[fillna_mice] ⚠️ 列 '{col}' 数据类型混乱(仅{numeric_valid_count}/{valid_count}可转为数值)", flush=True)
columns_to_skip.append(col)
skip_reasons[col] = "数据类型混乱"
else:
# 有效的数值列
valid_numeric_columns.append(col)
print(f"[fillna_mice] ✓ 列 '{col}' 检测为数值列将进行MICE填补", flush=True)
# 如果没有有效的数值列
if len(valid_numeric_columns) == 0:
skip_summary = ", ".join([f"{col}({reason})" for col, reason in skip_reasons.items()])
raise ValueError(
f"所选列均无法进行MICE填补{skip_summary}\n\n"
f"💡 MICE多重插补仅适用于数值型列年龄、体重、评分等\n"
f" 对于分类变量(如:婚姻状况、性别、职业),请使用'众数填补'"
)
# ⭐ 处理参考列(用于预测,不创建新列)
valid_reference_columns = []
skipped_reference_columns = []
if reference_columns:
print(f"[fillna_mice] 开始处理参考列...", flush=True)
for ref_col in reference_columns:
if ref_col not in result.columns:
print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 不存在,已跳过", flush=True)
continue
# 检查是否为数值型
ref_col_data = result[ref_col]
numeric_col = pd.to_numeric(ref_col_data, errors='coerce')
valid_count = int(ref_col_data.notna().sum())
numeric_valid_count = int(numeric_col.notna().sum())
if valid_count == 0:
print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 100%缺失,已跳过", flush=True)
skipped_reference_columns.append(ref_col)
elif numeric_valid_count == 0:
print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 是分类变量,已跳过", flush=True)
skipped_reference_columns.append(ref_col)
elif numeric_valid_count < valid_count * 0.5:
print(f"[fillna_mice] ⚠️ 参考列 '{ref_col}' 数据类型混乱,已跳过", flush=True)
skipped_reference_columns.append(ref_col)
else:
valid_reference_columns.append(ref_col)
print(f"[fillna_mice] ✓ 参考列 '{ref_col}' 检测为数值列将用于MICE预测", flush=True)
# ⭐ 合并target列和reference列进行MICE计算
all_mice_columns = valid_numeric_columns + valid_reference_columns
print(f"[fillna_mice] MICE将使用 {len(all_mice_columns)} 列进行计算: {len(valid_numeric_columns)}个目标列 + {len(valid_reference_columns)}个参考列", flush=True)
# 提取所有MICE计算需要的列
df_subset = result[all_mice_columns].copy()
# 将所有列转换为数值
for col in all_mice_columns:
df_subset[col] = pd.to_numeric(df_subset[col], errors='coerce')
# 检查是否至少有一列有缺失值
total_missing = df_subset.isna().sum().sum()
if len(columns_to_skip) > 0:
skip_details = [f"{col}({skip_reasons[col]})" for col in columns_to_skip]
skip_msg = f"(跳过了{len(columns_to_skip)}列: {', '.join(skip_details)}"
print(f"[fillna_mice] {skip_msg}", flush=True)
if total_missing == 0:
print("[fillna_mice] 警告: 数值列均无缺失值跳过MICE填补", flush=True)
# 为所有列创建副本列(包括跳过的列)
final_data = pd.DataFrame()
for col in result.columns:
final_data[col] = result[col]
if col in columns:
final_data[f"{col}_MICE"] = result[col].copy()
result_json = final_data.replace({np.nan: None, np.inf: None, -np.inf: None}).to_dict('records')
return {
'success': True,
'result_data': result_json,
'stats': stats_dict,
'message': "所选列均无缺失值,已创建副本列"
}
print(f"[fillna_mice] 总共有 {total_missing} 个缺失值需要填补(在{len(valid_numeric_columns)}个数值列中)", flush=True)
# 执行MICE填补
print(f"[fillna_mice] 正在执行MICE算法可能需要一些时间...", flush=True)
imputer = IterativeImputer(
max_iter=n_iterations,
random_state=random_state,
verbose=0
)
try:
imputed_array = imputer.fit_transform(df_subset)
# ⭐ 修复使用all_mice_columns包含target列和reference列
df_imputed = pd.DataFrame(imputed_array, columns=all_mice_columns, index=df_subset.index)
print(f"[fillna_mice] MICE填补完成", flush=True)
# ⭐ 修复重建DataFrame处理有效列和跳过的列
new_columns_data = {}
# 处理有效的数值列(已填补的)
for col in valid_numeric_columns:
new_col_name = f"{col}_MICE"
new_col_data = df_imputed[col].copy()
# ⭐ 应用精度:根据原始数据的小数位数四舍五入
decimal_places = detect_decimal_places(result[col])
new_col_data = new_col_data.round(decimal_places)
print(f"[fillna_mice] 列 '{col}': 四舍五入到 {decimal_places} 位小数", flush=True)
# 计算填补后的统计信息
missing_after = int(new_col_data.isna().sum())
filled_count = stats_dict[col]['missing_before'] - missing_after
mean_after = float(new_col_data.mean())
std_after = float(new_col_data.std())
# 更新统计信息
stats_dict[col]['filled_count'] = filled_count
stats_dict[col]['mean_after'] = mean_after
stats_dict[col]['std_after'] = std_after
# 暂存新列数据
new_columns_data[col] = new_col_data
print(f"[fillna_mice] 列 '{col}': 填补了 {filled_count} 个缺失值", flush=True)
# 处理跳过的列创建原样的MICE列
for col in columns_to_skip:
new_columns_data[col] = result[col].copy() # 保持原样
stats_dict[col]['filled_count'] = 0
stats_dict[col]['mean_after'] = None
stats_dict[col]['std_after'] = None
reason = skip_reasons.get(col, "未知原因")
print(f"[fillna_mice] 列 '{col}': {reason},已创建原样副本列", flush=True)
# ⭐ 重建DataFrame按原始列顺序仅为选中的列后跟其MICE列
final_data = pd.DataFrame()
for col in result.columns:
final_data[col] = result[col]
# 只为用户选择的列columns插入MICE列
if col in columns: # 关键修复:检查是否为用户选择的列
if col in new_columns_data:
final_data[f"{col}_MICE"] = new_columns_data[col]
result = final_data
print(f"[fillna_mice] 所有新列已插入到原列旁边,最终列数: {len(result.columns)}", flush=True)
print(f"[fillna_mice] 原始列数: {len(result.columns) - len(columns)}, 新增MICE列数: {len(columns)}", flush=True)
# 转换为JSON格式
result_json = result.replace({np.nan: None, np.inf: None, -np.inf: None}).to_dict('records')
total_filled = sum(s['filled_count'] for s in stats_dict.values())
# 构建消息
message_parts = []
message_parts.append(f"MICE填补完成共填补 {total_filled} 个缺失值")
message_parts.append(f"创建了 {len(valid_numeric_columns)} 个新列")
if len(valid_reference_columns) > 0:
message_parts.append(f"使用了 {len(valid_reference_columns)} 个参考列进行预测")
if len(columns_to_skip) > 0:
skip_summary = ", ".join([f"{col}({skip_reasons[col]})" for col in columns_to_skip])
message_parts.append(f"跳过{len(columns_to_skip)}列:{skip_summary}(请使用众数填补)")
message = "".join(message_parts)
return {
'success': True,
'result_data': result_json,
'stats': stats_dict,
'message': message
}
except Exception as e:
print(f"[fillna_mice] MICE填补失败: {str(e)}", flush=True)
raise ValueError(f"MICE填补失败: {str(e)}")