# Sample / app.py
# anu151105's picture
# Update app.py
# 30bea59 verified
import os
import gradio as gr
import torch
import re
import time
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from huggingface_hub import hf_hub_download, snapshot_download
import json
from typing import Dict, List, Any, Optional, Union
# Import agent modules
from agent_reasoning import ReasoningEngine
from agent_tasks import TaskExecutor
from agent_memory import MemoryManager
class ResuRankAgent:
    """Autonomous AI agent for chat, task execution and document ranking.

    This agent can:

    1. Process user queries and generate responses
    2. Perform reasoning through chain-of-thought
    3. Execute tasks based on user instructions
    4. Maintain conversation context
    5. Analyze documents and rank resumes against a job description
    """

    # Directory passed to ``from_pretrained`` as ``cache_dir``.
    _CACHE_DIR = "./.cache"
    # Small model tried when the requested model cannot be loaded.
    _FALLBACK_MODEL_ID = "distilgpt2"

    # Words/phrases that mark a query as a task execution request.
    _TASK_KEYWORDS = (
        "execute", "perform", "run", "do", "complete", "finish",
        "task", "job", "work", "action", "operation", "function",
        "can you", "please", "help me", "i need", "i want",
    )
    # Whole-word matching: a plain substring test would flag "London"
    # (contains "do") or "homework" (contains "work") as task requests.
    _TASK_PATTERN = re.compile(
        r"\b(?:" + "|".join(re.escape(k) for k in _TASK_KEYWORDS) + r")\b"
    )

    # (fact template, patterns tried in order) for each kind of user fact.
    _PERSONAL_INFO_RULES = (
        ("User's name is {}", (
            r"my name is ([\w\s]+)[.\,]?",
            r"i am called ([\w\s]+)[.\,]?",
            r"i'm called ([\w\s]+)[.\,]?",
        )),
        ("User is from {}", (
            r"i am from ([\w\s]+)[.\,]?",
            r"i'm from ([\w\s]+)[.\,]?",
            r"i live in ([\w\s]+)[.\,]?",
        )),
        ("User works as a {}", (
            r"i work as a[n]? ([\w\s]+)[.\,]?",
            r"i am a[n]? ([\w\s]+)[.\,]?",
            r"i'm a[n]? ([\w\s]+)[.\,]?",
        )),
    )
    _PREFERENCE_RULES = (
        ("User likes {}", (
            r"i like ([\w\s]+)[.\,]?",
            r"i love ([\w\s]+)[.\,]?",
            r"i enjoy ([\w\s]+)[.\,]?",
        )),
        ("User dislikes {}", (
            r"i don't like ([\w\s]+)[.\,]?",
            r"i hate ([\w\s]+)[.\,]?",
            r"i dislike ([\w\s]+)[.\,]?",
        )),
    )
    _TASK_INFO_RULES = (
        ("User's goal is to {}", (
            r"my goal is to ([\w\s]+)[.\,]?",
            r"i want to ([\w\s]+)[.\,]?",
            r"i need to ([\w\s]+)[.\,]?",
        )),
    )

    # Field lists rendered as "1. Name:" style templates in extraction prompts.
    _RESUME_FIELDS = (
        "Name", "Contact Information", "Education", "Work Experience",
        "Skills", "Projects", "Certifications", "Languages", "Key Strengths",
    )
    _JOB_FIELDS = (
        "Job Title", "Company", "Location", "Required Skills",
        "Required Experience", "Education Requirements", "Responsibilities",
        "Benefits", "Key Qualifications",
    )

    def __init__(self, model_id="distilgpt2", use_cache=True, test_mode=False):
        """Initialize the ResuRank Agent.

        Args:
            model_id: Hugging Face model ID to use for the agent
            use_cache: Whether to pass the local cache directory to
                ``from_pretrained`` for the requested model
            test_mode: Whether to run in test mode (CPU only, no device map)

        Raises:
            RuntimeError: If neither the requested model nor the fallback
                model can be loaded.
        """
        self.model_id = model_id
        self.test_mode = test_mode

        # Use CPU for test mode, otherwise check for CUDA
        if test_mode:
            self.device = "cpu"
            print("Running in test mode on CPU with minimal resources")
        else:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            print(f"Using device: {self.device}")

        print(f"Loading model {model_id} from Hugging Face Hub...")
        try:
            self.tokenizer, self.model = self._load_model(model_id, use_cache)
            print(f"Successfully loaded model {model_id}")
        except Exception as e:
            # Deliberately broad: any load failure triggers the fallback model.
            print(f"Error loading model: {str(e)}")
            print("Falling back to smaller model...")
            fallback_model = self._FALLBACK_MODEL_ID
            self.model_id = fallback_model
            try:
                # The fallback always uses the cache directory and the most
                # conservative settings (float32, no device map).
                self.tokenizer, self.model = self._load_model(
                    fallback_model, use_cache=True, minimal=True
                )
                print(f"Successfully loaded fallback model {fallback_model}")
            except Exception as fallback_error:
                print(f"Error loading fallback model: {str(fallback_error)}")
                raise RuntimeError(
                    "Failed to load both primary and fallback models"
                ) from fallback_error

        # Initialize agent components
        self.reasoning_engine = ReasoningEngine(self.model, self.tokenizer, self.device)
        self.memory_manager = MemoryManager(max_history_length=20)
        self.task_executor = TaskExecutor(self.reasoning_engine)

    def _load_model(self, model_id, use_cache, minimal=False):
        """Load a tokenizer/model pair and place the model on ``self.device``.

        Args:
            model_id: Hugging Face model ID to load
            use_cache: Whether to pass ``cache_dir`` to ``from_pretrained``
            minimal: If True, never request a device map or float16

        Returns:
            Tuple ``(tokenizer, model)``
        """
        model_kwargs = {
            "torch_dtype": torch.float32,  # float32 for better compatibility
        }

        # low_cpu_mem_usage and device_map are only requested when Accelerate
        # can be imported.
        try:
            import accelerate  # noqa: F401
        except ImportError:
            print("Accelerate library not found, disabling low_cpu_mem_usage and device_map")
        else:
            model_kwargs["low_cpu_mem_usage"] = True
            if not minimal and not self.test_mode:
                model_kwargs["device_map"] = "auto"
                if self.device == "cuda":
                    model_kwargs["torch_dtype"] = torch.float16

        tokenizer_kwargs = {}
        if use_cache:
            model_kwargs["cache_dir"] = self._CACHE_DIR
            tokenizer_kwargs["cache_dir"] = self._CACHE_DIR
            print("Using cached models if available...")
        else:
            print("Downloading models from Hugging Face Hub...")

        tokenizer = AutoTokenizer.from_pretrained(model_id, **tokenizer_kwargs)
        model = AutoModelForCausalLM.from_pretrained(model_id, **model_kwargs)

        # Without a device map nothing has placed the model yet; move it so it
        # agrees with the device string handed to the reasoning engine.
        if "device_map" not in model_kwargs:
            model = model.to(self.device)
        return tokenizer, model

    def process_query(self, query: str, use_reasoning: bool = True) -> Dict[str, Any]:
        """Process a user query and generate a response.

        The query is routed to task execution when it looks like a task
        request, otherwise to chain-of-thought reasoning or plain generation.

        Args:
            query: User query text
            use_reasoning: Whether to use chain-of-thought reasoning

        Returns:
            Dictionary with keys ``response``, ``reasoning``,
            ``processing_time`` (seconds) and ``timestamp``
        """
        # Add query to conversation history
        self.memory_manager.add_message("user", query)
        start_time = time.time()

        if self._is_task_request(query):
            # Handle as a task execution request
            task_result = self.execute_task(query)
            response = (
                f"I've executed your task. {task_result.get('result', '')}"
                f"\n\nStatus: {task_result.get('status', 'unknown')}"
            )
            reasoning = task_result.get('plan', '')
        elif use_reasoning:
            # Chain-of-thought reasoning, enhanced with context from memory
            facts = self.memory_manager.format_facts_for_prompt()
            context = self.memory_manager.format_conversation_for_prompt(max_turns=5)
            enhanced_query = f"{facts}\n\nRecent conversation:\n{context}\n\nCurrent query: {query}"
            result = self.reasoning_engine.chain_of_thought(enhanced_query)
            response = result["answer"]
            reasoning = result["reasoning"]
        else:
            # Simple response generation without reasoning
            conversation_prompt = self.memory_manager.format_conversation_for_prompt(max_turns=10)
            facts_prompt = self.memory_manager.format_facts_for_prompt()
            prompt = f"{facts_prompt}\n\n{conversation_prompt}\nassistant: "
            response = self.reasoning_engine.generate_text(prompt)
            reasoning = None

        # Add response to conversation history
        self.memory_manager.add_message("assistant", response)
        # Extract any important facts from the conversation
        self._extract_facts(query, response)

        processing_time = time.time() - start_time
        return {
            "response": response,
            "reasoning": reasoning,
            "processing_time": processing_time,
            "timestamp": time.time(),
        }

    def _is_task_request(self, query: str) -> bool:
        """Determine if a query is a task execution request.

        A query qualifies when it contains one of the task keywords as a
        whole word or phrase (case-insensitive).

        Args:
            query: The user query

        Returns:
            True if the query appears to be a task request, False otherwise
        """
        return self._TASK_PATTERN.search(query.lower()) is not None

    def _extract_facts(self, query: str, response: str) -> None:
        """Extract important facts from the conversation.

        Args:
            query: User query
            response: Agent response
        """
        # Pattern-based extraction runs on the user's text only
        self._extract_personal_info(query)
        self._extract_preferences(query)
        self._extract_task_info(query)
        # Use the reasoning engine to identify important facts
        self._extract_with_reasoning(query, response)

    def _apply_fact_rules(self, text: str, rules) -> None:
        """Store one "user" fact per rule whose patterns match ``text``.

        For each ``(template, patterns)`` rule the patterns are tried in
        order and only the first match is recorded. Matching is
        case-insensitive but the captured value keeps its original casing.
        """
        for template, patterns in rules:
            for pattern in patterns:
                match = re.search(pattern, text, flags=re.IGNORECASE)
                if not match:
                    continue
                value = match.group(1).strip()
                if value:  # skip whitespace-only captures
                    self.memory_manager.add_important_fact(template.format(value), "user")
                break

    def _extract_personal_info(self, text: str) -> None:
        """Extract name, location and profession statements from text.

        Args:
            text: Text to extract information from
        """
        self._apply_fact_rules(text, self._PERSONAL_INFO_RULES)

    def _extract_preferences(self, text: str) -> None:
        """Extract user likes and dislikes from text.

        Args:
            text: Text to extract information from
        """
        self._apply_fact_rules(text, self._PREFERENCE_RULES)

    def _extract_task_info(self, text: str) -> None:
        """Extract goal statements from text.

        Args:
            text: Text to extract information from
        """
        self._apply_fact_rules(text, self._TASK_INFO_RULES)

    def run_test_case(self) -> Dict[str, Any]:
        """Run one fixed query through the agent and report basic metrics.

        Returns:
            Dictionary containing the test query, its response, the elapsed
            time in seconds and the estimated process memory usage in MB
        """
        print("Running test case with minimal resources...")
        start_time = time.time()

        test_query = "What can you help me with?"
        test_response = self.process_query(test_query, use_reasoning=False)

        processing_time = time.time() - start_time
        memory_usage = self._estimate_memory_usage()
        return {
            "status": "success",
            "model_id": self.model_id,
            "device": self.device,
            "test_query": test_query,
            "test_response": test_response["response"],
            "processing_time": processing_time,
            "memory_usage_mb": memory_usage,
            "timestamp": time.time(),
        }

    def _estimate_memory_usage(self) -> float:
        """Return the resident memory of the current process in MB.

        Returns:
            RSS in MB, or 0.0 if ``psutil`` is not installed
        """
        try:
            import psutil
        except ImportError:
            return 0.0  # psutil is optional
        memory_info = psutil.Process(os.getpid()).memory_info()
        return memory_info.rss / (1024 * 1024)  # bytes -> MB

    def _extract_with_reasoning(self, query: str, response: str) -> None:
        """Use the reasoning engine to extract important facts.

        Best-effort: any error raised during generation or parsing is
        printed and otherwise ignored.

        Args:
            query: User query
            response: Agent response
        """
        # Only use this for longer queries to avoid unnecessary processing
        if len(query) < 50:
            return

        extraction_prompt = (
            "Extract important facts from this conversation:\n"
            f"User: {query}\n"
            f"Assistant: {response}\n"
            "List of important facts (one per line):\n"
            "1. "
        )
        try:
            facts_text = self.reasoning_engine.generate_text(extraction_prompt, max_length=256)
            for line in facts_text.split('\n'):
                line = line.strip()
                # Only numbered ("1. ...") or bulleted ("- ...") lines count
                if line and (line[0].isdigit() or line.startswith('- ')):
                    fact = re.sub(r'^\d+\.\s*|^-\s*', '', line).strip()
                    if fact and len(fact) > 10:  # Only add substantial facts
                        self.memory_manager.add_important_fact(fact, "inference")
        except Exception as e:
            print(f"Error extracting facts with reasoning: {str(e)}")
            # Continue without adding facts

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task by delegating to the task executor.

        Args:
            task_description: Description of the task to execute

        Returns:
            Whatever the task executor returns; callers in this file read
            the ``plan``, ``result`` and ``status`` keys
        """
        return self.task_executor.execute_task(task_description)

    def get_status(self) -> Dict[str, Any]:
        """Get the current status of the agent.

        Returns:
            Dictionary with model, device, memory statistics and task status
        """
        memory_stats = self.memory_manager.get_memory_stats()
        task_status = self.task_executor.get_task_status()
        return {
            "model_id": self.model_id,
            "device": self.device,
            "conversation_turns": memory_stats["conversation_turns"],
            "important_facts": memory_stats["important_facts"],
            "current_task": task_status["current_task"],
            "task_status": task_status["status"],
        }

    def clear_conversation(self) -> None:
        """Clear the conversation history."""
        self.memory_manager.clear_conversation_history()

    @staticmethod
    def _numbered_fields(fields) -> str:
        """Render field names as ``"1. Name:\\n2. ...:\\n"`` prompt lines."""
        return "".join(f"{n}. {name}:\n" for n, name in enumerate(fields, start=1))

    def process_document(self, document_text: str, document_type: str = "resume") -> Dict[str, Any]:
        """Process a document (like a resume) and extract information.

        Args:
            document_text: The text content of the document
            document_type: The type of document (e.g., "resume", "job_description")

        Returns:
            Dictionary with keys ``document_type``, ``analysis``,
            ``structured_info``, ``processing_time`` and ``timestamp``
        """
        self.memory_manager.store_session_data(f"last_{document_type}", document_text)
        start_time = time.time()

        # Free-form analysis
        analysis_prompt = (
            f"I need to analyze this {document_type} document and extract key information:\n"
            f"{document_text}\n"
            "Detailed analysis:"
        )
        analysis = self.reasoning_engine.generate_text(analysis_prompt, max_length=1024)

        # Structured extraction, with a field template per document type
        kind = document_type.lower()
        if kind == "resume":
            extraction_prompt = (
                f"Based on this resume:\n{document_text}\n"
                "Extract the following information in a structured format:\n"
                + self._numbered_fields(self._RESUME_FIELDS)
            )
        elif kind == "job_description":
            extraction_prompt = (
                f"Based on this job description:\n{document_text}\n"
                "Extract the following information in a structured format:\n"
                + self._numbered_fields(self._JOB_FIELDS)
            )
        else:
            extraction_prompt = (
                f"Extract key information from this document:\n{document_text}\n"
                "Key information:\n"
                "1. "
            )
        structured_info = self.reasoning_engine.generate_text(extraction_prompt, max_length=1024)

        # Add important facts to memory
        self._extract_document_facts(document_text, document_type, structured_info)

        processing_time = time.time() - start_time
        return {
            "document_type": document_type,
            "analysis": analysis,
            "structured_info": structured_info,
            "processing_time": processing_time,
            "timestamp": time.time(),
        }

    @staticmethod
    def _field_value(label: str, text: str, extra_chars: str = ""):
        """Return the single-line value following ``"<label>:"`` or None.

        The value may contain word characters, spaces/tabs and
        ``extra_chars`` (already escaped for a regex character class). It
        must run to the end of its line; the end of the text also counts, so
        a field on the last line is found even without a trailing newline.
        """
        pattern = rf"{re.escape(label)}:\s*([\w \t{extra_chars}]+)(?:\r?\n|$)"
        match = re.search(pattern, text)
        if not match:
            return None
        return match.group(1).strip() or None

    def _extract_document_facts(self, document_text: str, document_type: str, structured_info: str) -> None:
        """Extract important facts from a document and add them to memory.

        Args:
            document_text: The text content of the document (not inspected
                here; facts are parsed from ``structured_info``)
            document_type: The type of document
            structured_info: Structured information extracted from the document
        """
        list_chars = r",.+\-"  # punctuation allowed in skill/education lists
        kind = document_type.lower()
        if kind == "resume":
            lookups = (
                ("Name", "", "Document contains resume for {}"),
                ("Skills", list_chars, "Resume shows skills in: {}"),
                ("Education", list_chars, "Resume shows education: {}"),
            )
        elif kind == "job_description":
            lookups = (
                ("Job Title", "", "Document contains job description for {}"),
                ("Required Skills", list_chars, "Job requires skills in: {}"),
            )
        else:
            lookups = ()

        for label, extra_chars, template in lookups:
            value = self._field_value(label, structured_info, extra_chars)
            if value:
                self.memory_manager.add_important_fact(template.format(value), "document")

        # Add general document fact
        self.memory_manager.add_important_fact(f"Processed a {document_type} document", "system")

    def rank_resumes(self, job_description: str, resumes: List[str]) -> Dict[str, Any]:
        """Rank multiple resumes against a job description.

        Args:
            job_description: The job description text
            resumes: List of resume texts to rank

        Returns:
            Dictionary with ``rankings`` (sorted by score, highest first;
            each entry has ``resume_index``, ``score`` in 0-100 and a
            ``resume_text`` preview), ``analysis``, ``job_description`` and
            ``processing_time``
        """
        start_time = time.time()

        # Process the job description first
        job_analysis = self.process_document(job_description, "job_description")["structured_info"]

        # Process each resume
        resume_results = [
            {
                "index": i,
                "text": resume,
                "analysis": self.process_document(resume, "resume")["structured_info"],
            }
            for i, resume in enumerate(resumes)
        ]

        # Create a ranking prompt
        prompt_parts = [
            "I need to rank these resumes based on how well they match the job description.\n"
            "Job Description Analysis:\n"
            f"{job_analysis}\n"
            "Resumes:\n"
        ]
        prompt_parts.extend(
            f"\nResume {i + 1}:\n{result['analysis']}\n"
            for i, result in enumerate(resume_results)
        )
        prompt_parts.append(
            "\nRank these resumes from best to worst match for the job, with detailed reasoning for each:"
        )
        ranking_analysis = self.reasoning_engine.generate_text("".join(prompt_parts), max_length=2048)

        # Generate a numerical scoring for each resume
        scoring_prompt = (
            "Based on my analysis of how well these resumes match the job description:\n"
            f"{ranking_analysis}\n"
            "Assign a numerical score from 0-100 for each resume, where 100 is a perfect match:\n"
            "Resume 1 Score:"
        )
        scores_text = self.reasoning_engine.generate_text(scoring_prompt, max_length=512)

        # Parse scores (simple regex approach)
        scores = []
        for position in range(1, len(resume_results) + 1):
            score_match = re.search(fr"Resume {position} Score:\s*(\d+)", scores_text)
            if score_match is None and position == 1:
                # The prompt already ends with "Resume 1 Score:". If the engine
                # returns only the continuation (not shown here — TODO confirm),
                # the first score is the leading number of the output.
                score_match = re.match(r"\s*(\d+)", scores_text)
            score = int(score_match.group(1)) if score_match else 50  # default on parse failure
            scores.append(max(0, min(100, score)))  # keep within the requested 0-100 scale

        # Create the final rankings
        rankings = []
        for i, score in enumerate(scores):
            text = resumes[i]
            # Truncated for readability; ellipsis only when something was cut
            preview = text if len(text) <= 100 else text[:100] + "..."
            rankings.append({"resume_index": i, "score": score, "resume_text": preview})

        # Sort by score (descending); ties keep their input order
        rankings.sort(key=lambda entry: entry["score"], reverse=True)

        processing_time = time.time() - start_time
        return {
            "rankings": rankings,
            "analysis": ranking_analysis,
            "job_description": job_description,
            "processing_time": processing_time,
        }
# Create the Gradio interface
def create_interface(test_mode=False):
    """Create the Gradio interface for the ResuRank AI Agent.

    Builds one agent and a tabbed UI around it: Chat, Task Execution,
    Agent Status, Document Processing and Resume Ranking.

    Args:
        test_mode: Whether to run in test mode with minimal resources

    Returns:
        The ``gr.Blocks`` interface (not yet launched)
    """
    # Initialize the agent with appropriate settings
    if test_mode:
        agent = ResuRankAgent(model_id="distilgpt2", use_cache=True, test_mode=True)
        # Run a test case to verify functionality
        test_results = agent.run_test_case()
        print(f"Test results: {test_results}")
    else:
        # NOTE(review): flan-t5 is an encoder-decoder checkpoint while the agent
        # loads models with AutoModelForCausalLM, so this load presumably fails
        # and the agent ends up on its fallback model — confirm and pick a
        # causal LM here if that is not intended.
        agent = ResuRankAgent(model_id="google/flan-t5-base", use_cache=True)

    with gr.Blocks(title="ResuRank AI Agent") as interface:
        gr.Markdown("# ResuRank AI Agent")
        gr.Markdown("An autonomous AI agent that can process queries, perform reasoning, and execute tasks.")

        with gr.Tab("Chat"):
            chatbot = gr.Chatbot(height=400)
            msg = gr.Textbox(label="Your message", placeholder="Ask me anything...")
            with gr.Row():
                submit_btn = gr.Button("Submit")
                clear_btn = gr.Button("Clear")
            reasoning_checkbox = gr.Checkbox(label="Use reasoning", value=True)
            # The checkbox starts checked, so the reasoning box starts visible;
            # the change handler below toggles it afterwards.
            reasoning_output = gr.Textbox(label="Reasoning", interactive=False, visible=True)

            def respond(message, chat_history, use_reasoning):
                """Answer one chat message and return (history, cleared input, reasoning)."""
                chat_history = chat_history or []  # history may be None before the first turn
                if not message or not message.strip():
                    return chat_history, "", ""
                # Process the query
                result = agent.process_query(message, use_reasoning=use_reasoning)
                # Update chat history
                chat_history.append((message, result["response"]))
                # "reasoning" is None when reasoning is disabled; show an empty box
                return chat_history, "", result.get("reasoning") or ""

            def clear_chat():
                """Reset the agent's conversation and blank the chat widgets."""
                agent.clear_conversation()
                return [], "", ""

            # Set up event handlers
            chat_inputs = [msg, chatbot, reasoning_checkbox]
            chat_outputs = [chatbot, msg, reasoning_output]
            submit_btn.click(respond, chat_inputs, chat_outputs)
            msg.submit(respond, chat_inputs, chat_outputs)
            clear_btn.click(clear_chat, None, chat_outputs)
            reasoning_checkbox.change(lambda x: gr.update(visible=x), reasoning_checkbox, reasoning_output)

        with gr.Tab("Task Execution"):
            task_input = gr.Textbox(label="Task Description", placeholder="Describe the task to execute...")
            execute_btn = gr.Button("Execute Task")
            with gr.Row():
                with gr.Column():
                    plan_output = gr.Textbox(label="Execution Plan", interactive=False)
                with gr.Column():
                    results_output = gr.Textbox(label="Task Results", interactive=False)
            task_status = gr.Textbox(label="Task Status", value="idle", interactive=False)

            def execute_task(task_description):
                """Run a task and return (plan, result, status) for display."""
                if not task_description or not task_description.strip():
                    return "No task provided.", "", "idle"
                # Execute the task
                result = agent.execute_task(task_description)
                return result.get("plan", ""), result.get("result", ""), result.get("status", "")

            # Set up event handlers
            execute_btn.click(execute_task, task_input, [plan_output, results_output, task_status])

        with gr.Tab("Agent Status"):
            status_btn = gr.Button("Refresh Status")
            with gr.Row():
                with gr.Column():
                    model_info = gr.Textbox(label="Model Information", interactive=False)
                with gr.Column():
                    conversation_info = gr.Textbox(label="Conversation Information", interactive=False)

            def update_status():
                """Return (model text, conversation text) describing the agent."""
                status = agent.get_status()
                model_text = f"Model ID: {status['model_id']}\nDevice: {status['device']}"
                # Handle important_facts which might be an integer count or a list
                important_facts_count = status['important_facts']
                if isinstance(important_facts_count, list):
                    important_facts_count = len(important_facts_count)
                conversation_text = (
                    f"Conversation Length: {status['conversation_turns']} turns\n"
                    f"Important Facts: {important_facts_count}\n"
                    f"Current Task: {status['current_task'] or 'None'}\n"
                    f"Task Status: {status['task_status']}"
                )
                return model_text, conversation_text

            # Set up event handlers
            status_btn.click(update_status, None, [model_info, conversation_info])
            # Initialize status at build time
            model_info.value, conversation_info.value = update_status()

        with gr.Tab("Document Processing"):
            with gr.Row():
                with gr.Column():
                    document_input = gr.Textbox(
                        label="Document Text",
                        placeholder="Paste resume or job description text here...",
                        lines=10,
                    )
                    document_type = gr.Radio(
                        ["resume", "job_description", "other"],
                        label="Document Type",
                        value="resume",
                    )
                    process_btn = gr.Button("Process Document")
            with gr.Row():
                with gr.Column():
                    analysis_output = gr.Textbox(label="Document Analysis", interactive=False, lines=10)
                with gr.Column():
                    structured_output = gr.Textbox(label="Structured Information", interactive=False, lines=10)

            def process_document(document_text, doc_type):
                """Analyze one document and return (analysis, structured info)."""
                if not document_text or not document_text.strip():
                    return "No document provided.", ""
                # Process the document
                result = agent.process_document(document_text, doc_type)
                return result.get("analysis", ""), result.get("structured_info", "")

            # Set up event handlers
            process_btn.click(process_document, [document_input, document_type], [analysis_output, structured_output])

        with gr.Tab("Resume Ranking"):
            with gr.Row():
                with gr.Column():
                    job_description_input = gr.Textbox(
                        label="Job Description", placeholder="Paste job description here...", lines=8
                    )
            with gr.Row():
                with gr.Column():
                    resume1_input = gr.Textbox(label="Resume 1", placeholder="Paste first resume here...", lines=6)
                with gr.Column():
                    resume2_input = gr.Textbox(label="Resume 2", placeholder="Paste second resume here...", lines=6)
            with gr.Row():
                with gr.Column():
                    resume3_input = gr.Textbox(
                        label="Resume 3 (Optional)", placeholder="Paste third resume here...", lines=6
                    )
                with gr.Column():
                    resume4_input = gr.Textbox(
                        label="Resume 4 (Optional)", placeholder="Paste fourth resume here...", lines=6
                    )
            rank_btn = gr.Button("Rank Resumes")
            ranking_output = gr.Textbox(label="Ranking Results", interactive=False, lines=15)

            def rank_resumes(job_desc, resume1, resume2, resume3, resume4):
                """Rank the non-empty resumes and format the result as text."""
                required = (job_desc, resume1, resume2)
                if not all(value and value.strip() for value in required):
                    return "Please provide at least a job description and two resumes."
                # Collect all non-empty resumes
                resumes = [r for r in (resume1, resume2, resume3, resume4) if r and r.strip()]
                # Rank the resumes
                result = agent.rank_resumes(job_desc, resumes)
                # Format the results
                ranking_lines = [
                    f"{place}. Resume {rank['resume_index'] + 1} - Score: {rank['score']}/100\n"
                    for place, rank in enumerate(result["rankings"], start=1)
                ]
                return (
                    "Resume Rankings (Best to Worst Match):\n\n"
                    + "".join(ranking_lines)
                    + "\nDetailed Analysis:\n"
                    + result["analysis"]
                )

            # Set up event handlers
            rank_btn.click(
                rank_resumes,
                [job_description_input, resume1_input, resume2_input, resume3_input, resume4_input],
                ranking_output,
            )

    return interface
# Launch the interface when run directly
if __name__ == "__main__":
    import argparse

    # Both command-line switches are plain on/off flags.
    cli = argparse.ArgumentParser(description="ResuRank AI Agent")
    flag_help = (
        ("--test", "Run in test mode with minimal resources"),
        ("--share", "Share the Gradio interface"),
    )
    for flag, help_text in flag_help:
        cli.add_argument(flag, action="store_true", help=help_text)
    args = cli.parse_args()

    # Build the UI for the requested mode, then start serving it.
    interface = create_interface(test_mode=args.test)
    interface.launch(share=args.share)