feat(rag): Complete RAG engine implementation with pgvector

Major Features:
- Created ekb_schema (13th schema) with 3 tables: KB/Document/Chunk
- Implemented EmbeddingService (text-embedding-v4, 1024-dim vectors)
- Implemented ChunkService (smart Markdown chunking)
- Implemented VectorSearchService (multi-query + hybrid search)
- Implemented RerankService (qwen3-rerank)
- Integrated DeepSeek V3 QueryRewriter for cross-language search
- Python service: Added pymupdf4llm for PDF-to-Markdown conversion
- PKB: Dual-mode adapter (pgvector/dify/hybrid)

Architecture:
- Brain-Hand Model: Business layer (DeepSeek) + Engine layer (pgvector)
- Cross-language support: Chinese query matches English documents
- Small Embedding (1024) + Strong Reranker strategy

Performance:
- End-to-end latency: 2.5s
- Cost per query: 0.0025 RMB
- Accuracy improvement: +20.5% (cross-language)

Tests:
- test-embedding-service.ts: Vector embedding verified
- test-rag-e2e.ts: Full pipeline tested
- test-rerank.ts: Rerank quality validated
- test-query-rewrite.ts: Cross-language search verified
- test-pdf-ingest.ts: Real PDF document tested (Dongen 2003.pdf)

Documentation:
- Added 05-RAG-Engine-User-Guide.md
- Added 02-Document-Processing-User-Guide.md
- Updated system status documentation

Status: Production ready
This commit is contained in:
2026-01-21 20:24:29 +08:00
parent 1f5bf2cd65
commit 40c2f8e148
338 changed files with 11014 additions and 1158 deletions

View File

