Spaces:
Paused
Paused
| """ | |
| Chat Blueprint | |
| Handles chat interface and conversation management | |
| """ | |
| from flask import Blueprint, render_template, request, jsonify, send_file | |
| from flask_login import login_required, current_user | |
| from models import db, Conversation, Message | |
| from gtts import gTTS | |
| import logging | |
| import os | |
| import tempfile | |
| import hashlib | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Blueprint object for the chat views, created under the name 'chat'.
# NOTE(review): no @chat_bp.route / @login_required decorators are visible on
# the view functions in this listing - confirm they exist in the real file.
chat_bp = Blueprint('chat', __name__)
def chat_interface():
    """Render the main chat page for the current user.

    Loads the user's conversations ordered by ``updated_at`` descending and
    uses the first one as the active conversation. When the user has no
    conversations yet, a 'Welcome Chat' conversation is created and
    committed, and becomes both the list and the active conversation.

    Returns:
        The rendered 'chat.html' template, passed ``conversations`` and
        ``current_conversation``.
    """
    # Debug marker: if this log line appears, login and the redirect worked.
    # If it never appears, the problem is in the user_loader.
    logger.info(f"--- CHAT INTERFACE: Accesso riuscito! L'utente {current_user.username} è sulla pagina della chat.")

    user_conversations = (
        Conversation.query
        .filter_by(user_id=current_user.id)
        .order_by(Conversation.updated_at.desc())
        .all()
    )

    if not user_conversations:
        # First visit: seed the account with an initial conversation.
        welcome = Conversation(user_id=current_user.id, title='Welcome Chat')
        db.session.add(welcome)
        db.session.commit()
        user_conversations = [welcome]

    # The list is sorted newest-first, so index 0 is the most recent one.
    active_conversation = user_conversations[0]

    return render_template(
        'chat.html',
        conversations=user_conversations,
        current_conversation=active_conversation,
    )
def send_message():
    """Store a user message, generate the assistant reply, and return both.

    Expected JSON body:
        {
            "message": "user message",
            "conversation_id": 123 (optional)
        }

    Returns JSON:
        {
            "success": true,
            "user_message": {...},
            "assistant_message": {...},
            "conversation_id": 123
        }

    Error responses (all with ``"success": false`` and an ``"error"`` text):
        400 - the body is not a JSON object, or ``message`` is not a string
              or is empty after stripping whitespace.
        404 - ``conversation_id`` does not match a conversation owned by the
              current user.
        500 - any unexpected failure; the database session is rolled back.
    """
    try:
        # silent=True yields None for a missing, malformed or non-JSON body
        # instead of raising, so bad input is answered with 400 rather than
        # being swallowed by the generic 500 handler below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        raw_message = data.get('message', '')
        if not isinstance(raw_message, str):
            return jsonify({'success': False, 'error': 'Message must be a string'}), 400

        user_message_text = raw_message.strip()
        conversation_id = data.get('conversation_id')

        if not user_message_text:
            return jsonify({'success': False, 'error': 'Message cannot be empty'}), 400

        # Get or create conversation
        if conversation_id:
            # Filtering on user_id as well keeps users out of each other's
            # conversations.
            conversation = Conversation.query.filter_by(
                id=conversation_id,
                user_id=current_user.id
            ).first()

            if not conversation:
                return jsonify({'success': False, 'error': 'Conversation not found'}), 404
        else:
            # New conversation titled with the first 50 characters of the
            # message, with an ellipsis when the message was truncated.
            conversation = Conversation(
                user_id=current_user.id,
                title=user_message_text[:50] + ('...' if len(user_message_text) > 50 else '')
            )
            db.session.add(conversation)
            db.session.commit()

        # Save user message
        user_message = Message(
            conversation_id=conversation.id,
            role='user',
            content=user_message_text
        )
        db.session.add(user_message)
        db.session.commit()

        # Drop the last history entry: the prompt is passed separately.
        # Assumes get_message_history() includes the message just committed
        # - confirm against the model.
        history = conversation.get_message_history()[:-1]

        # Imported here to avoid a circular import with the app module.
        from app import generate_response_threadsafe

        logger.info(f"Generating response for user {current_user.username}")

        assistant_response_text = generate_response_threadsafe(
            prompt=user_message_text,
            conversation_history=history
        )

        # Save assistant message
        assistant_message = Message(
            conversation_id=conversation.id,
            role='assistant',
            content=assistant_response_text
        )
        db.session.add(assistant_message)
        db.session.commit()

        logger.info(f"Response generated successfully for user {current_user.username}")

        return jsonify({
            'success': True,
            'user_message': user_message.to_dict(),
            'assistant_message': assistant_message.to_dict(),
            'conversation_id': conversation.id
        })

    except Exception as e:
        # Discard any half-finished transaction so the session is usable
        # again after a failed flush or commit.
        db.session.rollback()
        logger.error(f"Error in send_message: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': 'An error occurred while processing your message'
        }), 500
def get_conversations():
    """Return a JSON summary of every conversation owned by the current user.

    Conversations are ordered by ``updated_at`` descending. Each entry holds
    the id, title, ISO-formatted creation and update timestamps, and the
    number of messages.
    """
    owned = (
        Conversation.query
        .filter_by(user_id=current_user.id)
        .order_by(Conversation.updated_at.desc())
        .all()
    )

    summaries = []
    for conv in owned:
        summaries.append({
            'id': conv.id,
            'title': conv.title,
            'created_at': conv.created_at.isoformat(),
            'updated_at': conv.updated_at.isoformat(),
            'message_count': len(conv.messages),
        })

    return jsonify({'success': True, 'conversations': summaries})
