math-routing-agent / database /qdrant_manager.py
bhoomika19's picture
phase 1 - data storage in qdrant and retrieval
6874d8b
"""
Qdrant client wrapper for vector database operations.
"""
import logging
from typing import List, Dict, Any
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import time
# Module-level logger named after this module; QdrantManager methods log through it.
logger = logging.getLogger(__name__)
class QdrantManager:
    """Wrapper around ``QdrantClient`` for collection setup, upserts and search.

    Every method reports its outcome through the module-level ``logger``.
    """

    def __init__(self, url: str, api_key: str) -> None:
        """Create the underlying Qdrant client.

        Args:
            url: Passed to ``QdrantClient`` as ``url``.
            api_key: Passed to ``QdrantClient`` as ``api_key``.
        """
        self.client = QdrantClient(url=url, api_key=api_key)
        logger.info("Connected to Qdrant at %s", url)

    def create_collection(self, collection_name: str, vector_size: int,
                          distance: str = "Cosine") -> bool:
        """Create a collection unless one with the same name already exists.

        Args:
            collection_name: Name of the collection.
            vector_size: Dimension of the vectors stored in the collection.
            distance: Distance metric name, matched case-insensitively:
                ``Cosine``, ``Euclidean`` (alias ``Euclid``) or ``Dot``. Any
                other value falls back to cosine distance with a warning.

        Returns:
            True if the collection already existed or was created; False if
            any step raised (the error is logged, not propagated).
        """
        try:
            # Skip creation when the name is already present on the server.
            existing_names = {
                col.name for col in self.client.get_collections().collections
            }
            if collection_name in existing_names:
                logger.info("Collection '%s' already exists", collection_name)
                return True

            # Keys are lower-cased so "cosine"/"COSINE"/"Cosine" all resolve.
            distance_map = {
                "cosine": Distance.COSINE,
                "euclidean": Distance.EUCLID,
                "euclid": Distance.EUCLID,
                "dot": Distance.DOT,
            }
            metric = distance_map.get(str(distance).strip().lower())
            if metric is None:
                # Keep the historical cosine fallback, but make it visible
                # instead of silently building the wrong index.
                logger.warning(
                    "Unknown distance metric %r; falling back to Cosine",
                    distance,
                )
                metric = Distance.COSINE

            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=vector_size, distance=metric),
            )
            logger.info(
                "Created collection '%s' with vector size %s",
                collection_name,
                vector_size,
            )
            return True
        except Exception as e:
            logger.error("Error creating collection: %s", e)
            return False

    def upsert_points(self, collection_name: str, points_data: List[Dict[str, Any]],
                      embeddings: List[List[float]], max_retries: int = 3) -> bool:
        """Upsert points into a collection, retrying with exponential backoff.

        Args:
            collection_name: Name of the collection.
            points_data: One dict per point; each must provide the keys
                ``id``, ``problem``, ``solution`` and ``source``.
            embeddings: One vector per entry of ``points_data``, same order.
            max_retries: Maximum number of attempts. Values below 1 are
                treated as 1 so the upsert is always tried at least once.

        Returns:
            True once the upsert succeeds, or immediately when there is
            nothing to upsert.

        Raises:
            ValueError: If ``points_data`` and ``embeddings`` differ in length.
            KeyError: If a point dict lacks one of the required keys.
            Exception: The error from the final attempt, re-raised unchanged.
        """
        # zip() would silently drop the unmatched tail; fail loudly instead.
        if len(points_data) != len(embeddings):
            raise ValueError(
                f"points_data has {len(points_data)} items but embeddings has "
                f"{len(embeddings)}; they must pair up one-to-one"
            )
        if not points_data:
            # Nothing to send; skip the network round-trip entirely.
            return True

        points = [
            PointStruct(
                id=data['id'],
                vector=embedding,
                payload={
                    'problem': data['problem'],
                    'solution': data['solution'],
                    'source': data['source'],
                },
            )
            for data, embedding in zip(points_data, embeddings)
        ]

        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                self.client.upsert(
                    collection_name=collection_name,
                    points=points,
                )
                logger.info("Successfully upserted %d points", len(points))
                return True
            except Exception as e:
                logger.warning("Attempt %d failed: %s", attempt + 1, e)
                if attempt == attempts - 1:
                    logger.error(
                        "Failed to upsert points after %d attempts", attempts
                    )
                    # Bare raise keeps the original traceback intact.
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

    def search_similar(self, collection_name: str, query_vector: List[float],
                       limit: int = 3, score_threshold: float = 0.0) -> List[Any]:
        """Search the collection for vectors similar to ``query_vector``.

        Args:
            collection_name: Name of the collection.
            query_vector: Query embedding vector.
            limit: Maximum number of results to return.
            score_threshold: Forwarded to the client as ``score_threshold``.

        Returns:
            Whatever ``client.search`` returns, or an empty list if the call
            raised (the error is logged, not propagated).
        """
        # NOTE(review): newer qdrant-client releases appear to deprecate
        # `search` in favour of `query_points` -- confirm against the pinned
        # client version before upgrading.
        try:
            results = self.client.search(
                collection_name=collection_name,
                query_vector=query_vector,
                limit=limit,
                score_threshold=score_threshold,
            )
            logger.info("Found %d similar results", len(results))
            return results
        except Exception as e:
            logger.error("Error searching collection: %s", e)
            return []

    def get_collection_info(self, collection_name: str) -> Any:
        """Fetch collection metadata from Qdrant.

        Args:
            collection_name: Name of the collection.

        Returns:
            The object returned by ``client.get_collection``, or None if the
            call raised (the error is logged, not propagated).
        """
        try:
            info = self.client.get_collection(collection_name)
            logger.info("Collection info: %s", info)
            return info
        except Exception as e:
            logger.error("Error getting collection info: %s", e)
            return None