Files
AIclinicalresearch/docs/02-通用能力层/03-RAG引擎/03-分阶段实施方案.md
HaHafeng 40c2f8e148 feat(rag): Complete RAG engine implementation with pgvector
Major Features:
- Created ekb_schema (13th schema) with 3 tables: KB/Document/Chunk
- Implemented EmbeddingService (text-embedding-v4, 1024-dim vectors)
- Implemented ChunkService (smart Markdown chunking)
- Implemented VectorSearchService (multi-query + hybrid search)
- Implemented RerankService (qwen3-rerank)
- Integrated DeepSeek V3 QueryRewriter for cross-language search
- Python service: Added pymupdf4llm for PDF-to-Markdown conversion
- PKB: Dual-mode adapter (pgvector/dify/hybrid)

Architecture:
- Brain-Hand Model: Business layer (DeepSeek) + Engine layer (pgvector)
- Cross-language support: Chinese query matches English documents
- Small Embedding (1024) + Strong Reranker strategy

Performance:
- End-to-end latency: 2.5s
- Cost per query: 0.0025 RMB
- Accuracy improvement: +20.5% (cross-language)

Tests:
- test-embedding-service.ts: Vector embedding verified
- test-rag-e2e.ts: Full pipeline tested
- test-rerank.ts: Rerank quality validated
- test-query-rewrite.ts: Cross-language search verified
- test-pdf-ingest.ts: Real PDF document tested (Dongen 2003.pdf)

Documentation:
- Added 05-RAG-Engine-User-Guide.md
- Added 02-Document-Processing-User-Guide.md
- Updated system status documentation

Status: Production ready
2026-01-21 20:24:29 +08:00

17 KiB
Raw Blame History

知识库引擎分阶段实施方案

文档版本: v1.0
创建日期: 2026-01-20
最后更新: 2026-01-20
核心原则: 先跑通 MVP让业务走起来再逐步完善


📋 概述

为什么分阶段实施?

完整的知识库引擎包含多个复杂功能,一次性全部实现风险高、周期长。采用分阶段实施:

  • 降低风险:每阶段可交付、可验证
  • 快速见效MVP 3天即可让业务跑起来
  • 灵活调整:根据业务反馈调整后续优先级

三阶段总览

┌─────────────────────────────────────────────────────────────┐
│  Phase 1: MVP3天                                         │
│  ─────────────────                                          │
│  目标:让业务跑起来                                           │
│  能力:入库 + 向量检索 + 全文获取                             │
│  场景PKB 基础问答                                          │
├─────────────────────────────────────────────────────────────┤
│  Phase 2: 增强检索2天                                    │
│  ─────────────────                                          │
│  目标:检索质量提升                                           │
│  能力:+ 关键词检索 + 混合检索 + rerank                       │
│  场景PKB 高质量检索                                        │
├─────────────────────────────────────────────────────────────┤
│  Phase 3: 完整功能3天                                    │
│  ─────────────────                                          │
│  目标:完整架构落地                                           │
│  能力:+ 异步入库 + 摘要生成 + 临床要素提取                    │
│  场景ASL、AIA 完整功能                                     │
└─────────────────────────────────────────────────────────────┘

🚀 Phase 1: MVP3天

目标

最小可用版本PKB 能上传文档、能检索、能问答

交付能力

能力 方法 说明
文档入库 ingestDocument() 同步处理(简化版)
向量检索 vectorSearch() pgvector 语义检索
全文获取 getDocumentFullText() 获取单个文档
getAllDocumentsText() 获取知识库所有文档
管理操作 deleteDocument() 删除文档

暂不实现

  • 异步入库pg-boss
  • 关键词检索pg_trgm
  • 混合检索RRF
  • rerank 重排序
  • 摘要生成
  • 临床要素提取PICO

数据模型

📌 数据模型详见04-数据模型设计.md

⚠️ 重要MVP 阶段就创建完整 Schema但只使用部分字段。避免后续阶段改表迁移。

字段使用阶段说明

字段分层 Phase 1 MVP Phase 2 增强 Phase 3 完整
Layer 0-1: 基础信息
filename, fileType, fileSizeBytes 填充 - -
fileUrl, extractedText 填充 - -
status, errorMessage 使用 - -
Layer 2: 内容增强
summary, tokenCount, pageCount 预留 预留 填充
Layer 3: 分类标签
contentType, tags, category 可选 可选 启用
Layer 4: 结构化数据
metadata, structuredData 预留 预留 填充
EkbChunk
content, chunkIndex 填充 - -
pageNumber, sectionType 可选 - -
embedding 填充 - -

