"""
Plain-text (.txt) extraction service.

Reads a text file directly, detecting its encoding with chardet and falling
back through a list of common encodings when strict decoding fails.
"""

import codecs
from pathlib import Path
from typing import Any, Dict, List, Optional

import chardet
from loguru import logger


def _failure(error: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the failure payload shared by every error path of extract_txt."""
    return {
        "success": False,
        "error": error,
        "text": "",
        "metadata": metadata if metadata is not None else {},
    }


def extract_txt(file_path: str) -> Dict[str, Any]:
    """
    Extract the content of a .txt file.

    The encoding is detected from a sample of the file and, if strict
    decoding fails, a list of fallback encodings is tried. A leading
    byte-order mark is removed from the returned text. The whole file is
    read into memory in one call.

    Args:
        file_path: Path to the .txt file.

    Returns:
        On success:
            {
                "success": True,
                "text": <decoded text>,
                "encoding": <encoding actually used>,
                "metadata": {
                    "char_count": <number of characters>,
                    "line_count": <number of '\\n' characters + 1>,
                    "file_size": <size in bytes>,
                    "size_kb": <size in KB, rounded to 2 decimals>
                }
            }
        On failure:
            {"success": False, "error": <message>, "text": "", "metadata": {...}}
        This function does not raise; unexpected exceptions are logged and
        reported through the failure payload.
    """
    try:
        file_path_obj = Path(file_path)

        # The file must exist.
        if not file_path_obj.exists():
            return _failure(f"文件不存在: {file_path}")

        # Only the .txt suffix is accepted (case-insensitive).
        if file_path_obj.suffix.lower() != '.txt':
            return _failure(f"不支持的文件格式: {file_path_obj.suffix},仅支持.txt")

        file_size = file_path_obj.stat().st_size

        # An empty file is reported as a failure with zeroed statistics.
        if file_size == 0:
            return _failure(
                "文件为空",
                {"char_count": 0, "line_count": 0, "file_size": 0},
            )

        logger.info(f"开始提取Txt文件: {file_path_obj.name} ({file_size / 1024:.2f} KB)")

        detected_encoding = detect_encoding(file_path)
        logger.info(f"检测到编码: {detected_encoding}")

        # Read the file, falling back to other encodings if needed.
        text, actual_encoding = read_with_fallback(file_path, detected_encoding)

        if text is None:
            return _failure("无法解码文件,尝试了多种编码均失败")

        char_count = len(text)
        line_count = text.count('\n') + 1

        logger.info(f"Txt提取成功: {char_count}个字符, {line_count}行")

        return {
            "success": True,
            "text": text,
            "encoding": actual_encoding,
            "metadata": {
                "char_count": char_count,
                "line_count": line_count,
                "file_size": file_size,
                "size_kb": round(file_size / 1024, 2),
            },
        }

    except Exception as e:
        # Top-level boundary: callers get a failure payload, never an exception.
        logger.error(f"Txt提取失败: {str(e)}")
        return _failure(str(e))


def detect_encoding(file_path: str, sample_size: int = 10000) -> str:
    """
    Detect the encoding of a file from its leading bytes.

    Args:
        file_path: Path to the file.
        sample_size: Number of bytes to sample from the start of the file.

    Returns:
        The encoding name reported by chardet, or 'utf-8' when detection
        fails, reports no encoding, or reports a confidence below 0.7.
    """
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read(sample_size)

        result = chardet.detect(raw_data)
        encoding = result.get('encoding')
        # Treat a missing/None confidence as 0 so the formatting and the
        # comparison below cannot raise.
        confidence = result.get('confidence') or 0.0

        logger.debug(f"编码检测: {encoding} (置信度: {confidence:.2f})")

        # Low-confidence guesses are not trusted; default to UTF-8.
        if confidence < 0.7:
            logger.warning(f"编码置信度较低({confidence:.2f}),将尝试UTF-8")
            return 'utf-8'

        return encoding if encoding else 'utf-8'

    except Exception as e:
        logger.warning(f"编码检测失败: {str(e)},使用UTF-8")
        return 'utf-8'


def _candidate_encodings(primary_encoding: Optional[str]) -> List[str]:
    """Return the ordered, de-duplicated list of encodings to try."""
    ordered = [
        primary_encoding,
        'utf-8',
        'utf-8-sig',  # UTF-8 with BOM
        'gbk',
        'gb2312',
        'gb18030',
        # cp1252 can reject some bytes, so it must come before latin-1,
        # which accepts every byte sequence and therefore ends the search.
        'cp1252',
        'latin-1',
        'iso-8859-1',
    ]

    seen = set()
    candidates = []
    for name in ordered:
        if not name:
            continue
        try:
            # De-duplicate on the canonical codec name so aliases
            # (e.g. latin-1 / iso-8859-1) are only tried once.
            canonical = codecs.lookup(name).name
        except LookupError as e:
            # The detector may report an encoding Python has no codec for.
            logger.warning(f"使用编码 {name} 读取失败: {str(e)}")
            continue
        if canonical not in seen:
            seen.add(canonical)
            candidates.append(name)
    return candidates


def read_with_fallback(
    file_path: str, primary_encoding: str
) -> tuple[Optional[str], Optional[str]]:
    """
    Read a file as text, trying several encodings in priority order.

    Each candidate is tried with strict error handling; the first one that
    decodes the whole file wins. A leading byte-order mark left in the
    decoded text (e.g. a UTF-8 BOM decoded with plain 'utf-8') is removed.

    Args:
        file_path: Path to the file.
        primary_encoding: Encoding to try first.

    Returns:
        (text, encoding used), or (None, None) if the file could not be
        read with any candidate encoding.
    """
    for encoding in _candidate_encodings(primary_encoding):
        try:
            with open(file_path, 'r', encoding=encoding, errors='strict') as f:
                text = f.read()
        except UnicodeDecodeError:
            logger.debug(f"编码 {encoding} 解码失败,尝试下一个")
            continue
        except OSError as e:
            # An I/O failure does not depend on the encoding; retrying
            # with other encodings would only repeat the same error.
            logger.warning(f"使用编码 {encoding} 读取失败: {str(e)}")
            break
        except Exception as e:
            logger.warning(f"使用编码 {encoding} 读取失败: {str(e)}")
            continue

        # Plain 'utf-8' keeps the BOM as U+FEFF at the start of the text.
        text = text.removeprefix('\ufeff')
        logger.info(f"成功使用编码: {encoding}")
        return text, encoding

    logger.error("所有编码尝试均失败")
    return None, None


def validate_txt_file(file_path: str, max_size: int = 10 * 1024 * 1024) -> Dict[str, Any]:
    """
    Validate that a path points to an acceptable .txt file.

    Args:
        file_path: Path to the file.
        max_size: Maximum accepted file size in bytes (default 10MB).

    Returns:
        When valid:
            {
                "valid": True,
                "reason": <message>,
                "file_info": {
                    "filename", "size", "size_kb", "detected_encoding"
                }
            }
        Otherwise:
            {"valid": False, "reason": <message>}
        This function does not raise; unexpected exceptions are reported
        through the "reason" field.
    """
    try:
        file_path_obj = Path(file_path)

        if not file_path_obj.exists():
            return {"valid": False, "reason": "文件不存在"}

        if file_path_obj.suffix.lower() != '.txt':
            return {
                "valid": False,
                "reason": f"不支持的格式: {file_path_obj.suffix}(仅支持.txt)",
            }

        file_size = file_path_obj.stat().st_size

        if file_size > max_size:
            return {
                "valid": False,
                "reason": (
                    f"文件过大: {file_size / 1024 / 1024:.2f}MB"
                    f"(限制{max_size / 1024 / 1024:g}MB)"
                ),
            }

        if file_size == 0:
            return {"valid": False, "reason": "文件为空"}

        encoding = detect_encoding(str(file_path_obj))

        return {
            "valid": True,
            "reason": "文件有效",
            "file_info": {
                "filename": file_path_obj.name,
                "size": file_size,
                "size_kb": round(file_size / 1024, 2),
                "detected_encoding": encoding,
            },
        }

    except Exception as e:
        return {"valid": False, "reason": f"验证失败: {str(e)}"}


def preview_txt(file_path: str, lines: int = 10) -> Dict[str, Any]:
    """
    Preview the first lines of a .txt file.

    The file is fully extracted with extract_txt and then split on '\\n'.

    Args:
        file_path: Path to the file.
        lines: Number of lines to include; values below 0 are treated as 0.

    Returns:
        On success:
            {
                "success": True,
                "preview": <first N lines joined with '\\n'>,
                "total_lines": <number of lines in the file>,
                "preview_lines": <number of lines in the preview>
            }
        On failure: the failure payload of extract_txt (or an error built
        from an unexpected exception), always with "preview": "".
    """
    try:
        result = extract_txt(file_path)

        if not result['success']:
            # Keep every key extract_txt reported, and add "preview" so all
            # failure payloads of this function have the same shape.
            return {**result, "preview": ""}

        text_lines = result['text'].split('\n')
        # A negative count would slice from the end of the file instead.
        preview_lines = text_lines[:max(lines, 0)]

        return {
            "success": True,
            "preview": '\n'.join(preview_lines),
            "total_lines": len(text_lines),
            "preview_lines": len(preview_lines),
        }

    except Exception as e:
        return {"success": False, "error": str(e), "preview": ""}