feat: add extraction_service (PDF/Docx/Txt) and update .gitignore to exclude venv

This commit is contained in:
AI Clinical Dev Team
2025-11-16 15:32:44 +08:00
parent 2a4f59b08b
commit 39eb62ee79
18 changed files with 2706 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
"""
服务模块
包含各种文档提取和处理服务
"""

View File

@@ -0,0 +1,257 @@
"""
Docx文档提取服务
使用Mammoth库提取Word文档文本
支持.docx格式不支持老版.doc
"""
import mammoth
from pathlib import Path
from typing import Dict, Any
from loguru import logger
def extract_docx_mammoth(file_path: str) -> Dict[str, Any]:
    """Extract plain text from a .docx file using Mammoth.

    Mammoth converts the document to raw text while handling tables,
    lists and other basic structure. Legacy .doc files are rejected.

    Args:
        file_path: Path to the .docx file.

    Returns:
        On success: ``{"success": True, "text": ..., "format":
        "plain_text", "metadata": {char_count, has_tables, file_size,
        warnings}}``; otherwise ``success=False`` with an ``error`` key.
    """
    try:
        source = Path(file_path)
        # Guard: the file must exist on disk...
        if not source.exists():
            return {
                "success": False,
                "error": f"文件不存在: {file_path}",
                "text": "",
                "metadata": {}
            }
        # ...and must carry the .docx suffix (old .doc is unsupported).
        if source.suffix.lower() != '.docx':
            return {
                "success": False,
                "error": f"不支持的文件格式: {source.suffix},仅支持.docx",
                "text": "",
                "metadata": {}
            }
        logger.info(f"开始提取Docx文件: {source.name}")
        # Mammoth wants a binary file handle.
        with open(file_path, "rb") as handle:
            extraction = mammoth.extract_raw_text(handle)
        text = extraction.value
        messages = extraction.messages
        # Surface any converter warnings for debugging.
        if messages:
            logger.warning(f"Mammoth提取警告: {len(messages)}条")
            for msg in messages:
                logger.debug(f" - {msg.type}: {msg.message}")
        char_count = len(text)
        # An empty result means the document had no extractable content.
        if char_count == 0:
            logger.warning("提取的文本为空")
            return {
                "success": False,
                "error": "文档内容为空或无法提取",
                "text": "",
                "metadata": {
                    "char_count": 0,
                    "file_size": source.stat().st_size
                }
            }
        # Crude table hint: tab characters or pipe symbols in the text.
        has_tables = '\t' in text or '|' in text
        logger.info(f"Docx提取成功: {char_count}个字符")
        return {
            "success": True,
            "text": text,
            "format": "plain_text",
            "metadata": {
                "char_count": char_count,
                "has_tables": has_tables,
                "file_size": source.stat().st_size,
                "warnings": len(messages)
            }
        }
    except Exception as exc:
        logger.error(f"Docx提取失败: {str(exc)}")
        return {
            "success": False,
            "error": str(exc),
            "text": "",
            "metadata": {}
        }
def extract_docx_html(file_path: str) -> Dict[str, Any]:
    """Convert a .docx file to HTML via Mammoth (keeps more formatting).

    Args:
        file_path: Path to the .docx file.

    Returns:
        On success: ``{"success": True, "html": ..., "format": "html",
        "metadata": {html_length, file_size, warnings}}``; otherwise
        ``success=False`` with an ``error`` key.
    """
    try:
        source = Path(file_path)
        # Missing file -> early failure result.
        if not source.exists():
            return {
                "success": False,
                "error": f"文件不存在: {file_path}",
                "html": "",
                "metadata": {}
            }
        logger.info(f"开始提取Docx为HTML: {source.name}")
        # Convert the document body to an HTML fragment.
        with open(file_path, "rb") as handle:
            conversion = mammoth.convert_to_html(handle)
        html = conversion.value
        messages = conversion.messages
        if messages:
            logger.warning(f"HTML转换警告: {len(messages)}条")
        logger.info(f"HTML提取成功: {len(html)}个字符")
        return {
            "success": True,
            "html": html,
            "format": "html",
            "metadata": {
                "html_length": len(html),
                "file_size": source.stat().st_size,
                "warnings": len(messages)
            }
        }
    except Exception as exc:
        logger.error(f"HTML提取失败: {str(exc)}")
        return {
            "success": False,
            "error": str(exc),
            "html": "",
            "metadata": {}
        }
