Spaces:
Running
Running
| from typing import TypedDict, List, Annotated, Dict, Any, Optional | |
| import operator | |
| import os | |
| from dotenv import load_dotenv | |
| import re | |
| import json | |
# Load environment variables.
# A .env in the current working directory (the backend folder when launched
# from there) is loaded first and overrides already-exported variables;
# ../.env is loaded second and only fills in variables that are still unset.
load_dotenv(dotenv_path=".env", override=True)
load_dotenv(dotenv_path="../.env", override=False)
| from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings | |
| from langgraph.graph import StateGraph, END | |
| from langchain_core.messages import HumanMessage, AIMessage, BaseMessage | |
| from langchain_chroma import Chroma | |
| from tavily import TavilyClient | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from pydantic import BaseModel, Field | |
| from langchain_core.output_parsers import StrOutputParser, JsonOutputParser | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| import yt_dlp | |
# --- Configuration ---
# One Gemini chat model shared by every node; temperature=0 for the most
# repeatable answers.
llm = ChatGoogleGenerativeAI(temperature=0, model="gemini-2.0-flash")

# Embeddings + Chroma store persisted under ./chroma_db (a path relative to the
# working directory). upload_file() writes chunks here; the nodes query it.
embeddings = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")
vector_store = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
)
# Top-3 similarity retriever. NOTE(review): the nodes in this module call
# vector_store.similarity_search directly (so they can pass a metadata
# filter); confirm whether other modules use `retriever` before removing it.
retriever = vector_store.as_retriever(search_kwargs=dict(k=3))
def clear_vector_store():
    """Drop the Chroma collection and rebuild the module-level store.

    On success the globals ``vector_store`` and ``retriever`` are rebound to a
    fresh store backed by the same ``./chroma_db`` directory.

    Returns:
        True on success, False if any step raised (the error is printed).
    """
    global vector_store, retriever
    try:
        vector_store.delete_collection()
        vector_store = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
        retriever = vector_store.as_retriever(search_kwargs=dict(k=3))
    except Exception as e:
        print(f"Error clearing vector store: {e}")
        return False
    else:
        return True
| # --- State Definition --- | |
class AgentState(TypedDict):
    """The state of our Deep Research Agent."""

    # Raw user request text; router_node inspects it to choose a route.
    task: str
    # Research questions produced by plan_node.
    plan: List[str]
    # Research notes. Annotated with operator.add, so LangGraph appends each
    # node's returned list instead of replacing the value.
    content: Annotated[List[str], operator.add]
    # NOTE(review): not read by the nodes in this module; presumably reserved
    # for a revision loop. Confirm before relying on them.
    revision_number: int
    max_revisions: int
    # Markdown answer produced by whichever terminal node ran.
    final_report: str
    # Human-readable progress log (appended, like `content`).
    steps: Annotated[List[str], operator.add]
    # Chat transcript (appended); quick_response_node and writer_node add an
    # AIMessage here.
    messages: Annotated[List[BaseMessage], operator.add]
    # Set by router_node when the task contains a YouTube URL.
    youtube_url: str
    # Full transcript text written by youtube_node.
    youtube_captions: str
    deep_research: bool  # Flag to indicate if deep research is requested
    conversation_id: str  # For RAG isolation (metadata filter on the vector store)
| # --- File Processing --- | |
| from langchain_community.document_loaders import PyPDFLoader, TextLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
def upload_file(file_path: str, conversation_id: str):
    """Load a PDF or text file, split it into chunks, and index them.

    Every chunk is tagged with ``conversation_id`` in its metadata, which is
    the key the quick-response and research nodes filter on.

    Args:
        file_path: Path to the uploaded file. A ``.pdf`` suffix (any case)
            selects the PDF loader; anything else is read as UTF-8 text.
        conversation_id: Identifier stored on every chunk.

    Returns:
        The list of chunk ``Document`` objects produced (possibly empty).
    """
    if file_path.lower().endswith(".pdf"):
        loader = PyPDFLoader(file_path)
    else:
        # Pin the encoding: without it TextLoader falls back to the platform
        # default (e.g. cp1252 on Windows) and can fail on UTF-8 uploads.
        loader = TextLoader(file_path, encoding="utf-8")
    docs = loader.load()

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    splits = text_splitter.split_documents(docs)

    # Add metadata so retrieval can be scoped to one conversation.
    for split in splits:
        split.metadata["conversation_id"] = conversation_id

    # An empty or image-only file yields no chunks, and Chroma can raise on an
    # empty batch, so only write when there is something to index.
    if splits:
        vector_store.add_documents(splits)
    return splits
