File size: 5,229 Bytes
8bab08d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# file: agents/enricher.py
"""
Enricher Agent - Enriches prospects with real-time web search data
Now uses actual web search instead of static/mock data
"""
from datetime import datetime
from app.schema import Prospect, Fact
from app.config import FACT_TTL_HOURS, SKIP_WEB_SEARCH
import uuid
import logging
logger = logging.getLogger(__name__)
class Enricher:
    """Enriches prospects with facts gathered from real-time web search.

    Fact sources, in order:
      1. Web search results (skipped when SKIP_WEB_SEARCH is set):
         company news, industry challenges, customer-experience insights.
      2. Company pain points recorded during discovery.
      3. Company notes recorded during discovery.

    Discovery-derived facts carry a longer TTL and higher confidence than
    raw search snippets because they come from vetted internal data.
    """

    # Maximum number of search results consumed per query.
    _RESULTS_PER_QUERY = 3
    # Minimum snippet length (chars) for a search result to become a fact.
    _MIN_TEXT_LEN = 20
    # Minimum length (chars) for a discovery pain/note to become a fact.
    _MIN_DISCOVERY_LEN = 10
    # Fact text is truncated to this many characters.
    _MAX_FACT_LEN = 500

    def __init__(self, mcp_registry):
        """
        Args:
            mcp_registry: registry exposing the search and store MCP clients
                via get_search_client() / get_store_client().
        """
        self.mcp = mcp_registry
        self.search = mcp_registry.get_search_client()
        self.store = mcp_registry.get_store_client()

    async def run(self, prospect: Prospect) -> Prospect:
        """Enrich *prospect* with facts, persist them, and return the prospect.

        Side effects: saves each fact and the updated prospect through the
        store client; sets ``prospect.status`` to ``"enriched"``.
        """
        logger.info("Enricher: Enriching prospect '%s'", prospect.company.name)

        facts: list = []
        if not SKIP_WEB_SEARCH:
            facts.extend(await self._search_facts(prospect))
        else:
            logger.info("Enricher: Skipping web search (SKIP_WEB_SEARCH=true)")

        # Pain points surfaced during discovery become long-lived facts.
        for pain in prospect.company.pains:
            if pain and len(pain) > self._MIN_DISCOVERY_LEN:
                fact = self._discovery_fact(
                    prospect, f"Known challenge: {pain}", confidence=0.85
                )
                facts.append(fact)
                await self.store.save_fact(fact)

        # Free-form discovery notes are kept verbatim.
        for note in prospect.company.notes:
            if note and len(note) > self._MIN_DISCOVERY_LEN:
                fact = self._discovery_fact(prospect, note, confidence=0.8)
                facts.append(fact)
                await self.store.save_fact(fact)

        prospect.facts = facts
        prospect.status = "enriched"
        await self.store.save_prospect(prospect)
        logger.info(
            "Enricher: Added %d facts for '%s'", len(facts), prospect.company.name
        )
        return prospect

    async def _search_facts(self, prospect: Prospect) -> list:
        """Run the web-search queries for *prospect*; persist and return facts.

        Best-effort: a failure on one query is logged and skipped so the
        remaining queries (and the rest of enrichment) still run.
        """
        logger.info("Enricher: Performing web search for facts")
        queries = [
            # Company news and updates
            f"{prospect.company.name} news latest updates",
            # Industry-specific challenges
            f"{prospect.company.name} {prospect.company.industry} customer experience",
            # Pain points and challenges
            f"{prospect.company.name} challenges problems",
            # Contact and support information
            f"{prospect.company.domain} customer support contact",
        ]

        facts = []
        seen_texts = set()  # dedupe identical snippets across queries
        for query in queries:
            try:
                logger.info("Enricher: Searching for: '%s'", query)
                results = await self.search.query(query)
                for result in results[: self._RESULTS_PER_QUERY]:
                    text = result.get("text", "").strip()
                    title = result.get("title", "").strip()
                    # Skip empty or very short results.
                    if not text or len(text) < self._MIN_TEXT_LEN:
                        continue
                    # Prepend the title for context unless it already appears.
                    if title and title not in text:
                        full_text = f"{title}. {text}"
                    else:
                        full_text = text
                    if full_text in seen_texts:
                        continue
                    seen_texts.add(full_text)
                    fact = Fact(
                        id=str(uuid.uuid4()),
                        source=result.get("source", "web search"),
                        text=full_text[: self._MAX_FACT_LEN],
                        # TODO(review): utcnow() is naive and deprecated in
                        # Python 3.12; confirm downstream TTL math before
                        # switching to datetime.now(timezone.utc).
                        collected_at=datetime.utcnow(),
                        ttl_hours=FACT_TTL_HOURS,
                        confidence=result.get("confidence", 0.75),
                        company_id=prospect.company.id,
                    )
                    facts.append(fact)
                    await self.store.save_fact(fact)
                    logger.info("Enricher: Added fact from %s", fact.source)
            except Exception as e:
                # One bad query must not abort the whole enrichment pass.
                logger.error(
                    "Enricher: Error searching for '%s': %s", query, str(e)
                )
                continue
        return facts

    def _discovery_fact(
        self, prospect: Prospect, text: str, confidence: float
    ) -> Fact:
        """Build (but do not persist) a fact sourced from company discovery."""
        return Fact(
            id=str(uuid.uuid4()),
            source="company_discovery",
            text=text,
            collected_at=datetime.utcnow(),
            ttl_hours=FACT_TTL_HOURS * 2,  # discovery data lasts longer
            confidence=confidence,
            company_id=prospect.company.id,
        )