def validate_docx_file(file_path: str) -> Dict[str, Any]:
    """Validate that a path points to a usable .docx file.

    Checks existence, suffix, size (non-empty, at most 50MB) and the ZIP
    magic bytes (a .docx is a ZIP container).

    Args:
        file_path: Path to validate.

    Returns:
        ``{"valid": bool, "reason": str}`` plus a ``file_info`` dict
        (filename, size, size_mb) when the file is valid.
    """
    try:
        file_path_obj = Path(file_path)
        # Existence check comes first; everything else assumes a real file.
        if not file_path_obj.exists():
            return {
                "valid": False,
                "reason": "文件不存在"
            }
        # Only .docx is supported (legacy .doc is a different format).
        # Bug fix: the message was missing its closing parenthesis.
        if file_path_obj.suffix.lower() != '.docx':
            return {
                "valid": False,
                "reason": f"不支持的格式: {file_path_obj.suffix}(仅支持.docx)"
            }
        # Size limits: reject empty files and anything above 50MB.
        file_size = file_path_obj.stat().st_size
        max_size = 50 * 1024 * 1024  # 50MB
        if file_size > max_size:
            return {
                "valid": False,
                "reason": f"文件过大: {file_size / 1024 / 1024:.2f}MB限制50MB"
            }
        if file_size == 0:
            return {
                "valid": False,
                "reason": "文件为空"
            }
        # Basic integrity check: a .docx is a ZIP archive, so it must
        # start with the 'PK\x03\x04' local-file-header signature.
        try:
            with open(file_path, "rb") as f:
                signature = f.read(4)
                if signature != b'PK\x03\x04':
                    return {
                        "valid": False,
                        "reason": "不是有效的Docx文件ZIP签名错误"
                    }
        except Exception as e:
            return {
                "valid": False,
                "reason": f"无法读取文件: {str(e)}"
            }
        return {
            "valid": True,
            "reason": "文件有效",
            "file_info": {
                "filename": file_path_obj.name,
                "size": file_size,
                "size_mb": round(file_size / 1024 / 1024, 2)
            }
        }
    except Exception as e:
        return {
            "valid": False,
            "reason": f"验证失败: {str(e)}"
        }

View File

@@ -0,0 +1,88 @@
"""
文件工具函数
"""
import os
from pathlib import Path
from loguru import logger
def detect_file_type(filename: str) -> str:
    """Map a filename to its supported document type.

    Args:
        filename: File name; the extension is matched case-insensitively.

    Returns:
        One of ``'pdf'``, ``'docx'`` or ``'txt'``.

    Raises:
        ValueError: when the extension is missing or unsupported.
    """
    name = filename.lower()
    # Bug fix: a name without any dot previously treated the whole
    # filename as its "extension" and produced a misleading error.
    if '.' not in name:
        raise ValueError(f"不支持的文件格式: {filename}(无扩展名)")
    ext = name.rsplit('.', 1)[-1]
    mapping = {'pdf': 'pdf', 'docx': 'docx', 'txt': 'txt'}
    if ext not in mapping:
        raise ValueError(f"不支持的文件格式: .{ext}")
    return mapping[ext]
def cleanup_temp_file(file_path: Path | str) -> None:
"""
清理临时文件
Args:
file_path: 文件路径
"""
try:
if isinstance(file_path, str):
file_path = Path(file_path)
if file_path.exists():
file_path.unlink()
logger.debug(f"清理临时文件: {file_path}")
except Exception as e:
logger.warning(f"清理临时文件失败: {str(e)}")
def get_file_size_mb(file_path: Path | str) -> float:
"""
获取文件大小MB
Args:
file_path: 文件路径
Returns:
文件大小MB
"""
if isinstance(file_path, str):
file_path = Path(file_path)
if file_path.exists():
return file_path.stat().st_size / (1024 * 1024)
return 0.0
def validate_file_size(file_size: int, max_size: int = 52428800) -> bool:
    """Check that *file_size* does not exceed *max_size*.

    Args:
        file_size: Size in bytes.
        max_size: Allowed maximum in bytes (default 50MB).

    Returns:
        True when the size is within the limit.
    """
    return not file_size > max_size

View File

