import random
import string
import uuid
import json
import os
import requests
import httpx
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from PIL import Image
import logging
from io import BytesIO
import time
from datetime import datetime, timedelta, timezone  # date/time helpers for the randomized Date header

# --- Configuration ---
# Set up logging for debugging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global list to store proxies
PROXY_LIST = []
PROXY_REFRESH_INTERVAL = 5 * 60  # 5 minutes in seconds

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Image Generation API",
    description="An API to generate images with periodic proxy refreshing and randomized headers.",
    version="1.4.0"  # Version bumped for the new feature
)

# ----------- Proxy Management -----------
async def fetch_proxies():
    """
    Asynchronously fetches a list of proxies from the URL specified in the
    PROXY_LIST_URL environment variable using httpx.
    """
    global PROXY_LIST
    proxy_url = os.getenv("PROXY_LIST_URL")
    if not proxy_url:
        logger.warning("PROXY_LIST_URL environment variable not set. Running without proxies.")
        PROXY_LIST = []
        return
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(proxy_url, timeout=10)
            response.raise_for_status()
            proxies = [line.strip() for line in response.text.splitlines() if line.strip()]
            if not proxies:
                logger.warning("Proxy list was fetched but is empty.")
                PROXY_LIST = []
            else:
                PROXY_LIST = proxies
                logger.info(f"Successfully refreshed proxy list. Loaded {len(PROXY_LIST)} proxies.")
    except httpx.RequestError as e:
        logger.error(f"Failed to fetch or refresh proxy list: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred during proxy fetch: {e}")

async def schedule_proxy_refresh():
    """
    A background task that runs forever, refreshing the proxy list periodically.
    """
    while True:
        await asyncio.sleep(PROXY_REFRESH_INTERVAL)
        logger.info(f"Scheduled proxy refresh triggered (every {PROXY_REFRESH_INTERVAL / 60} minutes).")
        await fetch_proxies()

@app.on_event("startup")  # registration decorator was absent; without it the hook never runs
async def startup_event():
    """
    On application startup, fetch the initial list of proxies and start the
    background refresh task.
    """
    logger.info("Performing initial proxy fetch on startup...")
    await fetch_proxies()
    logger.info("Starting background task for periodic proxy refresh.")
    asyncio.create_task(schedule_proxy_refresh())

def get_random_proxy():
    """Selects a random proxy from the global list, in requests' proxies-dict format."""
    if not PROXY_LIST:
        return None
    proxy = random.choice(PROXY_LIST)
    # requests requires a scheme; assume plain "host:port" entries are HTTP proxies.
    if "://" not in proxy:
        proxy = f"http://{proxy}"
    return {"http": proxy, "https": proxy}

# ----------- Models -----------
class GenerationRequest(BaseModel):
    prompt: str
    negative_prompt: str = "text, talk bubble, low quality, watermark, signature"
    width: int = 1024
    height: int = 1024
    seed: int = 0
    steps: int = 28
    cfg: float = 7.0
    session_hash: Optional[str] = None  # generated server-side when omitted
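
# Illustrative request body for the /generate route below (field values are
# examples only; omitted fields take the defaults declared on the model):
#
#   {"prompt": "a watercolor fox", "width": 768, "height": 768, "seed": 42}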

# ----------- Utils -----------
def generate_session_hash():
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))

def generate_zerogpu_uuid():
    return f"n{random.randint(1000, 9999)}_{uuid.uuid4().hex[:12]}_creitl"

def generate_random_future_date_header():
    """
    Generates a random date string for an HTTP header, between now and 24 hours
    in the future.
    """
    # Add a random number of seconds (up to 24 hours) to the current UTC time.
    # A timezone-aware now() replaces the deprecated datetime.utcnow().
    random_seconds = random.randint(0, 24 * 3600)
    future_time = datetime.now(timezone.utc) + timedelta(seconds=random_seconds)
    # Format according to RFC 7231 for HTTP 'Date' headers
    return future_time.strftime('%a, %d %b %Y %H:%M:%S GMT')
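
# Example output (illustrative): "Tue, 01 Jul 2025 14:03:27 GMT" -- the
# IMF-fixdate format that RFC 7231 requires for HTTP date headers.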

# ----------- Helper Function to Upload to SnapZion -----------
def upload_to_snapzion(file_content: BytesIO, file_name: str):
    token = os.getenv("SNAPZION_API_TOKEN")
    if not token:
        logger.error("SNAPZION_API_TOKEN is not set.")
        raise HTTPException(status_code=500, detail="Server configuration error: SnapZion API token is missing.")
    upload_url = "https://upload.snapzion.com/api/public-upload"
    files = {"file": (file_name, file_content, "image/png")}
    headers = {"Authorization": f"Bearer {token}"}
    try:
        response = requests.post(upload_url, headers=headers, files=files, timeout=60)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"SnapZion upload failed: {e}")
        raise HTTPException(status_code=500, detail="Failed to upload the image to SnapZion.")
    response_data = response.json()
    return response_data.get("url")
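
# Illustrative successful response body (shape inferred from the .get("url")
# access above; any other fields in the response are unknown):
#
#   {"url": "https://<snapzion-host>/<file-name>.png"}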

# ----------- API Route -----------
@app.post("/generate")  # route path assumed; the decorator is required for FastAPI to expose this handler
async def generate_image(req: GenerationRequest):
    session_hash = req.session_hash if req.session_hash else generate_session_hash()
    uuid_token = generate_zerogpu_uuid()

    # --- HEADER CUSTOMIZATION ---
    # Generate a new random Date header for each request
    random_date_header = generate_random_future_date_header()
    logger.info(f"Using randomized Date header for this request: {random_date_header}")
    headers_post = {
        "accept": "*/*",
        "accept-language": "en-US,en;q=0.9",
        "content-type": "application/json",
        "Date": random_date_header,  # <-- the new randomized header
        "origin": "https://heartsync-nsfw-uncensored.hf.space",
        "referer": "https://heartsync-nsfw-uncensored.hf.space/?not-for-all-audiences=true&__theme=system",
        "sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        "x-zerogpu-uuid": uuid_token
    }
    payload = {
        "data": [
            req.prompt, req.negative_prompt, req.seed, True, req.width,
            req.height, req.cfg, req.steps
        ],
        "event_data": None, "fn_index": 2, "trigger_id": 16, "session_hash": session_hash
    }

    try:
        # Step 1: Join Queue
        # NOTE: requests is synchronous and blocks the event loop inside this
        # async route; httpx.AsyncClient would be the non-blocking alternative.
        join_proxy = get_random_proxy()
        proxy_ip_log = join_proxy['http'].split('@')[-1] if join_proxy else 'None'
        logger.info(f"Step 1: Joining queue with proxy: {proxy_ip_log}")
        join_response = requests.post(
            "https://heartsync-nsfw-uncensored.hf.space/gradio_api/queue/join?not-for-all-audiences=true&__theme=system",
            headers=headers_post, json=payload, proxies=join_proxy, timeout=30
        )
        join_response.raise_for_status()
        logger.info("Successfully joined queue. Now listening for SSE data.")

        # Step 2: Listen to queue via SSE
        sse_proxy = get_random_proxy()
        proxy_ip_log = sse_proxy['http'].split('@')[-1] if sse_proxy else 'None'
        logger.info(f"Step 2: Listening to SSE stream with proxy: {proxy_ip_log}")
        sse_url = f"https://heartsync-nsfw-uncensored.hf.space/gradio_api/queue/data?session_hash={session_hash}"
        sse_headers = {"accept": "text/event-stream", "user-agent": headers_post["user-agent"], "Date": random_date_header}
        with requests.get(sse_url, headers=sse_headers, stream=True, proxies=sse_proxy, timeout=300) as sse_response:
            sse_response.raise_for_status()
            for line in sse_response.iter_lines():
                if not line:
                    continue
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith("data:"):
                    try:
                        data = json.loads(decoded_line[5:].strip())
                        if data.get("msg") == "process_completed":
                            logger.info("Process completed. Extracting image data.")
                            output_list = data.get("output", {}).get("data", [])
                            if not output_list or not isinstance(output_list, list):
                                raise HTTPException(status_code=500, detail="Generation completed but no image data was returned.")
                            image_dict = output_list[0]
                            if not isinstance(image_dict, dict):
                                raise HTTPException(status_code=500, detail="Unexpected output format from generation service.")
                            image_url = image_dict.get("url")
                            if not image_url:
                                raise HTTPException(status_code=500, detail="Image data received, but the URL is missing.")

                            # Step 3: Download image
                            download_proxy = get_random_proxy()
                            proxy_ip_log = download_proxy['http'].split('@')[-1] if download_proxy else 'None'
                            logger.info(f"Step 3: Downloading image with proxy: {proxy_ip_log}")
                            image_response = requests.get(image_url, proxies=download_proxy, timeout=60)
                            image_response.raise_for_status()

                            # Re-encode to PNG, then upload under a sanitized file name
                            image = Image.open(BytesIO(image_response.content))
                            png_buffer = BytesIO()
                            image.save(png_buffer, format="PNG")
                            png_buffer.seek(0)
                            sanitized_prompt = "".join(c for c in req.prompt if c.isalnum() or c in " _-").rstrip()[:50]
                            file_name = f"{sanitized_prompt}_{int(time.time())}.png"
                            snapzion_url = upload_to_snapzion(png_buffer, file_name)
                            logger.info(f"Image successfully uploaded to SnapZion: {snapzion_url}")
                            return {"success": True, "image_url": snapzion_url}
                    except (json.JSONDecodeError, IndexError):
                        continue
        raise HTTPException(status_code=504, detail="Stream closed before generation could complete.")
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail=f"HTTP Error: {e.response.text}")
    except requests.exceptions.RequestException as e:
        logger.error(f"A network error occurred: {e}")
        raise HTTPException(status_code=503, detail=f"Service unavailable or network error: {e}")
    except HTTPException:
        # Re-raise as-is so the 500/504 errors raised above are not rewrapped by the generic handler.
        raise
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"An internal server error occurred: {str(e)}")
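
# Minimal local entry point (a sketch; assumes uvicorn is installed, and port
# 7860 follows the Hugging Face Spaces convention -- adjust as needed):
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)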