Files
AIclinicalresearch/recover_dc_code.py
HaHafeng 75ceeb0653 hotfix(dc/tool-c): Fix compute formula validation and binning NaN serialization
Critical fixes:
1. Compute column: Add Chinese comma support in formula validation
   - Problem: Formula with Chinese comma failed validation
   - Fix: Add Chinese comma character to allowed_chars regex
   - Example: Support formulas like 'col1(kg)+ col2,col3'

2. Binning operation: Fix NaN serialization error
   - Problem: 'Out of range float values are not JSON compliant: nan'
   - Fix: Enhanced NaN/inf handling in binning endpoint
   - Added np.inf/-np.inf replacement before JSON serialization
   - Added manual JSON serialization with NaN->null conversion

3. Enhanced all operation endpoints for consistency
   - Updated conditional, dropna endpoints with same NaN/inf handling
   - Ensures all operations return JSON-compliant data

Modified files:
- extraction_service/operations/compute.py: Add Chinese comma to regex
- extraction_service/main.py: Enhanced NaN handling in binning/conditional/dropna

Status: Hotfix complete, ready for testing
2025-12-09 08:45:27 +08:00

226 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Cursor SQLite数据库恢复脚本
从state.vscdb中提取Chat/Composer历史记录中的DC模块代码
"""
import sqlite3
import json
import os
import sys
from datetime import datetime
# Cursor workspace state database to read from. The path is hard-coded to one
# Windows user profile and one workspaceStorage folder.
DB_PATH = r"C:\Users\zhibo\AppData\Roaming\Cursor\User\workspaceStorage\d5e3431d02cbaa0109f69d72300733da\state.vscdb"
# Directory that receives the recovered files. It is a relative path, so it is
# resolved against the current working directory.
OUTPUT_DIR = "recovered_dc_code"
def extract_chat_history(db_path):
    """Read chat/composer-related rows from a Cursor ``state.vscdb`` file.

    A one-time copy of the database is written next to it (suffix
    ``.recovery_backup``) before the database is opened. Rows of
    ``ItemTable`` whose key matches one of the ``LIKE`` patterns for
    ``chat``, ``composer`` or ``dc`` are then collected.

    Args:
        db_path: Filesystem path of the SQLite database to inspect.

    Returns:
        A list of dicts with the keys ``index`` (position of the row in the
        query result), ``key``, ``data`` (the parsed JSON value, or ``None``
        when the value is not valid JSON) and ``raw_value`` (the value as
        ``str``). Rows with an empty value are skipped. An empty list is
        returned when anything goes wrong; the error and its traceback are
        printed instead of being raised.
    """
    conn = None
    try:
        # Keep an untouched copy of the database before opening it. The
        # copy is only made once; an existing backup is never overwritten.
        backup_path = db_path + ".recovery_backup"
        if not os.path.exists(backup_path):
            import shutil
            shutil.copy2(db_path, backup_path)
            print(f"✅ 已创建数据库备份: {backup_path}")

        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # List every table, purely as diagnostic output.
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()
        print(f"\n📋 数据库中的表: {[t[0] for t in tables]}")

        cursor.execute("SELECT key, value FROM ItemTable WHERE key LIKE '%chat%' OR key LIKE '%composer%' OR key LIKE '%dc%' OR key LIKE '%DC%'")
        rows = cursor.fetchall()
        print(f"\n🔍 找到 {len(rows)} 条相关记录")

        results = []
        for i, (key, value) in enumerate(rows):
            if not value:
                continue

            # sqlite3 hands back bytes for values stored as BLOB. Normalise
            # to str so later text search and JSON dumping work on every
            # row; undecodable bytes are replaced rather than aborting.
            if isinstance(value, (bytes, bytearray)):
                value = bytes(value).decode('utf-8', errors='replace')
            elif not isinstance(value, str):
                # Numeric storage classes: keep the row instead of letting
                # json.loads raise TypeError and discard the whole result.
                value = str(value)

            try:
                data = json.loads(value)
            except json.JSONDecodeError:
                # Not JSON; keep the row as plain text.
                results.append({
                    'index': i,
                    'key': key,
                    'data': None,
                    'raw_value': value
                })
                print(f" [{i}] Key (非JSON): {key[:80]}...")
            else:
                results.append({
                    'index': i,
                    'key': key,
                    'data': data,
                    'raw_value': value
                })
                print(f" [{i}] Key: {key[:80]}...")

        return results
    except Exception as e:
        # Best-effort tool: report the failure and let the caller see "no
        # records" instead of a crash.
        print(f"❌ 错误: {e}")
        import traceback
        traceback.print_exc()
        return []
    finally:
        # Release the connection on every path, including failures.
        if conn is not None:
            conn.close()
def search_dc_code(results):
    """Select the records whose raw text mentions a DC-module keyword.

    Matching is case-insensitive. Each record is reported at most once,
    under the first keyword (in list order) that it contains.

    Args:
        results: Iterable of dicts that each provide ``raw_value`` and
            ``index`` entries.

    Returns:
        A list of ``{'result': <record>, 'keyword': <matched keyword>}``
        dicts, in input order. Records with an empty ``raw_value`` are
        skipped.
    """
    keywords = [
        'HealthCheckService',
        'DualModelExtractionService',
        'ConflictDetectionService',
        'TemplateService',
        'dc_health_checks',
        'dc_extraction_tasks',
        'dc_templates',
        'dc_extraction_items',
        'ExtractionController',
        'tool-b',
        'DC模块',
        '数据清洗'
    ]
    # Lower-case each keyword once, not once per record.
    needles = [(keyword, keyword.lower()) for keyword in keywords]

    print("\n🔍 搜索DC模块相关代码...")

    dc_findings = []
    for result in results:
        raw_value = result['raw_value']
        if not raw_value:
            continue

        # Values may arrive as bytes; decode them for matching only, the
        # record itself is passed through untouched.
        if isinstance(raw_value, (bytes, bytearray)):
            haystack = bytes(raw_value).decode('utf-8', errors='replace')
        else:
            haystack = raw_value
        # Lower-case the (possibly very large) text once per record rather
        # than once per keyword.
        haystack = haystack.lower()

        for keyword, needle in needles:
            if needle in haystack:
                dc_findings.append({
                    'result': result,
                    'keyword': keyword
                })
                print(f" ✅ 在记录 [{result['index']}] 中找到关键词: {keyword}")
                break
    return dc_findings
def extract_code_blocks(text):
    """Return the bodies of TypeScript/JavaScript/untagged fenced blocks.

    The text is scanned fence by fence, so every opening fence is paired
    with its own closing fence. This keeps prose that sits between two
    blocks, and blocks tagged with other languages, out of the result.

    Args:
        text: Markdown-like text that may contain triple-backtick fences.

    Returns:
        A list of block bodies (fences and language tag removed), grouped
        by tag in the order ``typescript``, ``ts``, ``javascript``, ``js``,
        untagged; within a group, blocks keep their order in ``text``.
        Empty or missing text yields an empty list.
    """
    import re

    if not text:
        return []

    # Accepted language tags, in output order; '' is a fence without a tag.
    wanted = ['typescript', 'ts', 'javascript', 'js', '']
    by_lang = {lang: [] for lang in wanted}

    # Group 1 is the rest of the opening-fence line (the language tag),
    # group 2 is everything up to the next fence.
    fence = re.compile(r'```([^\n`]*)\n(.*?)```', re.DOTALL)
    for match in fence.finditer(text):
        # strip() also drops the '\r' left behind by CRLF line endings.
        lang = match.group(1).strip().lower()
        if lang in by_lang:
            by_lang[lang].append(match.group(2))

    return [code for lang in wanted for code in by_lang[lang]]
def save_findings(dc_findings):
    """Write every finding, its code blocks and a summary report to disk.

    For each finding, the record is dumped as JSON to
    ``finding_NNN_<keyword>.json`` and each code block extracted from its
    raw text is written to ``code_NNN_<keyword>_block_<j>.ts``. A plain-text
    ``recovery_report.txt`` listing every finding is written last. All files
    go to ``OUTPUT_DIR``, which is created when missing.

    Args:
        dc_findings: List of ``{'result': ..., 'keyword': ...}`` dicts.
            When it is empty, a notice is printed and nothing is written.

    Returns:
        None.
    """
    if not dc_findings:
        print("\n⚠️ 未找到DC模块相关代码")
        return

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    for i, finding in enumerate(dc_findings):
        result = finding['result']
        keyword = finding['keyword']

        # bytes cannot be JSON-serialised or searched with str patterns;
        # decode on a copy so the caller's record is left unmodified.
        raw_value = result['raw_value']
        if isinstance(raw_value, (bytes, bytearray)):
            raw_value = bytes(raw_value).decode('utf-8', errors='replace')
            result = {**result, 'raw_value': raw_value}

        # Full record, for manual inspection.
        filename = f"{OUTPUT_DIR}/finding_{i:03d}_{keyword}.json"
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        print(f" 💾 已保存: {filename}")

        # One file per fenced code block found in the raw text.
        for j, code in enumerate(extract_code_blocks(raw_value)):
            code_filename = f"{OUTPUT_DIR}/code_{i:03d}_{keyword}_block_{j}.ts"
            with open(code_filename, 'w', encoding='utf-8') as f:
                f.write(code)
            print(f" 📝 提取代码块: {code_filename}")

    # Summary report: one entry per finding.
    report_path = f"{OUTPUT_DIR}/recovery_report.txt"
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write("DC模块代码恢复报告\n")
        f.write("=" * 80 + "\n\n")
        f.write(f"恢复时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"找到相关记录数: {len(dc_findings)}\n\n")
        for i, finding in enumerate(dc_findings):
            f.write(f"\n[{i}] 关键词: {finding['keyword']}\n")
            f.write(f"Key: {finding['result']['key']}\n")
            f.write("-" * 80 + "\n")
    print(f"\n📊 汇总报告已保存: {report_path}")
def main():
    """Run the recovery pipeline: extract records, search them, save hits.

    Stops early, after printing a message, when the database file does not
    exist or when extraction yields no records.
    """
    rule = "=" * 80
    for banner_line in (rule, "Cursor SQLite数据库恢复工具 - DC模块专用", rule):
        print(banner_line)

    # Nothing to do when the state database is absent.
    db_present = os.path.exists(DB_PATH)
    if not db_present:
        print(f"❌ 数据库文件不存在: {DB_PATH}")
        return

    print(f"\n📂 数据库路径: {DB_PATH}")
    print(f"📂 输出目录: {OUTPUT_DIR}")

    # Step 1: pull the candidate rows out of the database.
    records = extract_chat_history(DB_PATH)
    if not records:
        print("\n⚠️ 未找到任何记录")
        return

    # Steps 2 and 3: keyword search, then write everything to disk.
    hits = search_dc_code(records)
    save_findings(hits)

    print(f"\n{rule}")
    print("✅ 恢复完成!")
    print(f"📁 请查看 {OUTPUT_DIR} 目录")
    print(rule)


if __name__ == "__main__":
    main()