import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from enum import Enum
import hashlib
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))  # Ensure current directory is in sys.path
try:
    from logger.custom_logger import CustomLoggerTracker
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("documents_processor")
except ImportError:
    # Fallback to standard logging if custom logger not available
    logger = logging.getLogger("documents_processor")

class DocumentType(Enum):
    """Supported document types."""
    PDF = "pdf"
    EXCEL = "excel"
    IMAGE = "image"
    UNKNOWN = "unknown"


class ProcessingStatus(Enum):
    """Document processing status."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    PARTIAL = "partial"

@dataclass
class ExtractedImage:
    """Represents an image extracted from a document."""
    image_id: str
    filename: str
    content: bytes
    format: str  # PNG, JPEG, etc.
    width: Optional[int] = None
    height: Optional[int] = None
    ocr_text: Optional[str] = None
    ocr_confidence: Optional[float] = None
    extraction_method: str = "unknown"
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class ExtractedTable:
    """Represents a table extracted from a document."""
    table_id: str
    headers: List[str]
    rows: List[List[str]]
    page_number: Optional[int] = None
    worksheet_name: Optional[str] = None
    cell_range: Optional[str] = None
    extraction_confidence: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class ChunkMetadata:
    """Metadata for a document chunk."""
    chunk_id: str
    document_id: str
    chunk_index: int
    page_number: Optional[int] = None
    worksheet_name: Optional[str] = None
    cell_range: Optional[str] = None
    section_title: Optional[str] = None
    image_references: List[str] = field(default_factory=list)
    table_references: List[str] = field(default_factory=list)
    extraction_timestamp: datetime = field(default_factory=datetime.now)
    confidence_score: Optional[float] = None

@dataclass
class DocumentChunk:
    """A chunk of document content together with its metadata and optional embedding."""
    content: str
    metadata: ChunkMetadata
    embedding: Optional[List[float]] = None

    def __post_init__(self):
        """Validate chunk content after initialization."""
        if not self.content.strip():
            logger.warning(f"Empty content in chunk {self.metadata.chunk_id}")
        if len(self.content) > 10000:  # Warn for very large chunks
            logger.warning(f"Large chunk detected ({len(self.content)} chars): {self.metadata.chunk_id}")

@dataclass
class ProcessedDocument:
    """Represents a fully processed document with all extracted content."""
    document_id: str
    filename: str
    file_path: str
    document_type: DocumentType
    content: str
    metadata: Dict[str, Any]
    images: List[ExtractedImage] = field(default_factory=list)
    tables: List[ExtractedTable] = field(default_factory=list)
    processing_status: ProcessingStatus = ProcessingStatus.PENDING
    processing_timestamp: datetime = field(default_factory=datetime.now)
    file_size: int = 0
    checksum: str = ""
    error_message: Optional[str] = None

    def __post_init__(self):
        """Generate checksum and record file size after initialization."""
        if not self.checksum and Path(self.file_path).exists():
            self.checksum = self._generate_checksum()
            self.file_size = Path(self.file_path).stat().st_size

    def _generate_checksum(self) -> str:
        """Compute an MD5 checksum of the file contents."""
        try:
            hash_md5 = hashlib.md5()
            with open(self.file_path, "rb") as f:
                for chunk in iter(lambda: f.read(1024), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except Exception as e:
            logger.error(f"Failed to generate checksum for {self.file_path}: {e}")
            return ""

class DocumentProcessingError(Exception):
    """Base exception for document processing errors."""

    def __init__(self, file_path: str, error_type: str, details: str):
        self.file_path = file_path
        self.error_type = error_type
        self.details = details
        super().__init__(f"Document processing error in {file_path}: {error_type} - {details}")

class UnsupportedDocumentTypeError(DocumentProcessingError):
    """Raised when a document's type is not supported by any available processor."""

    def __init__(self, file_path: str, detected_type: str):
        super().__init__(
            file_path,
            "UnsupportedDocumentType",
            f"Document type '{detected_type}' is not supported"
        )

class DocumentProcessor(ABC):
    """Abstract base class for document processors."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.supported_extensions = self._get_supported_extensions()
        logger.info(f"Initialized {self.__class__.__name__} with config: {config}")

    @abstractmethod
    def _get_supported_extensions(self) -> List[str]:
        """Return the list of file extensions this processor supports (e.g. ['.pdf'])."""

    @abstractmethod
    def process_document(self, file_path: str) -> ProcessedDocument:
        """Process the file at file_path and return a ProcessedDocument."""

    def can_process(self, file_path: str) -> bool:
        """Return True if this processor supports the file's extension."""
        file_extension = Path(file_path).suffix.lower()
        return file_extension in self.supported_extensions

    def extract_chunks(self, document: ProcessedDocument, chunk_size: int = 512,
                       chunk_overlap: int = 50) -> List[DocumentChunk]:
        """Split the document content into overlapping chunks of roughly chunk_size characters."""
        if not document.content.strip():
            logger.warning(f"No content to chunk in document {document.document_id}")
            return []
        chunks = []
        content = document.content
        start = 0
        chunk_index = 0
        while start < len(content):
            # Calculate end position
            end = min(start + chunk_size, len(content))
            # Try to break at a word boundary if not at the end of content
            if end < len(content):
                # Look for the last space within the chunk
                last_space = content.rfind(' ', start, end)
                if last_space > start:
                    end = last_space
            # Extract chunk content
            chunk_content = content[start:end].strip()
            if chunk_content:  # Only create a chunk if it has content
                # Create chunk metadata
                metadata = ChunkMetadata(
                    chunk_id=f"{document.document_id}_chunk_{chunk_index}",
                    document_id=document.document_id,
                    chunk_index=chunk_index
                )
                # Create chunk
                chunk = DocumentChunk(
                    content=chunk_content,
                    metadata=metadata
                )
                chunks.append(chunk)
                chunk_index += 1
            # Stop once the end of the content has been reached
            if end >= len(content):
                break
            # Move the start position back by the overlap, always advancing to avoid an infinite loop
            start = max(end - chunk_overlap, start + 1)
        logger.info(f"Created {len(chunks)} chunks from document {document.document_id}")
        return chunks

    def _detect_document_type(self, file_path: str) -> DocumentType:
        """Detect the document type from the file extension."""
        extension = Path(file_path).suffix.lower()
        if extension == '.pdf':
            return DocumentType.PDF
        elif extension in ['.xlsx', '.xls', '.xlsm']:
            return DocumentType.EXCEL
        elif extension in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff']:
            return DocumentType.IMAGE
        else:
            return DocumentType.UNKNOWN

    def _generate_document_id(self, file_path: str) -> str:
        """
        Generate a unique document ID based on file name and timestamp.

        Args:
            file_path: Path to the document file

        Returns:
            Unique document ID string
        """
        file_name = Path(file_path).name
        timestamp = datetime.now().isoformat()
        content = f"{file_name}_{timestamp}"
        return hashlib.md5(content.encode()).hexdigest()

    def validate_file(self, file_path: str) -> None:
        """
        Validate that a file exists and can be processed.

        Args:
            file_path: Path to the file to validate

        Raises:
            DocumentProcessingError: If file validation fails
        """
        file_path_obj = Path(file_path)
        if not file_path_obj.exists():
            raise DocumentProcessingError(
                file_path,
                "FileNotFound",
                f"File does not exist: {file_path}"
            )
        if not file_path_obj.is_file():
            raise DocumentProcessingError(
                file_path,
                "NotAFile",
                f"Path is not a file: {file_path}"
            )
        # Check file size
        max_size = self.config.get('max_file_size_mb', 100) * 1024 * 1024  # Convert to bytes
        file_size = file_path_obj.stat().st_size
        if file_size > max_size:
            raise DocumentProcessingError(
                file_path,
                "FileTooLarge",
                f"File size ({file_size} bytes) exceeds maximum allowed size ({max_size} bytes)"
            )
        if not self.can_process(file_path):
            detected_type = self._detect_document_type(file_path)
            raise UnsupportedDocumentTypeError(file_path, detected_type.value)
        logger.debug(f"File validation passed for: {file_path}")

class DocumentProcessorFactory:
    """Factory class for creating appropriate document processors."""

    _processors = {}

    @classmethod
    def register_processor(cls, document_type: DocumentType, processor_class):
        """Register a processor class for a document type."""
        cls._processors[document_type] = processor_class
        logger.info(f"Registered processor {processor_class.__name__} for type {document_type.value}")

    @classmethod
    def create_processor(cls, file_path: str, config: Dict[str, Any]) -> DocumentProcessor:
        """
        Create the appropriate processor for the given file.

        Args:
            file_path: Path to the file to process
            config: Configuration dictionary

        Returns:
            DocumentProcessor instance

        Raises:
            UnsupportedDocumentTypeError: If no processor is available for the file type
        """
        # Detect document type from the file extension
        extension = Path(file_path).suffix.lower()
        if extension == '.pdf':
            document_type = DocumentType.PDF
        elif extension in ['.xlsx', '.xls', '.xlsm']:
            document_type = DocumentType.EXCEL
        elif extension in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff']:
            document_type = DocumentType.IMAGE
        else:
            document_type = DocumentType.UNKNOWN
        # Get processor class
        processor_class = cls._processors.get(document_type)
        if not processor_class:
            raise UnsupportedDocumentTypeError(file_path, document_type.value)
        # Create and return processor instance
        return processor_class(config)

    @classmethod
    def get_supported_types(cls) -> List[DocumentType]:
        """Get list of supported document types."""
        return list(cls._processors.keys())
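

# Hypothetical convenience helper, not part of the original API: it ties the factory,
# validation, processing, and chunking steps together for a single file. It assumes a
# concrete processor for the file's type has already been registered with
# DocumentProcessorFactory.
def process_and_chunk(file_path: str, config: Dict[str, Any],
                      chunk_size: int = 512, chunk_overlap: int = 50) -> List[DocumentChunk]:
    """Process a file with the matching registered processor and return its chunks."""
    processor = DocumentProcessorFactory.create_processor(file_path, config)
    processor.validate_file(file_path)
    document = processor.process_document(file_path)
    return processor.extract_chunks(document, chunk_size=chunk_size, chunk_overlap=chunk_overlap)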

if __name__ == "__main__":
    logger.info("Docs processor init ..")
    # Example usage (for testing purposes). create_processor raises UnsupportedDocumentTypeError
    # unless a concrete processor for the file's type has been registered with the factory.
    config = {'max_file_size_mb': 50}
    try:
        processor = DocumentProcessorFactory.create_processor("example.pdf", config)
        processed_doc = processor.process_document("example.pdf")
        chunks = processor.extract_chunks(processed_doc)
        for chunk in chunks:
            print(chunk)
    except DocumentProcessingError as e:
        logger.error(f"Example processing failed: {e}")
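    # Illustrative only: register the hypothetical ExampleImageProcessor sketched above
    # so the factory has at least one concrete processor, then report which document
    # types are currently supported.
    DocumentProcessorFactory.register_processor(DocumentType.IMAGE, ExampleImageProcessor)
    supported = [t.value for t in DocumentProcessorFactory.get_supported_types()]
    logger.info(f"Registered processors for types: {supported}")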