import logging
import time
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.document_processor import (
    DocumentProcessor, DocumentProcessorFactory, ProcessedDocument,
    DocumentChunk, ProcessingStatus, DocumentType
)
from src.embedding_system import EmbeddingSystem
from src.vector_store import QdrantVectorStore
from src.metadata_manager import MetadataManager, DocumentMetadata
from src.image_processor import ImageProcessor
try:
    from logger.custom_logger import CustomLoggerTracker
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("ingestion_pipeline")
except ImportError:
    # Fall back to standard logging if the custom logger is not available
    logger = logging.getLogger("ingestion_pipeline")
@dataclass
class IngestionResult:
    """Result of a single document ingestion."""
    document_id: str
    filename: str
    success: bool
    processing_time: float
    chunks_created: int
    chunks_indexed: int
    error_message: Optional[str] = None
    warnings: Optional[List[str]] = None

    def __post_init__(self):
        if self.warnings is None:
            self.warnings = []
@dataclass
class IngestionStats:
    """Statistics for a batch ingestion run."""
    total_documents: int
    successful_documents: int
    failed_documents: int
    total_chunks: int
    total_processing_time: float
    average_processing_time: float
    documents_by_type: Dict[str, int]
    errors: List[str]
def jina_embeddings(text: str) -> List[float]:
    """Embed a single passage with the Jina embeddings API (jina-embeddings-v3)."""
    import requests

    # Read the API key from the environment rather than hard-coding the secret.
    api_key = os.environ["JINA_API_KEY"]
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {api_key}',
    }
    data = {
        "model": "jina-embeddings-v3",
        "task": "retrieval.passage",
        "input": text,
    }
    response = requests.post('https://api.jina.ai/v1/embeddings', headers=headers, json=data)
    response.raise_for_status()
    return response.json()['data'][0]['embedding']
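# Illustrative smoke test for jina_embeddings (a sketch, assuming JINA_API_KEY is
# set in the environment and that jina-embeddings-v3 returns 1024-dimensional
# vectors, which is what the pipeline's default vector_size below expects):
#   vec = jina_embeddings("hello world")
#   assert isinstance(vec, list) and len(vec) == 1024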
class DocumentIngestionPipeline:
    """End-to-end pipeline that processes, chunks, embeds, and indexes documents."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

        # Initialize components
        self.embedding_system = EmbeddingSystem(config)
        self.vector_store = QdrantVectorStore(config)
        self.metadata_manager = MetadataManager(config)

        # Use a vector dimension that matches the Jina embeddings
        self.vector_size = config.get('vector_size', 1024)
        self.config['vector_size'] = self.vector_size  # Propagate to other components

        # Initialize image processor for OCR
        self.image_processor = ImageProcessor(config)

        # Pipeline settings
        self.chunk_size = config.get('chunk_size', 512)
        self.chunk_overlap = config.get('chunk_overlap', 50)
        self.batch_size = config.get('embedding_batch_size', 32)
        self.max_workers = config.get('max_workers', 4)
        self.enable_ocr = config.get('image_processing', True)

        logger.info("Document ingestion pipeline initialized")
    def ingest_document(self, file_path: str, document_id: Optional[str] = None) -> IngestionResult:
        """
        Ingest a single document through the complete pipeline.

        Args:
            file_path: Path to the document file
            document_id: Optional custom document ID

        Returns:
            IngestionResult with processing details
        """
        start_time = time.time()
        file_path_obj = Path(file_path)
        filename = file_path_obj.name

        try:
            logger.info(f"Starting ingestion of document: {filename}")

            # Generate document ID if not provided
            if not document_id:
                document_id = self._generate_document_id(file_path)

            # Check if document already exists
            existing_metadata = self.metadata_manager.get_document_metadata(document_id)
            if existing_metadata and existing_metadata.processing_status == ProcessingStatus.COMPLETED:
                logger.info(f"Document {filename} already processed, skipping")
                return IngestionResult(
                    document_id=document_id,
                    filename=filename,
                    success=True,
                    processing_time=0.0,
                    chunks_created=existing_metadata.total_chunks,
                    chunks_indexed=existing_metadata.total_chunks,
                    warnings=["Document already processed"]
                )

            # Step 1: Process document
            processed_doc = self._process_document(file_path)
            if processed_doc.processing_status == ProcessingStatus.FAILED:
                return IngestionResult(
                    document_id=document_id,
                    filename=filename,
                    success=False,
                    processing_time=time.time() - start_time,
                    chunks_created=0,
                    chunks_indexed=0,
                    error_message=processed_doc.error_message
                )

            # Step 2: Process images with OCR if enabled
            if self.enable_ocr and processed_doc.images:
                processed_doc.images = self.image_processor.batch_process_images(processed_doc.images)

            # Step 3: Create document chunks
            processor = DocumentProcessorFactory.create_processor(file_path, self.config)
            chunks = processor.extract_chunks(processed_doc, self.chunk_size, self.chunk_overlap)
            if not chunks:
                logger.warning(f"No chunks created for document: {filename}")
                return IngestionResult(
                    document_id=document_id,
                    filename=filename,
                    success=False,
                    processing_time=time.time() - start_time,
                    chunks_created=0,
                    chunks_indexed=0,
                    error_message="No content chunks could be created"
                )
            # Step 4: Generate embeddings
            chunk_texts = [chunk.content for chunk in chunks]
            logger.debug(f"First chunks for {filename}: {chunk_texts[:2]}")
            # Each chunk is embedded with an individual Jina API call; the batched
            # embedding_system path is currently disabled.
            # embeddings = self.embedding_system.generate_embeddings(chunk_texts)
            embeddings = [jina_embeddings(text) for text in chunk_texts]
            if not embeddings or len(embeddings) != len(chunks):
                logger.error(f"Embedding generation failed for document: {filename}")
                return IngestionResult(
                    document_id=document_id,
                    filename=filename,
                    success=False,
                    processing_time=time.time() - start_time,
                    chunks_created=len(chunks),
                    chunks_indexed=0,
                    error_message="Failed to generate embeddings"
                )

            # Attach embeddings to chunks
            for chunk, embedding in zip(chunks, embeddings):
                chunk.embedding = embedding
            # Step 5: Store in vector database
            vector_success = self.vector_store.add_documents(chunks)
            if not vector_success:
                logger.error(f"Failed to store vectors for document: {filename}")
                return IngestionResult(
                    document_id=document_id,
                    filename=filename,
                    success=False,
                    processing_time=time.time() - start_time,
                    chunks_created=len(chunks),
                    chunks_indexed=0,
                    error_message="Failed to store document vectors"
                )

            # Step 6: Store metadata
            processing_time = time.time() - start_time
            metadata = DocumentMetadata(
                document_id=document_id,
                filename=filename,
                file_path=file_path,
                file_type=processed_doc.document_type.value,
                upload_timestamp=processed_doc.processing_timestamp,
                processing_status=ProcessingStatus.COMPLETED,
                total_chunks=len(chunks),
                file_size=processed_doc.file_size,
                checksum=processed_doc.checksum,
                processing_time=processing_time,
                metadata_json=self._serialize_metadata(processed_doc.metadata)
            )
            metadata_success = self.metadata_manager.store_document_metadata(document_id, metadata)
            if not metadata_success:
                logger.warning(f"Failed to store metadata for document: {filename}")

            logger.info(f"Successfully ingested document {filename}: {len(chunks)} chunks in {processing_time:.2f}s")
            return IngestionResult(
                document_id=document_id,
                filename=filename,
                success=True,
                processing_time=processing_time,
                chunks_created=len(chunks),
                chunks_indexed=len(chunks)
            )
        except Exception as e:
            error_msg = f"Ingestion failed for {filename}: {str(e)}"
            logger.error(error_msg)

            # Update metadata with error status
            if document_id:
                self.metadata_manager.update_document_status(
                    document_id,
                    ProcessingStatus.FAILED,
                    error_msg,
                    time.time() - start_time
                )

            return IngestionResult(
                document_id=document_id or "unknown",
                filename=filename,
                success=False,
                processing_time=time.time() - start_time,
                chunks_created=0,
                chunks_indexed=0,
                error_message=error_msg
            )
    def ingest_batch(self, file_paths: List[str], max_workers: Optional[int] = None) -> IngestionStats:
        """
        Ingest multiple documents in parallel.

        Args:
            file_paths: List of file paths to process
            max_workers: Maximum number of worker threads

        Returns:
            IngestionStats with batch processing results
        """
        start_time = time.time()
        max_workers = max_workers or self.max_workers
        logger.info(f"Starting batch ingestion of {len(file_paths)} documents with {max_workers} workers")

        results = []
        errors = []
        documents_by_type = {}

        # Process documents in parallel
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_path = {
                executor.submit(self.ingest_document, file_path): file_path
                for file_path in file_paths
            }

            # Collect results
            for future in as_completed(future_to_path):
                file_path = future_to_path[future]
                try:
                    result = future.result()
                    results.append(result)

                    # Track document types
                    file_ext = Path(file_path).suffix.lower()
                    documents_by_type[file_ext] = documents_by_type.get(file_ext, 0) + 1

                    if not result.success:
                        errors.append(f"{result.filename}: {result.error_message}")
                except Exception as e:
                    error_msg = f"Failed to process {file_path}: {str(e)}"
                    errors.append(error_msg)
                    logger.error(error_msg)

        # Calculate statistics
        successful_results = [r for r in results if r.success]
        failed_results = [r for r in results if not r.success]
        total_processing_time = time.time() - start_time
        total_chunks = sum(r.chunks_indexed for r in successful_results)
        avg_processing_time = (
            sum(r.processing_time for r in results) / len(results)
            if results else 0.0
        )

        stats = IngestionStats(
            total_documents=len(file_paths),
            successful_documents=len(successful_results),
            failed_documents=len(failed_results),
            total_chunks=total_chunks,
            total_processing_time=total_processing_time,
            average_processing_time=avg_processing_time,
            documents_by_type=documents_by_type,
            errors=errors
        )

        logger.info(f"Batch ingestion completed: {stats.successful_documents}/{stats.total_documents} "
                    f"documents processed successfully in {total_processing_time:.2f}s")
        return stats
    def reprocess_document(self, document_id: str) -> IngestionResult:
        """
        Reprocess an existing document.

        Args:
            document_id: ID of the document to reprocess

        Returns:
            IngestionResult with reprocessing details
        """
        # Get existing metadata
        metadata = self.metadata_manager.get_document_metadata(document_id)
        if not metadata:
            return IngestionResult(
                document_id=document_id,
                filename="unknown",
                success=False,
                processing_time=0.0,
                chunks_created=0,
                chunks_indexed=0,
                error_message="Document not found in metadata"
            )

        # Delete existing vectors
        self.vector_store.delete_document(document_id)

        # Reprocess the document
        return self.ingest_document(metadata.file_path, document_id)
    def delete_document(self, document_id: str) -> bool:
        """
        Delete a document and all associated data.

        Args:
            document_id: ID of the document to delete

        Returns:
            True if successful, False otherwise
        """
        try:
            # Delete from vector store
            vector_success = self.vector_store.delete_document(document_id)

            # Delete from metadata
            metadata_success = self.metadata_manager.delete_document(document_id)

            success = vector_success and metadata_success
            if success:
                logger.info(f"Successfully deleted document: {document_id}")
            else:
                logger.warning(f"Partial deletion of document: {document_id}")
            return success
        except Exception as e:
            logger.error(f"Failed to delete document {document_id}: {e}")
            return False
    def _process_document(self, file_path: str) -> ProcessedDocument:
        """Run the type-specific processor, returning a failed ProcessedDocument on error."""
        try:
            processor = DocumentProcessorFactory.create_processor(file_path, self.config)
            return processor.process_document(file_path)
        except Exception as e:
            logger.error(f"Document processing failed for {file_path}: {e}")

            # Return failed document
            document_id = self._generate_document_id(file_path)
            return ProcessedDocument(
                document_id=document_id,
                filename=Path(file_path).name,
                file_path=file_path,
                document_type=DocumentType.UNKNOWN,
                content="",
                metadata={},
                processing_status=ProcessingStatus.FAILED,
                error_message=str(e)
            )
    def _generate_document_id(self, file_path: str) -> str:
        """Derive a deterministic document ID from the file path and modification time."""
        # Use file path and modification time for uniqueness
        file_path_obj = Path(file_path)
        if file_path_obj.exists():
            mtime = file_path_obj.stat().st_mtime
            content = f"{file_path}_{mtime}"
        else:
            content = f"{file_path}_{time.time()}"
        return hashlib.md5(content.encode()).hexdigest()
    def _serialize_metadata(self, metadata: Dict[str, Any]) -> str:
        """Serialize document metadata to a JSON string, returning '{}' on failure."""
        try:
            import json
            return json.dumps(metadata, default=str, ensure_ascii=False)
        except Exception as e:
            logger.warning(f"Failed to serialize metadata: {e}")
            return "{}"
    def get_pipeline_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the ingestion pipeline.

        Returns:
            Dictionary with pipeline statistics
        """
        try:
            # Get component statistics
            vector_stats = self.vector_store.get_collection_info()
            metadata_stats = self.metadata_manager.get_statistics()
            embedding_stats = self.embedding_system.get_cache_stats()

            return {
                "vector_store": vector_stats.__dict__ if vector_stats else {},
                "metadata_manager": metadata_stats,
                "embedding_system": embedding_stats,
                "pipeline_config": {
                    "chunk_size": self.chunk_size,
                    "chunk_overlap": self.chunk_overlap,
                    "batch_size": self.batch_size,
                    "max_workers": self.max_workers,
                    "enable_ocr": self.enable_ocr
                }
            }
        except Exception as e:
            logger.error(f"Failed to get pipeline stats: {e}")
            return {"error": str(e)}
    def health_check(self) -> Dict[str, bool]:
        """
        Check health of all pipeline components.

        Returns:
            Dictionary with health status of each component
        """
        return {
            "vector_store": self.vector_store.health_check(),
            "metadata_manager": True,  # SQLite is always available if the file system works
            "embedding_system": True   # Checked during actual usage
        }
if __name__ == "__main__":
    logger.info("Initializing ingestion pipeline...")

    # Example usage
    import yaml
    with open("src/config.yaml", 'r') as f:
        config = yaml.safe_load(f)

    pipeline = DocumentIngestionPipeline(config)
    stats = pipeline.get_pipeline_stats()
    logger.info(f"Pipeline stats: {stats}")

    # Example single document ingestion
    result = pipeline.ingest_document("data/documents/3.수불확인등록.xlsx")
    logger.info(f"Ingestion result: {result}")

    # Example batch ingestion
    # batch_result = pipeline.ingest_batch(["sample_data/sample.pdf", "sample_data/sample.docx"])
    # logger.info(f"Batch ingestion stats: {batch_result}")