""" Pivot操作 - 预写函数 长表转宽表(一人多行 → 一人一行) """ import pandas as pd from typing import List, Literal, Optional def pivot_long_to_wide( df: pd.DataFrame, index_column: str, pivot_column: str, value_columns: List[str], aggfunc: Literal['first', 'last', 'mean', 'sum', 'min', 'max'] = 'first' ) -> pd.DataFrame: """ 长表转宽表(Pivot) 将纵向重复的数据转为横向数据 Args: df: 输入数据框 index_column: 索引列(唯一标识,如 Record ID) pivot_column: 透视列(将变成新列名的列,如 Event Name) value_columns: 值列(要转置的数据列,如 FMA得分, ADL得分) aggfunc: 聚合函数 - 'first': 取第一个值(推荐) - 'last': 取最后一个值 - 'mean': 求平均值 - 'sum': 求和 - 'min': 取最小值 - 'max': 取最大值 Returns: 宽表数据框 示例: pivot_long_to_wide( df, index_column='Record ID', pivot_column='Event Name', value_columns=['FMA得分', 'ADL得分'], aggfunc='first' ) """ result = df.copy() print(f'原始数据: {len(result)} 行 × {len(result.columns)} 列') print(f'索引列: {index_column}') print(f'透视列: {pivot_column}') print(f'值列: {", ".join(value_columns)}') print(f'聚合方式: {aggfunc}') print('') # 验证列是否存在 required_cols = [index_column, pivot_column] + value_columns missing_cols = [col for col in required_cols if col not in result.columns] if missing_cols: raise ValueError(f'以下列不存在: {", ".join(missing_cols)}') # 检查索引列的唯一值数量 unique_index = result[index_column].nunique() print(f'唯一{index_column}数量: {unique_index}') # 检查透视列的唯一值 unique_pivot = result[pivot_column].unique() print(f'透视列"{pivot_column}"的唯一值: {list(unique_pivot)}') print('') try: # 执行Pivot转换 df_pivot = result.pivot_table( index=index_column, columns=pivot_column, values=value_columns, aggfunc=aggfunc ) # 展平多级列名 # 如果只有一个值列,列名是单层的 if len(value_columns) == 1: df_pivot.columns = [f'{value_columns[0]}_{col}' for col in df_pivot.columns] else: # 多个值列,列名是多层的,需要展平 df_pivot.columns = ['_'.join(str(c) for c in col).strip() for col in df_pivot.columns.values] # 重置索引(将index列变回普通列) df_pivot = df_pivot.reset_index() print(f'转换成功!') print(f'结果: {len(df_pivot)} 行 × {len(df_pivot.columns)} 列') print(f'新增列: {len(df_pivot.columns) - 1} 列') print('') # 显示新列名 print(f'生成的列名:') new_cols = [col for col in df_pivot.columns if col != index_column] for i, col in enumerate(new_cols[:10], 1): # 只显示前10个 print(f' {i}. {col}') if len(new_cols) > 10: print(f' ... 还有 {len(new_cols) - 10} 列') return df_pivot except ValueError as e: # Pivot失败(可能有重复的index+pivot组合) if 'Index contains duplicate entries' in str(e): # 统计重复情况 duplicates = result.groupby([index_column, pivot_column]).size() duplicates = duplicates[duplicates > 1] print('⚠️ 警告: 发现重复的索引+透视组合:') for (idx, piv), count in duplicates.head(5).items(): print(f' {index_column}={idx}, {pivot_column}={piv}: {count}次') if len(duplicates) > 5: print(f' ... 还有 {len(duplicates) - 5} 个重复组合') print(f'\n建议: 使用聚合函数(如mean、sum)处理重复值') print(f'当前聚合方式: {aggfunc}') raise ValueError(f'存在重复的{index_column}+{pivot_column}组合,需要选择合适的聚合方式') else: raise e def get_pivot_preview( df: pd.DataFrame, index_column: str, pivot_column: str ) -> dict: """ 获取Pivot预览信息 Args: df: 输入数据框 index_column: 索引列 pivot_column: 透视列 Returns: 预览信息 """ # 统计唯一值 unique_index = df[index_column].nunique() unique_pivot = df[pivot_column].unique() # 检查是否有重复 duplicates = df.groupby([index_column, pivot_column]).size() has_duplicates = (duplicates > 1).any() duplicate_count = (duplicates > 1).sum() if has_duplicates else 0 return { 'unique_index_count': int(unique_index), 'unique_pivot_values': [str(v) for v in unique_pivot], 'has_duplicates': bool(has_duplicates), 'duplicate_count': int(duplicate_count), 'estimated_rows': int(unique_index), 'estimated_columns': len(unique_pivot) }