Spaces:
Sleeping
Sleeping
File size: 11,273 Bytes
39195c0 27f255b b61bb62 27f255b b61bb62 27f255b b61bb62 ae78221 b61bb62 27f255b b61bb62 39195c0 a617e93 39195c0 27f255b afc4870 27f255b afc4870 27f255b 39195c0 a617e93 39195c0 a617e93 ae78221 39195c0 a617e93 ae78221 a617e93 ae78221 a617e93 ae78221 39195c0 a617e93 39195c0 a617e93 39195c0 a617e93 39195c0 27f255b 39195c0 27f255b 39195c0 27f255b 39195c0 27f255b 39195c0 bf0d7cb 27f255b afc4870 ccb7a9e 39195c0 27f255b 39195c0 27f255b 39195c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
import asyncio
import gc
import io
import json
import logging
import os
from pathlib import Path
import librosa
import numpy as np
from dotenv import load_dotenv
from groq import Groq
from pydub import AudioSegment
# --- Configuration ---
# Load environment variables (GROQ_API_KEY, optional CHUNK_DURATION_S)
# from a local .env file, if one is present.
load_dotenv()
# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Module-level Groq client shared by every pipeline function below.
# If initialization fails it is left as None; run_pipeline checks for
# this and fails the task instead of crashing at import time.
try:
    groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))
except Exception as e:
    groq_client = None
    logger.error(f"Failed to initialize Groq client: {e}")
# --- Prompts ---
# System prompt for the per-segment "map" step: summarizes one transcript
# chunk (used by process_transcript_chunk with the gemma2-9b-it model).
CHUNK_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in creating concise, structured, and insightful summaries of parts of meeting and lecture transcripts. This is a segment of a larger transcript. Your goal is to distill the most critical information into a format that is easy to read.
Instructions:
1. **Identify Core Themes**: Begin by identifying the main topics and objectives discussed in this segment.
2. **Extract Key Decisions**: Pinpoint any decisions that were made, including the rationale behind them if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions reached in this segment.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
# System prompt for the "reduce" step: merges all per-segment summaries
# into one final summary (used at the end of run_pipeline).
FINAL_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in combining multiple segment summaries into a single concise, structured, and insightful summary of the entire meeting or lecture. Your goal is to distill the most critical information from all segments into a format that is easy to read and act upon.
Instructions:
1. **Identify Overall Core Themes**: Synthesize the main topics and objectives from all segments.
2. **Extract Key Decisions**: Compile any decisions made across segments, including rationales if available.
3. **Highlight Main Outcomes**: Detail the primary results or conclusions from the entire discussion.
4. **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5. **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
# System prompt for extracting action items from one transcript chunk as a
# JSON object of shape {"action_items": [...]}; paired with
# response_format={"type": "json_object"} in process_transcript_chunk.
ACTION_ITEMS_SYSTEM_PROMPT = """
You are a highly specialized AI assistant tasked with identifying and extracting actionable tasks, commitments, and deadlines from a segment of a meeting or lecture transcript. Your output must be clear, concise, and formatted as a JSON object.
Instructions:
1. **Identify Actionable Language**: Scan the text for phrases indicating a task, such as "will send," "is responsible for," "we need to," "I'll follow up on," etc.
2. **Extract Key Components**: For each action item, identify the assigned person (if mentioned), the specific task, and any deadlines.
3. **Format as JSON**: Return a single JSON object with a key "action_items". The value should be a list of strings, where each string is a clearly defined action item.
4. **Be Precise**: If no specific person is assigned, state the action generally. If no deadline is mentioned, do not invent one.
5. **Handle No Actions**: If no action items are found, return a JSON object with an empty list: {"action_items": []}.
Example Output:
{
"action_items": [
"Alice will send the final budget report by Friday.",
"Bob is responsible for updating the project timeline.",
"The marketing strategy needs to be finalized by next week's meeting."
]
}
"""
# --- Core Functions ---
async def transcribe_chunk(chunk_index: int, audio_chunk: AudioSegment):
    """Transcribe one pydub AudioSegment via Groq's Whisper endpoint.

    The chunk is serialized to an in-memory WAV file and uploaded; the
    blocking SDK call runs on a worker thread so the event loop stays free.
    Always returns a ``(chunk_index, text)`` pair — on any error the text is
    a placeholder marker rather than an exception, so a gather() over many
    chunks never aborts.
    """
    logger.info(f"Starting transcription for chunk {chunk_index + 1}...")
    try:
        with io.BytesIO() as wav_buffer:
            # Render the segment as a WAV container entirely in memory.
            audio_chunk.export(wav_buffer, format="wav")
            wav_buffer.seek(0)
            chunk_size = wav_buffer.getbuffer().nbytes
            logger.info(f"Chunk {chunk_index + 1} size: {chunk_size / (1024 * 1024):.2f} MB")
            # Run the synchronous Groq upload off the event loop.
            text = await asyncio.to_thread(
                groq_client.audio.transcriptions.create,
                file=("audio.wav", wav_buffer.read()),
                model="whisper-large-v3",
                response_format="text",
            )
        logger.info(f"Finished transcription for chunk {chunk_index + 1}.")
        return (chunk_index, text)
    except Exception as e:
        logger.error(f"Error transcribing chunk {chunk_index + 1}: {e}")
        return (chunk_index, f"[TRANSCRIPTION FAILED FOR SEGMENT {chunk_index+1}]")
