import asyncio
import gc
import io
import json
import logging
import os
from pathlib import Path

import librosa
import numpy as np
from dotenv import load_dotenv
from groq import Groq
from pydub import AudioSegment
# --- Configuration ---
load_dotenv()

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

try:
    groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))
except Exception as e:
    groq_client = None
    logger.error(f"Failed to initialize Groq client: {e}")
# --- Prompts ---
CHUNK_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in creating concise, structured, and insightful summaries of segments of meeting and lecture transcripts. You will receive one segment of a larger transcript. Your goal is to distill the most critical information into a format that is easy to read.
Instructions:
1. **Identify Core Themes**: Begin by identifying the main topics and objectives discussed in this segment.
2. **Extract Key Decisions**: Pinpoint any decisions that were made, including the rationale behind them if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions reached in this segment.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
FINAL_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in combining multiple segment summaries into a single concise, structured, and insightful summary of the entire meeting or lecture. Your goal is to distill the most critical information from all segments into a format that is easy to read and act upon.
Instructions:
1. **Identify Overall Core Themes**: Synthesize the main topics and objectives from all segments.
2. **Extract Key Decisions**: Compile any decisions made across segments, including rationales if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions from the entire discussion.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
ACTION_ITEMS_SYSTEM_PROMPT = """
You are a highly specialized AI assistant tasked with identifying and extracting actionable tasks, commitments, and deadlines from a segment of a meeting or lecture transcript. Your output must be clear, concise, and formatted as a JSON object.
Instructions:
1. **Identify Actionable Language**: Scan the text for phrases indicating a task, such as "will send," "is responsible for," "we need to," "I'll follow up on," etc.
2. **Extract Key Components**: For each action item, identify the assigned person (if mentioned), the specific task, and any deadlines.
3. **Format as JSON**: Return a single JSON object with a key "action_items". The value should be a list of strings, where each string is a clearly defined action item.
4. **Be Precise**: If no specific person is assigned, state the action generally. If no deadline is mentioned, do not invent one.
5. **Handle No Actions**: If no action items are found, return a JSON object with an empty list: {"action_items": []}.
Example Output:
{
    "action_items": [
        "Alice will send the final budget report by Friday.",
        "Bob is responsible for updating the project timeline.",
        "The marketing strategy needs to be finalized by next week's meeting."
    ]
}
"""
# --- Core Functions ---
async def transcribe_chunk(chunk_index: int, audio_chunk: AudioSegment):
    """Sends a single audio chunk to Groq and returns its transcription with the index."""
    logger.info(f"Starting transcription for chunk {chunk_index + 1}...")
    try:
        with io.BytesIO() as chunk_bytes:
            audio_chunk.export(chunk_bytes, format="wav")
            chunk_bytes.seek(0)
            chunk_size = chunk_bytes.getbuffer().nbytes
            logger.info(f"Chunk {chunk_index + 1} size: {chunk_size / (1024 * 1024):.2f} MB")
            transcription = await asyncio.to_thread(
                groq_client.audio.transcriptions.create,
                file=("audio.wav", chunk_bytes.read()),
                model="whisper-large-v3",
                response_format="text"
            )
        logger.info(f"Finished transcription for chunk {chunk_index + 1}.")
        return (chunk_index, transcription)
    except Exception as e:
        logger.error(f"Error transcribing chunk {chunk_index + 1}: {e}")
        return (chunk_index, f"[TRANSCRIPTION FAILED FOR SEGMENT {chunk_index + 1}]")
async def process_transcript_chunk(chunk_index: int, chunk_text: str):
    """Process a single transcript chunk for a summary and action items."""
    logger.info(f"Starting processing for transcript chunk {chunk_index + 1}...")
    try:
        summary_task = asyncio.to_thread(
            groq_client.chat.completions.create,
            model="gemma2-9b-it",
            messages=[{"role": "system", "content": CHUNK_SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": chunk_text}],
            temperature=0.2,
            max_tokens=512
        )
        action_task = asyncio.to_thread(
            groq_client.chat.completions.create,
            model="llama-3.1-8b-instant",
            messages=[{"role": "system", "content": ACTION_ITEMS_SYSTEM_PROMPT}, {"role": "user", "content": chunk_text}],
            temperature=0.1,
            max_tokens=512,
            response_format={"type": "json_object"}
        )
        # Run the summary and action-item calls concurrently
        summary_completion, action_completion = await asyncio.gather(summary_task, action_task)
        summary = summary_completion.choices[0].message.content
        action_items_json = json.loads(action_completion.choices[0].message.content)
        action_items = action_items_json.get("action_items", [])
        logger.info(f"Finished processing for transcript chunk {chunk_index + 1}.")
        return (chunk_index, summary, action_items)
    except Exception as e:
        logger.error(f"Error processing transcript chunk {chunk_index + 1}: {e}")
        return (chunk_index, "[SUMMARY FAILED]", [])
async def run_pipeline(task_id: str, file_path: Path, tasks_db: dict):
    if not groq_client:
        tasks_db[task_id] = {"status": "failed", "result": "Groq client is not initialized. Check API key."}
        return
    try:
        logger.info(f"Starting pipeline for task {task_id} with file {file_path}")
        # Get total duration and native sample rate (librosa >= 0.10 takes
        # path=; older versions used the filename= keyword)
        duration = librosa.get_duration(path=str(file_path))
        orig_sr = librosa.get_samplerate(str(file_path))
        logger.info(f"Audio duration: {duration:.2f} seconds, sample rate: {orig_sr}")
        target_sr = 16000
        max_chunk_mb = 19.5
        max_chunk_bytes = max_chunk_mb * 1024 * 1024
        bytes_per_second = target_sr * 2 * 1  # 16-bit (2 bytes) mono at 16 kHz
        # Leave ~1 KB of headroom for the WAV header
        max_chunk_duration = (max_chunk_bytes - 1000) / bytes_per_second
        # Configurable base chunk duration, capped at the size-derived maximum
        base_chunk_duration = int(os.getenv("CHUNK_DURATION_S", 300))  # default 5 minutes
        chunk_duration = min(base_chunk_duration, max_chunk_duration)
        logger.info(f"Using chunk duration: {chunk_duration:.2f} seconds")
        num_chunks = int(np.ceil(duration / chunk_duration))
        logger.info(f"Number of chunks: {num_chunks}")
        transcription_tasks = []
        for i in range(num_chunks):
            offset = i * chunk_duration
            this_dur = min(chunk_duration, duration - offset)
            logger.info(f"Loading audio chunk {i + 1} (offset: {offset:.2f}s, duration: {this_dur:.2f}s)")
            y_chunk, sr = librosa.load(str(file_path), sr=None, mono=True, offset=offset, duration=this_dur)
            # Resample to target_sr if the source rate differs
            if sr != target_sr:
                y_chunk = librosa.resample(y_chunk, orig_sr=sr, target_sr=target_sr)
            # Clip before scaling so out-of-range floats don't wrap around in int16
            pcm_chunk = (np.clip(y_chunk, -1.0, 1.0) * 32767).astype(np.int16)
            audio_segment = AudioSegment(
                pcm_chunk.tobytes(),
                frame_rate=target_sr,
                sample_width=2,
                channels=1
            )
            transcription_tasks.append(transcribe_chunk(i, audio_segment))
            # Free the large float/PCM buffers; the AudioSegment itself stays
            # alive because the pending coroutine still references it
            del y_chunk, pcm_chunk, audio_segment
            gc.collect()
        # Run all transcription tasks in parallel
        logger.info(f"Running {len(transcription_tasks)} transcription tasks in parallel...")
        transcription_results = await asyncio.gather(*transcription_tasks)
        # Sort results by chunk index
        transcription_results.sort(key=lambda x: x[0])
        chunk_transcripts = [text for index, text in transcription_results]
        full_transcript = "\n".join(chunk_transcripts)
        if not full_transcript.strip():
            raise ValueError("Transcription result is empty.")
        # --- Chunked Analysis with Groq LLM ---
        logger.info("Starting chunked analysis with Groq LLM...")
        processing_tasks = [process_transcript_chunk(i, chunk_text) for i, chunk_text in enumerate(chunk_transcripts)]
        processing_results = await asyncio.gather(*processing_tasks)
        # Sort by chunk index
        processing_results.sort(key=lambda x: x[0])
        chunk_summaries = [summary for index, summary, actions in processing_results]
        all_action_items = []
        for index, summary, actions in processing_results:
            all_action_items.extend(actions)
        # Combine chunk summaries into the input for the final summary
        combined_summaries = "\n\n---\n\n".join([f"Segment {i + 1}:\n{summary}" for i, summary in enumerate(chunk_summaries)])
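        # For reference, combined_summaries takes the shape:
        #   Segment 1:
        #   <summary of segment 1>
        #
        #   ---
        #
        #   Segment 2:
        #   <summary of segment 2>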
        final_summary_completion = await asyncio.to_thread(
            groq_client.chat.completions.create,
            model="openai/gpt-oss-120b",
            messages=[{"role": "system", "content": FINAL_SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": combined_summaries}],
            temperature=0.2,
            reasoning_format="hidden",
            max_tokens=4096
        )
        final_summary = final_summary_completion.choices[0].message.content
| logger.info(f"Final analysis complete for task {task_id}.") | |
| final_result = { | |
| "transcript": full_transcript, | |
| "summary": final_summary, | |
| "action_items": all_action_items, | |
| } | |
| tasks_db[task_id] = {"status": "complete", "result": final_result} | |
| except Exception as e: | |
| logger.error(f"Error in pipeline for task {task_id}: {e}", exc_info=True) | |
| tasks_db[task_id] = {"status": "failed", "result": str(e)} | |
| finally: | |
| # Clean up the temporary audio file | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| logger.info(f"Cleaned up temporary file: {file_path}") |