Files
AIclinicalresearch/docs/02-通用能力层/03-RAG引擎/03-分阶段实施方案.md
HaHafeng 40c2f8e148 feat(rag): Complete RAG engine implementation with pgvector
Major Features:
- Created ekb_schema (13th schema) with 3 tables: KB/Document/Chunk
- Implemented EmbeddingService (text-embedding-v4, 1024-dim vectors)
- Implemented ChunkService (smart Markdown chunking)
- Implemented VectorSearchService (multi-query + hybrid search)
- Implemented RerankService (qwen3-rerank)
- Integrated DeepSeek V3 QueryRewriter for cross-language search
- Python service: Added pymupdf4llm for PDF-to-Markdown conversion
- PKB: Dual-mode adapter (pgvector/dify/hybrid)

Architecture:
- Brain-Hand Model: Business layer (DeepSeek) + Engine layer (pgvector)
- Cross-language support: Chinese query matches English documents
- Small Embedding (1024) + Strong Reranker strategy

Performance:
- End-to-end latency: 2.5s
- Cost per query: 0.0025 RMB
- Accuracy improvement: +20.5% (cross-language)

Tests:
- test-embedding-service.ts: Vector embedding verified
- test-rag-e2e.ts: Full pipeline tested
- test-rerank.ts: Rerank quality validated
- test-query-rewrite.ts: Cross-language search verified
- test-pdf-ingest.ts: Real PDF document tested (Dongen 2003.pdf)

Documentation:
- Added 05-RAG-Engine-User-Guide.md
- Added 02-Document-Processing-User-Guide.md
- Updated system status documentation

Status: Production ready
2026-01-21 20:24:29 +08:00

