|
|
import os |
|
|
import random |
|
|
from typing import List |
|
|
from urllib.parse import urlencode |
|
|
|
|
|
import requests |
|
|
from loguru import logger |
|
|
from moviepy.video.io.VideoFileClip import VideoFileClip |
|
|
|
|
|
from app.config import config |
|
|
from app.models.schema import MaterialInfo, VideoAspect, VideoConcatMode |
|
|
from app.utils import utils |
|
|
|
|
|
# Count of lookups served from multi-key config entries; get_api_key uses it
# to pick which key to hand out next.
requested_count = 0


def get_api_key(cfg_key: str):
    """Look up an API key in the application config.

    Args:
        cfg_key: Name of the entry in ``config.app`` that holds either a
            single key (a string) or an indexable collection of keys.

    Returns:
        The configured string itself, or, for a collection, the element
        selected by the module-level ``requested_count`` counter so that
        successive calls cycle through the available keys.

    Raises:
        ValueError: If the entry is missing or empty.
    """
    global requested_count

    configured = config.app.get(cfg_key)
    if not configured:
        message = (
            f"\n\n##### {cfg_key} is not set #####\n\nPlease set it in the config.toml file: {config.config_file}\n\n"
            f"{utils.to_json(config.app)}"
        )
        raise ValueError(message)

    # A single key needs no rotation.
    if isinstance(configured, str):
        return configured

    # The counter is bumped before indexing, so the very first call selects
    # index 1 (modulo the number of keys), then continues round-robin.
    requested_count += 1
    position = requested_count % len(configured)
    return configured[position]
|
|
|
|
|
|
|
|
def search_videos_pexels(
    search_term: str,
    minimum_duration: int,
    video_aspect: VideoAspect = VideoAspect.portrait,
) -> List[MaterialInfo]:
    """Search the Pexels video API and collect matching materials.

    Args:
        search_term: Query text sent to the API.
        minimum_duration: Videos whose reported duration is below this value
            are skipped.
        video_aspect: Aspect used both as the ``orientation`` query value
            (its enum name) and to derive the exact width/height a video
            file must have to be accepted.

    Returns:
        One ``MaterialInfo`` per video that has a file with exactly the
        target resolution (the first such file is used). An empty list is
        returned when the response carries no ``videos`` entry or when any
        exception occurs while requesting or parsing.
    """
    aspect = VideoAspect(video_aspect)
    video_orientation = aspect.name
    video_width, video_height = aspect.to_resolution()
    api_key = get_api_key("pexels_api_keys")

    request_headers = {
        "Authorization": api_key,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
    }

    params = {"query": search_term, "per_page": 20, "orientation": video_orientation}
    query_url = f"https://api.pexels.com/videos/search?{urlencode(params)}"
    logger.info(f"searching videos: {query_url}, with proxies: {config.proxy}")

    try:
        response = requests.get(
            query_url,
            headers=request_headers,
            proxies=config.proxy,
            verify=False,
            timeout=(30, 60),
        ).json()

        if "videos" not in response:
            logger.error(f"search videos failed: {response}")
            return []

        wanted_size = (video_width, video_height)
        materials = []
        for entry in response["videos"]:
            clip_seconds = entry["duration"]
            if clip_seconds < minimum_duration:
                continue

            # First file whose dimensions equal the target resolution; both
            # dimensions are converted for every file examined.
            chosen = next(
                (
                    candidate
                    for candidate in entry["video_files"]
                    if (int(candidate["width"]), int(candidate["height"])) == wanted_size
                ),
                None,
            )
            if chosen is None:
                continue

            material = MaterialInfo()
            material.provider = "pexels"
            material.url = chosen["link"]
            material.duration = clip_seconds
            materials.append(material)
        return materials
    except Exception as e:
        # Any request/parse failure discards partial results.
        logger.error(f"search videos failed: {str(e)}")

    return []
