|
|
import math |
|
|
import os.path |
|
|
import re |
|
|
from os import path |
|
|
|
|
|
from loguru import logger |
|
|
|
|
|
from app.config import config |
|
|
from app.models import const |
|
|
from app.models.schema import VideoConcatMode, VideoParams |
|
|
from app.services import llm, material, subtitle, video, voice |
|
|
from app.services import state as sm |
|
|
from app.utils import utils |
|
|
|
|
|
|
|
|
def generate_script(task_id, params):
    """Return the video script for a task, generating one when none is supplied.

    If ``params.video_script`` (stripped) is non-empty it is used as-is;
    otherwise ``llm.generate_script`` is called with the subject, language and
    paragraph count taken from ``params``.

    Returns the script, or ``None`` after marking the task as failed when no
    script could be obtained.
    """
    logger.info("\n\n## generating video script")

    script = params.video_script.strip()
    if script:
        logger.info("Using provided video script")
    else:
        script = llm.generate_script(
            video_subject=params.video_subject,
            language=params.video_language,
            paragraph_number=params.paragraph_number,
        )

    if script:
        return script

    # Neither the caller nor the LLM produced any text.
    sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
    logger.error("failed to generate video script.")
    return None
|
|
|
|
|
|
|
|
def generate_terms(task_id, params, video_script):
    """Return the list of search terms used to look up video material.

    Terms supplied in ``params.video_terms`` take precedence. They may be a
    comma-separated string or a list of strings; each term is stripped and
    blank terms are dropped. When no usable term is supplied,
    ``llm.generate_terms`` is asked for 5 terms based on the subject and script.

    Returns the list of terms, or ``None`` after marking the task as failed
    when no terms could be obtained.

    Raises:
        ValueError: if ``params.video_terms`` is neither a string nor a list
            of strings.
    """
    logger.info("\n\n## generating video terms")

    provided = params.video_terms
    video_terms = []
    if provided:
        if isinstance(provided, str):
            # Accept both the ASCII comma and the fullwidth comma (U+FF0C) as
            # separators, so terms typed with a CJK input method still split.
            candidates = re.split("[,\uff0c]", provided)
        elif isinstance(provided, list):
            candidates = provided
        else:
            raise ValueError("video_terms must be a string or a list of strings.")

        if not all(isinstance(term, str) for term in candidates):
            raise ValueError("video_terms must be a string or a list of strings.")

        # Drop blank entries (e.g. from "a,,b" or a trailing comma) so that no
        # empty search term is handed to the downloader.
        video_terms = [term.strip() for term in candidates if term.strip()]

    if video_terms:
        logger.info(f"Using provided video terms: {len(video_terms)} term(s)")
    else:
        if provided:
            logger.warning("provided video terms are empty, generating terms instead")
        video_terms = llm.generate_terms(
            video_subject=params.video_subject, video_script=video_script, amount=5
        )

    if not video_terms:
        sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
        logger.error("failed to generate video terms.")
        return None

    return video_terms
|
|
|
|
|
|
|
|
def save_script_data(task_id, video_script, video_terms, params):
    """Write the script, search terms and params of a task to ``script.json``.

    The file is placed in the directory returned by ``utils.task_dir(task_id)``
    and its content is produced by ``utils.to_json``.
    """
    target = path.join(utils.task_dir(task_id), "script.json")
    payload = dict(script=video_script, search_terms=video_terms, params=params)

    with open(target, "w", encoding="utf-8") as handle:
        handle.write(utils.to_json(payload))
|
|
|
|
|
|
|
|
def generate_audio(task_id, params, video_script):
    """Synthesize the narration for ``video_script`` into ``audio.mp3``.

    Returns a tuple ``(audio_file, audio_duration, sub_maker)`` where
    ``audio_duration`` is the value of ``voice.get_audio_duration`` rounded up
    to an integer. When ``voice.tts`` returns ``None`` the task is marked as
    failed and ``(None, None, None)`` is returned.
    """
    logger.info("\n\n## generating audio")

    audio_file = path.join(utils.task_dir(task_id), "audio.mp3")
    sub_maker = voice.tts(
        text=video_script,
        voice_name=voice.parse_voice_name(params.voice_name),
        voice_rate=params.voice_rate,
        voice_file=audio_file,
    )

    if sub_maker is not None:
        duration = math.ceil(voice.get_audio_duration(sub_maker))
        return audio_file, duration, sub_maker

    sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
    logger.error(
        "failed to generate audio:\n"
        "1. check if the language of the voice matches the language of the video script.\n"
        "2. check if the network is available. If you are in China, it is recommended to use a VPN and enable the global traffic mode."
    )
    return None, None, None
