"""
Plain-text (.txt) file extraction service.

Reads plain-text files directly.
Supports automatic detection of multiple encodings.
"""
|
||
|
||
from pathlib import Path
from typing import Any, Dict, List, Optional

import chardet
from loguru import logger
|
||
|
||
|
||
def _extraction_failure(error: str, **metadata: Any) -> Dict[str, Any]:
    """Build the failure payload shared by every error path of extract_txt."""
    return {
        "success": False,
        "error": error,
        "text": "",
        "metadata": metadata,
    }


def extract_txt(file_path: str) -> Dict[str, Any]:
    """
    Extract the content of a .txt file.

    The encoding is guessed with detect_encoding() and the file is then read
    with read_with_fallback(), which retries other encodings when the guess
    does not decode. The whole file is loaded into memory in one read.

    Args:
        file_path: Path of the .txt file.

    Returns:
        On success:
        {
            "success": True,
            "text": decoded text,
            "encoding": encoding that actually decoded the file,
            "metadata": {
                "char_count": number of characters,
                "line_count": number of '\\n' separators plus one,
                "file_size": size in bytes,
                "size_kb": size in KiB, rounded to two decimals
            }
        }
        On failure:
        {"success": False, "error": message, "text": "", "metadata": {...}}
        where "metadata" is empty except for the empty-file case, which
        reports zeroed counters. This function never raises; unexpected
        exceptions are logged and reported through "error".
    """
    try:
        file_path_obj = Path(file_path)

        # Validate that the path exists.
        if not file_path_obj.exists():
            return _extraction_failure(f"文件不存在: {file_path}")

        # exists() is also true for directories; without this guard a
        # directory named "*.txt" reaches the readers and ends up reported
        # as an undecodable file.
        if not file_path_obj.is_file():
            return _extraction_failure(f"路径不是文件: {file_path}")

        # Validate the file format.
        if file_path_obj.suffix.lower() != '.txt':
            return _extraction_failure(
                f"不支持的文件格式: {file_path_obj.suffix}，仅支持.txt"
            )

        file_size = file_path_obj.stat().st_size

        # An empty file has nothing to extract.
        if file_size == 0:
            return _extraction_failure(
                "文件为空", char_count=0, line_count=0, file_size=0
            )

        logger.info(f"开始提取Txt文件: {file_path_obj.name} ({file_size / 1024:.2f} KB)")

        # Guess the encoding from a sample of the file.
        detected_encoding = detect_encoding(file_path)
        logger.info(f"检测到编码: {detected_encoding}")

        # Read the file, falling back to other encodings if the guess fails.
        text, actual_encoding = read_with_fallback(file_path, detected_encoding)
        if text is None:
            return _extraction_failure("无法解码文件，尝试了多种编码均失败")

        # Statistics.
        char_count = len(text)
        line_count = text.count('\n') + 1

        logger.info(f"Txt提取成功: {char_count}个字符, {line_count}行")

        return {
            "success": True,
            "text": text,
            "encoding": actual_encoding,
            "metadata": {
                "char_count": char_count,
                "line_count": line_count,
                "file_size": file_size,
                "size_kb": round(file_size / 1024, 2),
            },
        }

    except Exception as e:
        # Top-level boundary: callers expect a result dict, not an exception.
        logger.error(f"Txt提取失败: {str(e)}")
        return _extraction_failure(str(e))
|
||
|
||
|
||
def detect_encoding(file_path: str, sample_size: int = 10000) -> str:
    """
    Guess the encoding of a file from a sample of its leading bytes.

    Args:
        file_path: Path of the file to inspect.
        sample_size: Number of bytes read from the start of the file and
            handed to chardet.

    Returns:
        The encoding name reported by chardet, or 'utf-8' when chardet
        reports no encoding, when its confidence is below 0.7, or when
        anything goes wrong while sampling the file.
    """
    default_encoding = 'utf-8'

    try:
        with open(file_path, 'rb') as stream:
            detection = chardet.detect(stream.read(sample_size))

        encoding, confidence = detection['encoding'], detection['confidence']
        logger.debug(f"编码检测: {encoding} (置信度: {confidence:.2f})")

        # A weak guess is not trusted; the default is used instead.
        if confidence < 0.7:
            logger.warning(f"编码置信度较低（{confidence:.2f}），将尝试UTF-8")
            return default_encoding

        return encoding or default_encoding

    except Exception as e:
        # Detection is best-effort: any failure degrades to the default.
        logger.warning(f"编码检测失败: {str(e)}，使用UTF-8")
        return default_encoding
