Files
AIclinicalresearch/recover_dc_code.py
HaHafeng b31255031e feat(iit-manager): Add WeChat Official Account integration for patient notifications
Features:
- PatientWechatCallbackController for URL verification and message handling
- PatientWechatService for template and customer messages
- Support for secure mode (message encryption/decryption)
- Simplified route /wechat/patient/callback for WeChat config
- Event handlers for subscribe/unsubscribe/text messages
- Template message for visit reminders

Technical details:
- Reuse @wecom/crypto for encryption (compatible with Official Account)
- Relaxed Fastify schema validation to prevent early request blocking
- Access token caching (7000s with 5min pre-refresh)
- Comprehensive logging for debugging

Testing: Local URL verification passed, ready for SAE deployment

Status: Code complete, waiting for WeChat platform configuration
2026-01-04 22:53:42 +08:00

259 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Cursor SQLite数据库恢复脚本
从state.vscdb中提取Chat/Composer历史记录中的DC模块代码
"""
import sqlite3
import json
import os
import sys
from datetime import datetime
# 数据库路径
DB_PATH = r"C:\Users\zhibo\AppData\Roaming\Cursor\User\workspaceStorage\d5e3431d02cbaa0109f69d72300733da\state.vscdb"
OUTPUT_DIR = "recovered_dc_code"
def extract_chat_history(db_path):
"""提取Chat历史记录"""
try:
# 复制数据库文件(安全起见)
backup_path = db_path + ".recovery_backup"
if not os.path.exists(backup_path):
import shutil
shutil.copy2(db_path, backup_path)
print(f"✅ 已创建数据库备份: {backup_path}")
# 连接数据库
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 查询所有表
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print(f"\n📋 数据库中的表: {[t[0] for t in tables]}")
# 查询ItemTable
cursor.execute("SELECT key, value FROM ItemTable WHERE key LIKE '%chat%' OR key LIKE '%composer%' OR key LIKE '%dc%' OR key LIKE '%DC%'")
rows = cursor.fetchall()
print(f"\n🔍 找到 {len(rows)} 条相关记录")
results = []
for i, (key, value) in enumerate(rows):
try:
# 尝试解析JSON
if value:
data = json.loads(value)
results.append({
'index': i,
'key': key,
'data': data,
'raw_value': value
})
print(f" [{i}] Key: {key[:80]}...")
except json.JSONDecodeError:
# 不是JSON可能是纯文本
results.append({
'index': i,
'key': key,
'data': None,
'raw_value': value
})
print(f" [{i}] Key (非JSON): {key[:80]}...")
conn.close()
return results
except Exception as e:
print(f"❌ 错误: {e}")
import traceback
traceback.print_exc()
return []
def search_dc_code(results):
"""搜索DC模块相关代码"""
dc_findings = []
# 搜索关键词
keywords = [
'HealthCheckService',
'DualModelExtractionService',
'ConflictDetectionService',
'TemplateService',
'dc_health_checks',
'dc_extraction_tasks',
'dc_templates',
'dc_extraction_items',
'ExtractionController',
'tool-b',
'DC模块',
'数据清洗'
]
print(f"\n🔍 搜索DC模块相关代码...")
for result in results:
raw_value = result['raw_value']
if not raw_value:
continue
# 检查是否包含关键词
for keyword in keywords:
if keyword.lower() in raw_value.lower():
dc_findings.append({
'result': result,
'keyword': keyword
})
print(f" ✅ 在记录 [{result['index']}] 中找到关键词: {keyword}")
break
return dc_findings
def extract_code_blocks(text):
"""提取代码块"""
import re
# 匹配各种代码块格式
patterns = [
r'```typescript\n(.*?)```',
r'```ts\n(.*?)```',
r'```javascript\n(.*?)```',
r'```js\n(.*?)```',
r'```\n(.*?)```',
]
code_blocks = []
for pattern in patterns:
matches = re.findall(pattern, text, re.DOTALL)
code_blocks.extend(matches)
return code_blocks
def save_findings(dc_findings):
"""保存发现的DC代码"""
if not dc_findings:
print("\n⚠️ 未找到DC模块相关代码")
return
# 创建输出目录
os.makedirs(OUTPUT_DIR, exist_ok=True)
# 保存每个发现
for i, finding in enumerate(dc_findings):
result = finding['result']
keyword = finding['keyword']
# 保存原始数据
filename = f"{OUTPUT_DIR}/finding_{i:03d}_{keyword}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f" 💾 已保存: {filename}")
# 提取代码块
raw_value = result['raw_value']
code_blocks = extract_code_blocks(raw_value)
if code_blocks:
for j, code in enumerate(code_blocks):
code_filename = f"{OUTPUT_DIR}/code_{i:03d}_{keyword}_block_{j}.ts"
with open(code_filename, 'w', encoding='utf-8') as f:
f.write(code)
print(f" 📝 提取代码块: {code_filename}")
# 生成汇总报告
report_path = f"{OUTPUT_DIR}/recovery_report.txt"
with open(report_path, 'w', encoding='utf-8') as f:
f.write("DC模块代码恢复报告\n")
f.write("=" * 80 + "\n\n")
f.write(f"恢复时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"找到相关记录数: {len(dc_findings)}\n\n")
for i, finding in enumerate(dc_findings):
f.write(f"\n[{i}] 关键词: {finding['keyword']}\n")
f.write(f"Key: {finding['result']['key']}\n")
f.write("-" * 80 + "\n")
print(f"\n📊 汇总报告已保存: {report_path}")
def main():
print("=" * 80)
print("Cursor SQLite数据库恢复工具 - DC模块专用")
print("=" * 80)
# 检查数据库文件
if not os.path.exists(DB_PATH):
print(f"❌ 数据库文件不存在: {DB_PATH}")
return
print(f"\n📂 数据库路径: {DB_PATH}")
print(f"📂 输出目录: {OUTPUT_DIR}")
# 提取Chat历史
results = extract_chat_history(DB_PATH)
if not results:
print("\n⚠️ 未找到任何记录")
return
# 搜索DC代码
dc_findings = search_dc_code(results)
# 保存结果
save_findings(dc_findings)
print("\n" + "=" * 80)
print("✅ 恢复完成!")
print(f"📁 请查看 {OUTPUT_DIR} 目录")
print("=" * 80)
if __name__ == "__main__":
main()