"""Image generation API.

Submits a prompt to a remote Gradio queue, waits for the result on the
server-sent-event (SSE) stream, re-encodes the produced image as PNG and
uploads it to SnapZion. Outbound requests optionally go through a proxy picked
at random from a list that is refreshed periodically in the background.
"""

import asyncio
import json
import logging
import os
import random
import string
import time
import uuid
from datetime import datetime, timedelta, timezone
from io import BytesIO
from typing import Dict, List, Optional

import httpx
import requests
from fastapi import FastAPI, HTTPException
from PIL import Image
from pydantic import BaseModel

# --- Configuration ---
# Set up logging for debugging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global list to store proxies
PROXY_LIST: List[str] = []
PROXY_REFRESH_INTERVAL = 5 * 60  # 5 minutes in seconds

# Remote Gradio Space that performs the actual generation.
SPACE_BASE_URL = "https://heartsync-nsfw-uncensored.hf.space"
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
)

# Strong reference to the background refresh task. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_proxy_refresh_task: Optional[asyncio.Task] = None

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Image Generation API",
    description="An API to generate images with periodic proxy refreshing and randomized headers.",
    version="1.4.0"  # Version bumped for the new feature
)


# ----------- Proxy Management -----------
async def fetch_proxies():
    """
    Asynchronously fetches a list of proxies from the URL specified in the
    PROXY_LIST_URL environment variable using httpx.

    The global PROXY_LIST is replaced on success (or emptied when the variable
    is unset or the fetched list is empty). On a fetch error the previous list
    is left untouched and the error is logged.
    """
    global PROXY_LIST
    proxy_url = os.getenv("PROXY_LIST_URL")
    if not proxy_url:
        logger.warning("PROXY_LIST_URL environment variable not set. Running without proxies.")
        PROXY_LIST = []
        return

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(proxy_url, timeout=10)
            response.raise_for_status()
            # One proxy per line; blank lines are ignored.
            proxies = [line.strip() for line in response.text.splitlines() if line.strip()]
            if not proxies:
                logger.warning("Proxy list was fetched but is empty.")
                PROXY_LIST = []
            else:
                PROXY_LIST = proxies
                logger.info("Successfully refreshed proxy list. Loaded %s proxies.", len(PROXY_LIST))
    except httpx.RequestError as e:
        logger.error("Failed to fetch or refresh proxy list: %s", e)
    except Exception as e:
        logger.error("An unexpected error occurred during proxy fetch: %s", e)


async def schedule_proxy_refresh():
    """
    A background task that runs forever, refreshing the proxy list periodically.
    """
    while True:
        await asyncio.sleep(PROXY_REFRESH_INTERVAL)
        logger.info(
            "Scheduled proxy refresh triggered (every %s minutes).",
            PROXY_REFRESH_INTERVAL / 60,
        )
        await fetch_proxies()


@app.on_event("startup")
async def startup_event():
    """
    On application startup, fetch the initial list of proxies and start the
    background refresh task.
    """
    global _proxy_refresh_task
    logger.info("Performing initial proxy fetch on startup...")
    await fetch_proxies()
    logger.info("Starting background task for periodic proxy refresh.")
    _proxy_refresh_task = asyncio.create_task(schedule_proxy_refresh())


def get_random_proxy() -> Optional[Dict[str, str]]:
    """Selects a random proxy from the global list.

    Returns a ``requests``-style proxies mapping, or None when no proxies are
    loaded.
    """
    if not PROXY_LIST:
        return None
    proxy = random.choice(PROXY_LIST)
    return {"http": proxy, "https": proxy}


def _proxy_label(proxy: Optional[Dict[str, str]]) -> str:
    """Return a log-safe label for *proxy*: the part after any ``user:pass@``."""
    return proxy['http'].split('@')[-1] if proxy else 'None'


# ----------- Models -----------
class GenerationRequest(BaseModel):
    """Request body for the image generation endpoint."""

    prompt: str
    negative_prompt: str = "text, talk bubble, low quality, watermark, signature"
    width: int = 1024
    height: int = 1024
    seed: int = 0
    steps: int = 28
    cfg: float = 7.0
    # Optional so that an explicit JSON null is accepted; a hash is generated
    # when the caller does not supply one.
    session_hash: Optional[str] = None


