# cx_ai_agent_v1/services/enhanced_contact_finder.py
"""
Enhanced Contact Finder Service
Finds real decision-makers using LinkedIn search, team page scraping, and AI extraction
"""
from typing import List, Optional, Dict, Set, TYPE_CHECKING
import re
import logging
from email_validator import validate_email, EmailNotValidError
from services.web_search import get_search_service
from services.web_scraper import WebScraperService
from app.schema import Contact
import uuid
import asyncio
if TYPE_CHECKING:
from mcp.registry import MCPRegistry
logger = logging.getLogger(__name__)
class EnhancedContactFinder:
    """Multi-strategy discovery of real decision-maker contacts.

    Strategies include LinkedIn profile search, company team-page scraping,
    AI-powered contact extraction, and email pattern detection. An MCP
    (Model Context Protocol) registry may supply a unified search client.
    """

    def __init__(self, mcp_registry: Optional['MCPRegistry'] = None):
        """Set up the search backend, scraper, and extraction patterns.

        Args:
            mcp_registry: Optional MCP registry providing a unified search
                client (recommended). When omitted, the legacy direct web
                search service is used instead.
        """
        if not mcp_registry:
            # Legacy fallback: direct search service without MCP.
            self.search = get_search_service()
            logger.warning("EnhancedContactFinder initialized without MCP (consider using MCP)")
        else:
            # Preferred path: unified search through the MCP registry.
            self.search = mcp_registry.get_search_client()
            logger.info("EnhancedContactFinder initialized with MCP search client")
        self.scraper = WebScraperService()
        # URL suffixes where companies typically list their people.
        self.team_page_patterns = [
            '/team',
            '/about-us',
            '/about',
            '/leadership',
            '/our-team',
            '/management',
            '/executives',
            '/people',
        ]
        # Regexes that pull "Name / Title / Company" triples out of free text.
        self.name_patterns = [
            # LinkedIn format: "Name - Title at Company | LinkedIn"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*[-–—]\s*([^|]+?)\s*(?:at|@)\s*([^|]+)',
            # Standard format: "Name, Title at Company"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+),?\s+([^,\n]+?)\s+(?:at|@)\s+([^\n]+)',
            # Bio format: "Name is the Title"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s+is\s+(?:the\s+)?([^.]+)',
            # Direct format: "Name\nTitle"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*\n\s*([A-Z][^,\n]+)',
        ]
        # Emails are never fabricated from patterns — only web-verified
        # addresses are used. This list is kept for reference, not generation.
        self._common_email_patterns = [
            '{first}.{last}',  # john.smith@company.com
            '{first}{last}',   # johnsmith@company.com
        ]
    async def find_real_contacts(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        max_contacts: int = 3
    ) -> List[Contact]:
        """
        Find real decision-makers with VERIFIED contact information.

        Runs up to six strategies in order, skipping the later ones once
        ``max_contacts`` contacts have been collected:
        1. Company website (team/about/contact pages)
        2. LinkedIn profiles
        3. Crunchbase and business directories
        4. Press releases and news articles
        5. Social media (Twitter, Instagram, Facebook business profiles)
        6. Direct web search for company-domain emails

        Args:
            company_name: Human-readable company name used in search queries.
            domain: Company email/web domain (e.g. "acme.com").
            target_titles: Job titles to look for (e.g. ["CEO", "CTO"]).
            max_contacts: Maximum number of contacts to return.

        Returns:
            List of Contact objects with verified information
            (at most ``max_contacts``).
        """
        logger.info(f"EnhancedFinder: Finding VERIFIED contacts at '{company_name}'")
        print(f"\n[CONTACT FINDER] ========================================")
        print(f"[CONTACT FINDER] Starting comprehensive search for {company_name}")
        print(f"[CONTACT FINDER] Domain: {domain}")
        print(f"[CONTACT FINDER] Target titles: {target_titles}")
        print(f"[CONTACT FINDER] ========================================")
        contacts = []
        # Dedupe sets are shared across all strategies so later strategies
        # never re-add an email/name an earlier strategy already found.
        seen_emails: Set[str] = set()
        seen_names: Set[str] = set()
        # Strategy 1: Scrape company website directly
        print(f"\n[CONTACT FINDER] 📄 Strategy 1: Scraping company website...")
        website_contacts = await self._scrape_company_website(
            company_name, domain, target_titles, seen_emails, seen_names, max_contacts
        )
        contacts.extend(website_contacts)
        print(f"[CONTACT FINDER] ✓ Found {len(website_contacts)} contacts from company website")
        # Strategy 2: LinkedIn search for executives
        # (each remaining strategy only asks for the shortfall)
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 💼 Strategy 2: Searching LinkedIn...")
            linkedin_contacts = await self._search_linkedin(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(linkedin_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(linkedin_contacts)} contacts from LinkedIn")
        # Strategy 3: Search Crunchbase/business directories
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 📊 Strategy 3: Searching business directories...")
            directory_contacts = await self._search_business_directories(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(directory_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(directory_contacts)} contacts from directories")
        # Strategy 4: Press releases and news
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 📰 Strategy 4: Searching press releases & news...")
            news_contacts = await self._search_press_releases(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(news_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(news_contacts)} contacts from news/PR")
        # Strategy 5: Social media profiles
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 📱 Strategy 5: Searching social media...")
            social_contacts = await self._search_social_media(
                company_name, domain, target_titles, seen_emails, seen_names, max_contacts - len(contacts)
            )
            contacts.extend(social_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(social_contacts)} contacts from social media")
        # Strategy 6: Direct email search as fallback (does not use seen_names)
        if len(contacts) < max_contacts:
            print(f"\n[CONTACT FINDER] 🔍 Strategy 6: Direct email search...")
            email_contacts = await self._search_for_emails(
                company_name, domain, target_titles, seen_emails, max_contacts - len(contacts)
            )
            contacts.extend(email_contacts)
            print(f"[CONTACT FINDER] ✓ Found {len(email_contacts)} contacts from direct email search")
        logger.info(f"EnhancedFinder: Total {len(contacts)} VERIFIED contacts found for '{company_name}'")
        print(f"\n[CONTACT FINDER] ========================================")
        print(f"[CONTACT FINDER] FINAL RESULTS: {len(contacts)} verified contacts")
        print(f"[CONTACT FINDER] ========================================")
        # Summarize the contacts that will actually be returned.
        for i, contact in enumerate(contacts[:max_contacts], 1):
            print(f"[CONTACT FINDER] {i}. {contact.name} ({contact.title})")
            print(f"[CONTACT FINDER] 📧 {contact.email}")
        if len(contacts) == 0:
            print(f"[CONTACT FINDER] No verified contacts found.")
            print(f"[CONTACT FINDER] Try manual search on LinkedIn or company website.")
        print(f"[CONTACT FINDER] ========================================\n")
        # Cap the result even if a strategy slightly overshot its quota.
        return contacts[:max_contacts]
async def _scrape_company_website(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
seen_names: Set[str],
max_needed: int
) -> List[Contact]:
"""Scrape company website for contact information"""
contacts = []
# Pages to check on company website
pages_to_check = [
f"https://{domain}/team",
f"https://{domain}/about",
f"https://{domain}/about-us",
f"https://{domain}/leadership",
f"https://{domain}/our-team",
f"https://{domain}/management",
f"https://{domain}/contact",
f"https://{domain}/contact-us",
f"https://www.{domain}/team",
f"https://www.{domain}/about",
f"https://www.{domain}/about-us",
f"https://www.{domain}/leadership",
f"https://www.{domain}/contact",
]
for url in pages_to_check:
if len(contacts) >= max_needed:
break
try:
print(f"[CONTACT FINDER] Checking: {url}")
page_content = await self.scraper.scrape_page(url)
if not page_content:
continue
text = page_content.get('text', '')
html = page_content.get('html', '')
# Find emails on page
found_emails = self._extract_emails_from_text(text, domain)
for email in found_emails:
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]):
name, title = self._extract_name_near_email(text, email, target_titles)
if name and name.lower() not in seen_names:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title or "Executive",
prospect_id=""
))
seen_emails.add(email.lower())
seen_names.add(name.lower())
print(f"[CONTACT FINDER] ✓ FOUND: {name} ({title}) - {email}")
if len(contacts) >= max_needed:
return contacts
except Exception as e:
logger.debug(f"Error scraping {url}: {str(e)}")
continue
return contacts
async def _search_linkedin(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
seen_names: Set[str],
max_needed: int
) -> List[Contact]:
"""Search LinkedIn for company executives with contact info"""
contacts = []
for title in target_titles[:5]: # Check top 5 titles
if len(contacts) >= max_needed:
break
# LinkedIn-specific search queries
queries = [
f'site:linkedin.com/in "{company_name}" "{title}" email',
f'site:linkedin.com "{company_name}" {title} contact',
f'linkedin.com/in {title} {company_name} "@{domain}"',
]
for query in queries:
if len(contacts) >= max_needed:
break
try:
print(f"[CONTACT FINDER] Query: {query[:60]}...")
results = await self.search.search(query, max_results=5)
for result in results:
text = result.get('title', '') + ' ' + result.get('body', '')
url = result.get('url', '')
# Look for emails in the result
found_emails = self._extract_emails_from_text(text, domain)
if found_emails:
for email in found_emails:
if email.lower() not in seen_emails:
name = self._extract_linkedin_name(text, result.get('title', ''))
if name and name.lower() not in seen_names:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title,
prospect_id=""
))
seen_emails.add(email.lower())
seen_names.add(name.lower())
print(f"[CONTACT FINDER] ✓ FOUND: {name} ({title}) - {email}")
except Exception as e:
logger.debug(f"LinkedIn search error: {str(e)}")
continue
return contacts
async def _search_business_directories(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
seen_names: Set[str],
max_needed: int
) -> List[Contact]:
"""Search Crunchbase, ZoomInfo, and other business directories"""
contacts = []
# Directory search queries
queries = [
f'site:crunchbase.com "{company_name}" founder CEO email',
f'site:crunchbase.com/person "{company_name}" email',
f'"{company_name}" founder email "@{domain}"',
f'"{company_name}" CEO email contact',
f'site:zoominfo.com "{company_name}" contact',
f'site:apollo.io "{company_name}" email',
]
for query in queries:
if len(contacts) >= max_needed:
break
try:
print(f"[CONTACT FINDER] Query: {query[:60]}...")
results = await self.search.search(query, max_results=5)
for result in results:
text = result.get('title', '') + ' ' + result.get('body', '')
found_emails = self._extract_emails_from_text(text, domain)
for email in found_emails:
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]):
name, title = self._extract_name_near_email(text, email, target_titles)
if name and name.lower() not in seen_names:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title or "Founder/Executive",
prospect_id=""
))
seen_emails.add(email.lower())
seen_names.add(name.lower())
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}")
except Exception as e:
logger.debug(f"Directory search error: {str(e)}")
continue
return contacts
async def _search_press_releases(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
seen_names: Set[str],
max_needed: int
) -> List[Contact]:
"""Search press releases and news for executive contact info"""
contacts = []
queries = [
f'"{company_name}" press release contact email',
f'"{company_name}" announcement CEO founder email',
f'site:prnewswire.com "{company_name}" contact',
f'site:businesswire.com "{company_name}" contact',
f'"{company_name}" media contact "@{domain}"',
f'"{company_name}" PR contact email',
]
for query in queries:
if len(contacts) >= max_needed:
break
try:
print(f"[CONTACT FINDER] Query: {query[:60]}...")
results = await self.search.search(query, max_results=5)
for result in results:
text = result.get('title', '') + ' ' + result.get('body', '')
found_emails = self._extract_emails_from_text(text, domain)
for email in found_emails:
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]):
name, title = self._extract_name_near_email(text, email, target_titles)
if name and name.lower() not in seen_names:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title or "Media Contact",
prospect_id=""
))
seen_emails.add(email.lower())
seen_names.add(name.lower())
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}")
except Exception as e:
logger.debug(f"Press release search error: {str(e)}")
continue
return contacts
async def _search_social_media(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
seen_names: Set[str],
max_needed: int
) -> List[Contact]:
"""Search social media profiles for contact information"""
contacts = []
queries = [
f'site:twitter.com "{company_name}" email "@{domain}"',
f'site:instagram.com "{company_name}" email contact',
f'"{company_name}" twitter CEO founder email',
f'"{company_name}" instagram business email',
f'site:facebook.com "{company_name}" about email',
]
for query in queries:
if len(contacts) >= max_needed:
break
try:
print(f"[CONTACT FINDER] Query: {query[:60]}...")
results = await self.search.search(query, max_results=5)
for result in results:
text = result.get('title', '') + ' ' + result.get('body', '')
found_emails = self._extract_emails_from_text(text, domain)
for email in found_emails:
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]):
name, title = self._extract_name_near_email(text, email, target_titles)
if name and name.lower() not in seen_names:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title or "Executive",
prospect_id=""
))
seen_emails.add(email.lower())
seen_names.add(name.lower())
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}")
except Exception as e:
logger.debug(f"Social media search error: {str(e)}")
continue
return contacts
def _extract_linkedin_name(self, text: str, title: str) -> Optional[str]:
"""Extract name from LinkedIn search result"""
# LinkedIn title format: "Name - Title at Company | LinkedIn"
linkedin_pattern = r'^([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*[-–—]'
match = re.search(linkedin_pattern, title)
if match:
name = match.group(1).strip()
if self._is_valid_name(name):
return name
# Try to find name in text
for pattern in self.name_patterns:
match = re.search(pattern, text)
if match:
name = match.group(1).strip()
if self._is_valid_name(name):
return name
return None
async def _search_for_emails(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
max_needed: int
) -> List[Contact]:
"""Search specifically for email addresses associated with company executives"""
contacts = []
# Direct email search queries
email_queries = [
f'"{domain}" email CEO OR founder OR director',
f'"{company_name}" contact email executive',
f'site:{domain} email contact',
f'"{company_name}" "@{domain}" CEO OR VP OR director',
]
for query in email_queries:
try:
print(f"[CONTACT FINDER] Query: '{query}'")
results = await self.search.search(query, max_results=10)
for result in results:
text = result.get('title', '') + ' ' + result.get('body', '')
# Extract emails from text
found_emails = self._extract_emails_from_text(text, domain)
for email in found_emails:
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]):
# Try to find associated name and title
name, title = self._extract_name_near_email(text, email, target_titles)
if name:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title or "Executive",
prospect_id=""
))
seen_emails.add(email.lower())
print(f"[CONTACT FINDER] ✓ FOUND: {name} - {email}")
if len(contacts) >= max_needed:
return contacts
except Exception as e:
logger.debug(f"Email search error: {str(e)}")
continue
return contacts
async def _scrape_for_verified_emails(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
max_needed: int
) -> List[Contact]:
"""Scrape company pages to find actual email addresses"""
contacts = []
# Pages likely to have contact info
pages_to_check = [
f"https://{domain}/contact",
f"https://{domain}/contact-us",
f"https://{domain}/about",
f"https://{domain}/about-us",
f"https://{domain}/team",
f"https://{domain}/leadership",
f"https://{domain}/our-team",
f"https://www.{domain}/contact",
f"https://www.{domain}/about",
f"https://www.{domain}/team",
]
for url in pages_to_check:
try:
page_content = await self.scraper.scrape_page(url)
if not page_content:
continue
text = page_content.get('text', '')
# Find all emails on page
found_emails = self._extract_emails_from_text(text, domain)
for email in found_emails:
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]):
# Try to find associated name
name, title = self._extract_name_near_email(text, email, target_titles)
if name:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title or "Contact",
prospect_id=""
))
seen_emails.add(email.lower())
print(f"[CONTACT FINDER] ✓ SCRAPED: {name} - {email} from {url}")
if len(contacts) >= max_needed:
return contacts
except Exception as e:
logger.debug(f"Scrape error for {url}: {str(e)}")
continue
return contacts
async def _find_contacts_with_emails(
self,
company_name: str,
domain: str,
target_titles: List[str],
seen_emails: Set[str],
max_needed: int
) -> List[Contact]:
"""Search for executives and only return those with verified emails"""
contacts = []
for title in target_titles:
# Search for person WITH email mention
queries = [
f'"{company_name}" {title} email "@{domain}"',
f'"{company_name}" {title} contact email',
f'site:linkedin.com "{company_name}" {title} email',
]
for query in queries:
try:
results = await self.search.search(query, max_results=5)
for result in results:
text = result.get('title', '') + ' ' + result.get('body', '')
# Only proceed if we find an actual email
found_emails = self._extract_emails_from_text(text, domain)
for email in found_emails:
if email.lower() not in seen_emails and not self._is_generic_email(email.split('@')[0]):
# Extract name from text
name = self._extract_name_from_text(text, company_name)
if name:
contacts.append(Contact(
id=str(uuid.uuid4()),
name=name,
email=email,
title=title,
prospect_id=""
))
seen_emails.add(email.lower())
print(f"[CONTACT FINDER] ✓ FOUND: {name} ({title}) - {email}")
if len(contacts) >= max_needed:
return contacts
except Exception as e:
logger.debug(f"Search error: {str(e)}")
continue
return contacts
def _extract_emails_from_text(self, text: str, domain: str) -> List[str]:
"""Extract email addresses from text, prioritizing company domain"""
if not text:
return []
# Find all emails
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
all_emails = re.findall(email_pattern, text, re.IGNORECASE)
# Prioritize company domain emails
company_emails = [e for e in all_emails if domain.lower() in e.lower()]
# Filter out junk
filtered = []
ignore_patterns = ['example.com', 'domain.com', 'email.com', 'test.com', 'sample.com',
'noreply', 'no-reply', 'donotreply', 'unsubscribe', 'privacy',
'support@', 'info@', 'contact@', 'hello@', 'sales@', 'help@']
for email in company_emails:
if not any(pattern in email.lower() for pattern in ignore_patterns):
filtered.append(email.lower())
return list(set(filtered))
def _extract_name_near_email(self, text: str, email: str, target_titles: List[str]) -> tuple:
"""Extract name that appears near an email address"""
if not text or not email:
return None, None
# Find context around email (200 chars before and after)
email_pos = text.lower().find(email.lower())
if email_pos == -1:
return None, None
start = max(0, email_pos - 200)
end = min(len(text), email_pos + len(email) + 200)
context = text[start:end]
# Look for name patterns in context
name = None
title = None
# Try to find name-title patterns
for pattern in self.name_patterns:
match = re.search(pattern, context)
if match:
potential_name = match.group(1).strip()
if self._is_valid_name(potential_name):
name = potential_name
if len(match.groups()) > 1:
title = match.group(2).strip()
break
# If no name found, try simpler extraction
if not name:
# Look for capitalized name-like words near email
words = context.split()
for i, word in enumerate(words):
if word and word[0].isupper() and len(word) > 2:
if i + 1 < len(words) and words[i+1] and words[i+1][0].isupper():
potential_name = f"{word} {words[i+1]}"
if self._is_valid_name(potential_name):
name = potential_name
break
return name, title
def _extract_name_from_text(self, text: str, company_name: str) -> Optional[str]:
"""Extract a person's name from text"""
for pattern in self.name_patterns:
match = re.search(pattern, text)
if match:
name = match.group(1).strip()
if self._is_valid_name(name) and company_name.lower() not in name.lower():
return name
return None
def _is_valid_name(self, name: str) -> bool:
"""Validate that a string looks like a real person's name"""
if not name:
return False
# Remove extra whitespace
name = ' '.join(name.split())
# Check for minimum length
if len(name) < 4 or len(name) > 50:
return False
# Should have at least 2 words (first and last name)
parts = name.split()
if len(parts) < 2:
return False
# Each part should be reasonable length
if not all(2 <= len(part) <= 20 for part in parts):
return False
# Should start with capital letters
if not all(part[0].isupper() for part in parts):
return False
# Shouldn't contain common non-name words
non_name_words = {'inc', 'ltd', 'llc', 'corporation', 'company', 'the', 'and', 'of'}
if any(word.lower() in non_name_words for word in parts):
return False
return True
def _is_generic_email(self, prefix: str) -> bool:
"""Check if email prefix is generic (info, contact, etc.)"""
generic_prefixes = {
'info', 'contact', 'support', 'hello', 'sales', 'admin',
'help', 'service', 'team', 'general', 'office', 'mail'
}
return prefix.lower() in generic_prefixes
# Legacy singleton (deprecated - use MCP instead)
_enhanced_finder: Optional[EnhancedContactFinder] = None


def get_enhanced_contact_finder(mcp_registry=None) -> EnhancedContactFinder:
    """Return an EnhancedContactFinder instance.

    Args:
        mcp_registry: Optional MCP registry (recommended). When provided, a
            fresh MCP-backed instance is created per call. When None, a
            lazily-created module-level singleton is reused (deprecated).

    Returns:
        EnhancedContactFinder instance.
    """
    global _enhanced_finder
    if mcp_registry:
        # Recommended path: fresh instance bound to the given registry.
        return EnhancedContactFinder(mcp_registry=mcp_registry)
    # Deprecated path: lazily create and reuse one MCP-less instance.
    if _enhanced_finder is None:
        _enhanced_finder = EnhancedContactFinder()
    return _enhanced_finder