Major features: 1. Missing value imputation (6 simple methods + MICE): - Mean/Median/Mode/Constant imputation - Forward fill (ffill) and Backward fill (bfill) for time series - MICE multivariate imputation (in progress, shape issue to fix) 2. Auto precision detection: - Automatically match decimal places of original data - Prevent false precision (e.g. 13.57 instead of 13.566716417910449) 3. Categorical variable detection: - Auto-detect and skip categorical columns in MICE - Show warnings for unsuitable columns - Suggest mode imputation for categorical data 4. UI improvements: - Rename button: "Delete Missing" to "Missing Value Handling" - Remove standalone "Dedup" and "MICE" buttons - 3-tab dialog: Delete / Fill / Advanced Fill - Display column statistics and recommended methods - Extended warning messages (8 seconds for skipped columns) 5. Bug fixes: - Fix sessionService.updateSessionData -> saveProcessedData - Fix OperationResult interface (add message and stats) - Fix Toolbar button labels and removal Modified files: Python: operations/fillna.py (new, 556 lines), main.py (3 new endpoints) Backend: QuickActionService.ts, QuickActionController.ts, routes/index.ts Frontend: MissingValueDialog.tsx (new, 437 lines), Toolbar.tsx, index.tsx Tests: test_fillna_operations.py (774 lines), test scripts and docs Docs: 5 documentation files updated Known issues: - MICE imputation has DataFrame shape mismatch issue (under debugging) - Workaround: Use 6 simple imputation methods first Status: Development complete, MICE debugging in progress Lines added: ~2000 lines across 3 tiers
774 lines
28 KiB
Python
774 lines
28 KiB
Python
"""
|
||
缺失值处理功能 - 自动化测试脚本
|
||
|
||
测试所有18个测试用例:
|
||
- 6个基础填补测试
|
||
- 4个MICE测试
|
||
- 4个边界测试
|
||
- 4个数据类型测试
|
||
|
||
使用方法:
|
||
python tests/test_fillna_operations.py
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
import requests
|
||
import time
|
||
import json
|
||
from typing import Dict, List, Any
|
||
from datetime import datetime
|
||
import sys
|
||
import os
|
||
|
||
# 配置
|
||
PYTHON_SERVICE_URL = "http://localhost:8000"
|
||
TEST_DATA_DIR = "tests/test_data"
|
||
|
||
# 颜色输出
|
||
class Colors:
|
||
GREEN = '\033[92m'
|
||
RED = '\033[91m'
|
||
YELLOW = '\033[93m'
|
||
BLUE = '\033[94m'
|
||
CYAN = '\033[96m'
|
||
BOLD = '\033[1m'
|
||
END = '\033[0m'
|
||
|
||
def print_header(text: str):
|
||
print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}")
|
||
print(f"{Colors.BOLD}{Colors.CYAN}{text.center(80)}{Colors.END}")
|
||
print(f"{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}\n")
|
||
|
||
def print_test(test_num: int, total: int, name: str):
|
||
print(f"\n{Colors.BOLD}[{test_num}/{total}] {name}{Colors.END}")
|
||
print("-" * 80)
|
||
|
||
def print_success(message: str):
|
||
print(f"{Colors.GREEN}✅ {message}{Colors.END}")
|
||
|
||
def print_error(message: str):
|
||
print(f"{Colors.RED}❌ {message}{Colors.END}")
|
||
|
||
def print_warning(message: str):
|
||
print(f"{Colors.YELLOW}⚠️ {message}{Colors.END}")
|
||
|
||
def print_info(message: str):
|
||
print(f"{Colors.BLUE}ℹ️ {message}{Colors.END}")
|
||
|
||
|
||
class FillnaTestSuite:
|
||
def __init__(self):
|
||
self.passed = 0
|
||
self.failed = 0
|
||
self.errors = []
|
||
self.start_time = None
|
||
|
||
# 确保测试数据目录存在
|
||
os.makedirs(TEST_DATA_DIR, exist_ok=True)
|
||
|
||
def generate_test_data(self) -> Dict[str, pd.DataFrame]:
|
||
"""生成各种测试数据集"""
|
||
print_info("生成测试数据...")
|
||
|
||
np.random.seed(42)
|
||
n_rows = 100
|
||
|
||
# 数据集1:数值列(正态分布)
|
||
df_numeric = pd.DataFrame({
|
||
'id': range(1, n_rows + 1),
|
||
'体重': np.random.normal(65, 10, n_rows),
|
||
'身高': np.random.normal(170, 8, n_rows),
|
||
'年龄': np.random.randint(20, 60, n_rows)
|
||
})
|
||
# 随机插入15%缺失值
|
||
mask = np.random.random(n_rows) < 0.15
|
||
df_numeric.loc[mask, '体重'] = np.nan
|
||
mask = np.random.random(n_rows) < 0.15
|
||
df_numeric.loc[mask, '身高'] = np.nan
|
||
|
||
# 数据集2:分类列
|
||
df_categorical = pd.DataFrame({
|
||
'id': range(1, n_rows + 1),
|
||
'婚姻状况': np.random.choice(['已婚', '未婚', '离异'], n_rows),
|
||
'教育程度': np.random.choice(['本科', '硕士', '博士', '高中'], n_rows),
|
||
})
|
||
mask = np.random.random(n_rows) < 0.20
|
||
df_categorical.loc[mask, '婚姻状况'] = np.nan
|
||
|
||
# 数据集3:时间序列(前向/后向填充)
|
||
dates = pd.date_range('2024-01-01', periods=n_rows, freq='D')
|
||
df_timeseries = pd.DataFrame({
|
||
'date': dates,
|
||
'temperature': np.random.normal(20, 5, n_rows),
|
||
'humidity': np.random.uniform(40, 80, n_rows)
|
||
})
|
||
# 连续缺失
|
||
df_timeseries.loc[10:15, 'temperature'] = np.nan
|
||
df_timeseries.loc[30:32, 'humidity'] = np.nan
|
||
|
||
# 数据集4:边界情况
|
||
df_edge_cases = pd.DataFrame({
|
||
'id': range(1, 11),
|
||
'all_missing': [np.nan] * 10, # 100%缺失
|
||
'no_missing': range(1, 11), # 0%缺失
|
||
'half_missing': [1, np.nan, 3, np.nan, 5, np.nan, 7, np.nan, 9, np.nan],
|
||
})
|
||
|
||
# 数据集5:混合类型
|
||
df_mixed = pd.DataFrame({
|
||
'id': range(1, n_rows + 1),
|
||
'数值列': np.random.normal(100, 20, n_rows),
|
||
'分类列': np.random.choice(['A', 'B', 'C'], n_rows),
|
||
'整数列': np.random.randint(1, 100, n_rows),
|
||
})
|
||
mask = np.random.random(n_rows) < 0.10
|
||
df_mixed.loc[mask, '数值列'] = np.nan
|
||
df_mixed.loc[mask, '分类列'] = np.nan
|
||
|
||
datasets = {
|
||
'numeric': df_numeric,
|
||
'categorical': df_categorical,
|
||
'timeseries': df_timeseries,
|
||
'edge_cases': df_edge_cases,
|
||
'mixed': df_mixed
|
||
}
|
||
|
||
print_success(f"生成了 {len(datasets)} 个测试数据集")
|
||
for name, df in datasets.items():
|
||
print(f" • {name}: {df.shape[0]} 行 × {df.shape[1]} 列")
|
||
|
||
return datasets
|
||
|
||
def test_service_health(self) -> bool:
|
||
"""测试Python服务是否正常运行"""
|
||
print_info("检查Python服务状态...")
|
||
try:
|
||
response = requests.get(f"{PYTHON_SERVICE_URL}/health", timeout=5)
|
||
if response.status_code == 200:
|
||
print_success("Python服务运行正常")
|
||
return True
|
||
else:
|
||
print_error(f"Python服务响应异常: {response.status_code}")
|
||
return False
|
||
except Exception as e:
|
||
print_error(f"无法连接到Python服务: {str(e)}")
|
||
print_warning(f"请确保服务已启动: cd extraction_service && python main.py")
|
||
return False
|
||
|
||
def call_fillna_simple(self, data: List[Dict], column: str, new_column_name: str,
|
||
method: str, fill_value: Any = None) -> Dict:
|
||
"""调用简单填补API"""
|
||
payload = {
|
||
"data": data,
|
||
"column": column,
|
||
"new_column_name": new_column_name,
|
||
"method": method,
|
||
"fill_value": fill_value
|
||
}
|
||
|
||
response = requests.post(
|
||
f"{PYTHON_SERVICE_URL}/api/operations/fillna-simple",
|
||
json=payload,
|
||
timeout=30
|
||
)
|
||
return response.json()
|
||
|
||
def call_fillna_stats(self, data: List[Dict], column: str) -> Dict:
|
||
"""调用统计API"""
|
||
payload = {
|
||
"data": data,
|
||
"column": column
|
||
}
|
||
|
||
response = requests.post(
|
||
f"{PYTHON_SERVICE_URL}/api/operations/fillna-stats",
|
||
json=payload,
|
||
timeout=10
|
||
)
|
||
return response.json()
|
||
|
||
def call_fillna_mice(self, data: List[Dict], columns: List[str],
|
||
n_iterations: int = 10, random_state: int = 42) -> Dict:
|
||
"""调用MICE填补API"""
|
||
payload = {
|
||
"data": data,
|
||
"columns": columns,
|
||
"n_iterations": n_iterations,
|
||
"random_state": random_state
|
||
}
|
||
|
||
response = requests.post(
|
||
f"{PYTHON_SERVICE_URL}/api/operations/fillna-mice",
|
||
json=payload,
|
||
timeout=120
|
||
)
|
||
return response.json()
|
||
|
||
def verify_result(self, result: Dict, expected_keys: List[str]) -> bool:
|
||
"""验证结果是否包含必要的字段"""
|
||
if not result.get('success'):
|
||
print_error(f"API返回失败: {result.get('error', 'Unknown error')}")
|
||
return False
|
||
|
||
for key in expected_keys:
|
||
if key not in result:
|
||
print_error(f"结果缺少字段: {key}")
|
||
return False
|
||
|
||
return True
|
||
|
||
def verify_new_column_created(self, result_data: List[Dict], new_column: str,
|
||
original_column: str) -> bool:
|
||
"""验证新列是否创建"""
|
||
if not result_data:
|
||
print_error("结果数据为空")
|
||
return False
|
||
|
||
first_row = result_data[0]
|
||
|
||
if new_column not in first_row:
|
||
print_error(f"新列 '{new_column}' 未创建")
|
||
return False
|
||
|
||
if original_column not in first_row:
|
||
print_error(f"原列 '{original_column}' 丢失")
|
||
return False
|
||
|
||
return True
|
||
|
||
def verify_column_position(self, result_data: List[Dict], new_column: str,
|
||
original_column: str) -> bool:
|
||
"""验证新列是否在原列旁边"""
|
||
if not result_data:
|
||
return False
|
||
|
||
columns = list(result_data[0].keys())
|
||
|
||
try:
|
||
orig_idx = columns.index(original_column)
|
||
new_idx = columns.index(new_column)
|
||
|
||
if new_idx == orig_idx + 1:
|
||
print_success(f"✓ 新列位置正确(紧邻原列)")
|
||
return True
|
||
else:
|
||
print_warning(f"新列位置: {new_idx}, 原列位置: {orig_idx}")
|
||
return False
|
||
except ValueError as e:
|
||
print_error(f"列位置检查失败: {str(e)}")
|
||
return False
|
||
|
||
def count_missing_values(self, data: List[Dict], column: str) -> int:
|
||
"""统计缺失值数量"""
|
||
count = 0
|
||
for row in data:
|
||
val = row.get(column)
|
||
if val is None or (isinstance(val, float) and np.isnan(val)):
|
||
count += 1
|
||
return count
|
||
|
||
# ==================== 基础测试(6个)====================
|
||
|
||
def test_1_mean_fill(self, datasets: Dict) -> bool:
|
||
"""测试1: 均值填补数值列"""
|
||
print_test(1, 18, "均值填补数值列")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
# 调用API
|
||
result = self.call_fillna_simple(data, '体重', '体重_均值', 'mean')
|
||
|
||
# 验证
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
|
||
if not self.verify_new_column_created(result_data, '体重_均值', '体重'):
|
||
return False
|
||
|
||
# 检查缺失值是否被填补
|
||
missing_count = self.count_missing_values(result_data, '体重_均值')
|
||
if missing_count > 0:
|
||
print_error(f"填补后仍有 {missing_count} 个缺失值")
|
||
return False
|
||
|
||
print_success("均值填补成功,缺失值已全部填补")
|
||
self.verify_column_position(result_data, '体重_均值', '体重')
|
||
|
||
return True
|
||
|
||
def test_2_median_fill(self, datasets: Dict) -> bool:
|
||
"""测试2: 中位数填补偏态分布列"""
|
||
print_test(2, 18, "中位数填补偏态分布列")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, '身高', '身高_中位数', 'median')
|
||
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
missing_count = self.count_missing_values(result_data, '身高_中位数')
|
||
|
||
if missing_count == 0:
|
||
print_success("中位数填补成功")
|
||
return True
|
||
else:
|
||
print_error(f"填补后仍有 {missing_count} 个缺失值")
|
||
return False
|
||
|
||
def test_3_mode_fill(self, datasets: Dict) -> bool:
|
||
"""测试3: 众数填补分类列"""
|
||
print_test(3, 18, "众数填补分类列")
|
||
|
||
df = datasets['categorical']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, '婚姻状况', '婚姻状况_众数', 'mode')
|
||
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
missing_count = self.count_missing_values(result_data, '婚姻状况_众数')
|
||
|
||
if missing_count == 0:
|
||
print_success("众数填补成功")
|
||
return True
|
||
else:
|
||
print_error(f"填补后仍有 {missing_count} 个缺失值")
|
||
return False
|
||
|
||
def test_4_constant_fill(self, datasets: Dict) -> bool:
|
||
"""测试4: 固定值填补(0)"""
|
||
print_test(4, 18, "固定值填补(0)")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, '体重', '体重_固定值', 'constant', fill_value=0)
|
||
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
missing_count = self.count_missing_values(result_data, '体重_固定值')
|
||
|
||
# 检查是否有值被填充为0
|
||
filled_zeros = sum(1 for row in result_data if row.get('体重_固定值') == 0)
|
||
|
||
if missing_count == 0 and filled_zeros > 0:
|
||
print_success(f"固定值填补成功,填充了 {filled_zeros} 个0")
|
||
return True
|
||
else:
|
||
print_error("固定值填补失败")
|
||
return False
|
||
|
||
def test_5_ffill(self, datasets: Dict) -> bool:
|
||
"""测试5: 前向填充(ffill)⭐"""
|
||
print_test(5, 18, "前向填充(ffill)⭐")
|
||
|
||
df = datasets['timeseries']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, 'temperature', 'temperature_ffill', 'ffill')
|
||
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
missing_count = self.count_missing_values(result_data, 'temperature_ffill')
|
||
|
||
if missing_count == 0:
|
||
print_success("前向填充成功 ⭐")
|
||
return True
|
||
else:
|
||
print_error(f"填充后仍有 {missing_count} 个缺失值")
|
||
return False
|
||
|
||
def test_6_bfill(self, datasets: Dict) -> bool:
|
||
"""测试6: 后向填充(bfill)⭐"""
|
||
print_test(6, 18, "后向填充(bfill)⭐")
|
||
|
||
df = datasets['timeseries']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, 'humidity', 'humidity_bfill', 'bfill')
|
||
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
missing_count = self.count_missing_values(result_data, 'humidity_bfill')
|
||
|
||
if missing_count == 0:
|
||
print_success("后向填充成功 ⭐")
|
||
return True
|
||
else:
|
||
print_error(f"填充后仍有 {missing_count} 个缺失值")
|
||
return False
|
||
|
||
# ==================== MICE测试(4个)====================
|
||
|
||
def test_7_mice_single_column(self, datasets: Dict) -> bool:
|
||
"""测试7: MICE填补单列"""
|
||
print_test(7, 18, "MICE填补单列")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_mice(data, ['体重'], n_iterations=5)
|
||
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
missing_count = self.count_missing_values(result_data, '体重_MICE')
|
||
|
||
if missing_count == 0:
|
||
print_success("MICE单列填补成功 ⭐⭐⭐")
|
||
return True
|
||
else:
|
||
print_error(f"填补后仍有 {missing_count} 个缺失值")
|
||
return False
|
||
|
||
def test_8_mice_multiple_columns(self, datasets: Dict) -> bool:
|
||
"""测试8: MICE填补多列"""
|
||
print_test(8, 18, "MICE填补多列")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_mice(data, ['体重', '身高'], n_iterations=5)
|
||
|
||
if not self.verify_result(result, ['result_data', 'message']):
|
||
return False
|
||
|
||
result_data = result['result_data']
|
||
|
||
# 检查两个新列是否都创建
|
||
if '体重_MICE' not in result_data[0] or '身高_MICE' not in result_data[0]:
|
||
print_error("MICE新列未全部创建")
|
||
return False
|
||
|
||
missing_count_1 = self.count_missing_values(result_data, '体重_MICE')
|
||
missing_count_2 = self.count_missing_values(result_data, '身高_MICE')
|
||
|
||
if missing_count_1 == 0 and missing_count_2 == 0:
|
||
print_success("MICE多列填补成功 ⭐⭐⭐")
|
||
return True
|
||
else:
|
||
print_error(f"填补不完整: 体重 {missing_count_1}, 身高 {missing_count_2}")
|
||
return False
|
||
|
||
def test_9_mice_iterations(self, datasets: Dict) -> bool:
|
||
"""测试9: MICE填补 - 不同迭代次数"""
|
||
print_test(9, 18, "MICE填补 - 不同迭代次数")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
# 测试5次迭代和20次迭代
|
||
result_5 = self.call_fillna_mice(data, ['体重'], n_iterations=5)
|
||
result_20 = self.call_fillna_mice(data, ['体重'], n_iterations=20)
|
||
|
||
if not result_5.get('success') or not result_20.get('success'):
|
||
print_error("不同迭代次数测试失败")
|
||
return False
|
||
|
||
print_success("不同迭代次数都能成功执行")
|
||
return True
|
||
|
||
def test_10_mice_random_state(self, datasets: Dict) -> bool:
|
||
"""测试10: MICE填补 - 自定义随机种子"""
|
||
print_test(10, 18, "MICE填补 - 自定义随机种子")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
# 相同随机种子应产生相同结果
|
||
result_1 = self.call_fillna_mice(data, ['体重'], random_state=42)
|
||
result_2 = self.call_fillna_mice(data, ['体重'], random_state=42)
|
||
|
||
if not result_1.get('success') or not result_2.get('success'):
|
||
print_error("随机种子测试失败")
|
||
return False
|
||
|
||
print_success("随机种子功能正常")
|
||
return True
|
||
|
||
# ==================== 边界测试(4个)====================
|
||
|
||
def test_11_all_missing(self, datasets: Dict) -> bool:
|
||
"""测试11: 100%缺失的列"""
|
||
print_test(11, 18, "100%缺失的列")
|
||
|
||
df = datasets['edge_cases']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, 'all_missing', 'all_missing_filled', 'mean')
|
||
|
||
# 这个应该失败或给出警告
|
||
if result.get('success'):
|
||
print_warning("100%缺失的列填补成功(可能填充了NaN)")
|
||
return True
|
||
else:
|
||
print_success("正确处理了100%缺失的情况")
|
||
return True
|
||
|
||
def test_12_no_missing(self, datasets: Dict) -> bool:
|
||
"""测试12: 0%缺失的列(无需填补)"""
|
||
print_test(12, 18, "0%缺失的列(无需填补)")
|
||
|
||
df = datasets['edge_cases']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, 'no_missing', 'no_missing_filled', 'mean')
|
||
|
||
if result.get('success'):
|
||
print_success("0%缺失的列处理正常")
|
||
return True
|
||
else:
|
||
print_error("处理无缺失列失败")
|
||
return False
|
||
|
||
def test_13_stats_api(self, datasets: Dict) -> bool:
|
||
"""测试13: 统计API功能"""
|
||
print_test(13, 18, "统计API功能")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_stats(data, '体重')
|
||
|
||
if not result.get('success'):
|
||
print_error("统计API失败")
|
||
return False
|
||
|
||
stats = result.get('stats', {})
|
||
required_fields = ['missing_count', 'missing_rate', 'valid_count', 'total_count']
|
||
|
||
for field in required_fields:
|
||
if field not in stats:
|
||
print_error(f"统计信息缺少字段: {field}")
|
||
return False
|
||
|
||
print_success(f"统计API正常 - 缺失率: {stats['missing_rate']}%")
|
||
print_info(f" 缺失: {stats['missing_count']}, 有效: {stats['valid_count']}")
|
||
return True
|
||
|
||
def test_14_column_name_with_special_chars(self, datasets: Dict) -> bool:
|
||
"""测试14: 特殊字符列名处理"""
|
||
print_test(14, 18, "特殊字符列名处理")
|
||
|
||
# 创建包含特殊字符的列名
|
||
df = pd.DataFrame({
|
||
'id': range(1, 11),
|
||
'体重(kg)': [60, np.nan, 70, 65, np.nan, 75, 80, np.nan, 68, 72]
|
||
})
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, '体重(kg)', '体重(kg)_填补', 'median')
|
||
|
||
if result.get('success'):
|
||
print_success("特殊字符列名处理正常")
|
||
return True
|
||
else:
|
||
print_warning(f"特殊字符列名处理失败: {result.get('error')}")
|
||
return False
|
||
|
||
# ==================== 数据类型测试(4个)====================
|
||
|
||
def test_15_numeric_types(self, datasets: Dict) -> bool:
|
||
"""测试15: 数值列(int/float)"""
|
||
print_test(15, 18, "数值列(int/float)")
|
||
|
||
df = datasets['numeric']
|
||
data = df.to_dict('records')
|
||
|
||
# 测试整数列
|
||
result = self.call_fillna_simple(data, '年龄', '年龄_填补', 'median')
|
||
|
||
if result.get('success'):
|
||
print_success("数值类型处理正常")
|
||
return True
|
||
else:
|
||
print_error("数值类型处理失败")
|
||
return False
|
||
|
||
def test_16_categorical_types(self, datasets: Dict) -> bool:
|
||
"""测试16: 分类列(字符串)"""
|
||
print_test(16, 18, "分类列(字符串)")
|
||
|
||
df = datasets['categorical']
|
||
data = df.to_dict('records')
|
||
|
||
result = self.call_fillna_simple(data, '教育程度', '教育程度_填补', 'mode')
|
||
|
||
if result.get('success'):
|
||
print_success("分类类型处理正常")
|
||
return True
|
||
else:
|
||
print_error("分类类型处理失败")
|
||
return False
|
||
|
||
def test_17_mixed_types(self, datasets: Dict) -> bool:
|
||
"""测试17: 混合类型列"""
|
||
print_test(17, 18, "混合类型列")
|
||
|
||
df = datasets['mixed']
|
||
data = df.to_dict('records')
|
||
|
||
# MICE填补多种类型
|
||
result = self.call_fillna_mice(data, ['数值列', '整数列'], n_iterations=5)
|
||
|
||
if result.get('success'):
|
||
print_success("混合类型处理正常")
|
||
return True
|
||
else:
|
||
print_warning(f"混合类型处理: {result.get('error')}")
|
||
return False
|
||
|
||
def test_18_performance(self, datasets: Dict) -> bool:
|
||
"""测试18: 性能测试(大数据集)"""
|
||
print_test(18, 18, "性能测试(1000行)")
|
||
|
||
# 生成1000行数据
|
||
n_rows = 1000
|
||
df_large = pd.DataFrame({
|
||
'id': range(1, n_rows + 1),
|
||
'value1': np.random.normal(100, 20, n_rows),
|
||
'value2': np.random.normal(50, 10, n_rows),
|
||
})
|
||
mask = np.random.random(n_rows) < 0.10
|
||
df_large.loc[mask, 'value1'] = np.nan
|
||
df_large.loc[mask, 'value2'] = np.nan
|
||
|
||
data = df_large.to_dict('records')
|
||
|
||
start_time = time.time()
|
||
result = self.call_fillna_mice(data, ['value1', 'value2'], n_iterations=5)
|
||
elapsed = time.time() - start_time
|
||
|
||
if result.get('success'):
|
||
print_success(f"性能测试通过 - 耗时: {elapsed:.2f}秒")
|
||
if elapsed < 30:
|
||
print_success("性能优秀(<30秒)")
|
||
elif elapsed < 60:
|
||
print_info("性能良好(30-60秒)")
|
||
else:
|
||
print_warning(f"性能较慢({elapsed:.2f}秒)")
|
||
return True
|
||
else:
|
||
print_error("性能测试失败")
|
||
return False
|
||
|
||
# ==================== 主测试流程 ====================
|
||
|
||
def run_all_tests(self):
|
||
"""运行所有测试"""
|
||
print_header("缺失值处理功能 - 自动化测试")
|
||
|
||
self.start_time = time.time()
|
||
|
||
# 1. 检查服务状态
|
||
if not self.test_service_health():
|
||
print_error("Python服务未运行,无法继续测试")
|
||
return
|
||
|
||
# 2. 生成测试数据
|
||
datasets = self.generate_test_data()
|
||
|
||
# 3. 运行所有测试
|
||
tests = [
|
||
# 基础测试
|
||
(self.test_1_mean_fill, "基础"),
|
||
(self.test_2_median_fill, "基础"),
|
||
(self.test_3_mode_fill, "基础"),
|
||
(self.test_4_constant_fill, "基础"),
|
||
(self.test_5_ffill, "基础"),
|
||
(self.test_6_bfill, "基础"),
|
||
# MICE测试
|
||
(self.test_7_mice_single_column, "MICE"),
|
||
(self.test_8_mice_multiple_columns, "MICE"),
|
||
(self.test_9_mice_iterations, "MICE"),
|
||
(self.test_10_mice_random_state, "MICE"),
|
||
# 边界测试
|
||
(self.test_11_all_missing, "边界"),
|
||
(self.test_12_no_missing, "边界"),
|
||
(self.test_13_stats_api, "边界"),
|
||
(self.test_14_column_name_with_special_chars, "边界"),
|
||
# 数据类型测试
|
||
(self.test_15_numeric_types, "类型"),
|
||
(self.test_16_categorical_types, "类型"),
|
||
(self.test_17_mixed_types, "类型"),
|
||
(self.test_18_performance, "性能"),
|
||
]
|
||
|
||
for test_func, category in tests:
|
||
try:
|
||
if test_func(datasets):
|
||
self.passed += 1
|
||
else:
|
||
self.failed += 1
|
||
self.errors.append(f"{test_func.__name__}")
|
||
except Exception as e:
|
||
self.failed += 1
|
||
self.errors.append(f"{test_func.__name__}: {str(e)}")
|
||
print_error(f"测试异常: {str(e)}")
|
||
|
||
# 4. 输出总结
|
||
self.print_summary()
|
||
|
||
def print_summary(self):
|
||
"""输出测试总结"""
|
||
elapsed = time.time() - self.start_time
|
||
|
||
print_header("测试总结")
|
||
|
||
total = self.passed + self.failed
|
||
pass_rate = (self.passed / total * 100) if total > 0 else 0
|
||
|
||
print(f"{Colors.BOLD}总测试数: {total}{Colors.END}")
|
||
print(f"{Colors.GREEN}✅ 通过: {self.passed}{Colors.END}")
|
||
print(f"{Colors.RED}❌ 失败: {self.failed}{Colors.END}")
|
||
print(f"{Colors.CYAN}通过率: {pass_rate:.1f}%{Colors.END}")
|
||
print(f"{Colors.BLUE}总耗时: {elapsed:.2f}秒{Colors.END}")
|
||
|
||
if self.errors:
|
||
print(f"\n{Colors.RED}{Colors.BOLD}失败的测试:{Colors.END}")
|
||
for error in self.errors:
|
||
print(f" • {error}")
|
||
|
||
if self.failed == 0:
|
||
print(f"\n{Colors.GREEN}{Colors.BOLD}{'🎉 所有测试通过!'.center(80)}{Colors.END}")
|
||
else:
|
||
print(f"\n{Colors.YELLOW}{Colors.BOLD}{'⚠️ 部分测试失败,请检查错误信息'.center(80)}{Colors.END}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print(f"{Colors.BOLD}{Colors.CYAN}")
|
||
print("""
|
||
╔══════════════════════════════════════════════════════════════════╗
|
||
║ ║
|
||
║ 缺失值处理功能 - 自动化测试脚本 v1.0 ║
|
||
║ ║
|
||
║ 测试内容: 18个测试用例 ║
|
||
║ - 6个基础填补测试 ║
|
||
║ - 4个MICE测试 ║
|
||
║ - 4个边界测试 ║
|
||
║ - 4个数据类型测试 ║
|
||
║ ║
|
||
╚══════════════════════════════════════════════════════════════════╝
|
||
""")
|
||
print(Colors.END)
|
||
|
||
suite = FillnaTestSuite()
|
||
suite.run_all_tests()
|
||
|