|
|
|
|
|
|
|
|
def search_videos_pixabay(
    search_term: str,
    minimum_duration: int,
    video_aspect: VideoAspect = VideoAspect.portrait,
) -> List[MaterialInfo]:
    """Search the Pixabay video API and collect matching materials.

    Args:
        search_term: Query text sent to the API.
        minimum_duration: Videos whose reported duration is below this value
            are skipped.
        video_aspect: Aspect used to derive the minimum width a rendition
            must have to be accepted.

    Returns:
        One ``MaterialInfo`` per video that has a rendition at least as wide
        as the target width (the first such rendition is used). An empty
        list is returned when the response carries no ``hits`` entry or when
        any exception occurs while requesting or parsing.
    """
    aspect = VideoAspect(video_aspect)
    video_width, _ = aspect.to_resolution()
    api_key = get_api_key("pixabay_api_keys")

    params = {
        "q": search_term,
        "video_type": "all",
        "per_page": 50,
        "key": api_key,
    }
    base_url = "https://pixabay.com/api/videos/"
    query_url = f"{base_url}?{urlencode(params)}"

    # The API key travels in the query string, so never log the real URL:
    # log a copy with the key masked instead.
    loggable_url = f"{base_url}?{urlencode({**params, 'key': 'REDACTED'})}"
    logger.info(f"searching videos: {loggable_url}, with proxies: {config.proxy}")

    try:
        r = requests.get(
            query_url, proxies=config.proxy, verify=False, timeout=(30, 60)
        )
        response = r.json()
        video_items = []
        if "hits" not in response:
            logger.error(f"search videos failed: {response}")
            return video_items

        for v in response["hits"]:
            duration = v["duration"]
            if duration < minimum_duration:
                continue

            # NOTE(review): only the width is compared with the target
            # resolution here; height/orientation is not checked.
            for video in v["videos"].values():
                if int(video["width"]) >= video_width:
                    item = MaterialInfo()
                    item.provider = "pixabay"
                    item.url = video["url"]
                    item.duration = duration
                    video_items.append(item)
                    break
        return video_items
    except Exception as e:
        # Any request/parse failure discards partial results.
        logger.error(f"search videos failed: {str(e)}")

    return []
|
|
|
|
|
|
|
|
def save_video(video_url: str, save_dir: str = "") -> str:
    """Download a video into a local directory and validate it.

    The target file name is derived from ``utils.md5`` applied to the URL
    with its query string removed, so the same URL always maps to the same
    file and an existing non-empty file is reused without downloading.

    Args:
        video_url: Address of the video to fetch.
        save_dir: Directory to store the file in. When empty,
            ``utils.storage_dir("cache_videos")`` is used. The directory is
            created if it does not exist.

    Returns:
        The path of the saved (or already present) file, or ``""`` when the
        downloaded file is empty or cannot be validated. Files that fail
        validation are removed so they are not later mistaken for a valid
        cached copy.
    """
    if not save_dir:
        save_dir = utils.storage_dir("cache_videos")

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    url_without_query = video_url.split("?")[0]
    url_hash = utils.md5(url_without_query)
    video_id = f"vid-{url_hash}"
    video_path = f"{save_dir}/{video_id}.mp4"

    # Reuse a previously downloaded, non-empty file.
    if os.path.exists(video_path) and os.path.getsize(video_path) > 0:
        logger.info(f"video already exists: {video_path}")
        return video_path

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
    }

    with open(video_path, "wb") as f:
        f.write(
            requests.get(
                video_url,
                headers=headers,
                proxies=config.proxy,
                verify=False,
                timeout=(60, 240),
            ).content
        )

    if os.path.exists(video_path) and os.path.getsize(video_path) > 0:
        try:
            clip = VideoFileClip(video_path)
            try:
                duration = clip.duration
                fps = clip.fps
            finally:
                # Always release the reader, even if the metadata access
                # fails, so the file can be deleted afterwards.
                clip.close()
            if duration > 0 and fps > 0:
                return video_path
            reason = f"duration={duration}, fps={fps}"
        except Exception as e:
            reason = str(e)

        # The file is unusable: delete it so the "already exists" shortcut
        # above cannot hand it out on a later call.
        try:
            os.remove(video_path)
        except OSError as remove_error:
            logger.debug(
                f"failed to remove invalid video file: {video_path} => {remove_error}"
            )
        logger.warning(f"invalid video file: {video_path} => {reason}")
    return ""
