Spaces:
Sleeping
Sleeping
File size: 24,540 Bytes
7dfe46c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 |
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import openpyxl
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.drawing.image import Image as OpenpyxlImage
from dataclasses import dataclass
import io
from PIL import Image as PILImage
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.document_processor import (
DocumentProcessor,
ProcessedDocument,
DocumentType,
ProcessingStatus,
DocumentProcessingError,
ExtractedImage,
ExtractedTable,
DocumentProcessorFactory
)
try:
from logger.custom_logger import CustomLoggerTracker
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("excel_processor")
except ImportError:
# Fallback to standard logging if custom logger not available
logger = logging.getLogger("excel_processor")
@dataclass
class ExcelWorksheetInfo:
"""Information about an Excel worksheet."""
name: str
max_row: int
max_column: int
cell_count: int
image_count: int
table_count: int
has_data: bool
@dataclass
class CellRange:
"""Represents a range of cells in Excel."""
start_row: int
start_col: int
end_row: int
end_col: int
def to_excel_range(self) -> str:
"""Convert to Excel range notation (e.g., 'A1:C5')."""
start_col_letter = openpyxl.utils.get_column_letter(self.start_col)
end_col_letter = openpyxl.utils.get_column_letter(self.end_col)
return f"{start_col_letter}{self.start_row}:{end_col_letter}{self.end_row}"
class ExcelProcessor(DocumentProcessor):
"""
Excel document processor using openpyxl.
This processor extracts data from Excel worksheets, embedded images,
and maintains proper citations with worksheet names and cell references.
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize the Excel processor.
Args:
config: Configuration dictionary containing Excel processing settings
"""
super().__init__(config)
self.extract_images = config.get('image_processing', True)
self.extract_tables = config.get('table_extraction', True)
self.min_table_rows = config.get('min_table_rows', 2)
self.min_table_cols = config.get('min_table_cols', 2)
self.max_empty_rows = config.get('max_empty_rows', 5)
self.max_empty_cols = config.get('max_empty_cols', 5)
logger.info(f"Excel processor initialized with image_processing={self.extract_images}, "
f"table_extraction={self.extract_tables}")
def _get_supported_extensions(self) -> List[str]:
"""Get supported file extensions for Excel processor."""
return ['.xlsx', '.xls', '.xlsm']
def process_document(self, file_path: str) -> ProcessedDocument:
"""
Process an Excel document and extract all content.
Args:
file_path: Path to the Excel file
Returns:
ProcessedDocument with extracted content and metadata
Raises:
DocumentProcessingError: If Excel processing fails
"""
try:
# Validate file first
self.validate_file(file_path)
# Generate document ID
document_id = self._generate_document_id(file_path)
logger.info(f"Processing Excel document: {file_path}")
# Open Excel workbook
workbook = openpyxl.load_workbook(file_path, data_only=True)
try:
# Extract metadata
metadata = self._extract_metadata(workbook, file_path)
# Process all worksheets
all_content = []
all_images = []
all_tables = []
worksheet_info = []
for sheet_name in workbook.sheetnames:
worksheet = workbook[sheet_name]
logger.debug(f"Processing worksheet: {sheet_name}")
# Extract data from worksheet
sheet_content = self._extract_worksheet_content(worksheet, sheet_name)
if sheet_content.strip():
all_content.append(f"[Worksheet: {sheet_name}]\n{sheet_content}")
# Extract images if enabled
if self.extract_images:
sheet_images = self._extract_worksheet_images(worksheet, sheet_name, document_id)
all_images.extend(sheet_images)
# Extract tables if enabled
if self.extract_tables:
sheet_tables = self._extract_worksheet_tables(worksheet, sheet_name)
all_tables.extend(sheet_tables)
# Collect worksheet info
worksheet_info.append(ExcelWorksheetInfo(
name=sheet_name,
max_row=worksheet.max_row or 0,
max_column=worksheet.max_column or 0,
cell_count=self._count_non_empty_cells(worksheet),
image_count=len(sheet_images) if self.extract_images else 0,
table_count=len(sheet_tables) if self.extract_tables else 0,
has_data=bool(sheet_content.strip())
))
# Combine all content
full_content = "\n\n".join(all_content)
# Update metadata with processing info
metadata.update({
'total_worksheets': len(workbook.sheetnames),
'worksheet_names': workbook.sheetnames,
'total_images': len(all_images),
'total_tables': len(all_tables),
'total_content_length': len(full_content),
'worksheet_info': [
{
'name': info.name,
'max_row': info.max_row,
'max_column': info.max_column,
'cell_count': info.cell_count,
'image_count': info.image_count,
'table_count': info.table_count,
'has_data': info.has_data
}
for info in worksheet_info
]
})
# Create processed document
processed_doc = ProcessedDocument(
document_id=document_id,
filename=Path(file_path).name,
file_path=file_path,
document_type=DocumentType.EXCEL,
content=full_content,
metadata=metadata,
images=all_images,
tables=all_tables,
processing_status=ProcessingStatus.COMPLETED
)
logger.info(f"Successfully processed Excel: {len(workbook.sheetnames)} worksheets, "
f"{len(all_images)} images, {len(all_tables)} tables")
return processed_doc
finally:
workbook.close()
except Exception as e:
logger.error(f"Failed to process Excel {file_path}: {e}")
# Create failed document
document_id = self._generate_document_id(file_path)
return ProcessedDocument(
document_id=document_id,
filename=Path(file_path).name,
file_path=file_path,
document_type=DocumentType.EXCEL,
content="",
metadata={},
processing_status=ProcessingStatus.FAILED,
error_message=str(e)
)
def _extract_metadata(self, workbook: openpyxl.Workbook, file_path: str) -> Dict[str, Any]:
metadata = {}
try:
props = workbook.properties
if props.title:
metadata['title'] = props.title
if props.creator:
metadata['creator'] = props.creator
if props.description:
metadata['description'] = props.description
if props.subject:
metadata['subject'] = props.subject
if props.keywords:
metadata['keywords'] = props.keywords
if props.category:
metadata['category'] = props.category
if props.created:
metadata['created'] = props.created.isoformat()
if props.modified:
metadata['modified'] = props.modified.isoformat()
if props.lastModifiedBy:
metadata['last_modified_by'] = props.lastModifiedBy
# Workbook info
metadata['worksheet_count'] = len(workbook.sheetnames)
metadata['active_sheet'] = workbook.active.title if workbook.active else None
# File info
file_path_obj = Path(file_path)
metadata['file_size'] = file_path_obj.stat().st_size
metadata['file_extension'] = file_path_obj.suffix
except Exception as e:
logger.warning(f"Failed to extract Excel metadata: {e}")
metadata['metadata_extraction_error'] = str(e)
return metadata
def _extract_worksheet_content(self, worksheet: Worksheet, sheet_name: str) -> str:
try:
content_lines = []
if not worksheet.max_row or worksheet.max_row == 1:
return ""
# Iterate through rows and columns
for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row,
min_col=1, max_col=worksheet.max_column,
values_only=True):
# Convert row values to strings, handling None values
row_values = []
for cell_value in row:
if cell_value is not None:
# Handle different data types
if isinstance(cell_value, (int, float)):
row_values.append(str(cell_value))
elif isinstance(cell_value, str):
row_values.append(cell_value.strip())
else:
row_values.append(str(cell_value))
else:
row_values.append("")
# Skip completely empty rows
if any(val.strip() for val in row_values if val):
# Join non-empty values with tabs
row_text = "\t".join(row_values)
content_lines.append(row_text)
return "\n".join(content_lines)
except Exception as e:
logger.warning(f"Failed to extract content from worksheet {sheet_name}: {e}")
return ""
def _extract_worksheet_images(self, worksheet: Worksheet, sheet_name: str, document_id: str) -> List[ExtractedImage]:
images = []
try:
# Get images from worksheet
if hasattr(worksheet, '_images') and worksheet._images:
for img_index, img in enumerate(worksheet._images):
try:
# Extract image data
image_data = self._extract_image_data(img)
if not image_data:
continue
# Create image object
image_id = f"{document_id}_{sheet_name}_img{img_index}"
filename = f"{sheet_name}_image{img_index}.{image_data['format'].lower()}"
# Get image position if available
anchor_info = {}
if hasattr(img, 'anchor') and img.anchor:
if hasattr(img.anchor, '_from'):
anchor_info['from_cell'] = f"{img.anchor._from.col}{img.anchor._from.row}"
if hasattr(img.anchor, 'to'):
anchor_info['to_cell'] = f"{img.anchor.to.col}{img.anchor.to.row}"
extracted_image = ExtractedImage(
image_id=image_id,
filename=filename,
content=image_data['content'],
format=image_data['format'],
width=image_data.get('width'),
height=image_data.get('height'),
extraction_method="openpyxl",
metadata={
'worksheet_name': sheet_name,
'image_index': img_index,
'size_bytes': len(image_data['content']),
'anchor_info': anchor_info
}
)
images.append(extracted_image)
except Exception as e:
logger.warning(f"Failed to extract image {img_index} from worksheet {sheet_name}: {e}")
continue
except Exception as e:
logger.warning(f"Failed to extract images from worksheet {sheet_name}: {e}")
return images
def _extract_image_data(self, img: OpenpyxlImage) -> Optional[Dict[str, Any]]:
"""
Extract data from an openpyxl Image object.
Args:
img: openpyxl Image object
Returns:
Dictionary with image data or None if extraction fails
"""
try:
# Get image data
if hasattr(img, 'ref') and img.ref:
# Image has reference to external file
image_data = img.ref
elif hasattr(img, '_data') and img._data:
# Image data is embedded
image_data = img._data()
else:
logger.warning("No image data found in image object")
return None
# Determine format
image_format = "PNG" # Default
if hasattr(img, 'format') and img.format:
image_format = img.format.upper()
# Try to get dimensions using PIL
width, height = None, None
try:
with io.BytesIO(image_data) as img_buffer:
pil_img = PILImage.open(img_buffer)
width, height = pil_img.size
except Exception as e:
logger.debug(f"Could not determine image dimensions: {e}")
return {
'content': image_data,
'format': image_format,
'width': width,
'height': height
}
except Exception as e:
logger.warning(f"Failed to extract image data: {e}")
return None
def _extract_worksheet_tables(self, worksheet: Worksheet, sheet_name: str) -> List[ExtractedTable]:
"""
Extract tables from an Excel worksheet.
Args:
worksheet: openpyxl Worksheet object
sheet_name: Name of the worksheet
Returns:
List of ExtractedTable objects
"""
tables = []
try:
# First, try to extract defined tables
if hasattr(worksheet, 'tables') and worksheet.tables:
for table_name, table in worksheet.tables.items():
try:
extracted_table = self._extract_defined_table(table, sheet_name, len(tables))
if extracted_table:
tables.append(extracted_table)
except Exception as e:
logger.warning(f"Failed to extract defined table {table_name}: {e}")
# If no defined tables found, try to detect tables from data
if not tables and self.extract_tables:
detected_tables = self._detect_data_tables(worksheet, sheet_name)
tables.extend(detected_tables)
except Exception as e:
logger.warning(f"Failed to extract tables from worksheet {sheet_name}: {e}")
return tables
def _extract_defined_table(self, table, sheet_name: str, table_index: int) -> Optional[ExtractedTable]:
"""
Extract a defined Excel table.
Args:
table: Excel table object
sheet_name: Name of the worksheet
table_index: Index of the table
Returns:
ExtractedTable object or None if extraction fails
"""
try:
# Get table range
table_range = table.ref
# Parse range (e.g., "A1:C10")
start_cell, end_cell = table_range.split(':')
# Get table data from worksheet
worksheet = table.parent
table_data = []
for row in worksheet[table_range]:
row_data = []
for cell in row:
value = cell.value if cell.value is not None else ""
row_data.append(str(value))
table_data.append(row_data)
if not table_data:
return None
# First row is typically headers
headers = table_data[0] if table_data else []
rows = table_data[1:] if len(table_data) > 1 else []
# Create table object
table_id = f"{sheet_name}_table{table_index}"
return ExtractedTable(
table_id=table_id,
headers=headers,
rows=rows,
worksheet_name=sheet_name,
cell_range=table_range,
extraction_confidence=0.9, # High confidence for defined tables
metadata={
'extraction_method': 'defined_table',
'table_index': table_index,
'table_name': getattr(table, 'name', '')
}
)
except Exception as e:
logger.warning(f"Failed to extract defined table: {e}")
return None
def _detect_data_tables(self, worksheet: Worksheet, sheet_name: str) -> List[ExtractedTable]:
"""
Detect tables from worksheet data patterns.
Args:
worksheet: openpyxl Worksheet object
sheet_name: Name of the worksheet
Returns:
List of detected ExtractedTable objects
"""
tables = []
try:
if not worksheet.max_row or worksheet.max_row < self.min_table_rows:
return tables
# Simple table detection: look for contiguous data blocks
data_blocks = self._find_data_blocks(worksheet)
for block_index, data_block in enumerate(data_blocks):
if len(data_block) >= self.min_table_rows and len(data_block[0]) >= self.min_table_cols:
# Create table from data block
headers = data_block[0]
rows = data_block[1:]
# Calculate cell range
start_row = 1 # This is simplified - in reality would need to track actual positions
end_row = start_row + len(data_block) - 1
start_col = 1
end_col = len(headers)
cell_range = CellRange(start_row, start_col, end_row, end_col).to_excel_range()
table_id = f"{sheet_name}_detected_table{block_index}"
table = ExtractedTable(
table_id=table_id,
headers=headers,
rows=rows,
worksheet_name=sheet_name,
cell_range=cell_range,
extraction_confidence=0.7, # Lower confidence for detected tables
metadata={
'extraction_method': 'data_pattern_detection',
'table_index': block_index
}
)
tables.append(table)
except Exception as e:
logger.warning(f"Failed to detect data tables: {e}")
return tables
def _find_data_blocks(self, worksheet: Worksheet) -> List[List[List[str]]]:
"""
Find contiguous blocks of data in the worksheet.
Args:
worksheet: openpyxl Worksheet object
Returns:
List of data blocks, where each block is a list of rows
"""
data_blocks = []
try:
current_block = []
empty_row_count = 0
for row in worksheet.iter_rows(min_row=1, max_row=worksheet.max_row,
min_col=1, max_col=worksheet.max_column,
values_only=True):
# Convert row to strings
row_values = []
has_data = False
for cell_value in row:
if cell_value is not None:
row_values.append(str(cell_value).strip())
if str(cell_value).strip():
has_data = True
else:
row_values.append("")
if has_data:
# Reset empty row count and add to current block
empty_row_count = 0
current_block.append(row_values)
else:
# Empty row
empty_row_count += 1
# If we've seen too many empty rows, end the current block
if empty_row_count >= self.max_empty_rows and current_block:
if len(current_block) >= self.min_table_rows:
data_blocks.append(current_block)
current_block = []
empty_row_count = 0
# Add final block if it exists
if current_block and len(current_block) >= self.min_table_rows:
data_blocks.append(current_block)
except Exception as e:
logger.warning(f"Failed to find data blocks: {e}")
return data_blocks
def _count_non_empty_cells(self, worksheet: Worksheet) -> int:
"""
Count non-empty cells in a worksheet.
Args:
worksheet: openpyxl Worksheet object
Returns:
Number of non-empty cells
"""
count = 0
try:
for row in worksheet.iter_rows(values_only=True):
for cell_value in row:
if cell_value is not None and str(cell_value).strip():
count += 1
except Exception as e:
logger.warning(f"Failed to count non-empty cells: {e}")
return count
# Register the Excel processor
DocumentProcessorFactory.register_processor(DocumentType.EXCEL, ExcelProcessor)
if __name__=="__main__":
logger.info(f"Start excel processing...") |