"""
Missing-value handling - automated test script.

Runs all 18 test cases against the running Python service:
- 6 basic imputation tests
- 4 MICE tests
- 4 edge-case tests
- 4 data-type tests

Usage:
    python tests/test_fillna_operations.py
"""

import json
import os
import sys
import time
from datetime import datetime
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import requests

# Configuration
PYTHON_SERVICE_URL = "http://localhost:8000"
TEST_DATA_DIR = "tests/test_data"


# Coloured terminal output
class Colors:
    """ANSI escape sequences used to colour the console output."""

    GREEN = '\033[92m'
    RED = '\033[91m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    BOLD = '\033[1m'
    END = '\033[0m'


def print_header(text: str):
    """Print *text* centred between two 80-column rules."""
    print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{text.center(80)}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{'='*80}{Colors.END}\n")


def print_test(test_num: int, total: int, name: str):
    """Print the ``[n/total] name`` banner that introduces one test case."""
    print(f"\n{Colors.BOLD}[{test_num}/{total}] {name}{Colors.END}")
    print("-" * 80)


def print_success(message: str):
    """Print *message* in green with a check-mark prefix."""
    print(f"{Colors.GREEN}✅ {message}{Colors.END}")


def print_error(message: str):
    """Print *message* in red with a cross prefix."""
    print(f"{Colors.RED}❌ {message}{Colors.END}")


def print_warning(message: str):
    """Print *message* in yellow with a warning prefix."""
    print(f"{Colors.YELLOW}⚠️ {message}{Colors.END}")


def print_info(message: str):
    """Print *message* in blue with an info prefix."""
    print(f"{Colors.BLUE}ℹ️ {message}{Colors.END}")


def _json_safe(value: Any) -> Any:
    """Convert one DataFrame cell into a value ``json.dumps`` accepts.

    NaN / NaT / None become ``None`` (JSON ``null``), timestamps become
    ISO-8601 strings and numpy scalars become native Python scalars.
    """
    if isinstance(value, np.generic):
        value = value.item()
    if value is None or value is pd.NaT:
        return None
    if isinstance(value, float) and np.isnan(value):
        return None
    if isinstance(value, datetime):  # pd.Timestamp subclasses datetime
        return value.isoformat()
    return value


def to_json_records(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Return *df* as a list of row dicts that is safe to send as JSON.

    ``DataFrame.to_dict('records')`` keeps ``NaN`` floats and
    ``pd.Timestamp`` objects. The standard ``json`` encoder raises
    ``TypeError`` on a Timestamp, and rejects ``NaN`` when called with
    ``allow_nan=False``, so the raw records cannot be posted reliably.
    """
    return [
        {key: _json_safe(value) for key, value in row.items()}
        for row in df.to_dict('records')
    ]


class FillnaTestSuite:
    """Drives the 18 fill-NA test cases against the HTTP service.

    Attributes:
        passed: number of tests that returned True.
        failed: number of tests that returned False or raised.
        errors: names (and exception text, if any) of the failed tests.
        start_time: ``time.time()`` taken when ``run_all_tests`` starts.
    """

    def __init__(self):
        self.passed = 0
        self.failed = 0
        self.errors = []
        self.start_time = None

        # Make sure the test-data directory exists.
        os.makedirs(TEST_DATA_DIR, exist_ok=True)

    def generate_test_data(self) -> Dict[str, pd.DataFrame]:
        """Build the in-memory datasets used by the tests.

        The global numpy RNG is seeded with 42, so the datasets are
        reproducible as long as the order of the random calls below is kept.

        Returns:
            Mapping of dataset name (``numeric``, ``categorical``,
            ``timeseries``, ``edge_cases``, ``mixed``) to its DataFrame.
        """
        print_info("生成测试数据...")

        np.random.seed(42)
        n_rows = 100

        # Dataset 1: numeric columns (normally distributed).
        df_numeric = pd.DataFrame({
            'id': range(1, n_rows + 1),
            '体重': np.random.normal(65, 10, n_rows),
            '身高': np.random.normal(170, 8, n_rows),
            '年龄': np.random.randint(20, 60, n_rows)
        })
        # Randomly blank out roughly 15% of each measured column.
        mask = np.random.random(n_rows) < 0.15
        df_numeric.loc[mask, '体重'] = np.nan
        mask = np.random.random(n_rows) < 0.15
        df_numeric.loc[mask, '身高'] = np.nan

        # Dataset 2: categorical columns.
        df_categorical = pd.DataFrame({
            'id': range(1, n_rows + 1),
            '婚姻状况': np.random.choice(['已婚', '未婚', '离异'], n_rows),
            '教育程度': np.random.choice(['本科', '硕士', '博士', '高中'], n_rows),
        })
        mask = np.random.random(n_rows) < 0.20
        df_categorical.loc[mask, '婚姻状况'] = np.nan

        # Dataset 3: time series (for forward / backward fill).
        dates = pd.date_range('2024-01-01', periods=n_rows, freq='D')
        df_timeseries = pd.DataFrame({
            'date': dates,
            'temperature': np.random.normal(20, 5, n_rows),
            'humidity': np.random.uniform(40, 80, n_rows)
        })
        # Consecutive gaps; .loc slices are label-based and inclusive.
        df_timeseries.loc[10:15, 'temperature'] = np.nan
        df_timeseries.loc[30:32, 'humidity'] = np.nan

        # Dataset 4: edge cases.
        df_edge_cases = pd.DataFrame({
            'id': range(1, 11),
            'all_missing': [np.nan] * 10,  # 100% missing
            'no_missing': range(1, 11),  # 0% missing
            'half_missing': [1, np.nan, 3, np.nan, 5, np.nan, 7, np.nan, 9, np.nan],
        })

        # Dataset 5: mixed column types.
        df_mixed = pd.DataFrame({
            'id': range(1, n_rows + 1),
            '数值列': np.random.normal(100, 20, n_rows),
            '分类列': np.random.choice(['A', 'B', 'C'], n_rows),
            '整数列': np.random.randint(1, 100, n_rows),
        })
        mask = np.random.random(n_rows) < 0.10
        df_mixed.loc[mask, '数值列'] = np.nan
        df_mixed.loc[mask, '分类列'] = np.nan

        datasets = {
            'numeric': df_numeric,
            'categorical': df_categorical,
            'timeseries': df_timeseries,
            'edge_cases': df_edge_cases,
            'mixed': df_mixed
        }

        print_success(f"生成了 {len(datasets)} 个测试数据集")
        for name, df in datasets.items():
            print(f" • {name}: {df.shape[0]} 行 × {df.shape[1]} 列")

        return datasets

    def test_service_health(self) -> bool:
        """Return True if ``GET /health`` on the service answers with HTTP 200."""
        print_info("检查Python服务状态...")
        try:
            response = requests.get(f"{PYTHON_SERVICE_URL}/health", timeout=5)
        except requests.RequestException as e:
            print_error(f"无法连接到Python服务: {str(e)}")
            print_warning("请确保服务已启动: cd extraction_service && python main.py")
            return False

        if response.status_code == 200:
            print_success("Python服务运行正常")
            return True
        print_error(f"Python服务响应异常: {response.status_code}")
        return False

    def _post(self, endpoint: str, payload: Dict[str, Any], timeout: int) -> Dict:
        """POST *payload* as JSON to ``/api/operations/<endpoint>``.

        Returns the decoded JSON body. Network errors and non-JSON bodies
        raise; ``run_all_tests`` records those as test failures.
        """
        response = requests.post(
            f"{PYTHON_SERVICE_URL}/api/operations/{endpoint}",
            json=payload,
            timeout=timeout
        )
        return response.json()

    def call_fillna_simple(self, data: List[Dict], column: str, new_column_name: str,
                           method: str, fill_value: Any = None) -> Dict:
        """Call the simple imputation endpoint (``fillna-simple``)."""
        payload = {
            "data": data,
            "column": column,
            "new_column_name": new_column_name,
            "method": method,
            "fill_value": fill_value
        }
        return self._post("fillna-simple", payload, timeout=30)

    def call_fillna_stats(self, data: List[Dict], column: str) -> Dict:
        """Call the missing-value statistics endpoint (``fillna-stats``)."""
        payload = {
            "data": data,
            "column": column
        }
        return self._post("fillna-stats", payload, timeout=10)

    def call_fillna_mice(self, data: List[Dict], columns: List[str],
                         n_iterations: int = 10, random_state: int = 42) -> Dict:
        """Call the MICE imputation endpoint (``fillna-mice``)."""
        payload = {
            "data": data,
            "columns": columns,
            "n_iterations": n_iterations,
            "random_state": random_state
        }
        return self._post("fillna-mice", payload, timeout=120)

    def verify_result(self, result: Dict, expected_keys: List[str]) -> bool:
        """Check that *result* reports success and contains every expected key."""
        if not result.get('success'):
            print_error(f"API返回失败: {result.get('error', 'Unknown error')}")
            return False

        for key in expected_keys:
            if key not in result:
                print_error(f"结果缺少字段: {key}")
                return False

        return True

    def verify_new_column_created(self, result_data: List[Dict], new_column: str,
                                  original_column: str) -> bool:
        """Check that the first row holds both the new and the original column."""
        if not result_data:
            print_error("结果数据为空")
            return False

        first_row = result_data[0]
        if new_column not in first_row:
            print_error(f"新列 '{new_column}' 未创建")
            return False

        if original_column not in first_row:
            print_error(f"原列 '{original_column}' 丢失")
            return False

        return True

    def verify_column_position(self, result_data: List[Dict], new_column: str,
                               original_column: str) -> bool:
        """Check that the new column sits immediately after the original one.

        Column order is taken from the key order of the first row.
        """
        if not result_data:
            return False

        columns = list(result_data[0].keys())
        try:
            orig_idx = columns.index(original_column)
            new_idx = columns.index(new_column)
        except ValueError as e:
            print_error(f"列位置检查失败: {str(e)}")
            return False

        if new_idx == orig_idx + 1:
            print_success("✓ 新列位置正确(紧邻原列)")
            return True
        print_warning(f"新列位置: {new_idx}, 原列位置: {orig_idx}")
        return False

    def count_missing_values(self, data: List[Dict], column: str) -> int:
        """Count rows whose *column* is absent, ``None`` or a float NaN."""
        count = 0
        for row in data:
            val = row.get(column)
            if val is None or (isinstance(val, float) and np.isnan(val)):
                count += 1
        return count

    @staticmethod
    def _same_value(a: Any, b: Any) -> bool:
        """Compare two imputed cells, treating two missing values as equal."""
        a_missing = a is None or (isinstance(a, float) and np.isnan(a))
        b_missing = b is None or (isinstance(b, float) and np.isnan(b))
        if a_missing or b_missing:
            return a_missing and b_missing
        if isinstance(a, (int, float)) and isinstance(b, (int, float)):
            # Small tolerance so JSON float formatting cannot cause a
            # spurious mismatch.
            return bool(np.isclose(a, b, rtol=1e-9, atol=1e-12))
        return a == b

    # ==================== Basic tests (6) ====================

    def test_1_mean_fill(self, datasets: Dict) -> bool:
        """Test 1: mean imputation on a numeric column."""
        print_test(1, 18, "均值填补数值列")

        data = to_json_records(datasets['numeric'])

        # Call the API.
        result = self.call_fillna_simple(data, '体重', '体重_均值', 'mean')

        # Validate the response.
        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']
        if not self.verify_new_column_created(result_data, '体重_均值', '体重'):
            return False

        # Every missing value must have been filled.
        missing_count = self.count_missing_values(result_data, '体重_均值')
        if missing_count > 0:
            print_error(f"填补后仍有 {missing_count} 个缺失值")
            return False

        print_success("均值填补成功,缺失值已全部填补")
        # Informational only: the position check does not affect the verdict.
        self.verify_column_position(result_data, '体重_均值', '体重')
        return True

    def test_2_median_fill(self, datasets: Dict) -> bool:
        """Test 2: median imputation."""
        print_test(2, 18, "中位数填补偏态分布列")

        data = to_json_records(datasets['numeric'])

        result = self.call_fillna_simple(data, '身高', '身高_中位数', 'median')

        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']
        missing_count = self.count_missing_values(result_data, '身高_中位数')

        if missing_count == 0:
            print_success("中位数填补成功")
            return True
        print_error(f"填补后仍有 {missing_count} 个缺失值")
        return False

    def test_3_mode_fill(self, datasets: Dict) -> bool:
        """Test 3: mode imputation on a categorical column."""
        print_test(3, 18, "众数填补分类列")

        data = to_json_records(datasets['categorical'])

        result = self.call_fillna_simple(data, '婚姻状况', '婚姻状况_众数', 'mode')

        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']
        missing_count = self.count_missing_values(result_data, '婚姻状况_众数')

        if missing_count == 0:
            print_success("众数填补成功")
            return True
        print_error(f"填补后仍有 {missing_count} 个缺失值")
        return False

    def test_4_constant_fill(self, datasets: Dict) -> bool:
        """Test 4: constant-value imputation (fill with 0)."""
        print_test(4, 18, "固定值填补(0)")

        data = to_json_records(datasets['numeric'])

        result = self.call_fillna_simple(data, '体重', '体重_固定值', 'constant', fill_value=0)

        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']
        missing_count = self.count_missing_values(result_data, '体重_固定值')

        # At least one cell must now hold the fill value 0.
        filled_zeros = sum(1 for row in result_data if row.get('体重_固定值') == 0)

        if missing_count == 0 and filled_zeros > 0:
            print_success(f"固定值填补成功,填充了 {filled_zeros} 个0")
            return True
        print_error("固定值填补失败")
        return False

    def test_5_ffill(self, datasets: Dict) -> bool:
        """Test 5: forward fill (ffill)."""
        print_test(5, 18, "前向填充(ffill)⭐")

        data = to_json_records(datasets['timeseries'])

        result = self.call_fillna_simple(data, 'temperature', 'temperature_ffill', 'ffill')

        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']
        missing_count = self.count_missing_values(result_data, 'temperature_ffill')

        if missing_count == 0:
            print_success("前向填充成功 ⭐")
            return True
        print_error(f"填充后仍有 {missing_count} 个缺失值")
        return False

    def test_6_bfill(self, datasets: Dict) -> bool:
        """Test 6: backward fill (bfill)."""
        print_test(6, 18, "后向填充(bfill)⭐")

        data = to_json_records(datasets['timeseries'])

        result = self.call_fillna_simple(data, 'humidity', 'humidity_bfill', 'bfill')

        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']
        missing_count = self.count_missing_values(result_data, 'humidity_bfill')

        if missing_count == 0:
            print_success("后向填充成功 ⭐")
            return True
        print_error(f"填充后仍有 {missing_count} 个缺失值")
        return False

    # ==================== MICE tests (4) ====================

    def test_7_mice_single_column(self, datasets: Dict) -> bool:
        """Test 7: MICE imputation of a single column."""
        print_test(7, 18, "MICE填补单列")

        data = to_json_records(datasets['numeric'])

        result = self.call_fillna_mice(data, ['体重'], n_iterations=5)

        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']
        missing_count = self.count_missing_values(result_data, '体重_MICE')

        if missing_count == 0:
            print_success("MICE单列填补成功 ⭐⭐⭐")
            return True
        print_error(f"填补后仍有 {missing_count} 个缺失值")
        return False

    def test_8_mice_multiple_columns(self, datasets: Dict) -> bool:
        """Test 8: MICE imputation of several columns at once."""
        print_test(8, 18, "MICE填补多列")

        data = to_json_records(datasets['numeric'])

        result = self.call_fillna_mice(data, ['体重', '身高'], n_iterations=5)

        if not self.verify_result(result, ['result_data', 'message']):
            return False

        result_data = result['result_data']

        # Guard against an empty result before indexing the first row.
        if not result_data:
            print_error("结果数据为空")
            return False

        # Both new columns must have been created.
        if '体重_MICE' not in result_data[0] or '身高_MICE' not in result_data[0]:
            print_error("MICE新列未全部创建")
            return False

        missing_count_1 = self.count_missing_values(result_data, '体重_MICE')
        missing_count_2 = self.count_missing_values(result_data, '身高_MICE')

        if missing_count_1 == 0 and missing_count_2 == 0:
            print_success("MICE多列填补成功 ⭐⭐⭐")
            return True
        print_error(f"填补不完整: 体重 {missing_count_1}, 身高 {missing_count_2}")
        return False

    def test_9_mice_iterations(self, datasets: Dict) -> bool:
        """Test 9: MICE imputation with different iteration counts."""
        print_test(9, 18, "MICE填补 - 不同迭代次数")

        data = to_json_records(datasets['numeric'])

        # Run with 5 and with 20 iterations.
        result_5 = self.call_fillna_mice(data, ['体重'], n_iterations=5)
        result_20 = self.call_fillna_mice(data, ['体重'], n_iterations=20)

        if not result_5.get('success') or not result_20.get('success'):
            print_error("不同迭代次数测试失败")
            return False

        print_success("不同迭代次数都能成功执行")
        return True

    def test_10_mice_random_state(self, datasets: Dict) -> bool:
        """Test 10: MICE imputation with an explicit random seed.

        Two calls with the same seed must both succeed and must produce the
        same imputed ``体重_MICE`` column.
        """
        print_test(10, 18, "MICE填补 - 自定义随机种子")

        data = to_json_records(datasets['numeric'])

        # The same random seed should produce the same result.
        result_1 = self.call_fillna_mice(data, ['体重'], random_state=42)
        result_2 = self.call_fillna_mice(data, ['体重'], random_state=42)

        if not result_1.get('success') or not result_2.get('success'):
            print_error("随机种子测试失败")
            return False

        values_1 = [row.get('体重_MICE') for row in result_1.get('result_data') or []]
        values_2 = [row.get('体重_MICE') for row in result_2.get('result_data') or []]

        if not values_1 or len(values_1) != len(values_2):
            print_error("随机种子测试失败: 结果数据为空或行数不一致")
            return False

        if not all(self._same_value(a, b) for a, b in zip(values_1, values_2)):
            print_error("相同随机种子产生了不同的结果")
            return False

        print_success("随机种子功能正常")
        return True

    # ==================== Edge-case tests (4) ====================

    def test_11_all_missing(self, datasets: Dict) -> bool:
        """Test 11: a column that is 100% missing.

        Either outcome is accepted: the service may reject the request or
        report success; only the printed message differs.
        """
        print_test(11, 18, "100%缺失的列")

        data = to_json_records(datasets['edge_cases'])

        result = self.call_fillna_simple(data, 'all_missing', 'all_missing_filled', 'mean')

        # This should fail or at least produce a warning.
        if result.get('success'):
            print_warning("100%缺失的列填补成功(可能填充了NaN)")
        else:
            print_success("正确处理了100%缺失的情况")
        return True

    def test_12_no_missing(self, datasets: Dict) -> bool:
        """Test 12: a column with no missing values (nothing to fill)."""
        print_test(12, 18, "0%缺失的列(无需填补)")

        data = to_json_records(datasets['edge_cases'])

        result = self.call_fillna_simple(data, 'no_missing', 'no_missing_filled', 'mean')

        if result.get('success'):
            print_success("0%缺失的列处理正常")
            return True
        print_error("处理无缺失列失败")
        return False

    def test_13_stats_api(self, datasets: Dict) -> bool:
        """Test 13: the statistics API returns all required fields."""
        print_test(13, 18, "统计API功能")

        data = to_json_records(datasets['numeric'])

        result = self.call_fillna_stats(data, '体重')

        if not result.get('success'):
            print_error("统计API失败")
            return False

        stats = result.get('stats', {})
        required_fields = ['missing_count', 'missing_rate', 'valid_count', 'total_count']

        for field in required_fields:
            if field not in stats:
                print_error(f"统计信息缺少字段: {field}")
                return False

        print_success(f"统计API正常 - 缺失率: {stats['missing_rate']}%")
        print_info(f" 缺失: {stats['missing_count']}, 有效: {stats['valid_count']}")
        return True

    def test_14_column_name_with_special_chars(self, datasets: Dict) -> bool:
        """Test 14: column names containing special characters."""
        print_test(14, 18, "特殊字符列名处理")

        # Build a frame whose column name contains parentheses.
        df = pd.DataFrame({
            'id': range(1, 11),
            '体重(kg)': [60, np.nan, 70, 65, np.nan, 75, 80, np.nan, 68, 72]
        })

        data = to_json_records(df)

        result = self.call_fillna_simple(data, '体重(kg)', '体重(kg)_填补', 'median')

        if result.get('success'):
            print_success("特殊字符列名处理正常")
            return True
        print_warning(f"特殊字符列名处理失败: {result.get('error')}")
        return False

    # ==================== Data-type tests (4) ====================

    def test_15_numeric_types(self, datasets: Dict) -> bool:
        """Test 15: numeric columns (int / float)."""
        print_test(15, 18, "数值列(int/float)")

        data = to_json_records(datasets['numeric'])

        # Exercise the integer column.
        result = self.call_fillna_simple(data, '年龄', '年龄_填补', 'median')

        if result.get('success'):
            print_success("数值类型处理正常")
            return True
        print_error("数值类型处理失败")
        return False

    def test_16_categorical_types(self, datasets: Dict) -> bool:
        """Test 16: categorical (string) columns."""
        print_test(16, 18, "分类列(字符串)")

        data = to_json_records(datasets['categorical'])

        result = self.call_fillna_simple(data, '教育程度', '教育程度_填补', 'mode')

        if result.get('success'):
            print_success("分类类型处理正常")
            return True
        print_error("分类类型处理失败")
        return False

    def test_17_mixed_types(self, datasets: Dict) -> bool:
        """Test 17: MICE on a dataset with mixed column types."""
        print_test(17, 18, "混合类型列")

        data = to_json_records(datasets['mixed'])

        # MICE over a float column and an integer column.
        result = self.call_fillna_mice(data, ['数值列', '整数列'], n_iterations=5)

        if result.get('success'):
            print_success("混合类型处理正常")
            return True
        print_warning(f"混合类型处理: {result.get('error')}")
        return False

    def test_18_performance(self, datasets: Dict) -> bool:
        """Test 18: performance on a larger dataset (1000 rows).

        The test passes whenever the call succeeds; the elapsed time only
        selects which message is printed.
        """
        print_test(18, 18, "性能测试(1000行)")

        # Generate 1000 rows.
        n_rows = 1000
        df_large = pd.DataFrame({
            'id': range(1, n_rows + 1),
            'value1': np.random.normal(100, 20, n_rows),
            'value2': np.random.normal(50, 10, n_rows),
        })
        mask = np.random.random(n_rows) < 0.10
        df_large.loc[mask, 'value1'] = np.nan
        df_large.loc[mask, 'value2'] = np.nan

        data = to_json_records(df_large)

        start_time = time.time()
        result = self.call_fillna_mice(data, ['value1', 'value2'], n_iterations=5)
        elapsed = time.time() - start_time

        if not result.get('success'):
            print_error("性能测试失败")
            return False

        print_success(f"性能测试通过 - 耗时: {elapsed:.2f}秒")
        if elapsed < 30:
            print_success("性能优秀(<30秒)")
        elif elapsed < 60:
            print_info("性能良好(30-60秒)")
        else:
            print_warning(f"性能较慢({elapsed:.2f}秒)")
        return True

    # ==================== Main test flow ====================

    def run_all_tests(self) -> bool:
        """Run every test case and print a summary.

        Returns:
            True when the service was reachable and every test passed,
            False otherwise.
        """
        print_header("缺失值处理功能 - 自动化测试")

        self.start_time = time.time()

        # 1. Check that the service is up.
        if not self.test_service_health():
            print_error("Python服务未运行,无法继续测试")
            return False

        # 2. Generate the test data.
        datasets = self.generate_test_data()

        # 3. Run all tests.
        tests = [
            # Basic tests
            (self.test_1_mean_fill, "基础"),
            (self.test_2_median_fill, "基础"),
            (self.test_3_mode_fill, "基础"),
            (self.test_4_constant_fill, "基础"),
            (self.test_5_ffill, "基础"),
            (self.test_6_bfill, "基础"),

            # MICE tests
            (self.test_7_mice_single_column, "MICE"),
            (self.test_8_mice_multiple_columns, "MICE"),
            (self.test_9_mice_iterations, "MICE"),
            (self.test_10_mice_random_state, "MICE"),

            # Edge-case tests
            (self.test_11_all_missing, "边界"),
            (self.test_12_no_missing, "边界"),
            (self.test_13_stats_api, "边界"),
            (self.test_14_column_name_with_special_chars, "边界"),

            # Data-type tests
            (self.test_15_numeric_types, "类型"),
            (self.test_16_categorical_types, "类型"),
            (self.test_17_mixed_types, "类型"),
            (self.test_18_performance, "性能"),
        ]

        for test_func, _category in tests:
            # Top-level boundary: any exception counts as a failure of that
            # one test so the remaining tests still run.
            try:
                if test_func(datasets):
                    self.passed += 1
                else:
                    self.failed += 1
                    self.errors.append(f"{test_func.__name__}")
            except Exception as e:
                self.failed += 1
                self.errors.append(f"{test_func.__name__}: {str(e)}")
                print_error(f"测试异常: {str(e)}")

        # 4. Print the summary.
        self.print_summary()
        return self.failed == 0

    def print_summary(self):
        """Print totals, pass rate, elapsed time and the list of failed tests."""
        # start_time is only set by run_all_tests; fall back to 0 otherwise.
        elapsed = time.time() - self.start_time if self.start_time is not None else 0.0

        print_header("测试总结")

        total = self.passed + self.failed
        pass_rate = (self.passed / total * 100) if total > 0 else 0

        print(f"{Colors.BOLD}总测试数: {total}{Colors.END}")
        print(f"{Colors.GREEN}✅ 通过: {self.passed}{Colors.END}")
        print(f"{Colors.RED}❌ 失败: {self.failed}{Colors.END}")
        print(f"{Colors.CYAN}通过率: {pass_rate:.1f}%{Colors.END}")
        print(f"{Colors.BLUE}总耗时: {elapsed:.2f}秒{Colors.END}")

        if self.errors:
            print(f"\n{Colors.RED}{Colors.BOLD}失败的测试:{Colors.END}")
            for error in self.errors:
                print(f" • {error}")

        if self.failed == 0:
            print(f"\n{Colors.GREEN}{Colors.BOLD}{'🎉 所有测试通过!'.center(80)}{Colors.END}")
        else:
            print(f"\n{Colors.YELLOW}{Colors.BOLD}{'⚠️ 部分测试失败,请检查错误信息'.center(80)}{Colors.END}")


if __name__ == "__main__":
    print(f"{Colors.BOLD}{Colors.CYAN}")
    print("""
    ╔══════════════════════════════════════════════════════════════════╗
    ║                                                                  ║
    ║        缺失值处理功能 - 自动化测试脚本 v1.0                      ║
    ║                                                                  ║
    ║        测试内容: 18个测试用例                                    ║
    ║        - 6个基础填补测试                                         ║
    ║        - 4个MICE测试                                             ║
    ║        - 4个边界测试                                             ║
    ║        - 4个数据类型测试                                         ║
    ║                                                                  ║
    ╚══════════════════════════════════════════════════════════════════╝
    """)
    print(Colors.END)

    suite = FillnaTestSuite()
    all_passed = suite.run_all_tests()
    # Propagate the outcome so CI / shell callers can detect failures.
    sys.exit(0 if all_passed else 1)