"""
Web Search Service using Serper API (serper.dev)
Provides reliable, low-cost Google Search API functionality for the CX AI Agent
With built-in rate limiting and retry logic
"""
from typing import List, Dict, Optional
from urllib.parse import urlparse
import asyncio
import logging
import time
import os
import requests
from functools import wraps

logger = logging.getLogger(__name__)


def async_wrapper(func):
    """Wrapper to run sync functions in async context"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    return wrapper
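
# Example (hypothetical, not used elsewhere in this module): `async_wrapper`
# turns a blocking function into an awaitable by running it in the default
# executor, e.g.
#
#     @async_wrapper
#     def fetch_page(url: str) -> str:
#         return requests.get(url, timeout=10).text
#
#     html = await fetch_page("https://example.com")  # inside a coroutine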


class WebSearchService:
    """
    Web search service using Serper API (serper.dev)
    Low-cost Google Search API with a free tier (2,500 free searches)
    Requires SERPER_API_KEY environment variable
    Includes rate limiting protection and retry logic
    """

    def __init__(self, max_results: int = 10, rate_limit_delay: float = 0.5):
        """
        Initialize web search service

        Args:
            max_results: Maximum number of results to return per query
            rate_limit_delay: Delay between requests in seconds (default: 0.5)
        """
        self.max_results = max_results
        self.rate_limit_delay = rate_limit_delay
        self.last_request_time = 0
        self._request_lock = asyncio.Lock()
        self.api_url = "https://google.serper.dev/search"

        # Get API key from environment
        self.api_key = os.getenv('SERPER_API_KEY')

        if not self.api_key:
            logger.warning(
                "SERPER_API_KEY not found in environment. "
                "Web search will fail. Please set SERPER_API_KEY environment variable. "
                "Get your free API key at https://serper.dev/"
            )

    async def _rate_limit(self):
        """Enforce rate limiting between requests"""
        async with self._request_lock:
            current_time = time.time()
            time_since_last_request = current_time - self.last_request_time

            if time_since_last_request < self.rate_limit_delay:
                sleep_time = self.rate_limit_delay - time_since_last_request
                logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
                await asyncio.sleep(sleep_time)

            self.last_request_time = time.time()

    def _make_request(self, query: str, num_results: int, search_type: str = "search") -> dict:
        """
        Make a synchronous request to Serper API

        Args:
            query: Search query string
            num_results: Number of results to return
            search_type: Type of search (search, news)

        Returns:
            API response as dictionary
        """
        if not self.api_key:
            raise ValueError("SERPER_API_KEY is not set")

        # Determine endpoint
        if search_type == "news":
            url = "https://google.serper.dev/news"
        else:
            url = self.api_url

        # Prepare request
        headers = {
            'X-API-KEY': self.api_key,
            'Content-Type': 'application/json'
        }

        payload = {
            'q': query,
            'num': num_results
        }

        # Make request (requests serializes the JSON payload)
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            timeout=10
        )

        response.raise_for_status()
        return response.json()

    async def search(
        self,
        query: str,
        max_results: Optional[int] = None,
        region: str = 'wt-wt',  # kept for compatibility
        safesearch: str = 'moderate',  # kept for compatibility
        max_retries: int = 3
    ) -> List[Dict[str, str]]:
        """
        Perform web search with rate limiting and retry logic

        Args:
            query: Search query string
            max_results: Override default max results
            region: Region code (kept for compatibility, not used with Serper)
            safesearch: Safe search setting (kept for compatibility)
            max_retries: Maximum number of retry attempts

        Returns:
            List of search results with title, body, and url
        """
        if not query or not query.strip():
            logger.warning("Empty search query provided")
            return []

        if not self.api_key:
            logger.error("SERPER_API_KEY not set. Cannot perform search.")
            return []

        num_results = max_results or self.max_results

        for attempt in range(max_retries):
            try:
                # Rate limiting
                await self._rate_limit()

                logger.info(f"Searching via Serper API for: '{query}' (attempt {attempt + 1}/{max_retries})")

                # Run search in executor
                loop = asyncio.get_running_loop()
                response = await loop.run_in_executor(
                    None,
                    lambda: self._make_request(query, num_results, "search")
                )

                # Parse results
                formatted_results = self._parse_search_results(response)

                logger.info(f"Found {len(formatted_results)} results for query: '{query}'")
                return formatted_results

            except requests.exceptions.HTTPError as e:
                error_msg = str(e)
                logger.warning(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}")

                # Check if rate limited or quota exceeded
                if e.response.status_code == 429 or "quota" in error_msg.lower():
                    if attempt < max_retries - 1:
                        # Exponential backoff: 5s, then 10s (5 * 2**attempt)
                        backoff_time = 5 * (2 ** attempt)
                        logger.info(f"Rate limited or quota exceeded, backing off for {backoff_time}s...")
                        await asyncio.sleep(backoff_time)
                        continue

                # If last attempt, log and return empty
                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for query '{query}'")
                    return []

                # Wait before retry
                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"Search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}")

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for query '{query}'")
                    return []

                await asyncio.sleep(2)

        return []

    def _parse_search_results(self, response: dict) -> List[Dict[str, str]]:
        """
        Parse Serper API response into our standard format

        Args:
            response: API response dictionary

        Returns:
            List of formatted search results
        """
        formatted_results = []

        try:
            # Serper returns results in 'organic' field
            organic_results = response.get('organic', [])

            for result in organic_results:
                formatted_results.append({
                    'title': result.get('title', ''),
                    'body': result.get('snippet', ''),
                    'url': result.get('link', ''),
                    'source': self._extract_domain(result.get('link', ''))
                })

            # Also check for answer box / knowledge graph
            if 'answerBox' in response:
                answer_box = response['answerBox']
                formatted_results.insert(0, {
                    'title': answer_box.get('title', 'Answer'),
                    'body': answer_box.get('answer', answer_box.get('snippet', '')),
                    'url': answer_box.get('link', ''),
                    'source': 'Google Answer Box'
                })

            # Knowledge graph
            if 'knowledgeGraph' in response:
                kg = response['knowledgeGraph']
                formatted_results.insert(0, {
                    'title': kg.get('title', 'Knowledge Graph'),
                    'body': kg.get('description', ''),
                    'url': kg.get('website', ''),
                    'source': 'Google Knowledge Graph'
                })

        except Exception as e:
            logger.error(f"Error parsing search results: {str(e)}")

        return formatted_results
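
    # Illustrative Serper response fragment (shape inferred from the fields
    # parsed above; not an exhaustive schema):
    #
    #     {
    #       "organic": [{"title": "...", "link": "https://...", "snippet": "..."}],
    #       "answerBox": {"title": "...", "answer": "...", "link": "https://..."},
    #       "knowledgeGraph": {"title": "...", "description": "...", "website": "https://..."}
    #     }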

    def _extract_domain(self, url: str) -> str:
        """Extract the domain (netloc) from a URL, returning 'unknown' on failure"""
        if not url:
            return 'unknown'
        try:
            parsed = urlparse(url)
            return parsed.netloc or 'unknown'
        except Exception:
            return 'unknown'

    async def search_news(
        self,
        query: str,
        max_results: Optional[int] = None,
        max_retries: int = 3
    ) -> List[Dict[str, str]]:
        """
        Search for news articles with rate limiting and retry logic

        Args:
            query: Search query string
            max_results: Override default max results
            max_retries: Maximum number of retry attempts

        Returns:
            List of news results
        """
        if not query or not query.strip():
            logger.warning("Empty news search query provided")
            return []

        if not self.api_key:
            logger.error("SERPER_API_KEY not set. Cannot perform news search.")
            return []

        num_results = max_results or self.max_results

        for attempt in range(max_retries):
            try:
                # Rate limiting
                await self._rate_limit()

                logger.info(f"Searching Serper News API for: '{query}' (attempt {attempt + 1}/{max_retries})")

                # Run news search in executor
                loop = asyncio.get_running_loop()
                response = await loop.run_in_executor(
                    None,
                    lambda: self._make_request(query, num_results, "news")
                )

                # Parse news results
                formatted_results = self._parse_news_results(response)

                logger.info(f"Found {len(formatted_results)} news results for query: '{query}'")
                return formatted_results

            except requests.exceptions.HTTPError as e:
                error_msg = str(e)
                logger.warning(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {error_msg}")

                if e.response.status_code == 429 or "quota" in error_msg.lower():
                    if attempt < max_retries - 1:
                        backoff_time = 5 * (2 ** attempt)
                        logger.info(f"Rate limited, backing off for {backoff_time}s...")
                        await asyncio.sleep(backoff_time)
                        continue

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for news query '{query}'")
                    return []

                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"News search attempt {attempt + 1}/{max_retries} failed for '{query}': {str(e)}")

                if attempt == max_retries - 1:
                    logger.error(f"All {max_retries} attempts failed for news query '{query}'")
                    return []

                await asyncio.sleep(2)

        return []

    def _parse_news_results(self, response: dict) -> List[Dict[str, str]]:
        """
        Parse Serper news API response

        Args:
            response: API response dictionary

        Returns:
            List of formatted news results
        """
        formatted_results = []

        try:
            # Serper returns news results in 'news' field
            news_results = response.get('news', [])

            for result in news_results:
                formatted_results.append({
                    'title': result.get('title', ''),
                    'body': result.get('snippet', ''),
                    'url': result.get('link', ''),
                    'source': result.get('source', self._extract_domain(result.get('link', ''))),
                    'date': result.get('date', '')
                })

        except Exception as e:
            logger.error(f"Error parsing news results: {str(e)}")

        return formatted_results

    async def instant_answer(self, query: str) -> Optional[str]:
        """
        Get instant answer for a query from answer box/knowledge graph

        Args:
            query: Search query string

        Returns:
            Instant answer text or None
        """
        if not query or not query.strip():
            return None

        if not self.api_key:
            return None

        try:
            logger.info(f"Getting instant answer for: '{query}'")

            # Perform regular search
            results = await self.search(query, max_results=1)

            # Check if first result is from answer box or knowledge graph
            if results:
                first_result = results[0]
                if first_result.get('source') in ['Google Answer Box', 'Google Knowledge Graph']:
                    return first_result.get('body', '')

            return None

        except Exception as e:
            logger.error(f"Instant answer error for query '{query}': {str(e)}")
            return None


# Singleton instance
_search_service: Optional[WebSearchService] = None


def get_search_service() -> WebSearchService:
    """Get or create singleton search service instance"""
    global _search_service
    if _search_service is None:
        _search_service = WebSearchService()
    return _search_service
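

# Minimal usage sketch (assumes SERPER_API_KEY is set in the environment;
# the query and output formatting below are illustrative):
if __name__ == "__main__":
    async def _demo():
        service = get_search_service()
        results = await service.search("python asyncio tutorial", max_results=3)
        for r in results:
            print(f"[{r['source']}] {r['title']} - {r['url']}")

    asyncio.run(_demo())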