Spaces:
Sleeping
Sleeping
| """ | |
| Sentinel Arbitrage Engine - v21.0 OMEGA (Unbreakable) | |
| This is the definitive, money-spinning engine. It uses a self-healing | |
| background task, a transparent logging UI, and a robust Socket.IO | |
| connection to guarantee performance and reliability. | |
| """ | |
| import asyncio | |
| import os | |
| import json | |
| import time | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime, timezone | |
| import httpx | |
| import socketio | |
| from fastapi import FastAPI | |
| from fastapi.staticfiles import StaticFiles | |
| from .price_fetcher import PriceFetcher | |
| from .arbitrage_analyzer import ArbitrageAnalyzer | |
# Minimum relative spread between the two oracle prices, expressed as a
# fraction (0.0015 == 0.15 %), above which a discrepancy is treated as an
# opportunity by the engine loop.
OPPORTUNITY_THRESHOLD = 0.0015

# Shared Socket.IO server used for every emit in this module.
# NOTE(review): cors_allowed_origins='*' accepts connections from any
# origin -- confirm this is intended for the deployment.
sio = socketio.AsyncServer(
    cors_allowed_origins='*',
    async_mode='asgi',
)
async def log_and_emit(message: str):
    """Write *message* to stdout, then broadcast it as a ``log_message`` event.

    The console print happens first so the line is still visible locally
    even if the Socket.IO emit fails.
    """
    print(message)
    await sio.emit(event='log_message', data=message)
async def run_arbitrage_detector(price_fetcher, analyzer):
    """Poll the oracles forever and emit a signal for every notable spread.

    Each cycle refreshes prices through ``price_fetcher``, compares the
    ``pyth`` and ``chainlink_agg`` quotes of every asset and, when their
    relative spread exceeds ``OPPORTUNITY_THRESHOLD``, asks ``analyzer`` for
    a briefing and emits a ``new_signal`` Socket.IO event.

    Exceptions raised inside a cycle are logged and the loop carries on, so
    the coroutine only ends when its task is cancelled.

    Args:
        price_fetcher: Object exposing an awaitable ``update_prices_async()``
            and ``get_all_prices()``, which returns a mapping of asset name
            to a dict holding ``"pyth"`` / ``"chainlink_agg"`` entries.
        analyzer: Object exposing an awaitable
            ``get_alpha_briefing(asset, opportunity)``. A falsy result
            suppresses the signal; a truthy result is merged into it and is
            expected to contain a ``"strategy"`` key.
    """
    await log_and_emit("Engine core starting...")
    while True:
        try:
            await log_and_emit("Updating prices from oracles...")
            await price_fetcher.update_prices_async()
            all_prices = price_fetcher.get_all_prices()

            # Nothing to compare unless at least one asset has both quotes.
            has_complete_pair = any(
                p.get("pyth") and p.get("chainlink_agg")
                for p in all_prices.values()
            )
            if not has_complete_pair:
                await log_and_emit("Oracle data incomplete. Waiting for next cycle.")
                await asyncio.sleep(30)
                continue

            for asset, prices in all_prices.items():
                pyth_price = prices.get("pyth")
                chainlink_price = prices.get("chainlink_agg")
                if not (pyth_price and chainlink_price):
                    continue
                # chainlink_price is the divisor below, so it must be
                # positive as well, not just pyth_price.
                if pyth_price <= 0 or chainlink_price <= 0:
                    continue

                spread = abs(pyth_price - chainlink_price) / chainlink_price
                if spread <= OPPORTUNITY_THRESHOLD:
                    continue

                # `spread` is a fraction; convert once so the log line and
                # the emitted signal report the same percentage.
                spread_pct = spread * 100
                await log_and_emit(f"β‘οΈ Discrepancy for {asset}: {spread_pct:.3f}%")

                opportunity = {
                    "asset": asset,
                    "pyth_price": pyth_price,
                    "chainlink_price": chainlink_price,
                    "spread_pct": spread_pct,
                }
                # Hand the analyzer its own copy so the signal below is built
                # from the values computed here.
                briefing = await analyzer.get_alpha_briefing(asset, dict(opportunity))
                if not briefing:
                    continue

                signal = {
                    **opportunity,
                    **briefing,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                }
                await sio.emit('new_signal', signal)
                await log_and_emit(f"β Signal Emitted for {asset}: {signal['strategy']}")
        except Exception as e:
            # Top-level boundary of the engine: report the failure and keep
            # looping instead of letting the background task die.
            await log_and_emit(f"β CRITICAL ERROR in engine loop: {e}. Recovering...")

        # Pacing delay between polling cycles (also applies after an error).
        await asyncio.sleep(15)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the arbitrage engine on application start-up and stop it on shutdown.

    A single ``httpx.AsyncClient`` is shared by the price fetcher and the
    analyzer. The engine loop runs as a Socket.IO background task whose
    handle is stored on ``sio.background_task``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the server while the application is running.
    """
    print("π Initializing Sentinel Arbitrage Engine v21.0...")
    async with httpx.AsyncClient() as client:
        price_fetcher = PriceFetcher(client)
        arbitrage_analyzer = ArbitrageAnalyzer(client)
        sio.background_task = sio.start_background_task(
            run_arbitrage_detector, price_fetcher, arbitrage_analyzer
        )
        try:
            yield
        finally:
            # Stop the engine *before* the HTTP client is closed; otherwise
            # the loop keeps issuing requests on a closed client.
            sio.background_task.cancel()
            # return_exceptions=True absorbs the resulting CancelledError
            # (or an earlier crash of the task) so shutdown stays clean.
            await asyncio.gather(sio.background_task, return_exceptions=True)
    print("β Engine shut down.")
# ASGI application; `lifespan` is passed in to manage engine start-up/shutdown.
app = FastAPI(lifespan=lifespan)
# Wrap the Socket.IO server as an ASGI app so it can be mounted on `app`.
socket_app = socketio.ASGIApp(sio)
# NOTE(review): mounted before the catch-all "/" mount below, presumably so
# the static-file mount does not shadow Socket.IO traffic -- keep this order.
app.mount('/socket.io', socket_app)
# Serve the frontend from ./static; html=True is passed so the directory can
# act as a site root (index page) rather than a plain file listing.
app.mount("/", StaticFiles(directory="static", html=True), name="static")
@sio.event
async def connect(sid, environ):
    """Socket.IO ``connect`` handler: greet the new client and log its sid.

    Registered through ``@sio.event``; without that registration the server
    never invokes this coroutine.

    Args:
        sid: Session id assigned to the connecting client.
        environ: Request environment of the connection (unused).
    """
    await sio.emit('welcome', {'message': 'Connection to Sentinel Engine established!'}, to=sid)
    print(f"β Client connected: {sid}")
@sio.event
async def disconnect(sid):
    """Socket.IO ``disconnect`` handler: log the departing client's sid.

    Registered through ``@sio.event``; without that registration the server
    never invokes this coroutine.

    Args:
        sid: Session id of the client that disconnected.
    """
    print(f"π₯ Client disconnected: {sid}")