|
|
|
|
|
|
|
|
def generate_subtitle(task_id, params, video_script, sub_maker, audio_file):
    """Create ``subtitle.srt`` for a task and return its path.

    The provider is read from ``config.app`` (key ``subtitle_provider``,
    default ``"edge"``). With ``"edge"`` the subtitle is built from
    ``sub_maker``; if that leaves no file on disk, or if the provider is
    ``"whisper"``, the subtitle is created from the audio file and then
    corrected against the script.

    Returns the subtitle path, or ``""`` when subtitles are disabled or the
    resulting file yields no subtitle lines.
    """
    if not params.subtitle_enabled:
        return ""

    subtitle_path = path.join(utils.task_dir(task_id), "subtitle.srt")
    subtitle_provider = config.app.get("subtitle_provider", "edge").strip().lower()
    logger.info(f"\n\n## generating subtitle, provider: {subtitle_provider}")

    use_whisper = subtitle_provider == "whisper"

    if subtitle_provider == "edge":
        voice.create_subtitle(
            text=video_script, sub_maker=sub_maker, subtitle_file=subtitle_path
        )
        if not os.path.exists(subtitle_path):
            # The edge provider produced nothing; retry with whisper below.
            use_whisper = True
            logger.warning("subtitle file not found, fallback to whisper")

    if use_whisper:
        subtitle.create(audio_file=audio_file, subtitle_file=subtitle_path)
        logger.info("\n\n## correcting subtitle")
        subtitle.correct(subtitle_file=subtitle_path, video_script=video_script)

    if subtitle.file_to_subtitles(subtitle_path):
        return subtitle_path

    logger.warning(f"subtitle file is invalid: {subtitle_path}")
    return ""
|
|
|
|
|
|
|
|
def get_video_materials(task_id, params, video_terms, audio_duration):
    """Collect the video clips that will be combined into the final video.

    For ``params.video_source == "local"`` the caller-supplied materials are
    run through ``video.preprocess_video`` and their ``url`` values returned.
    For any other source, clips are fetched with ``material.download_videos``
    using ``video_terms``; the requested duration is ``audio_duration``
    multiplied by ``params.video_count``.

    Returns a list of clips, or ``None`` after marking the task as failed when
    nothing usable was found.
    """
    if params.video_source == "local":
        logger.info("\n\n## preprocess local materials")
        prepared = video.preprocess_video(
            materials=params.video_materials, clip_duration=params.video_clip_duration
        )
        if prepared:
            return [item.url for item in prepared]

        sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
        logger.error(
            "no valid materials found, please check the materials and try again."
        )
        return None

    logger.info(f"\n\n## downloading videos from {params.video_source}")
    # NOTE(review): the keyword is spelled ``video_contact_mode`` in this call;
    # presumably it matches the callee's parameter name - confirm before renaming.
    fetched = material.download_videos(
        task_id=task_id,
        search_terms=video_terms,
        source=params.video_source,
        video_aspect=params.video_aspect,
        video_contact_mode=params.video_concat_mode,
        audio_duration=audio_duration * params.video_count,
        max_clip_duration=params.video_clip_duration,
    )
    if fetched:
        return fetched

    sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
    logger.error(
        "failed to download videos, maybe the network is not available. if you are in China, please use a VPN."
    )
    return None
|
|
|
|
|
|
|
|
def generate_final_videos(
    task_id, params, downloaded_videos, audio_file, subtitle_path
):
    """Render ``params.video_count`` videos from the collected clips.

    Each video is produced in two steps: the clips are combined into
    ``combined-<n>.mp4``, which is then turned into ``final-<n>.mp4`` together
    with the audio and subtitle. Task progress is advanced from 50 towards 100
    in equal increments after every step.

    Returns a tuple ``(final_video_paths, combined_video_paths)``.
    """
    finals = []
    combined = []

    # The configured concat mode is only honoured for a single video;
    # otherwise the random mode is used.
    if params.video_count == 1:
        video_concat_mode = params.video_concat_mode
    else:
        video_concat_mode = VideoConcatMode.random
    video_transition_mode = params.video_transition_mode

    _progress = 50
    for index in range(1, params.video_count + 1):
        # Two steps per video share the remaining 50 progress points.
        half_step = 50 / params.video_count / 2

        combined_video_path = path.join(
            utils.task_dir(task_id), f"combined-{index}.mp4"
        )
        logger.info(f"\n\n## combining video: {index} => {combined_video_path}")
        video.combine_videos(
            combined_video_path=combined_video_path,
            video_paths=downloaded_videos,
            audio_file=audio_file,
            video_aspect=params.video_aspect,
            video_concat_mode=video_concat_mode,
            video_transition_mode=video_transition_mode,
            max_clip_duration=params.video_clip_duration,
            threads=params.n_threads,
        )
        _progress += half_step
        sm.state.update_task(task_id, progress=_progress)

        final_video_path = path.join(utils.task_dir(task_id), f"final-{index}.mp4")
        logger.info(f"\n\n## generating video: {index} => {final_video_path}")
        video.generate_video(
            video_path=combined_video_path,
            audio_path=audio_file,
            subtitle_path=subtitle_path,
            output_file=final_video_path,
            params=params,
        )
        _progress += half_step
        sm.state.update_task(task_id, progress=_progress)

        finals.append(final_video_path)
        combined.append(combined_video_path)

    return finals, combined
