import logging from pathlib import Path from typing import Dict, List, Any, Optional, Tuple import pytesseract from PIL import Image, ImageEnhance, ImageFilter import io import numpy as np from dataclasses import dataclass import os import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.document_processor import ( DocumentProcessor, ProcessedDocument, DocumentType, ProcessingStatus, DocumentProcessingError, ExtractedImage, DocumentProcessorFactory ) try: from logger.custom_logger import CustomLoggerTracker custom_log = CustomLoggerTracker() logger = custom_log.get_logger("excel_processor") except ImportError: # Fallback to standard logging if custom logger not available logger = logging.getLogger("excel_processor") @dataclass class OCRResult: """Result of OCR processing.""" text: str confidence: float word_count: int processing_time: float preprocessing_applied: List[str] @dataclass class ImageAnalysis: """Analysis results for an image.""" width: int height: int format: str mode: str size_bytes: int is_grayscale: bool average_brightness: float contrast_level: float estimated_dpi: Optional[int] = None class ImageProcessor(DocumentProcessor): """ Image processor with OCR capabilities using Tesseract. This processor handles standalone image files and provides OCR text extraction with preprocessing to improve accuracy. """ def __init__(self, config: Dict[str, Any]): """ Initialize the image processor. Args: config: Configuration dictionary containing image processing settings """ super().__init__(config) self.ocr_engine = config.get('ocr_engine', 'tesseract') self.ocr_language = config.get('ocr_language', 'eng') self.preprocessing_enabled = config.get('image_preprocessing', True) self.min_confidence = config.get('min_ocr_confidence', 30.0) self.max_image_size = config.get('max_image_size', (3000, 3000)) self.enhance_contrast = config.get('enhance_contrast', True) self.enhance_sharpness = config.get('enhance_sharpness', True) # Verify Tesseract installation self._verify_tesseract() logger.info(f"Image processor initialized with OCR language: {self.ocr_language}, " f"preprocessing: {self.preprocessing_enabled}") def _get_supported_extensions(self) -> List[str]: """Get supported file extensions for image processor.""" return ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.tif'] def _verify_tesseract(self) -> None: """Verify that Tesseract is properly installed and accessible.""" try: version = pytesseract.get_tesseract_version() logger.info(f"Tesseract version: {version}") except Exception as e: logger.error(f"Tesseract not found or not properly installed: {e}") raise DocumentProcessingError( "tesseract", "InstallationError", f"Tesseract OCR engine not found: {e}" ) def process_document(self, file_path: str) -> ProcessedDocument: """ Process an image file and extract text using OCR. Args: file_path: Path to the image file Returns: ProcessedDocument with extracted text and metadata Raises: DocumentProcessingError: If image processing fails """ try: # Validate file first self.validate_file(file_path) # Generate document ID document_id = self._generate_document_id(file_path) logger.info(f"Processing image document: {file_path}") # Load and analyze image image = Image.open(file_path) image_analysis = self._analyze_image(image, file_path) # Preprocess image if enabled processed_image = image preprocessing_steps = [] if self.preprocessing_enabled: processed_image, preprocessing_steps = self._preprocess_image(image) # Perform OCR ocr_result = self._perform_ocr(processed_image) # Create extracted image object with open(file_path, 'rb') as f: image_content = f.read() extracted_image = ExtractedImage( image_id=f"{document_id}_main", filename=Path(file_path).name, content=image_content, format=image_analysis.format, width=image_analysis.width, height=image_analysis.height, ocr_text=ocr_result.text, ocr_confidence=ocr_result.confidence, extraction_method="tesseract_ocr", metadata={ 'image_analysis': { 'mode': image_analysis.mode, 'size_bytes': image_analysis.size_bytes, 'is_grayscale': image_analysis.is_grayscale, 'average_brightness': image_analysis.average_brightness, 'contrast_level': image_analysis.contrast_level, 'estimated_dpi': image_analysis.estimated_dpi }, 'ocr_result': { 'word_count': ocr_result.word_count, 'processing_time': ocr_result.processing_time, 'preprocessing_applied': ocr_result.preprocessing_applied } } ) # Create metadata metadata = { 'image_analysis': image_analysis.__dict__, 'ocr_result': ocr_result.__dict__, 'preprocessing_steps': preprocessing_steps, 'ocr_language': self.ocr_language, 'ocr_engine': self.ocr_engine } # Create processed document processed_doc = ProcessedDocument( document_id=document_id, filename=Path(file_path).name, file_path=file_path, document_type=DocumentType.IMAGE, content=ocr_result.text, metadata=metadata, images=[extracted_image], processing_status=ProcessingStatus.COMPLETED ) logger.info(f"Successfully processed image: {len(ocr_result.text)} characters extracted, " f"confidence: {ocr_result.confidence:.1f}%") return processed_doc except Exception as e: logger.error(f"Failed to process image {file_path}: {e}") # Create failed document document_id = self._generate_document_id(file_path) return ProcessedDocument( document_id=document_id, filename=Path(file_path).name, file_path=file_path, document_type=DocumentType.IMAGE, content="", metadata={}, processing_status=ProcessingStatus.FAILED, error_message=str(e) ) def process_extracted_image(self, extracted_image: ExtractedImage) -> ExtractedImage: """ Process an already extracted image (e.g., from PDF or Excel) with OCR. Args: extracted_image: ExtractedImage object to process Returns: Updated ExtractedImage with OCR text """ try: logger.debug(f"Processing extracted image: {extracted_image.image_id}") # Load image from bytes image = Image.open(io.BytesIO(extracted_image.content)) # Preprocess image if enabled processed_image = image preprocessing_steps = [] if self.preprocessing_enabled: processed_image, preprocessing_steps = self._preprocess_image(image) # Perform OCR ocr_result = self._perform_ocr(processed_image) # Update extracted image with OCR results extracted_image.ocr_text = ocr_result.text extracted_image.ocr_confidence = ocr_result.confidence # Update metadata if 'ocr_result' not in extracted_image.metadata: extracted_image.metadata['ocr_result'] = {} extracted_image.metadata['ocr_result'].update({ 'word_count': ocr_result.word_count, 'processing_time': ocr_result.processing_time, 'preprocessing_applied': preprocessing_steps, 'ocr_language': self.ocr_language, 'ocr_engine': self.ocr_engine }) logger.debug(f"OCR completed for {extracted_image.image_id}: " f"{len(ocr_result.text)} characters, confidence: {ocr_result.confidence:.1f}%") return extracted_image except Exception as e: logger.warning(f"Failed to process extracted image {extracted_image.image_id}: {e}") # Return original image with error info extracted_image.metadata['ocr_error'] = str(e) return extracted_image def _analyze_image(self, image: Image.Image, file_path: str) -> ImageAnalysis: """ Analyze image properties and characteristics. Args: image: PIL Image object file_path: Path to the image file Returns: ImageAnalysis object with image properties """ try: # Basic properties width, height = image.size format_name = image.format or Path(file_path).suffix[1:].upper() mode = image.mode # File size size_bytes = Path(file_path).stat().st_size # Convert to grayscale for analysis if image.mode != 'L': gray_image = image.convert('L') else: gray_image = image # Calculate brightness and contrast np_image = np.array(gray_image) average_brightness = np.mean(np_image) contrast_level = np.std(np_image) # Check if image is grayscale is_grayscale = mode in ['L', '1'] or (mode == 'RGB' and self._is_grayscale_rgb(image)) # Estimate DPI if available estimated_dpi = None if hasattr(image, 'info') and 'dpi' in image.info: estimated_dpi = image.info['dpi'][0] if isinstance(image.info['dpi'], tuple) else image.info['dpi'] return ImageAnalysis( width=width, height=height, format=format_name, mode=mode, size_bytes=size_bytes, is_grayscale=is_grayscale, average_brightness=float(average_brightness), contrast_level=float(contrast_level), estimated_dpi=estimated_dpi ) except Exception as e: logger.warning(f"Failed to analyze image: {e}") # Return basic analysis return ImageAnalysis( width=image.size[0], height=image.size[1], format=image.format or "UNKNOWN", mode=image.mode, size_bytes=0, is_grayscale=False, average_brightness=128.0, contrast_level=50.0 ) def _is_grayscale_rgb(self, image: Image.Image) -> bool: """ Check if an RGB image is actually grayscale. Args: image: PIL Image object in RGB mode Returns: True if image is grayscale, False otherwise """ try: # Sample a few pixels to check if R=G=B sample_size = min(100, image.size[0] * image.size[1]) pixels = list(image.getdata()) # Check first 'sample_size' pixels for i in range(0, min(sample_size, len(pixels))): r, g, b = pixels[i][:3] # Handle RGBA by taking only RGB if r != g or g != b: return False return True except Exception: return False def _preprocess_image(self, image: Image.Image) -> Tuple[Image.Image, List[str]]: """ Preprocess image to improve OCR accuracy. Args: image: PIL Image object Returns: Tuple of (processed_image, list_of_applied_steps) """ processed_image = image.copy() applied_steps = [] try: # Resize if image is too large if image.size[0] > self.max_image_size[0] or image.size[1] > self.max_image_size[1]: processed_image.thumbnail(self.max_image_size, Image.Resampling.LANCZOS) applied_steps.append("resize") # Convert to grayscale if not already if processed_image.mode != 'L': processed_image = processed_image.convert('L') applied_steps.append("grayscale_conversion") # Enhance contrast if enabled if self.enhance_contrast: enhancer = ImageEnhance.Contrast(processed_image) processed_image = enhancer.enhance(1.5) # Increase contrast by 50% applied_steps.append("contrast_enhancement") # Enhance sharpness if enabled if self.enhance_sharpness: enhancer = ImageEnhance.Sharpness(processed_image) processed_image = enhancer.enhance(1.2) # Increase sharpness by 20% applied_steps.append("sharpness_enhancement") # Apply noise reduction processed_image = processed_image.filter(ImageFilter.MedianFilter(size=3)) applied_steps.append("noise_reduction") except Exception as e: logger.warning(f"Error during image preprocessing: {e}") # Return original image if preprocessing fails return image, ["preprocessing_failed"] return processed_image, applied_steps def _perform_ocr(self, image: Image.Image) -> OCRResult: """ Perform OCR on the processed image. Args: image: PIL Image object Returns: OCRResult with extracted text and metadata """ import time start_time = time.time() try: # Configure Tesseract custom_config = r'--oem 3 --psm 6' # Use LSTM OCR Engine Mode with uniform text block # Get text with confidence scores data = pytesseract.image_to_data( image, lang=self.ocr_language, config=custom_config, output_type=pytesseract.Output.DICT ) # Extract text and calculate average confidence words = [] confidences = [] for i, conf in enumerate(data['conf']): if int(conf) > 0: # Only include words with confidence > 0 word = data['text'][i].strip() if word: # Only include non-empty words words.append(word) confidences.append(int(conf)) # Combine words into text extracted_text = ' '.join(words) # Calculate average confidence avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 # Calculate processing time processing_time = time.time() - start_time # Clean up text extracted_text = self._clean_ocr_text(extracted_text) return OCRResult( text=extracted_text, confidence=avg_confidence, word_count=len(words), processing_time=processing_time, preprocessing_applied=[] # Will be filled by caller ) except Exception as e: logger.error(f"OCR processing failed: {e}") processing_time = time.time() - start_time return OCRResult( text="", confidence=0.0, word_count=0, processing_time=processing_time, preprocessing_applied=[] ) def _clean_ocr_text(self, text: str) -> str: """ Clean and normalize OCR extracted text. Args: text: Raw OCR text Returns: Cleaned text """ if not text: return "" # Remove excessive whitespace import re text = re.sub(r'\s+', ' ', text) # Remove common OCR artifacts text = text.replace('|', 'I') # Common misrecognition text = text.replace('0', 'O') # In some contexts # Strip leading/trailing whitespace text = text.strip() return text def batch_process_images(self, image_list: List[ExtractedImage]) -> List[ExtractedImage]: """ Process multiple extracted images in batch. Args: image_list: List of ExtractedImage objects Returns: List of processed ExtractedImage objects with OCR text """ processed_images = [] logger.info(f"Starting batch OCR processing for {len(image_list)} images") for i, extracted_image in enumerate(image_list): try: logger.debug(f"Processing image {i+1}/{len(image_list)}: {extracted_image.image_id}") processed_image = self.process_extracted_image(extracted_image) processed_images.append(processed_image) except Exception as e: logger.warning(f"Failed to process image {extracted_image.image_id}: {e}") # Add original image with error info extracted_image.metadata['batch_processing_error'] = str(e) processed_images.append(extracted_image) logger.info(f"Completed batch OCR processing: {len(processed_images)} images processed") return processed_images # Register the Image processor DocumentProcessorFactory.register_processor(DocumentType.IMAGE, ImageProcessor) if __name__=="__main__": logger.info(f"Image processor init ..")