Files
HaHafeng 200eab5c2e feat(dc-tool-c): Tool C UX重大改进 - 列头筛选/行号/滚动条/全量数据
新功能
- 列头筛选:Excel风格筛选功能(Community版本,中文本地化,显示唯一值及计数)
- 行号列:添加固定行号列(#列头,灰色背景,左侧固定)
- 全量数据加载:不再限制50行预览,Session加载全量数据
- 全量数据返回:所有快速操作(筛选/映射/分箱/条件/删NA/计算/Pivot)全量返回结果

 Bug修复
- 滚动条终极修复:修改MainLayout为固定高度(h-screen + overflow-hidden),整个浏览器窗口无滚动条,只有AG Grid内部滚动
- 计算列全角字符修复:自动转换中文括号等全角字符为半角
- 计算列特殊字符列名修复:完善列别名机制,支持任意特殊字符列名

 UI优化
- 删除'表格仅展示前50行'提示条,减少干扰
- 筛选对话框美化:白色背景,圆角,阴影
- 列头筛选图标优化:清晰可见,易于点击

 文档更新
- 工具C_功能按钮开发计划_V1.0.md:添加V1.5版本记录
- 工具C_MVP开发_TODO清单.md:添加Day 8 UX优化内容
- 00-工具C当前状态与开发指南.md:更新进度为98%
- 00-模块当前状态与开发指南.md:更新DC模块状态
- 00-系统当前状态与开发指南.md:更新系统整体状态

 影响范围
- Python微服务:无修改
- Node.js后端:5处代码修改(SessionService + QuickActionController + AICodeService)
- 前端:MainLayout + DataGrid + ag-grid-custom.css + index.tsx
- 完成度:Tool C整体完成度提升至98%

 代码统计
- 修改文件:~15个文件
- 新增行数:~200行
- 修改行数:~150行

Co-authored-by: AI Assistant <assistant@example.com>
2025-12-10 18:02:42 +08:00

392 lines
14 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
计算列 - 预写函数
基于公式计算新列,支持数学运算和常用函数
"""
import pandas as pd
import numpy as np
import re
from typing import Dict, Any
def normalize_formula(formula: str) -> str:
"""
规范化公式字符串:将全角字符转换为半角字符
解决用户使用中文输入法导致的全角字符问题
Args:
formula: 原始公式字符串
Returns:
规范化后的公式字符串
示例:
normalize_formula("体重/(身高/100)**2")
-> "体重/(身高/100)**2"
"""
# 全角到半角的映射
full_to_half = {
# 括号
'': '(',
'': ')',
'': '[',
'': ']',
'': '{',
'': '}',
# 运算符
'': '+',
'': '-',
'': '*',
'': '/',
'': '%',
# 数字
'': '0', '': '1', '': '2', '': '3', '': '4',
'': '5', '': '6', '': '7', '': '8', '': '9',
# 其他符号
'': '.',
'': ',',
'': ':',
'': ';',
'': '=',
'': '<',
'': '>',
' ': ' ', # 全角空格
}
result = formula
for full_char, half_char in full_to_half.items():
result = result.replace(full_char, half_char)
return result
# 允许的函数(安全白名单)
ALLOWED_FUNCTIONS = {
'abs': abs,
'round': round,
'sqrt': np.sqrt,
'log': np.log,
'log10': np.log10,
'exp': np.exp,
'sin': np.sin,
'cos': np.cos,
'tan': np.tan,
'floor': np.floor,
'ceil': np.ceil,
'min': min,
'max': max,
}
def validate_formula(formula: str, available_columns: list) -> tuple[bool, str]:
"""
验证公式安全性和正确性
Args:
formula: 公式字符串
available_columns: 可用的列名列表
Returns:
(is_valid, error_message)
"""
# 检查是否为空
if not formula or not formula.strip():
return False, '公式不能为空'
# 检查危险操作
dangerous_patterns = [
r'__', # 双下划线Python内部属性
r'import\s', # import语句
r'exec\s', # exec函数
r'eval\s', # eval函数
r'open\s*\(', # 文件操作
r'compile\s*\(', # 编译函数
r'globals\s*\(', # 全局变量
r'locals\s*\(', # 局部变量
r'__builtins__', # 内置函数
]
for pattern in dangerous_patterns:
if re.search(pattern, formula, re.IGNORECASE):
return False, f'公式包含不允许的操作: {pattern}'
# ✨ 增强:检查是否只包含允许的字符(放宽限制,支持更多特殊字符)
# 允许:英文字母、数字、下划线、中文、空格、运算符、括号(中英文)、逗号(中英文)、点、冒号、等号
allowed_chars = r'[a-zA-Z0-9_\u4e00-\u9fa5\s\+\-\*/\(\)\[\]\{\}\.,:\*\*=()【】、。:;!?]'
if not re.match(f'^{allowed_chars}+$', formula):
# 找出不允许的字符
invalid_chars = set(re.findall(f'[^{allowed_chars}]', formula))
return False, f'公式包含不允许的字符: {", ".join(invalid_chars)}'
return True, ''
def compute_column(
df: pd.DataFrame,
new_column_name: str,
formula: str,
column_mapping: list = None # ⭐ 新增参数(兼容旧版本调用)
) -> pd.DataFrame:
"""
基于公式计算新列
Args:
df: 输入数据框
new_column_name: 新列名称
formula: 计算公式
- 支持列名引用(如:身高, 体重)
- 支持运算符(+, -, *, /, **, %
- 支持函数abs, round, sqrt, log, exp等
column_mapping: 列名映射(可选)
- 用于处理带空格/特殊字符的列名
Returns:
添加了新列的数据框
示例:
# BMI计算
compute_column(df, 'BMI', '体重 / (身高/100)**2')
# 年龄平方根
compute_column(df, '年龄_sqrt', 'sqrt(年龄)')
# 复杂公式
compute_column(df, '综合得分', '(FMA*0.6 + ADL*0.4) / 100')
"""
result = df.copy()
print(f'计算新列: {new_column_name}')
print(f'原始公式: {formula}')
# ⭐ 步骤1规范化公式转换全角字符
formula = normalize_formula(formula)
print(f'规范化后: {formula}')
print(f'收到列名映射: {len(column_mapping or [])} 个列')
print('')
# ⭐ 步骤2彻底解决方案 - 使用Node.js传来的column_mapping
env = {}
formula_safe = formula
normalized_mapping = [] # ⭐ 在外面定义,供后续使用
if column_mapping and len(column_mapping) > 0:
print('使用传入的列名映射(支持任意特殊字符):')
# ⭐ 关键修复对originalName也做normalize确保匹配成功
# 原因:用户输入"身高(cm)",但列名是"身高Cm"normalize后才能匹配
for mapping in column_mapping:
original_name = mapping.get('originalName', '')
safe_name = mapping.get('safeName', '')
# ⭐ 对列名也做normalize全角→半角
normalized_name = normalize_formula(original_name)
normalized_mapping.append({
'originalName': original_name, # 保留原始名用于访问DataFrame
'normalizedName': normalized_name, # 标准化名(用于匹配)
'safeName': safe_name
})
# ⭐ 关键:按标准化后的列名长度排序(从长到短),避免部分匹配
sorted_mapping = sorted(
normalized_mapping,
key=lambda x: len(x['normalizedName']),
reverse=True
)
# ⭐ 使用简单字符串replace不用正则彻底解决特殊字符问题
for mapping in sorted_mapping:
normalized_name = mapping['normalizedName']
safe_name = mapping['safeName']
original_name = mapping['originalName']
if normalized_name and safe_name:
# ⭐ 在标准化后的空间匹配(全角→半角后)
if normalized_name in formula_safe:
formula_safe = formula_safe.replace(normalized_name, safe_name)
print(f' "{original_name}" (标准化: "{normalized_name}") -> {safe_name}')
# ⭐ 准备执行环境使用safeName作为变量名
print('')
print('准备执行环境:')
for mapping in normalized_mapping:
original_name = mapping['originalName'] # 使用原始列名访问DataFrame
safe_name = mapping['safeName']
if original_name and safe_name and original_name in result.columns:
# 尝试转换为数值类型
try:
numeric_col = pd.to_numeric(result[original_name], errors='coerce')
if not numeric_col.isna().all():
env[safe_name] = numeric_col
print(f' {safe_name} = DataFrame["{original_name}"] (数值)')
else:
env[safe_name] = result[original_name]
print(f' {safe_name} = DataFrame["{original_name}"]')
except Exception:
env[safe_name] = result[original_name]
print(f' {safe_name} = DataFrame["{original_name}"]')
else:
# ⭐ Fallback兼容旧版本没有column_mapping时
print('⚠️ 未传入列名映射,使用自动生成(可能不支持特殊字符):')
for i, col in enumerate(result.columns):
safe_var = f'col_{i}'
# 简单替换(尽力而为)
if col in formula_safe:
formula_safe = formula_safe.replace(col, safe_var)
print(f' "{col}" -> {safe_var}')
# 准备执行环境
try:
numeric_col = pd.to_numeric(result[col], errors='coerce')
if not numeric_col.isna().all():
env[safe_var] = numeric_col
else:
env[safe_var] = result[col]
except Exception:
env[safe_var] = result[col]
# 验证公式(使用转换后的安全公式)
# 注意validate_formula现在检查的是别名后的公式所以会失败
# 我们跳过验证,或者只做基本的安全检查
print('')
print(f'最终公式: {formula_safe}')
# 基本安全检查(不依赖列名)
dangerous_patterns = ['__', 'import', 'exec', 'eval', 'open', 'compile', 'globals', 'locals', '__builtins__']
for pattern in dangerous_patterns:
if pattern in formula_safe.lower():
raise ValueError(f'公式包含不允许的操作: {pattern}')
# 2. 添加允许的函数到执行环境
env.update(ALLOWED_FUNCTIONS)
# 3. 添加numpy用于数学运算
env['np'] = np
print('')
print(f'准备执行公式: {formula_safe}')
print('')
try:
# ✨ 使用转换后的安全公式执行计算
computed_values = eval(formula_safe, {"__builtins__": {}}, env)
# ✨ 优化:将新列插入到第一个引用列的旁边
# 找到公式中引用的第一个列
first_ref_col = None
if normalized_mapping and len(normalized_mapping) > 0:
# 使用传入的映射查找
for mapping in normalized_mapping:
safe_name = mapping['safeName']
original_name = mapping['originalName']
if safe_name in formula_safe and original_name in result.columns:
first_ref_col = original_name
break
else:
# Fallback遍历所有列查找
for i, col in enumerate(result.columns):
safe_var = f'col_{i}'
if safe_var in formula_safe:
first_ref_col = col
break
if first_ref_col:
ref_col_index = result.columns.get_loc(first_ref_col)
result.insert(ref_col_index + 1, new_column_name, computed_values)
print(f'计算成功!新列插入在 "{first_ref_col}" 旁边')
else:
# 如果找不到引用列,添加到最后
result[new_column_name] = computed_values
print(f'计算成功!新列添加到最后')
print(f'新列类型: {result[new_column_name].dtype}')
print(f'新列前5个值:')
# 安全打印避免NaN/inf导致序列化错误
for idx, val in result[new_column_name].head().items():
if pd.isna(val):
print(f' [{idx}] None (NaN)')
elif np.isinf(val):
print(f' [{idx}] None (inf)')
else:
print(f' [{idx}] {val}')
print('')
# 统计结果
if pd.api.types.is_numeric_dtype(result[new_column_name]):
col_data = result[new_column_name]
# 统计缺失值和无效值
nan_count = col_data.isna().sum()
inf_count = np.isinf(col_data.replace([np.nan], 0)).sum()
print(f'统计信息:')
# 只对有效值计算统计量
valid_data = col_data.dropna().replace([np.inf, -np.inf], np.nan).dropna()
if len(valid_data) > 0:
print(f' 最小值: {valid_data.min():.2f}')
print(f' 最大值: {valid_data.max():.2f}')
print(f' 平均值: {valid_data.mean():.2f}')
else:
print(f' 没有有效的数值')
if nan_count > 0:
print(f' 缺失值(NaN): {nan_count}')
if inf_count > 0:
print(f' 无穷大值(inf): {inf_count}')
else:
print(f'非数值类型,跳过统计')
return result
except NameError as e:
# 列名不存在
missing_col = str(e).split("'")[1]
raise ValueError(f'"{missing_col}" 不存在,请检查公式中的列名')
except ZeroDivisionError:
raise ValueError('除零错误公式中存在除以0的情况')
except Exception as e:
raise ValueError(f'计算失败: {str(e)}')
def get_formula_examples() -> list[Dict[str, str]]:
"""
获取公式示例
Returns:
示例列表
"""
return [
{
'name': 'BMI计算',
'formula': '体重 / (身高/100)**2',
'description': '体重指数(需要身高(cm)和体重(kg)列)'
},
{
'name': '年龄分组',
'formula': 'round(年龄 / 10) * 10',
'description': '按10岁为一组20, 30, 40...'
},
{
'name': '综合得分',
'formula': '(FMA得分 * 0.6 + ADL得分 * 0.4)',
'description': '加权平均分'
},
{
'name': '变化率',
'formula': '(随访值 - 基线值) / 基线值 * 100',
'description': '计算变化百分比'
},
{
'name': '对数转换',
'formula': 'log(值 + 1)',
'description': '对数变换(处理偏态分布)'
},
]