|
|
|
|
|
|
|
|
def start(task_id, params: VideoParams, stop_at: str = "video"):
    """Run the video-generation pipeline for a task, optionally stopping early.

    Stages run in order: script, terms, audio, subtitle, materials, final
    videos. Task state and progress are reported through ``sm.state`` after
    each stage.

    Args:
        task_id: identifier of the task whose state is updated.
        params: generation parameters. ``video_count`` is capped at 2 and
            ``video_clip_duration`` at 5 in place, and a plain-string
            ``video_concat_mode`` is converted to ``VideoConcatMode``.
        stop_at: ``"script"``, ``"terms"``, ``"audio"``, ``"subtitle"`` or
            ``"materials"`` ends the run after that stage; any other value
            runs every stage.

    Returns:
        A dict with the results of the stage the run stopped at (or all
        results for a full run), or ``None`` when a stage failed and the task
        was marked as failed.
    """

    def mark_failed():
        sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)

    def report_progress(percent):
        sm.state.update_task(
            task_id, state=const.TASK_STATE_PROCESSING, progress=percent
        )

    def mark_complete(**fields):
        sm.state.update_task(
            task_id, state=const.TASK_STATE_COMPLETE, progress=100, **fields
        )

    logger.info(f"start task: {task_id}, stop_at: {stop_at}")
    report_progress(5)

    # Resource caps applied for the Huggingface Spaces deployment.
    if getattr(params, "video_count", 0) > 2:
        logger.warning("Limiting video count to 2 for Huggingface Spaces")
        params.video_count = 2

    if getattr(params, "video_clip_duration", 0) > 5:
        logger.warning("Limiting clip duration to 5 seconds for Huggingface Spaces")
        params.video_clip_duration = 5

    # Exact-type check: only a plain ``str`` value is converted to the enum.
    if type(params.video_concat_mode) is str:
        params.video_concat_mode = VideoConcatMode(params.video_concat_mode)

    # 1. script
    video_script = generate_script(task_id, params)
    if not video_script or "Error: " in video_script:
        mark_failed()
        return

    report_progress(10)

    if stop_at == "script":
        mark_complete(script=video_script)
        return {"script": video_script}

    # 2. search terms (not needed for local materials)
    video_terms = ""
    if params.video_source != "local":
        video_terms = generate_terms(task_id, params, video_script)
        if not video_terms:
            mark_failed()
            return

    save_script_data(task_id, video_script, video_terms, params)

    if stop_at == "terms":
        mark_complete(terms=video_terms)
        return {"script": video_script, "terms": video_terms}

    report_progress(20)

    # 3. audio
    audio_file, audio_duration, sub_maker = generate_audio(
        task_id, params, video_script
    )
    if not audio_file:
        mark_failed()
        return

    report_progress(30)

    if stop_at == "audio":
        mark_complete(audio_file=audio_file)
        return {"audio_file": audio_file, "audio_duration": audio_duration}

    # 4. subtitle (an empty path is not treated as a failure here)
    subtitle_path = generate_subtitle(
        task_id, params, video_script, sub_maker, audio_file
    )

    if stop_at == "subtitle":
        mark_complete(subtitle_path=subtitle_path)
        return {"subtitle_path": subtitle_path}

    report_progress(40)

    # 5. materials
    downloaded_videos = get_video_materials(
        task_id, params, video_terms, audio_duration
    )
    if not downloaded_videos:
        mark_failed()
        return

    if stop_at == "materials":
        mark_complete(materials=downloaded_videos)
        return {"materials": downloaded_videos}

    report_progress(50)

    # 6. final videos
    final_video_paths, combined_video_paths = generate_final_videos(
        task_id, params, downloaded_videos, audio_file, subtitle_path
    )
    if not final_video_paths:
        mark_failed()
        return

    logger.success(
        f"task {task_id} finished, generated {len(final_video_paths)} videos."
    )

    result = {
        "videos": final_video_paths,
        "combined_videos": combined_video_paths,
        "script": video_script,
        "terms": video_terms,
        "audio_file": audio_file,
        "audio_duration": audio_duration,
        "subtitle_path": subtitle_path,
        "materials": downloaded_videos,
    }
    mark_complete(**result)
    return result
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual run of the full pipeline with a fixed task id and sample params.
    sample_params = VideoParams(
        video_subject="金钱的作用",
        voice_name="zh-CN-XiaoyiNeural-Female",
        voice_rate=1.0,
    )
    start("task_id", sample_params, stop_at="video")
|
|
|