| # --- Data Models --- | |
class Plan(BaseModel):
    """Plan to follow for research."""

    # Ordered research questions; required (no default). The docstring and the
    # description below end up in the model's JSON schema, so keep them stable.
    # NOTE(review): plan_node parses raw JSON rather than using this model;
    # confirm whether anything else still imports Plan.
    steps: List[str] = Field(..., description="List of research steps/questions to investigate.")
| # --- Helpers --- | |
def extract_video_id(url):
    """Extract the 11-character video ID from a YouTube URL.

    Handles ``watch?v=ID`` (``v`` anywhere in the query string), ``youtu.be/ID``,
    and path forms such as ``/shorts/ID``, ``/embed/ID`` and ``/live/ID``.

    Args:
        url: URL text; ``None`` or ``""`` is accepted.

    Returns:
        The video ID, or ``None`` when no ID-shaped token is found.
    """
    if not url:
        return None
    # "v=" must be a real query parameter ("?v=" / "&v="), not the tail of
    # another key such as "rev=". The negative lookahead requires the ID to be
    # exactly 11 characters. Without it, longer tokens (channel IDs,
    # "/attribution_link", ...) yielded a bogus 11-character prefix.
    match = re.search(r"(?:[?&]v=|/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])", url)
    return match.group(1) if match else None
def get_video_duration(url):
    """Return the video's duration in seconds, as reported by yt-dlp.

    Only metadata is fetched (``download=False``).

    Returns:
        The duration in seconds, or 0 when yt-dlp reports no duration (the
        field can be missing or None) or when the lookup fails. 0 lets the
        caller skip the length check.
    """
    ydl_opts = {'quiet': True, 'no_warnings': True}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
    except Exception as e:
        print(f"Warning: Could not check duration: {e}")
        return 0  # Return 0 to skip duration check on error
    # `.get('duration', 0)` alone would return None when the key is present
    # but empty; never hand None to the caller's numeric comparison.
    return (info or {}).get('duration') or 0
| # --- Nodes --- | |
def router_node(state: AgentState):
    """
    Routes based on task type:
    - YouTube URL → YouTube processor (sets ``youtube_url``)
    - "deep research" / "deep dive" / "research report" → Full research workflow
    - Otherwise → Quick response mode

    Returns:
        A partial state update with ``steps`` and ``deep_research`` (plus
        ``youtube_url`` for the YouTube route); ``route_task`` reads it.
    """
    raw_task = state["task"]
    task = raw_task.lower()

    # Check for YouTube URL. The message may contain other links too, so pick
    # the first URL that is actually a YouTube one rather than the first URL.
    if "youtube.com" in task or "youtu.be" in task:
        for candidate in re.findall(r"https?://\S+", raw_task):
            lowered = candidate.lower()
            if "youtube.com" in lowered or "youtu.be" in lowered:
                return {
                    # Drop sentence punctuation glued to the link ("...?v=ID).").
                    "youtube_url": candidate.rstrip(".,;:!?)]}>'\""),
                    "steps": ["Detected YouTube URL, routing to Video Processor"],
                    "deep_research": False,
                }

    # Only trigger deep research if explicitly requested
    if "deep research" in task or "deep dive" in task or "research report" in task:
        return {
            "steps": ["Deep research explicitly requested, routing to Research Planner"],
            "deep_research": True,
        }

    # Default to quick response
    return {
        "steps": ["Routing to Quick Response mode"],
        "deep_research": False,
    }
def _fetch_transcript_via_api(video_id):
    """Fetch caption text with youtube-transcript-api, preferring English.

    Returns the transcript entries joined with single spaces, or "" when the
    video lists no transcript. Library errors (captions disabled, video
    unavailable, ...) propagate to the caller.
    """
    # youtube-transcript-api >= 1.0 exposes an instance method `list()`; the
    # static `list_transcripts()` of older releases was later removed.
    api = YouTubeTranscriptApi()
    if hasattr(api, "list"):
        transcript_list = api.list(video_id)
    else:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

    try:
        transcript = transcript_list.find_transcript(['en'])
    except Exception:
        # No English track: fall back to the first transcript listed, if any.
        transcript = next(iter(transcript_list), None)
    if transcript is None:
        return ""

    # Entries are objects with `.text` in newer releases and dicts in older ones.
    return " ".join(
        entry.text if hasattr(entry, 'text') else entry['text']
        for entry in transcript.fetch()
    )