# ----------- Utils -----------
def generate_session_hash():
    """Return a random 12-character lowercase alphanumeric session hash."""
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))


def generate_zerogpu_uuid():
    """Return a random value for the ``x-zerogpu-uuid`` request header."""
    return f"n{random.randint(1000,9999)}_{uuid.uuid4().hex[:12]}_creitl"


def generate_random_future_date_header():
    """
    Generates a random date string for an HTTP header, between now and 24 hours
    in the future.
    """
    # Add a random number of seconds (up to 24 hours) to the current time
    random_seconds = random.randint(0, 24 * 3600)
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a naive value.
    future_time = datetime.now(timezone.utc) + timedelta(seconds=random_seconds)
    # Format according to RFC 7231 for HTTP 'Date' header
    return future_time.strftime('%a, %d %b %Y %H:%M:%S GMT')


# ----------- Helper Function to Upload to SnapZion -----------
def upload_to_snapzion(file_content: BytesIO, file_name: str):
    """Upload a PNG buffer to SnapZion and return the URL from its response.

    Raises:
        HTTPException: 500 when SNAPZION_API_TOKEN is unset, the upload request
            fails, the response body is not JSON, or it carries no ``url``.
    """
    token = os.getenv("SNAPZION_API_TOKEN")
    if not token:
        logger.error("SNAPZION_API_TOKEN is not set.")
        raise HTTPException(status_code=500, detail="Server configuration error: SnapZion API token is missing.")

    upload_url = "https://upload.snapzion.com/api/public-upload"
    files = {"file": (file_name, file_content, "image/png")}
    headers = {"Authorization": f"Bearer {token}"}

    try:
        response = requests.post(upload_url, headers=headers, files=files, timeout=60)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error("SnapZion upload failed: %s", e)
        raise HTTPException(status_code=500, detail="Failed to upload the image to SnapZion.") from e

    try:
        response_data = response.json()
    except ValueError as e:  # requests' JSON decode errors subclass ValueError
        logger.error("SnapZion returned a non-JSON response: %s", e)
        raise HTTPException(status_code=500, detail="SnapZion returned an invalid response.") from e

    url = response_data.get("url") if isinstance(response_data, dict) else None
    if not url:
        # Without this check the caller would report success with a null URL.
        logger.error("SnapZion response did not contain a 'url' field.")
        raise HTTPException(status_code=500, detail="SnapZion response did not include an image URL.")
    return url


# ----------- Generation pipeline (blocking helpers) -----------
def _build_post_headers(uuid_token: str, date_header: str) -> Dict[str, str]:
    """Build the browser-like headers sent with the queue-join request."""
    return {
        "accept": "*/*",
        "accept-language": "en-US,en;q=0.9",
        "content-type": "application/json",
        "Date": date_header,  # randomized per request
        "origin": SPACE_BASE_URL,
        "referer": f"{SPACE_BASE_URL}/?not-for-all-audiences=true&__theme=system",
        "sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "user-agent": USER_AGENT,
        "x-zerogpu-uuid": uuid_token,
    }


def _extract_image_url(data: dict) -> str:
    """Pull the image URL out of a ``process_completed`` event or raise a 500."""
    output = data.get("output")
    output_list = output.get("data") if isinstance(output, dict) else None
    if not output_list or not isinstance(output_list, list):
        raise HTTPException(status_code=500, detail="Generation completed but no image data was returned.")

    image_dict = output_list[0]
    if not isinstance(image_dict, dict):
        raise HTTPException(status_code=500, detail="Unexpected output format from generation service.")

    image_url = image_dict.get("url")
    if not image_url:
        raise HTTPException(status_code=500, detail="Image data received, but the URL is missing.")
    return image_url


def _download_and_upload(image_url: str, prompt: str) -> str:
    """Download the generated image, re-encode it as PNG and upload it to SnapZion."""
    download_proxy = get_random_proxy()
    logger.info("Step 3: Downloading image with proxy: %s", _proxy_label(download_proxy))
    with requests.get(image_url, proxies=download_proxy, timeout=60) as image_response:
        image_response.raise_for_status()
        image_bytes = image_response.content

    png_buffer = BytesIO()
    with Image.open(BytesIO(image_bytes)) as image:
        image.save(png_buffer, format="PNG")
    png_buffer.seek(0)

    # Keep only filename-safe characters; fall back to a fixed stem when the
    # prompt contains none of them.
    sanitized_prompt = "".join(c for c in prompt if c.isalnum() or c in " _-").rstrip()[:50] or "image"
    file_name = f"{sanitized_prompt}_{int(time.time())}.png"

    snapzion_url = upload_to_snapzion(png_buffer, file_name)
    logger.info("Image successfully uploaded to SnapZion: %s", snapzion_url)
    return snapzion_url