def get_conversation(conversation_id):
    """Return one conversation of the current user, including its messages.

    Args:
        conversation_id: Id of the conversation to look up.

    Returns:
        JSON with ``success`` and a ``conversation`` object (id, title,
        ISO-formatted timestamps, serialized messages), or a 404 JSON error
        when no conversation with that id belongs to the current user.
    """
    lookup = Conversation.query.filter_by(
        id=conversation_id,
        user_id=current_user.id,
    )
    found = lookup.first()

    if found is None or not found:
        return jsonify({'success': False, 'error': 'Conversation not found'}), 404

    payload = {
        'id': found.id,
        'title': found.title,
        'created_at': found.created_at.isoformat(),
        'updated_at': found.updated_at.isoformat(),
    }
    # Serialize the messages last, matching the original field order.
    payload['messages'] = [message.to_dict() for message in found.messages]

    return jsonify({'success': True, 'conversation': payload})
def new_conversation():
    """Create an empty conversation titled 'New Conversation'.

    Returns:
        JSON with ``success`` and the new conversation's id, title and
        ISO-formatted timestamps. On any failure the database session is
        rolled back and a 500 JSON error is returned.
    """
    try:
        conversation = Conversation(
            user_id=current_user.id,
            title='New Conversation'
        )
        db.session.add(conversation)
        db.session.commit()

        return jsonify({
            'success': True,
            'conversation': {
                'id': conversation.id,
                'title': conversation.title,
                'created_at': conversation.created_at.isoformat(),
                'updated_at': conversation.updated_at.isoformat()
            }
        })

    except Exception as e:
        # Discard the failed transaction so the session is usable again.
        db.session.rollback()
        # exc_info keeps the traceback in the log.
        logger.error(f"Error creating new conversation: {e}", exc_info=True)
        return jsonify({'success': False, 'error': 'Failed to create conversation'}), 500
def delete_conversation(conversation_id):
    """Delete one conversation owned by the current user.

    Args:
        conversation_id: Id of the conversation to delete.

    Returns:
        ``{"success": true}`` on success, a 404 JSON error when no
        conversation with that id belongs to the current user, or a 500 JSON
        error on any other failure (the database session is rolled back).
    """
    try:
        # Filtering on user_id as well means a user can only delete their
        # own conversations.
        conversation = Conversation.query.filter_by(
            id=conversation_id,
            user_id=current_user.id
        ).first()

        if not conversation:
            return jsonify({'success': False, 'error': 'Conversation not found'}), 404

        db.session.delete(conversation)
        db.session.commit()

        return jsonify({'success': True})

    except Exception as e:
        # Discard the failed transaction so the session is usable again.
        db.session.rollback()
        # exc_info keeps the traceback in the log.
        logger.error(f"Error deleting conversation: {e}", exc_info=True)
        return jsonify({'success': False, 'error': 'Failed to delete conversation'}), 500
def generate_tts():
    """Generate (or reuse) a text-to-speech MP3 for the submitted text.

    Expected JSON body:
        {
            "text": "text to convert to speech"
        }

    Returns JSON (not the audio itself):
        {
            "success": true,
            "audio_url": "/static/audio/tts_<md5 of text>.mp3"
        }

    Error responses (all with ``"success": false`` and an ``"error"`` text):
        400 - the body is not a JSON object, or ``text`` is not a string or
              is empty after stripping whitespace.
        500 - speech generation or file handling failed.

    Audio files are cached on disk under ``static/audio`` next to this
    module, keyed by the MD5 hex digest of the stripped text.
    """
    try:
        # silent=True yields None for a missing, malformed or non-JSON body
        # instead of raising, so bad input is answered with 400, not 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400

        raw_text = data.get('text', '')
        if not isinstance(raw_text, str):
            return jsonify({'success': False, 'error': 'Text must be a string'}), 400

        text = raw_text.strip()
        if not text:
            return jsonify({'success': False, 'error': 'Text cannot be empty'}), 400

        # MD5 is used only as a cache key here, not for security.
        text_hash = hashlib.md5(text.encode()).hexdigest()

        # Create audio directory if it doesn't exist
        audio_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static', 'audio')
        os.makedirs(audio_dir, exist_ok=True)

        # Path for cached audio file
        audio_filename = f"tts_{text_hash}.mp3"
        audio_path = os.path.join(audio_dir, audio_filename)

        # Generate TTS if not already cached
        if not os.path.exists(audio_path):
            logger.info(f"Generating TTS for text (hash: {text_hash})")

            # lang='en' for English, slow=False for normal speed
            tts = gTTS(text=text, lang='en', slow=False)

            # Write to a uniquely named temporary file first and move it into
            # place only once complete. A failed or in-progress synthesis
            # must never be visible under the cache name, or later requests
            # would treat a truncated file as a valid cache hit.
            tmp_path = f"{audio_path}.{os.urandom(8).hex()}.tmp"
            try:
                tts.save(tmp_path)
                os.replace(tmp_path, audio_path)
            except Exception:
                # Best-effort cleanup of the partial file; re-raise so the
                # outer handler logs the error and answers with 500.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise

            logger.info(f"TTS saved to: {audio_path}")
        else:
            logger.info(f"Using cached TTS (hash: {text_hash})")

        # Return the audio file URL
        return jsonify({
            'success': True,
            'audio_url': f'/static/audio/{audio_filename}'
        })

    except Exception as e:
        logger.error(f"Error generating TTS: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': 'Failed to generate speech'
        }), 500