def _fetch_transcript_via_ytdlp(url):
    """Fallback caption fetch: read English caption tracks via yt-dlp (json3).

    Manual English subtitles are preferred over automatic captions, and the
    json3 format over others. A non-json3 track will fail to parse as JSON.

    Raises:
        Exception: "No captions found via yt-dlp" when no English track is
            listed. Network, HTTP and JSON errors propagate.
    """
    import requests  # only this fallback path needs it

    ydl_opts = {
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsub': True,
        'subtitleslangs': ['en'],
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False) or {}

    # Manual subtitles first, then automatic captions. An empty manual track
    # list no longer prevents falling back to the automatic ones. Either key
    # may be missing or None.
    sub_url = None
    for tracks_by_lang in (info.get('subtitles'), info.get('automatic_captions')):
        tracks = (tracks_by_lang or {}).get('en') or []
        if tracks:
            sub_url = next(
                (fmt['url'] for fmt in tracks if fmt.get('ext') == 'json3'),
                tracks[0]['url'],
            )
            break
    if not sub_url:
        raise Exception("No captions found via yt-dlp")

    print(f"Fetching captions from: {sub_url}")
    # Timeout so a stalled caption server cannot hang the graph run.
    response = requests.get(sub_url, timeout=30)
    response.raise_for_status()
    data = response.json()

    # events[].segs[].utf8 holds the text; one space is appended per event.
    parts = []
    for event in data.get('events', []):
        if 'segs' in event:
            parts.extend(seg['utf8'] for seg in event['segs'] if 'utf8' in seg)
            parts.append(" ")
    return "".join(parts)


def _format_title_options(raw_titles):
    """Render the title model's "CATEGORY: title" lines as Markdown sections."""
    sections = []
    for line in raw_titles.strip().split('\n'):
        if not line.strip() or ':' not in line:
            continue
        category, title = line.split(':', 1)
        # Strip stray bold markers the model sometimes adds ("**VIRAL:** ...").
        sections.append(f"### {category.strip().strip('*').strip().title()}\n\n")
        sections.append(f"**{title.strip().strip('*').strip()}**\n\n\n")
    return "".join(sections)


