"""
Unified document-processing entry point - DocumentProcessor.

Features:
- Detects the file type from the file extension
- Dispatches to the matching processor
- Emits Markdown as the common output format

Supported formats:
- PDF (.pdf) → pymupdf4llm
- Word (.docx) → mammoth
- Excel (.xlsx) → pandas
- CSV (.csv) → pandas
- PPT (.pptx) → python-pptx
- Plain text (.txt, .md) → read directly
"""

from pathlib import Path
from typing import Dict, Any, Optional

from loguru import logger
import chardet


class DocumentProcessor:
    """Convert supported document files to Markdown through one entry point."""

    # Maps a lower-cased file extension to the processor family handling it.
    # NOTE(review): the legacy extensions (.doc, .xls, .ppt) are routed to the
    # same processors as the modern formats; presumably those libraries only
    # parse the modern formats, so such files likely come back as a parse
    # failure - confirm.
    SUPPORTED_TYPES = {
        '.pdf': 'pdf',
        '.docx': 'word',
        '.doc': 'word',
        '.xlsx': 'excel',
        '.xls': 'excel',
        '.csv': 'csv',
        '.pptx': 'ppt',
        '.ppt': 'ppt',
        '.txt': 'text',
        '.md': 'text',
        '.markdown': 'text',
    }

    # Tables longer than this are truncated before being rendered as Markdown.
    MAX_TABLE_ROWS = 200

    def to_markdown(self, file_path: str) -> Dict[str, Any]:
        """
        Convert a document to Markdown.

        Args:
            file_path: Path of the file to convert. The processor is chosen
                from the lower-cased file extension.

        Returns:
            A result dict. On success it has ``"success": True`` and a
            ``"markdown"`` string plus processor-specific ``"metadata"``.
            On failure it has ``"success": False`` and an ``"error"``
            message. Except for the unsupported-extension case (which
            returns ``"supported_types"`` instead), ``"file_type"`` and
            ``"filename"`` are present on both outcomes.
        """
        path = Path(file_path)
        filename = path.name
        suffix = path.suffix.lower()

        # Reject unknown extensions before doing any I/O.
        if suffix not in self.SUPPORTED_TYPES:
            return {
                "success": False,
                "error": f"不支持的文件类型: {suffix}",
                "supported_types": list(self.SUPPORTED_TYPES.keys()),
            }

        file_type = self.SUPPORTED_TYPES[suffix]
        logger.info(f"处理文档: {filename}, 类型: {file_type}")

        handlers = {
            'pdf': self._process_pdf,
            'word': self._process_word,
            'excel': self._process_excel,
            'csv': self._process_csv,
            'ppt': self._process_ppt,
            'text': self._process_text,
        }

        try:
            handler = handlers.get(file_type)
            if handler is None:
                result = {
                    "success": False,
                    "error": f"未实现的处理器: {file_type}",
                }
            else:
                result = handler(file_path)

            if result.get("success"):
                result["file_type"] = file_type
                result["filename"] = filename
            else:
                # Keep failure results shaped like the exception path below,
                # without overwriting anything the processor already set.
                result.setdefault("file_type", file_type)
                result.setdefault("filename", filename)

            return result

        except Exception as e:
            # Top-level boundary: report any processor error as a failed
            # result instead of propagating it to the caller.
            logger.error(f"文档处理失败: {filename}, 错误: {e}")
            return {
                "success": False,
                "error": str(e),
                "file_type": file_type,
                "filename": filename,
            }

    @staticmethod
    def _detect_encoding(raw: bytes, fallback: str = 'utf-8') -> str:
        """Return the encoding chardet detects for *raw*, or *fallback*."""
        detected = chardet.detect(raw)
        # chardet always includes the 'encoding' key and sets it to None when
        # it cannot decide (e.g. empty input), so a .get() default would
        # never be used; fall back on any falsy value instead.
        return detected.get('encoding') or fallback

    def _process_pdf(self, file_path: str) -> Dict[str, Any]:
        """Convert a PDF by delegating to PdfMarkdownProcessor."""
        from .pdf_markdown_processor import PdfMarkdownProcessor

        processor = PdfMarkdownProcessor()
        return processor.to_markdown(file_path)

    def _process_word(self, file_path: str) -> Dict[str, Any]:
        """Convert a Word document to Markdown with mammoth.

        Returns a result dict; conversion messages reported by mammoth are
        stringified into ``metadata["warnings"]``.
        """
        import mammoth

        try:
            with open(file_path, "rb") as f:
                result = mammoth.convert_to_markdown(f)

            markdown = result.value
            messages = result.messages

            # Prefix the file name so the Markdown carries its own context.
            filename = Path(file_path).name
            markdown_with_context = f"## 文档: {filename}\n\n{markdown}"

            return {
                "success": True,
                "markdown": markdown_with_context,
                "metadata": {
                    # Counts the converted body only, not the added header.
                    "char_count": len(markdown),
                    "warnings": [str(m) for m in messages] if messages else [],
                },
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "markdown": f"> **系统提示**:Word 文档解析失败: {str(e)}",
            }

    def _process_excel(self, file_path: str) -> Dict[str, Any]:
        """Convert every sheet of an Excel workbook to a Markdown table.

        Each sheet is rendered under its own heading; sheets longer than
        ``MAX_TABLE_ROWS`` are truncated with a notice. ``total_rows`` in the
        metadata counts the rows before truncation.
        """
        import pandas as pd

        try:
            filename = Path(file_path).name
            max_rows = self.MAX_TABLE_ROWS

            md_parts = []
            total_rows = 0

            # The context manager closes the workbook handle even when a
            # sheet fails to parse.
            with pd.ExcelFile(file_path, engine='openpyxl') as xlsx:
                sheet_names = xlsx.sheet_names

                for sheet_name in sheet_names:
                    df = pd.read_excel(xlsx, sheet_name=sheet_name)
                    rows = len(df)
                    total_rows += rows

                    # Sheet heading and dimensions.
                    md_parts.append(f"## 数据: {filename} - {sheet_name}")
                    md_parts.append(f"- **行列**: {rows} 行 × {len(df.columns)} 列\n")

                    # Truncate large sheets.
                    if rows > max_rows:
                        md_parts.append(f"> ⚠️ 数据量较大,仅显示前 {max_rows} 行(共 {rows} 行)\n")
                        df = df.head(max_rows)

                    # Render missing values as empty cells.
                    df = df.fillna('')
                    md_parts.append(df.to_markdown(index=False))
                    md_parts.append("\n---\n")

            return {
                "success": True,
                "markdown": "\n".join(md_parts),
                "metadata": {
                    "sheet_count": len(sheet_names),
                    "total_rows": total_rows,
                    "sheets": sheet_names,
                },
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "markdown": f"> **系统提示**:Excel 文档解析失败: {str(e)}",
            }

    def _process_csv(self, file_path: str) -> Dict[str, Any]:
        """Convert a CSV file to a Markdown table.

        The encoding is detected from the first 10000 bytes of the file.
        Files longer than ``MAX_TABLE_ROWS`` rows are truncated with a
        notice; ``row_count`` in the metadata is the count before truncation.
        """
        import pandas as pd

        try:
            filename = Path(file_path).name
            max_rows = self.MAX_TABLE_ROWS

            # Detect the encoding from a leading sample of the file.
            with open(file_path, 'rb') as f:
                sample = f.read(10000)
            encoding = self._detect_encoding(sample)
            # Only a prefix was sampled, so "ascii" says nothing about the
            # rest of the file. UTF-8 decodes ASCII identically and also
            # accepts non-ASCII bytes that appear later.
            if encoding.lower() == 'ascii':
                encoding = 'utf-8'

            df = pd.read_csv(file_path, encoding=encoding)
            rows = len(df)

            md_parts = [
                f"## 数据: {filename}",
                f"- **行列**: {rows} 行 × {len(df.columns)} 列",
                f"- **编码**: {encoding}\n",
            ]

            # Truncate large tables.
            if rows > max_rows:
                md_parts.append(f"> ⚠️ 数据量较大,仅显示前 {max_rows} 行(共 {rows} 行)\n")
                df = df.head(max_rows)

            # Render missing values as empty cells.
            df = df.fillna('')
            md_parts.append(df.to_markdown(index=False))

            return {
                "success": True,
                "markdown": "\n".join(md_parts),
                "metadata": {
                    "row_count": rows,
                    "column_count": len(df.columns),
                    "encoding": encoding,
                },
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "markdown": f"> **系统提示**:CSV 文件解析失败: {str(e)}",
            }

    def _process_ppt(self, file_path: str) -> Dict[str, Any]:
        """Convert a presentation to Markdown, one section per slide.

        Each slide gets a heading, its title in bold (when it has a title
        shape), and one bullet per non-empty paragraph of every shape that
        has a text frame.
        """
        from pptx import Presentation

        try:
            filename = Path(file_path).name
            prs = Presentation(file_path)

            md_parts = [f"## 演示文稿: {filename}\n"]

            for slide_num, slide in enumerate(prs.slides, 1):
                md_parts.append(f"### 幻灯片 {slide_num}")

                # Slide title, when present.
                if slide.shapes.title:
                    md_parts.append(f"**{slide.shapes.title.text}**\n")

                # All text in the slide. The loop does not skip the title
                # shape, so its paragraphs go through this loop as well.
                for shape in slide.shapes:
                    if shape.has_text_frame:
                        for para in shape.text_frame.paragraphs:
                            text = para.text.strip()
                            if text:
                                md_parts.append(f"- {text}")

                md_parts.append("")

            return {
                "success": True,
                "markdown": "\n".join(md_parts),
                "metadata": {
                    "slide_count": len(prs.slides),
                },
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "markdown": f"> **系统提示**:PPT 文档解析失败: {str(e)}",
            }

    def _process_text(self, file_path: str) -> Dict[str, Any]:
        """Read a plain-text or Markdown file.

        Markdown files (.md, .markdown) are returned as-is; other text files
        get a heading with the file name prepended. The encoding is detected
        from the full file content.
        """
        try:
            path = Path(file_path)
            filename = path.name

            # Detect the encoding from the whole file.
            with open(file_path, 'rb') as f:
                raw = f.read()
            encoding = self._detect_encoding(raw)

            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()

            # Compare the lower-cased suffix so the check agrees with the
            # case-insensitive dispatch in to_markdown (e.g. "README.MD").
            if path.suffix.lower() in ('.md', '.markdown'):
                markdown = content
            else:
                # Plain text: prefix the file name as context.
                markdown = f"## 文档: {filename}\n\n{content}"

            return {
                "success": True,
                "markdown": markdown,
                "metadata": {
                    "char_count": len(content),
                    "encoding": encoding,
                },
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "markdown": f"> **系统提示**:文本文件读取失败: {str(e)}",
            }


