""" Enterprise-grade Web Scraping Service Extracts company information, contact pages, and decision-maker details """ import asyncio import re import logging from typing import Dict, List, Optional from urllib.parse import urljoin, urlparse import requests from bs4 import BeautifulSoup logger = logging.getLogger(__name__) class WebScraperService: """Production-ready web scraper for company and contact information""" def __init__(self, timeout: int = 10, max_retries: int = 2): self.timeout = timeout self.max_retries = max_retries self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) async def extract_company_info(self, url: str) -> Dict[str, any]: """ Extract company information from website Args: url: Company website URL Returns: Dictionary with company info """ try: logger.info(f"Extracting company info from: {url}") # Fetch page loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True) ) if response.status_code != 200: logger.warning(f"Failed to fetch {url}: Status {response.status_code}") return {} soup = BeautifulSoup(response.text, 'html.parser') # Extract company name company_name = self._extract_company_name(soup, url) # Extract description description = self._extract_description(soup) # Find contact page URL contact_url = self._find_contact_page(soup, url) # Extract domain domain = urlparse(url).netloc.replace('www.', '') return { 'name': company_name, 'website': url, 'domain': domain, 'description': description, 'contact_page': contact_url } except Exception as e: logger.error(f"Error extracting company info from {url}: {str(e)}") return {} def _extract_company_name(self, soup: BeautifulSoup, url: str) -> str: """Extract company name from page""" # Try meta tags first og_site_name = soup.find('meta', property='og:site_name') if og_site_name and og_site_name.get('content'): return og_site_name['content'] # Try title tag title = soup.find('title') if title: # Clean up title (remove " - Home" etc.) 
            clean_title = re.sub(
                r'\s*[-|]\s*(Home|Homepage|Welcome).*$', '', title.text,
                flags=re.IGNORECASE
            )
            return clean_title.strip()

        # Fallback to domain
        domain = urlparse(url).netloc.replace('www.', '')
        return domain.split('.')[0].title()

    def _extract_description(self, soup: BeautifulSoup) -> str:
        """Extract company description"""
        # Try meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc and meta_desc.get('content'):
            return meta_desc['content']

        # Try og:description
        og_desc = soup.find('meta', property='og:description')
        if og_desc and og_desc.get('content'):
            return og_desc['content']

        # Try first paragraph
        first_p = soup.find('p')
        if first_p:
            return first_p.text.strip()[:200]

        return ""

    def _find_contact_page(self, soup: BeautifulSoup, base_url: str) -> Optional[str]:
        """Find contact page URL"""
        # Common contact page patterns
        contact_patterns = [
            r'contact',
            r'about.*us',
            r'team',
            r'leadership',
            r'get.*in.*touch',
            r'reach.*us'
        ]

        # Search all links
        for link in soup.find_all('a', href=True):
            href = link['href'].lower()
            link_text = link.text.lower()

            for pattern in contact_patterns:
                if re.search(pattern, href) or re.search(pattern, link_text):
                    # Convert relative to absolute URL
                    full_url = urljoin(base_url, link['href'])
                    return full_url

        # Try common URLs directly
        domain = urlparse(base_url).scheme + "://" + urlparse(base_url).netloc
        common_paths = ['/contact', '/contact-us', '/about', '/about-us', '/team']

        for path in common_paths:
            test_url = domain + path
            try:
                response = self.session.head(test_url, timeout=5, allow_redirects=True)
                if response.status_code == 200:
                    return test_url
            except requests.RequestException:
                continue

        return None

    async def scrape_page(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Generic page scraper that returns full page content

        Args:
            url: Page URL to scrape

        Returns:
            Dictionary with page content (html, text, soup)
        """
        try:
            logger.info(f"Scraping page: {url}")

            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )

            if response.status_code != 200:
                logger.warning(f"Failed to scrape {url}: Status {response.status_code}")
                return None

            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get text
            text = soup.get_text()

            # Clean up text - remove multiple newlines/spaces
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)

            return {
                'url': url,
                'html': response.text,
                'text': text,
                'soup': soup
            }

        except Exception as e:
            logger.error(f"Error scraping page {url}: {str(e)}")
            return None

    async def scrape_contact_page(self, url: str) -> Dict[str, List[str]]:
        """
        Scrape contact information from a page

        Args:
            url: Contact page URL

        Returns:
            Dictionary with emails, phones, names found
        """
        try:
            logger.info(f"Scraping contact page: {url}")

            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )

            if response.status_code != 200:
                return {'emails': [], 'phones': [], 'names': []}

            text = response.text
            soup = BeautifulSoup(text, 'html.parser')

            # Extract emails
            emails = self._extract_emails(text)

            # Extract phone numbers
            phones = self._extract_phones(text)

            # Extract names (people mentioned)
            names = self._extract_names(soup)

            return {
                'emails': list(set(emails)),
                'phones': list(set(phones)),
                'names': list(set(names))
            }

        except Exception as e:
            logger.error(f"Error scraping contact page {url}: {str(e)}")
            return {'emails': [], 'phones': [], 'names': []}

    def _extract_emails(self, text: str) -> List[str]:
        """Extract email addresses from text"""
        # Email regex pattern
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = re.findall(email_pattern, text)

        # Filter out common junk emails
        filtered = []
        ignore_patterns = ['example.com', 'domain.com', 'email.com', 'yourcompany.com', 'image', 'pixel']

        for email in emails:
            if not any(pattern in email.lower() for pattern in ignore_patterns):
                filtered.append(email.lower())

        return filtered

    def _extract_phones(self, text: str) -> List[str]:
        """Extract phone numbers from text"""
        # Phone number patterns
        phone_patterns = [
            r'\+?1?\s*\(?([0-9]{3})\)?[\s.-]?([0-9]{3})[\s.-]?([0-9]{4})',  # US format
            r'\+?([0-9]{1,3})?[\s.-]?\(?([0-9]{2,4})\)?[\s.-]?([0-9]{3,4})[\s.-]?([0-9]{4})'  # International
        ]

        phones = []
        for pattern in phone_patterns:
            matches = re.findall(pattern, text)
            for match in matches:
                if isinstance(match, tuple):
                    phone = ''.join(match)
                else:
                    phone = match
                if len(phone) >= 10:  # Valid phone number
                    phones.append(phone)

        return phones[:5]  # Limit to 5

    def _extract_names(self, soup: BeautifulSoup) -> List[str]:
        """Extract person names from page"""
        names = []

        # Look for common patterns
        # 1. "Meet the team" sections
        team_sections = soup.find_all(
            ['section', 'div'],
            class_=re.compile(r'team|staff|leadership|people', re.I)
        )
        for section in team_sections:
            # Find headings that might be names
            headings = section.find_all(['h2', 'h3', 'h4', 'p'])
            for heading in headings:
                text = heading.text.strip()
                # Simple check: 2-4 words, each capitalized
                words = text.split()
                if 2 <= len(words) <= 4 and all(w[0].isupper() for w in words if w):
                    names.append(text)

        # 2. Look for title patterns
        title_patterns = [
            r'(CEO|CTO|CFO|COO|President|VP|Director|Manager|Head of)\s*[:-]\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
            r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s*,\s*(CEO|CTO|CFO|COO|President|VP|Director)'
        ]
        # If the first captured group is a job title, the name is the second group
        titles = ['CEO', 'CTO', 'CFO', 'COO', 'President', 'VP', 'Director', 'Manager', 'Head of']

        page_text = soup.get_text()
        for pattern in title_patterns:
            matches = re.findall(pattern, page_text)
            for match in matches:
                if isinstance(match, tuple):
                    name = match[1] if match[0] in titles else match[0]
                    names.append(name)

        return names[:10]  # Limit to 10

    async def find_linkedin_profiles(self, company_name: str, title: str = "CEO") -> List[Dict[str, str]]:
        """
        Find LinkedIn profiles via Google search

        Args:
            company_name: Company name
            title: Job title to search for

        Returns:
            List of potential profiles
        """
        # We'll use the web search service for this
        # Return empty for now, will integrate with WebSearchService
        return []

    def generate_email_patterns(self, name: str, domain: str) -> List[str]:
        """
        Generate possible email addresses for a person

        Args:
            name: Person's full name
            domain: Company domain

        Returns:
            List of possible email addresses
        """
        if not name or not domain:
            return []

        # Parse name
        parts = name.lower().split()
        if len(parts) < 2:
            return []

        first = parts[0]
        last = parts[-1]

        # Common patterns
        patterns = [
            f"{first}.{last}@{domain}",
            f"{first}{last}@{domain}",
            f"{first[0]}{last}@{domain}",
            f"{first}_{last}@{domain}",
            f"{last}.{first}@{domain}",
            f"{first}@{domain}",
            f"{last}@{domain}"
        ]

        return patterns

    def validate_email_format(self, email: str) -> bool:
        """Validate email format"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
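

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). Assumptions: the module is run
# directly, "https://example.com" and "Jane Doe" are placeholders rather than
# real targets, and logging is configured here purely for demo output.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _demo() -> None:
        scraper = WebScraperService(timeout=10)

        # Placeholder URL; swap in a real company website when trying this out.
        info = await scraper.extract_company_info("https://example.com")
        print("Company info:", info)

        # If a contact page was discovered, pull emails/phones/names from it.
        if info.get('contact_page'):
            contacts = await scraper.scrape_contact_page(info['contact_page'])
            print("Contacts:", contacts)

        # Guess likely addresses for a hypothetical decision-maker.
        for candidate in scraper.generate_email_patterns("Jane Doe", info.get('domain', 'example.com')):
            print("Candidate email:", candidate)

    asyncio.run(_demo())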