Summary: - Implement async file upload processing (Platform-Only pattern) - Add parseExcelWorker with pg-boss queue - Implement React Query polling mechanism - Add clean data caching (avoid duplicate parsing) - Fix pivot single-value column tuple issue - Optimize performance by 99 percent Technical Details: 1. Async Architecture (Postgres-Only): - SessionService.createSession: Fast upload + push to queue (3s) - parseExcelWorker: Background parsing + save clean data (53s) - SessionController.getSessionStatus: Status query API for polling - React Query Hook: useSessionStatus (auto-serial polling) - Frontend progress bar with real-time feedback 2. Performance Optimization: - Clean data caching: Worker saves processed data to OSS - getPreviewData: Read from clean data cache (0.5s vs 43s, -99 percent) - getFullData: Read from clean data cache (0.5s vs 43s, -99 percent) - Intelligent cleaning: Boundary detection + ghost column/row removal - Safety valve: Max 3000 columns, 5M cells 3. Bug Fixes: - Fix pivot column name tuple issue for single value column - Fix queue name format (colon to underscore: asl:screening -> asl_screening) - Fix polling storm (15+ concurrent requests -> 1 serial request) - Fix QUEUE_TYPE environment variable (memory -> pgboss) - Fix logger import in PgBossQueue - Fix formatSession to return cleanDataKey - Fix saveProcessedData to update clean data synchronously 4. Database Changes: - ALTER TABLE dc_tool_c_sessions ADD COLUMN clean_data_key VARCHAR(1000) - ALTER TABLE dc_tool_c_sessions ALTER COLUMN total_rows DROP NOT NULL - ALTER TABLE dc_tool_c_sessions ALTER COLUMN total_cols DROP NOT NULL - ALTER TABLE dc_tool_c_sessions ALTER COLUMN columns DROP NOT NULL 5. Documentation: - Create Postgres-Only async task processing guide (588 lines) - Update Tool C status document (Day 10 summary) - Update DC module status document - Update system overview document - Update cloud-native development guide Performance Improvements: - Upload + preview: 96s -> 53.5s (-44 percent) - Filter operation: 44s -> 2.5s (-94 percent) - Pivot operation: 45s -> 2.5s (-94 percent) - Concurrent requests: 15+ -> 1 (-93 percent) - Complete workflow (upload + 7 ops): 404s -> 70.5s (-83 percent) Files Changed: - Backend: 15 files (Worker, Service, Controller, Schema, Config) - Frontend: 4 files (Hook, Component, API) - Docs: 4 files (Guide, Status, Overview, Spec) - Database: 4 column modifications - Total: ~1388 lines of new/modified code Status: Fully tested and verified, production ready
128 lines
3.4 KiB
Python
128 lines
3.4 KiB
Python
"""
|
||
高级筛选操作
|
||
|
||
提供多条件筛选功能,支持AND/OR逻辑组合。
|
||
"""
|
||
|
||
import pandas as pd
|
||
from typing import List, Dict, Any, Literal
|
||
|
||
|
||
def apply_filter(
|
||
df: pd.DataFrame,
|
||
conditions: List[Dict[str, Any]],
|
||
logic: Literal['and', 'or'] = 'and'
|
||
) -> pd.DataFrame:
|
||
"""
|
||
应用筛选条件
|
||
|
||
Args:
|
||
df: 输入数据框
|
||
conditions: 筛选条件列表,每个条件包含:
|
||
- column: 列名
|
||
- operator: 运算符 (=, !=, >, <, >=, <=, contains, not_contains,
|
||
starts_with, ends_with, is_null, not_null)
|
||
- value: 值(is_null和not_null不需要)
|
||
logic: 逻辑组合方式 ('and' 或 'or')
|
||
|
||
Returns:
|
||
筛选后的数据框
|
||
|
||
Examples:
|
||
>>> df = pd.DataFrame({'年龄': [25, 35, 45], '性别': ['男', '女', '男']})
|
||
>>> conditions = [
|
||
... {'column': '年龄', 'operator': '>', 'value': 30},
|
||
... {'column': '性别', 'operator': '=', 'value': '男'}
|
||
... ]
|
||
>>> result = apply_filter(df, conditions, logic='and')
|
||
>>> len(result)
|
||
1
|
||
"""
|
||
if not conditions:
|
||
raise ValueError('筛选条件不能为空')
|
||
|
||
if df.empty:
|
||
return df
|
||
|
||
# 生成各个条件的mask
|
||
masks = []
|
||
for cond in conditions:
|
||
column = cond['column']
|
||
operator = cond['operator']
|
||
value = cond.get('value')
|
||
|
||
# 验证列是否存在
|
||
if column not in df.columns:
|
||
raise KeyError(f"列 '{column}' 不存在")
|
||
|
||
# 根据运算符生成mask
|
||
if operator == '=':
|
||
mask = df[column] == value
|
||
elif operator == '!=':
|
||
mask = df[column] != value
|
||
elif operator == '>':
|
||
mask = df[column] > value
|
||
elif operator == '<':
|
||
mask = df[column] < value
|
||
elif operator == '>=':
|
||
mask = df[column] >= value
|
||
elif operator == '<=':
|
||
mask = df[column] <= value
|
||
elif operator == 'contains':
|
||
mask = df[column].astype(str).str.contains(str(value), na=False)
|
||
elif operator == 'not_contains':
|
||
mask = ~df[column].astype(str).str.contains(str(value), na=False)
|
||
elif operator == 'starts_with':
|
||
mask = df[column].astype(str).str.startswith(str(value), na=False)
|
||
elif operator == 'ends_with':
|
||
mask = df[column].astype(str).str.endswith(str(value), na=False)
|
||
elif operator == 'is_null':
|
||
mask = df[column].isna()
|
||
elif operator == 'not_null':
|
||
mask = df[column].notna()
|
||
else:
|
||
raise ValueError(f"不支持的运算符: {operator}")
|
||
|
||
masks.append(mask)
|
||
|
||
# 组合所有条件
|
||
if logic == 'and':
|
||
final_mask = pd.concat(masks, axis=1).all(axis=1)
|
||
elif logic == 'or':
|
||
final_mask = pd.concat(masks, axis=1).any(axis=1)
|
||
else:
|
||
raise ValueError(f"不支持的逻辑运算: {logic}")
|
||
|
||
# 应用筛选
|
||
result = df[final_mask].copy()
|
||
|
||
# 打印统计信息
|
||
original_rows = len(df)
|
||
filtered_rows = len(result)
|
||
removed_rows = original_rows - filtered_rows
|
||
|
||
print(f'原始数据: {original_rows} 行')
|
||
print(f'筛选后: {filtered_rows} 行')
|
||
print(f'删除: {removed_rows} 行 ({removed_rows/original_rows*100:.1f}%)')
|
||
|
||
return result
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|