|
|
""" |
|
|
Prospect Discovery Service |
|
|
Uses web search to find decision-makers and contacts at a company |
|
|
""" |
|
|
from typing import List, Optional, Dict, TYPE_CHECKING |
|
|
import re |
|
|
import logging |
|
|
from email_validator import validate_email, EmailNotValidError |
|
|
from services.web_search import get_search_service |
|
|
from services.enhanced_contact_finder import get_enhanced_contact_finder, EnhancedContactFinder |
|
|
from app.schema import Contact |
|
|
import uuid |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from mcp.registry import MCPRegistry |
|
|
|
|
|
# Module-level logger, named after this module so log output can be filtered per module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class ProspectDiscoveryService:
    """
    Discovers decision-makers and contacts at a company using web search

    Now supports MCP (Model Context Protocol) for unified search interface

    Contacts are gathered from up to three sources:
      1. the enhanced contact finder;
      2. a basic per-title web search, used only when the enhanced finder
         raises;
      3. generated placeholder contacts that pad the result when fewer than
         the requested number were found. Placeholder names come from a
         fixed list and do NOT refer to real people.
    """

    # Two capitalised words. Matched case-sensitively so that ordinary
    # lowercase words ("is the") are never mistaken for a person's name.
    _PERSON_NAME_PATTERN = r'([A-Z][a-z]+\s+[A-Z][a-z]+)'

    # Text allowed between a name and the job title, tried in this order:
    # "Name, Title" / "Name Title", "Name is Title", "Name - Title".
    _NAME_TITLE_SEPARATORS = (r',?\s+', r'\s+is\s+', r'\s+-\s+')

    # Placeholder names per job title, used only by _generate_fallback_contact.
    _FALLBACK_NAME_POOL = {
        "CEO": ["Sarah Johnson", "Michael Chen", "David Martinez", "Emily Williams"],
        "Founder": ["Alex Thompson", "Jessica Lee", "Robert Garcia", "Maria Rodriguez"],
        "Head of Customer Success": ["Daniel Kim", "Priya Singh", "Christopher Brown", "Nicole Davis"],
        "CX Manager": ["Amanda Wilson", "James Taylor", "Laura Anderson", "Kevin Moore"],
        "VP Customer Experience": ["Olivia Martinez", "Noah Patel", "Sophia Lee", "Jackson Rivera"],
        "Director of CX": ["Henry Walker", "Isabella Nguyen", "Lucas Adams", "Chloe Wilson"],
        "Chief Customer Officer": ["Amelia Clark", "James Wright", "Mila Turner", "Benjamin Scott"],
        "SVP Customer Success": ["Charlotte King", "William Brooks", "Zoe Parker", "Logan Hughes"],
        "VP CX": ["Harper Bell", "Elijah Foster", "Layla Reed", "Oliver Evans"],
        "Director Customer Experience": ["Emma Thomas", "Mason White", "Ava Harris", "Ethan Martin"],
        "Head of Support": ["Lily Jackson", "Ryan Lewis", "Grace Robinson", "Nathan Walker"],
    }

    # Placeholder names for any title missing from _FALLBACK_NAME_POOL.
    _DEFAULT_FALLBACK_NAMES = ["Alex Morgan", "Jordan Smith", "Taylor Johnson", "Casey Brown"]

    def __init__(self, mcp_registry: Optional['MCPRegistry'] = None):
        """
        Initialize prospect discovery service

        Args:
            mcp_registry: Optional MCP registry for unified search (recommended)
                If None, falls back to direct services
        """
        if mcp_registry:
            self.search = mcp_registry.get_search_client()
            self.enhanced_finder = EnhancedContactFinder(mcp_registry=mcp_registry)
            logger.info("ProspectDiscoveryService initialized with MCP")
        else:
            self.search = get_search_service()
            self.enhanced_finder = get_enhanced_contact_finder()
            logger.warning("ProspectDiscoveryService initialized without MCP (consider using MCP)")

        # Job titles to look for, keyed by the category returned from
        # _get_size_category().
        self.target_titles = {
            'small': ['CEO', 'Founder', 'Head of Customer Success', 'CX Manager'],
            'medium': ['VP Customer Experience', 'Director of CX', 'Head of Support', 'Chief Customer Officer'],
            'large': ['Chief Customer Officer', 'SVP Customer Success', 'VP CX', 'VP Customer Experience', 'Director Customer Experience'],
        }

    async def discover_contacts(
        self,
        company_name: str,
        domain: str,
        company_size: int,
        max_contacts: int = 3,
        skip_search: bool = False
    ) -> List["Contact"]:
        """
        Discover decision-maker contacts at a company

        Args:
            company_name: Name of the company
            domain: Company domain
            company_size: Number of employees
            max_contacts: Maximum contacts to return
            skip_search: If True, skip web search and only generate fallback contacts

        Returns:
            List of at most max_contacts Contact objects. Contacts found by
            search come first. If fewer than max_contacts were found, the
            list is padded with generated placeholder contacts (see
            _generate_fallback_contact), which are not real people. Emails
            built by this class are guessed from name + domain and only
            syntax-checked, never verified to exist.
        """
        logger.info(f"ProspectDiscovery: Finding REAL contacts at '{company_name}'")

        contacts: List["Contact"] = []
        seen_emails: set = set()  # lower-cased emails already in `contacts`

        size_category = self._get_size_category(company_size)
        target_titles = self.target_titles[size_category]

        if not skip_search:
            logger.info("ProspectDiscovery: Using ENHANCED contact finder (LinkedIn + Team pages + AI)")

            try:
                enhanced_contacts = await self.enhanced_finder.find_real_contacts(
                    company_name=company_name,
                    domain=domain,
                    target_titles=target_titles,
                    max_contacts=max_contacts
                )

                for contact in enhanced_contacts:
                    # Never return more than the caller asked for, even if
                    # the finder hands back extra results.
                    if len(contacts) >= max_contacts:
                        break

                    # A contact without an email cannot be de-duplicated or
                    # used; skip it instead of failing the whole batch.
                    email_key = (contact.email or "").lower()
                    if not email_key or email_key in seen_emails:
                        continue

                    contacts.append(contact)
                    seen_emails.add(email_key)
                    logger.info(f"ProspectDiscovery: Found REAL contact: {contact.name} ({contact.title}) - {contact.email}")

            except Exception as e:
                logger.warning(f"ProspectDiscovery: Enhanced finder failed, falling back to basic search: {str(e)}")

                # Best-effort fallback: one basic web search per target title.
                # Contacts collected before the failure are kept.
                for title in target_titles[:max_contacts]:
                    if len(contacts) >= max_contacts:
                        break

                    try:
                        contact = await self._find_contact_for_title(
                            company_name,
                            domain,
                            title,
                            seen_emails
                        )

                        if contact:
                            contacts.append(contact)
                            seen_emails.add(contact.email.lower())
                            logger.info(f"ProspectDiscovery: Found {title} via basic search")

                    except Exception as e2:
                        # One failing title must not stop the others.
                        logger.error(f"ProspectDiscovery: Error finding {title}: {str(e2)}")
                        continue
        else:
            logger.info("ProspectDiscovery: Skipping web search (skip_search=True)")

        # Everything collected so far came from search; anything appended
        # below is a generated placeholder.
        real_count = len(contacts)

        if len(contacts) < max_contacts:
            logger.warning(f"ProspectDiscovery: Only found {len(contacts)} real contacts, generating {max_contacts - len(contacts)} fallback contacts")

            covered_titles = {c.title for c in contacts}
            remaining_titles = [t for t in target_titles if t not in covered_titles]

            # Walk every remaining title (not just the first N) so that a
            # title whose placeholder cannot be built does not leave the
            # result short while other titles are still available.
            for title in remaining_titles:
                if len(contacts) >= max_contacts:
                    break

                fallback_contact = self._generate_fallback_contact(
                    company_name,
                    domain,
                    title,
                    seen_emails
                )
                if fallback_contact:
                    contacts.append(fallback_contact)
                    seen_emails.add(fallback_contact.email.lower())

        logger.info(f"ProspectDiscovery: Total {len(contacts)} contacts for '{company_name}' ({real_count} real)")
        return contacts

    async def _find_contact_for_title(
        self,
        company_name: str,
        domain: str,
        title: str,
        seen_emails: set
    ) -> Optional["Contact"]:
        """
        Search for a specific contact by title

        Runs up to three web-search queries for the title. The first result
        from which a name can be extracted, and whose guessed email is not
        already in seen_emails, becomes the contact. The email is derived
        from the name and domain by _generate_email; it is not read from the
        search result.

        Args:
            company_name: Name of the company
            domain: Company domain used to build the email address
            title: Job title to search for
            seen_emails: Lower-cased emails that are already taken

        Returns:
            A Contact with an empty prospect_id, or None if nothing usable
            was found
        """
        queries = [
            f"{title} at {company_name} linkedin",
            f"{company_name} {title} contact",
            f"{title} {company_name} email"
        ]

        for query in queries:
            # The try covers the whole query on purpose: any failure while
            # searching or processing results just moves on to the next query.
            try:
                results = await self.search.search(query, max_results=5)

                for result in results or []:
                    name = self._extract_name_from_result(result, title)
                    if not name:
                        continue

                    email = self._generate_email(name, domain)
                    if email and email.lower() not in seen_emails:
                        return Contact(
                            id=str(uuid.uuid4()),
                            name=name,
                            email=email,
                            title=title,
                            prospect_id=""
                        )

            except Exception as e:
                logger.debug(f"ProspectDiscovery: Search error for '{query}': {str(e)}")
                continue

        return None

    def _extract_name_from_result(self, result: Dict, title: str) -> Optional[str]:
        """
        Try to extract a person's name from search result

        Looks in the result's 'title' and 'body' text for
        "First Last, <title>", "First Last <title>", "First Last is <title>"
        or "First Last - <title>". The job title and the separator are
        matched case-insensitively; the name itself must be two capitalised
        words.

        Args:
            result: Search result; the 'title' and 'body' keys are read and
                may be missing or None
            title: Job title expected to follow the name

        Returns:
            "First Last" if a plausible name is found, otherwise None
        """
        text = f"{result.get('title') or ''} {result.get('body') or ''}"
        title_pattern = re.escape(title)

        for separator in self._NAME_TITLE_SEPARATORS:
            # (?i:...) limits case-insensitivity to separator + title. A
            # global IGNORECASE would let [A-Z][a-z]+ match lowercase words
            # and turn phrases like "is the CEO" into a "name".
            pattern = f"{self._PERSON_NAME_PATTERN}(?i:{separator}{title_pattern})"
            match = re.search(pattern, text)
            if not match:
                continue

            name = match.group(1).strip()
            parts = name.split()
            # Reject implausibly long "names".
            if len(parts) == 2 and all(2 <= len(p) <= 20 for p in parts):
                return name

        return None

    @staticmethod
    def _normalize_email(email: str) -> Optional[str]:
        """Return the normalized form of email, or None if its syntax is invalid."""
        try:
            # Syntax check only; no DNS lookup is made.
            validated = validate_email(email, check_deliverability=False)
            return validated.normalized
        except EmailNotValidError:
            return None

    def _generate_email(self, name: str, domain: str) -> Optional[str]:
        """
        Generate email address from name and domain

        Builds "first.last@domain" (or "name@domain" for a single-word name)
        from the ASCII letters of name, then validates its syntax.

        Args:
            name: Person's name
            domain: Company domain

        Returns:
            The normalized email address, or None if name contains no ASCII
            letters or the resulting address is not syntactically valid
        """
        parts = re.sub(r"[^a-zA-Z\s]", "", name).strip().lower().split()
        if not parts:
            return None

        prefix = f"{parts[0]}.{parts[-1]}" if len(parts) >= 2 else parts[0]
        return self._normalize_email(f"{prefix}@{domain}")

    def _generate_fallback_contact(
        self,
        company_name: str,
        domain: str,
        title: str,
        seen_emails: set
    ) -> Optional["Contact"]:
        """
        Generate a plausible fallback contact

        The name is picked from a fixed list for the title. The pick is
        deterministic: the same company name and title always give the same
        name. It is a placeholder, not a real person at the company.

        Args:
            company_name: Name of the company (only used to pick the name)
            domain: Company domain used to build the email address
            title: Job title for the contact
            seen_emails: Lower-cased emails that are already taken

        Returns:
            A Contact with an empty prospect_id, or None if no valid, unused
            email could be built or the Contact could not be created
        """
        pool = self._FALLBACK_NAME_POOL.get(title, self._DEFAULT_FALLBACK_NAMES)

        # Sum of character codes: a simple, stable index into the pool.
        company_hash = sum(ord(c) for c in company_name)
        name = pool[company_hash % len(pool)]

        email = self._generate_email(name, domain)

        if not email or email.lower() in seen_emails:
            # Try the "flast@domain" pattern, validated like the primary one
            # so an invalid address is never returned.
            parts = name.lower().split()
            email = (
                self._normalize_email(f"{parts[0][0]}{parts[-1]}@{domain}")
                if len(parts) >= 2
                else None
            )

        if not email or email.lower() in seen_emails:
            return None

        try:
            return Contact(
                id=str(uuid.uuid4()),
                name=name,
                email=email,
                title=title,
                prospect_id=""
            )
        except Exception as e:
            logger.error(f"ProspectDiscovery: Error creating fallback contact: {str(e)}")
            return None

    def _get_size_category(self, company_size: int) -> str:
        """
        Categorize company by size

        Returns:
            'small' for fewer than 100 employees, 'medium' for fewer than
            1000, otherwise 'large'
        """
        if company_size < 100:
            return 'small'
        elif company_size < 1000:
            return 'medium'
        else:
            return 'large'
|
|
|
|
|
|
|
|
|
|
|
# Lazily created shared instance; stays None until the accessor below is first called.
_prospect_discovery: Optional[ProspectDiscoveryService] = None


def get_prospect_discovery_service() -> ProspectDiscoveryService:
    """
    Return the shared ProspectDiscoveryService, creating it on first use

    The instance is constructed with no arguments and then reused for every
    later call.
    """
    global _prospect_discovery

    if _prospect_discovery is not None:
        return _prospect_discovery

    _prospect_discovery = ProspectDiscoveryService()
    return _prospect_discovery
|
|
|