Files
AIclinicalresearch/extraction_service/operations/unpivot.py
HaHafeng b47079b387 feat(iit): Phase 1.5 AI对话集成REDCap真实数据完成
- feat: ChatService集成DeepSeek-V3实现AI对话(390行)
- feat: SessionMemory实现上下文记忆(最近3轮对话,170行)
- feat: 意图识别支持REDCap数据查询(关键词匹配)
- feat: REDCap数据注入LLM(queryRedcapRecord, countRedcapRecords, getProjectInfo)
- feat: 解决LLM幻觉问题(基于真实数据回答,明确system prompt)
- feat: 即时反馈(正在查询...提示)
- test: REDCap查询测试通过(test0102项目,10条记录,ID 7患者详情)
- docs: 创建Phase1.5开发完成记录(313行)
- docs: 更新Phase1.5开发计划(标记完成)
- docs: 更新MVP开发任务清单(Phase 1.5完成)
- docs: 更新模块当前状态(60%完成度)
- docs: 更新系统总体设计文档(v2.6)
- chore: 删除测试脚本(test-redcap-query-for-ai.ts, check-env-config.ts)
- chore: 移除REDCap测试环境变量(REDCAP_TEST_*)

技术亮点:
- AI基于REDCap真实数据对话,不编造信息
- 从数据库读取项目配置,不使用环境变量
- 企业微信端测试通过,用户体验良好

测试通过:
-  查询项目记录总数(10条)
-  查询特定患者详情(ID 7)
-  项目信息查询
-  上下文记忆(3轮对话)
-  即时反馈提示

影响范围:IIT Manager Agent模块
2026-01-03 22:48:10 +08:00

