import logging
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import requests
import json
import os
import sys
from dotenv import load_dotenv

load_dotenv()
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Setup logging
try:
    from logger.custom_logger import CustomLoggerTracker
    custom_log = CustomLoggerTracker()
    logger = custom_log.get_logger("groq_client")
except ImportError:
    # Fallback to standard logging if custom logger not available
    logger = logging.getLogger("groq_client")
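
# Expected .env entry (illustrative; the variable is read just below via os.getenv):
#     GROQ_API_KEY=<your-groq-api-key>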
GROQ_API_KEY = os.getenv('GROQ_API_KEY')

@dataclass
class LLMResponse:
    """Response from LLM generation."""
    text: str
    model_name: str
    processing_time: float
    token_count: int
    success: bool
    error_message: Optional[str] = None
    finish_reason: Optional[str] = None

class GroqClient:
    """
    Groq API client for fast LLM inference.

    This client provides high-speed inference using Groq's LPU architecture
    with support for various models like Llama, Mixtral, and Gemma.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.groq.com/openai/v1"):
        """
        Initialize the Groq client.

        Args:
            api_key: Groq API key
            base_url: Base URL for the Groq API
        """
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })

        # Simple client-side rate limiting over a rolling one-minute window
        self.max_requests_per_minute = 30
        self.request_timestamps = []

        logger.info(f"Groq client initialized with base URL: {base_url}")

    def generate_response(self, messages: List[Dict[str, str]],
                          model: str = "openai/gpt-oss-120b",
                          max_tokens: int = 1024,
                          temperature: float = 0.1) -> LLMResponse:
        """
        Generate response using Groq LLM.

        Args:
            messages: List of message dictionaries with 'role' and 'content'
            model: Model name to use
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature

        Returns:
            LLMResponse with generated text and metadata
        """
        start_time = time.time()

        try:
            # Rate limiting check
            self._check_rate_limit()

            # Prepare request payload
            payload = {
                "model": model,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": temperature,
                "stream": False
            }

            # Make API request
            response = self.session.post(
                f"{self.base_url}/chat/completions",
                json=payload,
                timeout=60
            )
            processing_time = time.time() - start_time

            if response.status_code == 200:
                data = response.json()

                # Extract response text
                choice = data.get('choices', [{}])[0]
                message = choice.get('message', {})
                generated_text = message.get('content', '')
                finish_reason = choice.get('finish_reason', 'unknown')

                # Get usage info
                usage = data.get('usage', {})
                token_count = usage.get('total_tokens', 0)

                logger.debug(f"Generated response in {processing_time:.2f}s, {token_count} tokens")

                return LLMResponse(
                    text=generated_text,
                    model_name=model,
                    processing_time=processing_time,
                    token_count=token_count,
                    success=True,
                    finish_reason=finish_reason
                )
            else:
                error_msg = f"API request failed with status {response.status_code}: {response.text}"
                logger.error(error_msg)
                return LLMResponse(
                    text="",
                    model_name=model,
                    processing_time=processing_time,
                    token_count=0,
                    success=False,
                    error_message=error_msg
                )
        except Exception as e:
            processing_time = time.time() - start_time
            error_msg = f"LLM generation failed: {str(e)}"
            logger.error(error_msg)
            return LLMResponse(
                text="",
                model_name=model,
                processing_time=processing_time,
                token_count=0,
                success=False,
                error_message=error_msg
            )

    def answer_question(self, question: str, context: str,
                        model: str = "openai/gpt-oss-120b") -> LLMResponse:
        """
        Answer a question based on provided context.

        Args:
            question: Question to answer
            context: Context information
            model: Model name to use

        Returns:
            LLMResponse with the answer
        """
        # Create system prompt for manufacturing Q&A
        system_prompt = """You are an expert manufacturing analyst assistant. Your task is to answer questions about manufacturing data, processes, and documentation based on the provided context.

Guidelines:
1. Answer questions accurately based only on the provided context
2. If the context doesn't contain enough information, say so clearly
3. Include specific references to data points, measurements, or processes when available
4. Use technical manufacturing terminology appropriately
5. Provide concise but complete answers
6. If asked about trends or comparisons, use the numerical data from the context

Always cite your sources by mentioning the specific document, page, or section where you found the information."""

        # Create user prompt with context and question
        user_prompt = f"""Context:
{context}

Question: {question}

Please provide a detailed answer based on the context above. Include specific citations where possible."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        return self.generate_response(messages, model, max_tokens=1024, temperature=0.1)

    def summarize_document(self, content: str,
                           model: str = "openai/gpt-oss-120b") -> LLMResponse:
        """
        Summarize manufacturing document content.

        Args:
            content: Document content to summarize
            model: Model name to use

        Returns:
            LLMResponse with the summary
        """
        system_prompt = """You are an expert at summarizing manufacturing documents. Create concise, informative summaries that capture the key information, data points, and insights from manufacturing documentation."""

        user_prompt = f"""Please provide a comprehensive summary of the following manufacturing document content:

{content}

Focus on:
- Key manufacturing processes described
- Important measurements, specifications, or data points
- Quality metrics or performance indicators
- Any issues, recommendations, or conclusions
- Critical dates, locations, or responsible parties

Keep the summary concise but comprehensive."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        return self.generate_response(messages, model, max_tokens=512, temperature=0.1)

    def extract_key_information(self, content: str,
                                model: str = "openai/gpt-oss-120b") -> LLMResponse:
        """
        Extract key information from document content.

        Args:
            content: Document content to analyze
            model: Model name to use

        Returns:
            LLMResponse with extracted key information
        """
        system_prompt = """You are an expert at extracting key information from manufacturing documents. Identify and extract the most important data points, specifications, processes, and insights."""

        user_prompt = f"""Extract the key information from the following manufacturing document content:

{content}

Please organize the extracted information into categories such as:
- Manufacturing Processes
- Quality Metrics
- Specifications/Parameters
- Performance Data
- Issues/Problems
- Recommendations
- Dates and Timelines

Present the information in a structured, easy-to-read format."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        return self.generate_response(messages, model, max_tokens=768, temperature=0.1)

    def _check_rate_limit(self):
        """Enforce a simple sliding-window rate limit on outgoing requests."""
        current_time = time.time()

        # Remove timestamps older than 1 minute
        self.request_timestamps = [
            ts for ts in self.request_timestamps
            if current_time - ts < 60
        ]

        # If we're at the limit, wait until the oldest request leaves the window
        if len(self.request_timestamps) >= self.max_requests_per_minute:
            sleep_time = 60 - (current_time - self.request_timestamps[0])
            if sleep_time > 0:
                logger.warning(f"Rate limit reached, sleeping for {sleep_time:.2f} seconds")
                time.sleep(sleep_time)
                current_time = time.time()

        # Record the current request timestamp
        self.request_timestamps.append(current_time)

    def get_available_models(self) -> List[str]:
        """
        Get list of available models.

        Returns:
            List of available model names
        """
        try:
            response = self.session.get(f"{self.base_url}/models")
            if response.status_code == 200:
                data = response.json()
                models = [model['id'] for model in data.get('data', [])]
                return models
            else:
                logger.error(f"Failed to get models: {response.status_code}")
                return []
        except Exception as e:
            logger.error(f"Failed to get available models: {e}")
            return []

    def health_check(self) -> bool:
        """
        Check if the Groq API is accessible.

        Returns:
            True if healthy, False otherwise
        """
        try:
            response = self.session.get(f"{self.base_url}/models", timeout=10)
            return response.status_code == 200
        except Exception as e:
            logger.error(f"Groq health check failed: {e}")
            return False
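
# Illustrative standalone usage of GroqClient (a minimal sketch; assumes a valid
# GROQ_API_KEY in the environment and network access to the Groq API):
#
#     client = GroqClient(api_key=os.getenv("GROQ_API_KEY", ""))
#     if client.health_check():
#         reply = client.generate_response([{"role": "user", "content": "Hello"}])
#         print(reply.text if reply.success else reply.error_message)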

class LLMSystem:
    """High-level wrapper around GroqClient that adds retries and fallback responses."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.api_key = config.get('groq_api_key') or GROQ_API_KEY
        self.default_model = config.get('llm_model', 'openai/gpt-oss-120b')
        self.max_retries = config.get('max_retries', 3)

        if not self.api_key:
            raise ValueError("Groq API key is required")

        self.client = GroqClient(self.api_key)
        logger.info(f"LLM system initialized with default model: {self.default_model}")

    def answer_question(self, question: str, context: str, model: Optional[str] = None) -> str:
        """Answer a question with retries; returns a fallback message if all attempts fail."""
        model = model or self.default_model

        for attempt in range(self.max_retries):
            try:
                response = self.client.answer_question(question, context, model)
                if response.success:
                    return response.text
                else:
                    logger.warning(f"LLM generation failed (attempt {attempt + 1}): {response.error_message}")
                    if attempt < self.max_retries - 1:
                        time.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                logger.warning(f"LLM generation error (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)

        # Return fallback response if all attempts failed
        return "I apologize, but I'm unable to generate a response at this time due to technical difficulties. Please try again later."

    def summarize_content(self, content: str, model: Optional[str] = None) -> str:
        """Summarize content with retries; returns a fallback message if all attempts fail."""
        model = model or self.default_model

        for attempt in range(self.max_retries):
            try:
                response = self.client.summarize_document(content, model)
                if response.success:
                    return response.text
                else:
                    logger.warning(f"Summarization failed (attempt {attempt + 1}): {response.error_message}")
                    if attempt < self.max_retries - 1:
                        time.sleep(2 ** attempt)
            except Exception as e:
                logger.warning(f"Summarization error (attempt {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)

        return "Unable to generate summary at this time."

if __name__ == "__main__":
    logger.info("Groq client init ...")

    # Test code (for demonstration purposes)
    config = {
        'groq_api_key': GROQ_API_KEY,
        'llm_model': 'openai/gpt-oss-120b',
        'max_retries': 3
    }
    llm_system = LLMSystem(config)

    question = "What is the capital of France?"
    context = "France is a country in Western Europe."
    answer = llm_system.answer_question(question, context)
    logger.info(f"Answer: {answer}")
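
    # Additional illustrative checks (a sketch; assumes network access and a valid key,
    # and uses made-up sample text purely for demonstration):
    if llm_system.client.health_check():
        summary = llm_system.summarize_content(
            "Line 3 produced 1,200 units in March with a 1.5% defect rate."
        )
        logger.info(f"Summary: {summary}")
    else:
        logger.warning("Groq API is not reachable; skipping summary demo.")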