596 lines
17 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 知识库引擎分阶段实施方案
> **文档版本:** v1.0
> **创建日期:** 2026-01-20
> **最后更新:** 2026-01-20
> **核心原则:** 先跑通 MVP让业务走起来再逐步完善
---
## 📋 概述
### 为什么分阶段实施?
完整的知识库引擎包含多个复杂功能,一次性全部实现风险高、周期长。采用分阶段实施:
-**降低风险**:每阶段可交付、可验证
-**快速见效**MVP 3天即可让业务跑起来
-**灵活调整**:根据业务反馈调整后续优先级
### 三阶段总览
```
┌─────────────────────────────────────────────────────────────┐
│ Phase 1: MVP3天
│ ───────────────── │
│ 目标:让业务跑起来 │
│ 能力:入库 + 向量检索 + 全文获取 │
│ 场景PKB 基础问答 │
├─────────────────────────────────────────────────────────────┤
│ Phase 2: 增强检索2天
│ ───────────────── │
│ 目标:检索质量提升 │
│ 能力:+ 关键词检索 + 混合检索 + rerank │
│ 场景PKB 高质量检索 │
├─────────────────────────────────────────────────────────────┤
│ Phase 3: 完整功能3天
│ ───────────────── │
│ 目标:完整架构落地 │
│ 能力:+ 异步入库 + 摘要生成 + 临床要素提取 │
│ 场景ASL、AIA 完整功能 │
└─────────────────────────────────────────────────────────────┘
```
---
## 🚀 Phase 1: MVP3天
### 目标
**最小可用版本**PKB 能上传文档、能检索、能问答
### 交付能力
| 能力 | 方法 | 说明 |
|------|------|------|
| **文档入库** | `ingestDocument()` | 同步处理(简化版) |
| **向量检索** | `vectorSearch()` | pgvector 语义检索 |
| **全文获取** | `getDocumentFullText()` | 获取单个文档 |
| | `getAllDocumentsText()` | 获取知识库所有文档 |
| **管理操作** | `deleteDocument()` | 删除文档 |
### 暂不实现
- ❌ 异步入库pg-boss
- ❌ 关键词检索pg_trgm
- ❌ 混合检索RRF
- ❌ rerank 重排序
- ❌ 摘要生成
- ❌ 临床要素提取PICO
### 数据模型
> 📌 **数据模型详见**[04-数据模型设计.md](./04-数据模型设计.md)
>
> ⚠️ **重要**MVP 阶段就创建完整 Schema但只使用部分字段。避免后续阶段改表迁移。
### 字段使用阶段说明
| 字段分层 | Phase 1 MVP | Phase 2 增强 | Phase 3 完整 |
|----------|-------------|--------------|--------------|
| **Layer 0-1: 基础信息** | | | |
| filename, fileType, fileSizeBytes | ✅ 填充 | - | - |
| fileUrl, extractedText | ✅ 填充 | - | - |
| status, errorMessage | ✅ 使用 | - | - |
| **Layer 2: 内容增强** | | | |
| summary, tokenCount, pageCount | ❌ 预留 | ❌ 预留 | ✅ 填充 |
| **Layer 3: 分类标签** | | | |
| contentType, tags, category | ⚪ 可选 | ⚪ 可选 | ✅ 启用 |
| **Layer 4: 结构化数据** | | | |
| metadata, structuredData | ❌ 预留 | ❌ 预留 | ✅ 填充 |
| **EkbChunk** | | | |
| content, chunkIndex | ✅ 填充 | - | - |
| pageNumber, sectionType | ⚪ 可选 | - | - |
| embedding | ✅ 填充 | - | - |
### 核心代码
```typescript
// Phase 1: MVP 版本 KnowledgeBaseEngine
export class KnowledgeBaseEngine {
constructor(private prisma: PrismaClient) {}
/**
* 同步入库MVP 简化版,小文件场景)
* Phase 3 将升级为异步
*/
async ingestDocument(params: {
kbId: string;
userId: string;
file: Buffer;
filename: string;
}): Promise<{ documentId: string }> {
// 1. 解析文档 → Markdown
const markdown = await documentProcessor.toMarkdown(params.file, params.filename);
// 2. 切片
const chunks = chunkService.split(markdown, { size: 512, overlap: 50 });
// 3. 向量化
const embeddings = await embeddingService.embedBatch(chunks.map(c => c.text));
// 4. 上传原始文件到 OSS
const fileUrl = await storage.upload(params.file, params.filename);
// 5. 存储文档(使用完整 SchemaMVP 只填充部分字段)
const document = await this.prisma.ekbDocument.create({
data: {
kbId: params.kbId,
userId: params.userId,
filename: params.filename,
fileType: getFileType(params.filename),
fileSizeBytes: BigInt(params.file.length),
fileUrl: fileUrl,
extractedText: markdown,
status: 'completed',
// Phase 3 才填充的字段保持 null
// summary, tokenCount, pico, studyDesign, regimen, safety, criteria, endpoints
}
});
// 6. 存储切片 + 向量(使用完整 Schema
for (let i = 0; i < chunks.length; i++) {
await this.prisma.$executeRaw`
INSERT INTO "ekb_schema"."EkbChunk"
(id, document_id, content, chunk_index, page_number, section_type, embedding, created_at)
VALUES (
${crypto.randomUUID()},
${document.id},
${chunks[i].text},
${i},
${chunks[i].pageNumber || null},
${chunks[i].sectionType || null},
${embeddings[i]}::vector,
NOW()
)
`;
}
return { documentId: document.id };
}
/**
* 向量检索
*/
async vectorSearch(
kbIds: string[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
const queryVector = await embeddingService.embed(query);
const results = await this.prisma.$queryRaw<SearchResult[]>`
SELECT
c.id,
c.content,
c.document_id,
d.filename,
1 - (c.embedding <=> ${queryVector}::vector) as score
FROM "ekb_schema"."EkbChunk" c
JOIN "ekb_schema"."EkbDocument" d ON c.document_id = d.id
WHERE d.kb_id = ANY(${kbIds}::text[])
ORDER BY c.embedding <=> ${queryVector}::vector
LIMIT ${topK}
`;
return results;
}
/**
* 获取单个文档全文
*/
async getDocumentFullText(documentId: string): Promise<DocumentText> {
const doc = await this.prisma.ekbDocument.findUnique({
where: { id: documentId },
select: { id: true, filename: true, extractedText: true }
});
if (!doc) throw new Error('Document not found');
return {
id: doc.id,
filename: doc.filename,
text: doc.extractedText || '',
};
}
/**
* 获取知识库所有文档全文
*/
async getAllDocumentsText(kbId: string): Promise<DocumentText[]> {
const docs = await this.prisma.ekbDocument.findMany({
where: { kbId, status: 'completed' },
select: { id: true, filename: true, extractedText: true }
});
return docs.map(doc => ({
id: doc.id,
filename: doc.filename,
text: doc.extractedText || '',
}));
}
/**
* 删除文档
*/
async deleteDocument(documentId: string): Promise<void> {
await this.prisma.ekbDocument.delete({
where: { id: documentId }
});
}
}
```
### 索引设计(完整版,一次性创建)
```sql
-- ===== MVP 阶段必须创建 =====
-- 1. HNSW 向量索引(语义检索核心)
CREATE INDEX IF NOT EXISTS ekb_chunk_embedding_idx
ON "ekb_schema"."EkbChunk"
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- ===== Phase 2 阶段使用MVP 可预创建)=====
-- 2. pg_bigm 中文关键词索引
CREATE EXTENSION IF NOT EXISTS pg_bigm;
CREATE INDEX IF NOT EXISTS ekb_chunk_content_bigm_idx
ON "ekb_schema"."EkbChunk"
USING gin (content gin_bigm_ops);
-- ===== Phase 3 阶段使用MVP 可预创建)=====
-- 3. JSONB GIN 索引(临床数据查询)
CREATE INDEX IF NOT EXISTS ekb_document_pico_idx
ON "ekb_schema"."EkbDocument" USING gin (pico);
CREATE INDEX IF NOT EXISTS ekb_document_safety_idx
ON "ekb_schema"."EkbDocument" USING gin (safety);
CREATE INDEX IF NOT EXISTS ekb_document_studydesign_idx
ON "ekb_schema"."EkbDocument" USING gin ("studyDesign");
```
> 💡 **建议**MVP 阶段一次性创建所有索引,避免后续 DDL 操作。空表创建索引几乎无成本。
### 任务清单
| 任务 | 预估 | 产出 |
|------|------|------|
| **Schema 迁移(完整版)** | 3h | `ekb_schema` + 完整表结构 + 全部索引 |
| EmbeddingService | 3h | 阿里云 API 封装 |
| ChunkService | 2h | 文本切片 |
| KnowledgeBaseEngine MVP | 4h | 核心类(只实现 MVP 方法) |
| 单元测试 | 3h | 基础测试用例 |
| PKB 集成 | 4h | 替换 Dify 调用 |
| **合计** | **19h (3天)** | |
> 💡 **关键点**Schema 迁移一次到位,后续阶段只写代码,不改表。
### 验收标准
- [ ] PKB 可上传 PDF 文档
- [ ] PKB 可向量检索
- [ ] PKB 可获取文档全文
- [ ] PKB 可删除文档
- [ ] 基础问答功能正常
---
## 🔍 Phase 2: 增强检索2天
### 目标
**检索质量提升**:支持中文关键词、混合检索、结果重排序
### 新增能力
| 能力 | 方法 | 说明 |
|------|------|------|
| **关键词检索** | `keywordSearch()` | pg_bigm 中文精确匹配 |
| **混合检索** | `hybridSearch()` | 向量 + 关键词 + RRF |
| **重排序** | `rerank()` | Qwen-Rerank API |
### 新增索引
```sql
-- Phase 2: 关键词检索索引pg_bigm专为中文优化
CREATE EXTENSION IF NOT EXISTS pg_bigm;
CREATE INDEX IF NOT EXISTS ekb_chunk_content_bigm_idx
ON "ekb_schema"."EkbChunk"
USING gin (content gin_bigm_ops);
```
### 新增代码
```typescript
// Phase 2 新增方法
/**
* 关键词检索pg_bigm 中文精确匹配)
*/
async keywordSearch(
kbIds: string[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
const results = await this.prisma.$queryRaw<SearchResult[]>`
SELECT
c.id,
c.content,
c.document_id,
d.filename,
bigm_similarity(c.content, ${query}) as score
FROM "ekb_schema"."EkbChunk" c
JOIN "ekb_schema"."EkbDocument" d ON c.document_id = d.id
WHERE d.kb_id = ANY(${kbIds}::text[])
AND c.content LIKE ${'%' + query + '%'}
ORDER BY bigm_similarity(c.content, ${query}) DESC
LIMIT ${topK}
`;
return results;
}
/**
* 混合检索(向量 + 关键词 + RRF 融合)
*/
async hybridSearch(
kbIds: string[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
// 并发执行两路检索
const [vectorResults, keywordResults] = await Promise.all([
this.vectorSearch(kbIds, query, 20),
this.keywordSearch(kbIds, query, 20),
]);
// RRF 融合
return rrfFusion(vectorResults, keywordResults, topK);
}
/**
* 重排序Qwen-Rerank API
*/
async rerank(
documents: SearchResult[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
const response = await dashscope.rerank({
model: 'gte-rerank',
query,
documents: documents.map(d => d.content),
top_n: topK,
});
return response.results.map(r => ({
...documents[r.index],
score: r.relevance_score,
}));
}
```
### RRF 融合算法
```typescript
// utils/rrfFusion.ts
export function rrfFusion(
vectorResults: SearchResult[],
keywordResults: SearchResult[],
topK: number,
k: number = 60
): SearchResult[] {
const scores = new Map<string, number>();
// 向量检索得分
vectorResults.forEach((r, rank) => {
const score = scores.get(r.id) || 0;
scores.set(r.id, score + 1 / (k + rank + 1));
});
// 关键词检索得分
keywordResults.forEach((r, rank) => {
const score = scores.get(r.id) || 0;
scores.set(r.id, score + 1 / (k + rank + 1));
});
// 合并去重 + 排序
const allResults = [...vectorResults, ...keywordResults];
const uniqueResults = Array.from(
new Map(allResults.map(r => [r.id, r])).values()
);
return uniqueResults
.map(r => ({ ...r, score: scores.get(r.id) || 0 }))
.sort((a, b) => b.score - a.score)
.slice(0, topK);
}
```
### 任务清单
| 任务 | 预估 | 产出 |
|------|------|------|
| pg_bigm 索引 | 1h | SQL 迁移 |
| KeywordSearchService | 2h | 关键词检索pg_bigm |
| RRF 融合算法 | 2h | rrfFusion.ts |
| RerankService | 2h | 阿里云 API 封装 |
| hybridSearch 集成 | 2h | 混合检索 |
| 测试 | 3h | 检索质量验证 |
| **合计** | **12h (2天)** | |
### 验收标准
- [ ] 关键词检索支持中文("帕博利珠" 可匹配 "帕博利珠单抗"
- [ ] 混合检索可用
- [ ] rerank 可用
- [ ] 检索召回率提升(对比 Phase 1
---
## 🎯 Phase 3: 完整功能3天
### 目标
**完整架构落地**:支持大文件、高级提取功能
### 新增能力
| 能力 | 方法 | 说明 |
|------|------|------|
| **异步入库** | `submitIngestTask()` | pg-boss 队列 |
| | `getIngestStatus()` | 任务状态查询 |
| **摘要获取** | `getDocumentSummary()` | LLM 生成摘要 |
| | `getAllDocumentsSummaries()` | 批量获取摘要 |
| **临床数据** | `getClinicalData()` | PICO 等结构化数据 |
### Schema 说明
> ✅ **无需 Schema 升级**MVP 阶段已创建完整表结构Phase 3 只需填充字段。
```typescript
// Phase 3: 开始填充 summary、tokenCount、临床数据字段
await prisma.ekbDocument.update({
where: { id: documentId },
data: {
summary: generatedSummary, // 🆕 Phase 3 填充
tokenCount: calculatedTokens, // 🆕 Phase 3 填充
pico: extractedPico, // 🆕 Phase 3 填充
studyDesign: extractedStudyDesign, // 🆕 Phase 3 填充
regimen: extractedRegimen, // 🆕 Phase 3 填充
safety: extractedSafety, // 🆕 Phase 3 填充
}
});
```
### 异步入库实现
详见:[Postgres-Only异步任务处理指南](../Postgres-Only异步任务处理指南.md)
```typescript
// Phase 3: 异步入库
/**
* 提交入库任务
*/
async submitIngestTask(params: {
kbId: string;
userId: string;
file: Buffer;
filename: string;
options?: {
enableSummary?: boolean; // 💰 默认 false
enableClinicalExtraction?: boolean; // 💰 默认 false
};
}): Promise<{ taskId: string; documentId: string }> {
// 1. 快速上传到 OSS
const fileUrl = await storage.upload(params.file);
// 2. 创建文档记录status: processing
const document = await this.prisma.ekbDocument.create({
data: {
kbId: params.kbId,
userId: params.userId,
filename: params.filename,
fileUrl,
status: 'processing',
}
});
// 3. 推送任务到 pg-boss
const taskId = await jobQueue.send('ekb-ingest', {
documentId: document.id,
fileUrl,
options: params.options,
});
return { taskId, documentId: document.id };
}
/**
* 获取任务状态
*/
async getIngestStatus(taskId: string): Promise<{
status: 'pending' | 'processing' | 'completed' | 'failed';
progress: number;
error?: string;
}> {
const job = await jobQueue.getJobById(taskId);
return {
status: job.state,
progress: job.data?.progress || 0,
error: job.state === 'failed' ? job.output?.error : undefined,
};
}
```
### 任务清单
| 任务 | 预估 | 产出 |
|------|------|------|
| pg-boss Worker | 4h | ingestWorker.ts异步入库 |
| SummaryService | 3h | LLM 摘要生成 |
| ClinicalExtractionService | 4h | PICO 提取 + JSON 容错 |
| 摘要/临床数据 API | 2h | 新增 getDocumentSummary 等方法 |
| 测试 | 4h | 完整流程验证 |
| **合计** | **17h (3天)** | |
> ✅ **优势**:无需 Schema 迁移,只写业务代码。
### 验收标准
- [ ] 大文件上传不超时
- [ ] 任务状态可查询
- [ ] 摘要生成可用
- [ ] PICO 提取可用ASL 场景)
- [ ] AIA 摘要筛选策略可用
---
## 📊 总体进度
| 阶段 | 工期 | 累计 | 状态 |
|------|------|------|------|
| **Phase 1 MVP** | 3天 | 3天 | 🔜 待开始 |
| **Phase 2 增强** | 2天 | 5天 | 📋 规划中 |
| **Phase 3 完整** | 3天 | 8天 | 📋 规划中 |
---
## 🔗 相关文档
- [知识库引擎架构设计](./01-知识库引擎架构设计.md) - 完整架构目标
- [pgvector 替换 Dify 技术方案](./02-pgvector替换Dify计划.md) - 详细技术实现
- [Postgres-Only异步任务处理指南](../Postgres-Only异步任务处理指南.md) - 异步架构参考
---
## 📅 更新日志
### v1.0 (2026-01-20)
- 初始版本
- 确定三阶段实施方案MVP → 增强 → 完整
---
**维护人:** 技术架构师
**最后更新:** 2026-01-20