# cx_ai_agent_v1/services/web_search.py
"""
Web Search Service using Serper API (serper.dev)
Provides reliable, low-cost Google Search API functionality for the CX AI Agent
With built-in rate limiting and retry logic
"""

import asyncio
import json
import logging
import os
import time
from functools import wraps
from typing import Dict, List, Optional
from urllib.parse import urlparse

import requests

logger = logging.getLogger(__name__)


def async_wrapper(func):
    """Wrapper to run sync functions in an async context."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    return wrapper
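

# Example usage of the decorator (illustrative; `fetch_page` is a hypothetical
# helper, not part of this module):
#
#     @async_wrapper
#     def fetch_page(url: str) -> str:
#         return requests.get(url, timeout=10).text
#
#     html = await fetch_page("https://example.com")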


class WebSearchService:
    """
    Web search service using the Serper API (serper.dev).

    A low-cost Google Search API with 2,500 free searches/month.
    Requires the SERPER_API_KEY environment variable.
    Includes rate limiting protection and retry logic.
    """

    def __init__(self, max_results: int = 10, rate_limit_delay: float = 0.5):
        """
        Initialize the web search service.

        Args:
            max_results: Maximum number of results to return per query
            rate_limit_delay: Delay between requests in seconds (default: 0.5)
        """
        self.max_results = max_results
        self.rate_limit_delay = rate_limit_delay
        self.last_request_time = 0.0
        self._request_lock = asyncio.Lock()
        self.api_url = "https://google.serper.dev/search"

        # Get the API key from the environment
        self.api_key = os.getenv('SERPER_API_KEY')
        if not self.api_key:
            logger.warning(
                "SERPER_API_KEY not found in environment. "
                "Web search will fail. Please set the SERPER_API_KEY environment variable. "
                "Get your free API key at https://serper.dev/"
            )

    async def _rate_limit(self):
        """Enforce rate limiting between requests."""
        async with self._request_lock:
            current_time = time.time()
            time_since_last_request = current_time - self.last_request_time
            if time_since_last_request < self.rate_limit_delay:
                sleep_time = self.rate_limit_delay - time_since_last_request
                logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
                await asyncio.sleep(sleep_time)
            self.last_request_time = time.time()

    def _make_request(self, query: str, num_results: int, search_type: str = "search") -> dict:
        """
        Make a synchronous request to the Serper API.

        Args:
            query: Search query string
            num_results: Number of results to return
            search_type: Type of search ("search" or "news")

        Returns:
            API response as a dictionary
        """
        if not self.api_key:
            raise ValueError("SERPER_API_KEY is not set")

        # Determine the endpoint
        if search_type == "news":
            url = "https://google.serper.dev/news"
        else:
            url = self.api_url

        # Prepare the request
        headers = {
            'X-API-KEY': self.api_key,
            'Content-Type': 'application/json'
        }
        payload = {
            'q': query,
            'num': num_results
        }

        # Make the request
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=10
        )
        response.raise_for_status()
        return response.json()
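
    # Illustrative response shape, trimmed to the fields the parsers below
    # consume (real Serper payloads carry additional fields):
    #
    #     {
    #         "organic": [{"title": "...", "snippet": "...", "link": "..."}],
    #         "answerBox": {"title": "...", "answer": "...", "link": "..."},
    #         "knowledgeGraph": {"title": "...", "description": "...", "website": "..."}
    #     }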

    async def search(
        self,
        query: str,
        max_results: Optional[int] = None,
        region: str = 'wt-wt',  # kept for compatibility
        safesearch: str = 'moderate',  # kept for compatibility
        max_retries: int = 3
    ) -> List[Dict[str, str]]:
        """
        Perform a web search with rate limiting and retry logic.

        Args:
            query: Search query string
            max_results: Override the default max results
            region: Region code (kept for compatibility, not used with Serper)
            safesearch: Safe search setting (kept for compatibility)
            max_retries: Maximum number of retry attempts

        Returns:
            List of search results with title, body, and url
        """
        if not query or not query.strip():
            logger.warning("Empty search query provided")
            return []

        if not self.api_key:
            logger.error("SERPER_API_KEY not set. Cannot perform search.")
            return []

        num_results = max_results or self.max_results

        for attempt in range(max_retries):
            try:
                # Rate limiting
                await self._rate_limit()

                logger.info(f"Searching via Serper API for: '{query}' (attempt {attempt + 1}/{max_retries})")

                # Run the blocking request in an executor
                loop = asyncio.get_running_loop()
                response = await loop.run_in_executor(
                    None,
                    lambda: self._make_request(query, num_results, "search")
                )

                # Parse results
                formatted_results = self._parse_search_results(response)
                logger.info(f"Found {len(formatted_results)} results for query: '{query}'")
                return formatted_results

            except requests.exceptions.HTTPError as e:
                error_msg = str(e)
                logger.warning(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}")

                # Check whether we were rate limited or the quota was exceeded
                status_code = e.response.status_code if e.response is not None else None
                if status_code == 429 or "quota" in error_msg.lower():
                    if attempt < max_retries - 1:
                        # Exponential backoff: 5s, 10s, 20s, ...
                        backoff_time = 5 * (2 ** attempt)
                        logger.info(f"Rate limited or quota exceeded, backing off for {backoff_time}s...")
                        await asyncio.sleep(backoff_time)
                        continue

                # If this was the last attempt, log and return empty
                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for query '{query}'")
                    return []

                # Wait before retrying
                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}")
                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for query '{query}'")
                    return []
                await asyncio.sleep(2)

        return []

    def _parse_search_results(self, response: dict) -> List[Dict[str, str]]:
        """
        Parse a Serper API response into our standard format.

        Args:
            response: API response dictionary

        Returns:
            List of formatted search results
        """
        formatted_results = []
        try:
            # Serper returns results in the 'organic' field
            organic_results = response.get('organic', [])
            for result in organic_results:
                formatted_results.append({
                    'title': result.get('title', ''),
                    'body': result.get('snippet', ''),
                    'url': result.get('link', ''),
                    'source': self._extract_domain(result.get('link', ''))
                })

            # Also check for an answer box
            if 'answerBox' in response:
                answer_box = response['answerBox']
                formatted_results.insert(0, {
                    'title': answer_box.get('title', 'Answer'),
                    'body': answer_box.get('answer', answer_box.get('snippet', '')),
                    'url': answer_box.get('link', ''),
                    'source': 'Google Answer Box'
                })

            # Knowledge graph
            if 'knowledgeGraph' in response:
                kg = response['knowledgeGraph']
                formatted_results.insert(0, {
                    'title': kg.get('title', 'Knowledge Graph'),
                    'body': kg.get('description', ''),
                    'url': kg.get('website', ''),
                    'source': 'Google Knowledge Graph'
                })
        except Exception as e:
            logger.error(f"Error parsing search results: {str(e)}")

        return formatted_results
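
    # Each parsed result follows the service's standard shape, e.g.:
    #
    #     {"title": "...", "body": "...", "url": "https://...", "source": "example.com"}
    #
    # News results (see _parse_news_results below) add a "date" key.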

    def _extract_domain(self, url: str) -> str:
        """Extract the domain from a URL."""
        if not url:
            return 'unknown'
        try:
            parsed = urlparse(url)
            return parsed.netloc or 'unknown'
        except Exception:
            return 'unknown'

    async def search_news(
        self,
        query: str,
        max_results: Optional[int] = None,
        max_retries: int = 3
    ) -> List[Dict[str, str]]:
        """
        Search for news articles with rate limiting and retry logic.

        Args:
            query: Search query string
            max_results: Override the default max results
            max_retries: Maximum number of retry attempts

        Returns:
            List of news results
        """
        if not query or not query.strip():
            logger.warning("Empty news search query provided")
            return []

        if not self.api_key:
            logger.error("SERPER_API_KEY not set. Cannot perform news search.")
            return []

        num_results = max_results or self.max_results

        for attempt in range(max_retries):
            try:
                # Rate limiting
                await self._rate_limit()

                logger.info(f"Searching Serper News API for: '{query}' (attempt {attempt + 1}/{max_retries})")

                # Run the blocking news request in an executor
                loop = asyncio.get_running_loop()
                response = await loop.run_in_executor(
                    None,
                    lambda: self._make_request(query, num_results, "news")
                )

                # Parse news results
                formatted_results = self._parse_news_results(response)
                logger.info(f"Found {len(formatted_results)} news results for query: '{query}'")
                return formatted_results

            except requests.exceptions.HTTPError as e:
                error_msg = str(e)
                logger.warning(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}")

                status_code = e.response.status_code if e.response is not None else None
                if status_code == 429 or "quota" in error_msg.lower():
                    if attempt < max_retries - 1:
                        backoff_time = 5 * (2 ** attempt)
                        logger.info(f"Rate limited, backing off for {backoff_time}s...")
                        await asyncio.sleep(backoff_time)
                        continue

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for news query '{query}'")
                    return []

                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}")
                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for news query '{query}'")
                    return []
                await asyncio.sleep(2)

        return []

    def _parse_news_results(self, response: dict) -> List[Dict[str, str]]:
        """
        Parse a Serper news API response.

        Args:
            response: API response dictionary

        Returns:
            List of formatted news results
        """
        formatted_results = []
        try:
            # Serper returns news results in the 'news' field
            news_results = response.get('news', [])
            for result in news_results:
                formatted_results.append({
                    'title': result.get('title', ''),
                    'body': result.get('snippet', ''),
                    'url': result.get('link', ''),
                    'source': result.get('source', self._extract_domain(result.get('link', ''))),
                    'date': result.get('date', '')
                })
        except Exception as e:
            logger.error(f"Error parsing news results: {str(e)}")

        return formatted_results

    async def instant_answer(self, query: str) -> Optional[str]:
        """
        Get an instant answer for a query from the answer box or knowledge graph.

        Args:
            query: Search query string

        Returns:
            Instant answer text, or None
        """
        if not query or not query.strip():
            return None
        if not self.api_key:
            return None

        try:
            logger.info(f"Getting instant answer for: '{query}'")

            # Perform a regular search
            results = await self.search(query, max_results=1)

            # Check whether the first result came from an answer box or knowledge graph
            if results:
                first_result = results[0]
                if first_result.get('source') in ('Google Answer Box', 'Google Knowledge Graph'):
                    return first_result.get('body', '')
            return None
        except Exception as e:
            logger.error(f"Instant answer error for query '{query}': {str(e)}")
            return None


# Singleton instance
_search_service: Optional[WebSearchService] = None


def get_search_service() -> WebSearchService:
    """Get or create the singleton search service instance."""
    global _search_service
    if _search_service is None:
        _search_service = WebSearchService()
    return _search_service
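

if __name__ == "__main__":
    # Minimal smoke test (illustrative): requires a valid SERPER_API_KEY in the
    # environment and spends one real request against the free-tier quota.
    async def _demo():
        service = get_search_service()
        results = await service.search("python asyncio tutorial", max_results=3)
        for r in results:
            print(f"[{r['source']}] {r['title']}\n  {r['url']}")

    asyncio.run(_demo())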