# rag_korean_manufacturing_docs/src/ingestion_pipeline.py
import logging
import time
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib
import os
import sys
import requests
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.document_processor import (
DocumentProcessor, DocumentProcessorFactory, ProcessedDocument,
DocumentChunk, ProcessingStatus, DocumentType
)
from src.embedding_system import EmbeddingSystem
from src.vector_store import QdrantVectorStore
from src.metadata_manager import MetadataManager, DocumentMetadata
from src.image_processor import ImageProcessor
try:
from logger.custom_logger import CustomLoggerTracker
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("ingestion_pipeline")
except ImportError:
# Fallback to standard logging if custom logger not available
logger = logging.getLogger("ingestion_pipeline")
@dataclass
class IngestionResult:
"""Result of document ingestion."""
document_id: str
filename: str
success: bool
processing_time: float
chunks_created: int
chunks_indexed: int
error_message: Optional[str] = None
    warnings: Optional[List[str]] = None
def __post_init__(self):
if self.warnings is None:
self.warnings = []
@dataclass
class IngestionStats:
"""Statistics for batch ingestion."""
total_documents: int
successful_documents: int
failed_documents: int
total_chunks: int
total_processing_time: float
average_processing_time: float
documents_by_type: Dict[str, int]
errors: List[str]
def jina_embeddings(text: str) -> List[float]:
    """Embed a single passage with the Jina embeddings API (jina-embeddings-v3)."""
    # Read the API key from the environment rather than hardcoding credentials in source.
    api_key = os.getenv("JINA_API_KEY", "")
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {api_key}'}
    data = {
        "model": "jina-embeddings-v3",
        "task": "retrieval.passage",
        "input": text}
    response = requests.post('https://api.jina.ai/v1/embeddings', headers=headers, json=data, timeout=60)
    response.raise_for_status()
    return response.json()['data'][0]['embedding']
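# Hedged sketch (not called anywhere in this module): ingest_document embeds chunks by
# calling jina_embeddings once per chunk, i.e. one HTTP request per chunk. The same
# endpoint also accepts a list of inputs, so a batched helper along these lines could
# reduce request overhead. `jina_embeddings_batch` and its `batch_size` parameter are
# illustrative additions, and the response items are assumed to come back in input order.
def jina_embeddings_batch(texts: List[str], batch_size: int = 32) -> List[List[float]]:
    """Embed a list of passages in batched requests against the Jina embeddings API."""
    api_key = os.getenv("JINA_API_KEY", "")
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {api_key}'}
    embeddings: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        data = {
            "model": "jina-embeddings-v3",
            "task": "retrieval.passage",
            "input": texts[start:start + batch_size]}
        response = requests.post('https://api.jina.ai/v1/embeddings', headers=headers, json=data, timeout=60)
        response.raise_for_status()
        # One embedding per input, collected in request order.
        embeddings.extend(item['embedding'] for item in response.json()['data'])
    return embeddings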
class DocumentIngestionPipeline:
def __init__(self, config: Dict[str, Any]):
self.config = config
# Initialize components
self.embedding_system = EmbeddingSystem(config)
self.vector_store = QdrantVectorStore(config)
self.metadata_manager = MetadataManager(config)
# Initialize components with correct vector dimensions
self.vector_size = config.get('vector_size', 1024) # Match Jina's dimension
self.config['vector_size'] = self.vector_size # Update config for other components
# Initialize image processor for OCR
self.image_processor = ImageProcessor(config)
# Pipeline settings
self.chunk_size = config.get('chunk_size', 512)
self.chunk_overlap = config.get('chunk_overlap', 50)
self.batch_size = config.get('embedding_batch_size', 32)
self.max_workers = config.get('max_workers', 4)
self.enable_ocr = config.get('image_processing', True)
logger.info(f"Document ingestion pipeline initialized")
def ingest_document(self, file_path: str, document_id: Optional[str] = None) -> IngestionResult:
"""
Ingest a single document through the complete pipeline.
Args:
file_path: Path to the document file
document_id: Optional custom document ID
Returns:
IngestionResult with processing details
"""
start_time = time.time()
file_path_obj = Path(file_path)
filename = file_path_obj.name
try:
logger.info(f"Starting ingestion of document: {filename}")
# Generate document ID if not provided
if not document_id:
document_id = self._generate_document_id(file_path)
# Check if document already exists
existing_metadata = self.metadata_manager.get_document_metadata(document_id)
if existing_metadata and existing_metadata.processing_status == ProcessingStatus.COMPLETED:
logger.info(f"Document {filename} already processed, skipping")
return IngestionResult(
document_id=document_id,
filename=filename,
success=True,
processing_time=0.0,
chunks_created=existing_metadata.total_chunks,
chunks_indexed=existing_metadata.total_chunks,
warnings=["Document already processed"]
)
# Step 1: Process document
processed_doc = self._process_document(file_path)
if processed_doc.processing_status == ProcessingStatus.FAILED:
return IngestionResult(
document_id=document_id,
filename=filename,
success=False,
processing_time=time.time() - start_time,
chunks_created=0,
chunks_indexed=0,
error_message=processed_doc.error_message
)
# Step 2: Process images with OCR if enabled
if self.enable_ocr and processed_doc.images:
processed_doc.images = self.image_processor.batch_process_images(processed_doc.images)
# Step 3: Create document chunks
processor = DocumentProcessorFactory.create_processor(file_path, self.config)
chunks = processor.extract_chunks(processed_doc, self.chunk_size, self.chunk_overlap)
if not chunks:
logger.warning(f"No chunks created for document: {filename}")
return IngestionResult(
document_id=document_id,
filename=filename,
success=False,
processing_time=time.time() - start_time,
chunks_created=0,
chunks_indexed=0,
error_message="No content chunks could be created"
)
# Step 4: Generate embeddings
chunk_texts = [chunk.content for chunk in chunks]
            logger.info(f"Sample chunk texts: {chunk_texts[:2]}")
            # Generate embeddings via the Jina API (one request per chunk);
            # the local embedding system remains available as an alternative:
            # embeddings = self.embedding_system.generate_embeddings(chunk_texts)
            embeddings = [jina_embeddings(text) for text in chunk_texts]
if not embeddings or len(embeddings) != len(chunks):
logger.error(f"Embedding generation failed for document: {filename}")
return IngestionResult(
document_id=document_id,
filename=filename,
success=False,
processing_time=time.time() - start_time,
chunks_created=len(chunks),
chunks_indexed=0,
error_message="Failed to generate embeddings"
)
# Attach embeddings to chunks
for chunk, embedding in zip(chunks, embeddings):
chunk.embedding = embedding
# Step 5: Store in vector database
vector_success = self.vector_store.add_documents(chunks)
if not vector_success:
logger.error(f"Failed to store vectors for document: {filename}")
return IngestionResult(
document_id=document_id,
filename=filename,
success=False,
processing_time=time.time() - start_time,
chunks_created=len(chunks),
chunks_indexed=0,
error_message="Failed to store document vectors"
)
# Step 6: Store metadata
processing_time = time.time() - start_time
metadata = DocumentMetadata(
document_id=document_id,
filename=filename,
file_path=file_path,
file_type=processed_doc.document_type.value,
upload_timestamp=processed_doc.processing_timestamp,
processing_status=ProcessingStatus.COMPLETED,
total_chunks=len(chunks),
file_size=processed_doc.file_size,
checksum=processed_doc.checksum,
processing_time=processing_time,
metadata_json=self._serialize_metadata(processed_doc.metadata)
)
metadata_success = self.metadata_manager.store_document_metadata(document_id, metadata)
if not metadata_success:
logger.warning(f"Failed to store metadata for document: {filename}")
logger.info(f"Successfully ingested document {filename}: {len(chunks)} chunks in {processing_time:.2f}s")
return IngestionResult(
document_id=document_id,
filename=filename,
success=True,
processing_time=processing_time,
chunks_created=len(chunks),
chunks_indexed=len(chunks)
)
except Exception as e:
error_msg = f"Ingestion failed for {filename}: {str(e)}"
logger.error(error_msg)
# Update metadata with error status
if document_id:
self.metadata_manager.update_document_status(
document_id,
ProcessingStatus.FAILED,
error_msg,
time.time() - start_time
)
return IngestionResult(
document_id=document_id or "unknown",
filename=filename,
success=False,
processing_time=time.time() - start_time,
chunks_created=0,
chunks_indexed=0,
error_message=error_msg
)
def ingest_batch(self, file_paths: List[str], max_workers: Optional[int] = None) -> IngestionStats:
"""
Ingest multiple documents in parallel.
Args:
file_paths: List of file paths to process
max_workers: Maximum number of worker threads
Returns:
IngestionStats with batch processing results
"""
start_time = time.time()
max_workers = max_workers or self.max_workers
logger.info(f"Starting batch ingestion of {len(file_paths)} documents with {max_workers} workers")
results = []
errors = []
documents_by_type = {}
# Process documents in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_path = {
executor.submit(self.ingest_document, file_path): file_path
for file_path in file_paths
}
# Collect results
for future in as_completed(future_to_path):
file_path = future_to_path[future]
try:
result = future.result()
results.append(result)
# Track document types
file_ext = Path(file_path).suffix.lower()
documents_by_type[file_ext] = documents_by_type.get(file_ext, 0) + 1
if not result.success:
errors.append(f"{result.filename}: {result.error_message}")
except Exception as e:
error_msg = f"Failed to process {file_path}: {str(e)}"
errors.append(error_msg)
logger.error(error_msg)
# Calculate statistics
successful_results = [r for r in results if r.success]
failed_results = [r for r in results if not r.success]
total_processing_time = time.time() - start_time
total_chunks = sum(r.chunks_indexed for r in successful_results)
avg_processing_time = (
sum(r.processing_time for r in results) / len(results)
if results else 0.0
)
stats = IngestionStats(
total_documents=len(file_paths),
successful_documents=len(successful_results),
failed_documents=len(failed_results),
total_chunks=total_chunks,
total_processing_time=total_processing_time,
average_processing_time=avg_processing_time,
documents_by_type=documents_by_type,
errors=errors
)
logger.info(f"Batch ingestion completed: {stats.successful_documents}/{stats.total_documents} "
f"documents processed successfully in {total_processing_time:.2f}s")
return stats
def reprocess_document(self, document_id: str) -> IngestionResult:
"""
Reprocess an existing document.
Args:
document_id: ID of the document to reprocess
Returns:
IngestionResult with reprocessing details
"""
# Get existing metadata
metadata = self.metadata_manager.get_document_metadata(document_id)
if not metadata:
return IngestionResult(
document_id=document_id,
filename="unknown",
success=False,
processing_time=0.0,
chunks_created=0,
chunks_indexed=0,
error_message="Document not found in metadata"
)
# Delete existing vectors
self.vector_store.delete_document(document_id)
# Reprocess the document
return self.ingest_document(metadata.file_path, document_id)
def delete_document(self, document_id: str) -> bool:
"""
Delete a document and all associated data.
Args:
document_id: ID of the document to delete
Returns:
True if successful, False otherwise
"""
try:
# Delete from vector store
vector_success = self.vector_store.delete_document(document_id)
# Delete from metadata
metadata_success = self.metadata_manager.delete_document(document_id)
success = vector_success and metadata_success
if success:
logger.info(f"Successfully deleted document: {document_id}")
else:
logger.warning(f"Partial deletion of document: {document_id}")
return success
except Exception as e:
logger.error(f"Failed to delete document {document_id}: {e}")
return False
def _process_document(self, file_path: str) -> ProcessedDocument:
try:
processor = DocumentProcessorFactory.create_processor(file_path, self.config)
return processor.process_document(file_path)
except Exception as e:
logger.error(f"Document processing failed for {file_path}: {e}")
# Return failed document
document_id = self._generate_document_id(file_path)
return ProcessedDocument(
document_id=document_id,
filename=Path(file_path).name,
file_path=file_path,
document_type=DocumentType.UNKNOWN,
content="",
metadata={},
processing_status=ProcessingStatus.FAILED,
error_message=str(e)
)
def _generate_document_id(self, file_path: str) -> str:
# Use file path and modification time for uniqueness
file_path_obj = Path(file_path)
if file_path_obj.exists():
mtime = file_path_obj.stat().st_mtime
content = f"{file_path}_{mtime}"
else:
content = f"{file_path}_{time.time()}"
return hashlib.md5(content.encode()).hexdigest()
def _serialize_metadata(self, metadata: Dict[str, Any]) -> str:
try:
import json
return json.dumps(metadata, default=str, ensure_ascii=False)
except Exception as e:
logger.warning(f"Failed to serialize metadata: {e}")
return "{}"
def get_pipeline_stats(self) -> Dict[str, Any]:
"""
Get statistics about the ingestion pipeline.
Returns:
Dictionary with pipeline statistics
"""
try:
# Get component statistics
vector_stats = self.vector_store.get_collection_info()
metadata_stats = self.metadata_manager.get_statistics()
embedding_stats = self.embedding_system.get_cache_stats()
return {
"vector_store": vector_stats.__dict__ if vector_stats else {},
"metadata_manager": metadata_stats,
"embedding_system": embedding_stats,
"pipeline_config": {
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap,
"batch_size": self.batch_size,
"max_workers": self.max_workers,
"enable_ocr": self.enable_ocr
}
}
except Exception as e:
logger.error(f"Failed to get pipeline stats: {e}")
return {"error": str(e)}
def health_check(self) -> Dict[str, bool]:
"""
Check health of all pipeline components.
Returns:
Dictionary with health status of each component
"""
return {
"vector_store": self.vector_store.health_check(),
"metadata_manager": True, # SQLite is always available if file system works
"embedding_system": True # Will be checked during actual usage
}
if __name__ == "__main__":
    logger.info("Initializing ingestion pipeline ...")
## Example usage
import yaml
with open("src/config.yaml", 'r') as f:
config = yaml.safe_load(f)
pipeline = DocumentIngestionPipeline(config)
stats = pipeline.get_pipeline_stats()
logger.info(f"Pipeline stats: {stats}")
# Example single document ingestion
result = pipeline.ingest_document("data/documents/3.수불확인등록.xlsx")
logger.info(f"Ingestion result: {result}")
# Example batch ingestion
# batch_result = pipeline.ingest_batch(["sample_data/sample.pdf", "sample_data/sample.docx"])
# logger.info(f"Batch ingestion stats: {batch_result}")