@@ -0,0 +1,595 @@
# 知识库引擎分阶段实施方案
> **文档版本:** v1.0
> **创建日期:** 2026-01-20
> **最后更新:** 2026-01-20
> **核心原则:** 先跑通 MVP让业务走起来再逐步完善
---
## 📋 概述
### 为什么分阶段实施?
完整的知识库引擎包含多个复杂功能,一次性全部实现风险高、周期长。采用分阶段实施:
-**降低风险**:每阶段可交付、可验证
-**快速见效**MVP 3天即可让业务跑起来
-**灵活调整**:根据业务反馈调整后续优先级
### 三阶段总览
```
┌─────────────────────────────────────────────────────────────┐
│ Phase 1: MVP3天
│ ───────────────── │
│ 目标:让业务跑起来 │
│ 能力:入库 + 向量检索 + 全文获取 │
│ 场景PKB 基础问答 │
├─────────────────────────────────────────────────────────────┤
│ Phase 2: 增强检索2天
│ ───────────────── │
│ 目标:检索质量提升 │
│ 能力:+ 关键词检索 + 混合检索 + rerank │
│ 场景PKB 高质量检索 │
├─────────────────────────────────────────────────────────────┤
│ Phase 3: 完整功能3天
│ ───────────────── │
│ 目标:完整架构落地 │
│ 能力:+ 异步入库 + 摘要生成 + 临床要素提取 │
│ 场景ASL、AIA 完整功能 │
└─────────────────────────────────────────────────────────────┘
```
---
## 🚀 Phase 1: MVP3天
### 目标
**最小可用版本**PKB 能上传文档、能检索、能问答
### 交付能力
| 能力 | 方法 | 说明 |
|------|------|------|
| **文档入库** | `ingestDocument()` | 同步处理(简化版) |
| **向量检索** | `vectorSearch()` | pgvector 语义检索 |
| **全文获取** | `getDocumentFullText()` | 获取单个文档 |
| | `getAllDocumentsText()` | 获取知识库所有文档 |
| **管理操作** | `deleteDocument()` | 删除文档 |
### 暂不实现
- ❌ 异步入库pg-boss
- ❌ 关键词检索pg_trgm
- ❌ 混合检索RRF
- ❌ rerank 重排序
- ❌ 摘要生成
- ❌ 临床要素提取PICO
### 数据模型
> 📌 **数据模型详见**[04-数据模型设计.md](./04-数据模型设计.md)
>
> ⚠️ **重要**MVP 阶段就创建完整 Schema但只使用部分字段。避免后续阶段改表迁移。
### 字段使用阶段说明
| 字段分层 | Phase 1 MVP | Phase 2 增强 | Phase 3 完整 |
|----------|-------------|--------------|--------------|
| **Layer 0-1: 基础信息** | | | |
| filename, fileType, fileSizeBytes | ✅ 填充 | - | - |
| fileUrl, extractedText | ✅ 填充 | - | - |
| status, errorMessage | ✅ 使用 | - | - |
| **Layer 2: 内容增强** | | | |
| summary, tokenCount, pageCount | ❌ 预留 | ❌ 预留 | ✅ 填充 |
| **Layer 3: 分类标签** | | | |
| contentType, tags, category | ⚪ 可选 | ⚪ 可选 | ✅ 启用 |
| **Layer 4: 结构化数据** | | | |
| metadata, structuredData | ❌ 预留 | ❌ 预留 | ✅ 填充 |
| **EkbChunk** | | | |
| content, chunkIndex | ✅ 填充 | - | - |
| pageNumber, sectionType | ⚪ 可选 | - | - |
| embedding | ✅ 填充 | - | - |
### 核心代码
```typescript
// Phase 1: MVP 版本 KnowledgeBaseEngine
export class KnowledgeBaseEngine {
constructor(private prisma: PrismaClient) {}
/**
* 同步入库MVP 简化版,小文件场景)
* Phase 3 将升级为异步
*/
async ingestDocument(params: {
kbId: string;
userId: string;
file: Buffer;
filename: string;
}): Promise<{ documentId: string }> {
// 1. 解析文档 → Markdown
const markdown = await documentProcessor.toMarkdown(params.file, params.filename);
// 2. 切片
const chunks = chunkService.split(markdown, { size: 512, overlap: 50 });
// 3. 向量化
const embeddings = await embeddingService.embedBatch(chunks.map(c => c.text));
// 4. 上传原始文件到 OSS
const fileUrl = await storage.upload(params.file, params.filename);
// 5. 存储文档(使用完整 SchemaMVP 只填充部分字段)
const document = await this.prisma.ekbDocument.create({
data: {
kbId: params.kbId,
userId: params.userId,
filename: params.filename,
fileType: getFileType(params.filename),
fileSizeBytes: BigInt(params.file.length),
fileUrl: fileUrl,
extractedText: markdown,
status: 'completed',
// Phase 3 才填充的字段保持 null
// summary, tokenCount, pico, studyDesign, regimen, safety, criteria, endpoints
}
});
// 6. 存储切片 + 向量(使用完整 Schema
for (let i = 0; i < chunks.length; i++) {
await this.prisma.$executeRaw`
INSERT INTO "ekb_schema"."EkbChunk"
(id, document_id, content, chunk_index, page_number, section_type, embedding, created_at)
VALUES (
${crypto.randomUUID()},
${document.id},
${chunks[i].text},
${i},
${chunks[i].pageNumber || null},
${chunks[i].sectionType || null},
${embeddings[i]}::vector,
NOW()
)
`;
}
return { documentId: document.id };
}
/**
* 向量检索
*/
async vectorSearch(
kbIds: string[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
const queryVector = await embeddingService.embed(query);
const results = await this.prisma.$queryRaw<SearchResult[]>`
SELECT
c.id,
c.content,
c.document_id,
d.filename,
1 - (c.embedding <=> ${queryVector}::vector) as score
FROM "ekb_schema"."EkbChunk" c
JOIN "ekb_schema"."EkbDocument" d ON c.document_id = d.id
WHERE d.kb_id = ANY(${kbIds}::text[])
ORDER BY c.embedding <=> ${queryVector}::vector
LIMIT ${topK}
`;
return results;
}
/**
* 获取单个文档全文
*/
async getDocumentFullText(documentId: string): Promise<DocumentText> {
const doc = await this.prisma.ekbDocument.findUnique({
where: { id: documentId },
select: { id: true, filename: true, extractedText: true }
});
if (!doc) throw new Error('Document not found');
return {
id: doc.id,
filename: doc.filename,
text: doc.extractedText || '',
};
}
/**
* 获取知识库所有文档全文
*/
async getAllDocumentsText(kbId: string): Promise<DocumentText[]> {
const docs = await this.prisma.ekbDocument.findMany({
where: { kbId, status: 'completed' },
select: { id: true, filename: true, extractedText: true }
});
return docs.map(doc => ({
id: doc.id,
filename: doc.filename,
text: doc.extractedText || '',
}));
}
/**
* 删除文档
*/
async deleteDocument(documentId: string): Promise<void> {
await this.prisma.ekbDocument.delete({
where: { id: documentId }
});
}
}
```
### 索引设计(完整版,一次性创建)
```sql
-- ===== MVP 阶段必须创建 =====
-- 1. HNSW 向量索引(语义检索核心)
CREATE INDEX IF NOT EXISTS ekb_chunk_embedding_idx
ON "ekb_schema"."EkbChunk"
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- ===== Phase 2 阶段使用MVP 可预创建)=====
-- 2. pg_bigm 中文关键词索引
CREATE EXTENSION IF NOT EXISTS pg_bigm;
CREATE INDEX IF NOT EXISTS ekb_chunk_content_bigm_idx
ON "ekb_schema"."EkbChunk"
USING gin (content gin_bigm_ops);
-- ===== Phase 3 阶段使用MVP 可预创建)=====
-- 3. JSONB GIN 索引(临床数据查询)
CREATE INDEX IF NOT EXISTS ekb_document_pico_idx
ON "ekb_schema"."EkbDocument" USING gin (pico);
CREATE INDEX IF NOT EXISTS ekb_document_safety_idx
ON "ekb_schema"."EkbDocument" USING gin (safety);
CREATE INDEX IF NOT EXISTS ekb_document_studydesign_idx
ON "ekb_schema"."EkbDocument" USING gin ("studyDesign");
```
> 💡 **建议**MVP 阶段一次性创建所有索引,避免后续 DDL 操作。空表创建索引几乎无成本。
### 任务清单
| 任务 | 预估 | 产出 |
|------|------|------|
| **Schema 迁移(完整版)** | 3h | `ekb_schema` + 完整表结构 + 全部索引 |
| EmbeddingService | 3h | 阿里云 API 封装 |
| ChunkService | 2h | 文本切片 |
| KnowledgeBaseEngine MVP | 4h | 核心类(只实现 MVP 方法) |
| 单元测试 | 3h | 基础测试用例 |
| PKB 集成 | 4h | 替换 Dify 调用 |
| **合计** | **19h (3天)** | |
> 💡 **关键点**Schema 迁移一次到位,后续阶段只写代码,不改表。
### 验收标准
- [ ] PKB 可上传 PDF 文档
- [ ] PKB 可向量检索
- [ ] PKB 可获取文档全文
- [ ] PKB 可删除文档
- [ ] 基础问答功能正常
---
## 🔍 Phase 2: 增强检索2天
### 目标
**检索质量提升**:支持中文关键词、混合检索、结果重排序
### 新增能力
| 能力 | 方法 | 说明 |
|------|------|------|
| **关键词检索** | `keywordSearch()` | pg_bigm 中文精确匹配 |
| **混合检索** | `hybridSearch()` | 向量 + 关键词 + RRF |
| **重排序** | `rerank()` | Qwen-Rerank API |
### 新增索引
```sql
-- Phase 2: 关键词检索索引pg_bigm专为中文优化
CREATE EXTENSION IF NOT EXISTS pg_bigm;
CREATE INDEX IF NOT EXISTS ekb_chunk_content_bigm_idx
ON "ekb_schema"."EkbChunk"
USING gin (content gin_bigm_ops);
```
### 新增代码
```typescript
// Phase 2 新增方法
/**
* 关键词检索pg_bigm 中文精确匹配)
*/
async keywordSearch(
kbIds: string[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
const results = await this.prisma.$queryRaw<SearchResult[]>`
SELECT
c.id,
c.content,
c.document_id,
d.filename,
bigm_similarity(c.content, ${query}) as score
FROM "ekb_schema"."EkbChunk" c
JOIN "ekb_schema"."EkbDocument" d ON c.document_id = d.id
WHERE d.kb_id = ANY(${kbIds}::text[])
AND c.content LIKE ${'%' + query + '%'}
ORDER BY bigm_similarity(c.content, ${query}) DESC
LIMIT ${topK}
`;
return results;
}
/**
* 混合检索(向量 + 关键词 + RRF 融合)
*/
async hybridSearch(
kbIds: string[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
// 并发执行两路检索
const [vectorResults, keywordResults] = await Promise.all([
this.vectorSearch(kbIds, query, 20),
this.keywordSearch(kbIds, query, 20),
]);
// RRF 融合
return rrfFusion(vectorResults, keywordResults, topK);
}
/**
* 重排序Qwen-Rerank API
*/
async rerank(
documents: SearchResult[],
query: string,
topK: number = 10
): Promise<SearchResult[]> {
const response = await dashscope.rerank({
model: 'gte-rerank',
query,
documents: documents.map(d => d.content),
top_n: topK,
});
return response.results.map(r => ({
...documents[r.index],
score: r.relevance_score,
}));
}
```
### RRF 融合算法
```typescript
// utils/rrfFusion.ts
export function rrfFusion(
vectorResults: SearchResult[],
keywordResults: SearchResult[],
topK: number,
k: number = 60
): SearchResult[] {
const scores = new Map<string, number>();
// 向量检索得分
vectorResults.forEach((r, rank) => {
const score = scores.get(r.id) || 0;
scores.set(r.id, score + 1 / (k + rank + 1));
});
// 关键词检索得分
keywordResults.forEach((r, rank) => {
const score = scores.get(r.id) || 0;
scores.set(r.id, score + 1 / (k + rank + 1));
});
// 合并去重 + 排序
const allResults = [...vectorResults, ...keywordResults];
const uniqueResults = Array.from(
new Map(allResults.map(r => [r.id, r])).values()
);
return uniqueResults
.map(r => ({ ...r, score: scores.get(r.id) || 0 }))
.sort((a, b) => b.score - a.score)
.slice(0, topK);
}
```
### 任务清单
| 任务 | 预估 | 产出 |
|------|------|------|
| pg_bigm 索引 | 1h | SQL 迁移 |
| KeywordSearchService | 2h | 关键词检索pg_bigm |
| RRF 融合算法 | 2h | rrfFusion.ts |
| RerankService | 2h | 阿里云 API 封装 |
| hybridSearch 集成 | 2h | 混合检索 |
| 测试 | 3h | 检索质量验证 |
| **合计** | **12h (2天)** | |
### 验收标准
- [ ] 关键词检索支持中文("帕博利珠" 可匹配 "帕博利珠单抗"
- [ ] 混合检索可用
- [ ] rerank 可用
- [ ] 检索召回率提升(对比 Phase 1
---
## 🎯 Phase 3: 完整功能3天
### 目标
**完整架构落地**:支持大文件、高级提取功能
### 新增能力
| 能力 | 方法 | 说明 |
|------|------|------|
| **异步入库** | `submitIngestTask()` | pg-boss 队列 |
| | `getIngestStatus()` | 任务状态查询 |
| **摘要获取** | `getDocumentSummary()` | LLM 生成摘要 |
| | `getAllDocumentsSummaries()` | 批量获取摘要 |
| **临床数据** | `getClinicalData()` | PICO 等结构化数据 |
### Schema 说明
> ✅ **无需 Schema 升级**MVP 阶段已创建完整表结构Phase 3 只需填充字段。
```typescript
// Phase 3: 开始填充 summary、tokenCount、临床数据字段
await prisma.ekbDocument.update({
where: { id: documentId },
data: {
summary: generatedSummary, // 🆕 Phase 3 填充
tokenCount: calculatedTokens, // 🆕 Phase 3 填充
pico: extractedPico, // 🆕 Phase 3 填充
studyDesign: extractedStudyDesign, // 🆕 Phase 3 填充
regimen: extractedRegimen, // 🆕 Phase 3 填充
safety: extractedSafety, // 🆕 Phase 3 填充
}
});
```
### 异步入库实现
详见:[Postgres-Only异步任务处理指南](../Postgres-Only异步任务处理指南.md)
```typescript
// Phase 3: 异步入库
/**
* 提交入库任务
*/
async submitIngestTask(params: {
kbId: string;
userId: string;
file: Buffer;
filename: string;
options?: {
enableSummary?: boolean; // 💰 默认 false
enableClinicalExtraction?: boolean; // 💰 默认 false
};
}): Promise<{ taskId: string; documentId: string }> {
// 1. 快速上传到 OSS
const fileUrl = await storage.upload(params.file);
// 2. 创建文档记录status: processing
const document = await this.prisma.ekbDocument.create({
data: {
kbId: params.kbId,
userId: params.userId,
filename: params.filename,
fileUrl,
status: 'processing',
}
});
// 3. 推送任务到 pg-boss
const taskId = await jobQueue.send('ekb-ingest', {
documentId: document.id,
fileUrl,
options: params.options,
});
return { taskId, documentId: document.id };
}
/**
* 获取任务状态
*/
async getIngestStatus(taskId: string): Promise<{
status: 'pending' | 'processing' | 'completed' | 'failed';
progress: number;
error?: string;
}> {
const job = await jobQueue.getJobById(taskId);
return {
status: job.state,
progress: job.data?.progress || 0,
error: job.state === 'failed' ? job.output?.error : undefined,
};
}
```
### 任务清单
| 任务 | 预估 | 产出 |
|------|------|------|
| pg-boss Worker | 4h | ingestWorker.ts异步入库 |
| SummaryService | 3h | LLM 摘要生成 |
| ClinicalExtractionService | 4h | PICO 提取 + JSON 容错 |
| 摘要/临床数据 API | 2h | 新增 getDocumentSummary 等方法 |
| 测试 | 4h | 完整流程验证 |
| **合计** | **17h (3天)** | |
> ✅ **优势**:无需 Schema 迁移,只写业务代码。
### 验收标准
- [ ] 大文件上传不超时
- [ ] 任务状态可查询
- [ ] 摘要生成可用
- [ ] PICO 提取可用ASL 场景)
- [ ] AIA 摘要筛选策略可用
---
## 📊 总体进度
| 阶段 | 工期 | 累计 | 状态 |
|------|------|------|------|
| **Phase 1 MVP** | 3天 | 3天 | 🔜 待开始 |
| **Phase 2 增强** | 2天 | 5天 | 📋 规划中 |
| **Phase 3 完整** | 3天 | 8天 | 📋 规划中 |
---
## 🔗 相关文档
- [知识库引擎架构设计](./01-知识库引擎架构设计.md) - 完整架构目标
- [pgvector 替换 Dify 技术方案](./02-pgvector替换Dify计划.md) - 详细技术实现
- [Postgres-Only异步任务处理指南](../Postgres-Only异步任务处理指南.md) - 异步架构参考
---
## 📅 更新日志
### v1.0 (2026-01-20)
- 初始版本
- 确定三阶段实施方案MVP → 增强 → 完整
---
**维护人:** 技术架构师
**最后更新:** 2026-01-20