|
|
|
|
|
""" |
|
|
Script orchestrateur pour le traitement automatique des BOB (Hugging Face version) |
|
|
1. Transcrit tous les fichiers audio du dossier input avec Whisper |
|
|
2. Analyse toutes les transcriptions avec Hugging Face |
|
|
3. Génère le fichier de résumé final |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
from datetime import datetime |
|
|
import time |
|
|
|
|
|
|
|
|
# Directory that contains this orchestrator script (absolute path).
SCRIPT_DIR = Path(__file__).parent.absolute()

# Worker scripts are looked up next to this file.
TRANSCRIBE_SCRIPT = SCRIPT_DIR.joinpath("transcribe_audio.py")
ANALYZE_SCRIPT = SCRIPT_DIR.joinpath("analyze_bob_hf.py")

# Input and output folders are siblings of the script directory.
INPUT_DIR = SCRIPT_DIR.parent.joinpath("input")
OUTPUT_DIR = SCRIPT_DIR.parent.joinpath("output")

# Artifacts that check_results() looks for inside the output folder.
TRANSCRIPTIONS_DIR = OUTPUT_DIR.joinpath("transcriptions")
RESUME_FILE = OUTPUT_DIR.joinpath("resume_bob.txt")
|
|
|
|
|
def print_header():
    """Print the start-up banner followed by the launch date and time.

    Writes six lines to stdout: a 70-character rule, the title, the pipeline
    summary, a second rule, the start timestamp and an empty line.
    """
    rule = "=" * 70
    started_at = datetime.now().strftime('%d/%m/%Y à %H:%M:%S')

    banner = (
        rule,
        "🎙️ TRAITEMENT AUTOMATIQUE DES BOB (Hugging Face)",
        " Transcription audio → Analyse IA → Résumé",
        rule,
        f"Démarré le: {started_at}",
        "",  # trailing blank line separating the banner from what follows
    )
    for row in banner:
        print(row)
|
|
|
|
|
def check_prerequisites():
    """Check that everything needed to run the pipeline is in place.

    Verifies, in order, that the input directory exists, that the
    transcription and analysis scripts exist, and that the input directory
    contains at least one file matching a supported audio/video extension
    (lower-case or upper-case spelling). Findings are printed to stdout.

    Returns:
        bool: True when every prerequisite is satisfied, False as soon as one
        of them is missing.
    """
    print("🔍 Vérification des prérequis...")

    if not INPUT_DIR.exists():
        print(f"❌ Le dossier input n'existe pas: {INPUT_DIR}")
        return False

    if not TRANSCRIBE_SCRIPT.exists():
        print(f"❌ Script de transcription introuvable: {TRANSCRIBE_SCRIPT}")
        return False

    if not ANALYZE_SCRIPT.exists():
        print(f"❌ Script d'analyse introuvable: {ANALYZE_SCRIPT}")
        return False

    audio_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.mp4', '.avi', '.mov']

    # Collect into a set: where glob matching is case-insensitive the
    # lower-case and upper-case patterns both return the same file, and the
    # reported count must agree with the deduplicated listing below.
    found = set()
    for ext in audio_extensions:
        found.update(INPUT_DIR.glob(f"*{ext}"))
        found.update(INPUT_DIR.glob(f"*{ext.upper()}"))
    audio_files = sorted(found)

    if not audio_files:
        print(f"⚠️ Aucun fichier audio trouvé dans {INPUT_DIR}")
        print(f" Formats supportés: {', '.join(audio_extensions)}")
        return False

    print(f"✅ Trouvé {len(audio_files)} fichier(s) audio à traiter:")
    for i, audio_file in enumerate(audio_files, 1):
        print(f" {i}. {audio_file.name}")

    print("✅ Scripts de traitement trouvés")
    print("✅ Prérequis validés")
    print()
    return True
|
|
|
|
|
def run_script(script_path, step_name, python_executable=None):
    """Run a Python script as a subprocess and report whether it succeeded.

    The script is launched with its own directory as working directory. Its
    stdout and stderr are captured (decoded as UTF-8, undecodable bytes
    replaced) instead of being streamed. On success, the non-blank lines among
    the last three lines of stdout are echoed; on failure, the return code,
    stderr and stdout are printed.

    Args:
        script_path: pathlib.Path of the script to execute.
        step_name: Human-readable label used in the progress messages.
        python_executable: Interpreter used to run the script; defaults to
            sys.executable when None.

    Returns:
        bool: True if the script exited with code 0; False if it exited with
        a non-zero code or if any exception occurred while running it.
    """
    if python_executable is None:
        python_executable = sys.executable

    print(f"🚀 Étape {step_name}...")
    print(f" Exécution: {script_path.name}")

    start_time = time.time()

    try:
        result = subprocess.run(
            [python_executable, str(script_path)],
            cwd=str(script_path.parent),
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='replace'
        )
        duration = time.time() - start_time

        if result.returncode == 0:
            print(f"✅ {step_name} terminée avec succès")
            print(f" Durée: {duration:.1f} secondes")

            # "".split('\n') is [''] (truthy), so filter blank lines *before*
            # deciding whether to print the header; otherwise a silent script
            # would produce a dangling "Résultat:" line.
            last_lines = result.stdout.strip().split('\n')[-3:]
            tail = [line.strip() for line in last_lines if line.strip()]
            if tail:
                print(" Résultat:")
                for line in tail:
                    print(f" > {line}")
            print()
            return True

        print(f"❌ {step_name} a échoué")
        print(f" Code d'erreur: {result.returncode}")
        if result.stderr:
            print(f" Erreur: {result.stderr.strip()}")
        if result.stdout:
            print(f" Sortie: {result.stdout.strip()}")
        print()
        return False

    except Exception as e:
        # Reporting boundary: e.g. the interpreter could not be launched.
        duration = time.time() - start_time
        print(f"❌ Erreur lors de l'exécution de {step_name}")
        print(f" Exception: {e}")
        print(f" Durée avant erreur: {duration:.1f} secondes")
        print()
        return False
|
|
|
|
|
def check_results():
    """Report on the artifacts produced by the pipeline.

    Lists every ``*_transcription.txt`` file found in the transcriptions
    directory with its size, then prints the summary file's non-blank lines
    that do not start with ``#``. A failure while reading or printing the
    summary is reported but does not change the return value.

    Returns:
        bool: False if the transcriptions directory or the summary file is
        missing, True otherwise.
    """
    print("📊 Vérification des résultats...")

    # Guard: nothing to report without the transcriptions folder.
    if not TRANSCRIPTIONS_DIR.exists():
        print("❌ Dossier de transcriptions non trouvé")
        return False

    transcripts = list(TRANSCRIPTIONS_DIR.glob("*_transcription.txt"))
    print(f"✅ {len(transcripts)} transcription(s) générée(s)")
    for transcript in transcripts:
        kilobytes = transcript.stat().st_size / 1024
        print(f" • {transcript.name} ({kilobytes:.1f} KB)")

    # Guard: the summary file is the final deliverable.
    if not RESUME_FILE.exists():
        print("❌ Fichier de résumé non généré")
        return False

    print(f"✅ Fichier de résumé généré: {RESUME_FILE.name}")

    try:
        with open(RESUME_FILE, 'r', encoding='utf-8') as handle:
            summary_text = handle.read().strip()

        print("📝 Contenu du résumé:")
        print("-" * 50)
        for row in summary_text.split('\n'):
            # Skip blank lines and lines beginning with '#'.
            if not row.strip() or row.startswith('#'):
                continue
            print(f" {row}")
        print("-" * 50)
    except Exception as exc:
        print(f" Erreur lors de la lecture: {exc}")

    return True
|
|
|
|
|
def main():
    """Run the whole pipeline interactively and return a process exit code.

    Prints the banner, checks prerequisites, asks the user for confirmation,
    runs the transcription script and then the analysis script with the
    current interpreter, and finally verifies the generated files.

    Returns:
        int: 0 when the results check passes or the user declines to start;
        1 when prerequisites are missing, a step fails, or the results check
        fails.
    """
    started = time.time()

    print_header()

    if not check_prerequisites():
        print("❌ Impossible de continuer sans les prérequis")
        return 1

    # Explicit opt-in: anything other than an affirmative answer cancels.
    print("👀 Prêt à démarrer le traitement automatique des BOB")
    answer = input(" Continuer ? (o/N): ").lower().strip()
    if answer not in {'o', 'oui', 'y', 'yes'}:
        print("🚫 Traitement annulé par l'utilisateur")
        return 0

    print()

    interpreter = sys.executable
    print(f"🐍 Utilisation de Python: {interpreter}")
    print()

    # (script, progress label, message printed if the step fails), in order.
    steps = (
        (
            TRANSCRIBE_SCRIPT,
            "1/2 - Transcription audio (Whisper)",
            "❌ Échec de la transcription. Arrêt du traitement.",
        ),
        (
            ANALYZE_SCRIPT,
            "2/2 - Analyse des transcriptions (Hugging Face)",
            "❌ Échec de l'analyse. Vérifiez que les modèles Hugging Face sont accessibles.",
        ),
    )
    for script, label, failure_message in steps:
        if not run_script(script, label, interpreter):
            print(failure_message)
            return 1

    print()
    results_ok = check_results()

    elapsed = time.time() - started

    print()
    print("=" * 70)
    if results_ok:
        print("🎉 TRAITEMENT TERMINÉ AVEC SUCCÈS")
        print(f"⏱️ Durée totale: {elapsed:.1f} secondes ({elapsed/60:.1f} minutes)")
        print(f"📁 Fichier de résumé: {RESUME_FILE}")
        print("✅ Tous vos BOB ont été traités automatiquement !")
    else:
        print("⚠️ TRAITEMENT PARTIELLEMENT RÉUSSI")
        print(f"⏱️ Durée totale: {elapsed:.1f} secondes")
        print("🔍 Vérifiez les fichiers de sortie manuellement")
    print("=" * 70)

    if results_ok:
        return 0
    return 1
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: propagate main()'s status as the process exit code.
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        # Ctrl+C during the prompt or while a step is running.
        print("\n🚫 Traitement interrompu par l'utilisateur")
        sys.exit(1)
    except Exception as unexpected:
        # Last-resort boundary: report the error and exit non-zero.
        print(f"\n💥 Erreur inattendue: {unexpected}")
        sys.exit(1)
|
|
|