def _run_generation(req: GenerationRequest) -> dict:
    """Run the whole blocking pipeline: join queue, read SSE, download, upload.

    Raises:
        HTTPException: with the upstream status on an upstream HTTP error, 503
            on a network error, 504 when the stream ends before completion, and
            500 for malformed results or unexpected failures.
    """
    session_hash = req.session_hash if req.session_hash else generate_session_hash()
    uuid_token = generate_zerogpu_uuid()

    # --- HEADER CUSTOMIZATION ---
    # Generate a new random Date header for each request
    random_date_header = generate_random_future_date_header()
    logger.info("Using randomized Date header for this request: %s", random_date_header)

    headers_post = _build_post_headers(uuid_token, random_date_header)
    payload = {
        "data": [
            req.prompt, req.negative_prompt, req.seed, True,
            req.width, req.height, req.cfg, req.steps
        ],
        "event_data": None,
        "fn_index": 2,
        "trigger_id": 16,
        "session_hash": session_hash
    }

    try:
        # Step 1: Join Queue
        join_proxy = get_random_proxy()
        logger.info("Step 1: Joining queue with proxy: %s", _proxy_label(join_proxy))
        join_response = requests.post(
            f"{SPACE_BASE_URL}/gradio_api/queue/join?not-for-all-audiences=true&__theme=system",
            headers=headers_post, json=payload, proxies=join_proxy, timeout=30
        )
        join_response.raise_for_status()
        logger.info("Successfully joined queue. Now listening for SSE data.")

        # Step 2: Listen to queue via SSE
        sse_proxy = get_random_proxy()
        logger.info("Step 2: Listening to SSE stream with proxy: %s", _proxy_label(sse_proxy))
        sse_url = f"{SPACE_BASE_URL}/gradio_api/queue/data?session_hash={session_hash}"
        sse_headers = {
            "accept": "text/event-stream",
            "user-agent": headers_post["user-agent"],
            "Date": random_date_header,
        }

        with requests.get(sse_url, headers=sse_headers, stream=True,
                          proxies=sse_proxy, timeout=300) as sse_response:
            sse_response.raise_for_status()
            for line in sse_response.iter_lines():
                if not line:
                    continue
                decoded_line = line.decode('utf-8')
                if not decoded_line.startswith("data:"):
                    continue
                # Only the JSON parse is allowed to fail silently; errors in the
                # download/upload step below must not be skipped.
                try:
                    data = json.loads(decoded_line[5:].strip())
                except json.JSONDecodeError:
                    continue
                if not isinstance(data, dict) or data.get("msg") != "process_completed":
                    continue

                logger.info("Process completed. Extracting image data.")
                image_url = _extract_image_url(data)
                snapzion_url = _download_and_upload(image_url, req.prompt)
                return {"success": True, "image_url": snapzion_url}

        raise HTTPException(status_code=504, detail="Stream closed before generation could complete.")

    except HTTPException:
        # Already a well-formed API error (e.g. the 504 above); re-raise it so
        # the generic handler below does not rewrite it as a 500.
        raise
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail=f"HTTP Error: {e.response.text}") from e
    except requests.exceptions.RequestException as e:
        logger.error(f"A network error occurred: {e}")
        raise HTTPException(status_code=503, detail=f"Service unavailable or network error: {e}") from e
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"An internal server error occurred: {str(e)}") from e


# ----------- API Route -----------
@app.post("/v1/images/generation", tags=["Image Generation"])
async def generate_image(req: GenerationRequest):
    """Generate an image for *req* and return ``{"success": True, "image_url": ...}``.

    The pipeline uses the blocking ``requests`` library (including a
    long-lived SSE read), so it runs in the default executor rather than on
    the event loop. HTTPExceptions raised by the pipeline propagate unchanged.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _run_generation, req)