Files
AIclinicalresearch/extraction_service/services/txt_extractor.py

321 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Txt文本文件提取服务
直接读取纯文本文件
支持多种编码自动检测
"""
from pathlib import Path
from typing import Any, Dict, List, Optional

import chardet
from loguru import logger
def _extract_failure(error: str, **metadata: Any) -> Dict[str, Any]:
    """Build the failure payload returned by ``extract_txt``."""
    return {
        "success": False,
        "error": error,
        "text": "",
        "metadata": dict(metadata),
    }


def extract_txt(file_path: str) -> Dict[str, Any]:
    """
    Extract the text content of a ``.txt`` file.

    The encoding is guessed with ``detect_encoding`` and the file is then
    decoded with ``read_with_fallback``. A leading byte-order mark (U+FEFF)
    left in the decoded text is removed before the text is returned.

    Args:
        file_path: Path of the ``.txt`` file.

    Returns:
        On success::

            {
                "success": True,
                "text": <decoded text>,
                "encoding": <encoding actually used>,
                "metadata": {
                    "char_count": <number of characters>,
                    "line_count": <number of '\\n' characters + 1>,
                    "file_size": <size in bytes>,
                    "size_kb": <size in KiB, rounded to 2 decimals>,
                },
            }

        On failure ``success`` is False, ``error`` describes the problem,
        ``text`` is empty and ``metadata`` is empty (or all zeros for an
        empty file). Exceptions are logged and reported the same way rather
        than propagated.
    """
    try:
        file_path_obj = Path(file_path)

        if not file_path_obj.exists():
            return _extract_failure(f"文件不存在: {file_path}")

        if file_path_obj.suffix.lower() != '.txt':
            return _extract_failure(
                f"不支持的文件格式: {file_path_obj.suffix},仅支持.txt"
            )

        # A directory can also be named "*.txt"; without this guard it would
        # fall through to the decoder and be reported as an encoding problem.
        if not file_path_obj.is_file():
            return _extract_failure(f"路径不是文件: {file_path}")

        file_size = file_path_obj.stat().st_size
        if file_size == 0:
            return _extract_failure(
                "文件为空", char_count=0, line_count=0, file_size=0
            )

        logger.info(f"开始提取Txt文件: {file_path_obj.name} ({file_size / 1024:.2f} KB)")

        detected_encoding = detect_encoding(file_path)
        logger.info(f"检测到编码: {detected_encoding}")

        # The detected encoding is only a hint; the reader falls back to
        # other encodings when it does not decode cleanly.
        text, actual_encoding = read_with_fallback(file_path, detected_encoding)
        if text is None:
            return _extract_failure("无法解码文件,尝试了多种编码均失败")

        # Plain "utf-8" (unlike "utf-8-sig") keeps the BOM as a leading
        # U+FEFF character; drop it so callers never see the marker.
        if text.startswith('\ufeff'):
            text = text[1:]

        char_count = len(text)
        line_count = text.count('\n') + 1
        logger.info(f"Txt提取成功: {char_count}个字符, {line_count}")

        return {
            "success": True,
            "text": text,
            "encoding": actual_encoding,
            "metadata": {
                "char_count": char_count,
                "line_count": line_count,
                "file_size": file_size,
                "size_kb": round(file_size / 1024, 2),
            },
        }
    except Exception as e:
        logger.error(f"Txt提取失败: {str(e)}")
        return _extract_failure(str(e))
def detect_encoding(file_path: str, sample_size: int = 10000) -> str:
    """
    Guess the text encoding of a file from a leading sample of its bytes.

    Args:
        file_path: Path of the file to inspect.
        sample_size: Number of bytes read from the start of the file and
            handed to chardet.

    Returns:
        The encoding name reported by chardet, or ``'utf-8'`` when chardet
        reports no encoding, when its confidence is below 0.7, or when
        detection raises.
    """
    fallback = 'utf-8'
    try:
        with open(file_path, 'rb') as handle:
            sample = handle.read(sample_size)

        detection = chardet.detect(sample)
        guessed, score = detection['encoding'], detection['confidence']
        logger.debug(f"编码检测: {guessed} (置信度: {score:.2f})")

        # A weak guess is not trusted; UTF-8 is used as the default instead.
        if score < 0.7:
            logger.warning(f"编码置信度较低({score:.2f})将尝试UTF-8")
            return fallback

        return guessed or fallback
    except Exception as exc:
        logger.warning(f"编码检测失败: {str(exc)}使用UTF-8")
        return fallback
def read_with_fallback(
    file_path: str, primary_encoding: str
) -> tuple[Optional[str], Optional[str]]:
    """
    Read a text file, trying several encodings until one decodes cleanly.

    Args:
        file_path: Path of the file to read.
        primary_encoding: Encoding to try first (typically the detected one).

    Returns:
        ``(text, encoding)`` for the first encoding that decodes the whole
        file without error, or ``(None, None)`` when the file could not be
        read.
    """
    # Candidate encodings in priority order.
    candidates = [
        primary_encoding,
        'utf-8',
        'utf-8-sig',  # UTF-8 with BOM
        'gbk',
        'gb2312',
        'gb18030',
        # latin-1 maps every byte to a character, so it never raises
        # UnicodeDecodeError; the entries after it are only reached when
        # latin-1 fails for a reason other than decoding.
        'latin-1',
        'cp1252',
        'iso-8859-1',
    ]

    # De-duplicate case-insensitively while keeping the first spelling and
    # the original order; empty/None entries are skipped.
    unique_encodings = {}
    for enc in candidates:
        if enc:
            unique_encodings.setdefault(enc.lower(), enc)

    for encoding in unique_encodings.values():
        try:
            with open(file_path, 'r', encoding=encoding, errors='strict') as f:
                text = f.read()
            logger.info(f"成功使用编码: {encoding}")
            return text, encoding
        except UnicodeDecodeError:
            logger.debug(f"编码 {encoding} 解码失败,尝试下一个")
            continue
        except OSError as e:
            # An I/O failure (missing file, directory, permissions) does not
            # depend on the encoding, so retrying with other codecs is futile.
            logger.error(f"读取文件失败: {str(e)}")
            return None, None
        except Exception as e:
            # e.g. LookupError for a codec name Python does not know.
            logger.warning(f"使用编码 {encoding} 读取失败: {str(e)}")
            continue

    logger.error("所有编码尝试均失败")
    return None, None
def validate_txt_file(file_path: str) -> Dict[str, Any]:
    """
    Check whether a path points to a usable ``.txt`` file.

    The path must exist, carry a ``.txt`` suffix (case-insensitive), be a
    regular file, be non-empty and be no larger than 10 MB.

    Args:
        file_path: Path of the file to validate.

    Returns:
        ``{"valid": False, "reason": <why>}`` when a check fails or an
        exception occurs; otherwise::

            {
                "valid": True,
                "reason": "文件有效",
                "file_info": {
                    "filename": <file name>,
                    "size": <size in bytes>,
                    "size_kb": <size in KiB, rounded to 2 decimals>,
                    "detected_encoding": <result of detect_encoding>,
                },
            }
    """
    try:
        file_path_obj = Path(file_path)

        if not file_path_obj.exists():
            return {
                "valid": False,
                "reason": "文件不存在"
            }

        if file_path_obj.suffix.lower() != '.txt':
            return {
                "valid": False,
                "reason": f"不支持的格式: {file_path_obj.suffix}(仅支持.txt)"
            }

        # A directory named "*.txt" would otherwise be reported as valid:
        # it has a non-zero size and detect_encoding hides the open() error.
        if not file_path_obj.is_file():
            return {
                "valid": False,
                "reason": "路径不是文件"
            }

        # Size limit: 10 MB.
        file_size = file_path_obj.stat().st_size
        max_size = 10 * 1024 * 1024
        if file_size > max_size:
            return {
                "valid": False,
                "reason": f"文件过大: {file_size / 1024 / 1024:.2f}MB限制10MB"
            }

        if file_size == 0:
            return {
                "valid": False,
                "reason": "文件为空"
            }

        encoding = detect_encoding(str(file_path_obj))
        return {
            "valid": True,
            "reason": "文件有效",
            "file_info": {
                "filename": file_path_obj.name,
                "size": file_size,
                "size_kb": round(file_size / 1024, 2),
                "detected_encoding": encoding
            }
        }
    except Exception as e:
        return {
            "valid": False,
            "reason": f"验证失败: {str(e)}"
        }
def preview_txt(file_path: str, lines: int = 10) -> Dict[str, Any]:
    """
    Return the first lines of a ``.txt`` file.

    The whole file is extracted with ``extract_txt`` and then split on
    ``'\\n'``.

    Args:
        file_path: Path of the ``.txt`` file.
        lines: Maximum number of lines to include in the preview. Values
            below zero are treated as zero.

    Returns:
        On success::

            {
                "success": True,
                "preview": <first lines joined with '\\n'>,
                "total_lines": <number of lines in the file>,
                "preview_lines": <number of lines in the preview>,
            }

        When extraction fails, the failure payload of ``extract_txt`` with
        an added empty ``"preview"``. When an exception is raised,
        ``{"success": False, "error": <message>, "preview": ""}``.
    """
    try:
        result = extract_txt(file_path)
        if not result['success']:
            # Keep extract_txt's payload but guarantee the "preview" key, so
            # both failure paths of this function expose the same field.
            return {**result, "preview": ""}

        text_lines = result['text'].split('\n')
        # Clamp at zero: a negative count would slice from the end and
        # return "all but the last N lines" instead of a preview.
        preview_lines = text_lines[:max(lines, 0)]

        return {
            "success": True,
            "preview": '\n'.join(preview_lines),
            "total_lines": len(text_lines),
            "preview_lines": len(preview_lines)
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "preview": ""
        }