def youtube_node(state: AgentState):
    """
    Process YouTube video: check duration, get captions, generate titles.

    Builds a Markdown report containing AI title suggestions, the captions
    (when the task mentions "caption"/"transcript") and a short summary.

    Returns:
        A partial state update. On success: ``final_report``,
        ``youtube_captions`` (full transcript) and ``steps``. On an early
        failure: ``final_report`` carries the error/notice and ``steps`` why.
    """
    print("---YOUTUBE PROCESSOR---")
    url = state["youtube_url"]
    task = state["task"]
    video_id = extract_video_id(url)
    if not video_id:
        return {"final_report": "Error: Could not extract Video ID.", "steps": ["Failed to extract Video ID"]}

    # Check Duration (20-minute cap).
    try:
        # `or 0`: an unknown (None) duration must not break the comparison.
        duration = get_video_duration(url) or 0
        if duration > 1200:  # 20 minutes limit
            return {"final_report": f"Error: Video is too long ({int(duration) // 60} mins). Limit is 20 minutes.", "steps": ["Video rejected: Too long"]}
    except Exception as e:
        print(f"Error checking duration: {e}")
        # Continue anyway if duration check fails (might be network issue)

    # Get Captions: youtube-transcript-api first, yt-dlp if that raises.
    try:
        print(f"Fetching captions for {video_id}")
        try:
            transcript_text = _fetch_transcript_via_api(video_id)
        except Exception as e:
            print(f"YouTubeTranscriptApi failed: {e}. Trying yt-dlp fallback...")
            transcript_text = _fetch_transcript_via_ytdlp(url)
        # json3 parsing can yield only separator spaces; treat that as empty.
        if not transcript_text.strip():
            return {"final_report": "No captions available for this video.", "steps": ["No captions found"]}
    except Exception as e:
        print(f"Caption Error: {e}")
        import traceback
        traceback.print_exc()
        return {"final_report": f"Error fetching captions: {e}. \n\nPossible reasons:\n1. Video has no captions.\n2. Network restrictions.\n3. Video is private.", "steps": ["Failed to fetch captions"]}

    # Generate Title
    system = """You are a YouTube Expert. Analyze the provided video transcript and generate 3 catchy, AI-enhanced title options.
Provide ONLY the 3 titles in this exact format:
VIRAL: [title here]
SEO: [title here]
PROFESSIONAL: [title here]
Do not add any other text, headers, or explanations. Just the 3 titles."""
    title_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", "Transcript: {transcript}\n\nUser Request: {task}"),
        ]
    )
    try:
        raw_titles = (title_prompt | llm | StrOutputParser()).invoke(
            {"transcript": transcript_text[:10000], "task": task}
        )
    except Exception as e:
        raw_titles = "VIRAL: Error generating titles\nSEO: Error\nPROFESSIONAL: Error"
        print(f"Title generation error: {e}")

    report = "# YouTube Video Analysis\n\n\n"
    report += "---\n\n\n"
    report += "## 🎬 AI Enhanced Title Options\n\n\n"
    report += _format_title_options(raw_titles)

    # Captions section only when the user asked for them.
    lowered_task = task.lower()
    report += "---\n\n\n"
    if "caption" in lowered_task or "transcript" in lowered_task:
        truncated = len(transcript_text) > 5000
        ellipsis = "..." if truncated else ""
        report += "## 📝 Full Captions\n\n\n"
        report += f"```text\n{transcript_text[:5000]}{ellipsis}\n```\n\n"
        # Only claim truncation when text was actually cut.
        if truncated:
            report += "(Truncated for display)\n\n"
    else:
        report += "> **Note:** Captions are available for this video! Add 'with captions' to your request to see them.\n\n\n"

    # Add summary of video content (best-effort: the report stands without it).
    summary_prompt = ChatPromptTemplate.from_messages(
        [("system", "Summarize the following video transcript in 3-5 bullet points."), ("human", "{transcript}")]
    )
    try:
        summary = (summary_prompt | llm | StrOutputParser()).invoke({"transcript": transcript_text[:10000]})
        report += "## 📹 Video Summary\n\n" + summary + "\n\n"
    except Exception as e:
        print(f"Summary generation error: {e}")

    return {
        "final_report": report,
        "youtube_captions": transcript_text,
        "steps": ["Processed YouTube video: Checked duration, fetched captions, generated title"],
    }
