rag_korean_manufacturing_docs / src /metadata_manager.py
A7m0d's picture
Upload folder using huggingface_hub
7dfe46c verified
import logging
import sqlite3
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import json
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.document_processor import ProcessingStatus, DocumentType
# Module-level logger setup.
# The project's CustomLoggerTracker is used when it can be imported; when the
# import fails, a standard-library logger with the same name is used instead,
# so the rest of this module can call `logger` either way.
try:
    from logger.custom_logger import CustomLoggerTracker

    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("meta_manager")
except ImportError:
    # Custom logger package not available: fall back to standard logging.
    logger = logging.getLogger("meta_manager")
@dataclass
class DocumentMetadata:
    """
    Record describing one processed document.

    Instances are what MetadataManager writes to and reads back from the
    ``documents`` table. The first nine fields are required; the remaining
    three default to ``None``.
    """

    # --- required fields -------------------------------------------------
    document_id: str                      # key of the row in the documents table
    filename: str
    file_path: str
    file_type: str
    upload_timestamp: datetime            # persisted as an ISO-8601 string
    processing_status: ProcessingStatus   # persisted through its ``.value``
    total_chunks: int
    file_size: int
    checksum: str

    # --- optional fields -------------------------------------------------
    error_message: Optional[str] = None
    processing_time: Optional[float] = None
    metadata_json: Optional[str] = None   # additional metadata as JSON
@dataclass
class CitationInfo:
    """
    Citation record for a single document chunk.

    The first six fields are required. The remaining location fields are
    optional and default to ``None``.
    """

    # --- required fields -------------------------------------------------
    chunk_id: str
    document_id: str
    source_document: str
    location_reference: str
    extraction_method: str
    confidence_level: float

    # --- optional location details ---------------------------------------
    page_number: Optional[int] = None
    worksheet_name: Optional[str] = None
    cell_range: Optional[str] = None
    section_title: Optional[str] = None