核心代码

// Phase 1: MVP 版本 KnowledgeBaseEngine
export class KnowledgeBaseEngine {
  constructor(private prisma: PrismaClient) {}

  /**
   * 同步入库MVP 简化版,小文件场景)
   * Phase 3 将升级为异步
   */
  async ingestDocument(params: {
    kbId: string;
    userId: string;
    file: Buffer;
    filename: string;
  }): Promise<{ documentId: string }> {
    // 1. 解析文档 → Markdown
    const markdown = await documentProcessor.toMarkdown(params.file, params.filename);
    
    // 2. 切片
    const chunks = chunkService.split(markdown, { size: 512, overlap: 50 });
    
    // 3. 向量化
    const embeddings = await embeddingService.embedBatch(chunks.map(c => c.text));
    
    // 4. 上传原始文件到 OSS
    const fileUrl = await storage.upload(params.file, params.filename);
    
    // 5. 存储文档(使用完整 SchemaMVP 只填充部分字段)
    const document = await this.prisma.ekbDocument.create({
      data: {
        kbId: params.kbId,
        userId: params.userId,
        filename: params.filename,
        fileType: getFileType(params.filename),
        fileSizeBytes: BigInt(params.file.length),
        fileUrl: fileUrl,
        extractedText: markdown,
        status: 'completed',
        // Phase 3 才填充的字段保持 null
        // summary, tokenCount, pico, studyDesign, regimen, safety, criteria, endpoints
      }
    });
    
    // 6. 存储切片 + 向量(使用完整 Schema
    for (let i = 0; i < chunks.length; i++) {
      await this.prisma.$executeRaw`
        INSERT INTO "ekb_schema"."EkbChunk" 
        (id, document_id, content, chunk_index, page_number, section_type, embedding, created_at)
        VALUES (
          ${crypto.randomUUID()},
          ${document.id},
          ${chunks[i].text},
          ${i},
          ${chunks[i].pageNumber || null},
          ${chunks[i].sectionType || null},
          ${embeddings[i]}::vector,
          NOW()
        )
      `;
    }
    
    return { documentId: document.id };
  }

  /**
   * 向量检索
   */
  async vectorSearch(
    kbIds: string[],
    query: string,
    topK: number = 10
  ): Promise<SearchResult[]> {
    const queryVector = await embeddingService.embed(query);
    
    const results = await this.prisma.$queryRaw<SearchResult[]>`
      SELECT 
        c.id,
        c.content,
        c.document_id,
        d.filename,
        1 - (c.embedding <=> ${queryVector}::vector) as score
      FROM "ekb_schema"."EkbChunk" c
      JOIN "ekb_schema"."EkbDocument" d ON c.document_id = d.id
      WHERE d.kb_id = ANY(${kbIds}::text[])
      ORDER BY c.embedding <=> ${queryVector}::vector
      LIMIT ${topK}
    `;
    
    return results;
  }

  /**
   * 获取单个文档全文
   */
  async getDocumentFullText(documentId: string): Promise<DocumentText> {
    const doc = await this.prisma.ekbDocument.findUnique({
      where: { id: documentId },
      select: { id: true, filename: true, extractedText: true }
    });
    
    if (!doc) throw new Error('Document not found');
    
    return {
      id: doc.id,
      filename: doc.filename,
      text: doc.extractedText || '',
    };
  }

  /**
   * 获取知识库所有文档全文
   */
  async getAllDocumentsText(kbId: string): Promise<DocumentText[]> {
    const docs = await this.prisma.ekbDocument.findMany({
      where: { kbId, status: 'completed' },
      select: { id: true, filename: true, extractedText: true }
    });
    
    return docs.map(doc => ({
      id: doc.id,
      filename: doc.filename,
      text: doc.extractedText || '',
    }));
  }

  /**
   * 删除文档
   */
  async deleteDocument(documentId: string): Promise<void> {
    await this.prisma.ekbDocument.delete({
      where: { id: documentId }
    });
  }
}

索引设计(完整版,一次性创建)

-- ===== MVP 阶段必须创建 =====

-- 1. HNSW 向量索引(语义检索核心)
CREATE INDEX IF NOT EXISTS ekb_chunk_embedding_idx 
ON "ekb_schema"."EkbChunk" 
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- ===== Phase 2 阶段使用MVP 可预创建)=====

-- 2. pg_bigm 中文关键词索引
CREATE EXTENSION IF NOT EXISTS pg_bigm;
CREATE INDEX IF NOT EXISTS ekb_chunk_content_bigm_idx 
ON "ekb_schema"."EkbChunk" 
USING gin (content gin_bigm_ops);

