Files
HaHafeng 6f5013e8ab fix(dc): Fix pivot column name tuple issue for single value column
Problem:
- When selecting only 1 value column in Pivot (long-to-wide), column names
  were incorrectly formatted as 'FMA___('FMA', 'baseline')' instead of 'FMA___baseline'
- This occurred because pandas pivot_table() sometimes returns tuple column names
  even for single value columns, but the code didn't handle this case

Root Cause:
- Lines 112-127 in pivot.py assumed single value columns always have simple
  string column names, without checking for tuples
- Multi-value column logic (lines 129-145) correctly handled tuples with isinstance()

Solution:
- Add isinstance(col, tuple) check in single value column mode
- Extract pivot_value from tuple[1] if column name is a tuple
- Maintain backward compatibility for non-tuple column names

Testing:
- Single value column: FMA___ + tuple -> 'FMA___baseline' (fixed)
- Multiple value columns: Already working correctly (no change)

Impact: Fixes historical bug in Tool C Pivot feature
2025-12-21 18:09:58 +08:00

347 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Pivot操作 - 预写函数
长表转宽表(一人多行 → 一人一行)
✨ 方案B实现支持列名映射
"""
import pandas as pd
from typing import List, Literal, Optional, Dict
def pivot_long_to_wide(
    df: pd.DataFrame,
    index_column: str,
    pivot_column: str,
    value_columns: List[str],
    aggfunc: Literal['first', 'last', 'mean', 'sum', 'min', 'max'] = 'first',
    column_mapping: Optional[List[Dict[str, str]]] = None,
    keep_unused_columns: bool = False,
    unused_agg_method: Literal['first', 'mode', 'mean'] = 'first',
    original_column_order: Optional[List[str]] = None,
    pivot_value_order: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Pivot a long table into a wide one (many rows per subject -> one row).

    Generated columns are named '<value_column>___<pivot_value>'.

    Args:
        df: input DataFrame.
        index_column: unique row identifier (e.g. Record ID).
        pivot_column: column whose values become suffixes of the new columns.
        value_columns: data columns to spread out.
        aggfunc: aggregation applied when an index/pivot pair repeats.
        column_mapping: optional column-name mapping. NOTE(review): currently
            unused by this implementation; kept for interface compatibility.
        keep_unused_columns: keep columns not selected for the pivot (default False).
        unused_agg_method: how unselected columns are collapsed per index value
            ('first' = first value, 'mode' = most frequent value,
            'mean' = mean for numeric columns, first value for the rest).
        original_column_order: original column order, used to keep the output
            column order consistent with the input.
        pivot_value_order: original order of the pivot values, used to order
            the generated columns for each value column.

    Returns:
        Wide-format DataFrame (one row per unique index value).

    Raises:
        ValueError: if a required column is missing, or duplicate
            index/pivot combinations exist that the pivot cannot resolve.
    """
    result = df.copy()
    print(f'━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━', flush=True)
    print(f'📊 Pivot转换', flush=True)
    print(f'原始数据: {len(result)}× {len(result.columns)}', flush=True)
    print(f'索引列: {index_column}', flush=True)
    print(f'透视列: {pivot_column}', flush=True)
    print(f'值列: {", ".join(value_columns)}', flush=True)
    print(f'聚合方式: {aggfunc}', flush=True)
    # Detect columns that were not selected for the pivot.
    all_columns = set(result.columns)
    used_columns = {index_column, pivot_column} | set(value_columns)
    unused_columns = list(all_columns - used_columns)
    if unused_columns:
        print(f'', flush=True)
        print(f'📋 未选择的列({len(unused_columns)}个): {", ".join(unused_columns[:5])}{"..." if len(unused_columns) > 5 else ""}', flush=True)
        if keep_unused_columns:
            print(f'✓ 将保留未选择的列(聚合方式: {unused_agg_method}', flush=True)
        else:
            print(f'⚠️ 这些列将不会保留在结果中', flush=True)
    print('', flush=True)
    # Validate that every required column exists before touching pandas.
    required_cols = [index_column, pivot_column] + value_columns
    missing_cols = [col for col in required_cols if col not in result.columns]
    if missing_cols:
        raise ValueError(f'以下列不存在: {", ".join(missing_cols)}')
    # Report cardinality of the index column.
    unique_index = result[index_column].nunique()
    print(f'✓ 唯一{index_column}数量: {unique_index}', flush=True)
    # Report the distinct pivot values (these become column suffixes).
    unique_pivot_values = result[pivot_column].unique()
    print(f'✓ 透视列"{pivot_column}"的唯一值: {list(unique_pivot_values)}', flush=True)
    print(f'✓ 唯一值数量: {len(unique_pivot_values)}', flush=True)
    # A single pivot value means the pivot produces only one column —
    # usually a sign the user picked the wrong column. Warn, don't fail.
    if len(unique_pivot_values) == 1:
        print(f'', flush=True)
        print(f'⚠️ 警告: 透视列只有1个唯一值', flush=True)
        print(f' 这意味着Pivot后只会生成1列而不是多列', flush=True)
        print(f' 请检查:', flush=True)
        print(f' 1. 透视列是否选择正确?', flush=True)
        print(f' 2. 数据是否已经是宽表格式?', flush=True)
        print(f'', flush=True)
    print('', flush=True)
    try:
        # Run the pivot; dropna=False keeps all-NaN columns so every
        # index/pivot combination is materialized.
        df_pivot = result.pivot_table(
            index=index_column,
            columns=pivot_column,
            values=value_columns,
            aggfunc=aggfunc,
            dropna=False
        )
        print(f'✓ Pivot执行成功', flush=True)
        print(f' Pivot后shape: {df_pivot.shape}', flush=True)
        print(f' 列数: {len(df_pivot.columns)}', flush=True)
        print(f'', flush=True)
        # Flatten the (possibly multi-level) column index into
        # '<value>___<pivot>' names.
        if len(value_columns) == 1:
            # Single value column: pandas may still return tuple column
            # names here, so handle both the tuple and the plain form.
            print(f'📝 单值列模式:展平列名', flush=True)
            value_col_name = value_columns[0]
            new_columns = []
            for col in df_pivot.columns:
                if isinstance(col, tuple):
                    # Tuple form (value column, pivot value): take element 1.
                    pivot_value = col[1]
                    new_col_name = f'{value_col_name}___{pivot_value}'
                else:
                    # Plain name: use it directly (legacy behavior).
                    new_col_name = f'{value_col_name}___{col}'
                new_columns.append(new_col_name)
                print(f' 生成列: {new_col_name}', flush=True)
            df_pivot.columns = new_columns
        else:
            # Multiple value columns: columns are tuples
            # ((value_col1, pivot_val1), (value_col1, pivot_val2), ...).
            print(f'📝 多值列模式:展平多级列名', flush=True)
            new_columns = []
            for col in df_pivot.columns:
                if isinstance(col, tuple):
                    value_name, pivot_value = col
                    new_col_name = f'{value_name}___{pivot_value}'
                    new_columns.append(new_col_name)
                    print(f' {col}{new_col_name}', flush=True)
                else:
                    # Should not happen, but keep the name defensively.
                    new_columns.append(str(col))
            df_pivot.columns = new_columns
        print(f'', flush=True)
        print(f'✓ 列名展平完成', flush=True)
        print(f'', flush=True)
        # Turn the index back into an ordinary column.
        df_pivot = df_pivot.reset_index()
        # Optionally carry over the columns that were not pivoted.
        if keep_unused_columns and unused_columns:
            print(f'', flush=True)
            print(f'📦 正在处理未选择的列...', flush=True)
            if unused_agg_method == 'first':
                # First non-null value per index.
                unused_df = result.groupby(index_column)[unused_columns].first().reset_index()
                print(f'✓ 聚合方式:取第一个值', flush=True)
            elif unused_agg_method == 'mode':
                # Most frequent value per index (None if the group is empty).
                def get_mode(x):
                    mode_vals = x.mode()
                    return mode_vals[0] if len(mode_vals) > 0 else None
                unused_df = result.groupby(index_column)[unused_columns].agg(get_mode).reset_index()
                print(f'✓ 聚合方式:取众数', flush=True)
            elif unused_agg_method == 'mean':
                # Mean for numeric columns, first value for the rest.
                numeric_cols = [col for col in unused_columns if pd.api.types.is_numeric_dtype(result[col])]
                non_numeric_cols = [col for col in unused_columns if col not in numeric_cols]
                if numeric_cols:
                    numeric_df = result.groupby(index_column)[numeric_cols].mean()
                else:
                    # BUGFIX: name the placeholder's index after index_column.
                    # An unnamed index survives the concat below as name=None,
                    # so reset_index() would emit a column called 'index' and
                    # the merge on index_column would raise KeyError.
                    numeric_df = pd.DataFrame(
                        index=pd.Index(result[index_column].unique(), name=index_column)
                    )
                if non_numeric_cols:
                    non_numeric_df = result.groupby(index_column)[non_numeric_cols].first()
                else:
                    # Same index-name fix as above for the numeric placeholder.
                    non_numeric_df = pd.DataFrame(
                        index=pd.Index(result[index_column].unique(), name=index_column)
                    )
                # Combine both halves on the shared (named) index.
                unused_df = pd.concat([numeric_df, non_numeric_df], axis=1).reset_index()
                print(f'✓ 聚合方式:数值列取均值,非数值列取第一个值', flush=True)
            else:
                # Unknown method: fall back to first value.
                unused_df = result.groupby(index_column)[unused_columns].first().reset_index()
            # Attach the aggregated unused columns to the pivot result.
            df_pivot = df_pivot.merge(unused_df, on=index_column, how='left')
            print(f'✓ 已保留 {len(unused_columns)} 个未选择的列', flush=True)
            for col in unused_columns[:5]:
                print(f'{col}', flush=True)
            if len(unused_columns) > 5:
                print(f' • ... 还有 {len(unused_columns) - 5}', flush=True)
        # Restore the original row order (pivot_table sorts the index).
        original_order = result[index_column].drop_duplicates().tolist()
        order_map = {val: idx for idx, val in enumerate(original_order)}
        df_pivot['_sort_order'] = df_pivot[index_column].map(order_map)
        df_pivot = df_pivot.sort_values('_sort_order').drop(columns=['_sort_order']).reset_index(drop=True)
        # Optionally restore the original column order.
        if original_column_order:
            print(f'', flush=True)
            print(f'🔄 按原始列顺序重排列...', flush=True)
            # One pass over the original order: expand pivoted columns in
            # place, keep unselected columns where they were.
            final_cols = [index_column]
            for orig_col in original_column_order:
                if orig_col == index_column or orig_col == pivot_column:
                    continue  # index and pivot columns are handled separately
                if orig_col in value_columns:
                    # Selected for pivoting: insert all generated columns here.
                    related_cols = [c for c in df_pivot.columns if c.startswith(f'{orig_col}___')]
                    # Order generated columns by the pivot values' original order.
                    if pivot_value_order:
                        pivot_order_map = {val: idx for idx, val in enumerate(pivot_value_order)}
                        def get_pivot_value(col_name):
                            # Extract the suffix, e.g. 'FMA___基线' -> '基线'.
                            parts = col_name.split('___')
                            if len(parts) == 2:
                                return parts[1]
                            return col_name
                        # Unknown pivot values sort last (key 999).
                        related_cols_sorted = sorted(
                            related_cols,
                            key=lambda c: pivot_order_map.get(get_pivot_value(c), 999)
                        )
                    else:
                        # No pivot order supplied: sort alphabetically.
                        related_cols_sorted = sorted(related_cols)
                    final_cols.extend(related_cols_sorted)
                    print(f'{orig_col}{len(related_cols_sorted)}个转置列', flush=True)
                elif keep_unused_columns and orig_col in df_pivot.columns:
                    # Not pivoted but kept: append unchanged.
                    final_cols.append(orig_col)
                    print(f'{orig_col} → 保持不变', flush=True)
            # Defensive: append anything the pass above missed.
            for col in df_pivot.columns:
                if col not in final_cols:
                    final_cols.append(col)
                    print(f'{col} → 剩余列', flush=True)
            df_pivot = df_pivot[final_cols]
            print(f'✓ 列顺序已按原始顺序重排(总计{len(final_cols)}列)', flush=True)
        print(f'✅ 转换成功!', flush=True)
        print(f'📊 结果: {len(df_pivot)}× {len(df_pivot.columns)}', flush=True)
        print(f'📈 新增列: {len(df_pivot.columns) - 1}', flush=True)
        print(f'', flush=True)
        # List every generated column for the user.
        print(f'📋 生成的列名:', flush=True)
        new_cols = [col for col in df_pivot.columns if col != index_column]
        for i, col in enumerate(new_cols, 1):
            print(f' {i}. {col}', flush=True)
        print(f'━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━', flush=True)
        print(f'', flush=True)
        return df_pivot
    except ValueError as e:
        # Pivot failed — likely duplicate index+pivot combinations.
        if 'Index contains duplicate entries' in str(e):
            # Show the first few offending combinations.
            duplicates = result.groupby([index_column, pivot_column]).size()
            duplicates = duplicates[duplicates > 1]
            print('⚠️ 警告: 发现重复的索引+透视组合:')
            for (idx, piv), count in duplicates.head(5).items():
                print(f' {index_column}={idx}, {pivot_column}={piv}: {count}')
            if len(duplicates) > 5:
                print(f' ... 还有 {len(duplicates) - 5} 个重复组合')
            print(f'\n建议: 使用聚合函数如mean、sum处理重复值')
            print(f'当前聚合方式: {aggfunc}')
            raise ValueError(f'存在重复的{index_column}+{pivot_column}组合,需要选择合适的聚合方式')
        else:
            raise e
def get_pivot_preview(
    df: pd.DataFrame,
    index_column: str,
    pivot_column: str
) -> dict:
    """
    Gather preview statistics for a prospective pivot operation.

    Args:
        df: input DataFrame.
        index_column: column that would identify each output row.
        pivot_column: column whose values would become new columns.

    Returns:
        dict with unique counts, duplicate information and the estimated
        shape (rows x columns) of the pivoted result.
    """
    pivot_values = df[pivot_column].unique()
    index_count = int(df[index_column].nunique())

    # Count (index, pivot) pairs that occur more than once; those pairs
    # would force the pivot to aggregate values.
    combo_sizes = df.groupby([index_column, pivot_column]).size()
    dup_total = int((combo_sizes > 1).sum())

    return {
        'unique_index_count': index_count,
        'unique_pivot_values': [str(v) for v in pivot_values],
        'has_duplicates': dup_total > 0,
        'duplicate_count': dup_total,
        'estimated_rows': index_count,
        'estimated_columns': len(pivot_values)
    }