# PKB 模块:Dify 替换为 pgvector 开发计划 > **文档版本:** v1.0 > **创建日期:** 2026-01-19 > **预计工期:** 2 周(10个工作日) > **前置条件:** ✅ pgvector 0.8.1 已安装 > **目标:** 用 PostgreSQL + pgvector 原生 RAG 替代 Dify,实现 R-C-R-G 混合检索架构 --- ## 📊 整体难度评估 ### 总体评估:⭐⭐⭐ 中等难度 | 评估维度 | 难度 | 说明 | |----------|------|------| | **数据库设计** | ⭐⭐ 低 | Prisma schema 直接写,pgvector 已就绪 | | **Embedding 服务** | ⭐⭐ 低 | 调用阿里云 API,简单封装 | | **文档切片** | ⭐⭐ 低 | 成熟方案,RecursiveCharacterTextSplitter | | **全要素提取** | ⭐⭐⭐ 中 | 需要调优 Prompt,处理 JSON 异常 | | **向量检索** | ⭐⭐⭐ 中 | pgvector SQL 语法需要学习 | | **混合检索(RRF)** | ⭐⭐⭐ 中 | 核心算法,需要调优 | | **服务替换** | ⭐⭐⭐⭐ 中高 | 需要保持 API 兼容,测试覆盖 | | **数据迁移** | ⭐⭐⭐ 中 | 现有文档需重新向量化 | **综合评估**:技术上完全可行,主要挑战在于**服务替换的平滑过渡**和**检索效果调优**。 --- ## 🔥 核心挑战分析 ### 挑战 1:混合检索效果调优 🔴 高风险 **问题描述**: - 替换 Dify 后,检索效果可能下降 - 需要调优向量检索 + 关键词检索的权重 - RRF 参数(k 值)需要实验确定 **应对策略**: - 准备测试数据集(100+ 查询) - 建立效果评估指标(Recall@K, MRR) - 先用小批量数据验证,再全量迁移 **预留时间**:2 天专门用于调优 --- ### 挑战 2:全要素提取的准确性 🟡 中风险 **问题描述**: - LLM 提取的 JSON 可能格式错误 - PICO、用药方案等字段提取不完整 - 不同类型文献(RCT/综述/病例)提取策略不同 **应对策略**: - 三层 JSON 解析容错(直接解析 → 提取代码块 → LLM修复) - 字段级校验(必填字段、类型校验) - 分文献类型设计 Prompt **预留时间**:1 天用于 Prompt 调优 --- ### 挑战 3:服务替换的兼容性 🟡 中风险 **问题描述**: - 需要保持 API 接口不变(前端零修改) - `searchKnowledgeBase()` 返回格式需兼容 - 文档上传流程需要无缝切换 **应对策略**: - 定义适配层,转换返回格式 - 新旧服务并行运行,灰度切换 - 充分测试所有使用场景 **预留时间**:1 天专门用于兼容性测试 --- ### 挑战 4:向量数据的批量处理 🟢 低风险 **问题描述**: - 批量 Embedding 调用需要控制并发 - 阿里云 API 有 QPS 限制 - 大文档切片后向量较多 **应对策略**: - 使用 p-queue 控制并发(固定 3 并发) - 批量 Embedding(每次最多 25 条) - 增量处理,支持断点续传 --- ## 📅 详细开发计划 ### 总览时间线 ``` Week 1: 基础设施 + 核心服务开发 ├── Day 1: 数据库设计 + Prisma 迁移 ├── Day 2: Embedding 服务 + 切片服务 ├── Day 3: 全要素提取服务(Prompt 调优) ├── Day 4: 向量检索服务(pgvector SQL) ├── Day 5: 混合检索 + RRF 融合 Week 2: 服务替换 + 测试 + 迁移 ├── Day 6: 修改 knowledgeBaseService(检索替换) ├── Day 7: 修改 documentService(上传替换) ├── Day 8: 集成测试 + 效果调优 ├── Day 9: 数据迁移(现有文档向量化) ├── Day 10: 清理 + 文档 + 上线 ``` --- ### Day 1:数据库设计 + Prisma 迁移 **目标**:创建向量存储的数据表 **任务清单**: - [ ] 设计 `EkbDocument` 表(增强文档,含临床数据 JSONB 字段) - [ ] 设计 `EkbChunk` 表(向量切片,含 pgvector 字段) - [ ] 编写 Prisma schema - [ ] 运行 `prisma migrate dev` - [ ] 创建 HNSW 索引(手动 SQL) - [ ] 验证向量插入和查询 **交付物**: - `prisma/schema.prisma` 更新 - `migrations/xxx_add_ekb_tables.sql` - 索引创建脚本 **预计工时**:4-6 小时 **关键代码**: ```prisma // schema.prisma model EkbDocument { id String @id @default(uuid()) kbId String userId String // 基础信息 filename String fileType String fileSizeBytes BigInt fileUrl String // 原始 PDF 的 OSS 地址 extractedText String? @db.Text // 解析后的 Markdown/文本 // 临床数据(JSONB) pico Json? studyDesign Json? regimen Json? safety Json? criteria Json? endpoints Json? // 状态 status String @default("pending") errorMessage String? @db.Text chunks EkbChunk[] knowledgeBase KnowledgeBase @relation(fields: [kbId], references: [id], onDelete: Cascade) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@index([kbId]) @@index([status]) @@schema("pkb_schema") } model EkbChunk { id String @id @default(uuid()) documentId String content String @db.Text pageNumber Int? sectionType String? // pgvector 字段(需要手动创建) embedding Unsupported("vector(1024)")? document EkbDocument @relation(fields: [documentId], references: [id], onDelete: Cascade) createdAt DateTime @default(now()) @@index([documentId]) @@schema("pkb_schema") } ``` **手动 SQL(创建索引)**: ```sql -- 创建 HNSW 索引 CREATE INDEX IF NOT EXISTS ekb_chunk_embedding_idx ON "pkb_schema"."EkbChunk" USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64); -- 创建全文检索索引 CREATE INDEX IF NOT EXISTS ekb_chunk_content_idx ON "pkb_schema"."EkbChunk" USING gin (to_tsvector('simple', content)); -- 创建 JSONB GIN 索引(用于临床数据查询) CREATE INDEX IF NOT EXISTS ekb_document_pico_idx ON "pkb_schema"."EkbDocument" USING gin (pico); CREATE INDEX IF NOT EXISTS ekb_document_safety_idx ON "pkb_schema"."EkbDocument" USING gin (safety); ``` --- ### Day 2:Embedding 服务 + 切片服务 **目标**:实现文本向量化和文档切片 **任务清单**: - [ ] 创建 `EmbeddingService.ts`(阿里云 text-embedding-v3) - [ ] 创建 `ChunkService.ts`(RecursiveCharacterTextSplitter) - [ ] 单元测试:Embedding API 调用 - [ ] 单元测试:切片效果验证 - [ ] 环境变量配置(DASHSCOPE_API_KEY) **交付物**: - `backend/src/common/rag/EmbeddingService.ts` - `backend/src/common/rag/ChunkService.ts` - 单元测试文件 **预计工时**:4-6 小时 **关键代码**: ```typescript // EmbeddingService.ts export class EmbeddingService { private apiKey: string; private baseUrl = 'https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding/text-embedding'; async embed(text: string): Promise { ... } async embedBatch(texts: string[]): Promise { ... } async embedQuery(query: string): Promise { ... } } // ChunkService.ts export class ChunkService { splitDocument( text: string, options: { chunkSize: number; chunkOverlap: number } ): Chunk[] { ... } detectSections(text: string): Section[] { ... } } ``` --- ### Day 3:全要素提取服务 **目标**:实现 PICO、用药方案等临床数据的 AI 提取 **任务清单**: - [ ] 创建 `ClinicalExtractionService.ts` - [ ] 设计提取 Prompt(参考 EKB 方案) - [ ] 实现三层 JSON 解析容错 - [ ] 测试不同类型文献(RCT、综述、病例) - [ ] Prompt 调优(提高提取准确率) **交付物**: - `backend/src/modules/pkb/services/ClinicalExtractionService.ts` - `backend/prompts/clinical_extraction.txt` - 测试用例 **预计工时**:6-8 小时(含 Prompt 调优) **关键 Prompt**: ``` 你是一个医学数据专家。请阅读这篇文献,严格按照以下 JSON 格式提取关键信息。 如果文中未提及,字段留空(null)。 提取字段: 1. pico: { "P": "患者人群", "I": "干预措施", "C": "对照", "O": "结局指标" } 2. studyDesign: { "design": "研究类型", "sampleSize": 数字, "blinding": "盲法" } 3. regimen: [{ "drug": "药物名", "dose": "剂量", "frequency": "频率" }] 4. safety: { "ae_all": ["不良反应列表"], "ae_grade34": ["严重不良反应"] } 5. criteria: { "inclusion": ["纳入标准"], "exclusion": ["排除标准"] } 6. endpoints: { "primary": ["主要终点"], "secondary": ["次要终点"] } 输出必须是纯 JSON,不要有任何前言或后缀。 --- 文献内容: {{fullText}} ``` --- ### Day 4:向量检索服务(pgvector SQL) **目标**:实现基于 pgvector 的向量检索 **任务清单**: - [ ] 创建 `VectorSearchService.ts` - [ ] 实现向量检索(余弦相似度) - [ ] 实现关键词检索(PostgreSQL FTS) - [ ] 测试检索性能(1000+ 向量) - [ ] 优化查询(索引使用验证) **交付物**: - `backend/src/common/rag/VectorSearchService.ts` - 性能测试报告 **预计工时**:6 小时 **关键 SQL**: ```sql -- 向量检索 SELECT c.id, c.content, d.filename, 1 - (c.embedding <=> $1::vector) as score FROM "pkb_schema"."EkbChunk" c JOIN "pkb_schema"."EkbDocument" d ON c."documentId" = d.id WHERE d."kbId" = $2 ORDER BY c.embedding <=> $1::vector LIMIT $3; -- 关键词检索 SELECT c.id, c.content, d.filename, ts_rank_cd(to_tsvector('simple', c.content), plainto_tsquery('simple', $1)) as score FROM "pkb_schema"."EkbChunk" c JOIN "pkb_schema"."EkbDocument" d ON c."documentId" = d.id WHERE d."kbId" = $2 AND to_tsvector('simple', c.content) @@ plainto_tsquery('simple', $1) ORDER BY score DESC LIMIT $3; ``` --- ### Day 5:混合检索 + RRF 融合 **目标**:实现 R-C-R-G 架构中的混合检索 **任务清单**: - [ ] 实现 RRF(Reciprocal Rank Fusion)算法 - [ ] 实现并发三路检索(向量 + 关键词 + SQL 筛选) - [ ] 集成 Rerank API(可选,qwen-rerank) - [ ] 效果评估(对比 Dify 检索) - [ ] 参数调优(k 值、权重) **交付物**: - RRF 融合模块 - 效果评估报告 **预计工时**:6-8 小时 **RRF 算法**: ```typescript function rrfFusion( vectorResults: Result[], keywordResults: Result[], k: number = 60 // RRF 常数,通常取 60 ): Result[] { const scoreMap = new Map(); // 计算 RRF 分数 vectorResults.forEach((r, idx) => { const rrfScore = 1 / (k + idx + 1); scoreMap.set(r.id, (scoreMap.get(r.id) || 0) + rrfScore); }); keywordResults.forEach((r, idx) => { const rrfScore = 1 / (k + idx + 1); scoreMap.set(r.id, (scoreMap.get(r.id) || 0) + rrfScore); }); // 排序返回 return Array.from(scoreMap.entries()) .sort((a, b) => b[1] - a[1]) .map(([id, score]) => ({ id, score })); } ``` --- ### Day 6:修改 knowledgeBaseService(检索替换) **目标**:替换 Dify 检索为 pgvector 检索 **任务清单**: - [ ] 修改 `searchKnowledgeBase()` 函数 - [ ] 移除 `difyClient.retrieveKnowledge()` 调用 - [ ] 使用 `vectorSearchService.hybridSearch()` - [ ] 保持返回格式兼容(前端零修改) - [ ] 单元测试 **交付物**: - 更新后的 `knowledgeBaseService.ts` **预计工时**:4 小时 **关键修改**: ```typescript // 修改前 const results = await difyClient.retrieveKnowledge( knowledgeBase.difyDatasetId, query, { retrieval_model: { search_method: 'semantic_search', top_k: topK } } ); // 修改后 const searchResults = await vectorSearchService.hybridSearch(kbId, query, topK); // 格式转换(保持兼容) return { query: { content: query }, records: searchResults.map((r, idx) => ({ segment_id: r.id, document_id: r.documentId, document_name: r.documentName, position: idx + 1, score: r.score, content: r.content, })), }; ``` --- ### Day 7:修改 documentService(上传替换) **目标**:替换 Dify 上传流程为本地向量化流程 **任务清单**: - [ ] 修改 `uploadDocument()` 函数 - [ ] 移除 `difyClient.uploadDocumentDirectly()` 调用 - [ ] 实现本地处理流程(提取 → 切片 → 向量化) - [ ] 移除 Dify 状态轮询逻辑 - [ ] 实现自己的异步处理和状态更新 - [ ] 单元测试 **交付物**: - 更新后的 `documentService.ts` **预计工时**:6 小时 --- ### Day 8:集成测试 + 效果调优 **目标**:端到端测试,确保功能正常 **任务清单**: - [ ] 前端测试:创建知识库 - [ ] 前端测试:上传文档 - [ ] 前端测试:RAG 检索问答 - [ ] 效果对比:Dify vs pgvector 检索质量 - [ ] 性能测试:检索延迟 - [ ] Bug 修复 **交付物**: - 测试报告 - Bug 修复记录 **预计工时**:8 小时 --- ### Day 9:数据迁移(现有文档向量化) **目标**:将现有知识库文档迁移到新表并向量化 **任务清单**: - [ ] 编写迁移脚本(Document → EkbDocument) - [ ] 批量向量化现有文档 - [ ] 验证迁移完整性 - [ ] 验证检索效果 **交付物**: - `scripts/migrate-to-ekb.ts` - 迁移日志 **预计工时**:6 小时 **迁移脚本**: ```typescript // scripts/migrate-to-ekb.ts async function migrateDocuments() { // 1. 获取所有现有文档 const documents = await prisma.document.findMany({ where: { status: 'completed', extractedText: { not: null } }, }); console.log(`Found ${documents.length} documents to migrate`); // 2. 逐个迁移 for (const doc of documents) { try { // 创建 EkbDocument const ekbDoc = await prisma.ekbDocument.create({ data: { kbId: doc.kbId, userId: doc.userId, filename: doc.filename, fileType: doc.fileType, fileSizeBytes: doc.fileSizeBytes, extractedText: doc.extractedText, status: 'embedding', }, }); // 切片 const chunks = chunkService.splitDocument(doc.extractedText!); // 向量化 const embeddings = await embeddingService.embedBatch( chunks.map(c => c.content) ); // 存入数据库 // ... console.log(`✅ Migrated: ${doc.filename}`); } catch (error) { console.error(`❌ Failed: ${doc.filename}`, error); } } } ``` --- ### Day 10:清理 + 文档 + 上线 **目标**:清理遗留代码,更新文档,正式上线 **任务清单**: - [ ] 删除 `DifyClient.ts` - [ ] 删除 `difyDatasetId` 字段(可选,下个版本) - [ ] 删除 `difyDocumentId` 字段(可选,下个版本) - [ ] 更新 `00-模块当前状态与开发指南.md` - [ ] 更新环境变量文档 - [ ] 代码 Review - [ ] 合并到主分支 **交付物**: - 更新后的文档 - 清理后的代码 **预计工时**:4 小时 --- ## ⚠️ 风险评估与应对 ### 风险矩阵 | 风险 | 概率 | 影响 | 等级 | 应对措施 | |------|------|------|------|----------| | 检索效果下降 | 中 | 高 | 🔴 | 效果评估 + 参数调优 + 回滚方案 | | API 兼容性问题 | 低 | 高 | 🟡 | 格式转换层 + 充分测试 | | Embedding API 限流 | 中 | 中 | 🟡 | 并发控制 + 重试机制 | | 迁移数据丢失 | 低 | 高 | 🟡 | 备份 + 验证 + 回滚 | | 性能下降 | 低 | 中 | 🟢 | 索引优化 + 缓存 | ### 回滚方案 如果新方案效果不理想,可以: 1. 保留 `difyDatasetId` 字段,随时切回 Dify 2. 新旧服务通过 Feature Flag 切换 3. 灰度发布:先 10% 用户使用 pgvector --- ## 📊 资源需求 ### 人力资源 | 角色 | 工作量 | 说明 | |------|--------|------| | 后端开发 | 10 人天 | 核心开发 | | 测试 | 2 人天 | 集成测试 + 效果评估 | | **总计** | **12 人天** | 约 2 周 | ### 技术资源 | 资源 | 用途 | 成本 | |------|------|------| | 阿里云 DashScope | Embedding API | ~¥50/月 | | 阿里云 DashScope | Rerank API(可选) | ~¥20/月 | | PostgreSQL | 已有 | ¥0 | --- ## ✅ 验收标准 ### 功能验收 - [ ] 创建知识库:不依赖 Dify,直接创建 - [ ] 上传文档:本地处理 + 向量化 - [ ] RAG 检索:混合检索效果 ≥ Dify - [ ] 全文阅读模式:正常工作 - [ ] 批处理模式:正常工作 ### 性能验收 - [ ] 检索延迟:< 500ms(95 分位) - [ ] 上传处理:< 60s/文档(平均) - [ ] 向量化吞吐:> 100 文档/小时 ### 质量验收 - [ ] 检索召回率:≥ 80%(测试集) - [ ] 无 Dify 相关代码残留 - [ ] 文档更新完整 --- ## 📝 附录 ### A. 相关文档 - [企业级医学知识库综合技术解决方案 V2](../00-系统设计/企业级医学知识库_综合技术解决方案%20V2.md) - [PostgreSQL与pgvector深度应用分析](../00-系统设计/医疗科研AI系统架构评估报告:PostgreSQL与pgvector在RAG及知识库中的深度应用分析.md) - [PKB模块当前状态](../00-模块当前状态与开发指南.md) ### B. 环境变量配置 ```bash # .env 新增 DASHSCOPE_API_KEY=sk-xxx # 阿里云 DashScope API Key EMBEDDING_MODEL=text-embedding-v3 # Embedding 模型 EMBEDDING_DIMENSION=1024 # 向量维度 RERANK_MODEL=gte-rerank # Rerank 模型(可选) ``` ### C. 依赖更新 ```json // package.json { "dependencies": { // 新增 "langchain": "^0.1.0", // 可选,用于切片 "p-queue": "^8.0.0" // 并发控制 } } ``` --- **文档维护**:PKB 模块开发团队 **最后更新**:2026-01-19 **下次更新**:开发完成后更新进度