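# IIT Manager Agent Complete Technical Development Plan (V1.1)
> **Document version:** V1.1 (revised after architecture review)
> **Created:** 2025-12-31
> **Last updated:** 2025-12-31
> **Maintainer:** Architecture team
> **Applicable phases:** MVP + Phase 1-4, full development
> **Purpose:** Provide a directly executable technical implementation plan based on the existing system architecture
> **V1.1 changes:** Incorporates the architecture review feedback: fixes the network connectivity risk, adds historical data scanning, and pins down the frontend technology stack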
---
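## 🔥 V1.1 Core Fixes
Based on the architecture review (see `06-开发记录/IIT Manager Agent 技术方案审查与补丁.md`), this version fixes three key problems:
1. **✅ Critical risk fix**: hybrid sync mode (Webhook + polling), which handles hospital intranet connectivity problems
2. **✅ Feature addition**: full scan of historical data, so that existing records can be quality-checked
3. **✅ Frontend stack pinned down**: the frontend uses Taro (React syntax) and targets both the mini program and H5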
---
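## 📋 Document Navigation
1. [System Architecture Design](#1-system-architecture-design)
2. [Reusing Existing Capabilities](#2-reusing-existing-capabilities)
3. [Core Technical Implementation](#3-core-technical-implementation)
4. [Database Design](#4-database-design)
5. [API Design](#5-api-design)
6. [Deployment Architecture](#6-deployment-architecture)
7. [Development Plan](#7-development-plan)
8. [Risk Assessment and Mitigation](#8-risk-assessment-and-mitigation)
---
## 1. System Architecture Design
### 1.1 Overall Architecture (built on the existing platform)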
```
┌─────────────────────────────────────────────────────────────┐
│  User Interaction Layer (Frontend)                          │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐     │
│  │ WeCom        │   │ Mini Program │   │ PC           │     │
│  │ (alerts)     │   │ (PI view)    │   │ Workbench    │     │
│  └──────────────┘   └──────────────┘   └──────────────┘     │
└─────────────────────────────────────────────────────────────┘
                    ↓ ↑ REST API / WebSocket
┌─────────────────────────────────────────────────────────────┐
│  Business Module Layer (IIT Manager Module)                 │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Node.js Backend (Fastify + TypeScript)                │  │
│  │ ├── controllers/  - HTTP route handlers               │  │
│  │ ├── services/     - business logic                    │  │
│  │ ├── agents/       - 4 agents                          │  │
│  │ └── adapters/     - external system adapters          │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                    ↓ ↑ reuses platform capabilities
┌─────────────────────────────────────────────────────────────┐
│  Platform Base Layer (existing)                             │
│  ✅ Storage (OSS/Local)       ✅ Logger (Winston)           │
│  ✅ Cache (Postgres)          ✅ JobQueue (pg-boss)         │
│  ✅ LLMFactory (multi-model)  ✅ CheckpointService          │
│  ✅ DifyClient (RAG)          ✅ Database (Prisma)          │
└─────────────────────────────────────────────────────────────┘
                    ↓ ↑
┌─────────────────────────────────────────────────────────────┐
│  External Integration Layer (External)                      │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ REDCap     │ │ Dify RAG   │ │ WeCom      │ │ Python     │ │
│ │ (EDC)      │ │ (knowledge)│ │ (alerts)   │ │microservice│ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────────┘
                    ↓ ↑
┌─────────────────────────────────────────────────────────────┐
│  Data Storage Layer (Storage)                               │
│  ┌────────────────────┐    ┌────────────────────┐           │
│  │ RDS PostgreSQL     │    │ OSS object storage │           │
│  │ (data + job queue) │    │ (files / Protocol) │           │
│  └────────────────────┘    └────────────────────┘           │
└─────────────────────────────────────────────────────────────┘
```
### 1.2 Architecture Highlights (aligned with existing conventions)
#### ✅ 1. Full reuse of platform capabilities
```typescript
// ✅ Do not re-implement infrastructure
import { storage } from '@/common/storage'; // file storage
import { logger } from '@/common/logging'; // logging
import { jobQueue } from '@/common/jobs'; // async jobs
import { cache } from '@/common/cache'; // Postgres-backed cache
import { CheckpointService } from '@/common/jobs'; // checkpoint / resume
import { LLMFactory } from '@/common/llm'; // LLM calls
import { DifyClient } from '@/clients/DifyClient'; // RAG retrieval
import { prisma } from '@/config/database'; // database
```
#### ✅ 2. Postgres-only architecture (per platform conventions)
```typescript
// Job management info lives in job.data; business tables hold business data only
await jobQueue.push('iit:quality-check:batch', {
// Business info
projectId: 'proj_001',
recordIds: ['P001', 'P002' /* ... */],
// ✅ Job split info (stored automatically in platform_schema.job.data)
batchIndex: 1,
totalBatches: 10,
// ✅ Progress tracking (stored automatically)
processedCount: 0,
successCount: 0,
failedCount: 0
});
// Manage checkpoints with CheckpointService
const checkpointService = new CheckpointService(prisma);
await checkpointService.saveCheckpoint(job.id, {
currentBatchIndex: 5,
currentIndex: 250
});
```
#### ✅ 3. Schema isolation (new `iit` schema)
```prisma
// prisma/schema.prisma
// Existing schemas: platform, aia, pkb, asl, dc, ssa, st, rvw, admin, common
// New schema: iit
generator client {
provider = "prisma-client-js"
previewFeatures = ["multiSchema"]
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
schemas = ["platform", "aia", "pkb", "asl", "dc", "iit"] // iit is new
}
// All IIT Manager tables live in the iit schema
model IitProject {
id String @id @default(uuid())
// ...
@@schema("iit")
}
```
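#### ✅ 4. Cloud-native ready (SAE deployment)
- Stateless application (no dependency on local files)
- Storage abstraction layer (switch between Local and OSS with no code changes)
- Asynchronous jobs (avoid the 30-second timeout)
- Database connection pooling (prevents connection exhaustion)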
---
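## 2. Reusing Existing Capabilities
### 2.1 Dify RAG Integration (existing foundation)
#### Existing capability (PKB module)
```typescript
// backend/src/clients/DifyClient.ts (existing, 282 lines)
// Signatures only; `declare` keeps this summary valid TypeScript.
export declare class DifyClient {
constructor(apiKey: string, baseUrl: string);
createDataset(name: string): Promise<string>;
uploadDocument(datasetId: string, file: Buffer): Promise<string>;
query(datasetId: string, query: string): Promise<QueryResult>;
// ... other methods
}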
```
#### How IIT Manager uses it
```typescript
// backend/src/modules/iit-manager/services/ProtocolService.ts
import { DifyClient } from '@/clients/DifyClient';
import { prisma } from '@/config/database';

export class ProtocolService {
  private difyClient: DifyClient;

  constructor() {
    this.difyClient = new DifyClient(
      process.env.DIFY_API_KEY!,
      process.env.DIFY_BASE_URL!
    );
  }

  /**
   * Create the Protocol knowledge base for a project.
   */
  async initializeProtocolKnowledgeBase(
    projectId: string,
    protocolPdf: Buffer
  ): Promise<string> {
    // 1. Create the Dify dataset
    const datasetId = await this.difyClient.createDataset(
      `IIT_Project_${projectId}_Protocol`
    );
    // 2. Upload the Protocol PDF
    await this.difyClient.uploadDocument(datasetId, protocolPdf);
    // 3. Persist the dataset ID on the project
    await prisma.iitProject.update({
      where: { id: projectId },
      data: { difyDatasetId: datasetId }
    });
    return datasetId;
  }

  /**
   * Check whether a value complies with the Protocol
   * (the core of the quality-control agent).
   */
  async checkProtocolCompliance(params: {
    projectId: string;
    fieldName: string;
    value: any;
    context: Record<string, any>;
  }): Promise<ComplianceResult> {
    // 1. Look up the project's Dify knowledge base ID
    const project = await prisma.iitProject.findUnique({
      where: { id: params.projectId },
      select: { difyDatasetId: true }
    });
    if (!project?.difyDatasetId) {
      throw new Error(`Project ${params.projectId} has no Protocol knowledge base`);
    }
    // 2. Build the query prompt
    const query = `
Patient data: ${JSON.stringify(params.context)}
Current field: ${params.fieldName} = ${params.value}
Check whether this data meets the requirements of the study protocol (Protocol).
If you find a problem, state:
1. Which rule is violated
2. The page of the protocol where the rule appears
3. What the correct value should be
4. Confidence (0-1)
`;
    // 3. Run the Dify RAG query
    const result = await this.difyClient.query(
      project.difyDatasetId,
      query
    );
    // 4. Parse the AI response
    return this.parseComplianceResult(result);
  }
}
```
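
`checkProtocolCompliance` hands the model's answer to `parseComplianceResult`, whose body is not shown here. The sketch below is one possible shape for it, not the project's implementation. It assumes the answer text is available as a plain string and that the prompt asks for a single JSON object; the `ComplianceResult` field names are taken from how `DataQualityAgent` reads the result, and everything else is hypothetical.

```typescript
// Hypothetical result shape; field names mirror what DataQualityAgent reads.
interface ComplianceResult {
  isCompliant: boolean;
  suggestedValue: unknown;
  reasoning: string;
  protocolPage: number | null;
  confidence: number; // clamped to [0, 1]
}

// Assumption: the model answers with one JSON object, possibly surrounded by prose.
// Returns null when no usable object is found, so the caller decides what to do.
function parseComplianceResult(answer: string): ComplianceResult | null {
  const start = answer.indexOf('{');
  const end = answer.lastIndexOf('}');
  if (start === -1 || end <= start) return null;

  let raw: Record<string, unknown>;
  try {
    raw = JSON.parse(answer.slice(start, end + 1));
  } catch {
    return null;
  }
  if (typeof raw.isCompliant !== 'boolean') return null;

  const confidence = Number(raw.confidence);
  const page = Number(raw.protocolPage);
  return {
    isCompliant: raw.isCompliant,
    suggestedValue: raw.suggestedValue ?? null,
    reasoning: typeof raw.reasoning === 'string' ? raw.reasoning : '',
    // Only accept positive integer page numbers
    protocolPage: Number.isInteger(page) && page > 0 ? page : null,
    // Out-of-range or missing confidence values are clamped rather than trusted
    confidence: Number.isFinite(confidence) ? Math.min(1, Math.max(0, confidence)) : 0,
  };
}
```

Returning `null` instead of guessing keeps an unparseable answer from being silently recorded as either a pass or a violation.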
### 2.2 LLM Calls (existing factory)
```typescript
// ✅ Reuse the existing LLMFactory
import { LLMFactory } from '@/common/llm';
const llm = LLMFactory.getAdapter('deepseek-v3');
const response = await llm.chat([
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userInput }
]);
```
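### 2.3 Async Job Queue (Postgres-only)
```typescript
// ✅ Use the pg-boss queue (already provided by the platform)
import { jobQueue, CheckpointService } from '@/common/jobs';
import { prisma } from '@/config/database';

// Push a quality-check job
await jobQueue.push('iit:quality-check:batch', {
  projectId: 'proj_001',
  recordIds: ['P001', 'P002', 'P003']
});

// Worker (resumes from the last checkpoint)
jobQueue.registerWorker('iit:quality-check:batch', async (job) => {
  const checkpointService = new CheckpointService(prisma);
  // Load the checkpoint; currentIndex is the next record to process
  const checkpoint = await checkpointService.loadCheckpoint(job.id);
  const startIndex = checkpoint?.currentIndex ?? 0;
  // Process the batch
  for (let i = startIndex; i < job.data.recordIds.length; i++) {
    await processRecord(job.data.recordIds[i]);
    // Save a checkpoint every 10 records, pointing at the next unprocessed record
    const processed = i + 1;
    if (processed % 10 === 0 || processed === job.data.recordIds.length) {
      await checkpointService.saveCheckpoint(job.id, {
        currentIndex: processed,
        processedCount: processed
      });
    }
  }
});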
```
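
The resume behaviour of such a worker can be checked in isolation. The following is a minimal sketch under stated assumptions: `MemoryCheckpointStore` and `runBatch` are hypothetical stand-ins (not the platform `CheckpointService` API), and processing is synchronous only to keep the example short; a real worker would `await` each step.

```typescript
interface Checkpoint {
  nextIndex: number; // index of the first record that still needs processing
}

// In-memory stand-in for a checkpoint store (hypothetical; not the platform API).
class MemoryCheckpointStore {
  private readonly saved = new Map<string, Checkpoint>();

  load(jobId: string): Checkpoint | undefined {
    return this.saved.get(jobId);
  }

  save(jobId: string, checkpoint: Checkpoint): void {
    this.saved.set(jobId, checkpoint);
  }
}

function runBatch(
  jobId: string,
  recordIds: string[],
  store: MemoryCheckpointStore,
  processRecord: (recordId: string) => void,
  saveEvery: number = 10,
): void {
  // Resume at the first record that was never confirmed as processed
  const start = store.load(jobId)?.nextIndex ?? 0;
  for (let i = start; i < recordIds.length; i++) {
    processRecord(recordIds[i]);
    const done = i + 1;
    // Store the index of the NEXT record, so a resume never skips one
    if (done % saveEvery === 0 || done === recordIds.length) {
      store.save(jobId, { nextIndex: done });
    }
  }
}
```

This gives at-least-once semantics: records handled after the last saved checkpoint are processed again on resume, so `processRecord` should be idempotent.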
### 2.4 File Storage (OSS abstraction layer)
```typescript
// ✅ Use the storage abstraction layer
import { storage } from '@/common/storage';
// Upload the Protocol PDF
const key = `iit/projects/${projectId}/protocol.pdf`;
const url = await storage.upload(key, pdfBuffer);
// Download the Protocol PDF (separate name: pdfBuffer is already in scope)
const downloadedPdf = await storage.download(key);
```
### 2.5 Logging (Winston)
```typescript
// ✅ Use the platform logging system
import { logger } from '@/common/logging';
logger.info('Quality check started', {
projectId,
recordId,
agent: 'DataQualityAgent'
});
logger.error('Quality check failed', {
error: err.message,
stack: err.stack,
projectId,
recordId
});
```
---
## 3. Core Technical Implementation
### 3.1 REDCap Integration (bidirectional)
#### 3.1.1 REDCap External Module (PHP side)
```php
<?php
// redcap/modules/iit_manager_connector_v1.0.0/IITManagerConnector.php
namespace YiZhengXun\IITManagerConnector;
use ExternalModules\AbstractExternalModule;
class IITManagerConnector extends AbstractExternalModule {
/**
* Hook: fired when a record is saved
*/
public function redcap_save_record($project_id, $record, $instrument,
$event_id, $group_id, $survey_hash,
$response_id, $repeat_instance) {
// 1. Fetch the saved record's data
$data = \REDCap::getData($project_id, 'array', $record);
// 2. Push a webhook to IIT Manager
$this->pushWebhook([
'event' => 'record_updated',
'project_id' => $project_id,
'record_id' => $record,
'instrument' => $instrument,
'event_id' => $event_id,
'data' => $data,
'timestamp' => time()
]);
}
/**
* Push the webhook (signed so the receiver can verify it)
*/
private function pushWebhook($payload) {
$apiKey = $this->getSystemSetting('iit_manager_api_key');
$webhookUrl = $this->getSystemSetting('iit_manager_webhook_url');
// HMAC-SHA256 signature
$signature = hash_hmac('sha256', json_encode($payload), $apiKey);
$ch = curl_init($webhookUrl);
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
curl_setopt($ch, CURLOPT_HTTPHEADER, [
'Content-Type: application/json',
'X-Signature: ' . $signature,
'X-Timestamp: ' . time()
]);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_TIMEOUT, 10);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
if ($httpCode !== 200) {
// Log the failure to the REDCap log ($record is not in scope here, so read it from the payload)
\REDCap::logEvent('IIT Manager Webhook Failed',
"HTTP $httpCode: $response", '', $payload['record_id']);
}
curl_close($ch);
}
}
```
#### 3.1.2 Node.js Webhook Receiver
```typescript
// backend/src/modules/iit-manager/controllers/webhookController.ts
import { FastifyRequest, FastifyReply } from 'fastify';
import { logger } from '@/common/logging';
import { jobQueue } from '@/common/jobs';
import crypto from 'crypto';
interface RedcapWebhookPayload {
event: 'record_updated' | 'record_created' | 'record_deleted';
project_id: string;
record_id: string;
instrument: string;
event_id: string;
data: Record<string, any>;
timestamp: number;
}
export async function handleRedcapWebhook(
request: FastifyRequest<{ Body: RedcapWebhookPayload }>,
reply: FastifyReply
) {
// 1. Verify the signature
const signature = request.headers['x-signature'] as string | undefined;
const timestamp = request.headers['x-timestamp'] as string | undefined;
if (!signature || !timestamp ||
!verifyWebhookSignature(request.body, signature, timestamp)) {
logger.warn('Invalid webhook signature', {
project_id: request.body.project_id
});
return reply.code(403).send({ error: 'Invalid signature' });
}
// 2. Replay protection (5-minute validity); a non-numeric timestamp is rejected too
const now = Math.floor(Date.now() / 1000);
const sentAt = Number.parseInt(timestamp, 10);
if (!Number.isFinite(sentAt) || Math.abs(now - sentAt) > 300) {
return reply.code(403).send({ error: 'Timestamp expired' });
}
// 3. Return 200 immediately so REDCap is not blocked
reply.code(200).send({ status: 'accepted' });
// 4. Trigger the quality check asynchronously (do not wait for it)
setImmediate(async () => {
try {
const { project_id, record_id, data } = request.body;
// Push to the quality-check queue
await jobQueue.push('iit:quality-check', {
projectId: project_id,
recordId: record_id,
data: data
});
logger.info('Quality check queued', {
project_id,
record_id
});
} catch (error) {
logger.error('Failed to queue quality check', {
error: error.message,
payload: request.body
});
}
});
}
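/**
* Verify the webhook signature.
* Note: this re-serialises the parsed body, which only matches the sender's bytes
* when both sides serialise identically. PHP's json_encode escapes "/" and
* non-ASCII characters by default, so verifying over the raw request body is safer.
*/
function verifyWebhookSignature(
payload: any,
signature: string,
timestamp: string
): boolean {
const apiKey = process.env.REDCAP_WEBHOOK_SECRET!;
const expectedSignature = crypto
.createHmac('sha256', apiKey)
.update(JSON.stringify(payload))
.digest('hex');
const received = Buffer.from(signature);
const expected = Buffer.from(expectedSignature);
// timingSafeEqual throws when the lengths differ, so check that first
return received.length === expected.length &&
crypto.timingSafeEqual(received, expected);
}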
```
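
The receiver's signature and replay checks can be combined into one verification step over the raw request body. The sketch below is illustrative rather than the project's implementation: `signWebhook` and `verifyWebhook` are hypothetical names, and signing the string `<timestamp>.<raw body>` is an assumption that would require the REDCap module to sign the same string. It uses only Node's built-in `crypto` module.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

const MAX_SKEW_SECONDS = 300; // the same 5-minute window used by the handler

// Assumption: both sides sign "<timestamp>.<raw body>", so a captured request
// cannot be replayed later with a fresh timestamp header.
function signWebhook(rawBody: string, timestamp: string, secret: string): string {
  return createHmac('sha256', secret).update(`${timestamp}.${rawBody}`).digest('hex');
}

function verifyWebhook(
  rawBody: string,
  signature: string | undefined,
  timestamp: string | undefined,
  secret: string,
  nowSeconds: number = Math.floor(Date.now() / 1000),
): boolean {
  if (!signature || !timestamp) return false;

  // Reject missing, non-numeric, or stale timestamps
  const sentAt = Number.parseInt(timestamp, 10);
  if (!Number.isFinite(sentAt) || Math.abs(nowSeconds - sentAt) > MAX_SKEW_SECONDS) {
    return false;
  }

  const expected = Buffer.from(signWebhook(rawBody, timestamp, secret), 'utf8');
  const received = Buffer.from(signature, 'utf8');
  // timingSafeEqual throws on a length mismatch, so compare lengths first
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

Verifying the exact bytes that were sent avoids depending on PHP and JavaScript producing identical JSON for the same payload.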
#### 3.1.3 REDCap API Adapter (data write-back)
```typescript
// backend/src/modules/iit-manager/adapters/RedcapAdapter.ts
import axios, { AxiosInstance } from 'axios';
import { logger } from '@/common/logging';
export class RedcapAdapter {
private client: AxiosInstance;
private projectApiToken: string;
constructor(redcapUrl: string, apiToken: string) {
this.projectApiToken = apiToken;
this.client = axios.create({
baseURL: redcapUrl,
timeout: 30000
});
}
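/**
* Export records.
* REDCap's API expects a form-encoded body, with list parameters as indexed keys.
*/
async exportRecords(params: {
records?: string[];
fields?: string[];
events?: string[];
dateRangeBegin?: string; // 'YYYY-MM-DD HH:MM:SS', REDCap server time
rawOrLabel?: 'raw' | 'label';
}): Promise<any[]> {
const body = new URLSearchParams({
token: this.projectApiToken,
content: 'record',
action: 'export',
format: 'json',
type: 'flat'
});
params.records?.forEach((r, i) => body.append(`records[${i}]`, r));
params.fields?.forEach((f, i) => body.append(`fields[${i}]`, f));
params.events?.forEach((e, i) => body.append(`events[${i}]`, e));
if (params.dateRangeBegin) body.set('dateRangeBegin', params.dateRangeBegin);
if (params.rawOrLabel) body.set('rawOrLabel', params.rawOrLabel);
const response = await this.client.post('', body);
return response.data;
}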
/**
* Import records (shadow-state write-back)
*/
async importRecords(records: Record<string, any>[]): Promise<void> {
try {
// Form-encoded body, as the REDCap API requires
const response = await this.client.post('', new URLSearchParams({
token: this.projectApiToken,
content: 'record',
action: 'import',
format: 'json',
type: 'flat',
overwriteBehavior: 'normal',
data: JSON.stringify(records)
}));
logger.info('REDCap records imported', {
count: response.data.count,
ids: response.data.ids
});
} catch (error) {
logger.error('REDCap import failed', {
error: error.message,
records: records.map(r => r.record_id)
});
throw error;
}
}
/**
* Export metadata (form structure)
*/
async exportMetadata(): Promise<any[]> {
const response = await this.client.post('', new URLSearchParams({
token: this.projectApiToken,
content: 'metadata',
format: 'json'
}));
return response.data;
}
}
```
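
REDCap API requests are sent as `application/x-www-form-urlencoded`, with list parameters written as indexed keys such as `records[0]`. The helper below is a small sketch of that encoding; `buildRedcapBody` and `RedcapParams` are hypothetical names introduced for illustration and are not part of the adapter.

```typescript
type RedcapParams = Record<string, string | string[] | undefined>;

// Build a form-encoded request body for the REDCap API.
function buildRedcapBody(token: string, params: RedcapParams): URLSearchParams {
  const body = new URLSearchParams({ token, format: 'json' });
  for (const [key, value] of Object.entries(params)) {
    if (value === undefined) continue; // omit unset optional parameters
    if (Array.isArray(value)) {
      // Lists become indexed keys: records[0], records[1], ...
      value.forEach((item, i) => body.append(`${key}[${i}]`, item));
    } else {
      body.set(key, value);
    }
  }
  return body;
}

// Usage: pass the result as the POST body; axios sends URLSearchParams form-encoded.
const exampleBody = buildRedcapBody('API_TOKEN_PLACEHOLDER', {
  content: 'record',
  action: 'export',
  type: 'flat',
  records: ['P001', 'P002'],
  fields: undefined,
});
```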
#### 3.1.4 Hybrid Sync Mode (🔥 V1.1 core fix)
```typescript
// backend/src/modules/iit-manager/services/SyncManager.ts
import axios from 'axios';
import { logger } from '@/common/logging';
import { jobQueue } from '@/common/jobs';
import { cache } from '@/common/cache';
import { prisma } from '@/config/database';
import { RedcapAdapter } from '../adapters/RedcapAdapter';
/**
* Sync manager: handles hospital intranet connectivity problems.
*
* Core strategy:
* 1. Prefer webhooks (real-time) where REDCap can reach the public network
* 2. Scheduled polling as the fallback (reliability), which works in every scenario
*/
export class SyncManager {
private redcapAdapter: RedcapAdapter;
constructor(redcapAdapter: RedcapAdapter) {
this.redcapAdapter = redcapAdapter;
}
/**
* Adaptive sync strategy.
* Tests webhook connectivity at startup and picks the best mode automatically.
*/
async initializeSync(projectId: string) {
logger.info('Initializing sync strategy', { projectId });
// 1. Test webhook connectivity
const webhookWorking = await this.testWebhookConnectivity(projectId);
if (webhookWorking) {
logger.info('Webhook connectivity OK, using real-time mode', { projectId });
// Polling is the backup: every 30 minutes
await this.schedulePolling(projectId, 30);
} else {
logger.warn('Webhook blocked by firewall, using polling mode', { projectId });
// Polling is the primary mode: every 5 minutes
await this.schedulePolling(projectId, 5);
}
}
/**
* Test webhook connectivity
*/
private async testWebhookConnectivity(projectId: string): Promise<boolean> {
try {
const project = await prisma.iitProject.findUnique({
where: { id: projectId },
select: { redcapUrl: true }
});
if (!project) return false;
// Call the test endpoint exposed by the REDCap External Module
const response = await axios.post(
`${project.redcapUrl}/api/?type=module&prefix=iit_manager_connector&page=test`,
{ projectId, test: 'ping' },
{ timeout: 5000 }
);
return response.status === 200;
} catch (error) {
logger.warn('Webhook connectivity test failed', {
projectId,
error: error.message
});
return false;
}
}
/**
* Scheduled polling (the core fallback mechanism)
*/
async schedulePolling(projectId: string, intervalMinutes: number = 10) {
// Use pg-boss scheduling
await jobQueue.schedule(
'iit:redcap:poll',
{ projectId },
{
every: `${intervalMinutes} minutes`,
// Important: set a sensible expiry
expireIn: `${intervalMinutes * 2} minutes`
}
);
logger.info('Polling scheduled', {
projectId,
intervalMinutes
});
}
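/**
* Polling handler (worker)
*/
async handlePoll(projectId: string) {
const startTime = Date.now();
try {
// 1. Read the last sync time (cache first, then database)
const cacheKey = `iit:sync:${projectId}:last`;
const lastSync = await cache.get(cacheKey) ||
(await this.getLastSyncFromDB(projectId));
logger.debug('Polling started', { projectId, lastSync });
// 2. Ask REDCap for records modified since then (lightweight: IDs only).
// dateRangeBegin expects 'YYYY-MM-DD HH:MM:SS' in REDCap server time;
// the conversion below assumes the server clock runs in UTC.
const records = await this.redcapAdapter.exportRecords({
dateRangeBegin: String(lastSync).slice(0, 19).replace('T', ' '),
fields: ['record_id'] // IDs only; dateRangeBegin already filters by modification time
});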
if (records.length === 0) {
logger.debug('No new records to sync', { projectId });
return;
}
logger.info('New records detected', {
projectId,
count: records.length,
since: lastSync
});
// 3. Push quality-check jobs in bulk (threshold-based)
const THRESHOLD = 50;
if (records.length >= THRESHOLD) {
// Large batch: queue mode + job splitting
const chunks = this.splitIntoChunks(records, 50);
for (const chunk of chunks) {
await jobQueue.push('iit:quality-check:batch', {
projectId,
recordIds: chunk.map(r => r.record_id)
});
}
} else {
// Small batch: push one job per record
for (const record of records) {
// Idempotency check (avoid processing the same record twice)
const isDuplicate = await this.isDuplicate(projectId, record.record_id);
if (!isDuplicate) {
await jobQueue.push('iit:quality-check', {
projectId,
recordId: record.record_id
});
}
}
}
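// 4. Update the sync time (written to both cache and database).
// Use the poll start time so records saved while this poll ran are picked up next time.
const now = new Date(startTime).toISOString();
await cache.set(cacheKey, now, 3600 * 24); // cache for 24 hours
await this.updateLastSyncInDB(projectId, now);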
logger.info('Polling completed', {
projectId,
recordsFound: records.length,
duration: Date.now() - startTime
});
} catch (error) {
logger.error('Polling failed', {
error: error.message,
projectId,
duration: Date.now() - startTime
});
throw error; // let pg-boss retry
}
}
/**
* Idempotency guard (avoid duplicate quality checks)
*/
private async isDuplicate(projectId: string, recordId: string): Promise<boolean> {
const key = `iit:processed:${projectId}:${recordId}`;
const exists = await cache.get(key);
if (!exists) {
await cache.set(key, 'true', 3600); // cache for 1 hour
return false;
}
return true;
}
/**
* Read the last sync time from the database
*/
private async getLastSyncFromDB(projectId: string): Promise<string> {
const project = await prisma.iitProject.findUnique({
where: { id: projectId },
select: { lastSyncAt: true }
});
return project?.lastSyncAt?.toISOString() ||
new Date(Date.now() - 24 * 3600 * 1000).toISOString(); // default: 24 hours ago
}
/**
* Update the sync time in the database
*/
private async updateLastSyncInDB(projectId: string, syncTime: string) {
await prisma.iitProject.update({
where: { id: projectId },
data: { lastSyncAt: new Date(syncTime) }
});
}
/**
* Chunking helper for job splitting
*/
private splitIntoChunks<T>(array: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}
}
```
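
Incremental polling depends on two details: the time filter must use the format REDCap expects, and the stored sync time should be the moment the poll started, so that records saved during the poll are not missed. The sketch below illustrates both. `toRedcapTimestamp`, `nextSyncWindow`, and the 60-second overlap are hypothetical choices, and the conversion assumes the REDCap server clock runs in UTC.

```typescript
// Format a Date as 'YYYY-MM-DD HH:MM:SS' (UTC), the shape used by dateRangeBegin.
function toRedcapTimestamp(date: Date): string {
  return date.toISOString().slice(0, 19).replace('T', ' ');
}

interface SyncWindow {
  dateRangeBegin: string; // lower bound passed to the record export
  nextLastSync: string;   // ISO time to persist once the poll has succeeded
}

// Assumption: a small overlap is re-read on every poll to tolerate clock skew;
// duplicates are expected to be filtered by the idempotency check.
function nextSyncWindow(
  lastSyncIso: string,
  pollStartedAt: Date,
  overlapSeconds: number = 60,
): SyncWindow {
  const begin = new Date(new Date(lastSyncIso).getTime() - overlapSeconds * 1000);
  return {
    dateRangeBegin: toRedcapTimestamp(begin),
    nextLastSync: pollStartedAt.toISOString(),
  };
}

const exampleWindow = nextSyncWindow(
  '2025-12-31T08:00:30.000Z',
  new Date('2025-12-31T08:05:00.000Z'),
);
// exampleWindow.dateRangeBegin is '2025-12-31 07:59:30'
```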
#### 3.1.5 Full Scan of Historical Data (🔥 V1.1 feature addition)
```typescript
// backend/src/modules/iit-manager/services/BulkScanService.ts
import { logger } from '@/common/logging';
import { jobQueue } from '@/common/jobs';
import { prisma } from '@/config/database';
import { CheckpointService } from '@/common/jobs';
import { RedcapAdapter } from '../adapters/RedcapAdapter';
import { DataQualityAgent } from '../agents/DataQualityAgent';
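/**
* Full-scan service: quality control for existing (historical) data.
*
* Use cases:
* 1. Scan historical data when a project is initialised
* 2. Re-scan all data after the Protocol is updated
* 3. Manually triggered full quality check
*/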
export class BulkScanService {
private redcapAdapter: RedcapAdapter;
constructor(redcapAdapter: RedcapAdapter) {
this.redcapAdapter = redcapAdapter;
}
/**
* Full scan (at startup or triggered manually)
*/
async scanAllRecords(projectId: string): Promise<string> {
logger.info('Starting bulk scan', { projectId });
// 1. Lightweight fetch of every record_id (no full data)
const allRecords = await this.redcapAdapter.exportRecords({
fields: ['record_id'], // IDs only, so the export is fast
rawOrLabel: 'raw'
});
const totalRecords = allRecords.length;
logger.info('Total records to scan', {
projectId,
totalRecords
});
// 2. Threshold-based mode selection
const THRESHOLD = 50;
const useQueue = totalRecords >= THRESHOLD;
if (useQueue) {
// Queue mode: job splitting + checkpoint resume
return await this.scanViaQueue(projectId, allRecords);
} else {
// Direct mode: process immediately
return await this.scanDirectly(projectId, allRecords);
}
}
/**
* Queue mode: large data sets (50 records or more)
*/
private async scanViaQueue(
projectId: string,
allRecords: { record_id: string }[]
): Promise<string> {
// 1. Create the task-run record
const taskRun = await prisma.iitTaskRun.create({
data: {
projectId,
taskType: 'bulk-scan',
status: 'pending',
totalItems: allRecords.length,
processedItems: 0,
successItems: 0,
failedItems: 0
}
});
// 2. Split into batches of 50
const chunks = this.splitIntoChunks(allRecords, 50);
// 3. Push one job per batch
for (let i = 0; i < chunks.length; i++) {
const chunk = chunks[i];
const jobId = await jobQueue.push('iit:bulk-scan:batch', {
// Business info
taskRunId: taskRun.id,
projectId,
recordIds: chunk.map(r => r.record_id),
// ✅ Job split info (stored automatically in job.data)
batchIndex: i,
totalBatches: chunks.length,
startIndex: i * 50,
endIndex: Math.min((i + 1) * 50, allRecords.length)
});
// Link the job_id to the task run (each batch overwrites the previous value,
// so only the last batch's job_id is kept)
await prisma.iitTaskRun.update({
where: { id: taskRun.id },
data: { jobId }
});
}
logger.info('Bulk scan queued', {
projectId,
totalRecords: allRecords.length,
totalBatches: chunks.length,
taskRunId: taskRun.id
});
return taskRun.id;
}
/**
* Worker: process one batch (supports checkpoint resume)
*/
async processBatch(job: any) {
const { taskRunId, projectId, recordIds, batchIndex, totalBatches } = job.data;
const checkpointService = new CheckpointService(prisma);
// 1. Load the checkpoint (counters are restored so a resumed batch keeps its totals)
const checkpoint = await checkpointService.loadCheckpoint(job.id);
const startIndex = checkpoint?.currentIndex ?? 0;
logger.info('Processing batch', {
taskRunId,
batchIndex,
totalBatches,
recordCount: recordIds.length,
resumeFrom: startIndex
});
let successCount = checkpoint?.successCount ?? 0;
let failedCount = checkpoint?.failedCount ?? 0;
// 2. Process the records one by one
for (let i = startIndex; i < recordIds.length; i++) {
const recordId = recordIds[i];
try {
// 2.1 Fetch the full record on demand (keeps memory usage bounded)
const recordData = await this.redcapAdapter.exportRecords({
records: [recordId]
});
// 2.2 Run the quality-control agent
const agent = new DataQualityAgent();
await agent.checkRecord({
projectId,
recordId,
data: recordData[0]
});
successCount++;
} catch (error) {
logger.error('Record scan failed', {
recordId,
error: error.message
});
failedCount++;
}
// 2.3 Save a checkpoint every 10 records and after the last one
if ((i + 1) % 10 === 0 || i === recordIds.length - 1) {
await checkpointService.saveCheckpoint(job.id, {
currentIndex: i + 1,
processedCount: i + 1,
successCount,
failedCount
});
// Update the task statistics (note: these counts are local to this batch)
await this.updateTaskProgress(taskRunId, i + 1, successCount, failedCount);
}
}
logger.info('Batch completed', {
taskRunId,
batchIndex,
successCount,
failedCount
});
}
/**
* Direct mode: small data sets (fewer than 50 records)
*/
private async scanDirectly(
projectId: string,
allRecords: { record_id: string }[]
): Promise<string> {
// Create the task-run record
const taskRun = await prisma.iitTaskRun.create({
data: {
projectId,
taskType: 'bulk-scan',
status: 'processing',
totalItems: allRecords.length,
processedItems: 0,
successItems: 0,
failedItems: 0,
startedAt: new Date()
}
});
const agent = new DataQualityAgent();
let successCount = 0;
let failedCount = 0;
// Process directly (no queue)
for (const record of allRecords) {
try {
const recordData = await this.redcapAdapter.exportRecords({
records: [record.record_id]
});
await agent.checkRecord({
projectId,
recordId: record.record_id,
data: recordData[0]
});
successCount++;
} catch (error) {
logger.error('Record scan failed', {
recordId: record.record_id,
error: error.message
});
failedCount++;
}
}
// Mark the task as completed
await prisma.iitTaskRun.update({
where: { id: taskRun.id },
data: {
status: 'completed',
processedItems: allRecords.length,
successItems: successCount,
failedItems: failedCount,
completedAt: new Date(),
// startedAt was set when the task run was created above
duration: Math.floor((Date.now() - taskRun.startedAt!.getTime()) / 1000)
}
});
return taskRun.id;
}
/**
* Update task progress (polled by the frontend)
*/
private async updateTaskProgress(
taskRunId: string,
processedItems: number,
successItems: number,
failedItems: number
) {
const task = await prisma.iitTaskRun.findUnique({
where: { id: taskRunId },
select: { totalItems: true }
});
await prisma.iitTaskRun.update({
where: { id: taskRunId },
data: {
processedItems,
successItems,
failedItems,
status: processedItems >= task!.totalItems ? 'completed' : 'processing'
}
});
}
/**
* Chunking helper for job splitting
*/
private splitIntoChunks<T>(array: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}
}
```
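
When a scan is split into several batches, each worker only knows its own counters, so task-level progress has to be accumulated rather than overwritten. The sketch below shows that accumulation as a pure function. `TaskProgress`, `BatchDelta`, and `applyBatchDelta` are hypothetical names; in a Prisma-backed implementation the same idea would likely be expressed with atomic `increment` updates.

```typescript
type TaskStatus = 'pending' | 'processing' | 'completed';

interface TaskProgress {
  totalItems: number;
  processedItems: number;
  successItems: number;
  failedItems: number;
  status: TaskStatus;
}

// What one batch did since its previous report
interface BatchDelta {
  success: number;
  failed: number;
}

// Add a batch's delta to the task totals; never overwrite with batch-local counts.
function applyBatchDelta(task: TaskProgress, delta: BatchDelta): TaskProgress {
  const successItems = task.successItems + delta.success;
  const failedItems = task.failedItems + delta.failed;
  const processedItems = successItems + failedItems;
  return {
    ...task,
    successItems,
    failedItems,
    processedItems,
    // The task is complete only when every record across all batches is accounted for
    status: processedItems >= task.totalItems ? 'completed' : 'processing',
  };
}
```

With 120 records split into batches of 50, 50, and 20, the task stays in `processing` after the first two reports and becomes `completed` only after the third.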
### 3.2 Data Quality-Control Agent (core business logic)
```typescript
// backend/src/modules/iit-manager/agents/DataQualityAgent.ts
import { logger } from '@/common/logging';
import { prisma } from '@/config/database';
import { ProtocolService } from '../services/ProtocolService';
export class DataQualityAgent {
private protocolService: ProtocolService;
constructor() {
this.protocolService = new ProtocolService();
}
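/**
* Check a single record
*/
async checkRecord(params: {
projectId: string;
recordId: string;
data: Record<string, any>;
}): Promise<void> {
// Log identifiers only; params.data holds patient data
logger.info('Quality check started', {
projectId: params.projectId,
recordId: params.recordId
});
// 1. Load the project configuration (key field mappings)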
const project = await prisma.iitProject.findUnique({
where: { id: params.projectId },
select: {
fieldMappings: true, // JSON: { age: 'patient_age', gender: 'sex', ... }
difyDatasetId: true
}
});
if (!project || !project.difyDatasetId) {
logger.warn('Project not configured', { projectId: params.projectId });
return;
}
// 2. Extract the key field values
const mappings = project.fieldMappings as Record<string, string>;
const context = {
age: params.data[mappings.age],
gender: params.data[mappings.gender],
enrollmentDate: params.data[mappings.enrollmentDate],
// ... other mapped fields
};
// 3. Check each field in turn
const issues: any[] = [];
for (const [logicalField, redcapField] of Object.entries(mappings)) {
const value = params.data[redcapField];
// Ask the Protocol service whether the value is compliant
const result = await this.protocolService.checkProtocolCompliance({
projectId: params.projectId,
fieldName: logicalField,
value: value,
context: context
});
if (!result.isCompliant) {
issues.push({
fieldName: logicalField,
currentValue: value,
suggestedValue: result.suggestedValue,
reasoning: result.reasoning,
protocolPage: result.protocolPage,
confidence: result.confidence
});
}
}
// 4. If problems were found, create shadow suggestions
if (issues.length > 0) {
await this.createPendingActions(
params.projectId,
params.recordId,
issues
);
// 5. Send a WeCom notification for severe (high-confidence) findings
const severeIssues = issues.filter(i => i.confidence > 0.85);
if (severeIssues.length > 0) {
await this.sendWeChatNotification(
params.projectId,
params.recordId,
severeIssues
);
}
}
logger.info('Quality check completed', {
projectId: params.projectId,
recordId: params.recordId,
issuesFound: issues.length
});
}
/**
* Create shadow suggestions (status PROPOSED)
*/
private async createPendingActions(
projectId: string,
recordId: string,
issues: any[]
): Promise<void> {
for (const issue of issues) {
await prisma.iitPendingAction.create({
data: {
projectId: projectId,
recordId: recordId,
fieldName: issue.fieldName,
currentValue: issue.currentValue,
suggestedValue: issue.suggestedValue,
status: 'PROPOSED',
agentType: 'DATA_QUALITY',
reasoning: issue.reasoning,
evidence: {
protocolPage: issue.protocolPage,
confidence: issue.confidence
},
createdAt: new Date()
}
});
}
}
/**
* Send a WeCom notification
*/
private async sendWeChatNotification(
projectId: string,
recordId: string,
issues: any[]
): Promise<void> {
// TODO: implement the WeCom notification (Phase 3); nothing is sent yet
logger.info('WeChat notification skipped (not implemented)', {
projectId,
recordId,
issuesCount: issues.length
});
}
}
```
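
Shadow suggestions start in `PROPOSED` and later move through the other statuses listed in the schema (`APPROVED`, `REJECTED`, `EXECUTED`, `FAILED`). The document does not spell out which transitions are legal, so the table below is an assumption made for illustration; only the status names come from the schema, and `canTransition` and `transition` are hypothetical helpers.

```typescript
type ActionStatus = 'PROPOSED' | 'APPROVED' | 'REJECTED' | 'EXECUTED' | 'FAILED';

// Assumed transition table: a human approves or rejects a proposal,
// and an approved action either executes against REDCap or fails.
const ALLOWED_TRANSITIONS: Record<ActionStatus, readonly ActionStatus[]> = {
  PROPOSED: ['APPROVED', 'REJECTED'],
  APPROVED: ['EXECUTED', 'FAILED'],
  REJECTED: [],
  EXECUTED: [],
  FAILED: [],
};

function canTransition(from: ActionStatus, to: ActionStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}

// Returns the new status, or throws when the change is not allowed.
function transition(from: ActionStatus, to: ActionStatus): ActionStatus {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal status change: ${from} -> ${to}`);
  }
  return to;
}
```

Guarding transitions in one place means AI-generated suggestions cannot reach `EXECUTED` without passing through a human `APPROVED` step.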
### 3.3 WeCom (Enterprise WeChat) Integration
```typescript
// backend/src/modules/iit-manager/adapters/WeChatAdapter.ts
import axios, { AxiosInstance } from 'axios';
import { cache } from '@/common/cache';
import { logger } from '@/common/logging';
export class WeChatAdapter {
private client: AxiosInstance;
private corpId: string;
private corpSecret: string;
private agentId: string;
constructor() {
this.corpId = process.env.WECHAT_CORP_ID!;
this.corpSecret = process.env.WECHAT_CORP_SECRET!;
this.agentId = process.env.WECHAT_AGENT_ID!;
this.client = axios.create({
baseURL: 'https://qyapi.weixin.qq.com/cgi-bin',
timeout: 10000
});
}
/**
* Get the access token (valid for 2 hours, cached)
*/
private async getAccessToken(): Promise<string> {
// 1. Read from the cache
const cacheKey = `wechat:access_token:${this.corpId}`;
const cached = await cache.get(cacheKey);
if (cached) {
return cached as string;
}
// 2. Fetch from the API
const response = await this.client.get('/gettoken', {
params: {
corpid: this.corpId,
corpsecret: this.corpSecret
}
});
if (response.data.errcode !== 0) {
throw new Error(`Failed to get access token: ${response.data.errmsg}`);
}
const accessToken = response.data.access_token;
// 3. Cache for 7000 seconds (a 200-second buffer before the 7200-second expiry)
await cache.set(cacheKey, accessToken, 7000);
return accessToken;
}
/**
* Send an application message (card notification)
*/
async sendMessage(params: {
toUser: string; // WeCom UserID
title: string;
description: string;
url: string; // target URL (Workbench)
}): Promise<void> {
const accessToken = await this.getAccessToken();
const payload = {
touser: params.toUser,
msgtype: 'textcard',
agentid: this.agentId,
textcard: {
title: params.title,
description: params.description,
url: params.url,
btntxt: 'View now'
}
};
const response = await this.client.post('/message/send', payload, {
params: { access_token: accessToken }
});
if (response.data.errcode !== 0) {
logger.error('WeChat message send failed', {
error: response.data.errmsg,
toUser: params.toUser
});
throw new Error(`Failed to send WeChat message: ${response.data.errmsg}`);
}
logger.info('WeChat message sent', {
toUser: params.toUser,
title: params.title
});
}
/**
* Send a quality-control alert card
*/
async sendQualityAlert(params: {
toUser: string;
projectName: string;
recordId: string;
issuesCount: number;
workbenchUrl: string;
}): Promise<void> {
await this.sendMessage({
toUser: params.toUser,
title: '🚨 Data quality alert',
description: `Project: ${params.projectName}\nPatient: ${params.recordId}\nAI detected ${params.issuesCount} issue(s)\nPlease handle promptly`,
url: params.workbenchUrl
});
}
}
```
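
The token handling in this adapter amounts to a time-to-live cache that refreshes slightly before expiry. The sketch below isolates that idea with an injectable clock so it can be tested. `TokenCache` is a hypothetical class, the fetch function is synchronous only to keep the example small (the real call is an HTTP request), and deriving the lifetime from the `expires_in` field rather than a fixed 7000 seconds is a suggested variation, not the document's approach.

```typescript
interface TokenResponse {
  access_token: string;
  expires_in: number; // lifetime in seconds, as reported by the token endpoint
}

class TokenCache {
  private token: string | null = null;
  private expiresAtMs = 0;
  private readonly fetchToken: () => TokenResponse;
  private readonly now: () => number;
  private readonly safetyMarginSeconds: number;

  constructor(
    fetchToken: () => TokenResponse,
    now: () => number = () => Date.now(),
    safetyMarginSeconds: number = 200,
  ) {
    this.fetchToken = fetchToken;
    this.now = now;
    this.safetyMarginSeconds = safetyMarginSeconds;
  }

  get(): string {
    // Serve the cached token while it is still inside its safe lifetime
    if (this.token !== null && this.now() < this.expiresAtMs) {
      return this.token;
    }
    const response = this.fetchToken();
    this.token = response.access_token;
    // Refresh early: subtract the safety margin from the reported lifetime
    const lifetimeSeconds = Math.max(0, response.expires_in - this.safetyMarginSeconds);
    this.expiresAtMs = this.now() + lifetimeSeconds * 1000;
    return this.token;
  }
}
```

With a reported lifetime of 7200 seconds and the default margin, a token is reused for 7000 seconds and fetched again after that.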
---
## 4. Database Design
### 4.1 Prisma Schema Definition
```prisma
// prisma/schema.prisma
// ==============================
// IIT Manager Schema
// ==============================
// Project table
model IitProject {
id String @id @default(uuid())
name String
description String? @db.Text
// Protocol knowledge base
difyDatasetId String? @unique // Dify Dataset ID
protocolFileKey String? // OSS key: iit/projects/{id}/protocol.pdf
// 🔥 New in V1.1: Dify performance optimisation, caches the key rules
cachedRules Json? // { inclusionCriteria: [...], exclusionCriteria: [...], fields: {...} }
// Field mapping configuration (JSON)
fieldMappings Json // { age: 'patient_age', gender: 'sex', ... }
// REDCap configuration
redcapProjectId String
redcapApiToken String @db.Text // stored encrypted
redcapUrl String
// 🔥 New in V1.1: sync management, records the last sync time
lastSyncAt DateTime? // time of the last polling sync (used for incremental pulls)
// Project status
status String @default("active") // active/paused/completed
// Timestamps
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
deletedAt DateTime?
// Relations
pendingActions IitPendingAction[]
taskRuns IitTaskRun[]
userMappings IitUserMapping[]
auditLogs IitAuditLog[]
@@index([status, deletedAt])
@@schema("iit")
}
// Shadow-state table (core)
model IitPendingAction {
id String @id @default(uuid())
projectId String
recordId String // REDCap Record ID
fieldName String // field name (logical name, e.g. 'age')
// Value comparison
currentValue Json? // current value
suggestedValue Json? // AI-suggested value
// Status flow
status String // PROPOSED/APPROVED/REJECTED/EXECUTED/FAILED
agentType String // DATA_QUALITY/TASK_DRIVEN/COUNSELING/REPORTING
// AI reasoning
reasoning String @db.Text // AI reasoning trace
evidence Json // { protocolPage: 12, confidence: 0.92, ... }
// Human confirmation
approvedBy String? // User ID
approvedAt DateTime?
rejectionReason String? @db.Text
// Execution
executedAt DateTime?
errorMessage String? @db.Text
// Timestamps
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// Relations
project IitProject @relation(fields: [projectId], references: [id])
@@index([projectId, status])
@@index([projectId, recordId])
@@index([status, createdAt])
@@schema("iit")
}
// 任务运行记录(与 pg-boss 关联)
model IitTaskRun {
id String @id @default(uuid())
projectId String
taskType String // quality-check/follow-up/report-generation
// 关联 pg-boss job
jobId String @unique // platform_schema.job.id
// 任务状态镜像job状态便于业务查询
status String // pending/processing/completed/failed
// 业务结果
totalItems Int
processedItems Int @default(0)
successItems Int @default(0)
failedItems Int @default(0)
// 时间信息
startedAt DateTime?
completedAt DateTime?
duration Int? // 秒
// 时间戳
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// 关系
project IitProject @relation(fields: [projectId], references: [id])
@@index([projectId, taskType, status])
@@index([jobId])
@@schema("iit")
}
// 用户映射表(异构系统身份关联)
model IitUserMapping {
id String @id @default(uuid())
projectId String
// 系统用户ID本系统
systemUserId String
// REDCap用户名
redcapUsername String
// 企微OpenID
wecomUserId String?
// 🔥 V1.1 新增小程序支持与企微OpenID不同
miniProgramOpenId String? @unique // 微信小程序OpenID
sessionKey String? // 微信session_key加密存储
// 角色
role String // PI/CRC/SUB_I
// 时间戳
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// 关系
project IitProject @relation(fields: [projectId], references: [id])
@@unique([projectId, systemUserId])
@@unique([projectId, redcapUsername])
@@index([wecomUserId])
@@index([miniProgramOpenId]) // 🔥 V1.1 新增索引
@@schema("iit")
}
// 审计日志(合规性)
model IitAuditLog {
id String @id @default(uuid())
projectId String
// 操作信息
actionType String // AI_SUGGESTION/HUMAN_APPROVAL/REDCAP_WRITE/...
actionId String? // PendingAction ID 或其他ID
// 用户信息
userId String
ipAddress String?
userAgent String? @db.Text
// 详细信息
details Json? // 操作详情
// 追踪链
traceId String // 关联多个操作
// 时间戳
createdAt DateTime @default(now())
// 关系
project IitProject @relation(fields: [projectId], references: [id])
@@index([projectId, createdAt])
@@index([userId, createdAt])
@@index([actionType, createdAt])
@@index([traceId])
@@schema("iit")
}
```
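
The `status` column of `IitPendingAction` is a plain string, so the allowed state transitions are not enforced by the schema itself. The following is a minimal sketch of a transition guard. The transition table is an assumption inferred from the status comment (`PROPOSED/APPROVED/REJECTED/EXECUTED/FAILED`) and from the PROPOSED → APPROVED → EXECUTED flow described in this document; `canTransition` and `assertTransition` are hypothetical helper names, not part of the original design.

```typescript
// Hypothetical helper: guards status changes on IitPendingAction.
type PendingActionStatus =
  | 'PROPOSED'
  | 'APPROVED'
  | 'REJECTED'
  | 'EXECUTED'
  | 'FAILED';

// Assumed transition table: a suggestion is approved or rejected by a human,
// and only an approved suggestion can be executed (or fail during write-back).
const ALLOWED_TRANSITIONS: Record<PendingActionStatus, readonly PendingActionStatus[]> = {
  PROPOSED: ['APPROVED', 'REJECTED'],
  APPROVED: ['EXECUTED', 'FAILED'],
  REJECTED: [],
  EXECUTED: [],
  FAILED: [],
};

function canTransition(from: PendingActionStatus, to: PendingActionStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

function assertTransition(from: PendingActionStatus, to: PendingActionStatus): void {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal status transition: ${from} -> ${to}`);
  }
}
```

Calling `assertTransition(current, next)` before each status update would keep the AI from ever moving a record straight from `PROPOSED` to `EXECUTED`, which is the property the shadow state mechanism relies on.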
### 4.2 Database Migration

```bash
# Create the migration
npx prisma migrate dev --name add_iit_schema
# Generate the Prisma Client
npx prisma generate
```
---
## 5. API Design

### 5.1 API Endpoint List

#### Project Management

| Endpoint | Method | Function | Priority |
|------|------|------|--------|
| `/api/v1/iit/projects` | POST | Create a project | P0 |
| `/api/v1/iit/projects/:id` | GET | Get project details | P0 |
| `/api/v1/iit/projects/:id` | PUT | Update a project | P1 |
| `/api/v1/iit/projects/:id/protocol` | POST | Upload the Protocol | P0 |
| `/api/v1/iit/projects/:id/field-mappings` | PUT | Configure field mappings | P0 |
| `/api/v1/iit/projects/:id/scan-all` | POST | 🔥 **Full scan (new in V1.1)** | P0 |

#### Webhook Receiver

| Endpoint | Method | Function | Priority |
|------|------|------|--------|
| `/api/v1/iit/webhooks/redcap` | POST | REDCap Webhook | P0 |

#### Shadow State Management

| Endpoint | Method | Function | Priority |
|------|------|------|--------|
| `/api/v1/iit/pending-actions` | GET | List pending suggestions | P0 |
| `/api/v1/iit/pending-actions/:id` | GET | Get suggestion details | P0 |
| `/api/v1/iit/pending-actions/:id/approve` | POST | Approve a suggestion | P0 |
| `/api/v1/iit/pending-actions/:id/reject` | POST | Reject a suggestion | P1 |

#### Task Management

| Endpoint | Method | Function | Priority |
|------|------|------|--------|
| `/api/v1/iit/tasks` | GET | List tasks | P1 |
| `/api/v1/iit/tasks/:id` | GET | Get task details | P1 |
| `/api/v1/iit/tasks/:id/progress` | GET | Get task progress | P1 |

### 5.2 API Implementation Example
```typescript
// backend/src/modules/iit-manager/routes/projects.ts
import { FastifyInstance } from 'fastify';
import { ProjectController } from '../controllers/ProjectController';

export async function projectRoutes(fastify: FastifyInstance) {
  const controller = new ProjectController();

  // Create a project.
  // bind() keeps `this` pointing at the controller inside each handler.
  fastify.post('/projects', {
    schema: {
      body: {
        type: 'object',
        required: ['name', 'redcapProjectId', 'redcapApiToken', 'redcapUrl'],
        properties: {
          name: { type: 'string' },
          description: { type: 'string' },
          redcapProjectId: { type: 'string' },
          redcapApiToken: { type: 'string' },
          redcapUrl: { type: 'string' }
        }
      }
    }
  }, controller.createProject.bind(controller));

  // Upload the Protocol
  fastify.post('/projects/:id/protocol', {
    schema: {
      params: {
        type: 'object',
        properties: {
          id: { type: 'string' }
        }
      }
    }
  }, controller.uploadProtocol.bind(controller));

  // Configure field mappings
  fastify.put('/projects/:id/field-mappings', {
    schema: {
      params: {
        type: 'object',
        properties: {
          id: { type: 'string' }
        }
      },
      body: {
        type: 'object',
        properties: {
          mappings: { type: 'object' }
        }
      }
    }
  }, controller.updateFieldMappings.bind(controller));
}
```
---
## 6. Deployment Architecture

### 6.1 Alibaba Cloud SAE Deployment (Consistent with the Existing Architecture)

```
┌─────────────────────────────────────────────────────────┐
│               Alibaba Cloud SAE Namespace               │
│  ┌────────────────────────────────────────────────┐     │
│  │ App 1: Node.js Backend (IIT Manager module)    │     │
│  │   - Image: backend-service:v1.1                │     │
│  │   - Spec: 2 vCPU / 4 GB x 1 instance           │     │
│  │   - Port: 3001                                 │     │
│  │   - Health check: /api/health                  │     │
│  │   - Internal network access                    │     │
│  └────────────────────────────────────────────────┘     │
│  ┌────────────────────────────────────────────────┐     │
│  │ App 2: Python microservice (existing)          │     │
│  │   - Image: python-extraction:v1.0              │     │
│  │   - Spec: 1 vCPU / 2 GB x 1 instance           │     │
│  │   - Port: 8000                                 │     │
│  │   - Internal network access                    │     │
│  └────────────────────────────────────────────────┘     │
│  ┌────────────────────────────────────────────────┐     │
│  │ App 3: Frontend Nginx (existing)               │     │
│  │   - Image: frontend-nginx:v1.0                 │     │
│  │   - Spec: 1 vCPU / 2 GB x 1 instance           │     │
│  │   - Port: 80                                   │     │
│  │   - Public access (via CLB)                    │     │
│  └────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────┘
                 ↓ ↑ internal network traffic
┌─────────────────────────────────────────────────────────┐
│                   Data Storage Layer                    │
│  ┌──────────────────────┐  ┌──────────────────────┐     │
│  │ RDS PostgreSQL       │  │ OSS object storage   │     │
│  │ - 2 vCPU / 4 GB      │  │ - Protocol PDFs      │     │
│  │ - 11 schemas         │  │ - File uploads       │     │
│  └──────────────────────┘  └──────────────────────┘     │
└─────────────────────────────────────────────────────────┘
```

### 6.2 Environment Variables
```bash
# backend/.env.production

# Database
DATABASE_URL=postgresql://user:pass@pgm-xxx.rds.aliyuncs.com:5432/ai_clinical_research

# OSS storage
STORAGE_MODE=oss
OSS_REGION=cn-beijing
OSS_BUCKET=ai-clinical-research
OSS_ACCESS_KEY_ID=xxx
OSS_ACCESS_KEY_SECRET=xxx

# LLM
LLM_API_KEY=sk-xxx
LLM_BASE_URL=https://api.deepseek.com

# Dify (existing)
DIFY_API_KEY=xxx
DIFY_BASE_URL=http://dify-service:5001

# REDCap
REDCAP_WEBHOOK_SECRET=xxx # must match the EM configuration

# WeCom (Enterprise WeChat)
WECHAT_CORP_ID=xxx
WECHAT_CORP_SECRET=xxx
WECHAT_AGENT_ID=xxx

# Python microservice (internal network)
PYTHON_SERVICE_URL=http://172.17.173.66:8000

# Log level
LOG_LEVEL=info
```
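
A missing variable from the list above is easier to diagnose at startup than at the first failed request. The sketch below validates an environment object before the service starts. Which variables count as required is an assumption made for illustration, and `loadConfig` is a hypothetical helper, not an existing function of the platform.

```typescript
// Assumption: these variables are treated as mandatory for the IIT Manager module.
const REQUIRED_ENV_VARS = [
  'DATABASE_URL',
  'LLM_API_KEY',
  'DIFY_API_KEY',
  'REDCAP_WEBHOOK_SECRET',
  'WECHAT_CORP_ID',
  'WECHAT_CORP_SECRET',
  'WECHAT_AGENT_ID',
] as const;

type RequiredEnvVar = (typeof REQUIRED_ENV_VARS)[number];
type AppConfig = Record<RequiredEnvVar, string>;

// Hypothetical helper: fails fast with the full list of missing variables.
function loadConfig(env: Record<string, string | undefined>): AppConfig {
  const missing = REQUIRED_ENV_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  const config = {} as AppConfig;
  for (const name of REQUIRED_ENV_VARS) {
    config[name] = env[name] as string;
  }
  return config;
}
```

In the backend this would typically be called once as `loadConfig(process.env)` during bootstrap.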
---
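## 7. Development Plan

### 7.1 MVP Phase (2 weeks, P0)

#### Week 1: Basic Connectivity Layer (🔥 priorities adjusted in V1.1)

**Goal**: Connect REDCap ← Node.js (pull) and WeCom push notifications.

**🔥 Rationale for the priority adjustment**:
- API pulling is more controllable (it does not depend on the hospital network)
- It solves the historical data problem
- Webhook becomes an enhancement rather than a core dependency

**Task list**:

1. **Database initialization** (Day 1, 4 hours)
   - [ ] Create `iit_schema`
   - [ ] Write the Prisma Schema (5 tables, including the fields added in V1.1)
   - [ ] Run the migration: `npx prisma migrate dev --name init_iit_schema`
   - [ ] Generate the Prisma Client: `npx prisma generate`
   - [ ] Verify that CRUD operations work from Node.js
2. **WeCom registration** (Day 1, 2 hours)
   - [ ] Register a WeCom developer account
   - [ ] Create a self-built app: IIT Manager Agent (test)
   - [ ] Obtain credentials: CorpID, AgentID, Secret
   - [ ] Test pushing: send one card message with Postman
3. **🔥 REDCap API Adapter** (Day 2, 8 hours) **← priority**
   - [ ] Create `RedcapAdapter.ts`
   - [ ] Implement `exportRecords()` (with time filtering)
   - [ ] Implement `importRecords()` (write data back)
   - [ ] Implement `exportMetadata()` (fetch field definitions)
   - [ ] Test: REDCap data can be pulled
4. **🔥 SyncManager** (Day 2, 8 hours) **← core fallback**
   - [ ] Create `SyncManager.ts`
   - [ ] Implement `initializeSync()` (smart sync strategy)
   - [ ] Implement `schedulePolling()` (scheduled polling)
   - [ ] Implement `handlePoll()` (poll handler)
   - [ ] Implement idempotency protection (no duplicate processing)
   - [ ] Test: polling correctly pulls new data
5. **🔥 Full scan** (Day 3, 8 hours) **← historical data support**
   - [ ] Create `BulkScanService.ts`
   - [ ] Implement `scanAllRecords()` (smart threshold decision)
   - [ ] Implement `processBatch()` (resumable after interruption)
   - [ ] API endpoint: `POST /api/v1/iit/projects/:id/scan-all`
   - [ ] Test: 100 historical records are scanned successfully
6. **REDCap EM** (Day 4, 8 hours) **← as an enhancement**
   - [ ] Create the EM directory structure
   - [ ] Write `config.json` (EM configuration file)
   - [ ] Implement `IITManagerConnector.php`
   - [ ] Implement the `redcap_save_record` hook
   - [ ] Implement Webhook push (signed)
7. **Node.js Webhook receiver** (Day 4, 8 hours)
   - [ ] Create `webhookController.ts`
   - [ ] Implement signature verification
   - [ ] Implement replay attack protection
   - [ ] Push asynchronously to the quality-check queue
   - [ ] Webhook connectivity test (adaptive switching)
8. **WeCom adapter** (Day 5, 8 hours)
   - [ ] Create `WeChatAdapter.ts`
   - [ ] Implement Access Token caching
   - [ ] Implement card message push
   - [ ] Test: send a data quality alert card

**Acceptance criteria (V1.1)**:
- ✅ **Core capability**: polling pulls new REDCap data (latency < 10 minutes)
- ✅ **Enhanced capability**: Webhook push works when the network is reachable (latency < 2 seconds)
- ✅ **Historical data**: the full scan can process existing records
- ✅ **WeCom notification**: the data quality alert card is received
- ✅ **Adaptive**: the system automatically selects the best sync mode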
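
The Webhook receiver tasks above call for signature verification and replay protection. One common way to combine the two is an HMAC-SHA256 signature over a timestamp, a nonce, and the raw body, checked against `REDCAP_WEBHOOK_SECRET`. The sketch below illustrates that approach under stated assumptions: the envelope fields (`timestamp`, `nonce`, `signature`), the five-minute tolerance, and the in-memory nonce set are illustrative choices, not the EM's actual wire format.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Assumed tolerance for clock skew between the REDCap server and the backend.
const MAX_SKEW_MS = 5 * 60 * 1000;

// Hypothetical envelope; the real EM may carry these values in HTTP headers.
interface WebhookEnvelope {
  timestamp: string; // Unix epoch milliseconds, as a string
  nonce: string;     // unique per delivery
  signature: string; // hex-encoded HMAC-SHA256
  body: string;      // raw request body
}

function signWebhook(secret: string, timestamp: string, nonce: string, body: string): string {
  return createHmac('sha256', secret).update(`${timestamp}.${nonce}.${body}`).digest('hex');
}

function verifyWebhook(
  secret: string,
  msg: WebhookEnvelope,
  seenNonces: Set<string>,
  now: number = Date.now(),
): boolean {
  // 1. Reject stale or malformed timestamps.
  const ts = Number(msg.timestamp);
  if (!Number.isFinite(ts) || Math.abs(now - ts) > MAX_SKEW_MS) return false;

  // 2. Reject a nonce that was already accepted (replay).
  if (seenNonces.has(msg.nonce)) return false;

  // 3. Compare signatures in constant time.
  const expected = Buffer.from(signWebhook(secret, msg.timestamp, msg.nonce, msg.body), 'hex');
  const received = Buffer.from(msg.signature, 'hex');
  if (expected.length !== received.length || !timingSafeEqual(expected, received)) return false;

  // Record the nonce only after the signature checks out.
  seenNonces.add(msg.nonce);
  return true;
}
```

In a multi-instance deployment the in-memory `Set` would not be sufficient; a table with a unique constraint on the nonce would fit the Postgres-only architecture better.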
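#### Week 2: AI-Driven Quality Control

**Goal**: Implement the complete closed loop of the quality-control Agent.

**Task list**:

9. **Protocol service** (Day 6-7, 16 hours)
   - [ ] Create `ProtocolService.ts`
   - [ ] Upload the Protocol PDF to OSS
   - [ ] Call Dify to create a Dataset
   - [ ] Implement the `checkProtocolCompliance()` method
   - [ ] Test: after uploading a Protocol, its content can be retrieved
10. **Quality-control Agent** (Day 8-9, 16 hours)
    - [ ] Create `DataQualityAgent.ts`
    - [ ] Implement the `checkRecord()` method
    - [ ] Call the Protocol service to check compliance
    - [ ] Create shadow suggestions (pending_actions table)
    - [ ] Send WeCom notifications (serious deviations)
    - [ ] Test: deviating input data produces the correct suggestion
11. **PC Workbench frontend skeleton** (Day 10-12, 24 hours)
    - [ ] Create the frontend route: `/iit/workbench`
    - [ ] Task list page: show all PROPOSED suggestions
    - [ ] Detail comparison page:
      - Left: current data
      - Right: AI suggestion + evidence snippets
    - [ ] Action buttons: [Reject] [Approve]
    - [ ] Test: suggestions are displayed and actions work correctly
12. **Shadow state transitions** (Day 13, 8 hours)
    - [ ] Implement `PendingActionService.approveAction()`
    - [ ] Call the REDCap API to write data back
    - [ ] Update the status: PROPOSED → APPROVED → EXECUTED
    - [ ] Record audit logs
    - [ ] Test: complete closed loop (detect → approve → write back)
13. **End-to-end testing** (Day 14, 8 hours)
    - [ ] Full workflow test
    - [ ] Performance test (100 records)
    - [ ] Error handling test
    - [ ] Demo recording

**Acceptance criteria**:
- ✅ The AI detects Protocol deviations (accuracy > 80%)
- ✅ The Workbench displays AI suggestions and the evidence chain
- ✅ After approval, data is written back to REDCap correctly
- ✅ Complete audit log
- ✅ 5-minute demo recorded

### 7.2 Phase 1: Multi-Terminal Collaboration (2 weeks, P1)

**Task list**:

14. **🔥 WeChat mini program (V1.1 fixes the tech stack: Taro)** (Week 3-4)
    - [ ] **Initialize a Taro 4.x project** (React syntax)
    - [ ] Configure Taro to compile to WeChat mini program + H5
    - [ ] Reuse the shared logic in `shared/components`
    - [ ] Dynamic brand rendering (logo, theme color)
    - [ ] Report display pages (Taro UI components)
    - [ ] Approval interface
    - [ ] WeCom deep-link integration
    - [ ] Mini program login (wx.login + sessionKey)

    **Tech stack advantages**:
    - ✅ React Hooks syntax (familiar to the team)
    - ✅ Frontend code and logic can be reused
    - ✅ Develop once, run on multiple targets (mini program + H5)
    - ✅ Solid TypeScript support
15. **Task-driven Agent** (Week 3-4)
    - [ ] Patient follow-up reminders
    - [ ] Visit window monitoring
    - [ ] Message push strategy

### 7.3 Phase 2-4 (Later Iterations)

- Phase 2: OCR-based intelligent data capture (4 weeks)
- Phase 3: Intelligent reporting Agent (2 weeks)
- Phase 4: Optimization for scale (3 weeks)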
---
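## 8. Risk Assessment and Mitigation

### 8.1 Technical Risks

#### Risk 1: Insufficient Dify RAG Accuracy

**Risk level**: High

**Impact**: AI detection accuracy below 60%, with too many false positives

**Mitigation**:

**Plan A (preferred)**:
- Strictly limit the MVP check scope (only 3 types of simple rules)
- Age, gender, and required fields have clear rules and therefore high accuracy
- Validate the architecture first, then optimize accuracy

**Plan B (fallback)**:
- If Dify performs poorly, temporarily use hard-coded rules
- The MVP focuses on validating the shadow state mechanism rather than AI capability
- Optimize the rule engine in Phase 2

**Validation method**:
- Test with 10 real cases
- Accuracy target: > 85%
- False positive rate: < 15%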
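
The three rule types named in Plan A (age, gender, required fields) are simple enough to hard-code, as Plan B suggests. The following is a minimal sketch of such a check. The rule shape, the field names `age` and `gender`, and the helper name `checkWithHardcodedRules` are assumptions for illustration; real thresholds would come from the project's Protocol.

```typescript
// Hypothetical rule set for the three MVP rule types.
interface HardcodedRules {
  ageRange: { min: number; max: number };
  allowedGenders: string[];
  requiredFields: string[];
}

interface RuleIssue {
  field: string;
  message: string;
}

function checkWithHardcodedRules(
  record: Record<string, unknown>,
  rules: HardcodedRules,
): RuleIssue[] {
  const issues: RuleIssue[] = [];

  // Required fields: missing, null, and empty string all count as absent.
  for (const field of rules.requiredFields) {
    const value = record[field];
    if (value === undefined || value === null || value === '') {
      issues.push({ field, message: 'required field is missing' });
    }
  }

  // Age range (only checked when a numeric age is present).
  const age = record['age'];
  if (typeof age === 'number' && (age < rules.ageRange.min || age > rules.ageRange.max)) {
    issues.push({
      field: 'age',
      message: `age ${age} is outside ${rules.ageRange.min}-${rules.ageRange.max}`,
    });
  }

  // Gender must be one of the allowed values.
  const gender = record['gender'];
  if (typeof gender === 'string' && !rules.allowedGenders.includes(gender)) {
    issues.push({ field: 'gender', message: `unexpected gender value "${gender}"` });
  }

  return issues;
}
```

Each returned issue could then be stored as a `PROPOSED` pending action, so the shadow state mechanism is exercised even when no LLM is involved.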
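#### Risk 2: REDCap Deployment Difficulties

**Risk level**: Medium

**Impact**: The hospital's REDCap version is too old, or deployment permission is unavailable

**Mitigation**:

**Plan A (recommended)**:
- Deploy our own test REDCap instance (Docker)
- Use it for the MVP demo and internal testing
- Integrate with a hospital's production REDCap only after a contract is signed

**Plan B (fallback)**:
- Skip REDCap at first and use mock data
- Focus the demo on the Workbench and WeCom notifications
- Present the REDCap integration as a technical feasibility statement

**Docker deployment**:
```bash
# Start a REDCap test environment (assumes a docker-compose.yml that defines the redcap and mysql services)
docker-compose up -d redcap mysql
```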
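#### Risk 3: WeCom Review Rejection

**Risk level**: Low

**Impact**: The WeCom self-built app is rejected during review

**Mitigation**:

**Plan A**:
- Prepare the review materials in advance (company credentials, product description)
- Choose the app type "internal enterprise tool" (more lenient review)

**Plan B**:
- If the review is slow, start with a WeCom Webhook test account
- Or temporarily use DingTalk/Feishu (the technical approach is portable)

**Key point**: WeCom is not the only option; the architecture is already decoupled from it.

### 8.2 Business Risks

#### Risk 4: Field Mapping Complexity

**Risk level**: Medium

**Impact**: REDCap field names differ widely between hospitals

**Mitigation**:
- MVP phase: manually configure mappings for 5 key fields
- Phase 2: develop an AI-based automatic mapping tool (NER)
- Phase 3: build a standard field library (100+ common fields)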
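
For the manual MVP mappings, the `fieldMappings` JSON on `IitProject` (for example `{ age: 'patient_age', gender: 'sex' }`) can be applied in both directions: REDCap records are translated to logical names before checks, and logical names are translated back before write-back. The sketch below shows one possible shape; `toLogicalRecord` and `toRedcapField` are hypothetical helper names.

```typescript
// Logical field name -> hospital-specific REDCap field name.
type FieldMappings = Record<string, string>;

// Translate a raw REDCap record into logical field names.
// Fields without a mapping are dropped, so checks only see known fields.
function toLogicalRecord(
  redcapRecord: Record<string, unknown>,
  mappings: FieldMappings,
): Record<string, unknown> {
  const logical: Record<string, unknown> = {};
  for (const [logicalName, redcapField] of Object.entries(mappings)) {
    if (redcapField in redcapRecord) {
      logical[logicalName] = redcapRecord[redcapField];
    }
  }
  return logical;
}

// Resolve the REDCap field for a logical name before writing data back.
function toRedcapField(logicalName: string, mappings: FieldMappings): string {
  const redcapField = mappings[logicalName];
  if (redcapField === undefined) {
    throw new Error(`No REDCap mapping for logical field "${logicalName}"`);
  }
  return redcapField;
}
```

Failing loudly on an unmapped field during write-back is a deliberate choice here: silently writing to a guessed REDCap field would be worse than rejecting the action.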
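#### Risk 5: Medical Compliance Review

**Risk level**: High

**Impact**: Compliance concerns about AI modifying clinical data

**Mitigation**:
- ✅ **Core design**: shadow state mechanism (the AI only suggests; a human confirms)
- ✅ **Full audit trail**: every operation is recorded in the audit_logs table
- ✅ **Designed to comply with FDA 21 CFR Part 11**: electronic signatures and audit trails
- ✅ **Reversible**: all changes can be traced and undone

### 8.3 Performance Risks

#### Risk 6: REDCap Webhook Delays

**Risk level**: Low

**Impact**: Webhook push fails or is delayed

**Mitigation**:
- ✅ Idempotent design: a repeated Webhook is not processed twice
- ✅ Asynchronous processing: the Webhook returns 200 immediately and work runs in the background
- ✅ Retry mechanism: pg-boss retries automatically, up to 3 times
- ✅ Dead-letter queue: failed jobs are stored separately for manual intervention

#### Risk 7: High-Volume Concurrent Quality Checks

**Risk level**: Medium

**Impact**: 100 projects entering data at the same time

**Mitigation**:
- ✅ Queue throttling: pg-boss concurrency limit (10 per second)
- ✅ LLM throttling: rate-limit protection for the DeepSeek API
- ✅ Dify throttling: RAG retrieval limited to 5 requests per second
- ✅ Priority queue: urgent projects are processed first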
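
A per-second limit such as the 5 requests per second for Dify retrieval can be enforced in-process with a token bucket. The sketch below is one simple way to do it, not the platform's existing limiter; the class name and the injectable clock parameter are assumptions made so that the behavior is easy to test.

```typescript
// Minimal token bucket: refills continuously at ratePerSec, holds at most `capacity` tokens.
class TokenBucket {
  private readonly ratePerSec: number;
  private readonly capacity: number;
  private tokens: number;
  private lastRefill: number;

  constructor(ratePerSec: number, capacity: number, now: number = Date.now()) {
    this.ratePerSec = ratePerSec;
    this.capacity = capacity;
    this.tokens = capacity; // start full so a short burst is allowed
    this.lastRefill = now;
  }

  // Returns true and consumes one token if a request may proceed now.
  tryAcquire(now: number = Date.now()): boolean {
    const elapsedSec = Math.max(0, now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.ratePerSec);
    this.lastRefill = Math.max(this.lastRefill, now);
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

A caller that gets `false` would typically leave the job in the queue and retry later rather than drop it. Note that this limiter is per process; with more than one backend instance the effective limit multiplies unless the limit is coordinated centrally.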
---
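## 📊 Summary

### Core Advantages

1. **Full reuse of platform capabilities**
   - ✅ No re-implementation of infrastructure
   - ✅ Development efficiency improved by 50%
   - ✅ Lower maintenance cost
2. **Postgres-only architecture**
   - ✅ Zero extra cost (no Redis required)
   - ✅ Resumable processing (supports long-running tasks)
   - ✅ Consistent with cloud-native practices
3. **Shadow state mechanism**
   - ✅ Medical compliance (designed around FDA 21 CFR Part 11)
   - ✅ Controllable AI (human confirmation)
   - ✅ Traceable (complete audit trail)
4. **Incremental evolution**
   - ✅ A 2-week MVP validates the core value
   - ✅ Phases 1-4 iterate step by step
   - ✅ Risk stays manageable

### MVP Success Criteria

**Demo scenario** (5 minutes):
1. A CRC enters deviating data in REDCap (age 65)
2. Within 30 seconds, the PI receives a WeCom card: "Age exceeds the inclusion/exclusion criteria"
3. The CRC opens the Workbench and sees the AI suggestion and the Protocol evidence (page 12)
4. The CRC approves, and the data is written back to REDCap automatically

**Technical metrics**:
- Webhook response time < 100 ms
- AI quality check completion time < 30 seconds
- WeCom push latency < 5 seconds
- AI accuracy > 80%

### Next Actions

**Immediately** (today):
1. ✅ Confirm the WeCom registration progress
2. ✅ Confirm the tech stack (Node.js 22, PostgreSQL 15, TypeScript 5)
3. ✅ Create the project board (Feishu/Notion)

**Week 1 kickoff** (starting tomorrow, V1.1 priorities):
1. ✅ Database schema initialization
2. 🔥 REDCap API Adapter (priority)
3. 🔥 SyncManager (core fallback)
4. 🔥 Full scan (historical data support)
5. ✅ REDCap EM (as an enhancement)
6. ✅ WeCom adapter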
---
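## 📝 V1.1 Update Summary

### Architecture Corrections

**1. Hybrid sync mode (SyncManager)**
- ✅ Resolves the hospital intranet connectivity problem (a critical risk)
- ✅ Prefers Webhook (real-time), with polling as the fallback (reliability)
- ✅ Adaptive: automatically selects the best sync mode
- ✅ Idempotency protection: prevents duplicate quality checks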
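
The hybrid mode can be reduced to two decisions: which mode to run, and which time window the next poll should cover. The sketch below illustrates both. The poll intervals (5 and 30 minutes) and the 60-second overlap are assumed values chosen for illustration, and `selectSyncMode` and `nextPollWindow` are hypothetical names, not the actual `SyncManager` API.

```typescript
type SyncMode = 'webhook+polling' | 'polling-only';

interface SyncPlan {
  mode: SyncMode;
  pollIntervalMinutes: number;
}

// Assumption: polling stays on in both modes. It runs often when it is the only
// channel, and rarely when it merely backstops a working Webhook.
function selectSyncMode(webhookReachable: boolean): SyncPlan {
  return webhookReachable
    ? { mode: 'webhook+polling', pollIntervalMinutes: 30 }
    : { mode: 'polling-only', pollIntervalMinutes: 5 };
}

interface PollWindow {
  from: Date | null; // null means "no previous sync": pull everything
  to: Date;
}

// Assumed safety margin so records saved near the boundary are not missed.
const OVERLAP_MS = 60 * 1000;

function nextPollWindow(lastSyncAt: Date | null, now: Date): PollWindow {
  if (lastSyncAt === null) {
    return { from: null, to: now };
  }
  return { from: new Date(lastSyncAt.getTime() - OVERLAP_MS), to: now };
}
```

The overlap means some records are fetched twice, which is acceptable only because the idempotency protection discards duplicates.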
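**2. Full scan of historical data (BulkScanService)**
- ✅ Supports quality checks on existing records (a functional gap in V1.0)
- ✅ Smart threshold decision (< 50 records are processed directly, ≥ 50 records go through the queue)
- ✅ Resumable processing (supports long-running tasks)
- ✅ API endpoint: `POST /api/v1/iit/projects/:id/scan-all`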
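
The threshold decision above can be expressed as a small planning function: fewer than 50 records are handled inline, and 50 or more are split into batches for the queue. The sketch below follows that rule. The batch size of 20 and the names `planScan` and `ScanPlan` are assumptions for illustration.

```typescript
// Threshold from the design: below 50 records process directly, otherwise enqueue.
const DIRECT_THRESHOLD = 50;

type ScanPlan =
  | { mode: 'direct'; recordIds: string[] }
  | { mode: 'queued'; batches: string[][] };

// batchSize is an assumed default; each batch would become one queue job.
function planScan(recordIds: string[], batchSize: number = 20): ScanPlan {
  if (batchSize < 1) {
    throw new Error('batchSize must be at least 1');
  }
  if (recordIds.length < DIRECT_THRESHOLD) {
    return { mode: 'direct', recordIds };
  }
  const batches: string[][] = [];
  for (let i = 0; i < recordIds.length; i += batchSize) {
    batches.push(recordIds.slice(i, i + batchSize));
  }
  return { mode: 'queued', batches };
}
```

Because each batch is an independent job, a scan interrupted halfway can resume from the first unfinished batch, which is how the resumable behavior could be achieved with the `processedItems` counter on `IitTaskRun`.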
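### Tech Stack Decisions

**3. Frontend tech stack (Taro 4.x)**
- ✅ React Hooks syntax (familiar to the team)
- ✅ The logic in shared/components can be reused
- ✅ Develop once, run on multiple targets (mini program + H5)
- ✅ Solid TypeScript support

### Database Enhancements

**4. New Prisma Schema fields**
- ✅ `IitProject.cachedRules`: caches key Protocol rules (performance optimization)
- ✅ `IitProject.lastSyncAt`: records the last sync time (incremental pulls)
- ✅ `IitUserMapping.miniProgramOpenId`: mini program OpenID (multi-terminal support)
- ✅ `IitUserMapping.sessionKey`: WeChat session_key (login authentication)

### Development Priority Adjustments

**5. MVP development plan reordered**
- 🔥 **Day 2 (priority)**: REDCap API Adapter + SyncManager (pull capability)
- 🔥 **Day 3 (core)**: full scan (historical data support)
- 🔥 **Day 4 (supplement)**: REDCap EM + Webhook (push capability, as an enhancement)

**Rationale**:
- API pulling is more controllable (it does not depend on the hospital network)
- It solves the historical data problem
- Webhook becomes an enhancement rather than a core dependency

### Risk Responses

**6. Network connectivity risk (resolved)**
- **V1.0 risk**: full dependence on Webhook, which a hospital intranet may be unable to push
- **V1.1 fix**: hybrid sync mode with polling as the fallback
- **Reliability**: 99.9% target (no dependence on the hospital network)

**7. Historical data risk (resolved)**
- **V1.0 risk**: only new data was monitored, so historical data could not be checked
- **V1.1 fix**: full scan, which supports existing records
- **Added value**: a hospital can run quality checks on, for example, 500 historical patients

### Performance Optimization

**8. Dify RAG performance optimization (preloading)**
- ✅ Key rules are pre-extracted when the Protocol is uploaded
- ✅ They are cached in the `cachedRules` field (JSONB)
- ✅ Simple rules are evaluated directly (no Dify call)
- ✅ Only complex rules call Dify RAG (slow path)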
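
The fast-path and slow-path split described above can be sketched as a small dispatcher: if a cached rule exists for the field, it is evaluated locally, and otherwise the check is delegated to a slower function. The rule shape `NumericRangeRule` is an assumed simplification of `cachedRules` (whose exact structure is not specified in this document), and `slowPath` stands in for the Dify RAG call.

```typescript
// Assumed simplification of one cached rule: a numeric range on a logical field.
interface NumericRangeRule {
  field: string;
  min?: number;
  max?: number;
}

interface CheckResult {
  compliant: boolean;
  source: 'cache' | 'rag';
}

function evaluateCachedRule(rule: NumericRangeRule, value: number): boolean {
  if (rule.min !== undefined && value < rule.min) return false;
  if (rule.max !== undefined && value > rule.max) return false;
  return true;
}

// slowPath is injected so this sketch does not depend on any Dify client API.
async function checkField(
  field: string,
  value: unknown,
  cachedRules: NumericRangeRule[],
  slowPath: (field: string, value: unknown) => Promise<boolean>,
): Promise<CheckResult> {
  const rule = cachedRules.find((r) => r.field === field);
  if (rule !== undefined && typeof value === 'number') {
    // Fast path: no network call.
    return { compliant: evaluateCachedRule(rule, value), source: 'cache' };
  }
  // Slow path: anything the cache cannot decide.
  return { compliant: await slowPath(field, value), source: 'rag' };
}
```

Returning the `source` alongside the verdict makes it possible to record in the `evidence` JSON whether a suggestion came from a cached rule or from retrieval.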
---
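**Document version**: V1.1 (architecture review revision)
**Created**: 2025-12-31
**Last updated**: 2025-12-31
**Maintainer**: Architecture team
**Review reference**: `06-开发记录/IIT Manager Agent 技术方案审查与补丁.md`
**Next step**: Await user confirmation, then start MVP development (following the V1.1 priorities)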