@@ -0,0 +1,160 @@
"""
语言检测服务
检测PDF文档的主要语言中文/英文/混合)
用于决定使用哪种提取方法
"""
import pdfplumber
from typing import Dict, Any
from loguru import logger
def detect_language(pdf_path: str) -> str:
    """Detect the dominant language of a PDF.

    Samples text from the first three pages (representative enough) and
    classifies by the ratio of CJK characters: above 30% counts as
    Chinese. Falls back to 'english' whenever too little text is
    available or any error occurs.

    Args:
        pdf_path: PDF file path.

    Returns:
        ``'chinese'`` or ``'english'``.
    """
    try:
        logger.info(f"开始语言检测: {pdf_path}")
        with pdfplumber.open(pdf_path) as pdf:
            sample_pages = min(3, len(pdf.pages))
            chunks = []
            for index in range(sample_pages):
                try:
                    content = pdf.pages[index].extract_text()
                    if content:
                        chunks.append(content + "\n")
                except Exception as exc:
                    logger.warning(f"第{index+1}页文本提取失败: {str(exc)}")
                    continue
            sample_text = "".join(chunks)
            # Too little text -> detection is unreliable; default English.
            if len(sample_text.strip()) < 100:
                logger.warning("文本太少,默认使用英文处理")
                return 'english'
            # Ratio of CJK unified ideographs to non-whitespace chars.
            chinese_chars = sum(1 for ch in sample_text if '\u4e00' <= ch <= '\u9fff')
            total_chars = sum(1 for ch in sample_text if ch.strip())
            if total_chars == 0:
                logger.warning("无有效字符,默认使用英文处理")
                return 'english'
            ratio = chinese_chars / total_chars
            logger.info(f"中文字符比例: {ratio:.2%} ({chinese_chars}/{total_chars})")
            # Threshold: > 30% -> Chinese (covers mixed-but-mostly-Chinese).
            language = 'chinese' if ratio > 0.3 else 'english'
            logger.info(f"检测结果: {language}")
            return language
    except Exception as e:
        logger.error(f"语言检测失败: {str(e)},默认使用英文处理")
        return 'english'
