import asyncio
import io
import json
import logging
import mimetypes
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta
from urllib.parse import quote

import gradio as gr
import uvicorn
import yt_dlp
from fastapi import BackgroundTasks, FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse

# --- Runtime configuration ---------------------------------------------------
# Directory that receives every downloaded media file; created at import time.
OUTPUT_DIR = "output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Optional proxy handed to yt-dlp; None when YTDLP_PROXY is not set.
PROXY_URL = os.getenv("YTDLP_PROXY")

# JSON file that stores the banned client IPs.
BANNED_IP_FILE = "bannedip.json"
# A client sending more requests than this within one minute gets banned.
MAX_REQUESTS_PER_MINUTE = 35

logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Process-wide state ------------------------------------------------------
server_start_time = time.time()
total_request_count = 0
# Client IP -> timestamps of that client's recent requests.
request_counter = defaultdict(list)


def load_banned_ips(path=None):
    """Load the set of banned client IPs from a JSON file.

    Args:
        path: File to read. ``None`` (the default) means ``BANNED_IP_FILE``.

    Returns:
        A set of IP strings. An empty set is returned when the file is
        missing, cannot be decoded, or does not hold a JSON array, so a
        damaged ban list never raises into the caller.
    """
    if path is None:
        path = BANNED_IP_FILE
    try:
        with open(path, "r", encoding="utf-8") as f:
            entries = json.load(f)
    # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    except (FileNotFoundError, ValueError):
        return set()
    if not isinstance(entries, list):
        return set()
    # Keep strings only: other JSON values (nested lists, objects) may be
    # unhashable and would make set construction raise TypeError.
    return {entry for entry in entries if isinstance(entry, str)}


def save_banned_ips(banned_ips, path=None):
    """Persist the banned IPs as a JSON array.

    The data is first written to a sibling ``<path>.tmp`` file and then moved
    over the target with ``os.replace``. A reader therefore never sees a
    truncated, half-written file, which would otherwise decode as an empty
    ban list.

    Args:
        banned_ips: Iterable of IP strings to store.
        path: Destination file. ``None`` (the default) means
            ``BANNED_IP_FILE``.
    """
    if path is None:
        path = BANNED_IP_FILE
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        # Sorted output keeps the file stable between saves.
        json.dump(sorted(banned_ips, key=str), f)
    os.replace(tmp_path, path)


async def delete_file_after_delay(file_path: str, delay: int = 600):
    """Sleep for ``delay`` seconds, then remove ``file_path`` from disk.

    Deletion problems are logged and not re-raised: a missing file produces
    a warning, any other exception an error with traceback.
    """
    await asyncio.sleep(delay)
    try:
        os.remove(file_path)
    except FileNotFoundError:
        logger.warning(f"File {file_path} not found during deletion.")
    except Exception as exc:
        logger.error(f"Error deleting file {file_path}: {exc}", exc_info=True)
    else:
        logger.info(f"Deleted file {file_path} after {delay} seconds.")


# The ASGI application object; route handlers below register against it.
app = FastAPI(
    version="1.0.0",
    title="YouTube Downloader API",
    description="API to download video/audio from YouTube.",
)

