-- ========================================
-- 004-migrate-pkb.sql
-- ========================================
-- Purpose: migrate pkb_schema (personal knowledge base module)
-- Tables migrated: 5 (knowledge_bases, documents, batch_tasks,
--                     batch_results, task_templates)
-- Estimated time: 30 minutes
-- Author: AI assistant
-- Date: 2025-11-09
-- ========================================
-- Prerequisites:
-- 1. 001-create-all-10-schemas.sql has been executed
-- 2. 002-migrate-platform.sql has been executed
--    (this script references platform_schema.users)
-- 3. The corresponding tables exist in the public schema and contain data
--
-- Re-run behaviour: tables and indexes are created with IF NOT EXISTS and
-- rows are copied with ON CONFLICT (id) DO NOTHING, so rows whose id already
-- exists in the target are skipped rather than duplicated or overwritten.

BEGIN;

-- ========================================
-- Step 1: create pkb_schema.knowledge_bases
-- ========================================
CREATE TABLE IF NOT EXISTS pkb_schema.knowledge_bases (
    id               UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id          UUID NOT NULL,
    name             VARCHAR(255) NOT NULL,
    description      TEXT,
    dify_dataset_id  VARCHAR(255) NOT NULL,
    file_count       INT DEFAULT 0,
    total_size_bytes BIGINT DEFAULT 0,
    created_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES platform_schema.users(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_pkb_knowledge_bases_user_id
    ON pkb_schema.knowledge_bases(user_id);
CREATE INDEX IF NOT EXISTS idx_pkb_knowledge_bases_dify_dataset_id
    ON pkb_schema.knowledge_bases(dify_dataset_id);

-- ========================================
-- Step 2: create pkb_schema.documents (includes Phase 2 columns)
-- ========================================
CREATE TABLE IF NOT EXISTS pkb_schema.documents (
    id                 UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    kb_id              UUID NOT NULL,
    user_id            UUID NOT NULL,
    filename           VARCHAR(255) NOT NULL,
    file_type          VARCHAR(50) NOT NULL,
    file_size_bytes    BIGINT NOT NULL,
    file_url           TEXT NOT NULL,
    dify_document_id   VARCHAR(255) NOT NULL,
    status             VARCHAR(50) DEFAULT 'uploading',
    progress           INT DEFAULT 0,
    error_message      TEXT,
    segments_count     INT,
    tokens_count       INT,
    -- Phase 2: full-text reading mode columns
    extraction_method  VARCHAR(50),
    extraction_quality FLOAT,
    char_count         INT,
    language           VARCHAR(20),
    extracted_text     TEXT,
    uploaded_at        TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processed_at       TIMESTAMP,
    FOREIGN KEY (kb_id) REFERENCES pkb_schema.knowledge_bases(id) ON DELETE CASCADE,
    FOREIGN KEY (user_id) REFERENCES platform_schema.users(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_pkb_documents_kb_id
    ON pkb_schema.documents(kb_id);
CREATE INDEX IF NOT EXISTS idx_pkb_documents_user_id
    ON pkb_schema.documents(user_id);
CREATE INDEX IF NOT EXISTS idx_pkb_documents_status
    ON pkb_schema.documents(status);
CREATE INDEX IF NOT EXISTS idx_pkb_documents_dify_document_id
    ON pkb_schema.documents(dify_document_id);
CREATE INDEX IF NOT EXISTS idx_pkb_documents_extraction_method
    ON pkb_schema.documents(extraction_method);

-- ========================================
-- Step 3: create pkb_schema.batch_tasks
-- ========================================
CREATE TABLE IF NOT EXISTS pkb_schema.batch_tasks (
    id               UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id          UUID NOT NULL,
    kb_id            UUID NOT NULL,
    name             VARCHAR(255) NOT NULL,
    template_type    VARCHAR(50) NOT NULL,
    template_id      VARCHAR(100),
    prompt           TEXT NOT NULL,
    status           VARCHAR(50) NOT NULL,
    total_documents  INT NOT NULL,
    completed_count  INT DEFAULT 0,
    failed_count     INT DEFAULT 0,
    model_type       VARCHAR(50) NOT NULL,
    concurrency      INT DEFAULT 3,
    started_at       TIMESTAMP,
    completed_at     TIMESTAMP,
    duration_seconds INT,
    created_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at       TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES platform_schema.users(id) ON DELETE CASCADE,
    FOREIGN KEY (kb_id) REFERENCES pkb_schema.knowledge_bases(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_pkb_batch_tasks_user_id
    ON pkb_schema.batch_tasks(user_id);
CREATE INDEX IF NOT EXISTS idx_pkb_batch_tasks_kb_id
    ON pkb_schema.batch_tasks(kb_id);
CREATE INDEX IF NOT EXISTS idx_pkb_batch_tasks_status
    ON pkb_schema.batch_tasks(status);
CREATE INDEX IF NOT EXISTS idx_pkb_batch_tasks_created_at
    ON pkb_schema.batch_tasks(created_at);

-- ========================================
-- Step 4: create pkb_schema.batch_results
-- ========================================
CREATE TABLE IF NOT EXISTS pkb_schema.batch_results (
    id                 UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_id            UUID NOT NULL,
    document_id        UUID NOT NULL,
    status             VARCHAR(50) NOT NULL,
    data               JSONB,
    raw_output         TEXT,
    error_message      TEXT,
    processing_time_ms INT,
    tokens_used        INT,
    created_at         TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (task_id) REFERENCES pkb_schema.batch_tasks(id) ON DELETE CASCADE,
    FOREIGN KEY (document_id) REFERENCES pkb_schema.documents(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_pkb_batch_results_task_id
    ON pkb_schema.batch_results(task_id);
CREATE INDEX IF NOT EXISTS idx_pkb_batch_results_document_id
    ON pkb_schema.batch_results(document_id);
CREATE INDEX IF NOT EXISTS idx_pkb_batch_results_status
    ON pkb_schema.batch_results(status);

-- ========================================
-- Step 5: create pkb_schema.task_templates
-- ========================================
CREATE TABLE IF NOT EXISTS pkb_schema.task_templates (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id       UUID NOT NULL,
    name          VARCHAR(255) NOT NULL,
    description   TEXT,
    prompt        TEXT NOT NULL,
    output_fields JSONB NOT NULL,
    is_public     BOOLEAN DEFAULT false,
    created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES platform_schema.users(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_pkb_task_templates_user_id
    ON pkb_schema.task_templates(user_id);

-- ========================================
-- Step 6: copy data
-- ========================================
-- Insert order follows the foreign-key dependencies declared above:
-- knowledge_bases -> documents -> batch_tasks -> batch_results.

-- 6.1 Copy knowledge_bases
INSERT INTO pkb_schema.knowledge_bases (
    id,
    user_id,
    name,
    description,
    dify_dataset_id,
    file_count,
    total_size_bytes,
    created_at,
    updated_at
)
SELECT
    id,
    user_id,
    name,
    description,
    dify_dataset_id,
    file_count,
    total_size_bytes,
    created_at,
    updated_at
FROM public.knowledge_bases
ON CONFLICT (id) DO NOTHING;

-- 6.2 Copy documents (includes Phase 2 columns)
INSERT INTO pkb_schema.documents (
    id,
    kb_id,
    user_id,
    filename,
    file_type,
    file_size_bytes,
    file_url,
    dify_document_id,
    status,
    progress,
    error_message,
    segments_count,
    tokens_count,
    extraction_method,
    extraction_quality,
    char_count,
    language,
    extracted_text,
    uploaded_at,
    processed_at
)
SELECT
    id,
    kb_id,
    user_id,
    filename,
    file_type,
    file_size_bytes,
    file_url,
    dify_document_id,
    status,
    progress,
    error_message,
    segments_count,
    tokens_count,
    extraction_method,
    extraction_quality,
    char_count,
    language,
    extracted_text,
    uploaded_at,
    processed_at
FROM public.documents
ON CONFLICT (id) DO NOTHING;

-- 6.3 Copy batch_tasks
INSERT INTO pkb_schema.batch_tasks (
    id,
    user_id,
    kb_id,
    name,
    template_type,
    template_id,
    prompt,
    status,
    total_documents,
    completed_count,
    failed_count,
    model_type,
    concurrency,
    started_at,
    completed_at,
    duration_seconds,
    created_at,
    updated_at
)
SELECT
    id,
    user_id,
    kb_id,
    name,
    template_type,
    template_id,
    prompt,
    status,
    total_documents,
    completed_count,
    failed_count,
    model_type,
    concurrency,
    started_at,
    completed_at,
    duration_seconds,
    created_at,
    updated_at
FROM public.batch_tasks
ON CONFLICT (id) DO NOTHING;

-- 6.4 Copy batch_results
INSERT INTO pkb_schema.batch_results (
    id,
    task_id,
    document_id,
    status,
    data,
    raw_output,
    error_message,
    processing_time_ms,
    tokens_used,
    created_at
)
SELECT
    id,
    task_id,
    document_id,
    status,
    data,
    raw_output,
    error_message,
    processing_time_ms,
    tokens_used,
    created_at
FROM public.batch_results
ON CONFLICT (id) DO NOTHING;

-- 6.5 Copy task_templates
INSERT INTO pkb_schema.task_templates (
    id,
    user_id,
    name,
    description,
    prompt,
    output_fields,
    is_public,
    created_at,
    updated_at
)
SELECT
    id,
    user_id,
    name,
    description,
    prompt,
    output_fields,
    is_public,
    created_at,
    updated_at
FROM public.task_templates
ON CONFLICT (id) DO NOTHING;

-- ========================================
-- Step 7: row-count validation
-- ========================================
-- Compares COUNT(*) of each public table with its pkb_schema counterpart.
-- Mismatches are reported with RAISE WARNING only; a warning does not abort
-- the transaction, so the COMMIT below still runs.
DO $$
DECLARE
    public_knowledge_bases INTEGER;
    public_documents       INTEGER;
    public_batch_tasks     INTEGER;
    public_batch_results   INTEGER;
    public_task_templates  INTEGER;
    pkb_knowledge_bases    INTEGER;
    pkb_documents          INTEGER;
    pkb_batch_tasks        INTEGER;
    pkb_batch_results      INTEGER;
    pkb_task_templates     INTEGER;
    all_match              BOOLEAN := true;
BEGIN
    -- Count rows in the source tables
    SELECT COUNT(*) INTO public_knowledge_bases FROM public.knowledge_bases;
    SELECT COUNT(*) INTO public_documents FROM public.documents;
    SELECT COUNT(*) INTO public_batch_tasks FROM public.batch_tasks;
    SELECT COUNT(*) INTO public_batch_results FROM public.batch_results;
    SELECT COUNT(*) INTO public_task_templates FROM public.task_templates;

    -- Count rows in the target tables
    SELECT COUNT(*) INTO pkb_knowledge_bases FROM pkb_schema.knowledge_bases;
    SELECT COUNT(*) INTO pkb_documents FROM pkb_schema.documents;
    SELECT COUNT(*) INTO pkb_batch_tasks FROM pkb_schema.batch_tasks;
    SELECT COUNT(*) INTO pkb_batch_results FROM pkb_schema.batch_results;
    SELECT COUNT(*) INTO pkb_task_templates FROM pkb_schema.task_templates;

    -- Print the statistics
    RAISE NOTICE '==================== 数据迁移统计 ====================';
    RAISE NOTICE 'knowledge_bases: public.% -> pkb_schema.%', public_knowledge_bases, pkb_knowledge_bases;
    RAISE NOTICE 'documents: public.% -> pkb_schema.%', public_documents, pkb_documents;
    RAISE NOTICE 'batch_tasks: public.% -> pkb_schema.%', public_batch_tasks, pkb_batch_tasks;
    RAISE NOTICE 'batch_results: public.% -> pkb_schema.%', public_batch_results, pkb_batch_results;
    RAISE NOTICE 'task_templates: public.% -> pkb_schema.%', public_task_templates, pkb_task_templates;
    RAISE NOTICE '=====================================================';

    -- Check each table
    IF public_knowledge_bases <> pkb_knowledge_bases THEN
        RAISE WARNING '⚠️ knowledge_bases 数据量不一致';
        all_match := false;
    END IF;

    IF public_documents <> pkb_documents THEN
        RAISE WARNING '⚠️ documents 数据量不一致';
        all_match := false;
    END IF;

    IF public_batch_tasks <> pkb_batch_tasks THEN
        RAISE WARNING '⚠️ batch_tasks 数据量不一致';
        all_match := false;
    END IF;

    IF public_batch_results <> pkb_batch_results THEN
        RAISE WARNING '⚠️ batch_results 数据量不一致';
        all_match := false;
    END IF;

    IF public_task_templates <> pkb_task_templates THEN
        RAISE WARNING '⚠️ task_templates 数据量不一致';
        all_match := false;
    END IF;

    IF all_match THEN
        RAISE NOTICE '✅ PKB Schema 所有表数据迁移成功!';
    ELSE
        RAISE WARNING '⚠️ 部分表数据迁移存在问题,请检查';
    END IF;
END $$;

-- ========================================
-- Step 8: foreign-key integrity validation
-- ========================================
-- Counts orphaned references for knowledge_bases.user_id, documents.user_id
-- and documents.kb_id using LEFT JOIN anti-joins. Findings are reported as
-- warnings only.
DO $$
DECLARE
    invalid_kb_users  INTEGER;
    invalid_doc_users INTEGER;
    invalid_doc_kb    INTEGER;
BEGIN
    -- Check knowledge_bases.user_id
    SELECT COUNT(*) INTO invalid_kb_users
    FROM pkb_schema.knowledge_bases kb
    LEFT JOIN platform_schema.users u
        ON kb.user_id = u.id
    WHERE u.id IS NULL;

    IF invalid_kb_users > 0 THEN
        RAISE WARNING '⚠️ knowledge_bases表中有 % 条记录的user_id无效', invalid_kb_users;
    ELSE
        RAISE NOTICE '✅ knowledge_bases表user_id外键完整性验证通过';
    END IF;

    -- Check documents.user_id
    SELECT COUNT(*) INTO invalid_doc_users
    FROM pkb_schema.documents d
    LEFT JOIN platform_schema.users u
        ON d.user_id = u.id
    WHERE u.id IS NULL;

    IF invalid_doc_users > 0 THEN
        RAISE WARNING '⚠️ documents表中有 % 条记录的user_id无效', invalid_doc_users;
    ELSE
        RAISE NOTICE '✅ documents表user_id外键完整性验证通过';
    END IF;

    -- Check documents.kb_id
    SELECT COUNT(*) INTO invalid_doc_kb
    FROM pkb_schema.documents d
    LEFT JOIN pkb_schema.knowledge_bases kb
        ON d.kb_id = kb.id
    WHERE kb.id IS NULL;

    IF invalid_doc_kb > 0 THEN
        RAISE WARNING '⚠️ documents表中有 % 条记录的kb_id无效', invalid_doc_kb;
    ELSE
        RAISE NOTICE '✅ documents表kb_id外键完整性验证通过';
    END IF;
END $$;

-- ========================================
-- Step 9: Phase 2 column statistics
-- ========================================
SELECT
    '全文阅读字段统计' AS category,
    COUNT(*) AS total_documents,
    COUNT(extracted_text) AS has_extracted_text,
    COUNT(extraction_method) AS has_extraction_method,
    COUNT(*) FILTER (WHERE extraction_method = 'pymupdf') AS pymupdf_count,
    COUNT(*) FILTER (WHERE extraction_method = 'nougat') AS nougat_count,
    AVG(extraction_quality) AS avg_quality,
    AVG(char_count) AS avg_char_count
FROM pkb_schema.documents;

COMMIT;

-- ========================================
-- Result summary (can be run on its own)
-- ========================================
-- One row per migrated table. NULL means the metric does not apply to that
-- table; sums over an empty table are reported as 0 rather than NULL.
SELECT
    'pkb_schema' AS schema_name,
    'knowledge_bases' AS table_name,
    COUNT(*) AS row_count,
    COUNT(DISTINCT user_id) AS unique_users,
    COALESCE(SUM(file_count), 0) AS total_files,
    pg_size_pretty(COALESCE(SUM(total_size_bytes), 0)) AS total_size
FROM pkb_schema.knowledge_bases
UNION ALL
SELECT
    'pkb_schema',
    'documents',
    COUNT(*),
    COUNT(DISTINCT user_id),
    CAST(NULL AS BIGINT),
    pg_size_pretty(COALESCE(SUM(file_size_bytes), 0))
FROM pkb_schema.documents
UNION ALL
SELECT
    'pkb_schema',
    'batch_tasks',
    COUNT(*),
    COUNT(DISTINCT user_id),
    CAST(NULL AS BIGINT),
    CAST(NULL AS TEXT)
FROM pkb_schema.batch_tasks
UNION ALL
-- batch_results has no user_id column, so unique_users is not applicable.
SELECT
    'pkb_schema',
    'batch_results',
    COUNT(*),
    CAST(NULL AS BIGINT),
    CAST(NULL AS BIGINT),
    CAST(NULL AS TEXT)
FROM pkb_schema.batch_results
UNION ALL
SELECT
    'pkb_schema',
    'task_templates',
    COUNT(*),
    COUNT(DISTINCT user_id),
    CAST(NULL AS BIGINT),
    CAST(NULL AS TEXT)
FROM pkb_schema.task_templates;

-- ========================================
-- Completion notes
-- ========================================
-- PKB schema migration complete.
-- Covers 5 tables: knowledge_bases, documents, batch_tasks,
--                  batch_results, task_templates
-- Next step: run 005-validate-all.sql for the global validation
-- ========================================