|
|
""" |
|
|
Enhanced Contact Finder Service |
|
|
Finds real decision-makers using LinkedIn search, team page scraping, and AI extraction |
|
|
""" |
|
|
from typing import List, Optional, Dict, Set, TYPE_CHECKING |
|
|
import re |
|
|
import logging |
|
|
from email_validator import validate_email, EmailNotValidError |
|
|
from services.web_search import get_search_service |
|
|
from services.web_scraper import WebScraperService |
|
|
from app.schema import Contact |
|
|
import uuid |
|
|
import asyncio |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from mcp.registry import MCPRegistry |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class EnhancedContactFinder: |
|
|
""" |
|
|
Enhanced contact discovery using multiple strategies: |
|
|
1. LinkedIn profile search |
|
|
2. Company team/about page scraping |
|
|
3. AI-powered contact extraction |
|
|
4. Email pattern detection |
|
|
|
|
|
Now supports MCP (Model Context Protocol) for unified search interface |
|
|
""" |
|
|
|
|
|
def __init__(self, mcp_registry: Optional['MCPRegistry'] = None): |
|
|
""" |
|
|
Initialize enhanced contact finder |
|
|
|
|
|
Args: |
|
|
mcp_registry: Optional MCP registry for unified search (recommended) |
|
|
If None, falls back to direct web search service |
|
|
""" |
|
|
if mcp_registry: |
|
|
|
|
|
self.search = mcp_registry.get_search_client() |
|
|
logger.info("EnhancedContactFinder initialized with MCP search client") |
|
|
else: |
|
|
|
|
|
self.search = get_search_service() |
|
|
logger.warning("EnhancedContactFinder initialized without MCP (consider using MCP)") |
|
|
|
|
|
self.scraper = WebScraperService() |
|
|
|
|
|
|
|
|
self.team_page_patterns = [ |
|
|
'/team', |
|
|
'/about-us', |
|
|
'/about', |
|
|
'/leadership', |
|
|
'/our-team', |
|
|
'/management', |
|
|
'/executives', |
|
|
'/people' |
|
|
] |
|
|
|
|
|
|
|
|
self.name_patterns = [ |
|
|
|
|
|
r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*[-–—]\s*([^|]+?)\s*(?:at|@)\s*([^|]+)', |
|
|
|
|
|
r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+),?\s+([^,\n]+?)\s+(?:at|@)\s+([^\n]+)', |
|
|
|
|
|
r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s+is\s+(?:the\s+)?([^.]+)', |
|
|
|
|
|
r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*\n\s*([A-Z][^,\n]+)', |
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
self._common_email_patterns = [ |
|
|
'{first}.{last}', |
|
|
'{first}{last}', |
|
|
] |
|
|
|
|
|
    async def find_real_contacts(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        max_contacts: int = 3
    ) -> List[Contact]:
        """
        Find real decision-makers with VERIFIED contact information.

        Runs up to six discovery strategies in order; each strategy after the
        first only runs if the previous ones have not yet produced
        ``max_contacts`` contacts:

        1. Company website (team/about/contact pages)
        2. LinkedIn profiles
        3. Crunchbase and business directories
        4. Press releases and news articles
        5. Social media (Twitter, Instagram, Facebook business profiles)
        6. Direct email-focused web queries

        Args:
            company_name: Company display name used in search queries.
            domain: Company email domain used to validate found addresses.
            target_titles: Job titles to prioritize (e.g. "CEO", "CTO").
            max_contacts: Maximum number of contacts to return.

        Returns:
            List of Contact objects with verified information, capped at
            ``max_contacts``.
        """
        logger.info(f"EnhancedFinder: Finding VERIFIED contacts at '{company_name}'")
        print(f"\n[CONTACT FINDER] ========================================")
        print(f"[CONTACT FINDER] Starting comprehensive search for {company_name}")
        print(f"[CONTACT FINDER] Domain: {domain}")
        print(f"[CONTACT FINDER] Target titles: {target_titles}")
        print(f"[CONTACT FINDER] ========================================")

        contacts: List[Contact] = []
        # De-duplication state shared with (and mutated by) every strategy below.
        seen_emails: Set[str] = set()
        seen_names: Set[str] = set()

        # Strategy 1: company website — most authoritative source, always runs.
        print(f"\n[CONTACT FINDER] 📄 Strategy 1: Scraping company website...")
        website_contacts = await self._scrape_company_website(
            company_name, domain, target_titles, seen_emails, seen_names, max_contacts
        )
        contacts.extend(website_contacts)
        print(f"[CONTACT FINDER] ✓ Found {len(website_contacts)} contacts from company website")

        # Strategy 2: LinkedIn — only if we still need more contacts.
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 💼 Strategy 2: Searching LinkedIn...")
            linkedin_contacts = await self._search_linkedin(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(linkedin_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(linkedin_contacts)} contacts from LinkedIn")

        # Strategy 3: business directories (Crunchbase, ZoomInfo, Apollo).
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 📊 Strategy 3: Searching business directories...")
            directory_contacts = await self._search_business_directories(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(directory_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(directory_contacts)} contacts from directories")

        # Strategy 4: press releases and news coverage.
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 📰 Strategy 4: Searching press releases & news...")
            news_contacts = await self._search_press_releases(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(news_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(news_contacts)} contacts from news/PR")

        # Strategy 5: social media business profiles.
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 📱 Strategy 5: Searching social media...")
            social_contacts = await self._search_social_media(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(social_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(social_contacts)} contacts from social media")

        # Strategy 6: last resort — query the web for email addresses directly.
        # Note: this strategy does not take seen_names (it dedupes on email only).
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 🔍 Strategy 6: Direct email search...")
            email_contacts = await self._search_for_emails(
                company_name, domain, target_titles, seen_emails, max_contacts - len(contacts)
            )
            contacts.extend(email_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(email_contacts)} contacts from direct email search")

        logger.info(f"EnhancedFinder: Total {len(contacts)} VERIFIED contacts found for '{company_name}'")
        print(f"\n[CONTACT FINDER] ========================================")
        print(f"[CONTACT FINDER] FINAL RESULTS: {len(contacts)} verified contacts")
        print(f"[CONTACT FINDER] ========================================")
        # Summarize only the contacts that will actually be returned.
        for i, contact in enumerate(contacts[:max_contacts], 1):
            print(f"[CONTACT FINDER] {i}. {contact.name} ({contact.title})")
            print(f"[CONTACT FINDER] 📧 {contact.email}")
        if len(contacts) == 0:
            print(f"[CONTACT FINDER] No verified contacts found.")
            print(f"[CONTACT FINDER] Try manual search on LinkedIn or company website.")
        print(f"[CONTACT FINDER] ========================================\n")
        return contacts[:max_contacts]
|
|
|
|
|
async def _scrape_company_website( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
seen_names: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Scrape company website for contact information""" |
|
|
contacts = [] |
|
|
|
|
|
|
|
|
pages_to_check = [ |
|
|
f"https://{domain}/team", |
|
|
f"https://{domain}/about", |
|
|
f"https://{domain}/about-us", |
|
|
f"https://{domain}/leadership", |
|
|
f"https://{domain}/our-team", |
|
|
f"https://{domain}/management", |
|
|
f"https://{domain}/contact", |
|
|
f"https://{domain}/contact-us", |
|
|
f"https://www.{domain}/team", |
|
|
f"https://www.{domain}/about", |
|
|
f"https://www.{domain}/about-us", |
|
|
f"https://www.{domain}/leadership", |
|
|
f"https://www.{domain}/contact", |
|
|
] |
|
|
|
|
|
for url in pages_to_check: |
|
|
if len(contacts) >= max_needed: |
|
|
break |
|
|
try: |
|
|
print(f"[CONTACT FINDER] Checking: {url}") |
|
|
page_content = await self.scraper.scrape_page(url) |
|
|
if not page_content: |
|
|
continue |
|
|
|
|
|
text = page_content.get('text', '') |
|
|
html = page_content.get('html', '') |
|
|
|
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]): |
|
|
name, title = self._extract_name_near_email(text, email, target_titles) |
|
|
if name and name.lower() not in seen_names: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title or "Executive", |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
seen_names.add(name.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ FOUND: {name} ({title}) - {email}") |
|
|
|
|
|
if len(contacts) >= max_needed: |
|
|
return contacts |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Error scraping {url}: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
async def _search_linkedin( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
seen_names: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Search LinkedIn for company executives with contact info""" |
|
|
contacts = [] |
|
|
|
|
|
for title in target_titles[:5]: |
|
|
if len(contacts) >= max_needed: |
|
|
break |
|
|
|
|
|
|
|
|
queries = [ |
|
|
f'site:linkedin.com/in "{company_name}" "{title}" email', |
|
|
f'site:linkedin.com "{company_name}" {title} contact', |
|
|
f'linkedin.com/in {title} {company_name} "@{domain}"', |
|
|
] |
|
|
|
|
|
for query in queries: |
|
|
if len(contacts) >= max_needed: |
|
|
break |
|
|
try: |
|
|
print(f"[CONTACT FINDER] Query: {query[:60]}...") |
|
|
results = await self.search.search(query, max_results=5) |
|
|
|
|
|
for result in results: |
|
|
text = result.get('title', '') + ' ' + result.get('body', '') |
|
|
url = result.get('url', '') |
|
|
|
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
if found_emails: |
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails: |
|
|
name = self._extract_linkedin_name(text, result.get('title', '')) |
|
|
if name and name.lower() not in seen_names: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title, |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
seen_names.add(name.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ FOUND: {name} ({title}) - {email}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"LinkedIn search error: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
async def _search_business_directories( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
seen_names: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Search Crunchbase, ZoomInfo, and other business directories""" |
|
|
contacts = [] |
|
|
|
|
|
|
|
|
queries = [ |
|
|
f'site:crunchbase.com "{company_name}" founder CEO email', |
|
|
f'site:crunchbase.com/person "{company_name}" email', |
|
|
f'"{company_name}" founder email "@{domain}"', |
|
|
f'"{company_name}" CEO email contact', |
|
|
f'site:zoominfo.com "{company_name}" contact', |
|
|
f'site:apollo.io "{company_name}" email', |
|
|
] |
|
|
|
|
|
for query in queries: |
|
|
if len(contacts) >= max_needed: |
|
|
break |
|
|
try: |
|
|
print(f"[CONTACT FINDER] Query: {query[:60]}...") |
|
|
results = await self.search.search(query, max_results=5) |
|
|
|
|
|
for result in results: |
|
|
text = result.get('title', '') + ' ' + result.get('body', '') |
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]): |
|
|
name, title = self._extract_name_near_email(text, email, target_titles) |
|
|
if name and name.lower() not in seen_names: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title or "Founder/Executive", |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
seen_names.add(name.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Directory search error: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
async def _search_press_releases( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
seen_names: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Search press releases and news for executive contact info""" |
|
|
contacts = [] |
|
|
|
|
|
queries = [ |
|
|
f'"{company_name}" press release contact email', |
|
|
f'"{company_name}" announcement CEO founder email', |
|
|
f'site:prnewswire.com "{company_name}" contact', |
|
|
f'site:businesswire.com "{company_name}" contact', |
|
|
f'"{company_name}" media contact "@{domain}"', |
|
|
f'"{company_name}" PR contact email', |
|
|
] |
|
|
|
|
|
for query in queries: |
|
|
if len(contacts) >= max_needed: |
|
|
break |
|
|
try: |
|
|
print(f"[CONTACT FINDER] Query: {query[:60]}...") |
|
|
results = await self.search.search(query, max_results=5) |
|
|
|
|
|
for result in results: |
|
|
text = result.get('title', '') + ' ' + result.get('body', '') |
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]): |
|
|
name, title = self._extract_name_near_email(text, email, target_titles) |
|
|
if name and name.lower() not in seen_names: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title or "Media Contact", |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
seen_names.add(name.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Press release search error: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
async def _search_social_media( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
seen_names: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Search social media profiles for contact information""" |
|
|
contacts = [] |
|
|
|
|
|
queries = [ |
|
|
f'site:twitter.com "{company_name}" email "@{domain}"', |
|
|
f'site:instagram.com "{company_name}" email contact', |
|
|
f'"{company_name}" twitter CEO founder email', |
|
|
f'"{company_name}" instagram business email', |
|
|
f'site:facebook.com "{company_name}" about email', |
|
|
] |
|
|
|
|
|
for query in queries: |
|
|
if len(contacts) >= max_needed: |
|
|
break |
|
|
try: |
|
|
print(f"[CONTACT FINDER] Query: {query[:60]}...") |
|
|
results = await self.search.search(query, max_results=5) |
|
|
|
|
|
for result in results: |
|
|
text = result.get('title', '') + ' ' + result.get('body', '') |
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]): |
|
|
name, title = self._extract_name_near_email(text, email, target_titles) |
|
|
if name and name.lower() not in seen_names: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title or "Executive", |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
seen_names.add(name.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Social media search error: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
def _extract_linkedin_name(self, text: str, title: str) -> Optional[str]: |
|
|
"""Extract name from LinkedIn search result""" |
|
|
|
|
|
linkedin_pattern = r'^([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*[-–—]' |
|
|
match = re.search(linkedin_pattern, title) |
|
|
if match: |
|
|
name = match.group(1).strip() |
|
|
if self._is_valid_name(name): |
|
|
return name |
|
|
|
|
|
|
|
|
for pattern in self.name_patterns: |
|
|
match = re.search(pattern, text) |
|
|
if match: |
|
|
name = match.group(1).strip() |
|
|
if self._is_valid_name(name): |
|
|
return name |
|
|
|
|
|
return None |
|
|
|
|
|
async def _search_for_emails( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Search specifically for email addresses associated with company executives""" |
|
|
contacts = [] |
|
|
|
|
|
|
|
|
email_queries = [ |
|
|
f'"{domain}" email CEO OR founder OR director', |
|
|
f'"{company_name}" contact email executive', |
|
|
f'site:{domain} email contact', |
|
|
f'"{company_name}" "@{domain}" CEO OR VP OR director', |
|
|
] |
|
|
|
|
|
for query in email_queries: |
|
|
try: |
|
|
print(f"[CONTACT FINDER] Query: '{query}'") |
|
|
results = await self.search.search(query, max_results=10) |
|
|
|
|
|
for result in results: |
|
|
text = result.get('title', '') + ' ' + result.get('body', '') |
|
|
|
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]): |
|
|
|
|
|
name, title = self._extract_name_near_email(text, email, target_titles) |
|
|
|
|
|
if name: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title or "Executive", |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}") |
|
|
|
|
|
if len(contacts) >= max_needed: |
|
|
return contacts |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Email search error: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
async def _scrape_for_verified_emails( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Scrape company pages to find actual email addresses""" |
|
|
contacts = [] |
|
|
|
|
|
|
|
|
pages_to_check = [ |
|
|
f"https://{domain}/contact", |
|
|
f"https://{domain}/contact-us", |
|
|
f"https://{domain}/about", |
|
|
f"https://{domain}/about-us", |
|
|
f"https://{domain}/team", |
|
|
f"https://{domain}/leadership", |
|
|
f"https://{domain}/our-team", |
|
|
f"https://www.{domain}/contact", |
|
|
f"https://www.{domain}/about", |
|
|
f"https://www.{domain}/team", |
|
|
] |
|
|
|
|
|
for url in pages_to_check: |
|
|
try: |
|
|
page_content = await self.scraper.scrape_page(url) |
|
|
if not page_content: |
|
|
continue |
|
|
|
|
|
text = page_content.get('text', '') |
|
|
|
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]): |
|
|
|
|
|
name, title = self._extract_name_near_email(text, email, target_titles) |
|
|
|
|
|
if name: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title or "Contact", |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ SCRAPED: {name} - {email} from {url}") |
|
|
|
|
|
if len(contacts) >= max_needed: |
|
|
return contacts |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Scrape error for {url}: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
async def _find_contacts_with_emails( |
|
|
self, |
|
|
company_name: str, |
|
|
domain: str, |
|
|
target_titles: List[str], |
|
|
seen_emails: Set[str], |
|
|
max_needed: int |
|
|
) -> List[Contact]: |
|
|
"""Search for executives and only return those with verified emails""" |
|
|
contacts = [] |
|
|
|
|
|
for title in target_titles: |
|
|
|
|
|
queries = [ |
|
|
f'"{company_name}" {title} email "@{domain}"', |
|
|
f'"{company_name}" {title} contact email', |
|
|
f'site:linkedin.com "{company_name}" {title} email', |
|
|
] |
|
|
|
|
|
for query in queries: |
|
|
try: |
|
|
results = await self.search.search(query, max_results=5) |
|
|
|
|
|
for result in results: |
|
|
text = result.get('title', '') + ' ' + result.get('body', '') |
|
|
|
|
|
|
|
|
found_emails = self._extract_emails_from_text(text, domain) |
|
|
|
|
|
for email in found_emails: |
|
|
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]): |
|
|
|
|
|
name = self._extract_name_from_text(text, company_name) |
|
|
|
|
|
if name: |
|
|
contacts.append(Contact( |
|
|
id=str(uuid.uuid4()), |
|
|
name=name, |
|
|
email=email, |
|
|
title=title, |
|
|
prospect_id="" |
|
|
)) |
|
|
seen_emails.add(email.lower()) |
|
|
print(f"[CONTACT FINDER] ✓ FOUND: {name} ({title}) - {email}") |
|
|
|
|
|
if len(contacts) >= max_needed: |
|
|
return contacts |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Search error: {str(e)}") |
|
|
continue |
|
|
|
|
|
return contacts |
|
|
|
|
|
def _extract_emails_from_text(self, text: str, domain: str) -> List[str]: |
|
|
"""Extract email addresses from text, prioritizing company domain""" |
|
|
if not text: |
|
|
return [] |
|
|
|
|
|
|
|
|
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' |
|
|
all_emails = re.findall(email_pattern, text, re.IGNORECASE) |
|
|
|
|
|
|
|
|
company_emails = [e for e in all_emails if domain.lower() in e.lower()] |
|
|
|
|
|
|
|
|
filtered = [] |
|
|
ignore_patterns = ['example.com', 'domain.com', 'email.com', 'test.com', 'sample.com', |
|
|
'noreply', 'no-reply', 'donotreply', 'unsubscribe', 'privacy', |
|
|
'support@', 'info@', 'contact@', 'hello@', 'sales@', 'help@'] |
|
|
|
|
|
for email in company_emails: |
|
|
if not any(pattern in email.lower() for pattern in ignore_patterns): |
|
|
filtered.append(email.lower()) |
|
|
|
|
|
return list(set(filtered)) |
|
|
|
|
|
def _extract_name_near_email(self, text: str, email: str, target_titles: List[str]) -> tuple: |
|
|
"""Extract name that appears near an email address""" |
|
|
if not text or not email: |
|
|
return None, None |
|
|
|
|
|
|
|
|
email_pos = text.lower().find(email.lower()) |
|
|
if email_pos == -1: |
|
|
return None, None |
|
|
|
|
|
start = max(0, email_pos - 200) |
|
|
end = min(len(text), email_pos + len(email) + 200) |
|
|
context = text[start:end] |
|
|
|
|
|
|
|
|
name = None |
|
|
title = None |
|
|
|
|
|
|
|
|
for pattern in self.name_patterns: |
|
|
match = re.search(pattern, context) |
|
|
if match: |
|
|
potential_name = match.group(1).strip() |
|
|
if self._is_valid_name(potential_name): |
|
|
name = potential_name |
|
|
if len(match.groups()) > 1: |
|
|
title = match.group(2).strip() |
|
|
break |
|
|
|
|
|
|
|
|
if not name: |
|
|
|
|
|
words = context.split() |
|
|
for i, word in enumerate(words): |
|
|
if word and word[0].isupper() and len(word) > 2: |
|
|
if i + 1 < len(words) and words[i+1] and words[i+1][0].isupper(): |
|
|
potential_name = f"{word} {words[i+1]}" |
|
|
if self._is_valid_name(potential_name): |
|
|
name = potential_name |
|
|
break |
|
|
|
|
|
return name, title |
|
|
|
|
|
def _extract_name_from_text(self, text: str, company_name: str) -> Optional[str]: |
|
|
"""Extract a person's name from text""" |
|
|
for pattern in self.name_patterns: |
|
|
match = re.search(pattern, text) |
|
|
if match: |
|
|
name = match.group(1).strip() |
|
|
if self._is_valid_name(name) and company_name.lower() not in name.lower(): |
|
|
return name |
|
|
return None |
|
|
|
|
|
def _is_valid_name(self, name: str) -> bool: |
|
|
"""Validate that a string looks like a real person's name""" |
|
|
|
|
|
if not name: |
|
|
return False |
|
|
|
|
|
|
|
|
name = ' '.join(name.split()) |
|
|
|
|
|
|
|
|
if len(name) < 4 or len(name) > 50: |
|
|
return False |
|
|
|
|
|
|
|
|
parts = name.split() |
|
|
if len(parts) < 2: |
|
|
return False |
|
|
|
|
|
|
|
|
if not all(2 <= len(part) <= 20 for part in parts): |
|
|
return False |
|
|
|
|
|
|
|
|
if not all(part[0].isupper() for part in parts): |
|
|
return False |
|
|
|
|
|
|
|
|
non_name_words = {'inc', 'ltd', 'llc', 'corporation', 'company', 'the', 'and', 'of'} |
|
|
if any(word.lower() in non_name_words for word in parts): |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
def _is_generic_email(self, prefix: str) -> bool: |
|
|
"""Check if email prefix is generic (info, contact, etc.)""" |
|
|
|
|
|
generic_prefixes = { |
|
|
'info', 'contact', 'support', 'hello', 'sales', 'admin', |
|
|
'help', 'service', 'team', 'general', 'office', 'mail' |
|
|
} |
|
|
|
|
|
return prefix.lower() in generic_prefixes |
|
|
|
|
|
|
|
|
|
|
|
# Legacy module-level singleton, lazily created by get_enhanced_contact_finder()
# when no MCP registry is supplied.
_enhanced_finder: Optional[EnhancedContactFinder] = None
|
|
|
|
|
|
|
|
def get_enhanced_contact_finder(mcp_registry=None) -> EnhancedContactFinder:
    """
    Return an EnhancedContactFinder instance.

    Args:
        mcp_registry: Optional MCP registry (recommended). When supplied, a
            fresh finder bound to that registry is returned. When omitted,
            a lazily created module-level singleton is reused (legacy
            behavior, deprecated).

    Returns:
        EnhancedContactFinder instance.
    """
    if mcp_registry:
        # Per-registry instances are cheap and are intentionally not cached.
        return EnhancedContactFinder(mcp_registry=mcp_registry)

    # Legacy path: one shared finder for callers without MCP.
    global _enhanced_finder
    if _enhanced_finder is None:
        _enhanced_finder = EnhancedContactFinder()
    return _enhanced_finder
|
|
|