BOB / analyze_bob_hf.py
vic3610's picture
Create analyze_bob_hf.py
f99c886 verified
raw
history blame
21.9 kB
#!/usr/bin/env python3
"""
Script d'analyse des transcriptions avec Hugging Face Transformers
Analyse les fichiers txt dans output/transcriptions et génère un résumé structuré
"""
import os
from pathlib import Path
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from datetime import datetime
import re
# Bootstrap environnement portable
try:
from portable_env import setup_portable_env
setup_portable_env()
except Exception:
pass
# Configuration (via env, avec fallback local)
BASE_DIR = Path(os.environ.get("BOB_BASE_DIR", Path(__file__).parent.parent))
TRANSCRIPTIONS_DIR = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", BASE_DIR / "output" / "transcriptions"))
OUTPUT_FILE = Path(os.environ.get("BOB_OUTPUT_FILE", BASE_DIR / "output" / "resume_bob.txt"))
HF_MODEL = os.environ.get("HF_MODEL", "meta-llama/Llama-3.2-1B-Instruct") # par défaut
def load_hf_model():
"""Charge un modèle Hugging Face"""
try:
print(f"Chargement du modèle Hugging Face: {HF_MODEL}")
# Utiliser pipeline pour plus de simplicité
generator = pipeline(
"text-generation",
model=HF_MODEL,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
device_map="auto" if torch.cuda.is_available() else "cpu",
token=os.environ.get("HF_TOKEN") # Pour les modèles privés
)
print(f"Modèle {HF_MODEL} chargé avec succès")
return generator
except Exception as e:
print(f"Erreur lors du chargement du modèle Hugging Face: {e}")
print("Assurez-vous que le modèle est disponible et que vous avez les permissions nécessaires")
return None
def create_analysis_prompt():
"""Crée le prompt d'analyse"""
return """RÔLE: Expert en classification de contenu journalistique RTL.
TÂCHE: Extraire 3 informations précises de cette transcription radio :
1. AUTEUR : Nom complet du journaliste/présentateur
- Chercher "les précisions pour RTL de [NOM]" ou signature en fin
- Si absent : "Inconnu"
2. QUALIFICATION du format (TRÈS IMPORTANT) :
- P = PAPIER seul : Lecture continue par le journaliste, pas d'interviews
• Phrases à la 3e personne uniquement
• Aucune citation directe de témoins
• Style narratif/descriptif pur
- P+S = PAPIER + SON : Reportage avec interviews/témoignages
• Présence de citations directes ("Je...", "Nous...")
• Témoignages de personnes citées par leur prénom
• Alternance narratif + paroles rapportées
• Phrases comme "explique Alexandre", "témoigne Lucas"
- QR = QUESTIONS-RÉPONSES : Interview/débat en direct
• Format conversationnel
• Questions-réponses explicites
• Dialogue en temps réel
3. TITRE : Sujet principal en 4-6 mots, MAJUSCULES, style presse
INDICES DE DÉTECTION P+S :
- Citations à la 1ère personne : "J'ai été hospitalisé", "Nous avons commencé"
- Prénoms + témoignages : "Alexandre explique", "Lucas raconte"
- Discours rapporté : "Il dit que", "Ils nous ont dit"
- Changement de ton narratif
FORMAT OBLIGATOIRE :
AUTEUR|QUALIFICATION|TITRE"""
def detect_format_indicators(text):
"""Détecte automatiquement les indicateurs de format P/P+S/QR/MT"""
indicators = {
'p_plus_s': 0, # Papier + Son
'qr': 0, # Questions-Réponses
'mt': 0, # Micro-Trottoir
'p_only': 0 # Papier seul
}
text_lower = text.lower()
# Indicateurs Micro-Trottoir (MT)
mt_patterns = [
r'\bmoi je (?:trouve|pense|crois|dis)',
r'\bje trouve (?:que|ça|dommage)',
r'\bje pense que',
r'\bpour moi',
r'\bà mon avis',
r'\bfranchement',
r'en arrivant',
r'je viens (?:de|d\')',
r'aujourd\'hui',
r'c\'est dommage',
r'malheureusement',
r'donc je (?:voulais|pense)',
r'quand même',
r'un petit peu',
r'vraiment dommage',
]
# Indicateurs P+S (Papier + Son)
p_plus_s_patterns = [
r'\bje\s+(?:suis|ai|me|pense|crois|vais|veux|dois)',
r'\bnous\s+(?:avons|sommes|étions|allons|devons)',
r'\bj\'(?:ai|étais|avais|irai|aurais)',
r'\bmon\s+(?:père|fils|mari|frère)',
r'\bma\s+(?:mère|fille|femme|sœur)',
r'\b(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)\s+\w+',
r'\b\w+\s+(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)',
r'selon\s+\w+',
r'(?:il|elle|ils|elles)\s+(?:dit|disent|explique|expliquent|affirme|assure)\s+que',
r'pour\s+\w+\s*,',
r'comme\s+(?:le\s+)?(?:dit|explique|précise)\s+\w+',
r'voilà ce à quoi',
r'c\'est qu?\'?à? partir',
r'certains d\'entre (?:nous|eux)',
r'parmi les\s+\d+',
r'\b[A-Z][a-z]+\s+qui\s+(?:est|a|était)',
r'comme\s+[A-Z][a-z]+',
r'fièvre\s+et\s+\w+',
r'hospitalisé',
r'symptômes',
r'malade',
]
# Indicateurs QR (Questions-Réponses)
qr_patterns = [
r'\?.*[A-Z]', # Question suivie de réponse
r'question\s*:',
r'réponse\s*:',
r'vous\s+(?:pensez|croyez|dites)',
r'que\s+pensez-vous',
r'interview',
r'débat',
]
# Compter les patterns
for pattern in mt_patterns:
indicators['mt'] += len(re.findall(pattern, text_lower, re.IGNORECASE))
for pattern in p_plus_s_patterns:
indicators['p_plus_s'] += len(re.findall(pattern, text_lower, re.IGNORECASE))
for pattern in qr_patterns:
indicators['qr'] += len(re.findall(pattern, text_lower, re.IGNORECASE))
# Si ni MT ni P+S ni QR détecté fortement, c'est probablement P seul
if indicators['mt'] < 3 and indicators['p_plus_s'] < 3 and indicators['qr'] < 2:
indicators['p_only'] = 5
return indicators
def analyze_transcription(generator, transcription_text, filename):
"""Analyse une transcription avec Hugging Face"""
try:
# Analyse automatique des patterns
format_indicators = detect_format_indicators(transcription_text)
# Créer un hint pour l'IA basé sur l'analyse automatique
max_score = max(format_indicators.values())
likely_format = [k for k, v in format_indicators.items() if v == max_score][0]
hint_map = {
'p_plus_s': "ATTENTION: Nombreux témoignages détectés → OBLIGATOIREMENT P+S",
'qr': "ATTENTION: Format questions-réponses détecté → OBLIGATOIREMENT QR",
'p_only': "ATTENTION: Aucun témoignage/interview → OBLIGATOIREMENT P"
}
# Si détection très claire (score élevé), forcer le format
force_format = ""
if format_indicators['p_plus_s'] >= 5:
force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P+S' pour la qualification."
elif format_indicators['qr'] >= 3:
force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'QR' pour la qualification."
elif format_indicators['p_plus_s'] <= 1 and format_indicators['qr'] <= 1:
force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P' pour la qualification."
format_hint = hint_map.get(likely_format, "")
prompt = create_analysis_prompt()
enhanced_prompt = f"{prompt}\n\n{format_hint}{force_format}"
# Format pour Llama
full_prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{enhanced_prompt}\n\nTRANSCRIPTION:\n{transcription_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
print(f"Analyse de: {filename}")
print(f" Indices détectés: MT={format_indicators['mt']}, P+S={format_indicators['p_plus_s']}, QR={format_indicators['qr']}, P={format_indicators['p_only']}")
# Génération avec le modèle
response = generator(
full_prompt,
max_new_tokens=150,
temperature=0.2,
top_k=20,
top_p=0.8,
repetition_penalty=1.15,
do_sample=True,
pad_token_id=generator.tokenizer.eos_token_id
)
result = response[0]['generated_text'].replace(full_prompt, '').strip()
# Parser le résultat
result = result.replace('\n', ' ').strip()
if "|" in result:
# Prendre la première ligne qui contient des |
lines = result.split('\n')
for line in lines:
if "|" in line and line.count("|") >= 2:
parts = line.split("|")
if len(parts) >= 3:
auteur = parts[0].strip()
qualification = parts[1].strip()
titre = parts[2].strip()
# Nettoyer et formater
if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]:
auteur = "Inconnu"
# Valider la qualification
if qualification.upper() not in ["P", "P+S", "SON", "MT", "QR"]:
qualification = "P" # Défaut
return {
"success": True,
"auteur": auteur,
"qualification": qualification.upper(),
"titre": titre.upper(),
"filename": filename
}
# Si pas de format avec |, essayer de parser différemment
if " - " in result:
parts = result.split(" - ")
if len(parts) >= 3:
auteur = parts[0].strip()
qualification = parts[1].strip()
titre = " - ".join(parts[2:]).strip()
if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]:
auteur = "Inconnu"
# Valider la qualification
if qualification.upper() not in ["P", "P+S", "QR"]:
qualification = "P"
return {
"success": True,
"auteur": auteur,
"qualification": qualification.upper(),
"titre": titre.upper(),
"filename": filename
}
# Si le format n'est pas correct
return {
"success": False,
"error": f"Format de réponse incorrect: {result}",
"filename": filename,
"raw_response": result
}
except Exception as e:
return {
"success": False,
"error": str(e),
"filename": filename
}
def read_transcription_file(file_path):
"""Lit le contenu d'un fichier de transcription et extrait les métadonnées"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Extraire les métadonnées
metadata = {}
lines = content.split('\n')
for line in lines[:10]: # Chercher dans les 10 premières lignes
if line.startswith('Fichier source:'):
metadata['filename'] = line.replace('Fichier source:', '').strip()
elif line.startswith('Durée de traitement:'):
metadata['processing_time'] = line.replace('Durée de traitement:', '').strip()
# Extraire seulement le texte de transcription (après les métadonnées)
if "--------------------------------------------------" in content:
parts = content.split("--------------------------------------------------")
if len(parts) > 1:
text_content = parts[1].strip()
return text_content, metadata
return content.strip(), metadata
except Exception as e:
print(f"Erreur lors de la lecture de {file_path}: {e}")
return None, {}
def apply_duration_correction(result, duration_seconds, format_indicators=None):
"""Applique une correction probabiliste basée sur la durée et les patterns détectés"""
if not duration_seconds:
return result
original_qualification = result.get("qualification", "")
corrected = False
# Priorité 1: Détection Micro-Trottoir basée sur patterns + durée
if format_indicators and format_indicators.get('mt', 0) >= 8 and duration_seconds < 60:
if original_qualification in ["P", "P+S", "SON"]:
result["qualification"] = "MT"
corrected = True
print(f" → Correction MT détecté: {original_qualification} → MT (patterns={format_indicators['mt']})")
# Priorité 2: Logique probabiliste selon durée (si pas MT)
elif not corrected:
if duration_seconds < 30:
# < 30s = quasi-certainement un SON
if original_qualification in ["P", "P+S"]:
result["qualification"] = "SON"
corrected = True
print(f" → Correction durée < 30s: {original_qualification} → SON")
elif 30 <= duration_seconds <= 40:
# 30-40s = probablement un SON, mais peut être P
if original_qualification == "P+S":
result["qualification"] = "SON"
corrected = True
print(f" → Correction durée 30-40s: P+S → SON")
return result
def extract_author_from_filename(filename):
"""Extrait le nom du journaliste depuis le nom du fichier"""
try:
# Nettoyer le nom du fichier
clean_name = filename.replace('_transcription.txt', '').replace('.mp3', '').replace('.MP3', '')
# Patterns courants pour extraire un nom (prénom + nom)
words = clean_name.split()
# Chercher une séquence de 2 mots qui commencent par une majuscule
for i in range(len(words) - 1):
word1 = words[i].strip('()[]{}.,;:!?-_')
word2 = words[i + 1].strip('()[]{}.,;:!?-_')
# Vérifier si les deux mots ressemblent à un prénom + nom
if (len(word1) >= 2 and len(word2) >= 2 and
word1[0].isupper() and word2[0].isupper() and
word1.isalpha() and word2.isalpha()):
return f"{word1} {word2}"
# Si pas trouvé, chercher le premier mot qui commence par une majuscule
for word in words:
clean_word = word.strip('()[]{}.,;:!?-_')
if len(clean_word) >= 2 and clean_word[0].isupper() and clean_word.isalpha():
# Essayer de trouver le mot suivant
word_index = words.index(word)
if word_index + 1 < len(words):
next_word = words[word_index + 1].strip('()[]{}.,;:!?-_')
if len(next_word) >= 2 and next_word[0].isupper() and next_word.isalpha():
return f"{clean_word} {next_word}"
return clean_word
return "Inconnu"
except Exception as e:
print(f"Erreur extraction auteur de {filename}: {e}")
return "Inconnu"
def get_audio_duration(audio_filename, input_dir):
"""Calcule la durée d'un fichier audio en secondes totales"""
try:
from pydub import AudioSegment
audio_path = None
audio_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.mp4', '.avi', '.mov']
# Rechercher le fichier audio correspondant
for ext in audio_extensions:
potential_path = input_dir / audio_filename.replace('_transcription.txt', ext)
if potential_path.exists():
audio_path = potential_path
break
base_name = audio_filename.replace('_transcription.txt', '')
potential_path = input_dir / f"{base_name}{ext}"
if potential_path.exists():
audio_path = potential_path
break
if audio_path:
audio = AudioSegment.from_file(str(audio_path))
duration_seconds = len(audio) / 1000 # pydub retourne en millisecondes
minutes = int(duration_seconds // 60)
seconds = int(duration_seconds % 60)
return minutes * 100 + seconds
return None
except Exception as e:
print(f"Erreur calcul durée pour {audio_filename}: {e}")
return None
def get_transcription_files(transcriptions_dir):
"""Récupère tous les fichiers de transcription"""
if not transcriptions_dir.exists():
print(f"Le dossier {transcriptions_dir} n'existe pas")
return []
txt_files = list(transcriptions_dir.glob("*_transcription.txt"))
return sorted(txt_files)
def main():
"""Fonction principale"""
print("=" * 60)
print("ANALYSE DES BOB AVEC HUGGING FACE")
print("=" * 60)
# Vérification des dossiers
script_dir = Path(__file__).parent.absolute()
transcriptions_dir = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", script_dir.parent / "output" / "transcriptions"))
output_file = Path(os.environ.get("BOB_OUTPUT_FILE", script_dir.parent / "output" / "resume_bob.txt"))
input_dir = Path(os.environ.get("BOB_INPUT_DIR", script_dir.parent / "input"))
# Utiliser la fonction factorisée avec print comme log
analyze_files_hf(
transcriptions_dir=transcriptions_dir,
input_dir=input_dir,
output_file=output_file,
log_fn=print,
progress_fn=None,
cancel_fn=None,
)
if __name__ == "__main__":
main()
# --- API factorisée pour le GUI ---
def analyze_files_hf(transcriptions_dir: Path, input_dir: Path, output_file: Path, log_fn=print, progress_fn=None, cancel_fn=None):
"""Analyse tous les fichiers de transcription avec Hugging Face"""
log = log_fn or (lambda *a, **k: None)
log("Dossier transcriptions: {}".format(transcriptions_dir))
log("Fichier de sortie: {}".format(output_file))
log("")
transcription_files = get_transcription_files(transcriptions_dir)
if not transcription_files:
log("Aucun fichier de transcription trouvé")
log("Assurez-vous d'avoir exécuté le script de transcription d'abord")
return {"success": False, "count": 0}
log(f"Trouvé {len(transcription_files)} fichier(s) de transcription:")
for i, file in enumerate(transcription_files, 1):
log(f" {i}. {file.name}")
log("")
# Initialisation du modèle Hugging Face
generator = load_hf_model()
if not generator:
return {"success": False, "error": "Modèle Hugging Face indisponible"}
results = []
success_count = 0
total = len(transcription_files)
for i, file_path in enumerate(transcription_files, 1):
if cancel_fn and cancel_fn():
log("⏹️ Analyse annulée")
break
log(f"[{i}/{total}] ")
transcription_text, metadata = read_transcription_file(file_path)
if not transcription_text:
log(f"✗ Impossible de lire {file_path.name}")
if progress_fn:
progress_fn(i, total)
continue
duration = get_audio_duration(file_path.name, input_dir)
author_from_filename = extract_author_from_filename(file_path.name)
result = analyze_transcription(generator, transcription_text, file_path.name)
if result["success"]:
format_indicators = detect_format_indicators(transcription_text)
result = apply_duration_correction(result, duration, format_indicators)
if result["auteur"].lower() in ["inconnu", "non mentionné", "auteur", ""]:
result["auteur"] = author_from_filename
result["duree"] = duration if duration else "000"
result["filename_source"] = metadata.get("filename", file_path.name)
log(f"✓ {result['auteur']} - {result['qualification']} - {result['titre']} - {result['duree']}")
results.append(result)
success_count += 1
else:
log(f"✗ Erreur: {result['error']}")
if "raw_response" in result:
log(f" Réponse brute: {result['raw_response']}")
if progress_fn:
progress_fn(i, total)
log("")
if results:
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n")
f.write(f"# Format: Auteur | Qualification | Titre | Durée\n")
f.write("# Qualification: P=papier, P+S=papier+son, QR=question-réponse\n")
f.write("# Durée: format MMss (ex: 1min04 = 104)\n")
f.write("# " + "="*70 + "\n\n")
for r in results:
line = f"{r['auteur']} | {r['qualification']} | {r['titre']} | {r['duree']}"
f.write(line + "\n")
log("=" * 60)
log("RÉSUMÉ GÉNÉRÉ")
log("=" * 60)
log(f"Fichiers analysés: {total}")
log(f"Analyses réussies: {success_count}")
log(f"Analyses échouées: {total - success_count}")
log(f"Fichier de résumé: {output_file}")
return {"success": True, "count": total, "ok": success_count, "results": results}
else:
log("Aucune analyse réussie, pas de fichier de résumé généré")
return {"success": False, "count": total, "ok": 0}