"""
Transcription analysis script using Hugging Face Transformers.

Analyzes the .txt files in output/transcriptions and generates a structured summary.
"""

import os
import re
import traceback
from datetime import datetime
from pathlib import Path

import torch
from dotenv import load_dotenv
from transformers import pipeline

# Optional portable-environment bootstrap; silently skipped when unavailable.
try:
    from portable_env import setup_portable_env
    setup_portable_env()
except Exception:
    pass

# Load variables from the project-level .env file, if present.
try:
    load_dotenv(Path(__file__).parent.parent / ".env")
except Exception:
    pass

BASE_DIR = Path(os.environ.get("BOB_BASE_DIR", Path(__file__).parent.parent))
TRANSCRIPTIONS_DIR = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", BASE_DIR / "output" / "transcriptions"))
OUTPUT_FILE = Path(os.environ.get("BOB_OUTPUT_FILE", BASE_DIR / "output" / "resume_bob.txt"))
HF_MODEL = os.environ.get("HF_MODEL", "google/gemma-3-4b-pt")
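
# Example overrides (illustrative values, not part of the shipped config):
#   export BOB_BASE_DIR=/srv/bob
#   export HF_MODEL=google/gemma-3-4b-pt
#   export HF_TOKEN=hf_xxxx   # needed for gated models such as Gemma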
|
|
|
|
|
def get_hf_model():
    """Return the Hugging Face model name from the environment (fresh lookup)."""
    return os.environ.get("HF_MODEL", "google/gemma-3-4b-pt")


def load_hf_model():
    """Load a Hugging Face text-generation pipeline, or return None on failure."""
    try:
        hf_model = get_hf_model()
        print(f"Chargement du modèle Hugging Face: {hf_model}")

        generator = pipeline(
            "text-generation",
            model=hf_model,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else "cpu",
            token=os.environ.get("HF_TOKEN"),
        )

        print(f"✅ Modèle {hf_model} chargé avec succès")
        return generator
    except Exception as e:
        print(f"❌ Erreur lors du chargement du modèle Hugging Face: {e}")
        print(f"Traceback: {traceback.format_exc()}")
        print("Assurez-vous que le modèle est disponible et que vous avez les permissions nécessaires")
        return None
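
# Typical usage (illustrative):
#   generator = load_hf_model()
#   if generator is not None:
#       out = generator("Bonjour", max_new_tokens=10)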
|
|
|
|
|
def create_analysis_prompt():
    """Build the French analysis prompt sent to the model."""
    return """RÔLE: Expert en classification de contenu journalistique RTL.

TÂCHE: Extraire 3 informations précises de cette transcription radio :

1. AUTEUR : Nom complet du journaliste/présentateur
   - Chercher "les précisions pour RTL de [NOM]" ou signature en fin
   - Si absent : "Inconnu"

2. QUALIFICATION du format (TRÈS IMPORTANT) :
   - P = PAPIER seul : Lecture continue par le journaliste, pas d'interviews
     • Phrases à la 3e personne uniquement
     • Aucune citation directe de témoins
     • Style narratif/descriptif pur

   - P+S = PAPIER + SON : Reportage avec interviews/témoignages
     • Présence de citations directes ("Je...", "Nous...")
     • Témoignages de personnes citées par leur prénom
     • Alternance narratif + paroles rapportées
     • Phrases comme "explique Alexandre", "témoigne Lucas"

   - QR = QUESTIONS-RÉPONSES : Interview/débat en direct
     • Format conversationnel
     • Questions-réponses explicites
     • Dialogue en temps réel

3. TITRE : Sujet principal en 4-6 mots, MAJUSCULES, style presse

INDICES DE DÉTECTION P+S :
- Citations à la 1ère personne : "J'ai été hospitalisé", "Nous avons commencé"
- Prénoms + témoignages : "Alexandre explique", "Lucas raconte"
- Discours rapporté : "Il dit que", "Ils nous ont dit"
- Changement de ton narratif

FORMAT OBLIGATOIRE :
AUTEUR|QUALIFICATION|TITRE"""
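
# The model must answer with a single pipe-separated line; an illustrative
# (hypothetical) reply: Jean Dupont|P+S|INONDATIONS DANS LE GARD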
|
|
|
|
|
def detect_format_indicators(text):
    """Heuristically count format indicators (P / P+S / QR / MT) in a transcript."""
    indicators = {
        'p_plus_s': 0,
        'qr': 0,
        'mt': 0,
        'p_only': 0
    }

    text_lower = text.lower()

    # MT indicators: spontaneous first-person reactions and spoken-language fillers.
    mt_patterns = [
        r'\bmoi je (?:trouve|pense|crois|dis)',
        r'\bje trouve (?:que|ça|dommage)',
        r'\bje pense que',
        r'\bpour moi',
        r'\bà mon avis',
        r'\bfranchement',
        r'en arrivant',
        r'je viens (?:de|d\')',
        r'aujourd\'hui',
        r'c\'est dommage',
        r'malheureusement',
        r'donc je (?:voulais|pense)',
        r'quand même',
        r'un petit peu',
        r'vraiment dommage',
    ]

    # P+S indicators: first-person quotes and attribution verbs around witnesses.
    p_plus_s_patterns = [
        r'\bje\s+(?:suis|ai|me|pense|crois|vais|veux|dois)',
        r'\bnous\s+(?:avons|sommes|étions|allons|devons)',
        r'\bj\'(?:ai|étais|avais|irai|aurais)',
        r'\bmon\s+(?:père|fils|mari|frère)',
        r'\bma\s+(?:mère|fille|femme|sœur)',
        r'\b(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)\s+\w+',
        r'\b\w+\s+(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)',
        r'selon\s+\w+',
        r'(?:il|elle|ils|elles)\s+(?:dit|disent|explique|expliquent|affirme|assure)\s+que',
        r'pour\s+\w+\s*,',
        r'comme\s+(?:le\s+)?(?:dit|explique|précise)\s+\w+',
        r'voilà ce à quoi',
        r'c\'est qu?\'?à? partir',
        r'certains d\'entre (?:nous|eux)',
        r'parmi les\s+\d+',
        r'\b[A-Z][a-z]+\s+qui\s+(?:est|a|était)',
        r'comme\s+[A-Z][a-z]+',
        r'fièvre\s+et\s+\w+',
        r'hospitalisé',
        r'symptômes',
        r'malade',
    ]

    # QR indicators: conversational question/answer cues.
    qr_patterns = [
        r'\?.*[A-Z]',
        r'question\s*:',
        r'réponse\s*:',
        r'vous\s+(?:pensez|croyez|dites)',
        r'que\s+pensez-vous',
        r'interview',
        r'débat',
    ]

    # Matching runs on lowercased text with re.IGNORECASE, so the [A-Z]
    # classes above effectively match any ASCII letter.
    for pattern in mt_patterns:
        indicators['mt'] += len(re.findall(pattern, text_lower, re.IGNORECASE))

    for pattern in p_plus_s_patterns:
        indicators['p_plus_s'] += len(re.findall(pattern, text_lower, re.IGNORECASE))

    for pattern in qr_patterns:
        indicators['qr'] += len(re.findall(pattern, text_lower, re.IGNORECASE))

    # No strong signal at all: fall back to plain "paper" (P).
    if indicators['mt'] < 3 and indicators['p_plus_s'] < 3 and indicators['qr'] < 2:
        indicators['p_only'] = 5

    return indicators
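
# Illustrative behaviour (hypothetical counts): a transcript containing
# "Alexandre explique" and "j'ai été hospitalisé" increments 'p_plus_s',
# while a purely narrative read leaves every counter low, which sets
# 'p_only' to 5 above.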
|
|
|
|
|
def analyze_transcription(generator, transcription_text, filename):
    """Analyze a single transcription with the Hugging Face model."""
    try:
        # Pre-compute heuristic format indicators to steer the model.
        format_indicators = detect_format_indicators(transcription_text)

        likely_format = max(format_indicators, key=format_indicators.get)

        # No hint is defined for 'mt'; MT is handled later by apply_duration_correction().
        hint_map = {
            'p_plus_s': "ATTENTION: Nombreux témoignages détectés → OBLIGATOIREMENT P+S",
            'qr': "ATTENTION: Format questions-réponses détecté → OBLIGATOIREMENT QR",
            'p_only': "ATTENTION: Aucun témoignage/interview → OBLIGATOIREMENT P"
        }

        # Force the qualification outright when the heuristic signal is strong.
        force_format = ""
        if format_indicators['p_plus_s'] >= 5:
            force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P+S' pour la qualification."
        elif format_indicators['qr'] >= 3:
            force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'QR' pour la qualification."
        elif format_indicators['p_plus_s'] <= 1 and format_indicators['qr'] <= 1:
            force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P' pour la qualification."

        format_hint = hint_map.get(likely_format, "")

        prompt = create_analysis_prompt()
        enhanced_prompt = f"{prompt}\n\n{format_hint}{force_format}"

        # Note: this is a Llama-3-style chat template; the default model
        # (google/gemma-3-4b-pt) is a pretrained Gemma checkpoint, so these
        # special tokens are treated as plain text there. Adapt the template
        # if you switch to an instruction-tuned model.
        full_prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{enhanced_prompt}\n\nTRANSCRIPTION:\n{transcription_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"

        print(f"Analyse de: {filename}")
        print(f"  Indices détectés: MT={format_indicators['mt']}, P+S={format_indicators['p_plus_s']}, QR={format_indicators['qr']}, P={format_indicators['p_only']}")

        response = generator(
            full_prompt,
            max_new_tokens=150,
            temperature=0.2,
            top_k=20,
            top_p=0.8,
            repetition_penalty=1.15,
            do_sample=True,
            pad_token_id=generator.tokenizer.eos_token_id
        )

        # Keep only the newly generated text and flatten it onto one line.
        result = response[0]['generated_text'].replace(full_prompt, '').strip()
        result = result.replace('\n', ' ').strip()

        # Preferred format: AUTEUR|QUALIFICATION|TITRE
        if result.count("|") >= 2:
            parts = result.split("|")
            auteur = parts[0].strip()
            qualification = parts[1].strip()
            titre = parts[2].strip()

            if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]:
                auteur = "Inconnu"

            if qualification.upper() not in ["P", "P+S", "SON", "MT", "QR"]:
                qualification = "P"

            return {
                "success": True,
                "auteur": auteur,
                "qualification": qualification.upper(),
                "titre": titre.upper(),
                "filename": filename
            }

        # Fallback format: AUTEUR - QUALIFICATION - TITRE
        if " - " in result:
            parts = result.split(" - ")
            if len(parts) >= 3:
                auteur = parts[0].strip()
                qualification = parts[1].strip()
                titre = " - ".join(parts[2:]).strip()

                if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]:
                    auteur = "Inconnu"

                if qualification.upper() not in ["P", "P+S", "SON", "MT", "QR"]:
                    qualification = "P"

                return {
                    "success": True,
                    "auteur": auteur,
                    "qualification": qualification.upper(),
                    "titre": titre.upper(),
                    "filename": filename
                }

        return {
            "success": False,
            "error": f"Format de réponse incorrect: {result}",
            "filename": filename,
            "raw_response": result
        }

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "filename": filename
        }
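
# Shape of a successful result (field values are illustrative):
#   {"success": True, "auteur": "Jean Dupont", "qualification": "P+S",
#    "titre": "INONDATIONS DANS LE GARD", "filename": "01_transcription.txt"}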
|
|
|
|
|
def read_transcription_file(file_path):
    """Read a transcription file and extract its header metadata."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Header metadata lives in the first few lines of the file.
        metadata = {}
        lines = content.split('\n')
        for line in lines[:10]:
            if line.startswith('Fichier source:'):
                metadata['filename'] = line.replace('Fichier source:', '').strip()
            elif line.startswith('Durée de traitement:'):
                metadata['processing_time'] = line.replace('Durée de traitement:', '').strip()

        # The transcription body follows a dashed separator line.
        if "--------------------------------------------------" in content:
            parts = content.split("--------------------------------------------------")
            if len(parts) > 1:
                return parts[1].strip(), metadata

        return content.strip(), metadata

    except Exception as e:
        print(f"Erreur lors de la lecture de {file_path}: {e}")
        return None, {}


def apply_duration_correction(result, duration_seconds, format_indicators=None):
    """Apply a probabilistic qualification correction based on duration and detected patterns."""
    if not duration_seconds:
        return result

    original_qualification = result.get("qualification", "")
    corrected = False

    # Very short clips saturated with first-person reaction patterns are MT.
    if format_indicators and format_indicators.get('mt', 0) >= 8 and duration_seconds < 60:
        if original_qualification in ["P", "P+S", "SON"]:
            result["qualification"] = "MT"
            corrected = True
            print(f"  → Correction MT détecté: {original_qualification} → MT (patterns={format_indicators['mt']})")

    else:
        if duration_seconds < 30:
            # Clips under 30 s are almost always a standalone sound bite.
            if original_qualification in ["P", "P+S"]:
                result["qualification"] = "SON"
                corrected = True
                print(f"  → Correction durée < 30s: {original_qualification} → SON")

        elif 30 <= duration_seconds <= 40:
            # Between 30 and 40 s, P+S is more plausibly a plain sound bite.
            if original_qualification == "P+S":
                result["qualification"] = "SON"
                corrected = True
                print(f"  → Correction durée 30-40s: P+S → SON")

    return result
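
# Example (hypothetical): a 25-second clip the model tagged "P+S" is
# downgraded to "SON"; a 45-second clip keeps the model's qualification.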
|
|
|
|
|
def extract_author_from_filename(filename):
    """Extract the journalist's name from the file name."""
    try:
        # Strip the transcription suffix and any audio extension.
        clean_name = filename.replace('_transcription.txt', '').replace('.mp3', '').replace('.MP3', '')

        # Drop a leading track number such as "01 ".
        clean_name = re.sub(r'^\d+\s+', '', clean_name).strip()

        if clean_name:
            return clean_name

        return "Inconnu"

    except Exception as e:
        print(f"Erreur extraction auteur: {e}")
        return "Inconnu"
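
# Illustrative mapping: "01 Jean Dupont_transcription.txt" → "Jean Dupont".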
|
|
|
|
|
def get_audio_duration(audio_filename, input_dir):
    """Return the duration of the matching audio file in total seconds, or None."""
    try:
        from pydub import AudioSegment

        # Look for the source audio next to the transcription name.
        base_name = audio_filename.replace('_transcription.txt', '')
        audio_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.mp4', '.avi', '.mov']

        audio_path = None
        for ext in audio_extensions:
            potential_path = input_dir / f"{base_name}{ext}"
            if potential_path.exists():
                audio_path = potential_path
                break

        if audio_path:
            audio = AudioSegment.from_file(str(audio_path))
            return len(audio) / 1000  # pydub reports length in milliseconds

        return None
    except Exception as e:
        print(f"Erreur calcul durée pour {audio_filename}: {e}")
        return None
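
# Returning plain seconds keeps the comparisons in apply_duration_correction()
# straightforward; the MMss display value (e.g. 1 min 04 s → "104") is only
# formatted when the summary file is written.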
|
|
|
|
|
def get_transcription_files(transcriptions_dir):
    """Collect all transcription files, sorted by name."""
    if not transcriptions_dir.exists():
        print(f"Le dossier {transcriptions_dir} n'existe pas")
        return []
    txt_files = list(transcriptions_dir.glob("*_transcription.txt"))
    return sorted(txt_files)


def main():
    """Entry point: resolve paths and run the analysis."""
    print("=" * 60)
    print("ANALYSE DES BOB AVEC HUGGING FACE")
    print("=" * 60)

    script_dir = Path(__file__).parent.absolute()
    transcriptions_dir = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", script_dir.parent / "output" / "transcriptions"))
    output_file = Path(os.environ.get("BOB_OUTPUT_FILE", script_dir.parent / "output" / "resume_bob.txt"))
    input_dir = Path(os.environ.get("BOB_INPUT_DIR", script_dir.parent / "input"))

    analyze_files_hf(
        transcriptions_dir=transcriptions_dir,
        input_dir=input_dir,
        output_file=output_file,
        log_fn=print,
        progress_fn=None,
        cancel_fn=None,
    )


def analyze_files_hf(transcriptions_dir: Path, input_dir: Path, output_file: Path, log_fn=print, progress_fn=None, cancel_fn=None):
    """Analyze every transcription file with Hugging Face and write the summary."""
    log = log_fn or (lambda *a, **k: None)

    log("Dossier transcriptions: {}".format(transcriptions_dir))
    log("Fichier de sortie: {}".format(output_file))
    log("")

    transcription_files = get_transcription_files(transcriptions_dir)
    log(f"🔍 Recherche de fichiers dans: {transcriptions_dir}")
    log(f"📁 Contenu du dossier: {list(transcriptions_dir.iterdir()) if transcriptions_dir.exists() else 'Dossier non trouvé'}")
    if not transcription_files:
        log("❌ Aucun fichier de transcription trouvé")
        log("Assurez-vous d'avoir exécuté le script de transcription d'abord")
        return {"success": False, "count": 0}

    log(f"Trouvé {len(transcription_files)} fichier(s) de transcription:")
    for i, file in enumerate(transcription_files, 1):
        log(f"  {i}. {file.name}")
    log("")

    generator = load_hf_model()
    if not generator:
        log("❌ Modèle Hugging Face indisponible")
        # Still produce an output file so downstream steps have something to read.
        try:
            output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write("# ERREUR - Modèle indisponible\n")
                f.write("Erreur|P|MODÈLE INDISPONIBLE|000\n")
            log(f"📄 Fichier d'erreur créé: {output_file}")
            return {"success": False, "error": "Modèle Hugging Face indisponible", "count": 0}
        except Exception as write_error:
            log(f"❌ Erreur lors de la création du fichier d'erreur: {write_error}")
            return {"success": False, "error": "Modèle Hugging Face indisponible"}

    log("✅ Modèle chargé, début de l'analyse...")

    results = []
    success_count = 0
    total = len(transcription_files)

    for i, file_path in enumerate(transcription_files, 1):
        if cancel_fn and cancel_fn():
            log("⏹️ Analyse annulée")
            break

        log(f"[{i}/{total}] ")

        transcription_text, metadata = read_transcription_file(file_path)
        if not transcription_text:
            log(f"✗ Impossible de lire {file_path.name}")
            if progress_fn:
                progress_fn(i, total)
            continue

        duration = get_audio_duration(file_path.name, input_dir)
        author_from_filename = extract_author_from_filename(file_path.name)

        result = analyze_transcription(generator, transcription_text, file_path.name)

        if result["success"]:
            format_indicators = detect_format_indicators(transcription_text)
            result = apply_duration_correction(result, duration, format_indicators)
            if result["auteur"].lower() in ["inconnu", "non mentionné", "auteur", ""]:
                result["auteur"] = author_from_filename
            # Encode the duration as MMss for the summary (e.g. 64 s → "104").
            if duration:
                result["duree"] = f"{int(duration // 60) * 100 + int(duration % 60):03d}"
            else:
                result["duree"] = "000"
            result["filename_source"] = metadata.get("filename", file_path.name)
            log(f"✓ {result['auteur']} - {result['qualification']} - {result['titre']} - {result['duree']}")
            results.append(result)
            success_count += 1
        else:
            log(f"✗ Erreur: {result['error']}")
            if "raw_response" in result:
                log(f"  Réponse brute: {result['raw_response']}")

        if progress_fn:
            progress_fn(i, total)
        log("")

    if results:
        try:
            log(f"💾 Tentative de création du fichier: {output_file}")
            log(f"📁 Dossier parent existe: {output_file.parent.exists()}")

            output_file.parent.mkdir(parents=True, exist_ok=True)
            log(f"📁 Dossier parent créé/vérifié: {output_file.parent}")

            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n")
                f.write("# Format: Auteur | Qualification | Titre | Durée\n")
                f.write("# Qualification: P=papier, P+S=papier+son, QR=question-réponse\n")
                f.write("# Durée: format MMss (ex: 1min04 = 104)\n")
                f.write("# " + "=" * 70 + "\n\n")
                for r in results:
                    f.write(f"{r['auteur']} | {r['qualification']} | {r['titre']} | {r['duree']}\n")

            log(f"✅ Fichier écrit avec succès: {output_file}")
            log("📄 Contenu du fichier:")
            with open(output_file, 'r', encoding='utf-8') as f:
                content = f.read()
            for line in content.split('\n')[:10]:
                if line.strip():
                    log(f"  {line}")

            log("=" * 60)
            log("RÉSUMÉ GÉNÉRÉ")
            log("=" * 60)
            log(f"Fichiers analysés: {total}")
            log(f"Analyses réussies: {success_count}")
            log(f"Analyses échouées: {total - success_count}")
            log(f"Fichier de résumé: {output_file}")
            log(f"📁 Contenu final du dossier output: {list(output_file.parent.iterdir())}")

            return {"success": True, "count": total, "ok": success_count, "results": results}

        except Exception as write_error:
            log(f"❌ Erreur lors de l'écriture du fichier: {write_error}")
            log(f"Traceback: {traceback.format_exc()}")
            return {"success": False, "error": f"Erreur d'écriture: {write_error}"}
    else:
        log("⚠️ Aucune analyse réussie, création d'un fichier factice...")
        try:
            output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n")
                f.write("# Aucune analyse réussie\n")
            log(f"📄 Fichier factice créé: {output_file}")
            return {"success": False, "count": total, "ok": 0}
        except Exception as write_error:
            log(f"❌ Erreur lors de la création du fichier factice: {write_error}")
            return {"success": False, "error": f"Erreur d'écriture: {write_error}"}


# Run last so that analyze_files_hf is defined before main() calls it.
if __name__ == "__main__":
    main()