# CORS: every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive setup - confirm credentialed cross-origin calls are needed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Count, rate-limit and log every incoming HTTP request.

    The client is identified by the first entry of ``X-Forwarded-For`` when
    that header is present and non-empty, otherwise by the peer address.
    Banned clients receive a 403 response. A client that sends more than
    ``MAX_REQUESTS_PER_MINUTE`` requests within one minute is added to the
    persistent ban list, and the request that crossed the limit is rejected
    with the same 403 instead of being served.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a 403 ``JSONResponse`` for banned or
        rate-limited clients.
    """
    global total_request_count
    total_request_count += 1

    # request.client can be None, so do not dereference it unconditionally.
    peer_host = request.client.host if request.client else "unknown"
    # NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
    # proxy overwrites it, so it can be spoofed to dodge or misdirect bans.
    forwarded = request.headers.get("X-Forwarded-For", "")
    client_ip = forwarded.split(",")[0].strip() or peer_host

    banned_ips = load_banned_ips()
    if client_ip in banned_ips:
        return JSONResponse(
            status_code=403, content={"message": "Blocked due to excessive requests."}
        )

    # Sliding one-minute window of this client's request timestamps.
    now = datetime.now()
    window_start = now - timedelta(minutes=1)
    recent = [t for t in request_counter[client_ip] if t > window_start]
    recent.append(now)
    request_counter[client_ip] = recent

    if len(recent) > MAX_REQUESTS_PER_MINUTE:
        logger.warning(f"IP {client_ip} is blocked due to rate limiting.")
        banned_ips.add(client_ip)
        save_banned_ips(banned_ips)
        # The client is banned from now on; its history is no longer needed.
        request_counter.pop(client_ip, None)
        return JSONResponse(
            status_code=403, content={"message": "Blocked due to excessive requests."}
        )

    logger.info(f"{client_ip} requested {request.method} {request.url}")
    return await call_next(request)


@app.get("/", summary="Root Endpoint")
async def root():
    """Answer with HTTP 200 and a fixed status payload."""
    payload = {"status": "Server Running"}
    return JSONResponse(content=payload, status_code=200)


@app.get("/ytv/", summary="Download YouTube Video")
async def download_video(
    background_tasks: BackgroundTasks,
    url: str = Query(...),
    quality: int = Query(720),
    mode: str = Query("url"),
):
    """Download a video with yt-dlp and return a link to it or the file itself.

    Args:
        background_tasks: Used to schedule removal of the downloaded file.
        url: Address of the video to download.
        quality: Maximum video height in pixels used in the format selector.
        mode: ``"url"`` returns JSON with a CDN link, ``"buffer"`` returns
            the file as an attachment.

    Returns:
        A dict (``mode="url"``), a ``FileResponse`` (``mode="buffer"``), or a
        ``JSONResponse`` with status 400 (bad mode) or 500 (any failure).
    """
    if mode not in ("url", "buffer"):
        return JSONResponse(
            status_code=400, content={"error": "Invalid mode. Use 'url' or 'buffer'."}
        )

    ydl_opts = {
        "format": f"bestvideo[height<={quality}]+bestaudio/best",
        "outtmpl": os.path.join(OUTPUT_DIR, f"%(title)s_{quality}p.%(ext)s"),
        "merge_output_format": "mp4",
        "cookiefile": "yt.txt",
        # Same as the audio endpoint: single video only, honour the proxy.
        "noplaylist": True,
        "proxy": PROXY_URL,
        "postprocessors": [
            {"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}
        ],
    }

    def _download():
        """Run the blocking yt-dlp download; return (info, final file path)."""
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            downloads = info.get("requested_downloads")
            if downloads:
                return info, downloads[0]["filepath"]
            # prepare_filename() carries the source extension; after the
            # merge/convert step the file on disk may be the .mp4 variant.
            expected = ydl.prepare_filename(info)
            merged = os.path.splitext(expected)[0] + ".mp4"
            return info, merged if os.path.exists(merged) else expected

    try:
        # yt-dlp is synchronous; run it in a worker thread so the event loop
        # keeps serving other requests during the download.
        loop = asyncio.get_running_loop()
        info, file_path = await loop.run_in_executor(None, _download)

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found after download: {file_path}")
        background_tasks.add_task(delete_file_after_delay, file_path)

        file_name = os.path.basename(file_path)
        if mode == "url":
            # NOTE(review): this host differs from the one used by the audio
            # endpoint ("lordofc-ytdlp") - confirm which one is correct.
            return {
                "status": "success",
                "title": info.get("title"),
                "thumb": info.get("thumbnail"),
                "url": f"https://lordxdd-ytdlp.hf.space/cdn/video/{quote(file_name)}",
            }

        file_ext = os.path.splitext(file_path)[-1].lstrip(".")
        media_type = f"video/{file_ext}" if file_ext else "video/mp4"
        # FileResponse sends the file from disk and builds the
        # Content-Disposition header itself, so the whole file is not read
        # into memory and the title is not placed in a hand-built header.
        return FileResponse(file_path, filename=file_name, media_type=media_type)
    except Exception as e:
        logger.error(f"Download error: {e}", exc_info=True)
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/cdn/video/{filename}", summary="Serve Downloaded File")
async def download_file(filename: str):
    """Serve a previously downloaded file from ``OUTPUT_DIR``.

    Args:
        filename: Bare name of a file inside ``OUTPUT_DIR``.

    Returns:
        A ``FileResponse`` for an existing regular file, otherwise a 404
        ``JSONResponse``.
    """
    file_path = os.path.join(OUTPUT_DIR, filename)
    # Accept bare file names only, and require a regular file: a directory
    # such as ".." exists but cannot be served.
    is_bare_name = filename == os.path.basename(filename)
    if not is_bare_name or not os.path.isfile(file_path):
        return JSONResponse(status_code=404, content={"error": "File not found"})

    file_ext = os.path.splitext(filename)[-1].lstrip(".")
    fallback_type = f"video/{file_ext}" if file_ext else "video/mp4"
    # Prefer the registered MIME type; fall back to the extension-based guess.
    media_type = mimetypes.guess_type(filename)[0] or fallback_type
    return FileResponse(file_path, filename=filename, media_type=media_type)


@app.get("/yta/", summary="Download YouTube Audio (.mp3)")
async def download_audio(
    background_tasks: BackgroundTasks,
    url: str = Query(..., description="YouTube video URL"),
    mode: str = Query("url", description="Response mode: 'url' or 'buffer'"),
):
    """Download the audio track as mp3 and return a link to it or the file.

    Args:
        background_tasks: Used to schedule removal of the downloaded file.
        url: Address of the video whose audio is extracted.
        mode: ``"url"`` returns JSON with a CDN link, ``"buffer"`` returns
            the file as an attachment.

    Returns:
        A dict (``mode="url"``), a ``FileResponse`` (``mode="buffer"``), or a
        ``JSONResponse`` with status 400 (bad mode) or 500 (any failure).
    """
    if mode not in {"url", "buffer"}:
        return JSONResponse(
            status_code=400, content={"error": "Invalid mode. Use 'url' or 'buffer'."}
        )

    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(OUTPUT_DIR, "%(title)s.%(ext)s"),
        "cookiefile": "yt.txt",
        "noplaylist": True,
        "no_warnings": True,
        "proxy": PROXY_URL,
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "128",
            }
        ],
        "postprocessor_args": ["-vn", "-preset", "ultrafast", "-threads", "4"],
        "prefer_ffmpeg": True,
    }

    def _download():
        """Run the blocking yt-dlp download; return (info, final file path)."""
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            downloads = info.get("requested_downloads")
            if downloads:
                return info, downloads[0]["filepath"]
            # The post-processor is configured for mp3, so that is the
            # extension to expect on disk.
            base, _ = os.path.splitext(ydl.prepare_filename(info))
            return info, base + ".mp3"

    try:
        # yt-dlp is synchronous; run it in a worker thread so the event loop
        # keeps serving other requests during the download.
        loop = asyncio.get_running_loop()
        info, file_path = await loop.run_in_executor(None, _download)

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        background_tasks.add_task(delete_file_after_delay, file_path)

        file_name = os.path.basename(file_path)
        if mode == "url":
            # NOTE(review): this host differs from the one used by the video
            # endpoint ("lordxdd-ytdlp") - confirm which one is correct.
            return {
                "status": "success",
                "title": info.get("title"),
                "thumb": info.get("thumbnail"),
                "url": f"https://lordofc-ytdlp.hf.space/cdn/audio/{quote(file_name)}",
            }

        file_ext = os.path.splitext(file_path)[-1].lstrip(".").lower()
        # "audio/mpeg" is the registered type for mp3.
        media_type = "audio/mpeg" if file_ext in ("mp3", "") else f"audio/{file_ext}"
        # FileResponse sends the file from disk and builds the
        # Content-Disposition header itself, so the whole file is not read
        # into memory and the title is not placed in a hand-built header.
        return FileResponse(file_path, filename=file_name, media_type=media_type)
    except Exception as e:
        logger.error(f"Download error: {e}", exc_info=True)
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/cdn/audio/{filename}", summary="Serve Downloaded Audio File")
async def serve_audio_file(filename: str):
    """Serve a previously downloaded audio file from ``OUTPUT_DIR``.

    Args:
        filename: Bare name of a file inside ``OUTPUT_DIR``.

    Returns:
        A ``FileResponse`` for an existing regular file, otherwise a 404
        ``JSONResponse``.
    """
    file_path = os.path.join(OUTPUT_DIR, filename)
    # Accept bare file names only, and require a regular file: a directory
    # such as ".." exists but cannot be served.
    is_bare_name = filename == os.path.basename(filename)
    if not is_bare_name or not os.path.isfile(file_path):
        return JSONResponse(status_code=404, content={"error": "File not found"})

    ext = os.path.splitext(filename)[-1].lstrip(".")
    media_type = mimetypes.guess_type(filename)[0]
    # Keep the extension-based audio type when the registry has no entry or
    # reports a non-audio type for the extension.
    if not media_type or not media_type.startswith("audio/"):
        media_type = f"audio/{ext}"
    return FileResponse(file_path, filename=filename, media_type=media_type)


def run_gradio_ui():
    """Build and launch a Gradio form that downloads a video via yt-dlp.

    The form takes a URL and a resolution slider (240-1080) and shows the
    JSON result of the download: title and a ``/cdn/video/...`` link on
    success, or an ``error`` message on failure.
    """

    def download_gradio(url, resolution):
        """Download ``url`` limited to ``resolution`` pixels of height."""
        try:
            quality = int(resolution)
            ydl_opts = {
                "format": f"bestvideo[height<={quality}]+bestaudio/best",
                "outtmpl": os.path.join(
                    OUTPUT_DIR, f"%(title)s_{quality}p.%(ext)s"
                ),
                "merge_output_format": "mp4",
                # Download only the addressed video, not a whole playlist.
                "noplaylist": True,
                "postprocessors": [
                    {"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}
                ],
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                downloads = info.get("requested_downloads")
                if downloads:
                    file_path = downloads[0]["filepath"]
                else:
                    # prepare_filename() carries the source extension; the
                    # file on disk may be the merged .mp4 variant instead.
                    expected = ydl.prepare_filename(info)
                    merged = os.path.splitext(expected)[0] + ".mp4"
                    file_path = merged if os.path.exists(merged) else expected
            # NOTE(review): unlike the API endpoints, nothing schedules the
            # deletion of this file.
            return {
                "status": "success",
                "title": info["title"],
                "url": f"/cdn/video/{quote(os.path.basename(file_path))}",
            }
        except Exception as e:
            return {"error": str(e)}

    interface = gr.Interface(
        fn=download_gradio,
        inputs=["text", gr.Slider(240, 1080)],
        outputs="json",
        title="YouTube Video Downloader",
    )
    interface.launch()


if __name__ == "__main__":
    # Script entry point: serve the "app:app" import string on every
    # interface, port 7860.
    uvicorn.run("app:app", port=7860, host="0.0.0.0")