Files
AIclinicalresearch/extraction_service/operations/conditional.py
HaHafeng f4f1d09837 feat(dc/tool-c): Add pivot column ordering and NA handling features
Major features:
1. Pivot transformation enhancements:
   - Add option to keep unselected columns with 3 aggregation methods
   - Maintain original column order after pivot (aligned with source file)
   - Preserve pivot value order (first appearance order)

2. NA handling across 4 core functions:
   - Recode: Support keep/map/drop for NA values
   - Filter: Already supports is_null/not_null operators
   - Binning: Support keep/label/assign for NA values (fix nan display)
   - Conditional: Add is_null/not_null operators

3. UI improvements:
   - Enable column header tooltips with custom header component
   - Add closeable alert for 50-row preview
   - Fix page scrollbar issues

Modified files:
Python: pivot.py, recode.py, binning.py, conditional.py, main.py
Backend: SessionController, QuickActionController, QuickActionService
Frontend: PivotDialog, RecodeDialog, BinningDialog, ConditionalDialog, DataGrid, index

Status: Ready for testing
2025-12-09 14:40:14 +08:00

203 lines
6.6 KiB
Python

"""
条件生成列 - 预写函数
支持复杂的IF-THEN-ELSE多条件逻辑
"""
import pandas as pd
from typing import List, Dict, Any, Union
def _condition_mask(frame: pd.DataFrame, condition: Dict[str, Any]) -> pd.Series:
    """Build the boolean mask for a single condition dict.

    Supports numeric comparisons ('>', '<', '>=', '<='), equality
    ('=', '!='), and null checks ('is_null', 'not_null').  Numeric
    comparisons coerce the column (and the value) to numbers; if the
    coercion fails the raw column is compared instead.

    Raises:
        ValueError: if the operator is not one of the supported set.
    """
    column = condition['column']
    operator = condition['operator']
    # 'value' is legitimately absent for is_null / not_null, so .get()
    # is required here — a plain ['value'] would raise KeyError.
    value = condition.get('value')

    if operator in ('>', '<', '>=', '<='):
        # Coerce to numeric for ordering comparisons; non-parsable cells
        # become NaN and therefore never match.
        try:
            col_data = pd.to_numeric(frame[column], errors='coerce')
            if not isinstance(value, (int, float)):
                value = float(value)
        except Exception:
            # Coercion failed (e.g. value is not a number) — fall back
            # to comparing the raw column values.
            col_data = frame[column]
    else:
        # Equality / null checks operate on the raw data.
        col_data = frame[column]

    if operator == '=':
        return col_data == value
    if operator == '!=':
        return col_data != value
    if operator == '>':
        return col_data > value
    if operator == '<':
        return col_data < value
    if operator == '>=':
        return col_data >= value
    if operator == '<=':
        return col_data <= value
    if operator == 'is_null':
        return frame[column].isna()
    if operator == 'not_null':
        return frame[column].notna()
    raise ValueError(f'不支持的运算符: {operator}')