async def process_transcript_chunk(chunk_index: int, chunk_text: str):
    """Summarize one transcript chunk and extract its action items.

    Runs the summarization and action-item completions concurrently via
    asyncio.to_thread. Returns a ``(chunk_index, summary, action_items)``
    tuple; on failure of the completions the summary is the placeholder
    "[SUMMARY FAILED]" and the action list is empty.

    Fix over the previous version: a malformed JSON action-items payload
    (json.loads failure) no longer discards an already-successful summary —
    it only results in an empty action list for this chunk.
    """
    logger.info(f"Starting processing for transcript chunk {chunk_index + 1}...")
    try:
        summary_task = asyncio.to_thread(
            groq_client.chat.completions.create,
            model="gemma2-9b-it",
            messages=[{"role": "system", "content": CHUNK_SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": chunk_text}],
            temperature=0.2,
            max_tokens=512
        )
        action_task = asyncio.to_thread(
            groq_client.chat.completions.create,
            model="llama-3.1-8b-instant",
            messages=[{"role": "system", "content": ACTION_ITEMS_SYSTEM_PROMPT}, {"role": "user", "content": chunk_text}],
            temperature=0.1,
            max_tokens=512,
            # Forces the model to emit a single JSON object.
            response_format={"type": "json_object"}
        )
        # Both completions run concurrently; gather propagates any failure.
        summary_completion, action_completion = await asyncio.gather(summary_task, action_task)
        summary = summary_completion.choices[0].message.content
    except Exception as e:
        logger.error(f"Error processing transcript chunk {chunk_index + 1}: {e}")
        return (chunk_index, "[SUMMARY FAILED]", [])
    # Parse the action items separately so a bad JSON payload does not
    # throw away the summary we already obtained above.
    try:
        action_items_json = json.loads(action_completion.choices[0].message.content)
        action_items = action_items_json.get("action_items", [])
        if not isinstance(action_items, list):
            # Defend against a model returning e.g. a string or dict here.
            action_items = []
    except Exception as e:
        logger.error(f"Error processing transcript chunk {chunk_index + 1}: {e}")
        action_items = []
    logger.info(f"Finished processing for transcript chunk {chunk_index + 1}.")
    return (chunk_index, summary, action_items)
async def run_pipeline(task_id: str, file_path: Path, tasks_db: dict):
    """End-to-end pipeline: audio file -> transcript -> summary + action items.

    Splits the audio at ``file_path`` into upload-sized chunks, transcribes
    them concurrently with Groq Whisper, summarizes each transcript chunk and
    extracts its action items, then merges the per-segment summaries into one
    final summary. Progress and results are written into ``tasks_db`` under
    ``task_id`` as ``{"status": "complete"|"failed", "result": ...}``. The
    temporary audio file is always deleted on exit.

    Args:
        task_id: Key under which status/results are stored in tasks_db.
        file_path: Path to the (temporary) input audio file.
        tasks_db: Shared dict used as a simple task-status store.
    """
    if not groq_client:
        tasks_db[task_id] = {"status": "failed", "result": "Groq client is not initialized. Check API key."}
        return
    try:
        logger.info(f"Starting pipeline for task {task_id} with file {file_path}")
        # Get total duration without loading the whole file into memory.
        # NOTE(review): librosa deprecated the `filename=` keyword in favor of
        # `path=` (librosa >= 0.10) — confirm against the pinned version.
        duration = librosa.get_duration(filename=str(file_path))
        orig_sr = librosa.get_samplerate(str(file_path))
        logger.info(f"Audio duration: {duration:.2f} seconds, sample rate: {orig_sr}")
        # Size each chunk so its 16 kHz / 16-bit / mono WAV stays below the
        # ~20 MB upload ceiling, with a small safety margin.
        target_sr = 16000
        max_chunk_mb = 19.5
        max_chunk_bytes = max_chunk_mb * 1024 * 1024
        bytes_per_second = target_sr * 2 * 1  # 16-bit mono
        max_chunk_duration = (max_chunk_bytes - 1000) / bytes_per_second  # conservative
        # Configurable base chunk duration, but capped at the size-derived max.
        base_chunk_duration = int(os.getenv("CHUNK_DURATION_S", 300))  # default 5 minutes
        chunk_duration = min(base_chunk_duration, max_chunk_duration)
        logger.info(f"Using chunk duration: {chunk_duration:.2f} seconds")
        num_chunks = int(np.ceil(duration / chunk_duration))
        logger.info(f"Number of chunks: {num_chunks}")
        transcription_tasks = []
        for i in range(num_chunks):
            offset = i * chunk_duration
            this_dur = min(chunk_duration, duration - offset)
            logger.info(f"Loading audio chunk {i+1} (offset: {offset:.2f}s, duration: {this_dur:.2f}s)")
            # sr=None preserves the file's native rate; resample only when it
            # differs from the target. (Previously the rate was bound to `_`,
            # the throwaway-name convention, despite being used.)
            y_chunk, native_sr = librosa.load(str(file_path), sr=None, mono=True, offset=offset, duration=this_dur)
            if native_sr != target_sr:
                y_chunk = librosa.resample(y_chunk, orig_sr=native_sr, target_sr=target_sr)
            # float32 in [-1, 1] -> 16-bit PCM for the WAV payload.
            pcm_chunk = (y_chunk * 32767).astype(np.int16)
            audio_segment = AudioSegment(
                pcm_chunk.tobytes(),
                frame_rate=target_sr,
                sample_width=2,
                channels=1
            )
            transcription_tasks.append(transcribe_chunk(i, audio_segment))
            # Drop loop-local references before loading the next chunk.
            # NOTE(review): each pending coroutine above still holds its
            # AudioSegment, so peak memory grows with the chunk count.
            del y_chunk, pcm_chunk, audio_segment
            gc.collect()
        # Run all transcription tasks in parallel, then restore chunk order.
        logger.info(f"Running {len(transcription_tasks)} transcription tasks in parallel...")
        transcription_results = await asyncio.gather(*transcription_tasks)
        transcription_results.sort(key=lambda x: x[0])
        chunk_transcripts = [text for index, text in transcription_results]
        full_transcript = "\n".join(chunk_transcripts)
        if not full_transcript.strip():
            raise ValueError("Transcription result is empty.")
        # --- Chunked Analysis with Groq LLM ---
        logger.info("Starting chunked analysis with Groq LLM...")
        processing_tasks = [
            process_transcript_chunk(i, chunk_text)
            for i, chunk_text in enumerate(chunk_transcripts)
        ]
        processing_results = await asyncio.gather(*processing_tasks)
        # Restore chunk order before merging.
        processing_results.sort(key=lambda x: x[0])
        chunk_summaries = [summary for index, summary, actions in processing_results]
        all_action_items = []
        for index, summary, actions in processing_results:
            all_action_items.extend(actions)
        # Reduce step: combine the per-segment summaries into one final summary.
        combined_summaries = "\n\n---\n\n".join([f"Segment {i+1}:\n{summary}" for i, summary in enumerate(chunk_summaries)])
        final_summary_completion = await asyncio.to_thread(
            groq_client.chat.completions.create,
            model="openai/gpt-oss-120b",
            messages=[{"role": "system", "content": FINAL_SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": combined_summaries}],
            temperature=0.2,
            reasoning_format="hidden",
            max_tokens=4096
        )
        final_summary = final_summary_completion.choices[0].message.content
        logger.info(f"Final analysis complete for task {task_id}.")
        final_result = {
            "transcript": full_transcript,
            "summary": final_summary,
            "action_items": all_action_items,
        }
        tasks_db[task_id] = {"status": "complete", "result": final_result}
    except Exception as e:
        logger.error(f"Error in pipeline for task {task_id}: {e}", exc_info=True)
        tasks_db[task_id] = {"status": "failed", "result": str(e)}
    finally:
        # Clean up the temporary audio file regardless of outcome.
        if os.path.exists(file_path):
            os.remove(file_path)
            logger.info(f"Cleaned up temporary file: {file_path}")