# cx_ai_agent_v1 / agents / enricher.py
# muzakkirhussain011's picture
# Add application files (text files only)
# 8bab08d
# file: agents/enricher.py
"""
Enricher Agent - enriches prospects with facts from real-time web search.
Uses actual web search results instead of static/mock data.
"""
from datetime import datetime
from app.schema import Prospect, Fact
from app.config import FACT_TTL_HOURS, SKIP_WEB_SEARCH
import uuid
import logging
logger = logging.getLogger(__name__)
class Enricher:
    """Enrich a prospect with facts from web search and discovery data.

    Facts are collected from two places:

    - Web search (skipped when ``SKIP_WEB_SEARCH`` is truthy), using queries
      about company news, industry customer experience, challenges, and
      customer support contact details.
    - The pain points and notes already present on ``prospect.company``.

    Every fact is persisted through the store client as it is created.
    """

    # Only the first few results of each query are considered.
    _RESULTS_PER_QUERY = 3
    # Search snippets shorter than this are treated as noise and skipped.
    _MIN_RESULT_LENGTH = 20
    # Stored fact text is truncated to this many characters.
    _MAX_FACT_LENGTH = 500
    # Pain points / notes must be strictly longer than this to become facts.
    _MIN_DISCOVERY_LENGTH = 10
    # Confidence used when a search result does not carry its own.
    _DEFAULT_SEARCH_CONFIDENCE = 0.75

    def __init__(self, mcp_registry):
        """Keep the registry and resolve the search and store clients from it."""
        self.mcp = mcp_registry
        self.search = mcp_registry.get_search_client()
        self.store = mcp_registry.get_store_client()

    async def run(self, prospect: Prospect) -> Prospect:
        """Enrich prospect with facts from web search.

        Args:
            prospect: The prospect to enrich; its ``company`` supplies the
                name, industry, domain, id, pains and notes that are read.

        Returns:
            The same ``prospect`` object with ``facts`` replaced by the newly
            collected facts and ``status`` set to ``"enriched"``. The prospect
            is saved through the store client before returning.

        Raises:
            Exception: Errors from the store client while saving discovery
                facts or the prospect propagate to the caller. Errors during
                web search are logged and skipped (best effort).
        """
        company = prospect.company
        logger.info(f"Enricher: Enriching prospect '{company.name}'")

        facts = []
        if SKIP_WEB_SEARCH:
            logger.info("Enricher: Skipping web search (SKIP_WEB_SEARCH=true)")
        else:
            logger.info("Enricher: Performing web search for facts")
            facts.extend(await self._search_facts(company))

        facts.extend(await self._discovery_facts(company))

        prospect.facts = facts
        prospect.status = "enriched"
        await self.store.save_prospect(prospect)

        logger.info(f"Enricher: Added {len(facts)} facts for '{company.name}'")
        return prospect

    @staticmethod
    def _build_queries(company):
        """Return the web search queries to run for ``company``."""
        name = company.name
        # Join only the parts that are set, so a missing industry does not
        # inject the literal text "None" into the query.
        experience_terms = [str(part) for part in (name, company.industry) if part]
        experience_terms.append("customer experience")

        queries = [
            # Company news and updates
            f"{name} news latest updates",
            # Industry-specific customer experience
            " ".join(experience_terms),
            # Pain points and challenges
            f"{name} challenges problems",
        ]
        # Contact and support information only makes sense with a domain.
        if company.domain:
            queries.append(f"{company.domain} customer support contact")
        return queries

    @staticmethod
    def _make_fact(company_id, source, text, confidence, ttl_hours):
        """Build a ``Fact`` with a fresh UUID and the current UTC timestamp."""
        return Fact(
            id=str(uuid.uuid4()),
            source=source,
            text=text,
            # NOTE(review): utcnow() is deprecated since Python 3.12 and
            # returns a naive datetime. It is kept because consumers of
            # ``collected_at`` may compare against naive datetimes; confirm
            # before switching to ``datetime.now(timezone.utc)``.
            collected_at=datetime.utcnow(),
            ttl_hours=ttl_hours,
            confidence=confidence,
            company_id=company_id,
        )

    def _fact_from_result(self, result, company_id, seen_texts):
        """Convert one search result into a ``Fact``, or ``None`` to skip it.

        ``seen_texts`` is updated in place with the text of every fact that
        is produced, so repeated snippets across queries are dropped.
        """
        # ``dict.get`` only falls back to its default when the key is absent,
        # so an explicit ``None`` value needs the extra ``or``.
        text = (result.get("text") or "").strip()
        title = (result.get("title") or "").strip()

        # Skip empty or very short results.
        if len(text) < self._MIN_RESULT_LENGTH:
            return None

        # Prefix the title for context unless the snippet already contains it.
        full_text = f"{title}. {text}" if title and title not in text else text

        # De-duplicate on the text that is actually stored: two snippets that
        # differ only past the truncation point would yield identical facts.
        stored_text = full_text[: self._MAX_FACT_LENGTH]
        if stored_text in seen_texts:
            return None
        seen_texts.add(stored_text)

        confidence = result.get("confidence")
        if confidence is None:
            confidence = self._DEFAULT_SEARCH_CONFIDENCE

        return self._make_fact(
            company_id=company_id,
            source=result.get("source") or "web search",
            text=stored_text,
            confidence=confidence,
            ttl_hours=FACT_TTL_HOURS,
        )

    async def _search_facts(self, company):
        """Run every search query and return the facts that were collected.

        Search is best effort: a failing query or a malformed result is
        logged and skipped without discarding the other results.
        """
        facts = []
        seen_texts = set()  # Deduplication across all queries

        for query in self._build_queries(company):
            try:
                logger.info(f"Enricher: Searching for: '{query}'")
                results = await self.search.query(query)
                top_results = (results or [])[: self._RESULTS_PER_QUERY]
            except Exception as e:
                logger.exception(f"Enricher: Error searching for '{query}': {str(e)}")
                continue

            for result in top_results:
                # Isolate failures per result so one bad entry does not drop
                # the remaining results of the same query.
                try:
                    fact = self._fact_from_result(result, company.id, seen_texts)
                    if fact is None:
                        continue
                    facts.append(fact)
                    await self.store.save_fact(fact)
                    logger.info(f"Enricher: Added fact from {fact.source}")
                except Exception as e:
                    logger.exception(
                        f"Enricher: Error processing result for '{query}': {str(e)}"
                    )

        return facts

    async def _discovery_facts(self, company):
        """Turn the company's known pain points and notes into saved facts."""
        min_length = self._MIN_DISCOVERY_LENGTH

        # (text, confidence) pairs; pains first, then notes. ``or []`` guards
        # against the attributes being None.
        candidates = [
            (f"Known challenge: {pain}", 0.85)
            for pain in (company.pains or [])
            if pain and len(pain) > min_length
        ]
        candidates.extend(
            (note, 0.8)
            for note in (company.notes or [])
            if note and len(note) > min_length
        )

        facts = []
        for text, confidence in candidates:
            fact = self._make_fact(
                company_id=company.id,
                source="company_discovery",
                text=text,
                confidence=confidence,
                ttl_hours=FACT_TTL_HOURS * 2,  # Discovery data lasts longer
            )
            facts.append(fact)
            await self.store.save_fact(fact)
        return facts