#!/usr/bin/env python3
"""
Life Coach Model - DEBUG VERSION
Build with extensive logging to diagnose hangs on HF Spaces
"""
import os
import torch
import logging
import time
import traceback
import gc
import threading
from datetime import datetime
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    StoppingCriteria,
    StoppingCriteriaList,
)
from peft import PeftModel
from pathlib import Path
import re
# ----------------------------------------------------------------------
# Install psutil if missing (for HF Spaces)
# ----------------------------------------------------------------------
try:
    import psutil
except ImportError:
    import subprocess
    subprocess.check_call(["pip", "install", "psutil", "--break-system-packages"])
    import psutil
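# Note: "--break-system-packages" is needed because newer Debian-based images
# mark the system Python as externally managed (PEP 668), so pip refuses
# in-place installs without it. Declaring psutil in requirements.txt is the
# cleaner fix on HF Spaces; this fallback just keeps the debug build portable.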
# ----------------------------------------------------------------------
# Ultra-detailed logging
# ----------------------------------------------------------------------
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - [PID:%(process)d] - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
def log_system_status(prefix: str = "") -> None:
    """Log a detailed snapshot of system state."""
    logger.info(f"{'=' * 60}")
    logger.info(f"{prefix} SYSTEM STATUS CHECK")
    logger.info(f"PID: {os.getpid()}")
    logger.info(f"Thread ID: {threading.get_ident()}")
    cpu_percent = psutil.cpu_percent(interval=0.1)
    logger.info(f"CPU Usage: {cpu_percent}%")
    mem = psutil.virtual_memory()
    logger.info(
        f"RAM: {mem.used/1e9:.2f}GB used / {mem.total/1e9:.2f}GB total ({mem.percent}%)"
    )
    if torch.cuda.is_available():
        try:
            gpu_mem = torch.cuda.mem_get_info()
            logger.info(
                f"GPU Memory: {gpu_mem[0]/1e9:.2f}GB free / {gpu_mem[1]/1e9:.2f}GB total"
            )
            logger.info(f"GPU Allocated: {torch.cuda.memory_allocated()/1e9:.2f}GB")
            logger.info(f"GPU Reserved: {torch.cuda.memory_reserved()/1e9:.2f}GB")
            logger.info(f"CUDA Device: {torch.cuda.get_device_name()}")
        except Exception as e:
            logger.error(f"Error getting GPU info: {e}")
    logger.info(f"{'=' * 60}")
# ----------------------------------------------------------------------
# LifeCoachModel
# ----------------------------------------------------------------------
class LifeCoachModel:
    def __init__(
        self,
        model_name: str = "microsoft/Phi-4",
        model_save_path: str = "data/life_coach_model",
        train_file: str | None = None,
    ):
        logger.info("[INIT] Starting LifeCoachModel initialization")
        logger.info(f"[INIT] Model name: {model_name}")
        logger.info(f"[INIT] Save path: {model_save_path}")
        log_system_status("[INIT-START]")
        self.model_name = model_name
        self.model_save_path = model_save_path
        self.train_file = train_file
        # ------------------------------------------------------------------
        # Device detection
        # ------------------------------------------------------------------
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
            logger.info("[INIT] CUDA is available")
            logger.info(f"[INIT] CUDA version: {torch.version.cuda}")
            logger.info(f"[INIT] PyTorch version: {torch.__version__}")
            torch.cuda.empty_cache()
            gc.collect()
        else:
            self.device = torch.device("cpu")
            logger.warning("[INIT] CUDA not available, using CPU")
        logger.info(f"[INIT] Device set to: {self.device}")
        self.tokenizer = None
        self.model = None
        # ------------------------------------------------------------------
        # System prompt (exactly as in the non-debug version)
        # ------------------------------------------------------------------
        self.system_prompt = """You are Robert, a friendly and experienced life coach. Here's your background:
About You:
- Name: Robert (Bob to friends)
- Age: 42 years old
- Experience: 15 years as a certified life coach and motivational speaker
- Education: Master's degree in Psychology from UC Berkeley
- Specialties: Personal growth, career transitions, work-life balance, goal setting, stress management
- Personal: Married with two kids, enjoy hiking and meditation in your free time
- Approach: Warm, empathetic, practical, and solution-focused
Your Coaching Style:
- Respond ONLY to what the user actually tells you - never make assumptions about their problems
- Start conversations in a welcoming, open manner
- Ask clarifying questions to understand their situation better
- Provide practical, actionable advice based on what they share
- Be encouraging and positive, but also honest and realistic
- Keep responses concise and focused (2-4 sentences usually)
- Share brief personal insights when relevant, but keep the focus on the client
Important: Never assume clients have problems they haven't mentioned. Let them guide the conversation and share what's on their mind."""
        logger.info("[INIT] LifeCoachModel initialization complete")
        log_system_status("[INIT-END]")
    # ------------------------------------------------------------------
    # Tokenizer
    # ------------------------------------------------------------------
    def load_tokenizer(self) -> None:
        logger.info("[TOKENIZER] Loading tokenizer...")
        start = time.time()
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            cache_dir=os.environ.get("HF_HOME", None),
        )
        logger.info(f"[TOKENIZER] Loaded in {time.time() - start:.2f}s")
        logger.info(f"[TOKENIZER] Vocab size: {self.tokenizer.vocab_size}")
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
            self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
            logger.info("[TOKENIZER] pad_token set to eos_token")
    # ------------------------------------------------------------------
    # Model loading - 4-bit quantization, same flow as the non-debug version
    # ------------------------------------------------------------------
    def load_model(self, fine_tuned: bool = True) -> None:
        """Load the Phi-4 model with 4-bit quantization (fits in a 24GB GPU)."""
        logger.info(f"[MODEL] Loading model (fine_tuned={fine_tuned})")
        log_system_status("[MODEL-LOAD-START]")
        # Resolve adapter path
        if fine_tuned:
            adapter_path = Path(self.model_save_path)
            alt_path = Path(f"./{self.model_save_path}")
            if alt_path.exists() and (alt_path / "adapter_model.safetensors").exists():
                model_path = str(alt_path)
                logger.info(f"[MODEL] Adapter found at alternate path: {model_path}")
            elif adapter_path.exists() and (adapter_path / "adapter_model.safetensors").exists():
                model_path = str(adapter_path)
                logger.info(f"[MODEL] Adapter found at primary path: {model_path}")
            else:
                logger.error("[MODEL] No adapter found → loading base model")
                fine_tuned = False
        else:
            model_path = None
        try:
            # 4-bit quantization config (fits in ~9.5GB of VRAM)
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
            )
            logger.info("[MODEL] Using 4-bit NF4 quantization")
            # Load base model with 4-bit quantization
            logger.info("[MODEL] Loading base model from HuggingFace...")
            start = time.time()
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                device_map="auto",
                torch_dtype=torch.float16,
                trust_remote_code=True,
                quantization_config=quantization_config,
                cache_dir=os.environ.get("HF_HOME", None),
            )
            logger.info(f"[MODEL] Base model loaded in {time.time() - start:.2f}s")
            log_system_status("[MODEL-AFTER-BASE]")
            # Load the PEFT adapter (no offload needed with 4-bit)
            if fine_tuned:
                logger.info(f"[MODEL] Loading PEFT adapter from {model_path}")
                start = time.time()
                self.model = PeftModel.from_pretrained(
                    self.model,
                    model_path,
                    device_map="auto",
                )
                logger.info(f"[MODEL] Adapter loaded in {time.time() - start:.2f}s")
            self.model.eval()
            logger.info(f"[MODEL] Parameters: {sum(p.numel() for p in self.model.parameters())/1e9:.2f}B")
            log_system_status("[MODEL-LOAD-COMPLETE]")
            logger.info("[MODEL] Model loading COMPLETE")
        except Exception as e:
            logger.error("[MODEL] CRITICAL ERROR during model loading")
            logger.error(f"[MODEL] {type(e).__name__}: {e}")
            logger.error(f"[MODEL] Traceback:\n{traceback.format_exc()}")
            raise
    # ------------------------------------------------------------------
    # Stopping criteria (stop on <|end|>)
    # ------------------------------------------------------------------
    def _get_stopping_criteria(self) -> StoppingCriteriaList:
        stop_token = "<|end|>"
        stop_ids = self.tokenizer.encode(stop_token, add_special_tokens=False)

        class StopOnToken(StoppingCriteria):
            def __init__(self, ids):
                self.ids = ids

            def __call__(self, input_ids, scores, **kwargs):
                return input_ids[0][-1].item() in self.ids

        return StoppingCriteriaList([StopOnToken(stop_ids)])
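    # Caveat: if "<|end|>" encodes to more than one token id, StopOnToken fires
    # as soon as *any* of those ids is generated last, which can truncate
    # output early. A stricter variant (hypothetical sketch, not wired in)
    # would compare the tail of input_ids against the full id sequence:
    #
    #   def __call__(self, input_ids, scores, **kwargs):
    #       tail = input_ids[0][-len(self.ids):].tolist()
    #       return tail == self.ids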
    # ------------------------------------------------------------------
    # Generation
    # ------------------------------------------------------------------
    def generate_response(
        self,
        prompt: str,
        max_new_tokens: int = 256,
        conversation_history: list | None = None,
    ) -> str:
        logger.info(f"{'=' * 80}")
        logger.info("[GENERATE] STARTING GENERATION")
        logger.info(f"[GENERATE] Prompt length: {len(prompt)} chars")
        logger.info(f"[GENERATE] Max new tokens: {max_new_tokens}")
        logger.info(f"[GENERATE] History items: {len(conversation_history or [])}")
        log_system_status("[GENERATE-START]")
        try:
            # --------------------------------------------------------------
            # 1. Build the full prompt with the Phi-4 chat template
            # --------------------------------------------------------------
            full_prompt = f"<|system|>\n{self.system_prompt}<|end|>\n"
            if conversation_history:
                for msg in conversation_history:
                    role = msg.get("role", "user")
                    content = msg.get("content", "")
                    full_prompt += f"<|{role}|>\n{content}<|end|>\n"
            full_prompt += f"<|user|>\n{prompt}<|end|>\n<|assistant|>\n"
            logger.info(f"[GENERATE-1] Full prompt length: {len(full_prompt)} chars")
            # --------------------------------------------------------------
            # 2. Tokenize
            # --------------------------------------------------------------
            inputs = self.tokenizer(
                full_prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048,
            ).to(self.device)
            # --------------------------------------------------------------
            # 3. Generate
            # --------------------------------------------------------------
            logger.info("[GENERATE] Calling model.generate()")
            start = time.time()
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9,
                    repetition_penalty=1.2,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                    stopping_criteria=self._get_stopping_criteria(),
                )
            gen_time = time.time() - start
            logger.info(f"[GENERATE] Generation took {gen_time:.2f}s")
            logger.info(f"[GENERATE] Generated {outputs.shape[1] - inputs['input_ids'].shape[1]} tokens")
            # --------------------------------------------------------------
            # 4. Decode & clean (updated cleanup logic)
            # --------------------------------------------------------------
            full_text = self.tokenizer.decode(outputs[0], skip_special_tokens=False)
            response = ""
            if "<|assistant|>" in full_text:
                response = full_text.split("<|assistant|>")[-1]
                # PASS 1: strip the trailing end tag (<|end|>) and surrounding whitespace
                response = re.sub(r"\s*<\|end\|>\s*$", "", response)
                # PASS 2: remove every other complete tag (e.g., <|system|>)
                response = re.sub(r"<\|.*?\|>", "", response)
                # PASS 3: remove an incomplete tag fragment at the end (e.g., "<|").
                # '(?s)' makes this robust: it catches anything that opens with <|
                # and never closes (no '>' character) before the end of the string.
                response = re.sub(r"(?s)\s*<\|[^>]*$", "", response)
                # PASS 4: trim leftover whitespace
                response = response.strip()
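                # Worked example (hypothetical raw output):
                #   "Glad to hear it!<|end|>\n<|user|"
                # Pass 1 misses <|end|> here (it is not at the very end),
                # pass 2 removes it as a complete tag, pass 3 drops the
                # dangling "<|user|" fragment, leaving "Glad to hear it!".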
            else:
                # Fallback (case where <|assistant|> is absent from the output)
                response = re.sub(r"\s*<\|end\|>\s*$", "", full_text)
                response = re.sub(r"<\|.*?\|>", "", response)
                response = re.sub(r"(?s)\s*<\|[^>]*$", "", response).strip()
            logger.info(f"[GENERATE] Response length: {len(response)} chars")
            logger.info(f"[GENERATE] Preview: {response[:100]}...")
            # --------------------------------------------------------------
            # 5. Cleanup
            # --------------------------------------------------------------
            del inputs, outputs
            # Guard the cache flush, consistent with the CUDA checks above
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
            log_system_status("[GENERATE-COMPLETE]")
            logger.info("[GENERATE] GENERATION SUCCESSFUL")
            logger.info(f"{'=' * 80}")
            return response
        except Exception as e:
            logger.error("[GENERATE] ERROR DURING GENERATION")
            logger.error(f"{type(e).__name__}: {e}")
            logger.error(traceback.format_exc())
            return "I apologize, but I encountered an error while generating a response. Please try again."
# ----------------------------------------------------------------------
# Test entry point
# ----------------------------------------------------------------------
if __name__ == "__main__":
    logger.info("Running debug test...")
    model = LifeCoachModel()
    model.load_tokenizer()
    model.load_model(fine_tuned=True)
    test_resp = model.generate_response("Hello, how are you?", max_new_tokens=50)
    test_resp = model.generate_response("Hello, how are you?", max_new_tokens=50)
    logger.info(f"Test response: {test_resp}")