|
||
|
||
|
||
def read_with_fallback(
    file_path: str, primary_encoding: str
) -> tuple[Optional[str], Optional[str]]:
    """
    Read a text file, trying several encodings until one decodes it.

    The preferred encoding is tried first, followed by a fixed fallback
    list. Decoding is strict, so an encoding is only accepted when the
    whole file decodes without error. A leading byte-order mark (U+FEFF)
    left in the decoded text is removed.

    Args:
        file_path: Path of the file to read.
        primary_encoding: Encoding to try first; a falsy value is skipped.

    Returns:
        (text, encoding actually used), or (None, None) when the file
        cannot be opened or no encoding decodes it.
    """
    # Candidate encodings in priority order.
    # - 'utf-8-sig' is not listed: it decodes exactly the files 'utf-8'
    #   decodes, so it could never be reached after 'utf-8'. The BOM is
    #   stripped after decoding instead.
    # - 'cp1252' comes before 'latin-1': latin-1 accepts every byte
    #   sequence, so anything placed after it is dead. For the same reason
    #   the 'iso-8859-1' alias of latin-1 is not repeated.
    candidates = [
        primary_encoding,
        'utf-8',
        'gbk',
        'gb2312',
        'gb18030',
        'cp1252',
        'latin-1',
    ]

    # De-duplicate case-insensitively while keeping the priority order.
    seen = set()
    unique_encodings = []
    for enc in candidates:
        if enc and enc.lower() not in seen:
            seen.add(enc.lower())
            unique_encodings.append(enc)

    for encoding in unique_encodings:
        try:
            with open(file_path, 'r', encoding=encoding, errors='strict') as f:
                text = f.read()

        except UnicodeDecodeError:
            logger.debug(f"编码 {encoding} 解码失败，尝试下一个")
            continue

        except OSError as e:
            # The file itself cannot be read (missing, a directory, no
            # permission); no other encoding can change that.
            logger.error(f"使用编码 {encoding} 读取失败: {str(e)}")
            return None, None

        except Exception as e:
            # e.g. LookupError for an encoding name Python does not know.
            logger.warning(f"使用编码 {encoding} 读取失败: {str(e)}")
            continue

        logger.info(f"成功使用编码: {encoding}")
        # Plain 'utf-8' (and BOM-less codec variants) keep the BOM as a
        # leading U+FEFF character; drop it so callers get clean text.
        if text.startswith('\ufeff'):
            text = text[1:]
        return text, encoding

    # Every candidate failed to decode the file.
    logger.error("所有编码尝试均失败")
    return None, None
|
||
|
||
|
||
def validate_txt_file(file_path: str, max_size: int = 10 * 1024 * 1024) -> Dict[str, Any]:
    """
    Check whether a path points to a usable .txt file.

    Args:
        file_path: Path of the file to validate.
        max_size: Largest accepted file size in bytes (default 10 MB).

    Returns:
        {"valid": False, "reason": message} when a check fails or an
        unexpected error occurs, otherwise
        {
            "valid": True,
            "reason": "文件有效",
            "file_info": {
                "filename": file name,
                "size": size in bytes,
                "size_kb": size in KiB, rounded to two decimals,
                "detected_encoding": result of detect_encoding()
            }
        }
    """
    try:
        file_path_obj = Path(file_path)

        # The path must exist.
        if not file_path_obj.exists():
            return {
                "valid": False,
                "reason": "文件不存在"
            }

        # exists() is also true for directories, and detect_encoding()
        # swallows the resulting open error, so without this guard a
        # directory named "*.txt" would be reported as a valid file.
        if not file_path_obj.is_file():
            return {
                "valid": False,
                "reason": "路径不是文件"
            }

        # Only the .txt suffix is accepted.
        if file_path_obj.suffix.lower() != '.txt':
            return {
                "valid": False,
                "reason": f"不支持的格式: {file_path_obj.suffix}（仅支持.txt）"
            }

        # Enforce the size limit.
        file_size = file_path_obj.stat().st_size
        if file_size > max_size:
            limit_mb = max_size / 1024 / 1024
            return {
                "valid": False,
                "reason": f"文件过大: {file_size / 1024 / 1024:.2f}MB（限制{limit_mb:g}MB）"
            }

        if file_size == 0:
            return {
                "valid": False,
                "reason": "文件为空"
            }

        # Report the guessed encoding alongside the basic file facts.
        encoding = detect_encoding(str(file_path_obj))

        return {
            "valid": True,
            "reason": "文件有效",
            "file_info": {
                "filename": file_path_obj.name,
                "size": file_size,
                "size_kb": round(file_size / 1024, 2),
                "detected_encoding": encoding
            }
        }

    except Exception as e:
        # Validation never raises; the failure is reported as a reason.
        return {
            "valid": False,
            "reason": f"验证失败: {str(e)}"
        }
|
||
|
||
|
||
def preview_txt(file_path: str, lines: int = 10) -> Dict[str, Any]:
    """
    Preview the first lines of a .txt file.

    The file is extracted in full with extract_txt() and then split on
    '\\n', so the total line count is always available.

    Args:
        file_path: Path of the file to preview.
        lines: Maximum number of lines to include; zero or a negative
            value yields an empty preview.

    Returns:
        On success:
        {
            "success": True,
            "preview": the first lines joined with '\\n',
            "total_lines": number of lines in the whole text,
            "preview_lines": number of lines in the preview
        }
        On failure: a dict with "success": False, "error" and an empty
        "preview"; when the failure comes from extract_txt(), its other
        keys are passed through as well.
    """
    try:
        result = extract_txt(file_path)

        if not result['success']:
            # Keep the extraction error details, but always expose the
            # "preview" key so both failure paths have the same shape.
            return {**result, "preview": ""}

        text_lines = result['text'].split('\n')

        # Clamp at zero: a negative slice bound would count from the end
        # and return almost the whole file instead of an empty preview.
        preview_lines = text_lines[:max(lines, 0)]

        return {
            "success": True,
            "preview": '\n'.join(preview_lines),
            "total_lines": len(text_lines),
            "preview_lines": len(preview_lines)
        }

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "preview": ""
        }
|
||
|
||
|
||
|
||
|
||
|
||
|