#!/usr/bin/env python3 """ Script d'analyse des transcriptions avec Hugging Face Transformers Analyse les fichiers txt dans output/transcriptions et génère un résumé structuré """ import os from pathlib import Path import torch from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM from datetime import datetime import re import traceback from dotenv import load_dotenv # Bootstrap environnement portable try: from portable_env import setup_portable_env setup_portable_env() except Exception: pass # Charger les variables d'environnement try: load_dotenv(Path(__file__).parent.parent / ".env") except Exception: pass # Configuration (via env, avec fallback local) BASE_DIR = Path(os.environ.get("BOB_BASE_DIR", Path(__file__).parent.parent)) TRANSCRIPTIONS_DIR = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", BASE_DIR / "output" / "transcriptions")) OUTPUT_FILE = Path(os.environ.get("BOB_OUTPUT_FILE", BASE_DIR / "output" / "resume_bob.txt")) HF_MODEL = os.environ.get("HF_MODEL", "google/gemma-3-4b-pt") # Gemma 3 4B FORCÉ def get_hf_model(): """Récupère le modèle Hugging Face depuis la variable d'environnement""" return os.environ.get("HF_MODEL", "google/gemma-3-4b-pt") def load_hf_model(): """Charge un modèle Hugging Face""" try: hf_model = get_hf_model() # Lire dynamiquement print(f"Chargement du modèle Hugging Face: {hf_model}") # Utiliser pipeline pour plus de simplicité generator = pipeline( "text-generation", model=hf_model, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device_map="auto" if torch.cuda.is_available() else "cpu", token=os.environ.get("HF_TOKEN") # Pour les modèles privés ) print(f"✅ Modèle {hf_model} chargé avec succès") return generator except Exception as e: print(f"❌ Erreur lors du chargement du modèle Hugging Face: {e}") print(f"Traceback: {traceback.format_exc()}") print("Assurez-vous que le modèle est disponible et que vous avez les permissions nécessaires") return None def create_analysis_prompt(): """Crée le prompt d'analyse""" return """RÔLE: Expert en classification de contenu journalistique RTL. TÂCHE: Extraire 3 informations précises de cette transcription radio : 1. AUTEUR : Nom complet du journaliste/présentateur - Chercher "les précisions pour RTL de [NOM]" ou signature en fin - Si absent : "Inconnu" 2. QUALIFICATION du format (TRÈS IMPORTANT) : - P = PAPIER seul : Lecture continue par le journaliste, pas d'interviews • Phrases à la 3e personne uniquement • Aucune citation directe de témoins • Style narratif/descriptif pur - P+S = PAPIER + SON : Reportage avec interviews/témoignages • Présence de citations directes ("Je...", "Nous...") • Témoignages de personnes citées par leur prénom • Alternance narratif + paroles rapportées • Phrases comme "explique Alexandre", "témoigne Lucas" - QR = QUESTIONS-RÉPONSES : Interview/débat en direct • Format conversationnel • Questions-réponses explicites • Dialogue en temps réel 3. TITRE : Sujet principal en 4-6 mots, MAJUSCULES, style presse INDICES DE DÉTECTION P+S : - Citations à la 1ère personne : "J'ai été hospitalisé", "Nous avons commencé" - Prénoms + témoignages : "Alexandre explique", "Lucas raconte" - Discours rapporté : "Il dit que", "Ils nous ont dit" - Changement de ton narratif FORMAT OBLIGATOIRE : AUTEUR|QUALIFICATION|TITRE""" def detect_format_indicators(text): """Détecte automatiquement les indicateurs de format P/P+S/QR/MT""" indicators = { 'p_plus_s': 0, # Papier + Son 'qr': 0, # Questions-Réponses 'mt': 0, # Micro-Trottoir 'p_only': 0 # Papier seul } text_lower = text.lower() # Indicateurs Micro-Trottoir (MT) mt_patterns = [ r'\bmoi je (?:trouve|pense|crois|dis)', r'\bje trouve (?:que|ça|dommage)', r'\bje pense que', r'\bpour moi', r'\bà mon avis', r'\bfranchement', r'en arrivant', r'je viens (?:de|d\')', r'aujourd\'hui', r'c\'est dommage', r'malheureusement', r'donc je (?:voulais|pense)', r'quand même', r'un petit peu', r'vraiment dommage', ] # Indicateurs P+S (Papier + Son) p_plus_s_patterns = [ r'\bje\s+(?:suis|ai|me|pense|crois|vais|veux|dois)', r'\bnous\s+(?:avons|sommes|étions|allons|devons)', r'\bj\'(?:ai|étais|avais|irai|aurais)', r'\bmon\s+(?:père|fils|mari|frère)', r'\bma\s+(?:mère|fille|femme|sœur)', r'\b(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)\s+\w+', r'\b\w+\s+(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)', r'selon\s+\w+', r'(?:il|elle|ils|elles)\s+(?:dit|disent|explique|expliquent|affirme|assure)\s+que', r'pour\s+\w+\s*,', r'comme\s+(?:le\s+)?(?:dit|explique|précise)\s+\w+', r'voilà ce à quoi', r'c\'est qu?\'?à? partir', r'certains d\'entre (?:nous|eux)', r'parmi les\s+\d+', r'\b[A-Z][a-z]+\s+qui\s+(?:est|a|était)', r'comme\s+[A-Z][a-z]+', r'fièvre\s+et\s+\w+', r'hospitalisé', r'symptômes', r'malade', ] # Indicateurs QR (Questions-Réponses) qr_patterns = [ r'\?.*[A-Z]', # Question suivie de réponse r'question\s*:', r'réponse\s*:', r'vous\s+(?:pensez|croyez|dites)', r'que\s+pensez-vous', r'interview', r'débat', ] # Compter les patterns for pattern in mt_patterns: indicators['mt'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) for pattern in p_plus_s_patterns: indicators['p_plus_s'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) for pattern in qr_patterns: indicators['qr'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) # Si ni MT ni P+S ni QR détecté fortement, c'est probablement P seul if indicators['mt'] < 3 and indicators['p_plus_s'] < 3 and indicators['qr'] < 2: indicators['p_only'] = 5 return indicators def analyze_transcription(generator, transcription_text, filename): """Analyse une transcription avec Hugging Face""" try: # Analyse automatique des patterns format_indicators = detect_format_indicators(transcription_text) # Créer un hint pour l'IA basé sur l'analyse automatique max_score = max(format_indicators.values()) likely_format = [k for k, v in format_indicators.items() if v == max_score][0] hint_map = { 'p_plus_s': "ATTENTION: Nombreux témoignages détectés → OBLIGATOIREMENT P+S", 'qr': "ATTENTION: Format questions-réponses détecté → OBLIGATOIREMENT QR", 'p_only': "ATTENTION: Aucun témoignage/interview → OBLIGATOIREMENT P" } # Si détection très claire (score élevé), forcer le format force_format = "" if format_indicators['p_plus_s'] >= 5: force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P+S' pour la qualification." elif format_indicators['qr'] >= 3: force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'QR' pour la qualification." elif format_indicators['p_plus_s'] <= 1 and format_indicators['qr'] <= 1: force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P' pour la qualification." format_hint = hint_map.get(likely_format, "") prompt = create_analysis_prompt() enhanced_prompt = f"{prompt}\n\n{format_hint}{force_format}" # Format pour Llama full_prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{enhanced_prompt}\n\nTRANSCRIPTION:\n{transcription_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n" print(f"Analyse de: {filename}") print(f" Indices détectés: MT={format_indicators['mt']}, P+S={format_indicators['p_plus_s']}, QR={format_indicators['qr']}, P={format_indicators['p_only']}") # Génération avec le modèle response = generator( full_prompt, max_new_tokens=150, temperature=0.2, top_k=20, top_p=0.8, repetition_penalty=1.15, do_sample=True, pad_token_id=generator.tokenizer.eos_token_id ) result = response[0]['generated_text'].replace(full_prompt, '').strip() # Parser le résultat result = result.replace('\n', ' ').strip() if "|" in result: # Prendre la première ligne qui contient des | lines = result.split('\n') for line in lines: if "|" in line and line.count("|") >= 2: parts = line.split("|") if len(parts) >= 3: auteur = parts[0].strip() qualification = parts[1].strip() titre = parts[2].strip() # Nettoyer et formater if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]: auteur = "Inconnu" # Valider la qualification if qualification.upper() not in ["P", "P+S", "SON", "MT", "QR"]: qualification = "P" # Défaut return { "success": True, "auteur": auteur, "qualification": qualification.upper(), "titre": titre.upper(), "filename": filename } # Si pas de format avec |, essayer de parser différemment if " - " in result: parts = result.split(" - ") if len(parts) >= 3: auteur = parts[0].strip() qualification = parts[1].strip() titre = " - ".join(parts[2:]).strip() if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]: auteur = "Inconnu" # Valider la qualification if qualification.upper() not in ["P", "P+S", "QR"]: qualification = "P" return { "success": True, "auteur": auteur, "qualification": qualification.upper(), "titre": titre.upper(), "filename": filename } # Si le format n'est pas correct return { "success": False, "error": f"Format de réponse incorrect: {result}", "filename": filename, "raw_response": result } except Exception as e: return { "success": False, "error": str(e), "filename": filename } def read_transcription_file(file_path): """Lit le contenu d'un fichier de transcription et extrait les métadonnées""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Extraire les métadonnées metadata = {} lines = content.split('\n') for line in lines[:10]: # Chercher dans les 10 premières lignes if line.startswith('Fichier source:'): metadata['filename'] = line.replace('Fichier source:', '').strip() elif line.startswith('Durée de traitement:'): metadata['processing_time'] = line.replace('Durée de traitement:', '').strip() # Extraire seulement le texte de transcription (après les métadonnées) if "--------------------------------------------------" in content: parts = content.split("--------------------------------------------------") if len(parts) > 1: text_content = parts[1].strip() return text_content, metadata return content.strip(), metadata except Exception as e: print(f"Erreur lors de la lecture de {file_path}: {e}") return None, {} def apply_duration_correction(result, duration_seconds, format_indicators=None): """Applique une correction probabiliste basée sur la durée et les patterns détectés""" if not duration_seconds: return result original_qualification = result.get("qualification", "") corrected = False # Priorité 1: Détection Micro-Trottoir basée sur patterns + durée if format_indicators and format_indicators.get('mt', 0) >= 8 and duration_seconds < 60: if original_qualification in ["P", "P+S", "SON"]: result["qualification"] = "MT" corrected = True print(f" → Correction MT détecté: {original_qualification} → MT (patterns={format_indicators['mt']})") # Priorité 2: Logique probabiliste selon durée (si pas MT) elif not corrected: if duration_seconds < 30: # < 30s = quasi-certainement un SON if original_qualification in ["P", "P+S"]: result["qualification"] = "SON" corrected = True print(f" → Correction durée < 30s: {original_qualification} → SON") elif 30 <= duration_seconds <= 40: # 30-40s = probablement un SON, mais peut être P if original_qualification == "P+S": result["qualification"] = "SON" corrected = True print(f" → Correction durée 30-40s: P+S → SON") return result def extract_author_from_filename(filename): """Extrait le nom du journaliste depuis le nom du fichier""" try: import re # Nettoyer le nom du fichier clean_name = filename.replace('_transcription.txt', '').replace('.mp3', '').replace('.MP3', '') # Pattern spécifique pour "1 DEMARIA Philippe" -> "DEMARIA Philippe" # Supprimer les numéros au début clean_name = re.sub(r'^\d+\s+', '', clean_name).strip() # Si on a encore quelque chose, c'est probablement un nom if clean_name: return clean_name return "Inconnu" except Exception as e: print(f"Erreur extraction auteur: {e}") return "Inconnu" def get_audio_duration(audio_filename, input_dir): """Calcule la durée d'un fichier audio en secondes totales (version unique)""" try: # portable_env a déjà injecté ffmpeg dans le PATH si nécessaire from pydub import AudioSegment audio_path = None audio_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.mp4', '.avi', '.mov'] # Rechercher le fichier audio correspondant for ext in audio_extensions: potential_path = input_dir / audio_filename.replace('_transcription.txt', ext) if potential_path.exists(): audio_path = potential_path break base_name = audio_filename.replace('_transcription.txt', '') potential_path = input_dir / f"{base_name}{ext}" if potential_path.exists(): audio_path = potential_path break if audio_path: audio = AudioSegment.from_file(str(audio_path)) duration_seconds = len(audio) / 1000 # pydub retourne en millisecondes minutes = int(duration_seconds // 60) seconds = int(duration_seconds % 60) return minutes * 100 + seconds return None except Exception as e: print(f"Erreur calcul durée pour {audio_filename}: {e}") return None def get_transcription_files(transcriptions_dir): """Récupère tous les fichiers de transcription (unique)""" if not transcriptions_dir.exists(): print(f"Le dossier {transcriptions_dir} n'existe pas") return [] txt_files = list(transcriptions_dir.glob("*_transcription.txt")) return sorted(txt_files) def main(): """Fonction principale""" print("=" * 60) print("ANALYSE DES BOB AVEC HUGGING FACE") print("=" * 60) # Vérification des dossiers script_dir = Path(__file__).parent.absolute() transcriptions_dir = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", script_dir.parent / "output" / "transcriptions")) output_file = Path(os.environ.get("BOB_OUTPUT_FILE", script_dir.parent / "output" / "resume_bob.txt")) input_dir = Path(os.environ.get("BOB_INPUT_DIR", script_dir.parent / "input")) # Utiliser la fonction factorisée avec print comme log analyze_files_hf( transcriptions_dir=transcriptions_dir, input_dir=input_dir, output_file=output_file, log_fn=print, progress_fn=None, cancel_fn=None, ) if __name__ == "__main__": main() # --- API factorisée pour le GUI --- def analyze_files_hf(transcriptions_dir: Path, input_dir: Path, output_file: Path, log_fn=print, progress_fn=None, cancel_fn=None): """Analyse tous les fichiers de transcription avec Hugging Face""" log = log_fn or (lambda *a, **k: None) log("Dossier transcriptions: {}".format(transcriptions_dir)) log("Fichier de sortie: {}".format(output_file)) log("") transcription_files = get_transcription_files(transcriptions_dir) log(f"🔍 Recherche de fichiers dans: {transcriptions_dir}") log(f"📁 Contenu du dossier: {list(transcriptions_dir.iterdir()) if transcriptions_dir.exists() else 'Dossier non trouvé'}") if not transcription_files: log("❌ Aucun fichier de transcription trouvé") log("Assurez-vous d'avoir exécuté le script de transcription d'abord") return {"success": False, "count": 0} log(f"Trouvé {len(transcription_files)} fichier(s) de transcription:") for i, file in enumerate(transcription_files, 1): log(f" {i}. {file.name}") log("") # Initialisation du modèle Hugging Face generator = load_hf_model() if not generator: log("❌ Modèle Hugging Face indisponible") # Créer un fichier de résultat factice pour éviter l'erreur try: output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: f.write("# ERREUR - Modèle indisponible\n") f.write("Erreur|P|MODÈLE INDISPONIBLE|000\n") log(f"📄 Fichier d'erreur créé: {output_file}") return {"success": False, "error": "Modèle Hugging Face indisponible", "count": 0} except Exception as write_error: log(f"❌ Erreur lors de la création du fichier d'erreur: {write_error}") return {"success": False, "error": "Modèle Hugging Face indisponible"} log("✅ Modèle chargé, début de l'analyse...") results = [] success_count = 0 total = len(transcription_files) for i, file_path in enumerate(transcription_files, 1): if cancel_fn and cancel_fn(): log("⏹️ Analyse annulée") break log(f"[{i}/{total}] ") transcription_text, metadata = read_transcription_file(file_path) if not transcription_text: log(f"✗ Impossible de lire {file_path.name}") if progress_fn: progress_fn(i, total) continue duration = get_audio_duration(file_path.name, input_dir) author_from_filename = extract_author_from_filename(file_path.name) result = analyze_transcription(generator, transcription_text, file_path.name) if result["success"]: format_indicators = detect_format_indicators(transcription_text) result = apply_duration_correction(result, duration, format_indicators) if result["auteur"].lower() in ["inconnu", "non mentionné", "auteur", ""]: result["auteur"] = author_from_filename result["duree"] = duration if duration else "000" result["filename_source"] = metadata.get("filename", file_path.name) log(f"✓ {result['auteur']} - {result['qualification']} - {result['titre']} - {result['duree']}") results.append(result) success_count += 1 else: log(f"✗ Erreur: {result['error']}") if "raw_response" in result: log(f" Réponse brute: {result['raw_response']}") if progress_fn: progress_fn(i, total) log("") # Version améliorée avec plus de debug : if results: try: log(f"💾 Tentative de création du fichier: {output_file}") log(f"📁 Dossier parent existe: {output_file.parent.exists()}") # Créer le dossier parent output_file.parent.mkdir(parents=True, exist_ok=True) log(f"📁 Dossier parent créé/vérifié: {output_file.parent}") # Écrire le fichier with open(output_file, 'w', encoding='utf-8') as f: f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") f.write(f"# Format: Auteur | Qualification | Titre | Durée\n") f.write("# Qualification: P=papier, P+S=papier+son, QR=question-réponse\n") f.write("# Durée: format MMss (ex: 1min04 = 104)\n") f.write("# " + "="*70 + "\n\n") for r in results: line = f"{r['auteur']} | {r['qualification']} | {r['titre']} | {r['duree']}" f.write(line + "\n") log(f"✅ Fichier écrit avec succès: {output_file}") log(f"📄 Contenu du fichier:") with open(output_file, 'r', encoding='utf-8') as f: content = f.read() for line in content.split('\n')[:10]: # Afficher les 10 premières lignes if line.strip(): log(f" {line}") log("=" * 60) log("RÉSUMÉ GÉNÉRÉ") log("=" * 60) log(f"Fichiers analysés: {total}") log(f"Analyses réussies: {success_count}") log(f"Analyses échouées: {total - success_count}") log(f"Fichier de résumé: {output_file}") log(f"📁 Contenu final du dossier output: {list(output_file.parent.iterdir())}") return {"success": True, "count": total, "ok": success_count, "results": results} except Exception as write_error: log(f"❌ Erreur lors de l'écriture du fichier: {write_error}") log(f"Traceback: {traceback.format_exc()}") return {"success": False, "error": f"Erreur d'écriture: {write_error}"} else: log("⚠️ Aucune analyse réussie, création d'un fichier factice...") # Créer un fichier factice pour éviter l'erreur try: output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") f.write("# Aucune analyse réussie\n") log(f"📄 Fichier factice créé: {output_file}") return {"success": False, "count": total, "ok": 0} except Exception as write_error: log(f"❌ Erreur lors de la création du fichier factice: {write_error}") return {"success": False, "error": f"Erreur d'écriture: {write_error}"}