def quick_response_node(state: AgentState):
    """
    Quick Response: answer directly, without the full research workflow.

    Context sources (both optional):
    - up to 3 vector-store chunks, filtered to ``conversation_id`` when set;
    - up to 2 Tavily results, when the question looks time-sensitive and
      TAVILY_API_KEY is set.
    With no context the LLM answers from general knowledge.

    Returns:
        ``final_report`` and ``messages`` (the same answer as an AIMessage)
        plus a ``steps`` entry noting whether web search was used.
    """
    print("---QUICK RESPONSE MODE---")
    task = state["task"]

    # Try to get relevant context from vector store
    try:
        conversation_id = state.get("conversation_id")
        filter_dict = {"conversation_id": conversation_id} if conversation_id else None
        # similarity_search (not the retriever) so the metadata filter can be passed.
        docs = vector_store.similarity_search(task, k=3, filter=filter_dict)
        context = "\n\n".join(d.page_content for d in docs)
    except Exception as e:
        print(f"Retriever error: {e}")
        context = ""

    # Web search only when the query implies real-time data. Keywords must
    # start a word: plain substring matching fired on "know" (now),
    # "generate"/"separate" (rate), and similar.
    real_time_keywords = ["price", "current", "news", "latest", "today", "now", "live", "rate", "stock", "weather", "forecast", "score", "result", "vs", "when", "where", "who", "what"]
    keyword_pattern = r"\b(?:" + "|".join(map(re.escape, real_time_keywords)) + ")"
    should_search = re.search(keyword_pattern, task, re.IGNORECASE) is not None

    web_context = ""
    api_key = os.getenv("TAVILY_API_KEY")
    if should_search and api_key:
        try:
            print(f"Quick Search: Searching web for '{task}'...")
            tavily = TavilyClient(api_key=api_key)
            search_results = tavily.search(query=task, max_results=2)
            web_docs = search_results.get('results', [])
            if web_docs:
                print(f"Quick Search: Found {len(web_docs)} results")
                web_context = "\n\n".join(f"Source: {d['url']}\nContent: {d['content']}" for d in web_docs)
            else:
                print("Quick Search: No results found")
        except Exception as e:
            print(f"Quick Search Error: {e}")

    # Combine context
    full_context = ""
    if context:
        full_context += f"### Internal Knowledge Base:\n{context}\n\n"
    if web_context:
        full_context += f"### Live Web Search Results:\n{web_context}\n\n"

    system = """You are a helpful AI assistant. Provide a clear, concise, and accurate answer to the user's question.
Guidelines:
1. Be direct and to the point.
2. Use markdown formatting for readability.
3. If context is provided (Internal Knowledge or Web Search), USE IT to answer.
4. If the user asks for "price", "news", or "current" info, prioritize the Web Search Results.
5. Keep responses focused. Do NOT write a long report.
6. If the question requires extensive research, suggest the user ask for "deep research".
7. If you don't know the answer and have no context, use your general knowledge to answer as best as possible.
"""
    if full_context:
        human_template = "Context:\n{context}\n\nQuestion: {task}"
        inputs = {"context": full_context, "task": task}
    else:
        human_template = "{task}"
        inputs = {"task": task}
    prompt = ChatPromptTemplate.from_messages([("system", system), ("human", human_template)])
    answer = (prompt | llm | StrOutputParser()).invoke(inputs)

    return {
        "final_report": answer,
        "messages": [AIMessage(content=answer)],
        "steps": ["Generated quick response" + (" with web search" if web_context else "")],
    }
def plan_node(state: AgentState):
    """
    Planner Agent: break the task into 3-5 research questions.

    The LLM is asked for ``{"steps": [...]}``. A bare JSON list or a single
    string is also accepted. If the output is not valid JSON, or contains no
    usable steps, the task itself becomes the only step, so the researcher
    and writer still have something to work with.

    Returns:
        ``plan`` (list of question strings) and a ``steps`` log entry.
    """
    from langchain_core.exceptions import OutputParserException

    print("---PLANNER---")
    task = state["task"]
    system = """You are a Research Planner. Given a user topic, break it down into 3-5 distinct, specific research questions or sub-topics that need to be investigated to write a comprehensive report.
Return the result as a JSON object with a single key 'steps' containing a list of strings.
Example: {{"steps": ["question 1", "question 2", "question 3"]}}"""
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", "{task}"),
        ]
    )
    planner = prompt | llm | JsonOutputParser()
    try:
        parsed = planner.invoke({"task": task})
    except OutputParserException as e:
        print(f"Planner output was not valid JSON: {e}")
        parsed = {}

    # Normalise {"steps": [...]}, a bare list, or a lone string into list[str].
    steps = parsed.get("steps", []) if isinstance(parsed, dict) else parsed
    if isinstance(steps, str):
        steps = [steps]
    elif not isinstance(steps, list):
        steps = []
    # Coerce to non-empty strings; ", ".join below requires str items.
    steps = [str(s).strip() for s in steps if str(s).strip()]
    if not steps:
        steps = [task]

    return {
        "plan": steps,
        "steps": [f"Created research plan with {len(steps)} steps: {', '.join(steps)}"],
    }