def detect_language_detailed(pdf_path: str) -> Dict[str, Any]:
    """Detailed language detection with character statistics.

    Samples up to the first three pages and reports the Chinese-character
    ratio alongside a classification: > 30% -> 'chinese', > 10% ->
    'mixed', otherwise 'english'.

    Args:
        pdf_path: PDF file path.

    Returns:
        Dict with ``language``, ``chinese_ratio``, ``chinese_chars``,
        ``total_chars``, ``sample_pages`` and ``sample_text_length``; on
        failure ``language`` defaults to 'english' with an ``error`` key.
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            sample_pages = min(3, len(pdf.pages))
            sample_text = ""
            for i in range(sample_pages):
                try:
                    page_text = pdf.pages[i].extract_text()
                    if page_text:
                        sample_text += page_text + "\n"
                # Bug fix: narrowed the bare `except:` (which also caught
                # KeyboardInterrupt/SystemExit) to `except Exception`.
                except Exception:
                    continue
            # Count CJK ideographs vs. all non-whitespace characters.
            chinese_chars = len([c for c in sample_text if '\u4e00' <= c <= '\u9fff'])
            total_chars = len([c for c in sample_text if c.strip()])
            chinese_ratio = chinese_chars / total_chars if total_chars > 0 else 0
            # Classify by ratio thresholds.
            if chinese_ratio > 0.3:
                language = 'chinese'
            elif chinese_ratio > 0.1:
                language = 'mixed'
            else:
                language = 'english'
            return {
                "language": language,
                "chinese_ratio": round(chinese_ratio, 4),
                "chinese_chars": chinese_chars,
                "total_chars": total_chars,
                "sample_pages": sample_pages,
                "sample_text_length": len(sample_text)
            }
    except Exception as e:
        logger.error(f"详细语言检测失败: {str(e)}")
        return {
            "language": "english",
            "error": str(e)
        }
def is_chinese_pdf(pdf_path: str, threshold: float = 0.3) -> bool:
    """Return True when the PDF's Chinese-character ratio exceeds *threshold*.

    Bug fix: the *threshold* parameter was previously ignored — the
    function delegated to :func:`detect_language`, whose 30% cut-off is
    hard-coded. It now compares the measured ratio from
    :func:`detect_language_detailed` against the caller's threshold,
    matching the documented contract (default behavior unchanged).

    Args:
        pdf_path: PDF file path.
        threshold: Chinese-character ratio cut-off (default 30%).

    Returns:
        True if the Chinese-character ratio is above *threshold*.
    """
    stats = detect_language_detailed(pdf_path)
    # On detection failure the stats dict has no ratio; treat as 0.0.
    return stats.get("chinese_ratio", 0.0) > threshold

View File

@@ -0,0 +1,241 @@
"""
Nougat提取服务
使用Nougat OCR提取学术PDF的高质量文本
保留表格、公式等结构信息
"""
import subprocess
import os
from pathlib import Path
from typing import Dict, Any, Optional, Callable
from loguru import logger
def check_nougat_available() -> bool:
    """Report whether the Nougat OCR package is importable.

    Returns:
        True when ``import nougat`` succeeds, False otherwise.
    """
    try:
        import nougat
        version = getattr(nougat, '__version__', 'unknown')
        logger.info(f"Nougat module is available (version: {version})")
        return True
    except ImportError:
        logger.warning("Nougat module not found")
        return False
    except Exception as exc:
        logger.error(f"检查Nougat失败: {str(exc)}")
        return False
def extract_pdf_nougat(
    file_path: str,
    output_dir: Optional[str] = None,
    progress_callback: Optional[Callable[[int, int], None]] = None
) -> Dict[str, Any]:
    """Extract a PDF with the Nougat OCR CLI, returning Markdown text.

    Args:
        file_path: PDF file path.
        output_dir: Directory for Nougat's output; defaults to a
            "nougat_output" folder next to the PDF.
        progress_callback: Kept for interface compatibility; the CLI
            exposes no per-page progress, so it is currently unused.

    Returns:
        On success: ``{"success": True, "method": "nougat", "text": ...,
        "format": "markdown", "metadata": {char_count, quality_score,
        has_tables, has_formulas, has_structure}}``; on failure a dict
        with ``success=False`` and an ``error`` message.
    """
    try:
        if not check_nougat_available():
            raise Exception("Nougat未安装请先安装pip install nougat-ocr")
        logger.info(f"开始使用Nougat提取: {file_path}")
        # Default output location: sibling "nougat_output" directory.
        if output_dir is None:
            output_dir = os.path.join(os.path.dirname(file_path), "nougat_output")
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        # CLI invocation: nougat <pdf_path> -o <output_dir>
        cmd = [
            'nougat',
            file_path,
            '-o', output_dir,
            '--markdown',     # emit Markdown output
            '--no-skipping'   # process every page
        ]
        logger.info(f"执行命令: {' '.join(cmd)}")
        # Nougat can be slow (~1-2 minutes per 20 pages); cap at 5 minutes.
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        try:
            stdout, stderr = process.communicate(timeout=300)
        except subprocess.TimeoutExpired:
            # Bug fix: kill and reap the child on timeout. Previously the
            # subprocess kept running (and its pipes stayed open) after
            # TimeoutExpired was raised.
            process.kill()
            process.communicate()
            raise
        if process.returncode != 0:
            logger.error(f"Nougat执行失败: {stderr}")
            raise Exception(f"Nougat执行失败: {stderr}")
        # Nougat writes <pdf stem>.mmd into the output directory.
        pdf_name = Path(file_path).stem
        output_file = Path(output_dir) / f"{pdf_name}.mmd"
        if not output_file.exists():
            raise Exception(f"Nougat输出文件不存在: {output_file}")
        with open(output_file, 'r', encoding='utf-8') as f:
            markdown_text = f.read()
        # Heuristic quality score drives the caller's fallback decision.
        quality_result = evaluate_nougat_quality(markdown_text)
        logger.info(f"Nougat提取完成: 质量={quality_result['quality_score']:.2f}")
        return {
            "success": True,
            "method": "nougat",
            "text": markdown_text,
            "format": "markdown",
            "metadata": {
                "char_count": len(markdown_text),
                "quality_score": quality_result['quality_score'],
                "has_tables": quality_result['has_tables'],
                "has_formulas": quality_result['has_formulas'],
                "has_structure": quality_result['has_structure']
            }
        }
    except subprocess.TimeoutExpired:
        logger.error("Nougat处理超时>5分钟")
        return {
            "success": False,
            "error": "处理超时",
            "method": "nougat"
        }
    except Exception as e:
        logger.error(f"Nougat提取失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "nougat"
        }
def evaluate_nougat_quality(text: str) -> Dict[str, Any]:
    """Heuristically score the quality of Nougat's Markdown output.

    Scoring: base 0.5; +0.2 for section structure; +0.15 for tables;
    +0.15 for formulas; +0.1 when longer than 5000 characters; -0.3 when
    garbled characters exceed 5% of the text. Clamped to [0, 1].

    Args:
        text: Markdown text produced by Nougat.

    Returns:
        Dict with ``quality_score`` and boolean flags ``has_structure``,
        ``has_tables``, ``has_formulas`` and ``has_garbled``.
    """
    score = 0.5  # base score
    # Section structure: at least two "##" or three "#" heading markers.
    has_structure = bool(text.count('##') >= 2 or text.count('#') >= 3)
    if has_structure:
        score += 0.2
    # Markdown tables use pipe cells plus a "---" separator row.
    has_tables = '|' in text and '---' in text
    if has_tables:
        score += 0.15
    # LaTeX-style formulas ($...$, $$...$$ or \( ... \)).
    has_formulas = '$$' in text or '$' in text or '\\(' in text
    if has_formulas:
        score += 0.15
    # Reward a reasonably long extraction (at least 5000 characters).
    if len(text) > 5000:
        score += 0.1
    # Garbled-output heuristic: astral-plane characters plus U+FFFD
    # replacement characters. Bug fix: the original literal had become
    # mojibake ('<EFBFBD>' byte residue) instead of the intended '\ufffd'.
    garbled_chars = sum(1 for c in text if ord(c) > 65535 or c == '\ufffd')
    has_garbled = garbled_chars > len(text) * 0.05  # more than 5%
    if has_garbled:
        score -= 0.3
    # Clamp to the [0, 1] range.
    score = max(0.0, min(1.0, score))
    return {
        "quality_score": score,
        "has_structure": has_structure,
        "has_tables": has_tables,
        "has_formulas": has_formulas,
        "has_garbled": has_garbled
    }
def get_nougat_info() -> Dict[str, Any]:
    """Return Nougat availability and version information.

    Returns:
        ``{"available": True, "version": ...}`` when importable,
        otherwise ``{"available": False, "error": ...}``.
    """
    try:
        import nougat
    except ImportError:
        return {
            "available": False,
            "error": "Nougat未安装"
        }
    except Exception as exc:
        return {
            "available": False,
            "error": str(exc)
        }
    return {
        "available": True,
        "version": getattr(nougat, '__version__', 'unknown')
    }

View File

@@ -0,0 +1,191 @@
"""
PDF文本提取服务
使用PyMuPDF (fitz)提取PDF文本内容
"""
import fitz # PyMuPDF
from typing import Dict, Any
from loguru import logger
def extract_pdf_pymupdf(file_path: str) -> Dict[str, Any]:
    """Extract plain text from a PDF with PyMuPDF (fitz).

    Pages are concatenated with "--- 第 N 页 ---" separators; pages that
    fail to extract are skipped with a warning.

    Args:
        file_path: PDF file path.

    Returns:
        On success: ``{"success": True, "method": "pymupdf", "text": ...,
        "format": "plain_text", "metadata": {page_count, char_count,
        has_text}}``; on failure ``success=False`` with ``error``.
    """
    doc = None
    try:
        logger.info(f"开始使用PyMuPDF提取: {file_path}")
        doc = fitz.open(file_path)
        page_count = len(doc)
        logger.info(f"PDF页数: {page_count}")
        text_parts = []
        for page_num in range(page_count):
            try:
                page = doc[page_num]
                text = page.get_text()
                if text.strip():
                    # Page separator so downstream consumers keep page info.
                    text_parts.append(f"\n\n--- 第 {page_num + 1} 页 ---\n\n")
                    text_parts.append(text)
                    logger.debug(f"第 {page_num + 1} 页提取了 {len(text)} 个字符")
            except Exception as e:
                logger.warning(f"第 {page_num + 1} 页提取失败: {str(e)}")
                continue
        full_text = "".join(text_parts)
        char_count = len(full_text)
        # Fewer than 100 characters suggests a scanned PDF / no text layer.
        has_text = char_count > 100
        if not has_text:
            logger.warning(f"PDF可能是扫描版或无文本内容")
        logger.info(f"PyMuPDF提取完成: 字符数={char_count}")
        return {
            "success": True,
            "method": "pymupdf",
            "text": full_text,
            "format": "plain_text",
            "metadata": {
                "page_count": page_count,
                "char_count": char_count,
                "has_text": has_text
            }
        }
    except Exception as e:
        logger.error(f"PyMuPDF提取失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "pymupdf"
        }
    finally:
        # Bug fix: always close the document, even when extraction raises,
        # so the underlying file handle is not leaked.
        if doc is not None:
            doc.close()
def extract_pdf_with_layout(file_path: str) -> Dict[str, Any]:
    """Extract PDF text with PyMuPDF's "dict" mode (more layout detail).

    Walks text blocks -> lines -> spans and joins span texts with spaces,
    inserting "--- 第 N 页 ---" page separators.

    Args:
        file_path: PDF file path.

    Returns:
        Extraction result dict (``success``, ``method``, ``text``,
        ``format``, ``metadata``) or ``success=False`` with ``error``.
    """
    doc = None
    try:
        logger.info(f"开始使用PyMuPDF提取保留布局: {file_path}")
        doc = fitz.open(file_path)
        page_count = len(doc)
        text_parts = []
        for page_num in range(page_count):
            try:
                page = doc[page_num]
                # "dict" mode exposes blocks/lines/spans with positions.
                blocks = page.get_text("dict")["blocks"]
                page_text = []
                for block in blocks:
                    if block["type"] == 0:  # type 0 = text block
                        for line in block.get("lines", []):
                            for span in line.get("spans", []):
                                text = span.get("text", "")
                                if text.strip():
                                    page_text.append(text)
                if page_text:
                    text_parts.append(f"\n\n--- 第 {page_num + 1} 页 ---\n\n")
                    text_parts.append(" ".join(page_text))
            except Exception as e:
                logger.warning(f"第 {page_num + 1} 页处理失败: {str(e)}")
                continue
        full_text = "".join(text_parts)
        return {
            "success": True,
            "method": "pymupdf_layout",
            "text": full_text,
            "format": "plain_text",
            "metadata": {
                "page_count": page_count,
                "char_count": len(full_text)
            }
        }
    except Exception as e:
        logger.error(f"PyMuPDF布局提取失败: {str(e)}")
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        # Bug fix: close the document even when an exception escapes the
        # page loop, preventing a file-handle leak.
        if doc is not None:
            doc.close()
def get_pdf_metadata(file_path: str) -> Dict[str, Any]:
    """Return basic PDF metadata (page count, document info, encryption).

    Args:
        file_path: PDF file path.

    Returns:
        Dict with ``page_count``, ``metadata``, ``is_encrypted`` and
        ``is_pdf``; an empty dict on any failure.
    """
    doc = None
    try:
        doc = fitz.open(file_path)
        return {
            "page_count": len(doc),
            "metadata": doc.metadata,
            "is_encrypted": doc.is_encrypted,
            "is_pdf": doc.is_pdf
        }
    except Exception as e:
        logger.error(f"获取PDF元数据失败: {str(e)}")
        return {}
    finally:
        # Bug fix: close the document even if reading its properties
        # raises; previously the handle leaked on that path.
        if doc is not None:
            doc.close()

View File

@@ -0,0 +1,192 @@
"""
PDF处理主服务
实现顺序降级策略:
1. 检测语言
2. 中文PDF → PyMuPDF快速
3. 英文PDF → Nougat → 失败降级PyMuPDF
"""
from typing import Dict, Any, Optional
from loguru import logger
from .language_detector import detect_language
from .nougat_extractor import extract_pdf_nougat, check_nougat_available
from .pdf_extractor import extract_pdf_pymupdf
def extract_pdf(
    file_path: str,
    force_method: Optional[str] = None
) -> Dict[str, Any]:
    """Extract PDF text using a sequential fallback strategy.

    Flow:
        1. Detect the dominant language.
        2. Chinese PDFs go straight to PyMuPDF (fast).
        3. English PDFs try Nougat first; when Nougat is unavailable,
           fails, or scores below the 0.7 quality threshold, fall back
           to PyMuPDF.

    Args:
        file_path: PDF file path.
        force_method: Force 'nougat' or 'pymupdf', bypassing the strategy.

    Returns:
        The extractor's result dict, augmented with ``reason``,
        ``detected_language`` and (on fallback) ``fallback``.
    """
    try:
        logger.info(f"开始处理PDF: {file_path}")
        # Step 1: language detection drives the method choice.
        logger.info("[Step 1] 检测PDF语言...")
        language = detect_language(file_path)
        logger.info(f"检测结果: {language}")
        # Caller-forced method bypasses the strategy entirely.
        if force_method:
            logger.info(f"强制使用方法: {force_method}")
            if force_method == 'nougat':
                return extract_pdf_nougat(file_path)
            elif force_method == 'pymupdf':
                result = extract_pdf_pymupdf(file_path)
                result['reason'] = 'force_pymupdf'
                return result
        # Step 2: Chinese PDF -> PyMuPDF directly.
        if language == 'chinese':
            logger.info("[Step 2] 中文PDF使用PyMuPDF快速处理")
            result = extract_pdf_pymupdf(file_path)
            if result['success']:
                result['reason'] = 'chinese_pdf'
                result['detected_language'] = language
                logger.info("✅ PyMuPDF处理成功中文PDF")
            else:
                logger.error("❌ PyMuPDF处理失败")
            return result
        # Step 3: English PDF -> try Nougat.
        logger.info("[Step 3] 英文PDF尝试Nougat高质量解析")
        if not check_nougat_available():
            logger.warning("⚠️ Nougat不可用降级到PyMuPDF")
            result = extract_pdf_pymupdf(file_path)
            if result['success']:
                result['reason'] = 'nougat_unavailable'
                result['detected_language'] = language
            # Bug fix: return the PyMuPDF result even when it failed.
            # Previously a failed result fell through and triggered a
            # pointless second Nougat attempt plus a redundant PyMuPDF run.
            return result
        try:
            nougat_result = extract_pdf_nougat(file_path)
            if not nougat_result['success']:
                logger.warning("⚠️ Nougat提取失败降级到PyMuPDF")
                raise Exception(nougat_result.get('error', 'Nougat failed'))
            quality_score = nougat_result['metadata'].get('quality_score', 0)
            logger.info(f"Nougat质量评分: {quality_score:.2f}")
            # Quality threshold: 0.7 — below that we prefer the fallback.
            if quality_score >= 0.7:
                logger.info("✅ Nougat处理成功质量合格")
                nougat_result['reason'] = 'english_pdf_high_quality'
                nougat_result['detected_language'] = language
                return nougat_result
            else:
                logger.warning(f"⚠️ Nougat质量不足: {quality_score:.2f}降级到PyMuPDF")
                raise Exception(f"Quality too low: {quality_score}")
        except Exception as e:
            logger.warning(f"Nougat处理失败: {str(e)}降级到PyMuPDF")
        # Step 4: fall back to PyMuPDF.
        logger.info("[Step 4] 降级使用PyMuPDF")
        result = extract_pdf_pymupdf(file_path)
        if result['success']:
            result['reason'] = 'nougat_failed_or_low_quality'
            result['detected_language'] = language
            result['fallback'] = True
            logger.info("✅ PyMuPDF处理成功降级方案")
        else:
            logger.error("❌ PyMuPDF处理也失败了")
        return result
    except Exception as e:
        logger.error(f"PDF处理完全失败: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "method": "unknown"
        }
def get_pdf_processing_strategy(file_path: str) -> Dict[str, Any]:
    """Preview which extraction method would be chosen, without extracting.

    Args:
        file_path: PDF file path.

    Returns:
        Dict with ``detected_language``, ``recommended_method``,
        ``reason`` and ``nougat_available``; ``{"error": ...}`` on failure.
    """
    try:
        language = detect_language(file_path)
        nougat_available = check_nougat_available()
        # Same decision table as extract_pdf, but without running anything.
        if language == 'chinese':
            method, reason = 'pymupdf', '中文PDF推荐使用PyMuPDF快速处理'
        elif nougat_available:
            method, reason = 'nougat', '英文PDF推荐使用Nougat高质量解析'
        else:
            method, reason = 'pymupdf', 'Nougat不可用使用PyMuPDF'
        return {
            "detected_language": language,
            "recommended_method": method,
            "reason": reason,
            "nougat_available": nougat_available
        }
    except Exception as exc:
        logger.error(f"获取处理策略失败: {str(exc)}")
        return {
            "error": str(exc)
        }

View File

@@ -0,0 +1,320 @@
"""
Txt文本文件提取服务
直接读取纯文本文件
支持多种编码自动检测
"""
from pathlib import Path
from typing import Dict, Any, List
from loguru import logger
import chardet
def extract_txt(file_path: str) -> Dict[str, Any]:
    """Read a plain-text (.txt) file with automatic encoding detection.

    Detects the encoding (UTF-8, GBK, GB2312, ...), reads with fallback
    candidates, and reports basic statistics.

    Args:
        file_path: Path to the .txt file.

    Returns:
        On success: ``{"success": True, "text": ..., "encoding": ...,
        "metadata": {char_count, line_count, file_size, size_kb}}``;
        otherwise ``success=False`` with an ``error`` message.
    """
    try:
        source = Path(file_path)
        # Guard clauses: existence, suffix, non-empty.
        if not source.exists():
            return {
                "success": False,
                "error": f"文件不存在: {file_path}",
                "text": "",
                "metadata": {}
            }
        if source.suffix.lower() != '.txt':
            return {
                "success": False,
                "error": f"不支持的文件格式: {source.suffix},仅支持.txt",
                "text": "",
                "metadata": {}
            }
        file_size = source.stat().st_size
        if file_size == 0:
            return {
                "success": False,
                "error": "文件为空",
                "text": "",
                "metadata": {
                    "char_count": 0,
                    "line_count": 0,
                    "file_size": 0
                }
            }
        logger.info(f"开始提取Txt文件: {source.name} ({file_size / 1024:.2f} KB)")
        # Detect a likely encoding, then read with a fallback chain.
        detected_encoding = detect_encoding(file_path)
        logger.info(f"检测到编码: {detected_encoding}")
        text, actual_encoding = read_with_fallback(file_path, detected_encoding)
        if text is None:
            return {
                "success": False,
                "error": "无法解码文件,尝试了多种编码均失败",
                "text": "",
                "metadata": {}
            }
        char_count = len(text)
        line_count = text.count('\n') + 1
        logger.info(f"Txt提取成功: {char_count}个字符, {line_count}行")
        return {
            "success": True,
            "text": text,
            "encoding": actual_encoding,
            "metadata": {
                "char_count": char_count,
                "line_count": line_count,
                "file_size": file_size,
                "size_kb": round(file_size / 1024, 2)
            }
        }
    except Exception as exc:
        logger.error(f"Txt提取失败: {str(exc)}")
        return {
            "success": False,
            "error": str(exc),
            "text": "",
            "metadata": {}
        }
def detect_encoding(file_path: str, sample_size: int = 10000) -> str:
    """Guess a file's text encoding from a leading byte sample.

    Runs chardet over the first *sample_size* bytes and falls back to
    UTF-8 when detection fails or confidence is below 0.7.

    Args:
        file_path: File path.
        sample_size: Number of bytes to sample.

    Returns:
        The detected encoding name (or 'utf-8' as the fallback).
    """
    try:
        with open(file_path, 'rb') as handle:
            sample = handle.read(sample_size)
        guess = chardet.detect(sample)
        encoding = guess['encoding']
        confidence = guess['confidence']
        logger.debug(f"编码检测: {encoding} (置信度: {confidence:.2f})")
        # Low confidence -> prefer the safe UTF-8 default.
        if confidence < 0.7:
            logger.warning(f"编码置信度较低({confidence:.2f})将尝试UTF-8")
            return 'utf-8'
        return encoding if encoding else 'utf-8'
    except Exception as exc:
        logger.warning(f"编码检测失败: {str(exc)}使用UTF-8")
        return 'utf-8'
def read_with_fallback(file_path: str, primary_encoding: str) -> tuple[str | None, str | None]:
    """Read a text file, trying several encodings in priority order.

    Args:
        file_path: Path of the file to read.
        primary_encoding: Encoding to try first (e.g. from detect_encoding).

    Returns:
        ``(text, encoding)`` on success; ``(None, None)`` when every
        candidate fails. Bug fix: the annotation previously claimed
        ``tuple[str, str]`` even though the failure path returns
        ``(None, None)``.
    """
    # Candidate encodings, highest priority first.
    encodings = [
        primary_encoding,
        'utf-8',
        'utf-8-sig',  # UTF-8 with BOM
        'gbk',
        'gb2312',
        'gb18030',
        'latin-1',
        'cp1252',
        'iso-8859-1'
    ]
    # De-duplicate case-insensitively while preserving priority order.
    seen = set()
    unique_encodings = []
    for enc in encodings:
        if enc and enc.lower() not in seen:
            seen.add(enc.lower())
            unique_encodings.append(enc)
    # Try each encoding with strict error handling.
    for encoding in unique_encodings:
        try:
            with open(file_path, 'r', encoding=encoding, errors='strict') as f:
                text = f.read()
                logger.info(f"成功使用编码: {encoding}")
                return text, encoding
        except UnicodeDecodeError:
            logger.debug(f"编码 {encoding} 解码失败,尝试下一个")
            continue
        except Exception as e:
            # e.g. LookupError for an unknown codec name from detection.
            logger.warning(f"使用编码 {encoding} 读取失败: {str(e)}")
            continue
    # Every candidate failed.
    logger.error("所有编码尝试均失败")
    return None, None
def validate_txt_file(file_path: str) -> Dict[str, Any]:
    """Validate that a path points to a usable .txt file.

    Checks existence, suffix, size (non-empty, at most 10MB) and probes
    the encoding for the file-info report.

    Args:
        file_path: Path to validate.

    Returns:
        ``{"valid": bool, "reason": str}`` plus a ``file_info`` dict
        (filename, size, size_kb, detected_encoding) when valid.
    """
    try:
        file_path_obj = Path(file_path)
        # Existence check first.
        if not file_path_obj.exists():
            return {
                "valid": False,
                "reason": "文件不存在"
            }
        # Only .txt is accepted here.
        # Bug fix: the message was missing its closing parenthesis.
        if file_path_obj.suffix.lower() != '.txt':
            return {
                "valid": False,
                "reason": f"不支持的格式: {file_path_obj.suffix}(仅支持.txt)"
            }
        # Size limits: txt files are expected to be small (<= 10MB).
        file_size = file_path_obj.stat().st_size
        max_size = 10 * 1024 * 1024  # 10MB
        if file_size > max_size:
            return {
                "valid": False,
                "reason": f"文件过大: {file_size / 1024 / 1024:.2f}MB限制10MB"
            }
        if file_size == 0:
            return {
                "valid": False,
                "reason": "文件为空"
            }
        # Probe the encoding for informational purposes only.
        encoding = detect_encoding(str(file_path_obj))
        return {
            "valid": True,
            "reason": "文件有效",
            "file_info": {
                "filename": file_path_obj.name,
                "size": file_size,
                "size_kb": round(file_size / 1024, 2),
                "detected_encoding": encoding
            }
        }
    except Exception as e:
        return {
            "valid": False,
            "reason": f"验证失败: {str(e)}"
        }
def preview_txt(file_path: str, lines: int = 10) -> Dict[str, Any]:
    """Return the first *lines* lines of a text file.

    Args:
        file_path: File path.
        lines: Number of lines to preview.

    Returns:
        ``{"success": True, "preview": ..., "total_lines": ...,
        "preview_lines": ...}`` on success, or the failed extraction
        result / an error dict otherwise.
    """
    try:
        extraction = extract_txt(file_path)
        # Propagate extraction failures unchanged.
        if not extraction['success']:
            return extraction
        all_lines = extraction['text'].split('\n')
        head = all_lines[:lines]
        return {
            "success": True,
            "preview": '\n'.join(head),
            "total_lines": len(all_lines),
            "preview_lines": len(head)
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
            "preview": ""
        }