Files
AIclinicalresearch/python-microservice/operations/binning.py
HaHafeng bdfca32305 docs(iit): REDCap对接技术方案完成与模块状态更新
- 新增《REDCap对接技术方案与实施指南》(1070行)
  - 确定DET+REST API技术方案(不使用External Module)
  - 完整RedcapAdapter/WebhookController/SyncManager代码设计
  - Day 2详细实施步骤与验收标准
- 更新《IIT Manager Agent模块当前状态与开发指南》
  - 记录REDCap本地环境部署完成(15.8.0)
  - 记录对接方案确定过程与技术决策
  - 更新Day 2工作计划(6个阶段详细清单)
  - 整体进度18%(Day 1完成+REDCap环境就绪)
- REDCap环境准备完成
  - 测试项目test0102(PID 16)创建成功
  - DET功能源码验证通过
  - 本地Docker环境稳定运行

技术方案:
- 实时触发: Data Entry Trigger (0秒延迟)
- 数据拉取: REST API exportRecords (增量同步)
- 轮询补充: pg-boss定时任务 (每30分钟)
- 可靠性: Webhook幂等性 + 轮询补充机制
2026-01-02 14:30:38 +08:00

155 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
生成分类变量(分箱)操作
将连续数值变量转换为分类变量。
支持三种方法:自定义切点、等宽分箱、等频分箱。
"""
import pandas as pd
import numpy as np
from typing import List, Optional, Literal, Union
def apply_binning(
df: pd.DataFrame,
column: str,
method: Literal['custom', 'equal_width', 'equal_freq'],
new_column_name: str,
bins: Optional[List[Union[int, float]]] = None,
labels: Optional[List[Union[str, int]]] = None,
num_bins: int = 3
) -> pd.DataFrame:
"""
应用分箱操作
Args:
df: 输入数据框
column: 要分箱的列名
method: 分箱方法
- 'custom': 自定义切点
- 'equal_width': 等宽分箱
- 'equal_freq': 等频分箱
new_column_name: 新列名
bins: 自定义切点列表仅method='custom'时使用),如 [18, 60] → <18, 18-60, >60
labels: 标签列表(可选)
num_bins: 分组数量仅method='equal_width''equal_freq'时使用)
Returns:
分箱后的数据框
Examples:
>>> df = pd.DataFrame({'年龄': [15, 25, 35, 45, 55, 65, 75]})
>>> result = apply_binning(df, '年龄', 'custom', '年龄分组',
... bins=[18, 60], labels=['青少年', '成年', '老年'])
>>> result['年龄分组'].tolist()
['青少年', '成年', '成年', '成年', '成年', '老年', '老年']
"""
if df.empty:
return df
# 验证列是否存在
if column not in df.columns:
raise KeyError(f"'{column}' 不存在")
# 验证数据类型
if not pd.api.types.is_numeric_dtype(df[column]):
raise TypeError(f"'{column}' 不是数值类型,无法进行分箱")
# 创建结果数据框
result = df.copy()
# 根据方法进行分箱
if method == 'custom':
# 自定义切点
if not bins or len(bins) < 2:
raise ValueError('自定义切点至少需要2个值')
# 验证切点是否升序
if bins != sorted(bins):
raise ValueError('切点必须按升序排列')
# 验证标签数量
if labels and len(labels) != len(bins) - 1:
raise ValueError(f'标签数量({len(labels)})必须等于切点数量-1{len(bins)-1}')
result[new_column_name] = pd.cut(
result[column],
bins=bins,
labels=labels,
right=False,
include_lowest=True
)
elif method == 'equal_width':
# 等宽分箱
if num_bins < 2:
raise ValueError('分组数量至少为2')
result[new_column_name] = pd.cut(
result[column],
bins=num_bins,
labels=labels,
include_lowest=True
)
elif method == 'equal_freq':
# 等频分箱
if num_bins < 2:
raise ValueError('分组数量至少为2')
result[new_column_name] = pd.qcut(
result[column],
q=num_bins,
labels=labels,
duplicates='drop' # 处理重复边界值
)
else:
raise ValueError(f"不支持的分箱方法: {method}")
# 统计分布
print(f'分箱结果分布:')
value_counts = result[new_column_name].value_counts().sort_index()
for category, count in value_counts.items():
percentage = count / len(result) * 100
print(f' {category}: {count} 行 ({percentage:.1f}%)')
# 缺失值统计
missing_count = result[new_column_name].isna().sum()
if missing_count > 0:
print(f'警告: {missing_count} 个值无法分箱(可能是缺失值或边界问题)')
return result