Files
AIclinicalresearch/tests/test_fillna_operations.py
HaHafeng 74cf346453 feat(dc/tool-c): Add missing value imputation feature with 6 methods and MICE
Major features:
1. Missing value imputation (6 simple methods + MICE):
   - Mean/Median/Mode/Constant imputation
   - Forward fill (ffill) and Backward fill (bfill) for time series
   - MICE multivariate imputation (in progress, shape issue to fix)

2. Auto precision detection:
   - Automatically match decimal places of original data
   - Prevent false precision (e.g. 13.57 instead of 13.566716417910449)

3. Categorical variable detection:
   - Auto-detect and skip categorical columns in MICE
   - Show warnings for unsuitable columns
   - Suggest mode imputation for categorical data

4. UI improvements:
   - Rename button: "Delete Missing" to "Missing Value Handling"
   - Remove standalone "Dedup" and "MICE" buttons
   - 3-tab dialog: Delete / Fill / Advanced Fill
   - Display column statistics and recommended methods
   - Extended warning messages (8 seconds for skipped columns)

5. Bug fixes:
   - Fix sessionService.updateSessionData -> saveProcessedData
   - Fix OperationResult interface (add message and stats)
   - Fix Toolbar button labels and removal

Modified files:
Python: operations/fillna.py (new, 556 lines), main.py (3 new endpoints)
Backend: QuickActionService.ts, QuickActionController.ts, routes/index.ts
Frontend: MissingValueDialog.tsx (new, 437 lines), Toolbar.tsx, index.tsx
Tests: test_fillna_operations.py (774 lines), test scripts and docs
Docs: 5 documentation files updated

Known issues:
- MICE imputation has DataFrame shape mismatch issue (under debugging)
- Workaround: Use 6 simple imputation methods first

Status: Development complete, MICE debugging in progress
Lines added: ~2000 lines across 3 tiers
2025-12-10 13:06:00 +08:00