-- ===== Phase 3 阶段使用MVP 可预创建)=====

-- 3. JSONB GIN 索引(临床数据查询)
CREATE INDEX IF NOT EXISTS ekb_document_pico_idx 
ON "ekb_schema"."EkbDocument" USING gin (pico);

CREATE INDEX IF NOT EXISTS ekb_document_safety_idx 
ON "ekb_schema"."EkbDocument" USING gin (safety);

CREATE INDEX IF NOT EXISTS ekb_document_studydesign_idx 
ON "ekb_schema"."EkbDocument" USING gin ("studyDesign");

💡 建议MVP 阶段一次性创建所有索引,避免后续 DDL 操作。空表创建索引几乎无成本。

任务清单

任务 预估 产出
Schema 迁移(完整版) 3h ekb_schema + 完整表结构 + 全部索引
EmbeddingService 3h 阿里云 API 封装
ChunkService 2h 文本切片
KnowledgeBaseEngine MVP 4h 核心类(只实现 MVP 方法)
单元测试 3h 基础测试用例
PKB 集成 4h 替换 Dify 调用
合计 19h (3天)

💡 关键点Schema 迁移一次到位,后续阶段只写代码,不改表。

验收标准

  • PKB 可上传 PDF 文档
  • PKB 可向量检索
  • PKB 可获取文档全文
  • PKB 可删除文档
  • 基础问答功能正常

🔍 Phase 2: 增强检索2天

目标

检索质量提升:支持中文关键词、混合检索、结果重排序

新增能力

能力 方法 说明
关键词检索 keywordSearch() pg_bigm 中文精确匹配
混合检索 hybridSearch() 向量 + 关键词 + RRF
重排序 rerank() Qwen-Rerank API

新增索引

-- Phase 2: 关键词检索索引pg_bigm专为中文优化
CREATE EXTENSION IF NOT EXISTS pg_bigm;

CREATE INDEX IF NOT EXISTS ekb_chunk_content_bigm_idx 
ON "ekb_schema"."EkbChunk" 
USING gin (content gin_bigm_ops);

新增代码

// Phase 2 新增方法

/**
 * 关键词检索pg_bigm 中文精确匹配)
 */
async keywordSearch(
  kbIds: string[],
  query: string,
  topK: number = 10
): Promise<SearchResult[]> {
  const results = await this.prisma.$queryRaw<SearchResult[]>`
    SELECT 
      c.id,
      c.content,
      c.document_id,
      d.filename,
      bigm_similarity(c.content, ${query}) as score
    FROM "ekb_schema"."EkbChunk" c
    JOIN "ekb_schema"."EkbDocument" d ON c.document_id = d.id
    WHERE d.kb_id = ANY(${kbIds}::text[])
      AND c.content LIKE ${'%' + query + '%'}
    ORDER BY bigm_similarity(c.content, ${query}) DESC
    LIMIT ${topK}
  `;
  
  return results;
}

/**
 * 混合检索(向量 + 关键词 + RRF 融合)
 */
async hybridSearch(
  kbIds: string[],
  query: string,
  topK: number = 10
): Promise<SearchResult[]> {
  // 并发执行两路检索
  const [vectorResults, keywordResults] = await Promise.all([
    this.vectorSearch(kbIds, query, 20),
    this.keywordSearch(kbIds, query, 20),
  ]);
  
  // RRF 融合
  return rrfFusion(vectorResults, keywordResults, topK);
}

/**
 * 重排序Qwen-Rerank API
 */
async rerank(
  documents: SearchResult[],
  query: string,
  topK: number = 10
): Promise<SearchResult[]> {
  const response = await dashscope.rerank({
    model: 'gte-rerank',
    query,
    documents: documents.map(d => d.content),
    top_n: topK,
  });
  
  return response.results.map(r => ({
    ...documents[r.index],
    score: r.relevance_score,
  }));
}

RRF 融合算法

// utils/rrfFusion.ts
export function rrfFusion(
  vectorResults: SearchResult[],
  keywordResults: SearchResult[],
  topK: number,
  k: number = 60
): SearchResult[] {
  const scores = new Map<string, number>();
  
  // 向量检索得分
  vectorResults.forEach((r, rank) => {
    const score = scores.get(r.id) || 0;
    scores.set(r.id, score + 1 / (k + rank + 1));
  });
  
  // 关键词检索得分
  keywordResults.forEach((r, rank) => {
    const score = scores.get(r.id) || 0;
    scores.set(r.id, score + 1 / (k + rank + 1));
  });
  
  // 合并去重 + 排序
  const allResults = [...vectorResults, ...keywordResults];
  const uniqueResults = Array.from(
    new Map(allResults.map(r => [r.id, r])).values()
  );
  
  return uniqueResults
    .map(r => ({ ...r, score: scores.get(r.id) || 0 }))
    .sort((a, b) => b.score - a.score)
    .slice(0, topK);
}

