| """ | |
| Modal.com Deployment Configuration for Spend Analyzer MCP | |
| Enhanced with Claude and SambaNova Cloud API support | |
| """ | |
| import modal | |
| import os | |
| from typing import Dict, Any, Optional | |
| import json | |
| import asyncio | |
| from datetime import datetime | |
| import logging | |
| # Create Modal app | |
| app = modal.App("spend-analyzer-mcp-bmt") | |
# Define the container image with all dependencies
image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install([
        "fastapi",
        "uvicorn",
        "gradio",
        "pandas",
        "numpy",
        "PyPDF2",
        "PyMuPDF",
        "anthropic>=0.7.0",
        "openai>=1.0.0",
        "python-multipart",
        "aiofiles",
        "python-dotenv",
        "imaplib2",
        "email-validator",
        "pydantic>=1.10.0",
        "websockets",
        "asyncio-mqtt",
        "python-dateutil",
        "regex",
        "plotly>=5.0.0",
        "requests>=2.28.0",
        "httpx>=0.24.0",
    ])
    .apt_install(["tesseract-ocr", "tesseract-ocr-eng", "poppler-utils"])
)
# Secrets for API keys and email credentials
secrets = [
    modal.Secret.from_name("anthropic-api-key"),   # ANTHROPIC_API_KEY
    modal.Secret.from_name("sambanova-api-key"),   # SAMBANOVA_API_KEY
    modal.Secret.from_name("email-credentials"),   # EMAIL_USER, EMAIL_PASS, IMAP_SERVER
]
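
# The secrets above are expected to already exist in the Modal workspace.
# A minimal sketch of creating them with the Modal CLI (the key names mirror the
# comments above; the values are placeholders you must supply yourself):
#
#   modal secret create anthropic-api-key ANTHROPIC_API_KEY=sk-ant-...
#   modal secret create sambanova-api-key SAMBANOVA_API_KEY=...
#   modal secret create email-credentials EMAIL_USER=... EMAIL_PASS=... IMAP_SERVER=imap.example.com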

# Shared volume for persistent storage
volume = modal.Volume.from_name("spend-analyzer-data", create_if_missing=True)

# Runs remotely on Modal with the shared image, API/email secrets, and the /data volume
@app.function(image=image, secrets=secrets, volumes={"/data": volume})
def process_bank_statements(email_config: Dict, days_back: int = 30, passwords: Optional[Dict] = None):
    """
    Modal function to process bank statements from email
    """
    import sys
    sys.path.append("/data")

    from email_processor import EmailProcessor, PDFProcessor
    from spend_analyzer import SpendAnalyzer

    try:
        # Initialize processors
        email_processor = EmailProcessor(email_config)
        pdf_processor = PDFProcessor()
        analyzer = SpendAnalyzer()

        # Fetch emails
        emails = asyncio.run(email_processor.fetch_bank_emails(days_back))

        all_transactions = []
        processed_statements = []

        for email_msg in emails:
            try:
                # Extract attachments
                attachments = asyncio.run(email_processor.extract_attachments(email_msg))

                for filename, content, file_type in attachments:
                    if file_type == 'pdf':
                        # Try to process PDF
                        password = None
                        if passwords and filename in passwords:
                            password = passwords[filename]

                        try:
                            statement_info = asyncio.run(pdf_processor.process_pdf(content, password))
                            all_transactions.extend(statement_info.transactions)
                            processed_statements.append({
                                'filename': filename,
                                'bank': statement_info.bank_name,
                                'account': statement_info.account_number,
                                'period': statement_info.statement_period,
                                'transaction_count': len(statement_info.transactions)
                            })
                        except ValueError as e:
                            if "password" in str(e).lower():
                                # PDF requires password
                                processed_statements.append({
                                    'filename': filename,
                                    'status': 'password_required',
                                    'error': str(e)
                                })
                            else:
                                processed_statements.append({
                                    'filename': filename,
                                    'status': 'error',
                                    'error': str(e)
                                })
            except Exception as e:
                logging.error(f"Error processing email: {e}")
                continue

        # Analyze transactions
        if all_transactions:
            analyzer.load_transactions(all_transactions)
            analysis_data = analyzer.export_analysis_data()
        else:
            analysis_data = {'message': 'No transactions found'}

        return {
            'processed_statements': processed_statements,
            'total_transactions': len(all_transactions),
            'analysis': analysis_data,
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        logging.error(f"Error in process_bank_statements: {e}")
        return {'error': str(e)}

@app.function(image=image, secrets=secrets, volumes={"/data": volume})
def analyze_uploaded_statements(pdf_contents: Dict[str, bytes], passwords: Optional[Dict] = None):
    """
    Modal function to analyze directly uploaded PDF statements
    """
    from pdf_processor import PDFProcessor
    from spend_analyzer import SpendAnalyzer

    try:
        pdf_processor = PDFProcessor()
        analyzer = SpendAnalyzer()

        all_transactions = []
        processed_files = []

        for filename, content in pdf_contents.items():
            try:
                password = passwords.get(filename) if passwords else None
                statement_info = asyncio.run(pdf_processor.process_pdf(content, password))
                all_transactions.extend(statement_info.transactions)

                processed_files.append({
                    'filename': filename,
                    'bank': statement_info.bank_name,
                    'account': statement_info.account_number,
                    'transaction_count': len(statement_info.transactions),
                    'status': 'success'
                })
            except Exception as e:
                processed_files.append({
                    'filename': filename,
                    'status': 'error',
                    'error': str(e)
                })

        # Analyze transactions
        if all_transactions:
            analyzer.load_transactions(all_transactions)
            analysis_data = analyzer.export_analysis_data()
        else:
            analysis_data = {'message': 'No transactions found'}

        return {
            'processed_files': processed_files,
            'total_transactions': len(all_transactions),
            'analysis': analysis_data
        }
    except Exception as e:
        return {'error': str(e)}

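# Minimal usage sketch (hypothetical caller, placeholder file name), e.g. from the
# local entrypoint below or any code running inside `app.run()`:
#
#   with open("statement.pdf", "rb") as fh:
#       result = analyze_uploaded_statements.remote({"statement.pdf": fh.read()})
#   print(result["total_transactions"])
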
@app.function(image=image, secrets=secrets)
def get_ai_analysis(analysis_data: Dict, user_question: str = "", provider: str = "claude"):
    """
    Modal function to get AI analysis of spending data using Claude or SambaNova
    """
    try:
        # Prepare context for AI
        context = f"""
        Financial Analysis Data:
        {json.dumps(analysis_data, indent=2, default=str)}

        User Question: {user_question if user_question else "Please provide a comprehensive analysis of my spending patterns and recommendations."}
        """

        prompt = f"""
        You are a financial advisor analyzing bank statement data.

        Based on the provided financial data, give insights about:
        1. Spending patterns and trends
        2. Budget adherence and alerts
        3. Unusual transactions that need attention
        4. Specific recommendations for improvement
        5. Answer to the user's specific question if provided

        Be specific, actionable, and highlight both positive aspects and areas for improvement.

        {context}
        """

        if provider.lower() == "claude":
            return _get_claude_analysis(prompt)
        elif provider.lower() == "sambanova":
            return _get_sambanova_analysis(prompt)
        else:
            # Default to Claude
            return _get_claude_analysis(prompt)
    except Exception as e:
        return {'error': f"AI API error: {str(e)}"}

def _get_claude_analysis(prompt: str) -> Dict:
    """Get analysis from Claude API"""
    try:
        import anthropic

        client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

        response = client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=1500,
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        )

        # Handle different response formats
        if hasattr(response.content[0], 'text'):
            analysis_text = response.content[0].text
        else:
            analysis_text = str(response.content[0])

        return {
            'ai_analysis': analysis_text,
            'provider': 'claude',
            'model': 'claude-3-sonnet-20240229',
            'usage': {
                'input_tokens': response.usage.input_tokens,
                'output_tokens': response.usage.output_tokens,
                'total_tokens': response.usage.input_tokens + response.usage.output_tokens
            }
        }
    except Exception as e:
        return {'error': f"Claude API error: {str(e)}"}

def _get_sambanova_analysis(prompt: str) -> Dict:
    """Get analysis from SambaNova Cloud API"""
    try:
        import openai

        # SambaNova exposes an OpenAI-compatible API
        client = openai.OpenAI(
            api_key=os.environ["SAMBANOVA_API_KEY"],
            base_url="https://api.sambanova.ai/v1"
        )

        response = client.chat.completions.create(
            model="Meta-Llama-3.1-8B-Instruct",  # SambaNova model
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            max_tokens=1500,
            temperature=0.7
        )

        return {
            'ai_analysis': response.choices[0].message.content,
            'provider': 'sambanova',
            'model': 'Meta-Llama-3.1-8B-Instruct',
            'usage': {
                'input_tokens': response.usage.prompt_tokens,
                'output_tokens': response.usage.completion_tokens,
                'total_tokens': response.usage.total_tokens
            }
        }
    except Exception as e:
        return {'error': f"SambaNova API error: {str(e)}"}

@app.function(image=image, volumes={"/data": volume})
def save_user_data(user_id: str, data: Dict):
    """
    Save user analysis data to persistent storage
    """
    try:
        import json
        import os

        user_dir = f"/data/users/{user_id}"
        os.makedirs(user_dir, exist_ok=True)

        # Save analysis data
        with open(f"{user_dir}/analysis.json", "w") as f:
            json.dump(data, f, indent=2, default=str)

        # Save timestamp
        with open(f"{user_dir}/last_updated.txt", "w") as f:
            f.write(datetime.now().isoformat())

        # Persist the writes to the shared Modal Volume
        volume.commit()

        return {"status": "saved", "path": user_dir}
    except Exception as e:
        return {"error": str(e)}

@app.function(image=image, volumes={"/data": volume})
def load_user_data(user_id: str):
    """
    Load user analysis data from persistent storage
    """
    try:
        import json

        user_dir = f"/data/users/{user_id}"
        analysis_file = f"{user_dir}/analysis.json"

        if os.path.exists(analysis_file):
            with open(analysis_file, "r") as f:
                data = json.load(f)

            # Get last updated time
            last_updated = None
            if os.path.exists(f"{user_dir}/last_updated.txt"):
                with open(f"{user_dir}/last_updated.txt", "r") as f:
                    last_updated = f.read().strip()

            return {
                "data": data,
                "last_updated": last_updated,
                "status": "found"
            }
        else:
            return {"status": "not_found"}
    except Exception as e:
        return {"error": str(e)}

# Webhook endpoint for MCP integration
# (exposed as an HTTPS endpoint; MCP clients POST JSON-RPC messages to the generated URL)
@app.function(image=image, secrets=secrets, volumes={"/data": volume})
@modal.web_endpoint(method="POST")
def mcp_webhook(request_data: Dict):
    """
    Webhook endpoint for MCP protocol messages
    """
    try:
        from mcp_server import MCPServer

        # Initialize MCP server
        server = MCPServer()

        # Register tools
        async def process_statements_tool(args):
            email_config = args.get('email_config', {})
            days_back = args.get('days_back', 30)
            passwords = args.get('passwords', {})
            result = process_bank_statements.remote(email_config, days_back, passwords)
            return result

        async def analyze_pdf_tool(args):
            pdf_contents = args.get('pdf_contents', {})
            passwords = args.get('passwords', {})
            result = analyze_uploaded_statements.remote(pdf_contents, passwords)
            return result

        async def get_analysis_tool(args):
            analysis_data = args.get('analysis_data', {})
            user_question = args.get('user_question', '')
            provider = args.get('provider', 'claude')
            result = get_ai_analysis.remote(analysis_data, user_question, provider)
            return result

        # Register tools with MCP server
        server.register_tool("process_email_statements", "Process bank statements from email", process_statements_tool)
        server.register_tool("analyze_pdf_statements", "Analyze uploaded PDF statements", analyze_pdf_tool)
        server.register_tool("get_ai_analysis", "Get AI financial analysis (Claude or SambaNova)", get_analysis_tool)

        # Handle MCP message
        response = asyncio.run(server.handle_message(request_data))
        return response
    except Exception as e:
        return {
            "jsonrpc": "2.0",
            "id": request_data.get("id"),
            "error": {
                "code": -32603,
                "message": str(e)
            }
        }

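# Example request body for the webhook above (a sketch; it assumes the custom
# MCPServer accepts the standard MCP/JSON-RPC "tools/call" message shape):
#
#   {
#     "jsonrpc": "2.0",
#     "id": 1,
#     "method": "tools/call",
#     "params": {
#       "name": "get_ai_analysis",
#       "arguments": {"analysis_data": {}, "user_question": "Where am I overspending?", "provider": "claude"}
#     }
#   }
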
# CLI for local testing
def main():
    """
    Local entrypoint for testing Modal functions
    """
    print("Testing Modal deployment...")

    # Test basic functionality
    test_data = {
        "spending_insights": [],
        "recommendations": ["Test recommendation"]
    }

    result = get_ai_analysis.remote(test_data, "What do you think about my spending?", "claude")
    print("AI analysis result:", result)


if __name__ == "__main__":
    # Run inside an ephemeral Modal app so the .remote() call above works
    with app.run():
        main()