class MetadataManager:
    """
    SQLite-based metadata manager for document tracking and citation management.

    This manager provides persistent storage for document metadata, processing
    status, and citation information with efficient querying capabilities.

    Every public method opens its own short-lived connection through
    ``_connect()`` and closes it before returning. Apart from
    ``_init_database`` (which re-raises), methods log failures and report them
    through their return value instead of raising.
    """

    # Column list shared by every query that builds a DocumentMetadata.
    # The order must match the indexes used in _row_to_document().
    _DOCUMENT_COLUMNS = (
        "document_id, filename, file_path, file_type, upload_timestamp, "
        "processing_status, total_chunks, file_size, checksum, "
        "error_message, processing_time, metadata_json"
    )

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the metadata manager.

        Args:
            config: Configuration dictionary containing database settings.
                The key ``metadata_db_path`` selects the database file and
                defaults to ``./data/metadata.db``.

        Raises:
            Exception: Whatever ``_init_database`` raises if the schema
                cannot be created.
        """
        self.config = config
        self.db_path = config.get('metadata_db_path', './data/metadata.db')

        # Ensure database directory exists
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        # Initialize database
        self._init_database()
        logger.info(f"Metadata manager initialized with database: {self.db_path}")

    def _connect(self):
        """
        Return a context manager that yields a connection and closes it on exit.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        commits or rolls back; it does not close the connection. Wrapping it
        in ``closing`` releases the handle deterministically. Work that has
        not been committed when the block exits is discarded, so writers must
        call ``conn.commit()`` explicitly.
        """
        # Function-scope import keeps this class independent of the file's
        # top-level import block.
        from contextlib import closing

        return closing(sqlite3.connect(self.db_path))

    @staticmethod
    def _row_to_document(row) -> DocumentMetadata:
        """Build a DocumentMetadata from a row selected with _DOCUMENT_COLUMNS."""
        return DocumentMetadata(
            document_id=row[0],
            filename=row[1],
            file_path=row[2],
            file_type=row[3],
            upload_timestamp=datetime.fromisoformat(row[4]),
            processing_status=ProcessingStatus(row[5]),
            total_chunks=row[6],
            file_size=row[7],
            checksum=row[8],
            error_message=row[9],
            processing_time=row[10],
            metadata_json=row[11],
        )

    def _init_database(self):
        """
        Initialize the SQLite database with required tables.

        Creates the ``documents`` and ``citations`` tables and their indexes
        if they do not already exist.

        Raises:
            Exception: Any error from SQLite is logged and re-raised.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()

                # Create documents table
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS documents (
                        document_id TEXT PRIMARY KEY,
                        filename TEXT NOT NULL,
                        file_path TEXT NOT NULL,
                        file_type TEXT NOT NULL,
                        upload_timestamp TEXT NOT NULL,
                        processing_status TEXT NOT NULL,
                        total_chunks INTEGER DEFAULT 0,
                        file_size INTEGER DEFAULT 0,
                        checksum TEXT,
                        error_message TEXT,
                        processing_time REAL,
                        metadata_json TEXT,
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                        updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                    )
                ''')

                # Create citations table
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS citations (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        chunk_id TEXT NOT NULL,
                        document_id TEXT NOT NULL,
                        source_document TEXT NOT NULL,
                        location_reference TEXT NOT NULL,
                        extraction_method TEXT NOT NULL,
                        confidence_level REAL NOT NULL,
                        page_number INTEGER,
                        worksheet_name TEXT,
                        cell_range TEXT,
                        section_title TEXT,
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (document_id) REFERENCES documents (document_id)
                    )
                ''')

                # Create indexes for efficient querying
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_documents_status ON documents (processing_status)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_documents_type ON documents (file_type)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_citations_document ON citations (document_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_citations_chunk ON citations (chunk_id)')

                conn.commit()
            logger.debug("Database tables initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise

    def store_document_metadata(self, doc_id: str, metadata: DocumentMetadata) -> bool:
        """
        Store document metadata in the database.

        An existing row with the same ``doc_id`` is replaced. The row is keyed
        by ``doc_id``, not by ``metadata.document_id``.

        Args:
            doc_id: Document ID
            metadata: DocumentMetadata object

        Returns:
            True if successful, False otherwise
        """
        try:
            # Convert datetime to ISO string
            upload_timestamp = metadata.upload_timestamp.isoformat()
            values = (
                doc_id,
                metadata.filename,
                metadata.file_path,
                metadata.file_type,
                upload_timestamp,
                metadata.processing_status.value,
                metadata.total_chunks,
                metadata.file_size,
                metadata.checksum,
                metadata.error_message,
                metadata.processing_time,
                metadata.metadata_json,
                datetime.now().isoformat(),
            )
            with self._connect() as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO documents (
                        document_id, filename, file_path, file_type, upload_timestamp,
                        processing_status, total_chunks, file_size, checksum,
                        error_message, processing_time, metadata_json, updated_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', values)
                conn.commit()
            logger.debug(f"Stored metadata for document: {doc_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to store document metadata: {e}")
            return False

    def get_document_metadata(self, doc_id: str) -> Optional[DocumentMetadata]:
        """
        Retrieve document metadata by ID.

        Args:
            doc_id: Document ID

        Returns:
            DocumentMetadata object, or None if the document is not found or
            the lookup fails (the failure is logged).
        """
        try:
            with self._connect() as conn:
                row = conn.execute(
                    f"SELECT {self._DOCUMENT_COLUMNS} FROM documents WHERE document_id = ?",
                    (doc_id,),
                ).fetchone()
            if row:
                return self._row_to_document(row)
            return None
        except Exception as e:
            logger.error(f"Failed to get document metadata: {e}")
            return None

    def update_document_status(self, doc_id: str, status: ProcessingStatus,
                               error_message: Optional[str] = None,
                               processing_time: Optional[float] = None) -> bool:
        """
        Update document processing status.

        ``error_message`` and ``processing_time`` are always written, so
        leaving them as None overwrites any stored value with NULL.

        Args:
            doc_id: Document ID
            status: New processing status
            error_message: Optional error message
            processing_time: Optional processing time

        Returns:
            True if a document row was updated; False if no document with
            ``doc_id`` exists or the update failed.
        """
        try:
            with self._connect() as conn:
                cursor = conn.execute('''
                    UPDATE documents
                    SET processing_status = ?, error_message = ?, processing_time = ?, updated_at = ?
                    WHERE document_id = ?
                ''', (
                    status.value,
                    error_message,
                    processing_time,
                    datetime.now().isoformat(),
                    doc_id,
                ))
                updated_rows = cursor.rowcount
                conn.commit()

            # An UPDATE that matches nothing is not an error for SQLite, but
            # it means the status was not recorded anywhere.
            if updated_rows == 0:
                logger.warning(f"No document found to update status for: {doc_id}")
                return False

            logger.debug(f"Updated status for document {doc_id}: {status.value}")
            return True
        except Exception as e:
            logger.error(f"Failed to update document status: {e}")
            return False

    def store_citation_info(self, citation: CitationInfo) -> bool:
        """
        Store citation information, replacing any citation for the same chunk.

        Args:
            citation: CitationInfo object

        Returns:
            True if successful, False otherwise
        """
        try:
            with self._connect() as conn:
                # The citations table has no UNIQUE constraint on chunk_id, so
                # "INSERT OR REPLACE" never replaces anything. Remove earlier
                # rows explicitly; both statements run in one transaction and
                # are committed together.
                conn.execute('DELETE FROM citations WHERE chunk_id = ?', (citation.chunk_id,))
                conn.execute('''
                    INSERT INTO citations (
                        chunk_id, document_id, source_document, location_reference,
                        extraction_method, confidence_level, page_number,
                        worksheet_name, cell_range, section_title
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    citation.chunk_id,
                    citation.document_id,
                    citation.source_document,
                    citation.location_reference,
                    citation.extraction_method,
                    citation.confidence_level,
                    citation.page_number,
                    citation.worksheet_name,
                    citation.cell_range,
                    citation.section_title,
                ))
                conn.commit()
            logger.debug(f"Stored citation for chunk: {citation.chunk_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to store citation info: {e}")
            return False

    def get_citation_info(self, chunk_id: str) -> Optional[CitationInfo]:
        """
        Retrieve citation information by chunk ID.

        If several rows exist for the chunk, the most recently inserted one
        (highest ``id``) is returned.

        Args:
            chunk_id: Chunk ID

        Returns:
            CitationInfo object, or None if not found or the lookup fails
            (the failure is logged).
        """
        try:
            with self._connect() as conn:
                row = conn.execute('''
                    SELECT chunk_id, document_id, source_document, location_reference,
                           extraction_method, confidence_level, page_number,
                           worksheet_name, cell_range, section_title
                    FROM citations WHERE chunk_id = ?
                    ORDER BY id DESC LIMIT 1
                ''', (chunk_id,)).fetchone()
            if row:
                return CitationInfo(
                    chunk_id=row[0],
                    document_id=row[1],
                    source_document=row[2],
                    location_reference=row[3],
                    extraction_method=row[4],
                    confidence_level=row[5],
                    page_number=row[6],
                    worksheet_name=row[7],
                    cell_range=row[8],
                    section_title=row[9],
                )
            return None
        except Exception as e:
            logger.error(f"Failed to get citation info: {e}")
            return None

    def list_documents(self, status: Optional[ProcessingStatus] = None,
                       file_type: Optional[str] = None,
                       limit: int = 100) -> List[DocumentMetadata]:
        """
        List stored documents, newest upload first.

        Args:
            status: If given, only documents with this processing status
            file_type: If given (and non-empty), only documents of this type
            limit: Maximum number of documents to return

        Returns:
            List of DocumentMetadata objects; an empty list if nothing matches
            or the query fails (the failure is logged).
        """
        try:
            query = f"SELECT {self._DOCUMENT_COLUMNS} FROM documents"
            conditions = []
            params: List[Any] = []

            # Compare against None so that a falsy enum member still filters.
            if status is not None:
                conditions.append('processing_status = ?')
                params.append(status.value)
            if file_type:
                conditions.append('file_type = ?')
                params.append(file_type)

            if conditions:
                query += ' WHERE ' + ' AND '.join(conditions)
            query += ' ORDER BY upload_timestamp DESC LIMIT ?'
            params.append(limit)

            with self._connect() as conn:
                rows = conn.execute(query, params).fetchall()
            return [self._row_to_document(row) for row in rows]
        except Exception as e:
            logger.error(f"Failed to list documents: {e}")
            return []

    def delete_document(self, doc_id: str) -> bool:
        """
        Delete a document together with all of its citations.

        Deleting an ID that does not exist is not treated as an error.

        Args:
            doc_id: Document ID

        Returns:
            True if the statements ran successfully, False otherwise
        """
        try:
            with self._connect() as conn:
                # Delete citations first (foreign key constraint)
                conn.execute('DELETE FROM citations WHERE document_id = ?', (doc_id,))
                # Delete document
                conn.execute('DELETE FROM documents WHERE document_id = ?', (doc_id,))
                conn.commit()
            logger.info(f"Deleted document and citations: {doc_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete document: {e}")
            return False

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get database statistics.

        Returns:
            Dictionary with the keys ``total_documents``, ``total_citations``,
            ``total_chunks``, ``total_file_size``, ``documents_by_status``,
            ``documents_by_type`` and ``database_path``. On failure the
            dictionary contains only ``error`` with the error text.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()

                # Count documents by status
                cursor.execute('''
                    SELECT processing_status, COUNT(*)
                    FROM documents
                    GROUP BY processing_status
                ''')
                status_counts = dict(cursor.fetchall())

                # Count documents by type
                cursor.execute('''
                    SELECT file_type, COUNT(*)
                    FROM documents
                    GROUP BY file_type
                ''')
                type_counts = dict(cursor.fetchall())

                # Total statistics
                cursor.execute('SELECT COUNT(*) FROM documents')
                total_documents = cursor.fetchone()[0]

                cursor.execute('SELECT COUNT(*) FROM citations')
                total_citations = cursor.fetchone()[0]

                # SUM() yields NULL on an empty table; report 0 instead.
                cursor.execute('SELECT SUM(total_chunks) FROM documents')
                total_chunks = cursor.fetchone()[0] or 0

                cursor.execute('SELECT SUM(file_size) FROM documents')
                total_file_size = cursor.fetchone()[0] or 0

            return {
                'total_documents': total_documents,
                'total_citations': total_citations,
                'total_chunks': total_chunks,
                'total_file_size': total_file_size,
                'documents_by_status': status_counts,
                'documents_by_type': type_counts,
                'database_path': self.db_path
            }
        except Exception as e:
            logger.error(f"Failed to get statistics: {e}")
            return {'error': str(e)}

    def cleanup_orphaned_citations(self) -> int:
        """
        Clean up citations that reference non-existent documents.

        Returns:
            Number of orphaned citations removed (0 if the cleanup fails)
        """
        try:
            with self._connect() as conn:
                cursor = conn.execute('''
                    DELETE FROM citations
                    WHERE document_id NOT IN (SELECT document_id FROM documents)
                ''')
                removed_count = cursor.rowcount
                conn.commit()
            logger.info(f"Cleaned up {removed_count} orphaned citations")
            return removed_count
        except Exception as e:
            logger.error(f"Failed to cleanup orphaned citations: {e}")
            return 0
# Script entry point: running the module directly only emits a log line.
if __name__ == "__main__":
    logger.info("metadata init ..")