任务清单

任务 预估 产出
pg_bigm 索引 1h SQL 迁移
KeywordSearchService 2h 关键词检索pg_bigm
RRF 融合算法 2h rrfFusion.ts
RerankService 2h 阿里云 API 封装
hybridSearch 集成 2h 混合检索
测试 3h 检索质量验证
合计 12h (2天)

验收标准

  • 关键词检索支持中文("帕博利珠" 可匹配 "帕博利珠单抗"
  • 混合检索可用
  • rerank 可用
  • 检索召回率提升(对比 Phase 1

🎯 Phase 3: 完整功能3天

目标

完整架构落地:支持大文件、高级提取功能

新增能力

能力 方法 说明
异步入库 submitIngestTask() pg-boss 队列
getIngestStatus() 任务状态查询
摘要获取 getDocumentSummary() LLM 生成摘要
getAllDocumentsSummaries() 批量获取摘要
临床数据 getClinicalData() PICO 等结构化数据

Schema 说明

无需 Schema 升级MVP 阶段已创建完整表结构Phase 3 只需填充字段。

// Phase 3: 开始填充 summary、tokenCount、临床数据字段
await prisma.ekbDocument.update({
  where: { id: documentId },
  data: {
    summary: generatedSummary,           // 🆕 Phase 3 填充
    tokenCount: calculatedTokens,        // 🆕 Phase 3 填充
    pico: extractedPico,                 // 🆕 Phase 3 填充
    studyDesign: extractedStudyDesign,   // 🆕 Phase 3 填充
    regimen: extractedRegimen,           // 🆕 Phase 3 填充
    safety: extractedSafety,             // 🆕 Phase 3 填充
  }
});

异步入库实现

详见:Postgres-Only异步任务处理指南

// Phase 3: 异步入库

/**
 * 提交入库任务
 */
async submitIngestTask(params: {
  kbId: string;
  userId: string;
  file: Buffer;
  filename: string;
  options?: {
    enableSummary?: boolean;           // 💰 默认 false
    enableClinicalExtraction?: boolean; // 💰 默认 false
  };
}): Promise<{ taskId: string; documentId: string }> {
  // 1. 快速上传到 OSS
  const fileUrl = await storage.upload(params.file);
  
  // 2. 创建文档记录status: processing
  const document = await this.prisma.ekbDocument.create({
    data: {
      kbId: params.kbId,
      userId: params.userId,
      filename: params.filename,
      fileUrl,
      status: 'processing',
    }
  });
  
  // 3. 推送任务到 pg-boss
  const taskId = await jobQueue.send('ekb-ingest', {
    documentId: document.id,
    fileUrl,
    options: params.options,
  });
  
  return { taskId, documentId: document.id };
}

/**
 * 获取任务状态
 */
async getIngestStatus(taskId: string): Promise<{
  status: 'pending' | 'processing' | 'completed' | 'failed';
  progress: number;
  error?: string;
}> {
  const job = await jobQueue.getJobById(taskId);
  
  return {
    status: job.state,
    progress: job.data?.progress || 0,
    error: job.state === 'failed' ? job.output?.error : undefined,
  };
}

任务清单

任务 预估 产出
pg-boss Worker 4h ingestWorker.ts异步入库
SummaryService 3h LLM 摘要生成
ClinicalExtractionService 4h PICO 提取 + JSON 容错
摘要/临床数据 API 2h 新增 getDocumentSummary 等方法
测试 4h 完整流程验证
合计 17h (3天)

优势:无需 Schema 迁移,只写业务代码。

验收标准

  • 大文件上传不超时
  • 任务状态可查询
  • 摘要生成可用
  • PICO 提取可用ASL 场景)
  • AIA 摘要筛选策略可用

📊 总体进度

阶段 工期 累计 状态
Phase 1 MVP 3天 3天 🔜 待开始
Phase 2 增强 2天 5天 📋 规划中
Phase 3 完整 3天 8天 📋 规划中

🔗 相关文档


📅 更新日志

v1.0 (2026-01-20)

  • 初始版本
  • 确定三阶段实施方案MVP → 增强 → 完整

维护人: 技术架构师
最后更新: 2026-01-20