# app/processing.py
import asyncio
import gc
import io
import json
import logging
import os
from pathlib import Path
import librosa
import numpy as np
from dotenv import load_dotenv
from groq import Groq
from pydub import AudioSegment
# --- Configuration ---
load_dotenv()
# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
try:
groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))
except Exception as e:
groq_client = None
logger.error(f"Failed to initialize Groq client: {e}")
# --- Prompts ---
CHUNK_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in creating concise, structured, and insightful summaries of parts of meeting and lecture transcripts. This is a segment of a larger transcript. Your goal is to distill the most critical information into a format that is easy to read.
Instructions:
1. **Identify Core Themes**: Begin by identifying the main topics and objectives discussed in this segment.
2. **Extract Key Decisions**: Pinpoint any decisions that were made, including the rationale behind them if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions reached in this segment.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
FINAL_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in combining multiple segment summaries into a single concise, structured, and insightful summary of the entire meeting or lecture. Your goal is to distill the most critical information from all segments into a format that is easy to read and act upon.
Instructions:
1. **Identify Overall Core Themes**: Synthesize the main topics and objectives from all segments.
2. **Extract Key Decisions**: Compile any decisions made across segments, including rationales if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions from the entire discussion.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
ACTION_ITEMS_SYSTEM_PROMPT = """
You are a highly specialized AI assistant tasked with identifying and extracting actionable tasks, commitments, and deadlines from a segment of a meeting or lecture transcript. Your output must be clear, concise, and formatted as a JSON object.
Instructions:
1. **Identify Actionable Language**: Scan the text for phrases indicating a task, such as "will send," "is responsible for," "we need to," "I'll follow up on," etc.
2. **Extract Key Components**: For each action item, identify the assigned person (if mentioned), the specific task, and any deadlines.
3. **Format as JSON**: Return a single JSON object with a key "action_items". The value should be a list of strings, where each string is a clearly defined action item.
4. **Be Precise**: If no specific person is assigned, state the action generally. If no deadline is mentioned, do not invent one.
5. **Handle No Actions**: If no action items are found, return a JSON object with an empty list: {"action_items": []}.
Example Output:
{
"action_items": [
"Alice will send the final budget report by Friday.",
"Bob is responsible for updating the project timeline.",
"The marketing strategy needs to be finalized by next week's meeting."
]
}
"""
# --- Core Functions ---
async def transcribe_chunk(chunk_index: int, audio_chunk: AudioSegment):
"""Sends a single audio chunk to Groq and returns its transcription with the index."""
logger.info(f"Starting transcription for chunk {chunk_index + 1}...")
try:
with io.BytesIO() as chunk_bytes:
audio_chunk.export(chunk_bytes, format="wav")
chunk_bytes.seek(0)
chunk_size = chunk_bytes.getbuffer().nbytes
logger.info(f"Chunk {chunk_index + 1} size: {chunk_size / (1024 * 1024):.2f} MB")
transcription = await asyncio.to_thread(
groq_client.audio.transcriptions.create,
file=("audio.wav", chunk_bytes.read()),
model="whisper-large-v3",
response_format="text"
)
logger.info(f"Finished transcription for chunk {chunk_index + 1}.")
return (chunk_index, transcription)
except Exception as e:
logger.error(f"Error transcribing chunk {chunk_index + 1}: {e}")
return (chunk_index, f"[TRANSCRIPTION FAILED FOR SEGMENT {chunk_index+1}]")
async def process_transcript_chunk(chunk_index: int, chunk_text: str):
"""Process a single transcript chunk for summary and action items."""
logger.info(f"Starting processing for transcript chunk {chunk_index + 1}...")
try:
summary_task = asyncio.to_thread(
groq_client.chat.completions.create,
model="gemma2-9b-it",
messages=[{"role": "system", "content": CHUNK_SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": chunk_text}],
temperature=0.2,
max_tokens=512
)
action_task = asyncio.to_thread(
groq_client.chat.completions.create,
model="llama-3.1-8b-instant",
messages=[{"role": "system", "content": ACTION_ITEMS_SYSTEM_PROMPT}, {"role": "user", "content": chunk_text}],
temperature=0.1,
max_tokens=512,
response_format={"type": "json_object"}
)
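        # Fire both LLM requests concurrently; gather returns their results in
        # the order the awaitables were passed in.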
summary_completion, action_completion = await asyncio.gather(summary_task, action_task)
summary = summary_completion.choices[0].message.content
action_items_json = json.loads(action_completion.choices[0].message.content)
action_items = action_items_json.get("action_items", [])
logger.info(f"Finished processing for transcript chunk {chunk_index + 1}.")
return (chunk_index, summary, action_items)
except Exception as e:
logger.error(f"Error processing transcript chunk {chunk_index + 1}: {e}")
return (chunk_index, "[SUMMARY FAILED]", [])
async def run_pipeline(task_id: str, file_path: Path, tasks_db: dict):
if not groq_client:
tasks_db[task_id] = {"status": "failed", "result": "Groq client is not initialized. Check API key."}
return
try:
logger.info(f"Starting pipeline for task {task_id} with file {file_path}")
# Get total duration
        # librosa 0.10+ renamed get_duration's `filename` argument to `path`
        duration = librosa.get_duration(path=str(file_path))
        orig_sr = librosa.get_samplerate(str(file_path))
        logger.info(f"Audio duration: {duration:.2f} seconds, sample rate: {orig_sr}")
target_sr = 16000
max_chunk_mb = 19.5
max_chunk_bytes = max_chunk_mb * 1024 * 1024
        bytes_per_second = target_sr * 2 * 1  # 16,000 samples/s * 2 bytes (16-bit) * 1 channel (mono)
        max_chunk_duration = (max_chunk_bytes - 1000) / bytes_per_second  # headroom for the WAV header
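        # With the defaults above: 19.5 MiB minus 1,000 bytes of headroom is
        # about 20,446,232 bytes, which at 32,000 bytes/s of 16-bit mono PCM
        # caps a chunk at roughly 639 s. That sits well under Groq's documented
        # 25 MB audio upload limit, so the default 300 s chunks are always safe.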
# Configurable base chunk duration, but cap at max
base_chunk_duration = int(os.getenv("CHUNK_DURATION_S", 300)) # default 5 minutes
chunk_duration = min(base_chunk_duration, max_chunk_duration)
logger.info(f"Using chunk duration: {chunk_duration:.2f} seconds")
num_chunks = int(np.ceil(duration / chunk_duration))
logger.info(f"Number of chunks: {num_chunks}")
transcription_tasks = []
for i in range(num_chunks):
offset = i * chunk_duration
this_dur = min(chunk_duration, duration - offset)
logger.info(f"Loading audio chunk {i+1} (offset: {offset:.2f}s, duration: {this_dur:.2f}s)")
            y_chunk, chunk_sr = librosa.load(str(file_path), sr=None, mono=True, offset=offset, duration=this_dur)
            # Resample to target_sr if the source rate differs
            if chunk_sr != target_sr:
                y_chunk = librosa.resample(y_chunk, orig_sr=chunk_sr, target_sr=target_sr)
            # Clip to [-1, 1] before the int16 conversion to avoid overflow wrap-around
            pcm_chunk = (np.clip(y_chunk, -1.0, 1.0) * 32767).astype(np.int16)
audio_segment = AudioSegment(
pcm_chunk.tobytes(),
frame_rate=target_sr,
sample_width=2,
channels=1
)
transcription_tasks.append(transcribe_chunk(i, audio_segment))
            # Drop local references; the AudioSegment itself stays alive inside
            # the pending coroutine until it is awaited.
            del y_chunk, pcm_chunk, audio_segment
            gc.collect()
# Run all transcription tasks in parallel
logger.info(f"Running {len(transcription_tasks)} transcription tasks in parallel...")
transcription_results = await asyncio.gather(*transcription_tasks)
        # gather preserves input order, but sort by index defensively
        transcription_results.sort(key=lambda x: x[0])
chunk_transcripts = [text for index, text in transcription_results]
full_transcript = "\n".join(chunk_transcripts)
if not full_transcript.strip():
raise ValueError("Transcription result is empty.")
# --- Chunked Analysis with Groq LLM ---
logger.info("Starting chunked analysis with Groq LLM...")
processing_tasks = []
for i, chunk_text in enumerate(chunk_transcripts):
processing_tasks.append(process_transcript_chunk(i, chunk_text))
processing_results = await asyncio.gather(*processing_tasks)
# Sort by index
processing_results.sort(key=lambda x: x[0])
chunk_summaries = [summary for index, summary, actions in processing_results]
all_action_items = []
for index, summary, actions in processing_results:
all_action_items.extend(actions)
# Combine chunk summaries into final summary
combined_summaries = "\n\n---\n\n".join([f"Segment {i+1}:\n{summary}" for i, summary in enumerate(chunk_summaries)])
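        # The merge step uses a larger model than the per-chunk passes, since it
        # has to reconcile every segment summary in a single request.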
        final_summary_completion = await asyncio.to_thread(
            groq_client.chat.completions.create,
            model="openai/gpt-oss-120b",
            messages=[{"role": "system", "content": FINAL_SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": combined_summaries}],
            temperature=0.2,
            reasoning_format="hidden",
            max_tokens=4096
        )
        final_summary = final_summary_completion.choices[0].message.content
logger.info(f"Final analysis complete for task {task_id}.")
final_result = {
"transcript": full_transcript,
"summary": final_summary,
"action_items": all_action_items,
}
tasks_db[task_id] = {"status": "complete", "result": final_result}
except Exception as e:
logger.error(f"Error in pipeline for task {task_id}: {e}", exc_info=True)
tasks_db[task_id] = {"status": "failed", "result": str(e)}
finally:
# Clean up the temporary audio file
        if file_path.exists():
            file_path.unlink()
            logger.info(f"Cleaned up temporary file: {file_path}")