""" 指标-时间表转换(Metric-Time Transform) 将多个时间点列转换为"指标行+时间点列"格式 典型医学场景: - 制作临床研究Table 1 - 横向对比同一指标的时间变化 - 多时间点随访数据整理 示例: 输入(宽表): Record_ID | FMA___基线 | FMA___2周 | FMA___1月 10 | 54 | 93 | 68 11 | 16 | 31 | 72 输出(指标-时间表): Record_ID | 时间点 | 基线 | 2周 | 1月 10 | FMA | 54 | 93 | 68 11 | FMA | 16 | 31 | 72 """ import pandas as pd import numpy as np from typing import List, Optional, Dict, Any import os from collections import defaultdict def detect_common_pattern(column_names: List[str]) -> Dict[str, Any]: """ 自动检测列名的公共模式(前缀、分隔符、时间点) Args: column_names: 列名列表 Returns: { 'success': bool, 'common_prefix': str, # 公共前缀(指标名) 'separator': str, # 分隔符 'timepoints': List[str], # 时间点列表 'confidence': float, # 置信度 0-1 'message': str # 提示信息 } Examples: >>> cols = ['FMA总得分___筛选及基线', 'FMA总得分___随访(2周)', 'FMA总得分___随访(1个月)'] >>> result = detect_common_pattern(cols) >>> result['common_prefix'] 'FMA总得分' >>> result['separator'] '___' >>> result['timepoints'] ['筛选及基线', '随访(2周)', '随访(1个月)'] """ print(f"\n🔍 开始自动检测列名模式...", flush=True) print(f" 输入列数: {len(column_names)}", flush=True) if len(column_names) < 2: return { 'success': False, 'common_prefix': '', 'separator': '', 'timepoints': [], 'confidence': 0.0, 'message': '至少需要2列才能检测模式' } # 打印前3个列名作为样本 print(f" 样本列名:", flush=True) for i, col in enumerate(column_names[:3]): print(f" [{i+1}] {col}", flush=True) if len(column_names) > 3: print(f" ... 还有 {len(column_names) - 3} 列", flush=True) # ==================== 1. 检测最长公共前缀 ==================== common_prefix = os.path.commonprefix(column_names) print(f"\n ✓ 检测到公共前缀: '{common_prefix}'", flush=True) if not common_prefix: return { 'success': False, 'common_prefix': '', 'separator': '', 'timepoints': [], 'confidence': 0.0, 'message': '未检测到公共前缀,选中的列可能不属于同一指标' } # ==================== 2. 检测分隔符 ==================== # 尝试常见分隔符(按优先级排序) separators = ['___', '__', '_', '-', '.', '|', ' - ', ' '] detected_separator = None # 方法1:检查公共前缀是否以分隔符结尾 for sep in separators: if common_prefix.endswith(sep): detected_separator = sep common_prefix = common_prefix[:-len(sep)] # 移除尾部分隔符 print(f" ✓ 检测到分隔符: '{sep}' (位于公共前缀末尾)", flush=True) break # 方法2:如果公共前缀末尾没有分隔符,尝试从剩余部分检测 if not detected_separator: remainders = [col[len(common_prefix):] for col in column_names] for sep in separators: if all(r.startswith(sep) for r in remainders if r): detected_separator = sep print(f" ✓ 检测到分隔符: '{sep}' (位于剩余部分开头)", flush=True) break # ✨ 方法3:智能修正 - 如果剩余部分仍包含分隔符,尝试扩展公共前缀 if detected_separator: remainders = [col[len(common_prefix):] for col in column_names] # 检查每个剩余部分,看分隔符前是否还有公共部分 parts_before_sep = [] for remainder in remainders: if detected_separator in remainder: # 找到第一个分隔符的位置 sep_pos = remainder.find(detected_separator) part = remainder[:sep_pos] parts_before_sep.append(part) else: parts_before_sep.append('') # 如果所有剩余部分在分隔符前都有内容,且内容相同,则扩展公共前缀 if parts_before_sep and all(p == parts_before_sep[0] for p in parts_before_sep if p): additional_prefix = parts_before_sep[0] if additional_prefix: print(f" 🔄 智能修正: 扩展公共前缀 '{common_prefix}' → '{common_prefix}{additional_prefix}'", flush=True) common_prefix = common_prefix + additional_prefix if not detected_separator: print(f" ⚠️ 未检测到明确分隔符,使用空字符串", flush=True) detected_separator = '' # ==================== 3. 提取时间点 ==================== if detected_separator: # ✨ 修复:正确移除分隔符(移除整个分隔符字符串,而不是lstrip) timepoints = [] for col in column_names: remainder = col[len(common_prefix):] # 如果剩余部分以分隔符开头,移除它 if remainder.startswith(detected_separator): timepoint = remainder[len(detected_separator):] else: timepoint = remainder timepoints.append(timepoint.strip()) else: # 没有分隔符,整个剩余部分作为时间点 timepoints = [col[len(common_prefix):].strip() for col in column_names] print(f" ✓ 提取到 {len(timepoints)} 个时间点:", flush=True) for i, tp in enumerate(timepoints[:5]): print(f" [{i+1}] {tp}", flush=True) if len(timepoints) > 5: print(f" ... 还有 {len(timepoints) - 5} 个", flush=True) # ==================== 4. 计算置信度 ==================== confidence = 1.0 # 检查:时间点不能为空 empty_count = sum(1 for tp in timepoints if not tp) if empty_count > 0: confidence -= 0.3 print(f" ⚠️ 发现 {empty_count} 个空时间点,降低置信度", flush=True) # 检查:时间点应该各不相同 unique_timepoints = len(set(timepoints)) if unique_timepoints < len(timepoints): confidence -= 0.2 print(f" ⚠️ 时间点有重复,降低置信度", flush=True) # 检查:公共前缀不应该太短 if len(common_prefix) < 2: confidence -= 0.2 print(f" ⚠️ 公共前缀过短,降低置信度", flush=True) confidence = max(0.0, min(1.0, confidence)) print(f"\n 📊 检测置信度: {confidence:.0%}", flush=True) # ==================== 5. 生成消息 ==================== if confidence >= 0.8: message = f"成功检测:指标='{common_prefix}', 分隔符='{detected_separator}', {len(timepoints)}个时间点" elif confidence >= 0.5: message = f"检测成功但有警告,建议检查结果" else: message = f"检测置信度较低,建议手动指定参数" return { 'success': True, 'common_prefix': common_prefix, 'separator': detected_separator, 'timepoints': timepoints, 'confidence': confidence, 'message': message } def apply_metric_time_transform( df: pd.DataFrame, id_vars: List[str], value_vars: List[str], metric_name: Optional[str] = None, separator: Optional[str] = None, timepoint_col_name: str = '时间点' ) -> pd.DataFrame: """ 应用指标-时间表转换 Args: df: 输入数据框 id_vars: ID列(保持不变的列) value_vars: 值列(同一指标的多个时间点) metric_name: 指标名称(如果为None,则自动检测) separator: 分隔符(如果为None,则自动检测) timepoint_col_name: 时间点列的列名(默认:"时间点") Returns: 转换后的数据框 Examples: >>> df = pd.DataFrame({ ... 'Record_ID': [10, 11], ... 'FMA___基线': [54, 16], ... 'FMA___2周': [93, 31], ... 'FMA___1月': [68, 72] ... }) >>> result = apply_metric_time_transform( ... df, ... id_vars=['Record_ID'], ... value_vars=['FMA___基线', 'FMA___2周', 'FMA___1月'] ... ) >>> result.columns.tolist() ['Record_ID', '时间点', '基线', '2周', '1月'] """ print("\n" + "="*60, flush=True) print("🔄 开始指标-时间表转换...", flush=True) print("="*60, flush=True) # ==================== 参数验证 ==================== if df.empty: print("⚠️ 输入数据框为空", flush=True) return df if not id_vars: raise ValueError('❌ 至少需要选择1个ID列') if len(value_vars) < 2: raise ValueError('❌ 至少需要选择2个值列') # 验证列是否存在 for col in id_vars + value_vars: if col not in df.columns: raise KeyError(f"❌ 列 '{col}' 不存在") print(f"\n📊 转换前数据概况:", flush=True) print(f" - 总行数: {len(df)}", flush=True) print(f" - ID列: {len(id_vars)} 个 ({', '.join(id_vars)})", flush=True) print(f" - 值列: {len(value_vars)} 个", flush=True) # ==================== 自动检测或使用指定参数 ==================== if not metric_name or separator is None: print(f"\n🔍 自动检测模式...", flush=True) pattern = detect_common_pattern(value_vars) if not pattern['success']: raise ValueError(f"❌ 自动检测失败: {pattern['message']}") metric_name = metric_name or pattern['common_prefix'] separator = separator if separator is not None else pattern['separator'] timepoints = pattern['timepoints'] print(f"\n✅ 使用检测结果:", flush=True) print(f" - 指标名: '{metric_name}'", flush=True) print(f" - 分隔符: '{separator}'", flush=True) print(f" - 置信度: {pattern['confidence']:.0%}", flush=True) else: print(f"\n✅ 使用手动指定参数:", flush=True) print(f" - 指标名: '{metric_name}'", flush=True) print(f" - 分隔符: '{separator}'", flush=True) # 手动拆分时间点 timepoints = [] for col in value_vars: if separator and separator in col: # 移除指标名和分隔符 remainder = col.replace(metric_name, '', 1).lstrip(separator) timepoints.append(remainder) else: # 直接移除指标名 remainder = col.replace(metric_name, '', 1) timepoints.append(remainder.strip()) # ==================== 构建结果DataFrame ==================== print(f"\n🔨 开始构建结果数据...", flush=True) result_rows = [] for idx, row in df.iterrows(): result_row = {} # 1. 复制ID列 for id_col in id_vars: result_row[id_col] = row[id_col] # 2. 添加时间点列(实际存储的是指标名) result_row[timepoint_col_name] = metric_name # 3. 添加各个时间点的值作为独立列 for original_col, timepoint in zip(value_vars, timepoints): result_row[timepoint] = row[original_col] result_rows.append(result_row) result_df = pd.DataFrame(result_rows) # ==================== 调整列顺序 ==================== # 顺序:ID列 + 时间点列 + 各时间点列 column_order = id_vars + [timepoint_col_name] + timepoints result_df = result_df[column_order] # ==================== 统计输出 ==================== print(f"\n{'='*60}", flush=True) print(f"✅ 指标-时间表转换完成!", flush=True) print(f"{'='*60}", flush=True) print(f"📊 转换结果:", flush=True) print(f" - 总行数: {len(result_df)} (不变)", flush=True) print(f" - 总列数: {len(result_df.columns)} (ID列 + 时间点列 + {len(timepoints)}个时间点列)", flush=True) print(f" - 指标名: {metric_name}", flush=True) print(f" - 时间点: {', '.join(timepoints[:5])}{'...' if len(timepoints) > 5 else ''}", flush=True) # 显示前3行示例 print(f"\n 前3行数据示例:", flush=True) for idx, row in result_df.head(3).iterrows(): row_preview = ' | '.join([f"{col}={row[col]}" for col in result_df.columns[:4]]) print(f" [{idx}] {row_preview}...", flush=True) return result_df def preview_metric_time_transform( df: pd.DataFrame, id_vars: List[str], value_vars: List[str], preview_rows: int = 5 ) -> Dict[str, Any]: """ 预览指标-时间表转换结果(不实际执行完整转换) Args: df: 输入数据框 id_vars: ID列 value_vars: 值列 preview_rows: 预览行数 Returns: { 'pattern': { 'common_prefix': str, 'separator': str, 'timepoints': List[str], 'confidence': float }, 'original_shape': (rows, cols), 'new_shape': (rows, cols), 'preview_data': List[Dict], 'estimated_change': str } """ # 检测模式 pattern = detect_common_pattern(value_vars) if not pattern['success']: return { 'success': False, 'error': pattern['message'] } # 对前几行执行转换 preview_df = df.head(preview_rows) try: result_preview = apply_metric_time_transform( preview_df, id_vars, value_vars, pattern['common_prefix'], pattern['separator'] ) return { 'success': True, 'pattern': pattern, 'original_shape': (len(df), len(df.columns)), 'new_shape': (len(df), len(id_vars) + 1 + len(pattern['timepoints'])), 'preview_data': result_preview.to_dict('records'), 'estimated_change': f"列数: {len(df.columns)} → {len(id_vars) + 1 + len(pattern['timepoints'])} (ID列 + 时间点列 + {len(pattern['timepoints'])}个时间点列)" } except Exception as e: return { 'success': False, 'error': str(e) } # ==================== 多指标转换(方向1:时间点为行,指标为列)==================== def detect_metric_groups( column_names: List[str], separators: Optional[List[str]] = None ) -> Dict[str, Any]: """ 自动检测并分组多个指标的列 参数: column_names: 列名列表,例如 ['FMA总得分_基线', 'FMA总得分_随访1', 'ADL总分_基线', 'ADL总分_随访1'] separators: 可选的分隔符列表,默认 ['___', '__', '_', '-', '.'] 返回: { 'success': bool, 'metric_groups': { 'FMA总得分': ['FMA总得分_基线', 'FMA总得分_随访1', ...], 'ADL总分': ['ADL总分_基线', 'ADL总分_随访1', ...], ... }, 'separator': str, # 检测到的分隔符 'timepoints': ['基线', '随访1', ...], # 所有时间点(应该每个指标都一致) 'confidence': float, # 置信度 0.0-1.0 'message': str } """ print(f"\n🔍 开始自动检测多指标分组...", flush=True) print(f" 输入列数: {len(column_names)}", flush=True) if len(column_names) < 2: return { 'success': False, 'metric_groups': {}, 'separator': '', 'timepoints': [], 'confidence': 0.0, 'message': '至少需要2列才能检测分组' } if separators is None: separators = ['___', '__', '_', '-', '.', '|', ' - ', ' '] # ==================== 1. 尝试每个分隔符 ==================== detected_separator = None metric_groups = defaultdict(list) for sep in separators: temp_groups = defaultdict(list) failed = False for col in column_names: if sep not in col: failed = True break # 分割列名 parts = col.split(sep) if len(parts) < 2: failed = True break # 第一部分作为指标名 metric_name = parts[0] temp_groups[metric_name].append(col) if not failed and len(temp_groups) > 0: detected_separator = sep metric_groups = temp_groups print(f" ✓ 检测到分隔符: '{sep}'", flush=True) break if not detected_separator: return { 'success': False, 'metric_groups': {}, 'separator': '', 'timepoints': [], 'confidence': 0.0, 'message': '未检测到公共分隔符,请确认选中的列格式一致' } # ==================== 2. 提取每个指标的时间点 ==================== metric_timepoints = {} for metric_name, cols in metric_groups.items(): timepoints = [] for col in cols: # 提取时间点(分隔符后的部分) parts = col.split(detected_separator) if len(parts) >= 2: # 使用最后一部分作为时间点(支持多级分隔,如 "FMA总得分_子项_基线") timepoint = parts[-1].strip() timepoints.append(timepoint) metric_timepoints[metric_name] = timepoints print(f" ✓ 检测到 {len(metric_groups)} 个指标:", flush=True) for metric_name, cols in metric_groups.items(): print(f" • {metric_name} ({len(cols)}列)", flush=True) # ==================== 3. 验证时间点一致性 ==================== # 检查所有指标的时间点是否相同 all_timepoints = list(metric_timepoints.values()) first_timepoints = all_timepoints[0] consistent = True for tp_list in all_timepoints[1:]: if tp_list != first_timepoints: consistent = False break if not consistent: print(f" ⚠️ 警告: 各指标的时间点不完全一致", flush=True) # 使用所有时间点的并集 all_unique_timepoints = sorted(set(tp for tp_list in all_timepoints for tp in tp_list)) confidence = 0.6 message = f"检测到{len(metric_groups)}个指标,但时间点不完全一致。将使用所有时间点的并集,缺失值将填充为NA。" else: all_unique_timepoints = first_timepoints confidence = 1.0 message = f"成功检测到{len(metric_groups)}个指标,共{len(all_unique_timepoints)}个时间点" print(f" ✓ 检测到 {len(all_unique_timepoints)} 个时间点:", flush=True) for i, tp in enumerate(all_unique_timepoints[:5]): print(f" [{i+1}] {tp}", flush=True) if len(all_unique_timepoints) > 5: print(f" ... 还有 {len(all_unique_timepoints) - 5} 个", flush=True) # ==================== 4. 计算置信度 ==================== # 检查:每个指标的列数是否相同 column_counts = [len(cols) for cols in metric_groups.values()] if len(set(column_counts)) > 1: confidence -= 0.2 print(f" ⚠️ 各指标的列数不同,降低置信度", flush=True) return { 'success': True, 'metric_groups': dict(metric_groups), 'separator': detected_separator, 'timepoints': all_unique_timepoints, 'confidence': confidence, 'message': message } def apply_multi_metric_to_long( df: pd.DataFrame, id_vars: List[str], metric_groups: Dict[str, List[str]], separator: str, event_col_name: str = 'Event_Name' ) -> pd.DataFrame: """ 多指标转长表:时间点为行,指标为列 参数: df: 原始数据框 id_vars: ID列列表 metric_groups: 指标分组字典,格式 {'FMA总得分': ['FMA总得分_基线', ...], ...} separator: 分隔符 event_col_name: 时间点列的列名 返回: 转换后的数据框 示例: 输入: Record_ID | FMA总得分_基线 | FMA总得分_随访1 | ADL总分_基线 | ADL总分_随访1 10 | 58 | 67 | 值1 | 值2 输出: Record_ID | Event_Name | FMA总得分 | ADL总分 10 | 基线 | 58 | 值1 10 | 随访1 | 67 | 值2 """ print(f"\n🔄 开始多指标转长表转换...", flush=True) print(f" 原始形状: {df.shape}", flush=True) print(f" ID列: {id_vars}", flush=True) print(f" 指标数: {len(metric_groups)}", flush=True) # ✨ 记录原始行的顺序(保持原始Record ID顺序) df = df.copy() df['_original_order'] = range(len(df)) # ==================== 1. 对每个指标执行 melt ==================== melted_dfs = [] for metric_name, cols in metric_groups.items(): print(f" • 处理指标: {metric_name} ({len(cols)}列)", flush=True) # 提取该指标的数据(包含原始顺序列) df_metric = df[id_vars + ['_original_order'] + cols].copy() # Melt(保留原始顺序列) df_melted = df_metric.melt( id_vars=id_vars + ['_original_order'], value_vars=cols, var_name='_temp_col', value_name=metric_name ) # 提取时间点(移除分隔符前的指标名部分) df_melted[event_col_name] = df_melted['_temp_col'].apply( lambda x: x.split(separator)[-1].strip() if separator in x else x ) # 删除临时列 df_melted = df_melted.drop('_temp_col', axis=1) melted_dfs.append(df_melted) # ==================== 2. Merge所有指标 ==================== print(f" • 合并 {len(melted_dfs)} 个指标的数据...", flush=True) result = melted_dfs[0] for i, df_metric in enumerate(melted_dfs[1:], 1): result = result.merge( df_metric, on=id_vars + ['_original_order', event_col_name], how='outer' # 外连接,保留所有时间点 ) # ==================== 3. 排序 ==================== # ✨ 按原始顺序和时间点排序(保持原始Record ID顺序) result = result.sort_values(by=['_original_order', event_col_name]).reset_index(drop=True) # 删除临时的原始顺序列 result = result.drop('_original_order', axis=1) # ==================== 4. 调整列顺序 ==================== # 确保列顺序为:ID列 → Event_Name → 所有指标列 metric_cols = [col for col in result.columns if col not in id_vars and col != event_col_name] desired_column_order = id_vars + [event_col_name] + metric_cols result = result[desired_column_order] print(f" ✓ 转换完成!新形状: {result.shape}", flush=True) print(f" ✓ 列顺序: {list(result.columns)}", flush=True) return result def preview_multi_metric_to_long( df: pd.DataFrame, id_vars: List[str], value_vars: List[str], separators: Optional[List[str]] = None, event_col_name: str = 'Event_Name', preview_rows: int = 10 ) -> Dict[str, Any]: """ 预览多指标转长表的结果 返回: { 'success': bool, 'grouping': {...}, # detect_metric_groups的结果 'original_shape': (rows, cols), 'new_shape': (rows, cols), 'preview_data': [...], 'estimated_change': str } """ print(f"\n📊 预览多指标转长表...", flush=True) # 1. 检测分组 grouping = detect_metric_groups(value_vars, separators) if not grouping['success']: return { 'success': False, 'error': grouping['message'] } # 2. 对前几行执行转换 preview_df = df.head(preview_rows) try: result_preview = apply_multi_metric_to_long( preview_df, id_vars, grouping['metric_groups'], grouping['separator'], event_col_name ) num_metrics = len(grouping['metric_groups']) num_timepoints = len(grouping['timepoints']) return { 'success': True, 'grouping': grouping, 'original_shape': (len(df), len(df.columns)), 'new_shape': (len(df) * num_timepoints, len(id_vars) + 1 + num_metrics), 'preview_data': result_preview.to_dict('records'), 'estimated_change': f"行数: {len(df)} → {len(df) * num_timepoints} (每个ID复制{num_timepoints}次); 列数: {len(df.columns)} → {len(id_vars) + 1 + num_metrics} (ID列 + 时间点列 + {num_metrics}个指标列)" } except Exception as e: import traceback print(f" ❌ 预览失败: {str(e)}", flush=True) traceback.print_exc() return { 'success': False, 'error': str(e) } # ==================== 多指标转换(方向2:时间点为列,指标为行)==================== def apply_multi_metric_to_matrix( df: pd.DataFrame, id_vars: List[str], metric_groups: Dict[str, List[str]], separator: str, event_col_name: str = 'Event_Name', metric_col_name: str = '指标名' ) -> pd.DataFrame: """ 多指标转矩阵格式:时间点为列,指标为行 参数: df: 原始数据框 id_vars: ID列列表 metric_groups: 指标分组字典 separator: 分隔符 event_col_name: 时间点列的列名(中间变量) metric_col_name: 指标列的列名 返回: 转换后的数据框 示例: 输入: Record_ID | FMA总得分_基线 | FMA总得分_随访1 | ADL总分_基线 | ADL总分_随访1 10 | 58 | 67 | 值1 | 值2 输出: Record_ID | 指标名 | 基线 | 随访1 10 | FMA总得分 | 58 | 67 10 | ADL总分 | 值1 | 值2 """ print(f"\n🔄 开始多指标转矩阵格式...", flush=True) print(f" 原始形状: {df.shape}", flush=True) print(f" ID列: {id_vars}", flush=True) print(f" 指标数: {len(metric_groups)}", flush=True) # ✨ 记录原始行的顺序(保持原始Record ID顺序) # 创建ID到原始顺序的映射 df_with_order = df.copy() df_with_order['_original_order'] = range(len(df_with_order)) # 创建ID列到原始顺序的映射字典 # 如果有多个ID列,使用元组作为key if len(id_vars) == 1: id_to_order = df_with_order.set_index(id_vars[0])['_original_order'].to_dict() else: id_to_order = df_with_order.set_index(id_vars)['_original_order'].to_dict() # ==================== 1. 先转成长表 ==================== df_long = apply_multi_metric_to_long( df, id_vars, metric_groups, separator, event_col_name ) print(f" • 长表形状: {df_long.shape}", flush=True) # ==================== 2. 转成宽格式(指标为行,时间点为列)==================== # 先melt所有指标列,变成 (ID, Event_Name, 指标名, 值) 格式 metric_cols = [col for col in df_long.columns if col not in id_vars and col != event_col_name] print(f" • 准备pivot: {len(metric_cols)} 个指标列", flush=True) # Melt:将所有指标列转为行 df_melted = df_long.melt( id_vars=id_vars + [event_col_name], value_vars=metric_cols, var_name=metric_col_name, value_name='_value' ) print(f" • Melt后形状: {df_melted.shape}", flush=True) # Pivot:时间点变成列 # 使用 pivot_table 而不是 pivot,因为可能有重复索引 result = df_melted.pivot_table( index=id_vars + [metric_col_name], columns=event_col_name, values='_value', aggfunc='first' # 如果有重复,取第一个值 ).reset_index() # 清理列名(移除多级索引的名称) result.columns.name = None # ✨ 添加原始顺序列(用于排序) if len(id_vars) == 1: result['_original_order'] = result[id_vars[0]].map(id_to_order) else: # 多个ID列的情况,创建元组作为key result['_original_order'] = result[id_vars].apply(tuple, axis=1).map(id_to_order) # ==================== 3. 调整列顺序 ==================== # 确保列顺序为:ID列 → 指标名列 → 所有时间点列(按原始顺序) timepoint_cols = [col for col in result.columns if col not in id_vars and col != metric_col_name] # 尝试保持时间点的原始顺序(从 metric_groups 中获取) first_metric_cols = list(metric_groups.values())[0] original_timepoint_order = [] for col in first_metric_cols: timepoint = col.split(separator)[-1].strip() if separator in col else col if timepoint not in original_timepoint_order: original_timepoint_order.append(timepoint) # 按原始顺序排列时间点列 sorted_timepoint_cols = [] for tp in original_timepoint_order: if tp in timepoint_cols: sorted_timepoint_cols.append(tp) # 添加任何未在原始顺序中的时间点(防御性编程) for tp in timepoint_cols: if tp not in sorted_timepoint_cols: sorted_timepoint_cols.append(tp) # ==================== 4. 排序 ==================== # ✨ 按原始顺序和指标名排序(保持原始Record ID顺序) result = result.sort_values(by=['_original_order', metric_col_name]).reset_index(drop=True) # 删除临时的原始顺序列 result = result.drop('_original_order', axis=1) # ==================== 5. 调整列顺序 ==================== desired_column_order = id_vars + [metric_col_name] + sorted_timepoint_cols result = result[desired_column_order] print(f" ✓ 转换完成!新形状: {result.shape}", flush=True) print(f" ✓ 列顺序: {list(result.columns)}", flush=True) return result def preview_multi_metric_to_matrix( df: pd.DataFrame, id_vars: List[str], value_vars: List[str], separators: Optional[List[str]] = None, metric_col_name: str = '指标名', preview_rows: int = 10 ) -> Dict[str, Any]: """ 预览多指标转矩阵格式的结果 返回: { 'success': bool, 'grouping': {...}, # detect_metric_groups的结果 'original_shape': (rows, cols), 'new_shape': (rows, cols), 'preview_data': [...], 'estimated_change': str } """ print(f"\n📊 预览多指标转矩阵格式...", flush=True) # 1. 检测分组 grouping = detect_metric_groups(value_vars, separators) if not grouping['success']: return { 'success': False, 'error': grouping['message'] } # 2. 对前几行执行转换 preview_df = df.head(preview_rows) try: result_preview = apply_multi_metric_to_matrix( preview_df, id_vars, grouping['metric_groups'], grouping['separator'], 'Event_Name', metric_col_name ) num_metrics = len(grouping['metric_groups']) num_timepoints = len(grouping['timepoints']) # 新行数 = 原始行数 × 指标数 estimated_new_rows = len(df) * num_metrics # 新列数 = ID列数 + 1(指标名列)+ 时间点数 estimated_new_cols = len(id_vars) + 1 + num_timepoints return { 'success': True, 'grouping': grouping, 'original_shape': (len(df), len(df.columns)), 'new_shape': (estimated_new_rows, estimated_new_cols), 'preview_data': result_preview.to_dict('records'), 'estimated_change': f"行数: {len(df)} → {estimated_new_rows} (每个ID复制{num_metrics}次,每个指标1行); 列数: {len(df.columns)} → {estimated_new_cols} (ID列 + 指标名列 + {num_timepoints}个时间点列)" } except Exception as e: import traceback print(f" ❌ 预览失败: {str(e)}", flush=True) traceback.print_exc() return { 'success': False, 'error': str(e) }