Spaces:
Sleeping
Sleeping
import os
import shutil
from typing import Dict, List

import moviepy.editor as mp
from gradio_client import Client
from natsort import natsorted  # Natural sorting for correct numerical order
from PIL import Image

import constants
from response_schemas import ScenesResponseSchema
from structured_output_extractor import StructuredOutputExtractor
def get_scenes(text_script: str):
    """Break a story script into scenes, each with a list of image prompts.

    Builds a prompt embedding the script and its estimated read time, then
    asks the structured extractor for a ScenesResponseSchema result.

    Returns:
        dict: The extracted scenes as a plain dictionary (pydantic dump).
    """
    read_time = calculate_read_time(text_script)
    scene_prompt = f"""
ROLE: Story to Scene Generator
Tasks: For the given story
1. Read it Completely and Understand the Complete Context
2. Rewrite the story in tiny scenes(but without even changing a word) with highly detailed and context aware list of image prompts to visualize each scene
3. Never Describe complete scene in a single image prompt use multiple prompts
RULE OF THUMB: 12 image prompts / 1 min audio (Note: You will be provided with estimated read Time for each story or script)
here is the Estimated Read Time of the complete story: {read_time}\n\n
and Here is the Complete Story: {text_script}
"""
    extractor = StructuredOutputExtractor(response_schema=ScenesResponseSchema)
    return extractor.extract(scene_prompt).model_dump()
def generate_video_assets(scenes: Dict, language: str, speaker: str, base_path: str = "media") -> str:
    """Create per-scene audio and image files for a video under *base_path*.

    Layout produced:
        <base_path>/video_<n>/audio/scene_<i>.mp3
        <base_path>/video_<n>/images/scene_<i>/scene_<i>_image_<j>.png

    Scenes whose audio generation fails are skipped; failed images are
    logged but do not abort the scene.

    Returns:
        The video folder path on success, or a {"error": ...} dict when an
        unexpected exception occurs (callers should handle both shapes).
    """
    try:
        os.makedirs(base_path, exist_ok=True)

        scene_entries = scenes.get("scenes", [])
        print(f"Total Scenes: {len(scene_entries)}")

        # Name the run folder by counting existing entries under base_path.
        video_folder = os.path.join(base_path, f"video_{len(os.listdir(base_path)) + 1}")
        os.makedirs(video_folder, exist_ok=True)

        # Sibling folders for generated images and narration audio.
        images_root = os.path.join(video_folder, "images")
        audio_root = os.path.join(video_folder, "audio")
        os.makedirs(images_root, exist_ok=True)
        os.makedirs(audio_root, exist_ok=True)

        for idx, scene in enumerate(scene_entries, start=1):
            narration: str = scene.get("text", "")
            prompts: List[str] = scene.get("image_prompts", [])

            scene_dir = os.path.join(images_root, f"scene_{idx}")
            os.makedirs(scene_dir, exist_ok=True)

            # Audio first; if narration fails there is nothing to sync to.
            audio_target = os.path.join(audio_root, f"scene_{idx}.mp3")
            audio_outcome = generate_audio(narration, language, speaker, path=audio_target)
            if "error" in audio_outcome:
                print(f"Error generating audio for scene {idx}: {audio_outcome['error']}")
                continue

            saved_images = []
            for img_idx, prompt in enumerate(prompts, start=1):
                target = os.path.join(scene_dir, f"scene_{idx}_image_{img_idx}.png")
                outcome = generate_image(prompt=prompt, path=target)
                if "error" in outcome:
                    print(f"Error generating image {img_idx} for scene {idx}: {outcome['error']}")
                else:
                    saved_images.append(target)

            print(f"Scene: {idx}\t No of Images in Scene {idx}: {len(saved_images)}")

        # Hand back the root folder containing all generated assets.
        return video_folder
    except Exception as e:
        print(f"Error during video asset generation: {e}")
        return {"error": str(e)}
def generate_audio(text, language_code, speaker, path='test_audio.mp3'):
    """Synthesize *text* to speech and save the audio at *path*.

    Uses the hosted "habib926653/Multilingual-TTS" Gradio space.

    Args:
        text: Text to synthesize.
        language_code: Language understood by the space (e.g. "Urdu").
        speaker: Speaker voice name (e.g. "Asad").
        path: Destination file for the generated audio.

    Returns:
        {"audio_file": path} on success, {"error": message} on failure.
    """
    try:
        client = Client("habib926653/Multilingual-TTS")
        result = client.predict(
            text=text,                       # text input for audio generation
            language_code=language_code,     # language code (e.g., "Urdu")
            speaker=speaker,                 # selected speaker (e.g., "Asad")
            api_name="/text_to_speech_edge"
        )
        # The space returns a (text, audio_file_path) tuple; copy the temp
        # file to the requested path in a streaming fashion instead of
        # loading the whole audio into memory first.
        audio_file_path = result[1]
        shutil.copyfile(audio_file_path, path)
        return {"audio_file": path}
    except Exception as e:
        print(f"Error during audio generation: {e}")
        return {"error": str(e)}
