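"""SQLite-backed metadata manager for document tracking and citation management.

Stores per-document metadata and processing status alongside chunk-level
citation information, with indexed queries for common lookups.

Illustrative usage (a minimal sketch; 'metadata_db_path' is the config key this
module reads, while the path itself is an arbitrary example):

    manager = MetadataManager({"metadata_db_path": "./data/metadata.db"})
    stats = manager.get_statistics()
"""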
import logging
import sqlite3
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import json
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.document_processor import ProcessingStatus, DocumentType

try:
    from logger.custom_logger import CustomLoggerTracker
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("meta_manager")
except ImportError:
    # Fallback to standard logging if custom logger not available
    logger = logging.getLogger("meta_manager")


@dataclass
class DocumentMetadata:
    """Metadata for a processed document."""
    document_id: str
    filename: str
    file_path: str
    file_type: str
    upload_timestamp: datetime
    processing_status: ProcessingStatus
    total_chunks: int
    file_size: int
    checksum: str
    error_message: Optional[str] = None
    processing_time: Optional[float] = None
    metadata_json: Optional[str] = None  # Additional metadata as JSON


@dataclass
class CitationInfo:
    """Citation information for a document chunk."""
    chunk_id: str
    document_id: str
    source_document: str
    location_reference: str
    extraction_method: str
    confidence_level: float
    page_number: Optional[int] = None
    worksheet_name: Optional[str] = None
    cell_range: Optional[str] = None
    section_title: Optional[str] = None
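
# Illustrative construction of the dataclasses above (all values hypothetical):
#
#   citation = CitationInfo(
#       chunk_id="chunk-0001",
#       document_id="doc-0001",
#       source_document="report.pdf",
#       location_reference="page 3, paragraph 2",
#       extraction_method="pdf_text",
#       confidence_level=0.92,
#       page_number=3,
#   )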


class MetadataManager:
    """
    SQLite-based metadata manager for document tracking and citation management.

    This manager provides persistent storage for document metadata, processing
    status, and citation information with efficient querying capabilities.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the metadata manager.

        Args:
            config: Configuration dictionary containing database settings
        """
        self.config = config
        self.db_path = config.get('metadata_db_path', './data/metadata.db')
        # Ensure database directory exists
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        # Initialize database
        self._init_database()
        logger.info(f"Metadata manager initialized with database: {self.db_path}")

    def _init_database(self):
        """Initialize the SQLite database with required tables."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                # Create documents table
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS documents (
                        document_id TEXT PRIMARY KEY,
                        filename TEXT NOT NULL,
                        file_path TEXT NOT NULL,
                        file_type TEXT NOT NULL,
                        upload_timestamp TEXT NOT NULL,
                        processing_status TEXT NOT NULL,
                        total_chunks INTEGER DEFAULT 0,
                        file_size INTEGER DEFAULT 0,
                        checksum TEXT,
                        error_message TEXT,
                        processing_time REAL,
                        metadata_json TEXT,
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                        updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # Create citations table. chunk_id is UNIQUE so that the
                # INSERT OR REPLACE in store_citation_info() upserts one
                # citation per chunk instead of accumulating duplicates.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS citations (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        chunk_id TEXT NOT NULL UNIQUE,
                        document_id TEXT NOT NULL,
                        source_document TEXT NOT NULL,
                        location_reference TEXT NOT NULL,
                        extraction_method TEXT NOT NULL,
                        confidence_level REAL NOT NULL,
                        page_number INTEGER,
                        worksheet_name TEXT,
                        cell_range TEXT,
                        section_title TEXT,
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (document_id) REFERENCES documents (document_id)
                    )
                ''')
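                # Note: SQLite does not enforce FOREIGN KEY constraints unless
                # "PRAGMA foreign_keys = ON" is issued per connection, so
                # delete_document() removes citations explicitly rather than
                # relying on cascading deletes.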
                # Create indexes for efficient querying
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_documents_status ON documents (processing_status)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_documents_type ON documents (file_type)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_citations_document ON citations (document_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_citations_chunk ON citations (chunk_id)')
                conn.commit()
                logger.debug("Database tables initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise

    def store_document_metadata(self, doc_id: str, metadata: DocumentMetadata) -> bool:
        """
        Store document metadata in the database.

        Args:
            doc_id: Document ID
            metadata: DocumentMetadata object

        Returns:
            True if successful, False otherwise
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                # Convert datetime to ISO string
                upload_timestamp = metadata.upload_timestamp.isoformat()
                cursor.execute('''
                    INSERT OR REPLACE INTO documents (
                        document_id, filename, file_path, file_type, upload_timestamp,
                        processing_status, total_chunks, file_size, checksum,
                        error_message, processing_time, metadata_json, updated_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    doc_id,
                    metadata.filename,
                    metadata.file_path,
                    metadata.file_type,
                    upload_timestamp,
                    metadata.processing_status.value,
                    metadata.total_chunks,
                    metadata.file_size,
                    metadata.checksum,
                    metadata.error_message,
                    metadata.processing_time,
                    metadata.metadata_json,
                    datetime.now().isoformat()
                ))
                conn.commit()
                logger.debug(f"Stored metadata for document: {doc_id}")
                return True
        except Exception as e:
            logger.error(f"Failed to store document metadata: {e}")
            return False

    def get_document_metadata(self, doc_id: str) -> Optional[DocumentMetadata]:
        """
        Retrieve document metadata by ID.

        Args:
            doc_id: Document ID

        Returns:
            DocumentMetadata object or None if not found
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT document_id, filename, file_path, file_type, upload_timestamp,
                           processing_status, total_chunks, file_size, checksum,
                           error_message, processing_time, metadata_json
                    FROM documents WHERE document_id = ?
                ''', (doc_id,))
                row = cursor.fetchone()
                if row:
                    return DocumentMetadata(
                        document_id=row[0],
                        filename=row[1],
                        file_path=row[2],
                        file_type=row[3],
                        upload_timestamp=datetime.fromisoformat(row[4]),
                        processing_status=ProcessingStatus(row[5]),
                        total_chunks=row[6],
                        file_size=row[7],
                        checksum=row[8],
                        error_message=row[9],
                        processing_time=row[10],
                        metadata_json=row[11]
                    )
                return None
        except Exception as e:
            logger.error(f"Failed to get document metadata: {e}")
            return None
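
    # Round-trip sketch for the two methods above (values hypothetical):
    #
    #   meta = DocumentMetadata(document_id="doc-1", filename="report.pdf", ...)
    #   manager.store_document_metadata("doc-1", meta)
    #   restored = manager.get_document_metadata("doc-1")
    #   assert restored is not None and restored.filename == "report.pdf"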

    def update_document_status(self, doc_id: str, status: ProcessingStatus,
                               error_message: Optional[str] = None,
                               processing_time: Optional[float] = None) -> bool:
        """
        Update document processing status.

        Args:
            doc_id: Document ID
            status: New processing status
            error_message: Optional error message
            processing_time: Optional processing time

        Returns:
            True if successful, False otherwise
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    UPDATE documents
                    SET processing_status = ?, error_message = ?, processing_time = ?, updated_at = ?
                    WHERE document_id = ?
                ''', (
                    status.value,
                    error_message,
                    processing_time,
                    datetime.now().isoformat(),
                    doc_id
                ))
                conn.commit()
                logger.debug(f"Updated status for document {doc_id}: {status.value}")
                return True
        except Exception as e:
            logger.error(f"Failed to update document status: {e}")
            return False

    def store_citation_info(self, citation: CitationInfo) -> bool:
        """
        Store citation information.

        Args:
            citation: CitationInfo object

        Returns:
            True if successful, False otherwise
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT OR REPLACE INTO citations (
                        chunk_id, document_id, source_document, location_reference,
                        extraction_method, confidence_level, page_number,
                        worksheet_name, cell_range, section_title
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    citation.chunk_id,
                    citation.document_id,
                    citation.source_document,
                    citation.location_reference,
                    citation.extraction_method,
                    citation.confidence_level,
                    citation.page_number,
                    citation.worksheet_name,
                    citation.cell_range,
                    citation.section_title
                ))
                conn.commit()
                logger.debug(f"Stored citation for chunk: {citation.chunk_id}")
                return True
        except Exception as e:
            logger.error(f"Failed to store citation info: {e}")
            return False

    def get_citation_info(self, chunk_id: str) -> Optional[CitationInfo]:
        """
        Retrieve citation information by chunk ID.

        Args:
            chunk_id: Chunk ID

        Returns:
            CitationInfo object or None if not found
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT chunk_id, document_id, source_document, location_reference,
                           extraction_method, confidence_level, page_number,
                           worksheet_name, cell_range, section_title
                    FROM citations WHERE chunk_id = ?
                ''', (chunk_id,))
                row = cursor.fetchone()
                if row:
                    return CitationInfo(
                        chunk_id=row[0],
                        document_id=row[1],
                        source_document=row[2],
                        location_reference=row[3],
                        extraction_method=row[4],
                        confidence_level=row[5],
                        page_number=row[6],
                        worksheet_name=row[7],
                        cell_range=row[8],
                        section_title=row[9]
                    )
                return None
        except Exception as e:
            logger.error(f"Failed to get citation info: {e}")
            return None

    def list_documents(self, status: Optional[ProcessingStatus] = None,
                       file_type: Optional[str] = None,
                       limit: int = 100) -> List[DocumentMetadata]:
        """
        List documents, optionally filtered by status and/or file type.

        Args:
            status: Optional processing status to filter by
            file_type: Optional file type to filter by
            limit: Maximum number of documents to return

        Returns:
            List of DocumentMetadata objects, newest uploads first
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                query = '''
                    SELECT document_id, filename, file_path, file_type, upload_timestamp,
                           processing_status, total_chunks, file_size, checksum,
                           error_message, processing_time, metadata_json
                    FROM documents
                '''
                conditions = []
                params = []
                if status:
                    conditions.append('processing_status = ?')
                    params.append(status.value)
                if file_type:
                    conditions.append('file_type = ?')
                    params.append(file_type)
                if conditions:
                    query += ' WHERE ' + ' AND '.join(conditions)
                query += ' ORDER BY upload_timestamp DESC LIMIT ?'
                params.append(limit)
                cursor.execute(query, params)
                rows = cursor.fetchall()
                documents = []
                for row in rows:
                    documents.append(DocumentMetadata(
                        document_id=row[0],
                        filename=row[1],
                        file_path=row[2],
                        file_type=row[3],
                        upload_timestamp=datetime.fromisoformat(row[4]),
                        processing_status=ProcessingStatus(row[5]),
                        total_chunks=row[6],
                        file_size=row[7],
                        checksum=row[8],
                        error_message=row[9],
                        processing_time=row[10],
                        metadata_json=row[11]
                    ))
                return documents
        except Exception as e:
            logger.error(f"Failed to list documents: {e}")
            return []
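
    # Filtered-listing sketch (assumes ProcessingStatus has a FAILED member;
    # adjust to the actual enum defined in src.document_processor):
    #
    #   failed_pdfs = manager.list_documents(
    #       status=ProcessingStatus.FAILED, file_type="pdf", limit=10)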

    def delete_document(self, doc_id: str) -> bool:
        """
        Delete a document and all of its citations.

        Args:
            doc_id: Document ID

        Returns:
            True if successful, False otherwise
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                # Delete citations first (foreign key constraint)
                cursor.execute('DELETE FROM citations WHERE document_id = ?', (doc_id,))
                # Delete document
                cursor.execute('DELETE FROM documents WHERE document_id = ?', (doc_id,))
                conn.commit()
                logger.info(f"Deleted document and citations: {doc_id}")
                return True
        except Exception as e:
            logger.error(f"Failed to delete document: {e}")
            return False

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get database statistics.

        Returns:
            Dictionary with database statistics
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                # Count documents by status
                cursor.execute('''
                    SELECT processing_status, COUNT(*)
                    FROM documents
                    GROUP BY processing_status
                ''')
                status_counts = dict(cursor.fetchall())
                # Count documents by type
                cursor.execute('''
                    SELECT file_type, COUNT(*)
                    FROM documents
                    GROUP BY file_type
                ''')
                type_counts = dict(cursor.fetchall())
                # Total statistics
                cursor.execute('SELECT COUNT(*) FROM documents')
                total_documents = cursor.fetchone()[0]
                cursor.execute('SELECT COUNT(*) FROM citations')
                total_citations = cursor.fetchone()[0]
                cursor.execute('SELECT SUM(total_chunks) FROM documents')
                total_chunks = cursor.fetchone()[0] or 0
                cursor.execute('SELECT SUM(file_size) FROM documents')
                total_file_size = cursor.fetchone()[0] or 0
                return {
                    'total_documents': total_documents,
                    'total_citations': total_citations,
                    'total_chunks': total_chunks,
                    'total_file_size': total_file_size,
                    'documents_by_status': status_counts,
                    'documents_by_type': type_counts,
                    'database_path': self.db_path
                }
        except Exception as e:
            logger.error(f"Failed to get statistics: {e}")
            return {'error': str(e)}

    def cleanup_orphaned_citations(self) -> int:
        """
        Clean up citations that reference non-existent documents.

        Returns:
            Number of orphaned citations removed
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    DELETE FROM citations
                    WHERE document_id NOT IN (SELECT document_id FROM documents)
                ''')
                removed_count = cursor.rowcount
                conn.commit()
                logger.info(f"Cleaned up {removed_count} orphaned citations")
                return removed_count
        except Exception as e:
            logger.error(f"Failed to cleanup orphaned citations: {e}")
            return 0


if __name__ == "__main__":
    logger.info("Metadata manager module loaded; running a small demo ...")
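    # Illustrative smoke test: a minimal sketch, not part of the module API.
    # Assumptions: ProcessingStatus defines a COMPLETED member (substitute the
    # actual member names from src.document_processor if they differ), and the
    # file names, paths, and values below are hypothetical.
    import tempfile

    demo_db_path = os.path.join(tempfile.mkdtemp(), "metadata_demo.db")
    manager = MetadataManager({"metadata_db_path": demo_db_path})
    demo_metadata = DocumentMetadata(
        document_id="demo-doc-1",
        filename="report.pdf",
        file_path="/tmp/report.pdf",
        file_type="pdf",
        upload_timestamp=datetime.now(),
        processing_status=ProcessingStatus.COMPLETED,  # assumed member name
        total_chunks=3,
        file_size=1024,
        checksum="00000000",
    )
    if manager.store_document_metadata(demo_metadata.document_id, demo_metadata):
        logger.info(f"Demo statistics: {manager.get_statistics()}")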