rag_korean_manufacturing_docs / src /excel_processor.py
A7m0d's picture
Upload folder using huggingface_hub
7dfe46c verified
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import openpyxl
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.drawing.image import Image as OpenpyxlImage
from dataclasses import dataclass
import io
from PIL import Image as PILImage
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.document_processor import (
DocumentProcessor,
ProcessedDocument,
DocumentType,
ProcessingStatus,
DocumentProcessingError,
ExtractedImage,
ExtractedTable,
DocumentProcessorFactory
)
try:
from logger.custom_logger import CustomLoggerTracker
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("excel_processor")
except ImportError:
# Fallback to standard logging if custom logger not available
logger = logging.getLogger("excel_processor")
@dataclass
class ExcelWorksheetInfo:
"""Information about an Excel worksheet."""
name: str
max_row: int
max_column: int
cell_count: int
image_count: int
table_count: int
has_data: bool
@dataclass
class CellRange:
"""Represents a range of cells in Excel."""
start_row: int
start_col: int
end_row: int
end_col: int
def to_excel_range(self) -> str:
"""Convert to Excel range notation (e.g., 'A1:C5')."""
start_col_letter = openpyxl.utils.get_column_letter(self.start_col)
end_col_letter = openpyxl.utils.get_column_letter(self.end_col)
return f"{start_col_letter}{self.start_row}:{end_col_letter}{self.end_row}"
class ExcelProcessor(DocumentProcessor):
"""
Excel document processor using openpyxl.
This processor extracts data from Excel worksheets, embedded images,
and maintains proper citations with worksheet names and cell references.
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize the Excel processor.
Args:
config: Configuration dictionary containing Excel processing settings
"""
super().__init__(config)
self.extract_images = config.get('image_processing', True)
self.extract_tables = config.get('table_extraction', True)
self.min_table_rows = config.get('min_table_rows', 2)
self.min_table_cols = config.get('min_table_cols', 2)
self.max_empty_rows = config.get('max_empty_rows', 5)
self.max_empty_cols = config.get('max_empty_cols', 5)
logger.info(f"Excel processor initialized with image_processing={self.extract_images}, "
f"table_extraction={self.extract_tables}")
def _get_supported_extensions(self) -> List[str]:
"""Get supported file extensions for Excel processor."""
return ['.xlsx', '.xls', '.xlsm']
def process_document(self, file_path: str) -> ProcessedDocument:
"""
Process an Excel document and extract all content.
Args:
file_path: Path to the Excel file
Returns:
ProcessedDocument with extracted content and metadata
Raises:
DocumentProcessingError: If Excel processing fails
"""
try:
# Validate file first
self.validate_file(file_path)
# Generate document ID
document_id = self._generate_document_id(file_path)
logger.info(f"Processing Excel document: {file_path}")
# Open Excel workbook
workbook = openpyxl.load_workbook(file_path, data_only=True)
try:
# Extract metadata
metadata = self._extract_metadata(workbook, file_path)
# Process all worksheets
all_content = []
all_images = []
all_tables = []
worksheet_info = []
for sheet_name in workbook.sheetnames:
worksheet = workbook[sheet_name]
logger.debug(f"Processing worksheet: {sheet_name}")
# Extract data from worksheet
sheet_content = self._extract_worksheet_content(worksheet, sheet_name)
if sheet_content.strip():
all_content.append(f"[Worksheet: {sheet_name}]\n{sheet_content}")
# Extract images if enabled
if self.extract_images:
sheet_images = self._extract_worksheet_images(worksheet, sheet_name, document_id)
all_images.extend(sheet_images)
# Extract tables if enabled
if self.extract_tables:
sheet_tables = self._extract_worksheet_tables(worksheet, sheet_name)
all_tables.extend(sheet_tables)
# Collect worksheet info
worksheet_info.append(ExcelWorksheetInfo(
name=sheet_name,
max_row=worksheet.max_row or 0,
max_column=worksheet.max_column or 0,
cell_count=self._count_non_empty_cells(worksheet),
image_count=len(sheet_images) if self.extract_images else 0,
table_count=len(sheet_tables) if self.extract_tables else 0,
has_data=bool(sheet_content.strip())
))
# Combine all content
full_content = "\n\n".join(all_content)
# Update metadata with processing info
metadata.update({
'total_worksheets': len(workbook.sheetnames),
'worksheet_names': workbook.sheetnames,
'total_images': len(all_images),
'total_tables': len(all_tables),
'total_content_length': len(full_content),
'worksheet_info': [
{
'name': info.name,
'max_row': info.max_row,
'max_column': info.max_column,
'cell_count': info.cell_count,
'image_count': info.image_count,
'table_count': info.table_count,
'has_data': info.has_data
}
for info in worksheet_info
]
})
# Create processed document
processed_doc = ProcessedDocument(
document_id=document_id,
filename=Path(file_path).name,
file_path=file_path,
document_type=DocumentType.EXCEL,
content=full_content,
metadata=metadata,
images=all_images,
tables=all_tables,
processing_status=ProcessingStatus.COMPLETED
)
logger.info(f"Successfully processed Excel: {len(workbook.sheetnames)} worksheets, "
f"{len(all_images)} images, {len(all_tables)} tables")
return processed_doc
finally:
workbook.close()
except Exception as e:
logger.error(f"Failed to process Excel {file_path}: {e}")
# Create failed document
document_id = self._generate_document_id(file_path)
return ProcessedDocument(
document_id=document_id,
filename=Path(file_path).name,
file_path=file_path,
document_type=DocumentType.EXCEL,
content="",
metadata={},
processing_status=ProcessingStatus.FAILED,
error_message=str(e)
)
def _extract_metadata(self, workbook: openpyxl.Workbook, file_path: str) -> Dict[str, Any]:
metadata = {}
try:
props = workbook.properties
if props.title:
metadata['title'] = props.title
if props.creator:
metadata['creator'] = props.creator
if props.description:
metadata['description'] = props.description
if props.subject:
metadata['subject'] = props.subject
if props.keywords:
metadata['keywords'] = props.keywords
if props.category:
metadata['category'] = props.category
if props.created:
metadata['created'] = props.created.isoformat()
if props.modified:
metadata['modified'] = props.modified.isoformat()
if props.lastModifiedBy:
metadata['last_modified_by'] = props.lastModifiedBy
# Workbook info
metadata['worksheet_count'] = len(workbook.sheetnames)
metadata['active_sheet'] = workbook.active.title if workbook.active else None
# File info
file_path_obj = Path(file_path)
metadata['file_size'] = file_path_obj.stat().st_size
metadata['file_extension'] = file_path_obj.suffix
except Exception as e:
logger.warning(f"Failed to extract Excel metadata: {e}")
metadata['metadata_extraction_error'] = str(e)
return metadata
def _extract_worksheet_content(self, worksheet: Worksheet, sheet_name: str) -> str:
try:
content_lines = []
if not worksheet.max_row or worksheet.max_row == 1:
return ""
# Iterate through rows and columns
for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row,
min_col=1, max_col=worksheet.max_column,
values_only=True):
# Convert row values to strings, handling None values
row_values = []
for cell_value in row:
if cell_value is not None:
# Handle different data types
if isinstance(cell_value, (int, float)):
row_values.append(str(cell_value))
elif isinstance(cell_value, str):
row_values.append(cell_value.strip())
else:
row_values.append(str(cell_value))
else:
row_values.append("")
# Skip completely empty rows
if any(val.strip() for val in row_values if val):
# Join non-empty values with tabs
row_text = "\t".join(row_values)
content_lines.append(row_text)
return "\n".join(content_lines)
except Exception as e:
logger.warning(f"Failed to extract content from worksheet {sheet_name}: {e}")
return ""
def _extract_worksheet_images(self, worksheet: Worksheet, sheet_name: str, document_id: str) -> List[ExtractedImage]:
images = []
try:
# Get images from worksheet
if hasattr(worksheet, '_images') and worksheet._images:
for img_index, img in enumerate(worksheet._images):
try:
# Extract image data
image_data = self._extract_image_data(img)
if not image_data:
continue
# Create image object
image_id = f"{document_id}_{sheet_name}_img{img_index}"
filename = f"{sheet_name}_image{img_index}.{image_data['format'].lower()}"
# Get image position if available
anchor_info = {}
if hasattr(img, 'anchor') and img.anchor:
if hasattr(img.anchor, '_from'):
anchor_info['from_cell'] = f"{img.anchor._from.col}{img.anchor._from.row}"
if hasattr(img.anchor, 'to'):
anchor_info['to_cell'] = f"{img.anchor.to.col}{img.anchor.to.row}"
extracted_image = ExtractedImage(
image_id=image_id,
filename=filename,
content=image_data['content'],
format=image_data['format'],
width=image_data.get('width'),
height=image_data.get('height'),
extraction_method="openpyxl",
metadata={
'worksheet_name': sheet_name,
'image_index': img_index,
'size_bytes': len(image_data['content']),
'anchor_info': anchor_info
}
)
images.append(extracted_image)
except Exception as e:
logger.warning(f"Failed to extract image {img_index} from worksheet {sheet_name}: {e}")
continue
except Exception as e:
logger.warning(f"Failed to extract images from worksheet {sheet_name}: {e}")
return images
def _extract_image_data(self, img: OpenpyxlImage) -> Optional[Dict[str, Any]]:
"""
Extract data from an openpyxl Image object.
Args:
img: openpyxl Image object
Returns:
Dictionary with image data or None if extraction fails
"""
try:
# Get image data
if hasattr(img, 'ref') and img.ref:
# Image has reference to external file
image_data = img.ref
elif hasattr(img, '_data') and img._data:
# Image data is embedded
image_data = img._data()
else:
logger.warning("No image data found in image object")
return None
# Determine format
image_format = "PNG" # Default
if hasattr(img, 'format') and img.format:
image_format = img.format.upper()
# Try to get dimensions using PIL
width, height = None, None
try:
with io.BytesIO(image_data) as img_buffer:
pil_img = PILImage.open(img_buffer)
width, height = pil_img.size
except Exception as e:
logger.debug(f"Could not determine image dimensions: {e}")
return {
'content': image_data,
'format': image_format,
'width': width,
'height': height
}
except Exception as e:
logger.warning(f"Failed to extract image data: {e}")
return None
def _extract_worksheet_tables(self, worksheet: Worksheet, sheet_name: str) -> List[ExtractedTable]:
"""
Extract tables from an Excel worksheet.
Args:
worksheet: openpyxl Worksheet object
sheet_name: Name of the worksheet
Returns:
List of ExtractedTable objects
"""
tables = []
try:
# First, try to extract defined tables
if hasattr(worksheet, 'tables') and worksheet.tables:
for table_name, table in worksheet.tables.items():
try:
extracted_table = self._extract_defined_table(table, sheet_name, len(tables))
if extracted_table:
tables.append(extracted_table)
except Exception as e:
logger.warning(f"Failed to extract defined table {table_name}: {e}")
# If no defined tables found, try to detect tables from data
if not tables and self.extract_tables:
detected_tables = self._detect_data_tables(worksheet, sheet_name)
tables.extend(detected_tables)
except Exception as e:
logger.warning(f"Failed to extract tables from worksheet {sheet_name}: {e}")
return tables
def _extract_defined_table(self, table, sheet_name: str, table_index: int) -> Optional[ExtractedTable]:
"""
Extract a defined Excel table.
Args:
table: Excel table object
sheet_name: Name of the worksheet
table_index: Index of the table
Returns:
ExtractedTable object or None if extraction fails
"""
try:
# Get table range
table_range = table.ref
# Parse range (e.g., "A1:C10")
start_cell, end_cell = table_range.split(':')
# Get table data from worksheet
worksheet = table.parent
table_data = []
for row in worksheet[table_range]:
row_data = []
for cell in row:
value = cell.value if cell.value is not None else ""
row_data.append(str(value))
table_data.append(row_data)
if not table_data:
return None
# First row is typically headers
headers = table_data[0] if table_data else []
rows = table_data[1:] if len(table_data) > 1 else []
# Create table object
table_id = f"{sheet_name}_table{table_index}"
return ExtractedTable(
table_id=table_id,
headers=headers,
rows=rows,
worksheet_name=sheet_name,
cell_range=table_range,
extraction_confidence=0.9, # High confidence for defined tables
metadata={
'extraction_method': 'defined_table',
'table_index': table_index,
'table_name': getattr(table, 'name', '')
}
)
except Exception as e:
logger.warning(f"Failed to extract defined table: {e}")
return None
def _detect_data_tables(self, worksheet: Worksheet, sheet_name: str) -> List[ExtractedTable]:
"""
Detect tables from worksheet data patterns.
Args:
worksheet: openpyxl Worksheet object
sheet_name: Name of the worksheet
Returns:
List of detected ExtractedTable objects
"""
tables = []
try:
if not worksheet.max_row or worksheet.max_row < self.min_table_rows:
return tables
# Simple table detection: look for contiguous data blocks
data_blocks = self._find_data_blocks(worksheet)
for block_index, data_block in enumerate(data_blocks):
if len(data_block) >= self.min_table_rows and len(data_block[0]) >= self.min_table_cols:
# Create table from data block
headers = data_block[0]
rows = data_block[1:]
# Calculate cell range
start_row = 1 # This is simplified - in reality would need to track actual positions
end_row = start_row + len(data_block) - 1
start_col = 1
end_col = len(headers)
cell_range = CellRange(start_row, start_col, end_row, end_col).to_excel_range()
table_id = f"{sheet_name}_detected_table{block_index}"
table = ExtractedTable(
table_id=table_id,
headers=headers,
rows=rows,
worksheet_name=sheet_name,
cell_range=cell_range,
extraction_confidence=0.7, # Lower confidence for detected tables
metadata={
'extraction_method': 'data_pattern_detection',
'table_index': block_index
}
)
tables.append(table)
except Exception as e:
logger.warning(f"Failed to detect data tables: {e}")
return tables
def _find_data_blocks(self, worksheet: Worksheet) -> List[List[List[str]]]:
"""
Find contiguous blocks of data in the worksheet.
Args:
worksheet: openpyxl Worksheet object
Returns:
List of data blocks, where each block is a list of rows
"""
data_blocks = []
try:
current_block = []
empty_row_count = 0
for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row,
min_col=1, max_col=worksheet.max_column,
values_only=True):
# Convert row to strings
row_values = []
has_data = False
for cell_value in row:
if cell_value is not None:
row_values.append(str(cell_value).strip())
if str(cell_value).strip():
has_data = True
else:
row_values.append("")
if has_data:
# Reset empty row count and add to current block
empty_row_count = 0
current_block.append(row_values)
else:
# Empty row
empty_row_count += 1
# If we've seen too many empty rows, end the current block
if empty_row_count >= self.max_empty_rows and current_block:
if len(current_block) >= self.min_table_rows:
data_blocks.append(current_block)
current_block = []
empty_row_count = 0
# Add final block if it exists
if current_block and len(current_block) >= self.min_table_rows:
data_blocks.append(current_block)
except Exception as e:
logger.warning(f"Failed to find data blocks: {e}")
return data_blocks
def _count_non_empty_cells(self, worksheet: Worksheet) -> int:
"""
Count non-empty cells in a worksheet.
Args:
worksheet: openpyxl Worksheet object
Returns:
Number of non-empty cells
"""
count = 0
try:
for row in worksheet.iter_rows(values_only=True):
for cell_value in row:
if cell_value is not None and str(cell_value).strip():
count += 1
except Exception as e:
logger.warning(f"Failed to count non-empty cells: {e}")
return count
# Register the Excel processor
DocumentProcessorFactory.register_processor(DocumentType.EXCEL, ExcelProcessor)
if __name__=="__main__":
logger.info(f"Start excel processing...")