def generate_image(prompt, path='test_image.png'):
    """Generate an image for *prompt* and save it to *path*.

    Uses the image-generation Gradio space configured in `constants`,
    authenticated with the HF token.

    Returns:
        {"image_file": path} on success, {"error": message} on failure.
        A dict in both cases, mirroring generate_audio. (Previously the
        success path returned the raw client file path as a str, which made
        the caller's `"error" in result` check a substring test on a path.)
    """
    try:
        client = Client(constants.IMAGE_GENERATION_SPACE_NAME, hf_token=constants.HF_TOKEN)
        result = client.predict(
            prompt=prompt,          # text prompt for image generation
            width=1280,
            height=720,
            api_name="/generate_image"
        )
        # `result` is the path of the image produced by the space; re-save
        # it at the caller-requested location.
        image = Image.open(result)
        image.save(path)
        return {"image_file": path}
    except Exception as e:
        print(f"Error during image generation: {e}")
        return {"error": str(e)}
def generate_video(video_folder: str, output_filename: str = "final_video.mp4"):
    """Assemble the final video from the audio/images tree in *video_folder*.

    Each scene folder becomes a slideshow of its images, timed evenly to
    span the scene's narration audio; all scene clips are concatenated and
    written out as H.264 at 24 fps.

    Returns:
        The output file path, or None when no scene could be processed.
    """
    audio_dir = os.path.join(video_folder, "audio")
    images_dir = os.path.join(video_folder, "images")

    # Scene folders, naturally sorted so scene_10 follows scene_9.
    scene_dirs = []
    for entry in natsorted(os.listdir(images_dir)):
        candidate = os.path.join(images_dir, entry)
        if os.path.isdir(candidate):
            scene_dirs.append(candidate)

    scene_clips = []
    for scene_path in scene_dirs:
        scene_name = os.path.basename(scene_path)
        audio_path = os.path.join(audio_dir, f"{scene_name}.mp3")

        # A scene without narration cannot be timed — skip it.
        if not os.path.exists(audio_path):
            print(f"Warning: Audio file {audio_path} not found. Skipping scene {scene_name}.")
            continue

        frames = natsorted(
            os.path.join(scene_path, name)
            for name in os.listdir(scene_path)
            if name.lower().endswith(('.png', '.jpg', '.jpeg'))
        )
        if not frames:
            print(f"Warning: No images found in {scene_path}. Skipping scene {scene_name}.")
            continue

        narration = mp.AudioFileClip(audio_path)
        # Spread the scene's images evenly across its narration.
        per_image = narration.duration / len(frames)
        stills = [mp.ImageClip(frame).set_duration(per_image) for frame in frames]
        scene_clips.append(
            mp.concatenate_videoclips(stills, method="compose").set_audio(narration)
        )

    if not scene_clips:
        print("Error: No valid scenes processed.")
        return None

    final_video = mp.concatenate_videoclips(scene_clips, method="compose")
    output_path = os.path.join(video_folder, output_filename)
    final_video.write_videofile(output_path, fps=24, codec='libx264')
    return output_path
def calculate_read_time(text: str, words_per_minute: int = 155) -> str:
    """Estimate how long it takes to read *text* aloud.

    Args:
        text: The input text to estimate reading time for.
        words_per_minute: Assumed reading speed; defaults to 155 (a rough,
            uneducated guess).

    Returns:
        A human-readable "Reading time: ..." string, an error string for
        invalid input, or "An error occurred: ..." on unexpected failure.
    """
    try:
        # Guard: reject empty or non-string input with a message.
        if not text or not isinstance(text, str):
            return "Invalid input: Text must be a non-empty string."

        word_count = len(text.split())
        total_seconds = (word_count / words_per_minute) * 60

        # Break the whole-second duration into h/m/s components.
        minutes_total, seconds = divmod(int(total_seconds), 60)
        hours, minutes = divmod(minutes_total, 60)

        if hours > 0:
            return f"Reading time: {hours} hour(s), {minutes} minute(s), and {seconds} second(s)."
        if minutes > 0:
            return f"Reading time: {minutes} minute(s) and {seconds} second(s)."
        return f"Reading time: {seconds} second(s)."
    except Exception as e:
        return f"An error occurred: {e}"
# Example usage: synthesize narration for a tiny sample story.
if __name__ == "__main__":
    short_story = """
In a quiet village, a young girl named Lily discovered a hidden garden.
Every flower in the garden glowed with a magical light, revealing secrets of the past.
Lily knew she had found something truly extraordinary.
"""
    generate_audio(short_story, "Urdu", "Asad")