NSFW-IMG-API / main.py
AIMaster7's picture
Update main.py
de40268 verified
import random
import string
import uuid
import json
import os
import requests
import httpx
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from PIL import Image
import logging
from io import BytesIO
import time
from datetime import datetime, timedelta # New imports for date manipulation
# --- Configuration ---
# Set up logging for debugging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Global list to store proxies
PROXY_LIST = []
PROXY_REFRESH_INTERVAL = 5 * 60 # 5 minutes in seconds
# --- FastAPI App Initialization ---
app = FastAPI(
title="Image Generation API",
description="An API to generate images with periodic proxy refreshing and randomized headers.",
version="1.4.0" # Version bumped for the new feature
)
# ----------- Proxy Management -----------
async def fetch_proxies():
"""
Asynchronously fetches a list of proxies from the URL specified in the
PROXY_LIST_URL environment variable using httpx.
"""
global PROXY_LIST
proxy_url = os.getenv("PROXY_LIST_URL")
if not proxy_url:
logger.warning("PROXY_LIST_URL environment variable not set. Running without proxies.")
PROXY_LIST = []
return
try:
async with httpx.AsyncClient() as client:
response = await client.get(proxy_url, timeout=10)
response.raise_for_status()
proxies = [line.strip() for line in response.text.splitlines() if line.strip()]
if not proxies:
logger.warning("Proxy list was fetched but is empty.")
PROXY_LIST = []
else:
PROXY_LIST = proxies
logger.info(f"Successfully refreshed proxy list. Loaded {len(PROXY_LIST)} proxies.")
except httpx.RequestError as e:
logger.error(f"Failed to fetch or refresh proxy list: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred during proxy fetch: {e}")
async def schedule_proxy_refresh():
"""
A background task that runs forever, refreshing the proxy list periodically.
"""
while True:
await asyncio.sleep(PROXY_REFRESH_INTERVAL)
logger.info(f"Scheduled proxy refresh triggered (every {PROXY_REFRESH_INTERVAL / 60} minutes).")
await fetch_proxies()
@app.on_event("startup")
async def startup_event():
"""
On application startup, fetch the initial list of proxies and start the background refresh task.
"""
logger.info("Performing initial proxy fetch on startup...")
await fetch_proxies()
logger.info("Starting background task for periodic proxy refresh.")
asyncio.create_task(schedule_proxy_refresh())
def get_random_proxy():
"""Selects a random proxy from the global list."""
if not PROXY_LIST:
return None
proxy = random.choice(PROXY_LIST)
return {"http": proxy, "https": proxy}
# ----------- Models -----------
class GenerationRequest(BaseModel):
prompt: str
negative_prompt: str = "text, talk bubble, low quality, watermark, signature"
width: int = 1024
height: int = 1024
seed: int = 0
steps: int = 28
cfg: float = 7.0
session_hash: str = None
# ----------- Utils -----------
def generate_session_hash():
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))
def generate_zerogpu_uuid():
return f"n{random.randint(1000,9999)}_{uuid.uuid4().hex[:12]}_creitl"
def generate_random_future_date_header():
"""
Generates a random date string for an HTTP header, between now and 24 hours in the future.
"""
# Add a random number of seconds (up to 24 hours) to the current time
random_seconds = random.randint(0, 24 * 3600)
future_time = datetime.utcnow() + timedelta(seconds=random_seconds)
# Format according to RFC 7231 for HTTP 'Date' header
return future_time.strftime('%a, %d %b %Y %H:%M:%S GMT')
# ----------- Helper Function to Upload to SnapZion -----------
def upload_to_snapzion(file_content: BytesIO, file_name: str):
token = os.getenv("SNAPZION_API_TOKEN")
if not token:
logger.error("SNAPZION_API_TOKEN is not set.")
raise HTTPException(status_code=500, detail="Server configuration error: SnapZion API token is missing.")
upload_url = "https://upload.snapzion.com/api/public-upload"
files = {"file": (file_name, file_content, "image/png")}
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.post(upload_url, headers=headers, files=files, timeout=60)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"SnapZion upload failed: {e}")
raise HTTPException(status_code=500, detail="Failed to upload the image to SnapZion.")
response_data = response.json()
return response_data.get("url")
# ----------- API Route -----------
@app.post("/v1/images/generation", tags=["Image Generation"])
async def generate_image(req: GenerationRequest):
session_hash = req.session_hash if req.session_hash else generate_session_hash()
uuid_token = generate_zerogpu_uuid()
# --- HEADER CUSTOMIZATION ---
# Generate a new random Date header for each request
random_date_header = generate_random_future_date_header()
logger.info(f"Using randomized Date header for this request: {random_date_header}")
headers_post = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"content-type": "application/json",
"Date": random_date_header, # <-- Here is the new randomized header
"origin": "https://heartsync-nsfw-uncensored.hf.space",
"referer": "https://heartsync-nsfw-uncensored.hf.space/?not-for-all-audiences=true&__theme=system",
"sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
"x-zerogpu-uuid": uuid_token
}
payload = {
"data": [
req.prompt, req.negative_prompt, req.seed, True, req.width,
req.height, req.cfg, req.steps
],
"event_data": None, "fn_index": 2, "trigger_id": 16, "session_hash": session_hash
}
try:
# Step 1: Join Queue
join_proxy = get_random_proxy()
proxy_ip_log = join_proxy['http'].split('@')[-1] if join_proxy else 'None'
logger.info(f"Step 1: Joining queue with proxy: {proxy_ip_log}")
join_response = requests.post(
f"https://heartsync-nsfw-uncensored.hf.space/gradio_api/queue/join?not-for-all-audiences=true&__theme=system",
headers=headers_post, json=payload, proxies=join_proxy, timeout=30
)
join_response.raise_for_status()
logger.info("Successfully joined queue. Now listening for SSE data.")
# Step 2: Listen to queue via SSE
sse_proxy = get_random_proxy()
proxy_ip_log = sse_proxy['http'].split('@')[-1] if sse_proxy else 'None'
logger.info(f"Step 2: Listening to SSE stream with proxy: {proxy_ip_log}")
sse_url = f"https://heartsync-nsfw-uncensored.hf.space/gradio_api/queue/data?session_hash={session_hash}"
sse_headers = {"accept": "text/event-stream", "user-agent": headers_post["user-agent"], "Date": random_date_header}
with requests.get(sse_url, headers=sse_headers, stream=True, proxies=sse_proxy, timeout=300) as sse_response:
sse_response.raise_for_status()
for line in sse_response.iter_lines():
if not line: continue
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data:"):
try:
data = json.loads(decoded_line[5:].strip())
if data.get("msg") == "process_completed":
logger.info("Process completed. Extracting image data.")
output_list = data.get("output", {}).get("data", [])
if not output_list or not isinstance(output_list, list) or len(output_list) == 0:
raise HTTPException(status_code=500, detail="Generation completed but no image data was returned.")
image_dict = output_list[0]
if not isinstance(image_dict, dict):
raise HTTPException(status_code=500, detail="Unexpected output format from generation service.")
image_url = image_dict.get("url")
if not image_url:
raise HTTPException(status_code=500, detail="Image data received, but the URL is missing.")
# Step 3: Download image
download_proxy = get_random_proxy()
proxy_ip_log = download_proxy['http'].split('@')[-1] if download_proxy else 'None'
logger.info(f"Step 3: Downloading image with proxy: {proxy_ip_log}")
image_response = requests.get(image_url, proxies=download_proxy, timeout=60)
image_response.raise_for_status()
image = Image.open(BytesIO(image_response.content))
png_buffer = BytesIO()
image.save(png_buffer, format="PNG")
png_buffer.seek(0)
sanitized_prompt = "".join(c for c in req.prompt if c.isalnum() or c in " _-").rstrip()[:50]
file_name = f"{sanitized_prompt}_{int(time.time())}.png"
snapzion_url = upload_to_snapzion(png_buffer, file_name)
logger.info(f"Image successfully uploaded to SnapZion: {snapzion_url}")
return {"success": True, "image_url": snapzion_url}
except (json.JSONDecodeError, IndexError): continue
raise HTTPException(status_code=504, detail="Stream closed before generation could complete.")
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
raise HTTPException(status_code=e.response.status_code, detail=f"HTTP Error: {e.response.text}")
except requests.exceptions.RequestException as e:
logger.error(f"A network error occurred: {e}")
raise HTTPException(status_code=503, detail=f"Service unavailable or network error: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"An internal server error occurred: {str(e)}")