def research_node(state: AgentState):
    """
    Researcher Agent: gather notes for every question in the plan.

    For each step: up to 3 vector-store chunks (filtered to ``conversation_id``
    when set), then up to 2 Tavily web results when TAVILY_API_KEY is set.

    Returns:
        ``content`` (research notes; appended to state by its reducer) and
        ``steps`` (progress log).
    """
    print("---RESEARCHER---")
    plan = state["plan"]
    content = []
    steps_log = []

    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        print("WARNING: TAVILY_API_KEY not found in environment variables!")
    tavily = TavilyClient(api_key=api_key) if api_key else None

    # Loop-invariant: the same conversation filter applies to every step.
    conversation_id = state.get("conversation_id")
    filter_dict = {"conversation_id": conversation_id} if conversation_id else None

    for step in plan:
        print(f"Researching: {step}")
        steps_log.append(f"Researching: {step}")

        # 1. Local documents. A failed lookup is logged and skipped (as in the
        #    quick-response path) instead of aborting the whole run.
        try:
            docs = vector_store.similarity_search(step, k=3, filter=filter_dict)
        except Exception as e:
            print(f"Retriever error: {e}")
            docs = []
        if docs:
            context = "\n".join(d.page_content for d in docs)
            content.append(f"Source: Local Documents\nTopic: {step}\nContent: {context}")

        # 2. Always web search for "deep" research to get fresh info.
        if not tavily:
            print("Skipping web search (Tavily not initialized)")
            steps_log.append("Skipped web search (Tavily key missing)")
            continue
        try:
            print(f"Searching web for: {step}")
            search_results = tavily.search(query=step, max_results=2)
            web_docs = search_results.get('results', [])
            if web_docs:
                print(f"Found {len(web_docs)} web results for: {step}")
                # Keep each result's URL: the writer is told to cite sources.
                web_context = "\n".join(f"URL: {d.get('url', '')}\n{d['content']}" for d in web_docs)
                content.append(f"Source: Web Search\nTopic: {step}\nContent: {web_context}")
            else:
                print(f"No web results found for: {step}")
        except Exception as e:
            print(f"Web search error: {e}")

    return {"content": content, "steps": steps_log}
def writer_node(state: AgentState):
    """
    Writer Agent: turn the accumulated research notes into the final report.

    Notes in ``state["content"]`` are joined with blank lines and sent to the
    LLM together with the original task.

    Returns:
        ``final_report`` (Markdown), the same text as an AIMessage in
        ``messages`` (for compatibility with main.py), and a ``steps`` entry.
    """
    print("---WRITER---")
    task_text = state["task"]
    notes = state["content"]

    writer_instructions = """You are a Professional Research Writer. Your goal is to write a comprehensive, well-structured Markdown report based on the provided research notes.
Guidelines:
1. **Structure**: Start with an engaging Title (#) and Introduction.
2. **Headers**: Use headers (##, ###) to organize sections. **IMPORTANT**: Always add a blank line before and after every header.
3. **Content**: Synthesize the information. Do not just list facts.
4. **Formatting**:
- Use **bold** for key terms.
- Use bullet points for lists (ensure there is a blank line before the list starts).
- Use > Blockquotes for important summaries.
5. **Citations**: If the notes mention specific sources, cite them.
6. **Conclusion**: End with a strong conclusion.
Make the report visually appealing and easy to read.
"""
    writer_prompt = ChatPromptTemplate.from_messages([
        ("system", writer_instructions),
        ("human", "Task: {task}\n\nResearch Notes:\n{content}"),
    ])
    writer_chain = writer_prompt | llm | StrOutputParser()
    final_text = writer_chain.invoke({"task": task_text, "content": "\n\n".join(notes)})

    return dict(
        final_report=final_text,
        messages=[AIMessage(content=final_text)],  # For compatibility with main.py
        steps=["Wrote final report"],
    )
def route_task(state: AgentState):
    """
    Conditional edge after ``router``: name the next node to run.

    Returns "youtube_node" when a YouTube URL was recorded, "planner" when
    deep research was requested, and "quick_response" otherwise.
    """
    if state.get("youtube_url"):
        destination = "youtube_node"
    elif state.get("deep_research", False):
        destination = "planner"
    else:
        destination = "quick_response"
    return destination
# --- Graph Construction ---
# router -> youtube_node -> END
#        -> quick_response -> END
#        -> planner -> researcher -> writer -> END
workflow = StateGraph(AgentState)

for _name, _fn in (
    ("router", router_node),
    ("youtube_node", youtube_node),
    ("quick_response", quick_response_node),
    ("planner", plan_node),
    ("researcher", research_node),
    ("writer", writer_node),
):
    workflow.add_node(_name, _fn)

workflow.set_entry_point("router")

# route_task returns a node name; each name maps to the node of the same name.
workflow.add_conditional_edges(
    "router",
    route_task,
    {target: target for target in ("youtube_node", "quick_response", "planner")},
)

for _src, _dst in (
    ("youtube_node", END),
    ("quick_response", END),
    ("planner", "researcher"),
    ("researcher", "writer"),
    ("writer", END),
):
    workflow.add_edge(_src, _dst)

app = workflow.compile()