def apply_conditional_column(
    df: pd.DataFrame,
    new_column_name: str,
    rules: List[Dict[str, Any]],
    else_value: Any = None,
) -> pd.DataFrame:
    """Generate a new column from ordered IF-THEN-ELSE rules.

    Rules are applied in order, so a later rule overwrites earlier
    matches; rows matched by no rule keep ``else_value``.  The new
    column is inserted immediately after the first referenced column
    when one can be determined, otherwise it stays appended at the end.

    Args:
        df: Input DataFrame (a copy is returned; ``df`` is untouched).
        new_column_name: Name of the generated column.
        rules: Rule dicts, each containing:
            - conditions: list of {"column", "operator", "value"} dicts
              ("value" may be omitted for is_null / not_null)
            - logic: 'and' or 'or' — how the conditions combine
            - result: value assigned where the rule matches
        else_value: Default for rows that no rule matches.

    Returns:
        A copy of ``df`` with the new column added.

    Raises:
        ValueError: empty rule list, unknown column, unsupported
            operator, or unsupported logic keyword.

    Example:
        rules = [
            {
                "conditions": [
                    {"column": "年龄", "operator": ">=", "value": 60}
                ],
                "logic": "and",
                "result": "老年"
            },
            {
                "conditions": [
                    {"column": "年龄", "operator": ">=", "value": 18},
                    {"column": "年龄", "operator": "<", "value": 60}
                ],
                "logic": "and",
                "result": "成年"
            }
        ]
    """
    result = df.copy()

    if not rules:
        raise ValueError('至少需要1条规则')

    # Validate every referenced column up front so we fail before any
    # mutation instead of midway through rule application.
    for rule in rules:
        for condition in rule.get('conditions', []):
            column = condition.get('column')
            if column not in result.columns:
                raise ValueError(f'"{column}" 不存在')

    # Start every row at the else-value; matching rules overwrite below.
    result[new_column_name] = else_value
    print(f'开始应用条件规则,共 {len(rules)} 条规则')

    for rule_idx, rule in enumerate(rules, 1):
        conditions = rule.get('conditions', [])
        logic = rule.get('logic', 'and')
        result_value = rule.get('result')
        if not conditions:
            # A rule without conditions can never match — skip it.
            continue

        masks = [_condition_mask(result, condition) for condition in conditions]

        if logic == 'and':
            final_mask = pd.concat(masks, axis=1).all(axis=1)
        elif logic == 'or':
            final_mask = pd.concat(masks, axis=1).any(axis=1)
        else:
            raise ValueError(f'不支持的逻辑运算符: {logic}')

        matched_count = final_mask.sum()
        result.loc[final_mask, new_column_name] = result_value
        print(f' 规则{rule_idx}: 匹配 {matched_count} 行 → 值为 {result_value}')

    # Move the new column next to the first referenced column for
    # readability.  Empty-condition rules are skipped above, so we must
    # search for the first rule that actually has conditions — indexing
    # rules[0] directly would IndexError on an empty conditions list.
    first_ref_col = next(
        (rule['conditions'][0]['column'] for rule in rules if rule.get('conditions')),
        None,
    )
    if first_ref_col is not None:
        original_col_index = result.columns.get_loc(first_ref_col)
        cols = list(result.columns)
        cols.remove(new_column_name)  # currently the last column
        cols.insert(original_col_index + 1, new_column_name)
        result = result[cols]

    # Report the distribution of the generated values (NaN included).
    print(f'\n结果分布:')
    value_counts = result[new_column_name].value_counts(dropna=False)
    for value, count in value_counts.items():
        percentage = count / len(result) * 100
        if pd.isna(value):
            print(f' (空值): {count} 行 ({percentage:.1f}%)')
        else:
            print(f' {value}: {count} 行 ({percentage:.1f}%)')

    return result
def apply_simple_binning(
    df: pd.DataFrame,
    column: str,
    new_column_name: str,
    threshold: float,
    value_if_true: Any = 1,
    value_if_false: Any = 0,
) -> pd.DataFrame:
    """Binary classification against a single threshold.

    Simplified variant of the conditional-column operation: rows where
    ``column >= threshold`` receive ``value_if_true``, all other rows
    (including NaN, which compares False) receive ``value_if_false``.

    Args:
        df: Input DataFrame (a copy is returned; ``df`` is untouched).
        column: Column to compare against the threshold.
        new_column_name: Name of the generated column.
        threshold: Cut point for the comparison.
        value_if_true: Value assigned where column >= threshold.
        value_if_false: Value assigned where column < threshold.

    Returns:
        A copy of ``df`` with the new column appended.

    Raises:
        ValueError: if ``column`` does not exist.

    Example:
        inpatient exposure grouping:
        duration >= 10 → 1 (exposed), duration < 10 → 0 (not exposed)
    """
    result = df.copy()
    if column not in result.columns:
        raise ValueError(f'"{column}" 不存在')

    # Map the boolean mask directly instead of the former arithmetic
    # trick (mask.astype(int) * value_if_true + ...), which raised
    # TypeError for non-numeric values such as None and silently turned
    # NaN rows into 0/"" regardless of the configured values.
    # NOTE: NaN compares False to the threshold, so NaN rows now receive
    # value_if_false.
    above = result[column] >= threshold
    result[new_column_name] = above.map({True: value_if_true, False: value_if_false})

    print(f'简单二分类结果:')
    print(f' {column} >= {threshold}: {(result[column] >= threshold).sum()} 行 → {value_if_true}')
    print(f' {column} < {threshold}: {(result[column] < threshold).sum()} 行 → {value_if_false}')

    return result