Files
AIclinicalresearch/extraction_service/operations/pivot.py
HaHafeng 91cab452d1 fix(dc/tool-c): Fix special character handling and improve UX
Major fixes:
- Fix pivot transformation with special characters in column names
- Fix compute column validation for Chinese punctuation
- Fix recode dialog to fetch unique values from full dataset via new API
- Add column mapping mechanism to handle special characters

Database migration:
- Add column_mapping field to dc_tool_c_sessions table
- Migration file: 20251208_add_column_mapping

UX improvements:
- Darken table grid lines for better visibility
- Reduce column width by 40% with tooltip support
- Insert new columns next to source columns
- Preserve original row order after operations
- Add notice about 50-row preview limit

Modified files:
- Backend: SessionService, SessionController, QuickActionService, routes
- Python: pivot.py, compute.py, recode.py, binning.py, conditional.py
- Frontend: DataGrid, RecodeDialog, index.tsx, ag-grid-custom.css
- Database: schema.prisma, migration SQL

Status: Code complete, database migrated, ready for testing
2025-12-08 23:20:55 +08:00

185 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Pivot操作 - 预写函数
长表转宽表(一人多行 → 一人一行)
"""
import pandas as pd
from typing import List, Literal, Optional
def _strip_special_chars(label) -> str:
    """Return ``str(label)`` without ``(``, ``)`` and ``=``, trimmed of outer whitespace."""
    return str(label).replace('(', '').replace(')', '').replace('=', '').strip()


def pivot_long_to_wide(
    df: pd.DataFrame,
    index_column: str,
    pivot_column: str,
    value_columns: List[str],
    aggfunc: Literal['first', 'last', 'mean', 'sum', 'min', 'max'] = 'first'
) -> pd.DataFrame:
    """
    Pivot a long table into a wide table (many rows per subject -> one row).

    Each distinct value of ``pivot_column`` becomes a group of new columns,
    one per entry in ``value_columns``. Generated column names have the form
    ``<value column>___<pivot value>`` (three underscores as separator).
    Progress information is printed to stdout.

    Args:
        df: Input data frame. It is not modified.
        index_column: Column identifying one output row (e.g. ``Record ID``).
        pivot_column: Column whose values become part of the new column
            names (e.g. ``Event Name``).
        value_columns: Columns whose values are spread across the new
            columns (e.g. ``FMA得分``, ``ADL得分``).
        aggfunc: Aggregation applied when several rows share the same
            index/pivot combination: ``'first'`` (default), ``'last'``,
            ``'mean'``, ``'sum'``, ``'min'`` or ``'max'``.

    Returns:
        The wide data frame, with ``index_column`` as an ordinary column and
        rows ordered by first appearance of each index value in ``df``.

    Raises:
        ValueError: If any of the requested columns is missing from ``df``,
            or if pandas reports duplicate index/pivot entries.

    Example:
        pivot_long_to_wide(
            df,
            index_column='Record ID',
            pivot_column='Event Name',
            value_columns=['FMA得分', 'ADL得分'],
            aggfunc='first'
        )
    """
    print(f'原始数据: {len(df)}× {len(df.columns)}')
    print(f'索引列: {index_column}')
    print(f'透视列: {pivot_column}')
    print(f'值列: {", ".join(value_columns)}')
    print(f'聚合方式: {aggfunc}')
    print('')

    # Validate that every referenced column exists before doing any work.
    required_cols = [index_column, pivot_column] + value_columns
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f'以下列不存在: {", ".join(missing_cols)}')

    unique_index = df[index_column].nunique()
    print(f'唯一{index_column}数量: {unique_index}')

    unique_pivot = df[pivot_column].unique()
    print(f'透视列"{pivot_column}"的唯一值: {list(unique_pivot)}')
    print('')

    try:
        df_pivot = df.pivot_table(
            index=index_column,
            columns=pivot_column,
            values=value_columns,
            aggfunc=aggfunc
        )
    except ValueError as e:
        if 'Index contains duplicate entries' not in str(e):
            raise
        # Report the offending index/pivot combinations before failing.
        duplicates = df.groupby([index_column, pivot_column]).size()
        duplicates = duplicates[duplicates > 1]
        print('⚠️ 警告: 发现重复的索引+透视组合:')
        for (idx, piv), count in duplicates.head(5).items():
            print(f' {index_column}={idx}, {pivot_column}={piv}: {count}')
        if len(duplicates) > 5:
            print(f' ... 还有 {len(duplicates) - 5} 个重复组合')
        print(f'\n建议: 使用聚合函数如mean、sum处理重复值')
        print(f'当前聚合方式: {aggfunc}')
        raise ValueError(
            f'存在重复的{index_column}+{pivot_column}组合,需要选择合适的聚合方式'
        ) from e

    # Flatten the column labels into plain strings.
    if len(value_columns) == 1:
        # Because ``values`` is passed as a list, pandas yields
        # (value, pivot) tuples even for a single value column; only the
        # pivot part belongs after the separator.
        value_label = _strip_special_chars(value_columns[0])
        flat_columns = []
        for col in df_pivot.columns:
            pivot_label = col[-1] if isinstance(col, tuple) else col
            flat_columns.append(
                f'{value_label}___{str(pivot_label).replace(" ", "_")}'
            )
    else:
        flat_columns = [
            '___'.join(_strip_special_chars(part) for part in col)
            if isinstance(col, tuple) else _strip_special_chars(col)
            for col in df_pivot.columns
        ]
    df_pivot.columns = flat_columns

    # Turn the index back into a regular column.
    df_pivot = df_pivot.reset_index()

    # Restore the order in which index values first appear in the input;
    # pivot_table sorts them. A keyed sort avoids adding a temporary column
    # that could clash with a generated column name.
    original_order = df[index_column].drop_duplicates().tolist()
    order_map = {val: pos for pos, val in enumerate(original_order)}
    df_pivot = df_pivot.sort_values(
        by=index_column,
        key=lambda series: series.map(order_map)
    ).reset_index(drop=True)

    print('转换成功!')
    print(f'结果: {len(df_pivot)}× {len(df_pivot.columns)}')
    print(f'新增列: {len(df_pivot.columns) - 1}')
    print('')

    # List the generated column names (first 10 only).
    print('生成的列名:')
    new_cols = [col for col in df_pivot.columns if col != index_column]
    for i, col in enumerate(new_cols[:10], 1):
        print(f' {i}. {col}')
    if len(new_cols) > 10:
        print(f' ... 还有 {len(new_cols) - 10}')

    return df_pivot
def get_pivot_preview(
    df: pd.DataFrame,
    index_column: str,
    pivot_column: str
) -> dict:
    """
    Summarise what pivoting ``df`` would produce, without performing it.

    Args:
        df: Input data frame.
        index_column: Column that would identify one output row.
        pivot_column: Column whose values would become new columns.

    Returns:
        A dict with the keys:
        - ``unique_index_count``: number of distinct index values.
        - ``unique_pivot_values``: distinct pivot values, as strings.
        - ``has_duplicates``: whether any index/pivot combination occurs
          more than once.
        - ``duplicate_count``: number of such repeated combinations.
        - ``estimated_rows``: same as ``unique_index_count``.
        - ``estimated_columns``: number of distinct pivot values.
    """
    n_index_values = int(df[index_column].nunique())
    pivot_levels = df[pivot_column].unique()

    # One boolean per index/pivot pair: True when the pair repeats.
    pair_sizes = df.groupby([index_column, pivot_column]).size()
    repeated = pair_sizes > 1

    preview = {
        'unique_index_count': n_index_values,
        'unique_pivot_values': list(map(str, pivot_levels)),
        'has_duplicates': bool(repeated.any()),
        'duplicate_count': int(repeated.sum()),
        'estimated_rows': n_index_values,
        'estimated_columns': len(pivot_levels),
    }
    return preview