""" Web Search Service using Serper API (serper.dev) Provides reliable, low-cost Google Search API functionality for the CX AI Agent With built-in rate limiting and retry logic """ from typing import List, Dict, Optional import asyncio import logging import time import os import json import requests from functools import wraps logger = logging.getLogger(__name__) def async_wrapper(func): """Wrapper to run sync functions in async context""" @wraps(func) async def wrapper(*args, **kwargs): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) return wrapper class WebSearchService: """ Web search service using Serper API (serper.dev) Low-cost Google Search API with 2,500 free searches/month Requires SERPER_API_KEY environment variable Includes rate limiting protection and retry logic """ def __init__(self, max_results: int = 10, rate_limit_delay: float = 0.5): """ Initialize web search service Args: max_results: Maximum number of results to return per query rate_limit_delay: Delay between requests in seconds (default: 0.5) """ self.max_results = max_results self.rate_limit_delay = rate_limit_delay self.last_request_time = 0 self._request_lock = asyncio.Lock() self.api_url = "https://google.serper.dev/search" # Get API key from environment self.api_key = os.getenv('SERPER_API_KEY') if not self.api_key: logger.warning( "SERPER_API_KEY not found in environment. " "Web search will fail. Please set SERPER_API_KEY environment variable. " "Get your free API key at https://serper.dev/" ) async def _rate_limit(self): """Enforce rate limiting between requests""" async with self._request_lock: current_time = time.time() time_since_last_request = current_time - self.last_request_time if time_since_last_request < self.rate_limit_delay: sleep_time = self.rate_limit_delay - time_since_last_request logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s") await asyncio.sleep(sleep_time) self.last_request_time = time.time() def _make_request(self, query: str, num_results: int, search_type: str = "search") -> dict: """ Make a synchronous request to Serper API Args: query: Search query string num_results: Number of results to return search_type: Type of search (search, news) Returns: API response as dictionary """ if not self.api_key: raise ValueError("SERPER_API_KEY is not set") # Determine endpoint if search_type == "news": url = "https://google.serper.dev/news" else: url = self.api_url # Prepare request headers = { 'X-API-KEY': self.api_key, 'Content-Type': 'application/json' } payload = { 'q': query, 'num': num_results } # Make request response = requests.post( url, headers=headers, data=json.dumps(payload), timeout=10 ) response.raise_for_status() return response.json() async def search( self, query: str, max_results: Optional[int] = None, region: str = 'wt-wt', # kept for compatibility safesearch: str = 'moderate', # kept for compatibility max_retries: int = 3 ) -> List[Dict[str, str]]: """ Perform web search with rate limiting and retry logic Args: query: Search query string max_results: Override default max results region: Region code (kept for compatibility, not used with Serper) safesearch: Safe search setting (kept for compatibility) max_retries: Maximum number of retry attempts Returns: List of search results with title, body, and url """ if not query or not query.strip(): logger.warning("Empty search query provided") return [] if not self.api_key: logger.error("SERPER_API_KEY not set. Cannot perform search.") return [] num_results = max_results or self.max_results for attempt in range(max_retries): try: # Rate limiting await self._rate_limit() logger.info(f"Searching via Serper API for: '{query}' (attempt {attempt + 1}/{max_retries})") # Run search in executor loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: self._make_request(query, num_results, "search") ) # Parse results formatted_results = self._parse_search_results(response) logger.info(f"Found {len(formatted_results)} results for query: '{query}'") return formatted_results except requests.exceptions.HTTPError as e: error_msg = str(e) logger.warning(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}") # Check if rate limited or quota exceeded if e.response.status_code == 429 or "quota" in error_msg.lower(): if attempt < max_retries - 1: # Exponential backoff: 5s, 10s, 20s backoff_time = 5 * (2 ** attempt) logger.info(f"Rate limited or quota exceeded, backing off for {backoff_time}s...") await asyncio.sleep(backoff_time) continue # If last attempt, log and return empty if attempt == max_retries - 1: logger.error(f"All {max_retries} attempts failed for query '{query}'") return [] # Wait before retry await asyncio.sleep(2) except Exception as e: logger.error(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}") if attempt == max_retries - 1: logger.error(f"All {max_retries} attempts failed for query '{query}'") return [] await asyncio.sleep(2) return [] def _parse_search_results(self, response: dict) -> List[Dict[str, str]]: """ Parse Serper API response into our standard format Args: response: API response dictionary Returns: List of formatted search results """ formatted_results = [] try: # Serper returns results in 'organic' field organic_results = response.get('organic', []) for result in organic_results: formatted_results.append({ 'title': result.get('title', ''), 'body': result.get('snippet', ''), 'url': result.get('link', ''), 'source': self._extract_domain(result.get('link', '')) }) # Also check for answer box / knowledge graph if 'answerBox' in response: answer_box = response['answerBox'] formatted_results.insert(0, { 'title': answer_box.get('title', 'Answer'), 'body': answer_box.get('answer', answer_box.get('snippet', '')), 'url': answer_box.get('link', ''), 'source': 'Google Answer Box' }) # Knowledge graph if 'knowledgeGraph' in response: kg = response['knowledgeGraph'] formatted_results.insert(0, { 'title': kg.get('title', 'Knowledge Graph'), 'body': kg.get('description', ''), 'url': kg.get('website', ''), 'source': 'Google Knowledge Graph' }) except Exception as e: logger.error(f"Error parsing search results: {str(e)}") return formatted_results def _extract_domain(self, url: str) -> str: """Extract domain from URL""" if not url: return 'unknown' try: from urllib.parse import urlparse parsed = urlparse(url) return parsed.netloc or 'unknown' except: return 'unknown' async def search_news( self, query: str, max_results: Optional[int] = None, max_retries: int = 3 ) -> List[Dict[str, str]]: """ Search for news articles with rate limiting and retry logic Args: query: Search query string max_results: Override default max results max_retries: Maximum number of retry attempts Returns: List of news results """ if not query or not query.strip(): logger.warning("Empty news search query provided") return [] if not self.api_key: logger.error("SERPER_API_KEY not set. Cannot perform news search.") return [] num_results = max_results or self.max_results for attempt in range(max_retries): try: # Rate limiting await self._rate_limit() logger.info(f"Searching Serper News API for: '{query}' (attempt {attempt + 1}/{max_retries})") # Run news search in executor loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: self._make_request(query, num_results, "news") ) # Parse news results formatted_results = self._parse_news_results(response) logger.info(f"Found {len(formatted_results)} news results for query: '{query}'") return formatted_results except requests.exceptions.HTTPError as e: error_msg = str(e) logger.warning(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}") if e.response.status_code == 429 or "quota" in error_msg.lower(): if attempt < max_retries - 1: backoff_time = 5 * (2 ** attempt) logger.info(f"Rate limited, backing off for {backoff_time}s...") await asyncio.sleep(backoff_time) continue if attempt == max_retries - 1: logger.error(f"All {max_retries} attempts failed for news query '{query}'") return [] await asyncio.sleep(2) except Exception as e: logger.error(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}") if attempt == max_retries - 1: logger.error(f"All {max_retries} attempts failed for news query '{query}'") return [] await asyncio.sleep(2) return [] def _parse_news_results(self, response: dict) -> List[Dict[str, str]]: """ Parse Serper news API response Args: response: API response dictionary Returns: List of formatted news results """ formatted_results = [] try: # Serper returns news results in 'news' field news_results = response.get('news', []) for result in news_results: formatted_results.append({ 'title': result.get('title', ''), 'body': result.get('snippet', ''), 'url': result.get('link', ''), 'source': result.get('source', self._extract_domain(result.get('link', ''))), 'date': result.get('date', '') }) except Exception as e: logger.error(f"Error parsing news results: {str(e)}") return formatted_results async def instant_answer(self, query: str) -> Optional[str]: """ Get instant answer for a query from answer box/knowledge graph Args: query: Search query string Returns: Instant answer text or None """ if not query or not query.strip(): return None if not self.api_key: return None try: logger.info(f"Getting instant answer for: '{query}'") # Perform regular search results = await self.search(query, max_results=1) # Check if first result is from answer box or knowledge graph if results and len(results) > 0: first_result = results[0] if first_result.get('source') in ['Google Answer Box', 'Google Knowledge Graph']: return first_result.get('body', '') return None except Exception as e: logger.error(f"Instant answer error for query '{query}': {str(e)}") return None # Singleton instance _search_service: Optional[WebSearchService] = None def get_search_service() -> WebSearchService: """Get or create singleton search service instance""" global _search_service if _search_service is None: _search_service = WebSearchService() return _search_service