"""
Enterprise-grade Web Scraping Service

Extracts company information, contact pages, and decision-maker details
"""

import asyncio
import logging
import re
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)


class WebScraperService:
    """Production-ready web scraper for company and contact information"""

    def __init__(self, timeout: int = 10, max_retries: int = 2):
        self.timeout = timeout
        self.max_retries = max_retries
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    async def extract_company_info(self, url: str) -> Dict[str, Any]:
        """
        Extract company information from website

        Args:
            url: Company website URL

        Returns:
            Dictionary with company info
        """
        try:
            logger.info(f"Extracting company info from: {url}")

            # Run the blocking HTTP request in a worker thread so the event loop stays responsive
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )

            if response.status_code != 200:
                logger.warning(f"Failed to fetch {url}: Status {response.status_code}")
                return {}

            soup = BeautifulSoup(response.text, 'html.parser')

            company_name = self._extract_company_name(soup, url)
            description = self._extract_description(soup)
            contact_url = self._find_contact_page(soup, url)
            domain = urlparse(url).netloc.replace('www.', '')

            return {
                'name': company_name,
                'website': url,
                'domain': domain,
                'description': description,
                'contact_page': contact_url
            }

        except Exception as e:
            logger.error(f"Error extracting company info from {url}: {str(e)}")
            return {}

    def _extract_company_name(self, soup: BeautifulSoup, url: str) -> str:
        """Extract company name from page"""
        # Prefer the og:site_name meta tag when present
        og_site_name = soup.find('meta', property='og:site_name')
        if og_site_name and og_site_name.get('content'):
            return og_site_name['content']

        # Fall back to the <title>, stripping common suffixes like "- Home"
        title = soup.find('title')
        if title:
            clean_title = re.sub(r'\s*[-|]\s*(Home|Homepage|Welcome).*$', '', title.text, flags=re.IGNORECASE)
            return clean_title.strip()

        # Last resort: derive a name from the domain
        domain = urlparse(url).netloc.replace('www.', '')
        return domain.split('.')[0].title()

    def _extract_description(self, soup: BeautifulSoup) -> str:
        """Extract company description"""
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc and meta_desc.get('content'):
            return meta_desc['content']

        og_desc = soup.find('meta', property='og:description')
        if og_desc and og_desc.get('content'):
            return og_desc['content']

        first_p = soup.find('p')
        if first_p:
            return first_p.text.strip()[:200]

        return ""

    def _find_contact_page(self, soup: BeautifulSoup, base_url: str) -> Optional[str]:
        """Find contact page URL"""
        contact_patterns = [
            r'contact',
            r'about.*us',
            r'team',
            r'leadership',
            r'get.*in.*touch',
            r'reach.*us'
        ]

        # Look for a link whose href or text matches a contact pattern
        for link in soup.find_all('a', href=True):
            href = link['href'].lower()
            link_text = link.text.lower()

            for pattern in contact_patterns:
                if re.search(pattern, href) or re.search(pattern, link_text):
                    full_url = urljoin(base_url, link['href'])
                    return full_url

        # No matching link found; probe common contact-page paths directly
        domain = urlparse(base_url).scheme + "://" + urlparse(base_url).netloc
        common_paths = ['/contact', '/contact-us', '/about', '/about-us', '/team']

        for path in common_paths:
            test_url = domain + path
            try:
                response = self.session.head(test_url, timeout=5, allow_redirects=True)
                if response.status_code == 200:
                    return test_url
            except requests.RequestException:
                continue

        return None

    async def scrape_page(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Generic page scraper that returns full page content

        Args:
            url: Page URL to scrape

        Returns:
            Dictionary with page content (html, text, soup)
        """
        try:
            logger.info(f"Scraping page: {url}")

            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )

            if response.status_code != 200:
                logger.warning(f"Failed to scrape {url}: Status {response.status_code}")
                return None

            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove script and style tags so they don't pollute the extracted text
            for script in soup(["script", "style"]):
                script.decompose()

            text = soup.get_text()

            # Normalize whitespace: strip each line, split on double spaces, drop empties
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)

            return {
                'url': url,
                'html': response.text,
                'text': text,
                'soup': soup
            }

        except Exception as e:
            logger.error(f"Error scraping page {url}: {str(e)}")
            return None

    async def scrape_contact_page(self, url: str) -> Dict[str, List[str]]:
        """
        Scrape contact information from a page

        Args:
            url: Contact page URL

        Returns:
            Dictionary with emails, phones, names found
        """
        try:
            logger.info(f"Scraping contact page: {url}")

            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.session.get(url, timeout=self.timeout, allow_redirects=True)
            )

            if response.status_code != 200:
                return {'emails': [], 'phones': [], 'names': []}

            text = response.text
            soup = BeautifulSoup(text, 'html.parser')

            emails = self._extract_emails(text)
            phones = self._extract_phones(text)
            names = self._extract_names(soup)

            return {
                'emails': list(set(emails)),
                'phones': list(set(phones)),
                'names': list(set(names))
            }

        except Exception as e:
            logger.error(f"Error scraping contact page {url}: {str(e)}")
            return {'emails': [], 'phones': [], 'names': []}

    def _extract_emails(self, text: str) -> List[str]:
        """Extract email addresses from text"""
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        emails = re.findall(email_pattern, text)

        # Drop placeholder domains and asset filenames that match the pattern
        filtered = []
        ignore_patterns = ['example.com', 'domain.com', 'email.com', 'yourcompany.com', 'image', 'pixel']

        for email in emails:
            if not any(pattern in email.lower() for pattern in ignore_patterns):
                filtered.append(email.lower())

        return filtered

    def _extract_phones(self, text: str) -> List[str]:
        """Extract phone numbers from text"""
        phone_patterns = [
            r'\+?1?\s*\(?([0-9]{3})\)?[\s.-]?([0-9]{3})[\s.-]?([0-9]{4})',
            r'\+?([0-9]{1,3})?[\s.-]?\(?([0-9]{2,4})\)?[\s.-]?([0-9]{3,4})[\s.-]?([0-9]{4})'
        ]

        phones = []
        for pattern in phone_patterns:
            matches = re.findall(pattern, text)
            for match in matches:
                # re.findall returns tuples when the pattern has multiple groups
                if isinstance(match, tuple):
                    phone = ''.join(match)
                else:
                    phone = match
                if len(phone) >= 10:
                    phones.append(phone)

        return phones[:5]

    def _extract_names(self, soup: BeautifulSoup) -> List[str]:
        """Extract person names from page"""
        names = []

        # Look for team/leadership sections and treat short Title Case headings as names
        team_sections = soup.find_all(['section', 'div'], class_=re.compile(r'team|staff|leadership|people', re.I))

        for section in team_sections:
            headings = section.find_all(['h2', 'h3', 'h4', 'p'])
            for heading in headings:
                text = heading.text.strip()
                words = text.split()
                if 2 <= len(words) <= 4 and all(w[0].isupper() for w in words if w):
                    names.append(text)

        # Also match "Title: Name" and "Name, Title" patterns in the page text
        title_patterns = [
            r'(CEO|CTO|CFO|COO|President|VP|Director|Manager|Head of)\s*[:-]\s*([A-Z][a-z]+\s+[A-Z][a-z]+)',
            r'([A-Z][a-z]+\s+[A-Z][a-z]+)\s*,\s*(CEO|CTO|CFO|COO|President|VP|Director)'
        ]

        job_titles = ['CEO', 'CTO', 'CFO', 'COO', 'President', 'VP', 'Director', 'Manager', 'Head of']
        page_text = soup.get_text()
        for pattern in title_patterns:
            matches = re.findall(pattern, page_text)
            for match in matches:
                if isinstance(match, tuple):
                    # The name is whichever captured group is not the job title
                    name = match[1] if match[0] in job_titles else match[0]
                    names.append(name)

        return names[:10]

    async def find_linkedin_profiles(self, company_name: str, title: str = "CEO") -> List[Dict[str, str]]:
        """
        Find LinkedIn profiles via Google search

        Args:
            company_name: Company name
            title: Job title to search for

        Returns:
            List of potential profiles
        """
        # Not implemented yet: profile discovery requires an external search
        # integration, so an empty list is returned as a safe default.
        return []

    def generate_email_patterns(self, name: str, domain: str) -> List[str]:
        """
        Generate possible email addresses for a person

        Args:
            name: Person's full name
            domain: Company domain

        Returns:
            List of possible email addresses
        """
        if not name or not domain:
            return []

        # Split the name into first and last parts (middle names are ignored)
        parts = name.lower().split()
        if len(parts) < 2:
            return []

        first = parts[0]
        last = parts[-1]

        # Common corporate email-address conventions
        patterns = [
            f"{first}.{last}@{domain}",
            f"{first}{last}@{domain}",
            f"{first[0]}{last}@{domain}",
            f"{first}_{last}@{domain}",
            f"{last}.{first}@{domain}",
            f"{first}@{domain}",
            f"{last}@{domain}"
        ]

        return patterns

    def validate_email_format(self, email: str) -> bool:
        """Validate email format"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
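

if __name__ == "__main__":
    # Minimal usage sketch, not part of the service itself. It assumes
    # https://example.com stands in for a real company site; swap in an actual
    # URL when trying this out. Shows the typical flow: extract company info,
    # then scrape its contact page if one was found.
    async def _demo() -> None:
        scraper = WebScraperService(timeout=10)
        info = await scraper.extract_company_info("https://example.com")
        print("Company info:", info)
        if info.get("contact_page"):
            contacts = await scraper.scrape_contact_page(info["contact_page"])
            print("Contacts:", contacts)

    asyncio.run(_demo())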