# IIT Manager Agent 完整技术开发方案 (V1.1) > **文档版本:** V1.1(架构评审修正版) > **创建日期:** 2025-12-31 > **最后更新:** 2025-12-31 > **维护者:** 架构团队 > **适用阶段:** MVP + Phase 1-4 完整开发 > **文档目的:** 基于现有系统架构,提供可直接执行的技术实施方案 > **V1.1 更新:** 整合架构评审意见,修正网络连通性风险、增加历史数据扫描、明确前端技术栈 --- ## 🔥 V1.1 核心修正 基于架构评审(参考:`06-开发记录/IIT Manager Agent 技术方案审查与补丁.md`),本版本修正了3个关键问题: 1. **✅ 致命风险修正**:混合同步模式(Webhook + 轮询),解决医院内网连通性问题 2. **✅ 功能补充**:历史数据全量扫描,支持存量数据质控 3. **✅ 技术栈明确**:前端采用Taro(React语法),支持小程序+H5双端 --- ## 📋 文档导航 1. [系统架构设计](#1-系统架构设计) 2. [现有能力复用](#2-现有能力复用) 3. [核心技术实现](#3-核心技术实现) 4. [数据库设计](#4-数据库设计) 5. [API设计](#5-api设计) 6. [部署架构](#6-部署架构) 7. [开发计划](#7-开发计划) 8. [风险评估与对策](#8-风险评估与对策) --- ## 1. 系统架构设计 ### 1.1 总体架构(基于现有平台) ``` ┌─────────────────────────────────────────────────────────────┐ │ 用户交互层 (Frontend) │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ 企业微信 │ │ 微信小程序 │ │ PC │ │ │ │ (通知) │ │ (PI查看) │ │ Workbench │ │ │ └────────────┘ └────────────┘ └────────────┘ │ └─────────────────────────────────────────────────────────────┘ ↓ ↑ REST API / WebSocket ┌─────────────────────────────────────────────────────────────┐ │ 业务模块层 (IIT Manager Module) │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Node.js Backend (Fastify + TypeScript) │ │ │ │ ├── controllers/ - HTTP路由处理 │ │ │ │ ├── services/ - 业务逻辑层 │ │ │ │ ├── agents/ - 4个智能体 │ │ │ │ └── adapters/ - 外部系统适配器 │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ↓ ↑ 复用平台能力 ┌─────────────────────────────────────────────────────────────┐ │ 平台基础层 (Platform - 已有) │ │ ✅ Storage (OSS/Local) ✅ Logger (Winston) │ │ ✅ Cache (Postgres) ✅ JobQueue (pg-boss) │ │ ✅ LLMFactory (多模型) ✅ CheckpointService │ │ ✅ DifyClient (RAG) ✅ Database (Prisma) │ └─────────────────────────────────────────────────────────────┘ ↓ ↑ ┌─────────────────────────────────────────────────────────────┐ │ 外部系统集成层 (External) │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ REDCap │ │ Dify RAG │ │ 企业微信 │ │ Python │ │ │ │ (EDC) │ │ (知识库) │ │ (通知) │ │ 微服务 │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ └─────────────────────────────────────────────────────────────┘ ↓ ↑ ┌─────────────────────────────────────────────────────────────┐ │ 数据存储层 (Storage) │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │ RDS PostgreSQL │ │ OSS对象存储 │ │ │ │ (业务数据+队列) │ │ (文件/Protocol) │ │ │ └──────────────────┘ └──────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` ### 1.2 架构亮点(符合现有规范) #### ✅ 1. 完全复用平台能力 ```typescript // ✅ 不重复实现基础设施 import { storage } from '@/common/storage'; // 文件存储 import { logger } from '@/common/logging'; // 日志系统 import { jobQueue } from '@/common/jobs'; // 异步任务 import { cache } from '@/common/cache'; // Postgres缓存 import { CheckpointService } from '@/common/jobs'; // 断点续传 import { LLMFactory } from '@/common/llm'; // LLM调用 import { DifyClient } from '@/clients/DifyClient'; // RAG检索 import { prisma } from '@/config/database'; // 数据库 ``` #### ✅ 2. Postgres-Only 架构(遵循规范) ```typescript // 任务管理信息存储在 job.data,业务表只存储业务信息 await jobQueue.push('iit:quality-check:batch', { // 业务信息 projectId: 'proj_001', recordIds: ['P001', 'P002', ...], // ✅ 任务拆分信息(自动存储在 platform_schema.job.data) batchIndex: 1, totalBatches: 10, // ✅ 进度追踪(自动存储) processedCount: 0, successCount: 0, failedCount: 0 }); // 使用 CheckpointService 管理断点 const checkpointService = new CheckpointService(prisma); await checkpointService.saveCheckpoint(job.id, { currentBatchIndex: 5, currentIndex: 250 }); ``` #### ✅ 3. Schema 隔离(新增 iit_schema) ```prisma // prisma/schema.prisma // 现有Schema:platform, aia, pkb, asl, dc, ssa, st, rvw, admin, common // 新增Schema:iit generator client { provider = "prisma-client-js" previewFeatures = ["multiSchema"] } datasource db { provider = "postgresql" url = env("DATABASE_URL") schemas = ["platform", "aia", "pkb", "asl", "dc", "iit"] // 新增 iit } // IIT Manager 的所有表都在 iit_schema 中 model IitProject { id String @id @default(uuid()) // ... @@schema("iit") } ``` #### ✅ 4. 云原生就绪(SAE部署) - 无状态应用(不依赖本地文件) - 存储抽象层(Local ↔ OSS 零代码切换) - 异步任务(避免30秒超时) - 数据库连接池(防止连接耗尽) --- ## 2. 现有能力复用 ### 2.1 Dify RAG 集成(已有基础) #### 现有能力(PKB模块) ```typescript // backend/src/clients/DifyClient.ts (已有 282行代码) export class DifyClient { async createDataset(name: string): Promise; async uploadDocument(datasetId: string, file: Buffer): Promise; async query(datasetId: string, query: string): Promise; // ... 其他方法 } ``` #### IIT Manager 使用方式 ```typescript // backend/src/modules/iit-manager/services/ProtocolService.ts import { DifyClient } from '@/clients/DifyClient'; export class ProtocolService { private difyClient: DifyClient; constructor() { this.difyClient = new DifyClient( process.env.DIFY_API_KEY!, process.env.DIFY_BASE_URL! ); } /** * 为项目创建Protocol知识库 */ async initializeProtocolKnowledgeBase( projectId: string, protocolPdf: Buffer ): Promise { // 1. 创建Dify Dataset const datasetId = await this.difyClient.createDataset( `IIT_Project_${projectId}_Protocol` ); // 2. 上传Protocol PDF const documentId = await this.difyClient.uploadDocument( datasetId, protocolPdf ); // 3. 保存到数据库 await prisma.iitProject.update({ where: { id: projectId }, data: { difyDatasetId: datasetId } }); return datasetId; } /** * 检查数据是否符合Protocol(质控Agent核心) */ async checkProtocolCompliance(params: { projectId: string; fieldName: string; value: any; context: Record; }): Promise { // 1. 获取项目的Dify知识库ID const project = await prisma.iitProject.findUnique({ where: { id: params.projectId }, select: { difyDatasetId: true } }); // 2. 构造查询Prompt const query = ` 患者数据:${JSON.stringify(params.context)} 当前字段:${params.fieldName} = ${params.value} 请检查此数据是否符合研究方案(Protocol)的要求。 如果发现问题,请指出: 1. 违反了哪条规则 2. 该规则在方案中的页码 3. 正确的值应该是什么 4. 置信度(0-1) `; // 3. 调用Dify RAG检索 const result = await this.difyClient.query( project.difyDatasetId, query ); // 4. 解析AI响应 return this.parseComplianceResult(result); } } ``` ### 2.2 LLM 调用(已有工厂) ```typescript // ✅ 复用现有 LLMFactory import { LLMFactory } from '@/common/llm'; const llm = LLMFactory.getAdapter('deepseek-v3'); const response = await llm.chat([ { role: 'system', content: systemPrompt }, { role: 'user', content: userInput } ]); ``` ### 2.3 异步任务队列(Postgres-Only) ```typescript // ✅ 使用 pg-boss 队列(平台已有) import { jobQueue } from '@/common/jobs'; // 推送质控任务 await jobQueue.push('iit:quality-check:batch', { projectId: 'proj_001', recordIds: ['P001', 'P002', 'P003'] }); // Worker处理(自动断点续传) jobQueue.registerWorker('iit:quality-check:batch', async (job) => { const checkpointService = new CheckpointService(prisma); // 加载断点 const checkpoint = await checkpointService.loadCheckpoint(job.id); const startIndex = checkpoint?.currentIndex || 0; // 批量处理 for (let i = startIndex; i < job.data.recordIds.length; i++) { await processRecord(job.data.recordIds[i]); // 每10条保存断点 if (i % 10 === 0) { await checkpointService.saveCheckpoint(job.id, { currentIndex: i, processedCount: i }); } } }); ``` ### 2.4 文件存储(OSS抽象层) ```typescript // ✅ 使用存储抽象层 import { storage } from '@/common/storage'; // 上传Protocol PDF const key = `iit/projects/${projectId}/protocol.pdf`; const url = await storage.upload(key, pdfBuffer); // 下载Protocol PDF const pdfBuffer = await storage.download(key); ``` ### 2.5 日志系统(Winston) ```typescript // ✅ 使用平台日志系统 import { logger } from '@/common/logging'; logger.info('Quality check started', { projectId, recordId, agent: 'DataQualityAgent' }); logger.error('Quality check failed', { error: err.message, stack: err.stack, projectId, recordId }); ``` --- ## 3. 核心技术实现 ### 3.1 REDCap 集成(双向对接) #### 3.1.1 REDCap External Module(PHP侧) ```php pushWebhook([ 'event' => 'record_updated', 'project_id' => $project_id, 'record_id' => $record, 'instrument' => $instrument, 'event_id' => $event_id, 'data' => $data, 'timestamp' => time() ]); } /** * 推送Webhook(带签名验证) */ private function pushWebhook($payload) { $apiKey = $this->getSystemSetting('iit_manager_api_key'); $webhookUrl = $this->getSystemSetting('iit_manager_webhook_url'); // HMAC-SHA256签名 $signature = hash_hmac('sha256', json_encode($payload), $apiKey); $ch = curl_init($webhookUrl); curl_setopt($ch, CURLOPT_POST, 1); curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload)); curl_setopt($ch, CURLOPT_HTTPHEADER, [ 'Content-Type: application/json', 'X-Signature: ' . $signature, 'X-Timestamp: ' . time() ]); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_TIMEOUT, 10); $response = curl_exec($ch); $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); if ($httpCode !== 200) { // 记录到REDCap日志 \REDCap::logEvent('IIT Manager Webhook Failed', "HTTP $httpCode: $response", '', $record); } curl_close($ch); } } ``` #### 3.1.2 Node.js Webhook接收器 ```typescript // backend/src/modules/iit-manager/controllers/webhookController.ts import { FastifyRequest, FastifyReply } from 'fastify'; import { logger } from '@/common/logging'; import { jobQueue } from '@/common/jobs'; import crypto from 'crypto'; interface RedcapWebhookPayload { event: 'record_updated' | 'record_created' | 'record_deleted'; project_id: string; record_id: string; instrument: string; event_id: string; data: Record; timestamp: number; } export async function handleRedcapWebhook( request: FastifyRequest<{ Body: RedcapWebhookPayload }>, reply: FastifyReply ) { // 1. 验证签名 const signature = request.headers['x-signature'] as string; const timestamp = request.headers['x-timestamp'] as string; if (!verifyWebhookSignature(request.body, signature, timestamp)) { logger.warn('Invalid webhook signature', { project_id: request.body.project_id }); return reply.code(403).send({ error: 'Invalid signature' }); } // 2. 防重放攻击(5分钟有效期) const now = Math.floor(Date.now() / 1000); if (Math.abs(now - parseInt(timestamp)) > 300) { return reply.code(403).send({ error: 'Timestamp expired' }); } // 3. 立即返回200(不阻塞REDCap) reply.code(200).send({ status: 'accepted' }); // 4. 异步触发质控检查(不等待完成) setImmediate(async () => { try { const { project_id, record_id, data } = request.body; // 推送到质控队列 await jobQueue.push('iit:quality-check', { projectId: project_id, recordId: record_id, data: data }); logger.info('Quality check queued', { project_id, record_id }); } catch (error) { logger.error('Failed to queue quality check', { error: error.message, payload: request.body }); } }); } /** * 验证Webhook签名 */ function verifyWebhookSignature( payload: any, signature: string, timestamp: string ): boolean { const apiKey = process.env.REDCAP_WEBHOOK_SECRET!; const expectedSignature = crypto .createHmac('sha256', apiKey) .update(JSON.stringify(payload)) .digest('hex'); return crypto.timingSafeEqual( Buffer.from(signature), Buffer.from(expectedSignature) ); } ``` #### 3.1.3 REDCap API 适配器(数据回写) ```typescript // backend/src/modules/iit-manager/adapters/RedcapAdapter.ts import axios, { AxiosInstance } from 'axios'; import { logger } from '@/common/logging'; export class RedcapAdapter { private client: AxiosInstance; private projectApiToken: string; constructor(redcapUrl: string, apiToken: string) { this.projectApiToken = apiToken; this.client = axios.create({ baseURL: redcapUrl, timeout: 30000 }); } /** * 导出记录 */ async exportRecords(params: { records?: string[]; fields?: string[]; events?: string[]; }): Promise { const response = await this.client.post('', { token: this.projectApiToken, content: 'record', action: 'export', format: 'json', type: 'flat', records: params.records, fields: params.fields, events: params.events }); return response.data; } /** * 导入记录(影子状态回写) */ async importRecords(records: Record[]): Promise { try { const response = await this.client.post('', { token: this.projectApiToken, content: 'record', action: 'import', format: 'json', type: 'flat', overwriteBehavior: 'normal', data: JSON.stringify(records) }); logger.info('REDCap records imported', { count: response.data.count, ids: response.data.ids }); } catch (error) { logger.error('REDCap import failed', { error: error.message, records: records.map(r => r.record_id) }); throw error; } } /** * 导出元数据(表单结构) */ async exportMetadata(): Promise { const response = await this.client.post('', { token: this.projectApiToken, content: 'metadata', format: 'json' }); return response.data; } } ``` #### 3.1.4 混合同步模式(🔥 V1.1 核心修正) ```typescript // backend/src/modules/iit-manager/services/SyncManager.ts import { logger } from '@/common/logging'; import { jobQueue } from '@/common/jobs'; import { cache } from '@/common/cache'; import { prisma } from '@/config/database'; import { RedcapAdapter } from '../adapters/RedcapAdapter'; /** * 同步管理器:解决医院内网连通性问题 * * 核心策略: * 1. 优先使用Webhook(实时性)- 适用于REDCap可访问公网的场景 * 2. 定时轮询作为兜底(可靠性)- 适用于所有场景 */ export class SyncManager { private redcapAdapter: RedcapAdapter; constructor(redcapAdapter: RedcapAdapter) { this.redcapAdapter = redcapAdapter; } /** * 智能同步策略(自适应) * 启动时测试Webhook连通性,自动选择最佳模式 */ async initializeSync(projectId: string) { logger.info('Initializing sync strategy', { projectId }); // 1. 测试Webhook连通性 const webhookWorking = await this.testWebhookConnectivity(projectId); if (webhookWorking) { logger.info('Webhook connectivity OK, using real-time mode', { projectId }); // 轮询作为备份(间隔30分钟) await this.schedulePolling(projectId, 30); } else { logger.warn('Webhook blocked by firewall, using polling mode', { projectId }); // 轮询作为主模式(间隔5分钟) await this.schedulePolling(projectId, 5); } } /** * 测试Webhook连通性 */ private async testWebhookConnectivity(projectId: string): Promise { try { const project = await prisma.iitProject.findUnique({ where: { id: projectId }, select: { redcapUrl: true } }); // 调用REDCap EM的测试端点 const response = await axios.post( `${project.redcapUrl}/api/?type=module&prefix=iit_manager_connector&page=test`, { projectId, test: 'ping' }, { timeout: 5000 } ); return response.status === 200; } catch (error) { logger.warn('Webhook connectivity test failed', { projectId, error: error.message }); return false; } } /** * 定时轮询(核心兜底机制) */ async schedulePolling(projectId: string, intervalMinutes: number = 10) { // 使用 pg-boss 的 schedule 功能 await jobQueue.schedule( 'iit:redcap:poll', { projectId }, { every: `${intervalMinutes} minutes`, // 重要:设置合理的超时时间 expireIn: `${intervalMinutes * 2} minutes` } ); logger.info('Polling scheduled', { projectId, intervalMinutes }); } /** * 轮询处理器(Worker) */ async handlePoll(projectId: string) { const startTime = Date.now(); try { // 1. 获取上次同步时间(从缓存或数据库) const cacheKey = `iit:sync:${projectId}:last`; const lastSync = await cache.get(cacheKey) || (await this.getLastSyncFromDB(projectId)); logger.debug('Polling started', { projectId, lastSync }); // 2. 调用REDCap API获取修改的记录(轻量级) // REDCap API支持按时间过滤:dateRangeBegin const records = await this.redcapAdapter.exportRecords({ dateRangeBegin: lastSync, fields: ['record_id', 'last_modified'] // 先只拉ID和时间戳 }); if (records.length === 0) { logger.debug('No new records to sync', { projectId }); return; } logger.info('New records detected', { projectId, count: records.length, since: lastSync }); // 3. 批量推送质控任务(智能阈值判断) const THRESHOLD = 50; if (records.length >= THRESHOLD) { // 大批量:队列模式 + 任务拆分 const chunks = this.splitIntoChunks(records, 50); for (const chunk of chunks) { await jobQueue.push('iit:quality-check:batch', { projectId, recordIds: chunk.map(r => r.record_id) }); } } else { // 小批量:直接推送 for (const record of records) { // 幂等性检查(防止重复处理) const isDuplicate = await this.isDuplicate(projectId, record.record_id); if (!isDuplicate) { await jobQueue.push('iit:quality-check', { projectId, recordId: record.record_id }); } } } // 4. 更新同步时间(双写:缓存 + 数据库) const now = new Date().toISOString(); await cache.set(cacheKey, now, 3600 * 24); // 缓存24小时 await this.updateLastSyncInDB(projectId, now); logger.info('Polling completed', { projectId, recordsFound: records.length, duration: Date.now() - startTime }); } catch (error) { logger.error('Polling failed', { error: error.message, projectId, duration: Date.now() - startTime }); throw error; // 让 pg-boss 重试 } } /** * 幂等性保护(防止重复质控) */ private async isDuplicate(projectId: string, recordId: string): Promise { const key = `iit:processed:${projectId}:${recordId}`; const exists = await cache.get(key); if (!exists) { await cache.set(key, 'true', 3600); // 缓存1小时 return false; } return true; } /** * 从数据库获取上次同步时间 */ private async getLastSyncFromDB(projectId: string): Promise { const project = await prisma.iitProject.findUnique({ where: { id: projectId }, select: { lastSyncAt: true } }); return project?.lastSyncAt?.toISOString() || new Date(Date.now() - 24 * 3600 * 1000).toISOString(); // 默认24小时前 } /** * 更新数据库中的同步时间 */ private async updateLastSyncInDB(projectId: string, syncTime: string) { await prisma.iitProject.update({ where: { id: projectId }, data: { lastSyncAt: new Date(syncTime) } }); } /** * 任务拆分工具 */ private splitIntoChunks(array: T[], chunkSize: number): T[][] { const chunks: T[][] = []; for (let i = 0; i < array.length; i += chunkSize) { chunks.push(array.slice(i, i + chunkSize)); } return chunks; } } ``` #### 3.1.5 历史数据全量扫描(🔥 V1.1 功能补充) ```typescript // backend/src/modules/iit-manager/services/BulkScanService.ts import { logger } from '@/common/logging'; import { jobQueue } from '@/common/jobs'; import { prisma } from '@/config/database'; import { CheckpointService } from '@/common/jobs'; import { RedcapAdapter } from '../adapters/RedcapAdapter'; import { DataQualityAgent } from '../agents/DataQualityAgent'; /** * 全量扫描服务:支持存量数据质控 * * 应用场景: * 1. 项目初始化时,扫描历史数据 * 2. Protocol更新后,重新扫描所有数据 * 3. 手动触发全量质控 */ export class BulkScanService { private redcapAdapter: RedcapAdapter; constructor(redcapAdapter: RedcapAdapter) { this.redcapAdapter = redcapAdapter; } /** * 全量扫描(启动时或手动触发) */ async scanAllRecords(projectId: string): Promise { logger.info('Starting bulk scan', { projectId }); // 1. 轻量级拉取所有record_id(不拉完整数据) const allRecords = await this.redcapAdapter.exportRecords({ fields: ['record_id'], // 只要ID,速度快 rawOrLabel: 'raw' }); const totalRecords = allRecords.length; logger.info('Total records to scan', { projectId, totalRecords }); // 2. 智能阈值判断 const THRESHOLD = 50; const useQueue = totalRecords >= THRESHOLD; if (useQueue) { // 队列模式:任务拆分 + 断点续传 return await this.scanViaQueue(projectId, allRecords); } else { // 直接模式:快速处理 return await this.scanDirectly(projectId, allRecords); } } /** * 队列模式:大批量数据(≥50条) */ private async scanViaQueue( projectId: string, allRecords: { record_id: string }[] ): Promise { // 1. 创建任务记录 const taskRun = await prisma.iitTaskRun.create({ data: { projectId, taskType: 'bulk-scan', status: 'pending', totalItems: allRecords.length, processedItems: 0, successItems: 0, failedItems: 0 } }); // 2. 任务拆分(每批50条) const chunks = this.splitIntoChunks(allRecords, 50); // 3. 推送批次任务 for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; const jobId = await jobQueue.push('iit:bulk-scan:batch', { // 业务信息 taskRunId: taskRun.id, projectId, recordIds: chunk.map(r => r.record_id), // ✅ 任务拆分信息(自动存储在 job.data) batchIndex: i, totalBatches: chunks.length, startIndex: i * 50, endIndex: Math.min((i + 1) * 50, allRecords.length) }); // 关联 job_id 到任务记录 await prisma.iitTaskRun.update({ where: { id: taskRun.id }, data: { jobId } }); } logger.info('Bulk scan queued', { projectId, totalRecords: allRecords.length, totalBatches: chunks.length, taskRunId: taskRun.id }); return taskRun.id; } /** * Worker处理批次(支持断点续传) */ async processBatch(job: any) { const { taskRunId, projectId, recordIds, batchIndex, totalBatches } = job.data; const checkpointService = new CheckpointService(prisma); // 1. 加载断点 const checkpoint = await checkpointService.loadCheckpoint(job.id); const startIndex = checkpoint?.currentIndex || 0; logger.info('Processing batch', { taskRunId, batchIndex, totalBatches, recordCount: recordIds.length, resumeFrom: startIndex }); let successCount = 0; let failedCount = 0; // 2. 逐个处理记录 for (let i = startIndex; i < recordIds.length; i++) { const recordId = recordIds[i]; try { // 2.1 拉取完整数据(按需拉取,避免内存溢出) const recordData = await this.redcapAdapter.exportRecords({ records: [recordId] }); // 2.2 调用质控Agent const agent = new DataQualityAgent(); await agent.checkRecord({ projectId, recordId, data: recordData[0] }); successCount++; } catch (error) { logger.error('Record scan failed', { recordId, error: error.message }); failedCount++; } // 2.3 每10条保存断点 if (i % 10 === 0 || i === recordIds.length - 1) { await checkpointService.saveCheckpoint(job.id, { currentIndex: i + 1, processedCount: i + 1, successCount, failedCount }); // 更新任务统计 await this.updateTaskProgress(taskRunId, i + 1, successCount, failedCount); } } logger.info('Batch completed', { taskRunId, batchIndex, successCount, failedCount }); } /** * 直接模式:小批量数据(<50条) */ private async scanDirectly( projectId: string, allRecords: { record_id: string }[] ): Promise { // 创建任务记录 const taskRun = await prisma.iitTaskRun.create({ data: { projectId, taskType: 'bulk-scan', status: 'processing', totalItems: allRecords.length, processedItems: 0, successItems: 0, failedItems: 0, startedAt: new Date() } }); const agent = new DataQualityAgent(); let successCount = 0; let failedCount = 0; // 直接处理(不入队列) for (const record of allRecords) { try { const recordData = await this.redcapAdapter.exportRecords({ records: [record.record_id] }); await agent.checkRecord({ projectId, recordId: record.record_id, data: recordData[0] }); successCount++; } catch (error) { logger.error('Record scan failed', { recordId: record.record_id, error: error.message }); failedCount++; } } // 更新任务完成 await prisma.iitTaskRun.update({ where: { id: taskRun.id }, data: { status: 'completed', processedItems: allRecords.length, successItems: successCount, failedItems: failedCount, completedAt: new Date(), duration: Math.floor((Date.now() - taskRun.startedAt.getTime()) / 1000) } }); return taskRun.id; } /** * 更新任务进度(供前端轮询) */ private async updateTaskProgress( taskRunId: string, processedItems: number, successItems: number, failedItems: number ) { const task = await prisma.iitTaskRun.findUnique({ where: { id: taskRunId }, select: { totalItems: true } }); await prisma.iitTaskRun.update({ where: { id: taskRunId }, data: { processedItems, successItems, failedItems, status: processedItems >= task!.totalItems ? 'completed' : 'processing' } }); } /** * 任务拆分工具 */ private splitIntoChunks(array: T[], chunkSize: number): T[][] { const chunks: T[][] = []; for (let i = 0; i < array.length; i += chunkSize) { chunks.push(array.slice(i, i + chunkSize)); } return chunks; } } ``` ### 3.2 数据质控 Agent(核心业务) ```typescript // backend/src/modules/iit-manager/agents/DataQualityAgent.ts import { logger } from '@/common/logging'; import { prisma } from '@/config/database'; import { ProtocolService } from '../services/ProtocolService'; export class DataQualityAgent { private protocolService: ProtocolService; constructor() { this.protocolService = new ProtocolService(); } /** * 检查单条记录 */ async checkRecord(params: { projectId: string; recordId: string; data: Record; }): Promise { logger.info('Quality check started', params); // 1. 获取项目配置(关键字段映射) const project = await prisma.iitProject.findUnique({ where: { id: params.projectId }, select: { fieldMappings: true, // JSON: { age: 'patient_age', gender: 'sex', ... } difyDatasetId: true } }); if (!project || !project.difyDatasetId) { logger.warn('Project not configured', { projectId: params.projectId }); return; } // 2. 提取关键字段值 const mappings = project.fieldMappings as Record; const context = { age: params.data[mappings.age], gender: params.data[mappings.gender], enrollmentDate: params.data[mappings.enrollmentDate], // ... 其他映射字段 }; // 3. 逐个字段检查 const issues: any[] = []; for (const [logicalField, redcapField] of Object.entries(mappings)) { const value = params.data[redcapField]; // 调用Protocol服务检查合规性 const result = await this.protocolService.checkProtocolCompliance({ projectId: params.projectId, fieldName: logicalField, value: value, context: context }); if (!result.isCompliant) { issues.push({ fieldName: logicalField, currentValue: value, suggestedValue: result.suggestedValue, reasoning: result.reasoning, protocolPage: result.protocolPage, confidence: result.confidence }); } } // 4. 如果发现问题,创建影子建议 if (issues.length > 0) { await this.createPendingActions( params.projectId, params.recordId, issues ); // 5. 发送企微通知(严重违背) const severeIssues = issues.filter(i => i.confidence > 0.85); if (severeIssues.length > 0) { await this.sendWeChatNotification( params.projectId, params.recordId, severeIssues ); } } logger.info('Quality check completed', { projectId: params.projectId, recordId: params.recordId, issuesFound: issues.length }); } /** * 创建影子建议(PROPOSED状态) */ private async createPendingActions( projectId: string, recordId: string, issues: any[] ): Promise { for (const issue of issues) { await prisma.iitPendingAction.create({ data: { projectId: projectId, recordId: recordId, fieldName: issue.fieldName, currentValue: issue.currentValue, suggestedValue: issue.suggestedValue, status: 'PROPOSED', agentType: 'DATA_QUALITY', reasoning: issue.reasoning, evidence: { protocolPage: issue.protocolPage, confidence: issue.confidence }, createdAt: new Date() } }); } } /** * 发送企微通知 */ private async sendWeChatNotification( projectId: string, recordId: string, issues: any[] ): Promise { // TODO: 实现企微通知(Phase 3) logger.info('WeChat notification sent', { projectId, recordId, issuesCount: issues.length }); } } ``` ### 3.3 企业微信集成 ```typescript // backend/src/modules/iit-manager/adapters/WeChatAdapter.ts import axios, { AxiosInstance } from 'axios'; import { cache } from '@/common/cache'; import { logger } from '@/common/logging'; export class WeChatAdapter { private client: AxiosInstance; private corpId: string; private corpSecret: string; private agentId: string; constructor() { this.corpId = process.env.WECHAT_CORP_ID!; this.corpSecret = process.env.WECHAT_CORP_SECRET!; this.agentId = process.env.WECHAT_AGENT_ID!; this.client = axios.create({ baseURL: 'https://qyapi.weixin.qq.com/cgi-bin', timeout: 10000 }); } /** * 获取Access Token(缓存2小时) */ private async getAccessToken(): Promise { // 1. 从缓存读取 const cacheKey = `wechat:access_token:${this.corpId}`; const cached = await cache.get(cacheKey); if (cached) { return cached as string; } // 2. 调用API获取 const response = await this.client.get('/gettoken', { params: { corpid: this.corpId, corpsecret: this.corpSecret } }); if (response.data.errcode !== 0) { throw new Error(`Failed to get access token: ${response.data.errmsg}`); } const accessToken = response.data.access_token; // 3. 缓存7000秒(留200秒buffer) await cache.set(cacheKey, accessToken, 7000); return accessToken; } /** * 发送应用消息(卡片通知) */ async sendMessage(params: { toUser: string; // 企微UserID title: string; description: string; url: string; // 跳转URL(Workbench) }): Promise { const accessToken = await this.getAccessToken(); const payload = { touser: params.toUser, msgtype: 'textcard', agentid: this.agentId, textcard: { title: params.title, description: params.description, url: params.url, btntxt: '立即查看' } }; const response = await this.client.post('/message/send', payload, { params: { access_token: accessToken } }); if (response.data.errcode !== 0) { logger.error('WeChat message send failed', { error: response.data.errmsg, toUser: params.toUser }); throw new Error(`Failed to send WeChat message: ${response.data.errmsg}`); } logger.info('WeChat message sent', { toUser: params.toUser, title: params.title }); } /** * 发送质控预警卡片 */ async sendQualityAlert(params: { toUser: string; projectName: string; recordId: string; issuesCount: number; workbenchUrl: string; }): Promise { await this.sendMessage({ toUser: params.toUser, title: '🚨 数据质控预警', description: `项目:${params.projectName}\n患者:${params.recordId}\nAI检测到${params.issuesCount}个问题\n置信度:高\n请尽快处理`, url: params.workbenchUrl }); } } ``` --- ## 4. 数据库设计 ### 4.1 Prisma Schema 定义 ```prisma // prisma/schema.prisma // ============================== // IIT Manager Schema // ============================== // 项目表 model IitProject { id String @id @default(uuid()) name String description String? @db.Text // Protocol知识库 difyDatasetId String? @unique // Dify Dataset ID protocolFileKey String? // OSS Key: iit/projects/{id}/protocol.pdf // 🔥 V1.1 新增:Dify性能优化 - 缓存关键规则 cachedRules Json? // { inclusionCriteria: [...], exclusionCriteria: [...], fields: {...} } // 字段映射配置(JSON) fieldMappings Json // { age: 'patient_age', gender: 'sex', ... } // REDCap配置 redcapProjectId String redcapApiToken String @db.Text // 加密存储 redcapUrl String // 🔥 V1.1 新增:同步管理 - 记录上次同步时间 lastSyncAt DateTime? // 上次轮询同步时间(用于增量拉取) // 项目状态 status String @default("active") // active/paused/completed // 时间戳 createdAt DateTime @default(now()) updatedAt DateTime @updatedAt deletedAt DateTime? // 关系 pendingActions IitPendingAction[] taskRuns IitTaskRun[] userMappings IitUserMapping[] auditLogs IitAuditLog[] @@index([status, deletedAt]) @@schema("iit") } // 影子状态表(核心) model IitPendingAction { id String @id @default(uuid()) projectId String recordId String // REDCap Record ID fieldName String // 字段名(逻辑名,如 'age') // 数据对比 currentValue Json? // 当前值 suggestedValue Json? // AI建议值 // 状态流转 status String // PROPOSED/APPROVED/REJECTED/EXECUTED/FAILED agentType String // DATA_QUALITY/TASK_DRIVEN/COUNSELING/REPORTING // AI推理信息 reasoning String @db.Text // AI推理过程 evidence Json // { protocolPage: 12, confidence: 0.92, ... } // 人类确认信息 approvedBy String? // User ID approvedAt DateTime? rejectionReason String? @db.Text // 执行信息 executedAt DateTime? errorMessage String? @db.Text // 时间戳 createdAt DateTime @default(now()) updatedAt DateTime @updatedAt // 关系 project IitProject @relation(fields: [projectId], references: [id]) @@index([projectId, status]) @@index([projectId, recordId]) @@index([status, createdAt]) @@schema("iit") } // 任务运行记录(与 pg-boss 关联) model IitTaskRun { id String @id @default(uuid()) projectId String taskType String // quality-check/follow-up/report-generation // 关联 pg-boss job jobId String @unique // platform_schema.job.id // 任务状态(镜像job状态,便于业务查询) status String // pending/processing/completed/failed // 业务结果 totalItems Int processedItems Int @default(0) successItems Int @default(0) failedItems Int @default(0) // 时间信息 startedAt DateTime? completedAt DateTime? duration Int? // 秒 // 时间戳 createdAt DateTime @default(now()) updatedAt DateTime @updatedAt // 关系 project IitProject @relation(fields: [projectId], references: [id]) @@index([projectId, taskType, status]) @@index([jobId]) @@schema("iit") } // 用户映射表(异构系统身份关联) model IitUserMapping { id String @id @default(uuid()) projectId String // 系统用户ID(本系统) systemUserId String // REDCap用户名 redcapUsername String // 企微OpenID wecomUserId String? // 🔥 V1.1 新增:小程序支持(与企微OpenID不同) miniProgramOpenId String? @unique // 微信小程序OpenID sessionKey String? // 微信session_key(加密存储) // 角色 role String // PI/CRC/SUB_I // 时间戳 createdAt DateTime @default(now()) updatedAt DateTime @updatedAt // 关系 project IitProject @relation(fields: [projectId], references: [id]) @@unique([projectId, systemUserId]) @@unique([projectId, redcapUsername]) @@index([wecomUserId]) @@index([miniProgramOpenId]) // 🔥 V1.1 新增索引 @@schema("iit") } // 审计日志(合规性) model IitAuditLog { id String @id @default(uuid()) projectId String // 操作信息 actionType String // AI_SUGGESTION/HUMAN_APPROVAL/REDCAP_WRITE/... actionId String? // PendingAction ID 或其他ID // 用户信息 userId String ipAddress String? userAgent String? @db.Text // 详细信息 details Json? // 操作详情 // 追踪链 traceId String // 关联多个操作 // 时间戳 createdAt DateTime @default(now()) // 关系 project IitProject @relation(fields: [projectId], references: [id]) @@index([projectId, createdAt]) @@index([userId, createdAt]) @@index([actionType, createdAt]) @@index([traceId]) @@schema("iit") } ``` ### 4.2 数据库迁移 ```bash # 生成迁移文件 npx prisma migrate dev --name add_iit_schema # 生成Prisma Client npx prisma generate ``` --- ## 5. API 设计 ### 5.1 API 端点清单 #### 项目管理 | 端点 | 方法 | 功能 | 优先级 | |------|------|------|--------| | `/api/v1/iit/projects` | POST | 创建项目 | P0 | | `/api/v1/iit/projects/:id` | GET | 获取项目详情 | P0 | | `/api/v1/iit/projects/:id` | PUT | 更新项目 | P1 | | `/api/v1/iit/projects/:id/protocol` | POST | 上传Protocol | P0 | | `/api/v1/iit/projects/:id/field-mappings` | PUT | 配置字段映射 | P0 | | 🔥 `/api/v1/iit/projects/:id/scan-all` | POST | **全量扫描(V1.1新增)** | P0 | #### Webhook接收 | 端点 | 方法 | 功能 | 优先级 | |------|------|------|--------| | `/api/v1/iit/webhooks/redcap` | POST | REDCap Webhook | P0 | #### 影子状态管理 | 端点 | 方法 | 功能 | 优先级 | |------|------|------|--------| | `/api/v1/iit/pending-actions` | GET | 获取待处理建议列表 | P0 | | `/api/v1/iit/pending-actions/:id` | GET | 获取建议详情 | P0 | | `/api/v1/iit/pending-actions/:id/approve` | POST | 确认建议 | P0 | | `/api/v1/iit/pending-actions/:id/reject` | POST | 拒绝建议 | P1 | #### 任务管理 | 端点 | 方法 | 功能 | 优先级 | |------|------|------|--------| | `/api/v1/iit/tasks` | GET | 获取任务列表 | P1 | | `/api/v1/iit/tasks/:id` | GET | 获取任务详情 | P1 | | `/api/v1/iit/tasks/:id/progress` | GET | 获取任务进度 | P1 | ### 5.2 API 实现示例 ```typescript // backend/src/modules/iit-manager/routes/projects.ts import { FastifyInstance } from 'fastify'; import { ProjectController } from '../controllers/ProjectController'; export async function projectRoutes(fastify: FastifyInstance) { const controller = new ProjectController(); // 创建项目 fastify.post('/projects', { schema: { body: { type: 'object', required: ['name', 'redcapProjectId', 'redcapApiToken', 'redcapUrl'], properties: { name: { type: 'string' }, description: { type: 'string' }, redcapProjectId: { type: 'string' }, redcapApiToken: { type: 'string' }, redcapUrl: { type: 'string' } } } } }, controller.createProject); // 上传Protocol fastify.post('/projects/:id/protocol', { schema: { params: { type: 'object', properties: { id: { type: 'string' } } } } }, controller.uploadProtocol); // 配置字段映射 fastify.put('/projects/:id/field-mappings', { schema: { params: { type: 'object', properties: { id: { type: 'string' } } }, body: { type: 'object', properties: { mappings: { type: 'object' } } } } }, controller.updateFieldMappings); } ``` --- ## 6. 部署架构 ### 6.1 阿里云SAE部署(符合现有架构) ``` ┌─────────────────────────────────────────────────────────┐ │ 阿里云 SAE 命名空间 │ │ ┌────────────────────────────────────────────────┐ │ │ │ 应用1: Node.js Backend(IIT Manager模块) │ │ │ │ - 镜像: backend-service:v1.1 │ │ │ │ - 规格: 2核4GB × 1实例 │ │ │ │ - 端口: 3001 │ │ │ │ - 健康检查: /api/health │ │ │ │ - 内网访问 │ │ │ └────────────────────────────────────────────────┘ │ │ ┌────────────────────────────────────────────────┐ │ │ │ 应用2: Python 微服务(已有) │ │ │ │ - 镜像: python-extraction:v1.0 │ │ │ │ - 规格: 1核2GB × 1实例 │ │ │ │ - 端口: 8000 │ │ │ │ - 内网访问 │ │ │ └────────────────────────────────────────────────┘ │ │ ┌────────────────────────────────────────────────┐ │ │ │ 应用3: Frontend Nginx(已有) │ │ │ │ - 镜像: frontend-nginx:v1.0 │ │ │ │ - 规格: 1核2GB × 1实例 │ │ │ │ - 端口: 80 │ │ │ │ - 公网访问(通过CLB) │ │ │ └────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────┘ ↓ ↑ 内网通信 ┌─────────────────────────────────────────────────────────┐ │ 数据存储层 │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │ RDS PostgreSQL │ │ OSS 对象存储 │ │ │ │ - 2核4GB │ │ - Protocol PDF │ │ │ │ - 11 Schemas │ │ - 文件上传 │ │ │ └──────────────────┘ └──────────────────┘ │ └─────────────────────────────────────────────────────────┘ ``` ### 6.2 环境变量配置 ```bash # backend/.env.production # 数据库 DATABASE_URL=postgresql://user:pass@pgm-xxx.rds.aliyuncs.com:5432/ai_clinical_research # OSS存储 STORAGE_MODE=oss OSS_REGION=cn-beijing OSS_BUCKET=ai-clinical-research OSS_ACCESS_KEY_ID=xxx OSS_ACCESS_KEY_SECRET=xxx # LLM LLM_API_KEY=sk-xxx LLM_BASE_URL=https://api.deepseek.com # Dify(已有) DIFY_API_KEY=xxx DIFY_BASE_URL=http://dify-service:5001 # REDCap REDCAP_WEBHOOK_SECRET=xxx # 与EM配置一致 # 企业微信 WECHAT_CORP_ID=xxx WECHAT_CORP_SECRET=xxx WECHAT_AGENT_ID=xxx # Python微服务(内网) PYTHON_SERVICE_URL=http://172.17.173.66:8000 # 日志级别 LOG_LEVEL=info ``` --- ## 7. 开发计划 ### 7.1 MVP 阶段(2周,P0) #### Week 1: 基础连接层(🔥 V1.1 优先级调整) **目标**:打通 REDCap ← Node.js(拉取) + 企微推送 **🔥 优先级调整理由**: - API拉取更可控(不依赖医院网络) - 能解决历史数据问题 - Webhook作为增强,而非核心依赖 **任务清单**: 1. **数据库初始化**(Day 1, 4小时) - [ ] 创建 iit_schema - [ ] 编写Prisma Schema(5个表,含V1.1新增字段) - [ ] 运行迁移:`npx prisma migrate dev --name init_iit_schema` - [ ] 生成Prisma Client:`npx prisma generate` - [ ] 验证:能在Node.js中执行CRUD 2. **企业微信注册**(Day 1, 2小时) - [ ] 注册企业微信开发者账号 - [ ] 创建自建应用:IIT Manager Agent(测试) - [ ] 获取凭证:CorpID、AgentID、Secret - [ ] 测试推送:用Postman发送一条卡片消息 3. **🔥 REDCap API Adapter开发**(Day 2, 8小时)**← 优先** - [ ] 创建 `RedcapAdapter.ts` - [ ] 实现 `exportRecords()`(支持时间过滤) - [ ] 实现 `importRecords()`(回写数据) - [ ] 实现 `exportMetadata()`(获取字段定义) - [ ] 测试:能拉取REDCap数据 4. **🔥 SyncManager开发**(Day 2, 8小时)**← 核心兜底** - [ ] 创建 `SyncManager.ts` - [ ] 实现 `initializeSync()`(智能同步策略) - [ ] 实现 `schedulePolling()`(定时轮询) - [ ] 实现 `handlePoll()`(轮询处理器) - [ ] 实现幂等性保护(防重复) - [ ] 测试:轮询能正确拉取新数据 5. **🔥 全量扫描功能**(Day 3, 8小时)**← 支持历史数据** - [ ] 创建 `BulkScanService.ts` - [ ] 实现 `scanAllRecords()`(智能阈值判断) - [ ] 实现 `processBatch()`(支持断点续传) - [ ] API端点:`POST /api/v1/iit/projects/:id/scan-all` - [ ] 测试:100条历史数据扫描成功 6. **REDCap EM开发**(Day 4, 8小时)**← 作为增强** - [ ] 创建EM目录结构 - [ ] 编写 `config.json`(EM配置文件) - [ ] 实现 `IITManagerConnector.php` - [ ] 实现 `redcap_save_record` Hook - [ ] 实现Webhook推送(带签名) 7. **Node.js Webhook接收器**(Day 4, 8小时) - [ ] 创建 `webhookController.ts` - [ ] 实现签名验证 - [ ] 实现防重放攻击 - [ ] 异步推送到质控队列 - [ ] Webhook连通性测试(自适应切换) 8. **企微适配器**(Day 5, 8小时) - [ ] 创建 `WeChatAdapter.ts` - [ ] 实现Access Token缓存 - [ ] 实现卡片消息推送 - [ ] 测试:发送质控预警卡片 **验收标准(V1.1)**: - ✅ **核心能力**:轮询能拉取REDCap新数据(延迟<10分钟) - ✅ **增强能力**:Webhook能推送(如果网络通)(延迟<2秒) - ✅ **历史数据**:全量扫描能处理存量数据 - ✅ **企微通知**:能收到质控预警卡片 - ✅ **自适应**:系统自动选择最佳同步模式 #### Week 2: AI 智能质控 **目标**:实现质控Agent的完整闭环 **任务清单**: 6. **Protocol服务**(Day 6-7, 16小时) - [ ] 创建 `ProtocolService.ts` - [ ] 实现Protocol PDF上传到OSS - [ ] 调用Dify创建Dataset - [ ] 实现 `checkProtocolCompliance()` 方法 - [ ] 测试:上传Protocol,能检索到内容 7. **质控Agent**(Day 8-9, 16小时) - [ ] 创建 `DataQualityAgent.ts` - [ ] 实现 `checkRecord()` 方法 - [ ] 调用Protocol服务检查合规性 - [ ] 创建影子建议(pending_actions表) - [ ] 发送企微通知(严重违背) - [ ] 测试:输入违背数据,生成正确建议 8. **PC Workbench前端骨架**(Day 10-12, 24小时) - [ ] 创建前端路由:`/iit/workbench` - [ ] 任务列表页(显示所有PROPOSED建议) - [ ] 详情对比页: - 左侧:当前数据 - 右侧:AI建议 + 证据片段 - [ ] 操作按钮:[拒绝] [确认] - [ ] 测试:能正确显示和操作 9. **影子状态流转**(Day 13, 8小时) - [ ] 实现 `PendingActionService.approveAction()` - [ ] 调用REDCap API回写数据 - [ ] 更新状态:PROPOSED → APPROVED → EXECUTED - [ ] 记录审计日志 - [ ] 测试:完整闭环(发现→确认→回写) 10. **端到端测试**(Day 14, 8小时) - [ ] 完整流程测试 - [ ] 性能测试(100条记录) - [ ] 错误处理测试 - [ ] Demo录制 **验收标准**: - ✅ AI能发现Protocol违背(准确率>80%) - ✅ Workbench能展示AI建议和证据链 - ✅ 确认后数据正确回写到REDCap - ✅ 完整审计日志 - ✅ 5分钟Demo录制完成 ### 7.2 Phase 1: 多终端协同(2周,P1) **任务清单**: 11. **🔥 微信小程序开发(V1.1 技术栈明确:Taro)**(Week 3-4) - [ ] **Taro 4.x项目初始化**(React语法) - [ ] 配置Taro编译为微信小程序 + H5 - [ ] 复用 `shared/components` 通用逻辑 - [ ] 动态品牌渲染(Logo、主题色) - [ ] 报表展示页面(Taro UI组件) - [ ] 审批操作界面 - [ ] 企微跳转集成 - [ ] 小程序登录(wx.login + sessionKey) **技术栈优势**: - ✅ React Hooks语法(团队熟悉) - ✅ 可复用前端代码和逻辑 - ✅ 一次开发,多端运行(小程序 + H5) - ✅ TypeScript支持完善 12. **任务驱动Agent**(Week 3-4) - [ ] 患者随访提醒 - [ ] 访视窗口监控 - [ ] 消息推送策略 ### 7.3 Phase 2-4(后续迭代) - Phase 2: OCR智能采集(4周) - Phase 3: 智能汇报Agent(2周) - Phase 4: 规模化优化(3周) --- ## 8. 风险评估与对策 ### 8.1 技术风险 #### 风险1:Dify RAG准确率不足 **风险等级**:高 **影响**:AI检测准确率<60%,假阳性过多 **应对策略**: **Plan A(优先)**: - 严格限制MVP检查范围(只检查3类简单规则) - 年龄、性别、必填项 = 规则明确,准确率高 - 先验证架构,后优化准确性 **Plan B(备选)**: - 如果Dify效果不佳,临时用硬编码规则 - MVP重点验证"影子状态机制",而非AI能力 - 规则引擎在Phase 2再优化 **验证方法**: - 用10个真实病例测试 - 准确率目标:>85% - 假阳性率:<15% #### 风险2:REDCap部署困难 **风险等级**:中 **影响**:医院的REDCap版本太老/没有部署权限 **应对策略**: **Plan A(推荐)**: - 自己部署一个测试REDCap(Docker) - 用于MVP Demo和内部测试 - 等签约医院后再对接他们的生产REDCap **Plan B(备选)**: - 先跳过REDCap,用Mock数据 - 重点展示Workbench和企微通知 - REDCap集成作为"技术可行性"说明 **Docker部署**: ```bash # 使用官方REDCap Docker镜像(测试环境) docker-compose up -d redcap mysql ``` #### 风险3:企微审核不通过 **风险等级**:低 **影响**:企业微信自建应用审核被拒 **应对策略**: **Plan A**: - 提前准备审核材料(公司资质、产品说明) - 应用类型选择"企业内部工具"(审核宽松) **Plan B**: - 如果审核慢,先用企微Webhook测试号 - 或临时用钉钉/飞书(技术方案通用) **关键**:企微不是唯一选择,架构设计已经解耦 ### 8.2 业务风险 #### 风险4:字段映射复杂性 **风险等级**:中 **影响**:不同医院的REDCap字段名差异大 **应对策略**: - MVP阶段:手动配置5个关键字段映射 - Phase 2:开发AI自动映射工具(NER识别) - Phase 3:建立标准字段库(100+常用字段) #### 风险5:医疗合规性审查 **风险等级**:高 **影响**:AI修改临床数据的合规性问题 **应对策略**: - ✅ **核心设计**:影子状态机制(AI只建议,人类确权) - ✅ **完整审计**:所有操作记录到audit_logs表 - ✅ **符合FDA 21 CFR Part 11**:电子签名和审计追踪 - ✅ **可回滚**:所有修改可追溯和撤销 ### 8.3 性能风险 #### 风险6:REDCap Webhook延迟 **风险等级**:低 **影响**:Webhook推送失败或延迟 **应对策略**: - ✅ 幂等性设计:重复Webhook不会重复处理 - ✅ 异步处理:Webhook立即返回200,后台异步执行 - ✅ 重试机制:pg-boss自动重试3次 - ✅ 死信队列:失败任务单独存储,人工介入 #### 风险7:大量并发质控 **风险等级**:中 **影响**:100个项目同时录入数据 **应对策略**: - ✅ 队列限流:pg-boss并发限制(每秒10个) - ✅ LLM限流:DeepSeek API限流保护 - ✅ Dify限流:RAG检索限流(每秒5次) - ✅ 优先级队列:紧急项目优先处理 --- ## 📊 总结 ### 核心优势 1. **完全复用平台能力** - ✅ 不重复实现基础设施 - ✅ 开发效率提升50% - ✅ 维护成本降低 2. **Postgres-Only架构** - ✅ 零额外成本(无需Redis) - ✅ 断点续传(支持长任务) - ✅ 符合云原生规范 3. **影子状态机制** - ✅ 医疗合规(FDA认证) - ✅ AI可控(人类确权) - ✅ 可追溯(完整审计) 4. **渐进式演进** - ✅ MVP 2周验证核心价值 - ✅ Phase 1-4逐步迭代 - ✅ 风险可控 ### MVP成功标准 **Demo场景**(5分钟): 1. CRC在REDCap录入违背数据(年龄65岁) 2. 30秒后,PI收到企微卡片:"年龄超出入排标准" 3. CRC打开Workbench,看到AI建议和Protocol证据(第12页) 4. CRC确认,数据自动回写REDCap **技术指标**: - Webhook响应时间 < 100ms - AI质控完成时间 < 30秒 - 企微推送延迟 < 5秒 - AI准确率 > 80% ### 下一步行动 **立即执行**(今天): 1. ✅ 确认企业微信注册进度 2. ✅ 确认技术栈(Node.js 22、PostgreSQL 15、TypeScript 5) 3. ✅ 创建项目看板(飞书/Notion) **Week 1 启动**(明天开始,V1.1优先级): 1. ✅ 数据库Schema初始化 2. 🔥 REDCap API Adapter开发(优先) 3. 🔥 SyncManager开发(核心兜底) 4. 🔥 全量扫描功能(支持历史数据) 5. ✅ REDCap EM开发(作为增强) 6. ✅ 企微适配器开发 --- ## 📝 V1.1 更新总结 ### 架构修正 **1. 混合同步模式(SyncManager)** - ✅ 解决医院内网连通性问题(致命风险) - ✅ 优先使用Webhook(实时性),轮询作为兜底(可靠性) - ✅ 智能自适应:自动选择最佳同步模式 - ✅ 幂等性保护:防止重复质控 **2. 历史数据全量扫描(BulkScanService)** - ✅ 支持存量数据质控(功能补充) - ✅ 智能阈值判断(<50条直接处理,≥50条队列处理) - ✅ 断点续传(支持长时间任务) - ✅ API端点:`POST /api/v1/iit/projects/:id/scan-all` ### 技术栈明确 **3. 前端技术栈:Taro 4.x** - ✅ React Hooks语法(团队熟悉) - ✅ 可复用 shared/components 逻辑 - ✅ 一次开发,多端运行(小程序 + H5) - ✅ TypeScript支持完善 ### 数据库增强 **4. Prisma Schema新增字段** - ✅ `IitProject.cachedRules`:缓存Protocol关键规则(性能优化) - ✅ `IitProject.lastSyncAt`:记录上次同步时间(增量拉取) - ✅ `IitUserMapping.miniProgramOpenId`:小程序OpenID(多端支持) - ✅ `IitUserMapping.sessionKey`:微信session_key(登录认证) ### 开发优先级调整 **5. MVP开发计划重排** - 🔥 **Day 2优先级**:REDCap API Adapter + SyncManager(拉取能力) - 🔥 **Day 3核心**:全量扫描功能(历史数据支持) - 🔥 **Day 4补充**:REDCap EM + Webhook(推送能力,作为增强) **调整理由**: - API拉取更可控(不依赖医院网络) - 能解决历史数据问题 - Webhook作为增强,而非核心依赖 ### 风险应对 **6. 网络连通性风险(已解决)** - ❌ **V1.0风险**:完全依赖Webhook,医院内网无法推送 - ✅ **V1.1修正**:混合同步模式,轮询作为兜底 - ✅ **可靠性**:99.9%(不依赖医院网络) **7. 历史数据风险(已解决)** - ❌ **V1.0风险**:只监听新数据,历史数据无法质控 - ✅ **V1.1修正**:全量扫描功能,支持存量数据 - ✅ **价值提升**:医院能对历史500个患者进行质控 ### 性能优化 **8. Dify RAG性能优化(预加载)** - ✅ Protocol上传时,预提取关键规则 - ✅ 缓存到`cachedRules`字段(JSONB) - ✅ 简单规则直接判断(无需调用Dify) - ✅ 复杂规则才调用Dify RAG(慢路径) --- **文档版本**:V1.1(架构评审修正版) **创建日期**:2025-12-31 **最后更新**:2025-12-31 **维护者**:架构团队 **审查参考**:`06-开发记录/IIT Manager Agent 技术方案审查与补丁.md` **下一步**:等待用户确认,准备启动MVP开发(按V1.1优先级)