# Convenience function
async def convert_to_markdown(file_path: str, file_type: Optional[str] = None) -> Dict[str, Any]:
    """
    Convert a document to Markdown (convenience wrapper, async signature).

    The conversion itself runs synchronously inside the coroutine; nothing
    is awaited.

    Args:
        file_path: Path of the file to convert.
        file_type: Accepted for interface compatibility but currently
            unused; the type is always detected from the file extension.

    Returns:
        On success::

            {
                "success": True,
                "text": "<Markdown content>",
                "format": "markdown",
                "metadata": {...}
            }

        ``metadata`` holds ``original_file_type`` and ``char_count`` (length
        of the Markdown text), then the processor's own metadata, whose keys
        take precedence on collision.

        On failure: ``{"success": False, "error": ..., "metadata": {...}}``.
    """
    processor = DocumentProcessor()
    result = processor.to_markdown(file_path)

    # A processor may omit metadata or set it to None; treat both as empty.
    metadata = result.get("metadata") or {}

    # Reshape the result to the format the API expects.
    if result.get("success"):
        markdown = result.get("markdown", "")
        return {
            "success": True,
            "text": markdown,
            "format": "markdown",
            "metadata": {
                "original_file_type": result.get("file_type"),
                "char_count": len(markdown),
                **metadata,
            },
        }

    return {
        "success": False,
        "error": result.get("error", "处理失败"),
        "metadata": metadata,
    }