307 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
宽表转长表Unpivot/Melt操作
提供数据重塑功能,将宽格式转换为长格式。
典型医学场景:
- 多时间点随访数据FMA_基线、FMA_2周 → 时间点列 + FMA值列
- 多指标合并分析(收缩压、舒张压 → 指标列 + 测量值列)
- 治疗组对比治疗组_NRS、对照组_NRS → 组别列 + NRS列
"""
import pandas as pd
import numpy as np
from typing import List, Optional, Dict, Any
import sys
def apply_unpivot(
df: pd.DataFrame,
id_vars: List[str],
value_vars: List[str],
var_name: str = '变量',
value_name: str = '',
parse_column_names: bool = False,
separator: str = '_',
metric_name: Optional[str] = None,
time_name: Optional[str] = None,
dropna: bool = False
) -> pd.DataFrame:
"""
应用宽表转长表转换
Args:
df: 输入数据框
id_vars: ID列保持不变的列
value_vars: 值列(需要转换的列)
var_name: 变量名列名(存储原列名)
value_name: 值列名(存储实际值)
parse_column_names: 是否解析列名(如"FMA_基线""FMA"+"基线"
separator: 列名分隔符
metric_name: 指标列名(解析列名时使用)
time_name: 时间列名(解析列名时使用)
dropna: 是否删除缺失值行
Returns:
转换后的长格式数据框
Examples:
>>> # 场景1多时间点随访数据
>>> df = pd.DataFrame({
... '患者ID': ['P001', 'P002'],
... '性别': ['', ''],
... 'FMA_基线': [32, 28],
... 'FMA_2周': [45, 38],
... 'FMA_1月': [52, 44]
... })
>>> result = apply_unpivot(
... df,
... id_vars=['患者ID', '性别'],
... value_vars=['FMA_基线', 'FMA_2周', 'FMA_1月'],
... var_name='时间点',
... value_name='FMA值'
... )
>>> len(result) # 2人 × 3个时间点 = 6行
6
>>> result.columns.tolist()
['患者ID', '性别', '时间点', 'FMA值']
>>> # 场景2带列名解析
>>> result = apply_unpivot(
... df,
... id_vars=['患者ID', '性别'],
... value_vars=['FMA_基线', 'FMA_2周', 'FMA_1月'],
... parse_column_names=True,
... separator='_',
... metric_name='指标',
... time_name='时间点',
... value_name='测量值'
... )
>>> result.columns.tolist()
['患者ID', '性别', '指标', '时间点', '测量值']
>>> result['指标'].unique().tolist()
['FMA']
>>> result['时间点'].unique().tolist()
['基线', '2周', '1月']
"""
print("\n" + "="*60, flush=True)
print("🔄 开始宽表转长表转换...", flush=True)
print("="*60, flush=True)
# ==================== 参数验证 ====================
if df.empty:
print("⚠️ 输入数据框为空", flush=True)
return df
if not id_vars:
raise ValueError('❌ 至少需要选择1个ID列标识列')
if len(value_vars) < 2:
raise ValueError('❌ 至少需要选择2个值列需要转换的列')
# 验证列是否存在
missing_id_cols = [col for col in id_vars if col not in df.columns]
if missing_id_cols:
raise KeyError(f"❌ ID列不存在: {', '.join(missing_id_cols)}")
missing_value_cols = [col for col in value_vars if col not in df.columns]
if missing_value_cols:
raise KeyError(f"❌ 值列不存在: {', '.join(missing_value_cols)}")
# 检查ID列和值列是否有重复
overlap = set(id_vars) & set(value_vars)
if overlap:
raise ValueError(f"❌ ID列和值列不能重复: {', '.join(overlap)}")
print(f"\n📊 转换前数据概况:", flush=True)
print(f" - 总行数: {len(df)}", flush=True)
print(f" - 总列数: {len(df.columns)}", flush=True)
print(f" - ID列: {len(id_vars)} 个 ({', '.join(id_vars[:3])}{'...' if len(id_vars) > 3 else ''})", flush=True)
print(f" - 值列: {len(value_vars)} 个 ({', '.join(value_vars[:3])}{'...' if len(value_vars) > 3 else ''})", flush=True)
# ==================== 基础转换使用pandas.melt====================
try:
result = pd.melt(
df,
id_vars=id_vars,
value_vars=value_vars,
var_name=var_name,
value_name=value_name
)
print(f"\n✅ 基础转换完成:", flush=True)
print(f" - 转换后行数: {len(result)} (原 {len(df)} × {len(value_vars)})", flush=True)
print(f" - 转换后列数: {len(result.columns)} (ID列 + 变量名列 + 值列)", flush=True)
except Exception as e:
print(f"❌ 转换失败: {str(e)}", flush=True)
raise
# ==================== 高级功能:解析列名 ====================
if parse_column_names and separator:
print(f"\n🔍 开始解析列名(分隔符: '{separator}'...", flush=True)
def parse_column_name(name: str):
"""
解析列名
Examples:
"FMA_基线" → ("FMA", "基线")
"血压_1月" → ("血压", "1月")
"NRS_治疗组_2周" → ("NRS", "治疗组_2周")
"""
parts = name.split(separator)
if len(parts) >= 2:
metric = parts[0]
time = separator.join(parts[1:])
return metric, time
else:
# 没有分隔符,整个作为指标名,时间点留空
return name, ''
try:
# 应用解析函数
parsed = result[var_name].apply(parse_column_name)
# 创建新列
metric_col = metric_name or '指标'
time_col = time_name or '时间点'
result[metric_col] = parsed.str[0]
result[time_col] = parsed.str[1]
# 删除原变量名列(已经拆分了)
result = result.drop(columns=[var_name])
# 统计解析结果
unique_metrics = result[metric_col].nunique()
unique_times = result[time_col].nunique()
print(f"✅ 列名解析完成:", flush=True)
print(f" - {metric_col}列: {unique_metrics} 个唯一值", flush=True)
print(f" - {time_col}列: {unique_times} 个唯一值", flush=True)
# 显示前3个解析示例
sample_original = value_vars[:3]
print(f"\n 解析示例:", flush=True)
for orig in sample_original:
metric, time = parse_column_name(orig)
print(f" - '{orig}'{metric_col}='{metric}', {time_col}='{time}'", flush=True)
except Exception as e:
print(f"⚠️ 列名解析失败: {str(e)}", flush=True)
print(f" 已保留原变量名列: {var_name}", flush=True)
# ==================== 删除缺失值行 ====================
if dropna:
original_len = len(result)
result = result.dropna(subset=[value_name])
dropped = original_len - len(result)
if dropped > 0:
print(f"\n🗑️ 删除缺失值行: {dropped} 行 ({dropped/original_len*100:.1f}%)", flush=True)
# ==================== 排序 ====================
# 排序按ID列排序保持患者分组
result = result.sort_values(id_vars).reset_index(drop=True)
print(f"\n✅ 排序完成: 按 {', '.join(id_vars[:2])}{'...' if len(id_vars) > 2 else ''} 排序", flush=True)
# ==================== 最终统计 ====================
print(f"\n{'='*60}", flush=True)
print(f"✅ 宽表转长表转换完成!", flush=True)
print(f"{'='*60}", flush=True)
print(f"📊 最终数据:", flush=True)
print(f" - 总行数: {len(result)} (扩展了 {len(result)/len(df):.1f}x)", flush=True)
print(f" - 总列数: {len(result.columns)}", flush=True)
print(f" - 列名: {', '.join(result.columns.tolist())}", flush=True)
# 显示前3行示例
print(f"\n 前3行数据示例:", flush=True)
for idx, row in result.head(3).iterrows():
row_str = ' | '.join([f"{col}={row[col]}" for col in result.columns[:4]])
print(f" [{idx}] {row_str}...", flush=True)
return result
def get_unpivot_preview(
df: pd.DataFrame,
id_vars: List[str],
value_vars: List[str],
var_name: str = '变量',
value_name: str = '',
preview_rows: int = 10
) -> Dict[str, Any]:
"""
获取转换预览信息(不实际执行完整转换)
Args:
df: 输入数据框
id_vars: ID列
value_vars: 值列
var_name: 变量名列名
value_name: 值列名
preview_rows: 预览行数
Returns:
{
'original_shape': (rows, cols),
'new_shape': (rows, cols),
'expansion_factor': 扩展倍数,
'preview_data': 前N行数据,
'estimated_change': '将从 100 行 × 15 列 转换为 500 行 × 5 列'
}
"""
original_rows = len(df)
original_cols = len(df.columns)
# 预估转换后的形状
new_rows = original_rows * len(value_vars)
new_cols = len(id_vars) + 2 # ID列 + 变量名列 + 值列
expansion_factor = len(value_vars)
# 生成前几行预览
preview_df = df.head(min(3, len(df)))
preview_result = pd.melt(
preview_df,
id_vars=id_vars,
value_vars=value_vars,
var_name=var_name,
value_name=value_name
)
return {
'original_shape': (original_rows, original_cols),
'new_shape': (new_rows, new_cols),
'expansion_factor': expansion_factor,
'preview_data': preview_result.head(preview_rows).to_dict('records'),
'estimated_change': f"将从 {original_rows}× {original_cols} 列 转换为 {new_rows}× {new_cols}"
}