|
|
|
|
|
|
|
|
def download_videos(
    task_id: str,
    search_terms: List[str],
    source: str = "pexels",
    video_aspect: VideoAspect = VideoAspect.portrait,
    video_contact_mode: VideoConcatMode = VideoConcatMode.random,
    audio_duration: float = 0.0,
    max_clip_duration: int = 5,
) -> List[str]:
    """Search for videos and download enough of them to cover a duration.

    Args:
        task_id: Passed to ``utils.task_dir`` when the configured
            ``material_directory`` is the literal ``"task"``.
        search_terms: Terms searched one after another; results are merged
            and de-duplicated by URL.
        source: ``"pixabay"`` selects the Pixabay search; any other value
            uses the Pexels search.
        video_aspect: Forwarded to the search function.
        video_contact_mode: When equal to ``VideoConcatMode.random`` the
            candidates are shuffled before downloading.
        audio_duration: Downloading stops once the accumulated clip time
            exceeds this value.
        max_clip_duration: Used as the minimum duration for the search and
            as the cap on how much each downloaded video contributes to the
            accumulated time.

    Returns:
        Paths of the successfully saved videos, in download order.
    """
    search_videos = (
        search_videos_pixabay if source == "pixabay" else search_videos_pexels
    )

    unique_items = []
    seen_urls = []
    found_duration = 0.0
    for term in search_terms:
        results = search_videos(
            search_term=term,
            minimum_duration=max_clip_duration,
            video_aspect=video_aspect,
        )
        logger.info(f"found {len(results)} videos for '{term}'")

        for candidate in results:
            if candidate.url in seen_urls:
                continue
            unique_items.append(candidate)
            seen_urls.append(candidate.url)
            found_duration += candidate.duration

    logger.info(
        f"found total videos: {len(unique_items)}, required duration: {audio_duration} seconds, found duration: {found_duration} seconds"
    )

    # Resolve where to store files: "task" means the per-task directory, and
    # a configured path that is not an existing directory is replaced by ""
    # (save_video then falls back to its own default).
    target_dir = config.app.get("material_directory", "").strip()
    if target_dir == "task":
        target_dir = utils.task_dir(task_id)
    elif target_dir and not os.path.isdir(target_dir):
        target_dir = ""

    if video_contact_mode.value == VideoConcatMode.random.value:
        random.shuffle(unique_items)

    downloaded_paths = []
    accumulated = 0.0
    for candidate in unique_items:
        try:
            logger.info(f"downloading video: {candidate.url}")
            local_path = save_video(video_url=candidate.url, save_dir=target_dir)
            if not local_path:
                continue

            logger.info(f"video saved: {local_path}")
            downloaded_paths.append(local_path)
            # Each video counts for at most max_clip_duration seconds.
            accumulated += min(max_clip_duration, candidate.duration)
            if accumulated > audio_duration:
                logger.info(
                    f"total duration of downloaded videos: {accumulated} seconds, skip downloading more"
                )
                break
        except Exception as e:
            # A failed download must not abort the remaining candidates.
            logger.error(
                f"failed to download video: {utils.to_json(candidate)} => {str(e)}"
            )

    logger.success(f"downloaded {len(downloaded_paths)} videos")
    return downloaded_paths
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: search Pixabay for one term and download clips until
    # more than 100 seconds of material has been collected.
    download_videos(
        task_id="test123",
        search_terms=["Money Exchange Medium"],
        source="pixabay",
        audio_duration=100,
    )
|
|
|