"""
Web search service using the Serper API (serper.dev).

Provides reliable, low-cost Google Search API functionality for the CX AI Agent,
with built-in rate limiting and retry logic.
"""

import asyncio
import json
import logging
import os
import time
from functools import wraps
from typing import Dict, List, Optional
from urllib.parse import urlparse

import requests

logger = logging.getLogger(__name__)


def async_wrapper(func):
    """Decorator that runs a synchronous function in the default executor so it can be awaited."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    return wrapper
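

# Hypothetical usage sketch for async_wrapper (the decorator is not otherwise
# applied in this module): it lets a blocking function be awaited without
# stalling the event loop. fetch_page is illustrative, not part of the service.
#
#     @async_wrapper
#     def fetch_page(url: str) -> str:
#         return requests.get(url, timeout=10).text
#
#     html = await fetch_page("https://example.com")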


class WebSearchService:
    """
    Web search service using the Serper API (serper.dev).

    Serper is a low-cost Google Search API with 2,500 free searches per month.
    Requires the SERPER_API_KEY environment variable. Includes rate-limiting
    protection and retry logic.
    """

    def __init__(self, max_results: int = 10, rate_limit_delay: float = 0.5):
        """
        Initialize the web search service.

        Args:
            max_results: Maximum number of results to return per query
            rate_limit_delay: Minimum delay between requests in seconds (default: 0.5)
        """
        self.max_results = max_results
        self.rate_limit_delay = rate_limit_delay
        self.last_request_time = 0.0
        self._request_lock = asyncio.Lock()
        self.api_url = "https://google.serper.dev/search"

        self.api_key = os.getenv('SERPER_API_KEY')
        if not self.api_key:
            logger.warning(
                "SERPER_API_KEY not found in environment. "
                "Web search will fail. Please set the SERPER_API_KEY environment variable. "
                "Get your free API key at https://serper.dev/"
            )

    async def _rate_limit(self):
        """Enforce rate limiting between requests."""
        async with self._request_lock:
            current_time = time.time()
            time_since_last_request = current_time - self.last_request_time

            if time_since_last_request < self.rate_limit_delay:
                sleep_time = self.rate_limit_delay - time_since_last_request
                logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
                await asyncio.sleep(sleep_time)

            self.last_request_time = time.time()

    def _make_request(self, query: str, num_results: int, search_type: str = "search") -> dict:
        """
        Make a synchronous request to the Serper API.

        Args:
            query: Search query string
            num_results: Number of results to return
            search_type: Type of search ("search" or "news")

        Returns:
            API response as a dictionary
        """
        if not self.api_key:
            raise ValueError("SERPER_API_KEY is not set")

        # News searches go to a dedicated endpoint.
        if search_type == "news":
            url = "https://google.serper.dev/news"
        else:
            url = self.api_url

        headers = {
            'X-API-KEY': self.api_key,
            'Content-Type': 'application/json'
        }

        payload = {
            'q': query,
            'num': num_results
        }

        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(payload),
            timeout=10
        )

        response.raise_for_status()
        return response.json()

    async def search(
        self,
        query: str,
        max_results: Optional[int] = None,
        region: str = 'wt-wt',
        safesearch: str = 'moderate',
        max_retries: int = 3
    ) -> List[Dict[str, str]]:
        """
        Perform a web search with rate limiting and retry logic.

        Args:
            query: Search query string
            max_results: Override the default max results
            region: Region code (kept for compatibility; not used with Serper)
            safesearch: Safe search setting (kept for compatibility; not used with Serper)
            max_retries: Maximum number of retry attempts

        Returns:
            List of search results with title, body, url, and source
        """
        if not query or not query.strip():
            logger.warning("Empty search query provided")
            return []

        if not self.api_key:
            logger.error("SERPER_API_KEY not set. Cannot perform search.")
            return []

        num_results = max_results or self.max_results

        for attempt in range(max_retries):
            try:
                await self._rate_limit()

                logger.info(f"Searching via Serper API for: '{query}' (attempt {attempt + 1}/{max_retries})")

                # Run the blocking HTTP request in the default executor so the
                # event loop stays responsive.
                loop = asyncio.get_running_loop()
                response = await loop.run_in_executor(
                    None,
                    lambda: self._make_request(query, num_results, "search")
                )

                formatted_results = self._parse_search_results(response)

                logger.info(f"Found {len(formatted_results)} results for query: '{query}'")
                return formatted_results

            except requests.exceptions.HTTPError as e:
                error_msg = str(e)
                logger.warning(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}")

                # On rate limiting (429) or quota errors, back off exponentially.
                if (e.response is not None and e.response.status_code == 429) or "quota" in error_msg.lower():
                    if attempt < max_retries - 1:
                        backoff_time = 5 * (2 ** attempt)
                        logger.info(f"Rate limited or quota exceeded, backing off for {backoff_time}s...")
                        await asyncio.sleep(backoff_time)
                        continue

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for query '{query}'")
                    return []

                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}")

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for query '{query}'")
                    return []

                await asyncio.sleep(2)

        return []
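
    # Illustrative sketch of the Serper /search response fields consumed by
    # _parse_search_results below; real responses may carry additional keys:
    #
    #     {
    #         "organic":        [{"title": ..., "snippet": ..., "link": ...}],
    #         "answerBox":      {"title": ..., "answer": ..., "snippet": ..., "link": ...},
    #         "knowledgeGraph": {"title": ..., "description": ..., "website": ...}
    #     }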

    def _parse_search_results(self, response: dict) -> List[Dict[str, str]]:
        """
        Parse a Serper API response into our standard format.

        Args:
            response: API response dictionary

        Returns:
            List of formatted search results
        """
        formatted_results = []

        try:
            # Standard organic search results.
            organic_results = response.get('organic', [])
            for result in organic_results:
                formatted_results.append({
                    'title': result.get('title', ''),
                    'body': result.get('snippet', ''),
                    'url': result.get('link', ''),
                    'source': self._extract_domain(result.get('link', ''))
                })

            # Promote the answer box, if present, to the front of the list.
            if 'answerBox' in response:
                answer_box = response['answerBox']
                formatted_results.insert(0, {
                    'title': answer_box.get('title', 'Answer'),
                    'body': answer_box.get('answer', answer_box.get('snippet', '')),
                    'url': answer_box.get('link', ''),
                    'source': 'Google Answer Box'
                })

            # Promote the knowledge graph entry, if present, to the front.
            if 'knowledgeGraph' in response:
                kg = response['knowledgeGraph']
                formatted_results.insert(0, {
                    'title': kg.get('title', 'Knowledge Graph'),
                    'body': kg.get('description', ''),
                    'url': kg.get('website', ''),
                    'source': 'Google Knowledge Graph'
                })

        except Exception as e:
            logger.error(f"Error parsing search results: {str(e)}")

        return formatted_results

    def _extract_domain(self, url: str) -> str:
        """Extract the domain from a URL."""
        if not url:
            return 'unknown'
        try:
            parsed = urlparse(url)
            return parsed.netloc or 'unknown'
        except Exception:
            return 'unknown'

    async def search_news(
        self,
        query: str,
        max_results: Optional[int] = None,
        max_retries: int = 3
    ) -> List[Dict[str, str]]:
        """
        Search for news articles with rate limiting and retry logic.

        Args:
            query: Search query string
            max_results: Override the default max results
            max_retries: Maximum number of retry attempts

        Returns:
            List of news results
        """
        if not query or not query.strip():
            logger.warning("Empty news search query provided")
            return []

        if not self.api_key:
            logger.error("SERPER_API_KEY not set. Cannot perform news search.")
            return []

        num_results = max_results or self.max_results

        for attempt in range(max_retries):
            try:
                await self._rate_limit()

                logger.info(f"Searching Serper News API for: '{query}' (attempt {attempt + 1}/{max_retries})")

                # Run the blocking HTTP request in the default executor.
                loop = asyncio.get_running_loop()
                response = await loop.run_in_executor(
                    None,
                    lambda: self._make_request(query, num_results, "news")
                )

                formatted_results = self._parse_news_results(response)

                logger.info(f"Found {len(formatted_results)} news results for query: '{query}'")
                return formatted_results

            except requests.exceptions.HTTPError as e:
                error_msg = str(e)
                logger.warning(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}")

                if (e.response is not None and e.response.status_code == 429) or "quota" in error_msg.lower():
                    if attempt < max_retries - 1:
                        backoff_time = 5 * (2 ** attempt)
                        logger.info(f"Rate limited, backing off for {backoff_time}s...")
                        await asyncio.sleep(backoff_time)
                        continue

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for news query '{query}'")
                    return []

                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}")

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for news query '{query}'")
                    return []

                await asyncio.sleep(2)

        return []
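
    # Illustrative sketch of the Serper /news response fields consumed by
    # _parse_news_results below:
    #
    #     {"news": [{"title": ..., "snippet": ..., "link": ..., "source": ..., "date": ...}]}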

    def _parse_news_results(self, response: dict) -> List[Dict[str, str]]:
        """
        Parse a Serper news API response.

        Args:
            response: API response dictionary

        Returns:
            List of formatted news results
        """
        formatted_results = []

        try:
            news_results = response.get('news', [])
            for result in news_results:
                formatted_results.append({
                    'title': result.get('title', ''),
                    'body': result.get('snippet', ''),
                    'url': result.get('link', ''),
                    'source': result.get('source', self._extract_domain(result.get('link', ''))),
                    'date': result.get('date', '')
                })

        except Exception as e:
            logger.error(f"Error parsing news results: {str(e)}")

        return formatted_results

    async def instant_answer(self, query: str) -> Optional[str]:
        """
        Get an instant answer for a query from the answer box or knowledge graph.

        Args:
            query: Search query string

        Returns:
            Instant answer text, or None if no instant answer is available
        """
        if not query or not query.strip():
            return None

        if not self.api_key:
            return None

        try:
            logger.info(f"Getting instant answer for: '{query}'")

            # Reuse the regular search; answer box and knowledge graph entries
            # are promoted to the front of the results by the parser.
            results = await self.search(query, max_results=1)

            if results:
                first_result = results[0]
                if first_result.get('source') in ['Google Answer Box', 'Google Knowledge Graph']:
                    return first_result.get('body', '')

            return None

        except Exception as e:
            logger.error(f"Instant answer error for query '{query}': {str(e)}")
            return None


# Module-level singleton instance.
_search_service: Optional[WebSearchService] = None


def get_search_service() -> WebSearchService:
    """Get or create the singleton search service instance."""
    global _search_service
    if _search_service is None:
        _search_service = WebSearchService()
    return _search_service
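

# Minimal usage sketch (assumes SERPER_API_KEY is set in the environment; the
# query below is a placeholder): running this module directly performs a quick
# end-to-end smoke test of the service.
if __name__ == "__main__":
    async def _demo():
        service = get_search_service()
        results = await service.search("serper.dev api", max_results=3)
        for r in results:
            print(f"[{r['source']}] {r['title']} - {r['url']}")

    asyncio.run(_demo())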