Files
AIclinicalresearch/extraction_service/operations/dropna.py
HaHafeng 4c5bb3d174 feat(iit): Initialize IIT Manager Agent MVP - Day 1 complete
- Add iit_schema with 5 tables
- Create module structure and types (223 lines)
- WeChat integration verified (Access Token success)
- Update system docs to v2.4
- Add REDCap source folders to .gitignore
- Day 1/14 complete (11/11 tasks)
2025-12-31 18:35:05 +08:00

179 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
删除缺失值 - 预写函数
支持按行删除、按列删除、阈值控制
"""
import pandas as pd
from typing import Literal, Optional, List
def drop_missing_values(
df: pd.DataFrame,
method: Literal['row', 'column', 'both'] = 'row',
threshold: Optional[float] = None,
subset: Optional[List[str]] = None
) -> pd.DataFrame:
"""
删除缺失值
Args:
df: 输入数据框
method: 删除方式
- 'row': 删除包含缺失值的行
- 'column': 删除缺失值过多的列
- 'both': 先删除列,再删除行
threshold: 缺失率阈值0-1之间仅对'column''both'有效
- 如果列的缺失率超过此阈值,则删除该列
- 默认为0.550%
subset: 仅检查指定列的缺失值(仅对'row'有效)
Returns:
删除缺失值后的数据框
示例:
# 删除包含任何缺失值的行
drop_missing_values(df, method='row')
# 删除缺失率>30%的列
drop_missing_values(df, method='column', threshold=0.3)
# 先删除缺失列,再删除缺失行
drop_missing_values(df, method='both', threshold=0.5)
# 仅检查指定列
drop_missing_values(df, method='row', subset=['年龄', 'BMI'])
"""
result = df.copy()
original_shape = result.shape
print(f'原始数据: {original_shape[0]}× {original_shape[1]}')
print(f'缺失值总数: {result.isna().sum().sum()}')
print('')
# 默认阈值
if threshold is None:
threshold = 0.5
# 按列删除
if method in ('column', 'both'):
# 计算每列的缺失率
missing_rate = result.isna().sum() / len(result)
cols_to_drop = missing_rate[missing_rate > threshold].index.tolist()
if cols_to_drop:
print(f'检测到缺失率>{threshold*100:.0f}%的列: {len(cols_to_drop)}')
for col in cols_to_drop:
rate = missing_rate[col]
count = result[col].isna().sum()
print(f' - {col}: 缺失率={rate*100:.1f}% ({count}/{len(result)})')
result = result.drop(columns=cols_to_drop)
print(f'删除后: {result.shape[0]}× {result.shape[1]}')
print('')
else:
print(f'没有找到缺失率>{threshold*100:.0f}%的列')
print('')
# 按行删除
if method in ('row', 'both'):
before_rows = len(result)
if subset:
# 仅检查指定列
print(f'仅检查指定列的缺失值: {subset}')
result = result.dropna(subset=subset)
else:
# 检查所有列
result = result.dropna()
dropped_rows = before_rows - len(result)
if dropped_rows > 0:
print(f'删除了 {dropped_rows} 行(包含缺失值的行)')
print(f'保留了 {len(result)} 行({len(result)/before_rows*100:.1f}%')
else:
print('没有找到包含缺失值的行')
print('')
# 最终统计
final_shape = result.shape
print(f'最终结果: {final_shape[0]}× {final_shape[1]}')
print(f'删除了 {original_shape[0] - final_shape[0]}')
print(f'删除了 {original_shape[1] - final_shape[1]}')
print(f'剩余缺失值: {result.isna().sum().sum()}')
# 如果结果为空,给出警告
if len(result) == 0:
print('\n⚠️ 警告: 删除后数据为空!')
return result
def get_missing_summary(df: pd.DataFrame) -> dict:
"""
获取缺失值统计摘要
Args:
df: 输入数据框
Returns:
缺失值统计信息
"""
total_cells = df.shape[0] * df.shape[1]
total_missing = df.isna().sum().sum()
# 按列统计
col_missing = df.isna().sum()
col_missing_rate = col_missing / len(df)
cols_with_missing = col_missing[col_missing > 0].to_dict()
cols_missing_rate = col_missing_rate[col_missing > 0].to_dict()
# 按行统计
row_missing = df.isna().sum(axis=1)
rows_with_missing = (row_missing > 0).sum()
return {
'total_cells': total_cells,
'total_missing': int(total_missing),
'missing_rate': total_missing / total_cells if total_cells > 0 else 0,
'rows_with_missing': int(rows_with_missing),
'cols_with_missing': len(cols_with_missing),
'col_missing_detail': {
col: {
'count': int(count),
'rate': float(cols_missing_rate[col])
}
for col, count in cols_with_missing.items()
}
}