import asyncio
import gc
import io
import json
import logging
import os
from pathlib import Path

import librosa
import numpy as np
from dotenv import load_dotenv
from groq import Groq
from pydub import AudioSegment

# --- Configuration ---
load_dotenv()

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

try:
    groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))
except Exception as e:
    groq_client = None
    logger.error(f"Failed to initialize Groq client: {e}")

# --- Prompts ---
CHUNK_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in creating concise, structured, and insightful summaries of parts of meeting and lecture transcripts. This is a segment of a larger transcript. Your goal is to distill the most critical information into a format that is easy to read.

Instructions:
1. **Identify Core Themes**: Begin by identifying the main topics and objectives discussed in this segment.
2. **Extract Key Decisions**: Pinpoint any decisions that were made, including the rationale behind them if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions reached in this segment.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""

FINAL_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in combining multiple segment summaries into a single concise, structured, and insightful summary of the entire meeting or lecture. Your goal is to distill the most critical information from all segments into a format that is easy to read and act upon.

Instructions:
1. **Identify Overall Core Themes**: Synthesize the main topics and objectives from all segments.
2. **Extract Key Decisions**: Compile any decisions made across segments, including rationales if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions from the entire discussion.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""

ACTION_ITEMS_SYSTEM_PROMPT = """
You are a highly specialized AI assistant tasked with identifying and extracting actionable tasks, commitments, and deadlines from a segment of a meeting or lecture transcript. Your output must be clear, concise, and formatted as a JSON object.

Instructions:
1. **Identify Actionable Language**: Scan the text for phrases indicating a task, such as "will send," "is responsible for," "we need to," "I'll follow up on," etc.
2. **Extract Key Components**: For each action item, identify the assigned person (if mentioned), the specific task, and any deadlines.
3. **Format as JSON**: Return a single JSON object with a key "action_items". The value should be a list of strings, where each string is a clearly defined action item.
4. **Be Precise**: If no specific person is assigned, state the action generally. If no deadline is mentioned, do not invent one.
5. **Handle No Actions**: If no action items are found, return a JSON object with an empty list: {"action_items": []}.

Example Output:
{
    "action_items": [
        "Alice will send the final budget report by Friday.",
        "Bob is responsible for updating the project timeline.",
        "The marketing strategy needs to be finalized by next week's meeting."
    ]
}
"""

# --- Core Functions ---
async def transcribe_chunk(chunk_index: int, audio_chunk: AudioSegment):
    """Sends a single audio chunk to Groq and returns its transcription with the index."""
    logger.info(f"Starting transcription for chunk {chunk_index + 1}...")
    try:
        with io.BytesIO() as chunk_bytes:
            audio_chunk.export(chunk_bytes, format="wav")
            chunk_bytes.seek(0)
            chunk_size = chunk_bytes.getbuffer().nbytes
            logger.info(f"Chunk {chunk_index + 1} size: {chunk_size / (1024 * 1024):.2f} MB")
            transcription = await asyncio.to_thread(
                groq_client.audio.transcriptions.create,
                file=("audio.wav", chunk_bytes.read()),
                model="whisper-large-v3",
                response_format="text"
            )
        logger.info(f"Finished transcription for chunk {chunk_index + 1}.")
        return (chunk_index, transcription)
    except Exception as e:
        logger.error(f"Error transcribing chunk {chunk_index + 1}: {e}")
        return (chunk_index, f"[TRANSCRIPTION FAILED FOR SEGMENT {chunk_index + 1}]")


async def process_transcript_chunk(chunk_index: int, chunk_text: str):
    """Process a single transcript chunk for summary and action items."""
    logger.info(f"Starting processing for transcript chunk {chunk_index + 1}...")
    try:
        summary_task = asyncio.to_thread(
            groq_client.chat.completions.create,
            model="gemma2-9b-it",
            messages=[{"role": "system", "content": CHUNK_SUMMARIZATION_SYSTEM_PROMPT},
                      {"role": "user", "content": chunk_text}],
            temperature=0.2,
            max_tokens=512
        )
        action_task = asyncio.to_thread(
            groq_client.chat.completions.create,
            model="llama-3.1-8b-instant",
            messages=[{"role": "system", "content": ACTION_ITEMS_SYSTEM_PROMPT},
                      {"role": "user", "content": chunk_text}],
            temperature=0.1,
            max_tokens=512,
            response_format={"type": "json_object"}
        )
        summary_completion, action_completion = await asyncio.gather(summary_task, action_task)
        summary = summary_completion.choices[0].message.content
        action_items_json = json.loads(action_completion.choices[0].message.content)
        action_items = action_items_json.get("action_items", [])
        logger.info(f"Finished processing for transcript chunk {chunk_index + 1}.")
        return (chunk_index, summary, action_items)
    except Exception as e:
        logger.error(f"Error processing transcript chunk {chunk_index + 1}: {e}")
        return (chunk_index, "[SUMMARY FAILED]", [])
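
# A minimal standalone usage sketch for the helper above (illustrative only:
# it assumes GROQ_API_KEY is set, and the transcript line is made up):
#
#     index, summary, actions = asyncio.run(
#         process_transcript_chunk(0, "Alice will send the budget report by Friday.")
#     )
#     print(summary)
#     print(actions)  # e.g. ["Alice will send the budget report by Friday."]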

async def run_pipeline(task_id: str, file_path: Path, tasks_db: dict):
    if not groq_client:
        tasks_db[task_id] = {"status": "failed", "result": "Groq client is not initialized. Check API key."}
        return
    try:
        logger.info(f"Starting pipeline for task {task_id} with file {file_path}")

        # Get total duration and native sample rate
        duration = librosa.get_duration(path=str(file_path))  # librosa >= 0.10 takes path=, not filename=
        orig_sr = librosa.get_samplerate(str(file_path))
        logger.info(f"Audio duration: {duration:.2f} seconds, sample rate: {orig_sr}")

        target_sr = 16000
        max_chunk_mb = 19.5
        max_chunk_bytes = max_chunk_mb * 1024 * 1024
        bytes_per_second = target_sr * 2 * 1  # 16-bit mono PCM
        # Leave headroom for the WAV header so a chunk never exceeds the upload limit
        max_chunk_duration = (max_chunk_bytes - 1000) / bytes_per_second

        # Configurable base chunk duration, capped at the upload-size maximum
        base_chunk_duration = int(os.getenv("CHUNK_DURATION_S", 300))  # default 5 minutes
        chunk_duration = min(base_chunk_duration, max_chunk_duration)
        logger.info(f"Using chunk duration: {chunk_duration:.2f} seconds")

        num_chunks = int(np.ceil(duration / chunk_duration))
        logger.info(f"Number of chunks: {num_chunks}")

        transcription_tasks = []
        for i in range(num_chunks):
            offset = i * chunk_duration
            this_dur = min(chunk_duration, duration - offset)
            logger.info(f"Loading audio chunk {i + 1} (offset: {offset:.2f}s, duration: {this_dur:.2f}s)")
            y_chunk, sr = librosa.load(str(file_path), sr=None, mono=True, offset=offset, duration=this_dur)
            # Resample to target_sr if the source rate differs
            if sr != target_sr:
                y_chunk = librosa.resample(y_chunk, orig_sr=sr, target_sr=target_sr)
            # Clip to [-1, 1] before scaling so the int16 conversion cannot overflow
            pcm_chunk = (np.clip(y_chunk, -1.0, 1.0) * 32767).astype(np.int16)
            audio_segment = AudioSegment(
                pcm_chunk.tobytes(),
                frame_rate=target_sr,
                sample_width=2,
                channels=1
            )
            transcription_tasks.append(transcribe_chunk(i, audio_segment))
            # Clean up memory
            del y_chunk, pcm_chunk, audio_segment
            gc.collect()

        # Run all transcription tasks in parallel
        logger.info(f"Running {len(transcription_tasks)} transcription tasks in parallel...")
        transcription_results = await asyncio.gather(*transcription_tasks)

        # Sort results by index so segments stay in order
        transcription_results.sort(key=lambda x: x[0])
        chunk_transcripts = [text for index, text in transcription_results]
        full_transcript = "\n".join(chunk_transcripts)
        if not full_transcript.strip():
            raise ValueError("Transcription result is empty.")

        # --- Chunked Analysis with Groq LLM ---
        logger.info("Starting chunked analysis with Groq LLM...")
        processing_tasks = []
        for i, chunk_text in enumerate(chunk_transcripts):
            processing_tasks.append(process_transcript_chunk(i, chunk_text))
        processing_results = await asyncio.gather(*processing_tasks)

        # Sort by index
        processing_results.sort(key=lambda x: x[0])
        chunk_summaries = [summary for index, summary, actions in processing_results]
        all_action_items = []
        for index, summary, actions in processing_results:
            all_action_items.extend(actions)

        # Combine chunk summaries into the final summary
        combined_summaries = "\n\n---\n\n".join(
            [f"Segment {i + 1}:\n{summary}" for i, summary in enumerate(chunk_summaries)]
        )
        final_summary_task = asyncio.to_thread(
            groq_client.chat.completions.create,
            model="openai/gpt-oss-120b",
            messages=[{"role": "system", "content": FINAL_SUMMARIZATION_SYSTEM_PROMPT},
                      {"role": "user", "content": combined_summaries}],
            temperature=0.2,
            reasoning_format="hidden",
            max_tokens=4096
        )
        final_summary_completion = await final_summary_task
        final_summary = final_summary_completion.choices[0].message.content
        logger.info(f"Final analysis complete for task {task_id}.")

        final_result = {
            "transcript": full_transcript,
            "summary": final_summary,
            "action_items": all_action_items,
        }
        tasks_db[task_id] = {"status": "complete", "result": final_result}
    except Exception as e:
        logger.error(f"Error in pipeline for task {task_id}: {e}", exc_info=True)
        tasks_db[task_id] = {"status": "failed", "result": str(e)}
    finally:
        # Clean up the temporary audio file
        if os.path.exists(file_path):
            os.remove(file_path)
            logger.info(f"Cleaned up temporary file: {file_path}")