774 lines
28 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
缺失值处理功能 - 自动化测试脚本
测试所有18个测试用例
- 6个基础填补测试
- 4个MICE测试
- 4个边界测试
- 4个数据类型测试
使用方法:
python tests/test_fillna_operations.py
"""
import pandas as pd
import numpy as np
import requests
import time
import json
from typing import Dict, List, Any
from datetime import datetime
import sys
import os
# Configuration
# Base URL of the Python service that the tests send HTTP requests to.
PYTHON_SERVICE_URL = "http://localhost:8000"
# Directory created (if missing) when the test suite is instantiated.
TEST_DATA_DIR = "tests/test_data"
# Coloured terminal output
class Colors:
    """ANSI escape sequences used to colour and style console output."""

    # Foreground colours
    GREEN = "\033[92m"
    RED = "\033[91m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"

    # Text attributes
    BOLD = "\033[1m"
    END = "\033[0m"  # resets colour and attributes
def print_header(text: str):
    """Print ``text`` centred in 80 columns between two ``=`` rules, in bold cyan."""
    style = f"{Colors.BOLD}{Colors.CYAN}"
    rule = '=' * 80
    # Blank line before the top rule and after the bottom rule.
    print(f"\n{style}{rule}{Colors.END}")
    print(f"{style}{text.center(80)}{Colors.END}")
    print(f"{style}{rule}{Colors.END}\n")
def print_test(test_num: int, total: int, name: str):
    """Print a bold ``[test_num/total] name`` title followed by an 80-column dashed rule."""
    title = f"\n{Colors.BOLD}[{test_num}/{total}] {name}{Colors.END}"
    underline = "-" * 80
    print(title)
    print(underline)
def print_success(message: str):
    """Print ``message`` in green."""
    print(Colors.GREEN, message, Colors.END, sep="")
def print_error(message: str):
    """Print ``message`` in red."""
    print(Colors.RED, message, Colors.END, sep="")
def print_warning(message: str):
    """Print ``message`` in yellow, prefixed with a warning sign."""
    line = f"{Colors.YELLOW}⚠️ {message}{Colors.END}"
    print(line)
def print_info(message: str):
    """Print ``message`` in blue, preceded by a single space."""
    line = f"{Colors.BLUE} {message}{Colors.END}"
    print(line)
class FillnaTestSuite:
    """Test suite that exercises the missing-value imputation HTTP endpoints.

    Each ``test_*`` method sends one or more requests to the service at
    ``PYTHON_SERVICE_URL``, prints a coloured report and returns ``True`` on
    success or ``False`` on failure.  ``run_all_tests`` drives all 18 tests
    and tallies the outcome in ``passed`` / ``failed`` / ``errors``.
    """

    def __init__(self):
        self.passed = 0          # number of tests that returned True
        self.failed = 0          # number of tests that returned False or raised
        self.errors = []         # names (and exception text) of failed tests
        self.start_time = None   # set by run_all_tests()
        # Make sure the test data directory exists.
        os.makedirs(TEST_DATA_DIR, exist_ok=True)

    # ==================== Helpers ====================

    @staticmethod
    def _to_records(df: pd.DataFrame) -> List[Dict]:
        """Convert ``df`` to a list of row dicts that is safe to send as JSON.

        ``DataFrame.to_dict('records')`` yields ``NaN`` floats, ``NaT`` and
        ``pd.Timestamp`` objects, none of which can be serialised by
        ``requests.post(json=...)``.  Missing values become ``None``
        (JSON ``null``), datetimes become ISO-8601 strings and NumPy scalars
        become native Python numbers.
        """
        def clean(value: Any) -> Any:
            if isinstance(value, np.generic):
                value = value.item()
            # NaT is an instance of datetime, so it must be tested first.
            if value is None or value is pd.NaT:
                return None
            if isinstance(value, float) and np.isnan(value):
                return None
            if isinstance(value, datetime):
                return value.isoformat()
            return value

        return [
            {key: clean(value) for key, value in row.items()}
            for row in df.to_dict('records')
        ]

    def _post(self, endpoint: str, payload: Dict, timeout: int) -> Dict:
        """POST ``payload`` as JSON to ``endpoint`` and return the decoded body.

        A response whose body is not valid JSON (for example an HTML error
        page) is reported as ``{'success': False, 'error': ...}`` instead of
        raising, so callers can treat it like any other API failure.
        Connection errors and timeouts still propagate.
        """
        response = requests.post(
            f"{PYTHON_SERVICE_URL}{endpoint}",
            json=payload,
            timeout=timeout
        )
        try:
            return response.json()
        except ValueError:
            return {
                'success': False,
                'error': f"HTTP {response.status_code}: response body is not valid JSON"
            }

    def generate_test_data(self) -> Dict[str, pd.DataFrame]:
        """Generate the test datasets, keyed by name.

        The random seed is fixed, so the datasets are reproducible.  The
        order of the ``np.random`` calls below therefore matters.
        """
        print_info("生成测试数据...")
        np.random.seed(42)
        n_rows = 100

        # Dataset 1: numeric columns (normally distributed)
        df_numeric = pd.DataFrame({
            'id': range(1, n_rows + 1),
            '体重': np.random.normal(65, 10, n_rows),
            '身高': np.random.normal(170, 8, n_rows),
            '年龄': np.random.randint(20, 60, n_rows)
        })
        # Randomly blank out roughly 15% of each measured column.
        mask = np.random.random(n_rows) < 0.15
        df_numeric.loc[mask, '体重'] = np.nan
        mask = np.random.random(n_rows) < 0.15
        df_numeric.loc[mask, '身高'] = np.nan

        # Dataset 2: categorical columns
        df_categorical = pd.DataFrame({
            'id': range(1, n_rows + 1),
            '婚姻状况': np.random.choice(['已婚', '未婚', '离异'], n_rows),
            '教育程度': np.random.choice(['本科', '硕士', '博士', '高中'], n_rows),
        })
        mask = np.random.random(n_rows) < 0.20
        df_categorical.loc[mask, '婚姻状况'] = np.nan

        # Dataset 3: time series (for forward/backward fill)
        dates = pd.date_range('2024-01-01', periods=n_rows, freq='D')
        df_timeseries = pd.DataFrame({
            'date': dates,
            'temperature': np.random.normal(20, 5, n_rows),
            'humidity': np.random.uniform(40, 80, n_rows)
        })
        # Runs of consecutive missing values (label slices are inclusive).
        df_timeseries.loc[10:15, 'temperature'] = np.nan
        df_timeseries.loc[30:32, 'humidity'] = np.nan

        # Dataset 4: edge cases
        df_edge_cases = pd.DataFrame({
            'id': range(1, 11),
            'all_missing': [np.nan] * 10,   # 100% missing
            'no_missing': range(1, 11),     # 0% missing
            'half_missing': [1, np.nan, 3, np.nan, 5, np.nan, 7, np.nan, 9, np.nan],
        })

        # Dataset 5: mixed column types
        df_mixed = pd.DataFrame({
            'id': range(1, n_rows + 1),
            '数值列': np.random.normal(100, 20, n_rows),
            '分类列': np.random.choice(['A', 'B', 'C'], n_rows),
            '整数列': np.random.randint(1, 100, n_rows),
        })
        mask = np.random.random(n_rows) < 0.10
        df_mixed.loc[mask, '数值列'] = np.nan
        df_mixed.loc[mask, '分类列'] = np.nan

        datasets = {
            'numeric': df_numeric,
            'categorical': df_categorical,
            'timeseries': df_timeseries,
            'edge_cases': df_edge_cases,
            'mixed': df_mixed
        }
        print_success(f"生成了 {len(datasets)} 个测试数据集")
        for name, df in datasets.items():
            print(f"{name}: {df.shape[0]}× {df.shape[1]}")
        return datasets

    def test_service_health(self) -> bool:
        """Return ``True`` if ``GET /health`` on the service answers with HTTP 200."""
        print_info("检查Python服务状态...")
        try:
            response = requests.get(f"{PYTHON_SERVICE_URL}/health", timeout=5)
        except requests.exceptions.RequestException as e:
            print_error(f"无法连接到Python服务: {str(e)}")
            print_warning("请确保服务已启动: cd extraction_service && python main.py")
            return False
        if response.status_code == 200:
            print_success("Python服务运行正常")
            return True
        print_error(f"Python服务响应异常: {response.status_code}")
        return False

    def call_fillna_simple(self, data: List[Dict], column: str, new_column_name: str,
                           method: str, fill_value: Any = None) -> Dict:
        """Call the simple-imputation endpoint and return the decoded response."""
        payload = {
            "data": data,
            "column": column,
            "new_column_name": new_column_name,
            "method": method,
            "fill_value": fill_value
        }
        return self._post("/api/operations/fillna-simple", payload, timeout=30)

    def call_fillna_stats(self, data: List[Dict], column: str) -> Dict:
        """Call the missing-value statistics endpoint and return the decoded response."""
        payload = {
            "data": data,
            "column": column
        }
        return self._post("/api/operations/fillna-stats", payload, timeout=10)

    def call_fillna_mice(self, data: List[Dict], columns: List[str],
                         n_iterations: int = 10, random_state: int = 42) -> Dict:
        """Call the MICE imputation endpoint and return the decoded response."""
        payload = {
            "data": data,
            "columns": columns,
            "n_iterations": n_iterations,
            "random_state": random_state
        }
        return self._post("/api/operations/fillna-mice", payload, timeout=120)

    def verify_result(self, result: Dict, expected_keys: List[str]) -> bool:
        """Return ``True`` if ``result`` reports success and has every expected key."""
        if not result.get('success'):
            print_error(f"API返回失败: {result.get('error', 'Unknown error')}")
            return False
        for key in expected_keys:
            if key not in result:
                print_error(f"结果缺少字段: {key}")
                return False
        return True

    def verify_new_column_created(self, result_data: List[Dict], new_column: str,
                                  original_column: str) -> bool:
        """Return ``True`` if the first row holds both the new and the original column."""
        if not result_data:
            print_error("结果数据为空")
            return False
        first_row = result_data[0]
        if new_column not in first_row:
            print_error(f"新列 '{new_column}' 未创建")
            return False
        if original_column not in first_row:
            print_error(f"原列 '{original_column}' 丢失")
            return False
        return True

    def verify_column_position(self, result_data: List[Dict], new_column: str,
                               original_column: str) -> bool:
        """Return ``True`` if the new column directly follows the original column."""
        if not result_data:
            return False
        columns = list(result_data[0].keys())
        try:
            orig_idx = columns.index(original_column)
            new_idx = columns.index(new_column)
        except ValueError as e:
            print_error(f"列位置检查失败: {str(e)}")
            return False
        if new_idx == orig_idx + 1:
            print_success("✓ 新列位置正确(紧邻原列)")
            return True
        print_warning(f"新列位置: {new_idx}, 原列位置: {orig_idx}")
        return False

    def count_missing_values(self, data: List[Dict], column: str) -> int:
        """Count rows whose ``column`` value is absent, ``None`` or a float NaN."""
        def is_missing(val: Any) -> bool:
            return val is None or (isinstance(val, float) and np.isnan(val))

        return sum(1 for row in data if is_missing(row.get(column)))

    # ==================== Basic tests (6) ====================

    def test_1_mean_fill(self, datasets: Dict) -> bool:
        """Test 1: mean imputation of a numeric column."""
        print_test(1, 18, "均值填补数值列")
        data = self._to_records(datasets['numeric'])
        # Call the API
        result = self.call_fillna_simple(data, '体重', '体重_均值', 'mean')
        # Validate the response
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        result_data = result['result_data']
        if not self.verify_new_column_created(result_data, '体重_均值', '体重'):
            return False
        # Every missing value must have been filled.
        missing_count = self.count_missing_values(result_data, '体重_均值')
        if missing_count > 0:
            print_error(f"填补后仍有 {missing_count} 个缺失值")
            return False
        print_success("均值填补成功,缺失值已全部填补")
        # Informational only: a misplaced column warns but does not fail the test.
        self.verify_column_position(result_data, '体重_均值', '体重')
        return True

    def test_2_median_fill(self, datasets: Dict) -> bool:
        """Test 2: median imputation."""
        print_test(2, 18, "中位数填补偏态分布列")
        data = self._to_records(datasets['numeric'])
        result = self.call_fillna_simple(data, '身高', '身高_中位数', 'median')
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        missing_count = self.count_missing_values(result['result_data'], '身高_中位数')
        if missing_count == 0:
            print_success("中位数填补成功")
            return True
        print_error(f"填补后仍有 {missing_count} 个缺失值")
        return False

    def test_3_mode_fill(self, datasets: Dict) -> bool:
        """Test 3: mode imputation of a categorical column."""
        print_test(3, 18, "众数填补分类列")
        data = self._to_records(datasets['categorical'])
        result = self.call_fillna_simple(data, '婚姻状况', '婚姻状况_众数', 'mode')
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        missing_count = self.count_missing_values(result['result_data'], '婚姻状况_众数')
        if missing_count == 0:
            print_success("众数填补成功")
            return True
        print_error(f"填补后仍有 {missing_count} 个缺失值")
        return False

    def test_4_constant_fill(self, datasets: Dict) -> bool:
        """Test 4: constant imputation with the value 0."""
        print_test(4, 18, "固定值填补0")
        data = self._to_records(datasets['numeric'])
        result = self.call_fillna_simple(data, '体重', '体重_固定值', 'constant', fill_value=0)
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        result_data = result['result_data']
        missing_count = self.count_missing_values(result_data, '体重_固定值')
        # At least one value must actually have been filled with 0.
        filled_zeros = sum(1 for row in result_data if row.get('体重_固定值') == 0)
        if missing_count == 0 and filled_zeros > 0:
            print_success(f"固定值填补成功,填充了 {filled_zeros} 个0")
            return True
        print_error("固定值填补失败")
        return False

    def test_5_ffill(self, datasets: Dict) -> bool:
        """Test 5: forward fill (ffill)."""
        print_test(5, 18, "前向填充ffill")
        data = self._to_records(datasets['timeseries'])
        result = self.call_fillna_simple(data, 'temperature', 'temperature_ffill', 'ffill')
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        missing_count = self.count_missing_values(result['result_data'], 'temperature_ffill')
        if missing_count == 0:
            print_success("前向填充成功 ⭐")
            return True
        print_error(f"填充后仍有 {missing_count} 个缺失值")
        return False

    def test_6_bfill(self, datasets: Dict) -> bool:
        """Test 6: backward fill (bfill)."""
        print_test(6, 18, "后向填充bfill")
        data = self._to_records(datasets['timeseries'])
        result = self.call_fillna_simple(data, 'humidity', 'humidity_bfill', 'bfill')
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        missing_count = self.count_missing_values(result['result_data'], 'humidity_bfill')
        if missing_count == 0:
            print_success("后向填充成功 ⭐")
            return True
        print_error(f"填充后仍有 {missing_count} 个缺失值")
        return False

    # ==================== MICE tests (4) ====================

    def test_7_mice_single_column(self, datasets: Dict) -> bool:
        """Test 7: MICE imputation of a single column."""
        print_test(7, 18, "MICE填补单列")
        data = self._to_records(datasets['numeric'])
        result = self.call_fillna_mice(data, ['体重'], n_iterations=5)
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        missing_count = self.count_missing_values(result['result_data'], '体重_MICE')
        if missing_count == 0:
            print_success("MICE单列填补成功 ⭐⭐⭐")
            return True
        print_error(f"填补后仍有 {missing_count} 个缺失值")
        return False

    def test_8_mice_multiple_columns(self, datasets: Dict) -> bool:
        """Test 8: MICE imputation of several columns at once."""
        print_test(8, 18, "MICE填补多列")
        data = self._to_records(datasets['numeric'])
        result = self.call_fillna_mice(data, ['体重', '身高'], n_iterations=5)
        if not self.verify_result(result, ['result_data', 'message']):
            return False
        result_data = result['result_data']
        # Both new columns must exist; an empty result cannot contain them.
        if (not result_data
                or '体重_MICE' not in result_data[0]
                or '身高_MICE' not in result_data[0]):
            print_error("MICE新列未全部创建")
            return False
        missing_count_1 = self.count_missing_values(result_data, '体重_MICE')
        missing_count_2 = self.count_missing_values(result_data, '身高_MICE')
        if missing_count_1 == 0 and missing_count_2 == 0:
            print_success("MICE多列填补成功 ⭐⭐⭐")
            return True
        print_error(f"填补不完整: 体重 {missing_count_1}, 身高 {missing_count_2}")
        return False

    def test_9_mice_iterations(self, datasets: Dict) -> bool:
        """Test 9: MICE imputation with different iteration counts."""
        print_test(9, 18, "MICE填补 - 不同迭代次数")
        data = self._to_records(datasets['numeric'])
        # Try 5 iterations and 20 iterations.
        result_5 = self.call_fillna_mice(data, ['体重'], n_iterations=5)
        result_20 = self.call_fillna_mice(data, ['体重'], n_iterations=20)
        if not result_5.get('success') or not result_20.get('success'):
            print_error("不同迭代次数测试失败")
            return False
        print_success("不同迭代次数都能成功执行")
        return True

    def test_10_mice_random_state(self, datasets: Dict) -> bool:
        """Test 10: MICE imputation is reproducible for a fixed random seed."""
        print_test(10, 18, "MICE填补 - 自定义随机种子")
        data = self._to_records(datasets['numeric'])
        result_1 = self.call_fillna_mice(data, ['体重'], random_state=42)
        result_2 = self.call_fillna_mice(data, ['体重'], random_state=42)
        if not result_1.get('success') or not result_2.get('success'):
            print_error("随机种子测试失败")
            return False
        # The same seed must yield the same imputed values.  Only the imputed
        # column is compared so that NaN values in untouched columns (which
        # never compare equal) cannot cause a spurious mismatch.
        filled_1 = [row.get('体重_MICE') for row in result_1.get('result_data') or []]
        filled_2 = [row.get('体重_MICE') for row in result_2.get('result_data') or []]
        if filled_1 != filled_2:
            print_error("相同随机种子产生了不同的填补结果")
            return False
        print_success("随机种子功能正常")
        return True

    # ==================== Edge-case tests (4) ====================

    def test_11_all_missing(self, datasets: Dict) -> bool:
        """Test 11: a column that is 100% missing.

        Either outcome is accepted: the service may reject the request, or it
        may succeed (in which case a warning is printed).
        """
        print_test(11, 18, "100%缺失的列")
        data = self._to_records(datasets['edge_cases'])
        result = self.call_fillna_simple(data, 'all_missing', 'all_missing_filled', 'mean')
        if result.get('success'):
            print_warning("100%缺失的列填补成功可能填充了NaN")
        else:
            print_success("正确处理了100%缺失的情况")
        return True

    def test_12_no_missing(self, datasets: Dict) -> bool:
        """Test 12: a column with no missing values (nothing to fill)."""
        print_test(12, 18, "0%缺失的列(无需填补)")
        data = self._to_records(datasets['edge_cases'])
        result = self.call_fillna_simple(data, 'no_missing', 'no_missing_filled', 'mean')
        if result.get('success'):
            print_success("0%缺失的列处理正常")
            return True
        print_error("处理无缺失列失败")
        return False

    def test_13_stats_api(self, datasets: Dict) -> bool:
        """Test 13: the statistics endpoint returns all required fields."""
        print_test(13, 18, "统计API功能")
        data = self._to_records(datasets['numeric'])
        result = self.call_fillna_stats(data, '体重')
        if not result.get('success'):
            print_error("统计API失败")
            return False
        stats = result.get('stats', {})
        required_fields = ['missing_count', 'missing_rate', 'valid_count', 'total_count']
        for field in required_fields:
            if field not in stats:
                print_error(f"统计信息缺少字段: {field}")
                return False
        print_success(f"统计API正常 - 缺失率: {stats['missing_rate']}%")
        print_info(f" 缺失: {stats['missing_count']}, 有效: {stats['valid_count']}")
        return True

    def test_14_column_name_with_special_chars(self, datasets: Dict) -> bool:
        """Test 14: column-name handling, using a locally built data frame."""
        print_test(14, 18, "特殊字符列名处理")
        df = pd.DataFrame({
            'id': range(1, 11),
            '体重kg': [60, np.nan, 70, 65, np.nan, 75, 80, np.nan, 68, 72]
        })
        data = self._to_records(df)
        result = self.call_fillna_simple(data, '体重kg', '体重kg_填补', 'median')
        if result.get('success'):
            print_success("特殊字符列名处理正常")
            return True
        print_warning(f"特殊字符列名处理失败: {result.get('error')}")
        return False

    # ==================== Data-type tests (4) ====================

    def test_15_numeric_types(self, datasets: Dict) -> bool:
        """Test 15: numeric columns (int/float)."""
        print_test(15, 18, "数值列int/float")
        data = self._to_records(datasets['numeric'])
        # Exercise the integer column.
        result = self.call_fillna_simple(data, '年龄', '年龄_填补', 'median')
        if result.get('success'):
            print_success("数值类型处理正常")
            return True
        print_error("数值类型处理失败")
        return False

    def test_16_categorical_types(self, datasets: Dict) -> bool:
        """Test 16: categorical (string) columns."""
        print_test(16, 18, "分类列(字符串)")
        data = self._to_records(datasets['categorical'])
        result = self.call_fillna_simple(data, '教育程度', '教育程度_填补', 'mode')
        if result.get('success'):
            print_success("分类类型处理正常")
            return True
        print_error("分类类型处理失败")
        return False

    def test_17_mixed_types(self, datasets: Dict) -> bool:
        """Test 17: MICE on a data frame with mixed column types."""
        print_test(17, 18, "混合类型列")
        data = self._to_records(datasets['mixed'])
        result = self.call_fillna_mice(data, ['数值列', '整数列'], n_iterations=5)
        if result.get('success'):
            print_success("混合类型处理正常")
            return True
        print_warning(f"混合类型处理: {result.get('error')}")
        return False

    def test_18_performance(self, datasets: Dict) -> bool:
        """Test 18: performance of MICE on a 1000-row data set."""
        print_test(18, 18, "性能测试1000行")
        n_rows = 1000
        df_large = pd.DataFrame({
            'id': range(1, n_rows + 1),
            'value1': np.random.normal(100, 20, n_rows),
            'value2': np.random.normal(50, 10, n_rows),
        })
        mask = np.random.random(n_rows) < 0.10
        df_large.loc[mask, 'value1'] = np.nan
        df_large.loc[mask, 'value2'] = np.nan
        data = self._to_records(df_large)

        start_time = time.time()
        result = self.call_fillna_mice(data, ['value1', 'value2'], n_iterations=5)
        elapsed = time.time() - start_time

        if not result.get('success'):
            print_error("性能测试失败")
            return False
        print_success(f"性能测试通过 - 耗时: {elapsed:.2f}")
        # The timing only changes the printed rating, never the verdict.
        if elapsed < 30:
            print_success("性能优秀(<30秒")
        elif elapsed < 60:
            print_info("性能良好30-60秒")
        else:
            print_warning(f"性能较慢({elapsed:.2f}秒)")
        return True

    # ==================== Main test flow ====================

    def run_all_tests(self):
        """Run every test and print a summary.

        Returns:
            ``True`` if all tests ran and passed; ``False`` if any test
            failed or the service could not be reached.
        """
        print_header("缺失值处理功能 - 自动化测试")
        self.start_time = time.time()

        # 1. Check that the service is up.
        if not self.test_service_health():
            print_error("Python服务未运行无法继续测试")
            return False

        # 2. Generate the test data.
        datasets = self.generate_test_data()

        # 3. Run all tests.
        tests = [
            # Basic tests
            (self.test_1_mean_fill, "基础"),
            (self.test_2_median_fill, "基础"),
            (self.test_3_mode_fill, "基础"),
            (self.test_4_constant_fill, "基础"),
            (self.test_5_ffill, "基础"),
            (self.test_6_bfill, "基础"),
            # MICE tests
            (self.test_7_mice_single_column, "MICE"),
            (self.test_8_mice_multiple_columns, "MICE"),
            (self.test_9_mice_iterations, "MICE"),
            (self.test_10_mice_random_state, "MICE"),
            # Edge-case tests
            (self.test_11_all_missing, "边界"),
            (self.test_12_no_missing, "边界"),
            (self.test_13_stats_api, "边界"),
            (self.test_14_column_name_with_special_chars, "边界"),
            # Data-type tests
            (self.test_15_numeric_types, "类型"),
            (self.test_16_categorical_types, "类型"),
            (self.test_17_mixed_types, "类型"),
            (self.test_18_performance, "性能"),
        ]
        for test_func, _category in tests:
            # Deliberately broad: one crashing test must not abort the run.
            try:
                succeeded = test_func(datasets)
            except Exception as e:
                self.failed += 1
                self.errors.append(f"{test_func.__name__}: {str(e)}")
                print_error(f"测试异常: {str(e)}")
                continue
            if succeeded:
                self.passed += 1
            else:
                self.failed += 1
                self.errors.append(f"{test_func.__name__}")

        # 4. Print the summary.
        self.print_summary()
        return self.failed == 0

    def print_summary(self):
        """Print totals, pass rate, elapsed time and the list of failed tests."""
        # start_time is None when the summary is requested before any run.
        elapsed = time.time() - self.start_time if self.start_time is not None else 0.0
        print_header("测试总结")
        total = self.passed + self.failed
        pass_rate = (self.passed / total * 100) if total > 0 else 0
        print(f"{Colors.BOLD}总测试数: {total}{Colors.END}")
        print(f"{Colors.GREEN}✅ 通过: {self.passed}{Colors.END}")
        print(f"{Colors.RED}❌ 失败: {self.failed}{Colors.END}")
        print(f"{Colors.CYAN}通过率: {pass_rate:.1f}%{Colors.END}")
        print(f"{Colors.BLUE}总耗时: {elapsed:.2f}{Colors.END}")
        if self.errors:
            print(f"\n{Colors.RED}{Colors.BOLD}失败的测试:{Colors.END}")
            for error in self.errors:
                print(f"{error}")
        if self.failed == 0:
            print(f"\n{Colors.GREEN}{Colors.BOLD}{'🎉 所有测试通过!'.center(80)}{Colors.END}")
        else:
            print(f"\n{Colors.YELLOW}{Colors.BOLD}{'⚠️ 部分测试失败,请检查错误信息'.center(80)}{Colors.END}")
if __name__ == "__main__":
    # Script entry point: print the banner, run the suite, and report the
    # outcome through the process exit status so that shells and CI can
    # detect a failed run.
    print(f"{Colors.BOLD}{Colors.CYAN}")
    print("""
╔══════════════════════════════════════════════════════════════════╗
║ ║
║ 缺失值处理功能 - 自动化测试脚本 v1.0 ║
║ ║
║ 测试内容: 18个测试用例 ║
║ - 6个基础填补测试 ║
║ - 4个MICE测试 ║
║ - 4个边界测试 ║
║ - 4个数据类型测试 ║
║ ║
╚══════════════════════════════════════════════════════════════════╝
""")
    print(Colors.END)
    suite = FillnaTestSuite()
    suite.run_all_tests()
    # Exit 0 only when at least one test ran and none failed; a run that was
    # aborted because the service is unreachable leaves both counters at 0.
    sys.exit(0 if suite.passed and not suite.failed else 1)