File size: 11,273 Bytes
39195c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27f255b
 
b61bb62
 
27f255b
b61bb62
27f255b
 
 
 
 
 
 
 
 
 
 
b61bb62
ae78221
b61bb62
 
27f255b
b61bb62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39195c0
 
 
 
 
 
 
 
 
 
a617e93
 
39195c0
 
 
 
 
 
 
 
 
 
 
 
 
27f255b
 
 
 
 
 
afc4870
27f255b
 
 
 
 
 
 
afc4870
27f255b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39195c0
 
 
 
 
 
 
 
a617e93
 
 
 
39195c0
a617e93
 
 
 
 
 
 
 
 
 
 
 
 
ae78221
39195c0
a617e93
 
 
 
ae78221
a617e93
ae78221
 
a617e93
 
ae78221
39195c0
 
 
 
a617e93
 
 
39195c0
a617e93
39195c0
 
a617e93
 
39195c0
 
 
 
 
 
27f255b
39195c0
27f255b
 
39195c0
 
 
 
27f255b
 
39195c0
27f255b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39195c0
bf0d7cb
27f255b
 
afc4870
ccb7a9e
39195c0
 
27f255b
 
39195c0
 
 
 
 
27f255b
 
39195c0
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import asyncio
import gc
import io
import json
import logging
import os
from pathlib import Path

import librosa
import numpy as np
from dotenv import load_dotenv
from groq import Groq
from pydub import AudioSegment

# --- Configuration ---
# Pull GROQ_API_KEY (and any other settings) from a local .env file, if present.
load_dotenv()

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize the Groq client eagerly at import time. On failure (e.g. missing
# or malformed API key) groq_client is left as None so the module still
# imports; run_pipeline checks for None and fails the task gracefully.
try:
    groq_client = Groq(api_key=os.getenv("GROQ_API_KEY"))
except Exception as e:
    groq_client = None
    logger.error(f"Failed to initialize Groq client: {e}")

# --- Prompts ---
# System prompt for summarizing one transcript segment (map step).
CHUNK_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in creating concise, structured, and insightful summaries of parts of meeting and lecture transcripts. This is a segment of a larger transcript. Your goal is to distill the most critical information into a format that is easy to read.

Instructions:
1.  **Identify Core Themes**: Begin by identifying the main topics and objectives discussed in this segment.
2.  **Extract Key Decisions**: Pinpoint any decisions that were made, including the rationale behind them if available.
3.  **Highlight Main Outcomes**: Detail the primary results or conclusions reached in this segment.
4.  **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5.  **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
# System prompt for merging the per-segment summaries into one (reduce step).
FINAL_SUMMARIZATION_SYSTEM_PROMPT = """
You are an expert AI assistant specializing in combining multiple segment summaries into a single concise, structured, and insightful summary of the entire meeting or lecture. Your goal is to distill the most critical information from all segments into a format that is easy to read and act upon.

Instructions:
1.  **Identify Overall Core Themes**: Synthesize the main topics and objectives from all segments.
2.  **Extract Key Decisions**: Compile any decisions made across segments, including rationales if available.
3.  **Highlight Main Outcomes**: Detail the primary results or conclusions from the entire discussion.
4.  **Structure the Output**: Present the summary in a clean, professional format. Use bullet points for clarity.
5.  **Maintain Neutrality**: The summary should be objective and free of personal interpretation or bias.
"""
# System prompt for extracting action items as JSON ({"action_items": [...]});
# process_transcript_chunk relies on this exact key when parsing the response.
ACTION_ITEMS_SYSTEM_PROMPT = """
You are a highly specialized AI assistant tasked with identifying and extracting actionable tasks, commitments, and deadlines from a segment of a meeting or lecture transcript. Your output must be clear, concise, and formatted as a JSON object.

Instructions:
1.  **Identify Actionable Language**: Scan the text for phrases indicating a task, such as "will send," "is responsible for," "we need to," "I'll follow up on," etc.
2.  **Extract Key Components**: For each action item, identify the assigned person (if mentioned), the specific task, and any deadlines.
3.  **Format as JSON**: Return a single JSON object with a key "action_items". The value should be a list of strings, where each string is a clearly defined action item.
4.  **Be Precise**: If no specific person is assigned, state the action generally. If no deadline is mentioned, do not invent one.
5.  **Handle No Actions**: If no action items are found, return a JSON object with an empty list: {"action_items": []}.

Example Output:
{
  "action_items": [
    "Alice will send the final budget report by Friday.",
    "Bob is responsible for updating the project timeline.",
    "The marketing strategy needs to be finalized by next week's meeting."
  ]
}
"""

# --- Core Functions ---

async def transcribe_chunk(chunk_index: int, audio_chunk: AudioSegment):
    """Transcribe one AudioSegment chunk with Groq's Whisper model.

    The blocking API call is dispatched to a worker thread via
    asyncio.to_thread so many chunks can be transcribed concurrently.

    Returns:
        (chunk_index, text) on success, or (chunk_index, placeholder) on
        failure — errors are logged and absorbed so one bad chunk does not
        abort the whole pipeline.
    """
    logger.info(f"Starting transcription for chunk {chunk_index + 1}...")
    try:
        # Serialize the chunk to an in-memory WAV file for upload.
        with io.BytesIO() as wav_buffer:
            audio_chunk.export(wav_buffer, format="wav")
            wav_data = wav_buffer.getvalue()

        chunk_size = len(wav_data)
        logger.info(f"Chunk {chunk_index + 1} size: {chunk_size / (1024 * 1024):.2f} MB")

        transcription = await asyncio.to_thread(
            groq_client.audio.transcriptions.create,
            file=("audio.wav", wav_data),
            model="whisper-large-v3",
            response_format="text",
        )
        logger.info(f"Finished transcription for chunk {chunk_index + 1}.")
        return (chunk_index, transcription)
    except Exception as e:
        logger.error(f"Error transcribing chunk {chunk_index + 1}: {e}")
        return (chunk_index, f"[TRANSCRIPTION FAILED FOR SEGMENT {chunk_index+1}]")

async def process_transcript_chunk(chunk_index: int, chunk_text: str):
    """Summarize one transcript segment and extract its action items.

    The summarization and action-item LLM calls run concurrently, each in
    its own worker thread.

    Returns:
        (chunk_index, summary, action_items) on success, or
        (chunk_index, "[SUMMARY FAILED]", []) if anything goes wrong —
        errors are logged and absorbed so the pipeline keeps going.
    """
    logger.info(f"Starting processing for transcript chunk {chunk_index + 1}...")
    try:
        summary_messages = [
            {"role": "system", "content": CHUNK_SUMMARIZATION_SYSTEM_PROMPT},
            {"role": "user", "content": chunk_text},
        ]
        action_messages = [
            {"role": "system", "content": ACTION_ITEMS_SYSTEM_PROMPT},
            {"role": "user", "content": chunk_text},
        ]

        # Fire both completions at once and wait for the pair.
        summary_completion, action_completion = await asyncio.gather(
            asyncio.to_thread(
                groq_client.chat.completions.create,
                model="gemma2-9b-it",
                messages=summary_messages,
                temperature=0.2,
                max_tokens=512,
            ),
            asyncio.to_thread(
                groq_client.chat.completions.create,
                model="llama-3.1-8b-instant",
                messages=action_messages,
                temperature=0.1,
                max_tokens=512,
                response_format={"type": "json_object"},
            ),
        )

        summary = summary_completion.choices[0].message.content
        parsed = json.loads(action_completion.choices[0].message.content)
        action_items = parsed.get("action_items", [])

        logger.info(f"Finished processing for transcript chunk {chunk_index + 1}.")
        return (chunk_index, summary, action_items)
    except Exception as e:
        logger.error(f"Error processing transcript chunk {chunk_index + 1}: {e}")
        return (chunk_index, "[SUMMARY FAILED]", [])

async def run_pipeline(task_id: str, file_path: Path, tasks_db: dict):
    """End-to-end pipeline: chunk audio, transcribe, summarize, extract actions.

    The audio file is loaded in chunks sized to stay under the Groq upload
    limit, transcribed concurrently, then each transcript chunk is summarized
    and mined for action items; finally the per-chunk summaries are merged
    into one summary. The outcome is written to ``tasks_db[task_id]`` as
    ``{"status": "complete"|"failed", "result": ...}`` and the temporary
    audio file is always deleted on exit.

    Args:
        task_id: Key under which status/result is stored in ``tasks_db``.
        file_path: Path to the temporary audio file to process.
        tasks_db: Shared dict of task records (mutated in place).
    """
    if not groq_client:
        tasks_db[task_id] = {"status": "failed", "result": "Groq client is not initialized. Check API key."}
        return

    try:
        logger.info(f"Starting pipeline for task {task_id} with file {file_path}")

        # Probe duration/sample rate without decoding the whole file.
        # NOTE(review): the `filename=` keyword is deprecated in librosa >= 0.10
        # (renamed to `path=`); update when the pinned librosa version allows.
        duration = librosa.get_duration(filename=str(file_path))
        orig_sr = librosa.get_samplerate(str(file_path))
        logger.info(f"Audio duration: {duration:.2f} seconds, sample rate: {orig_sr}")

        # Each uploaded chunk is 16 kHz / 16-bit / mono WAV; cap its size
        # conservatively below the API upload limit.
        target_sr = 16000
        max_chunk_mb = 19.5
        max_chunk_bytes = max_chunk_mb * 1024 * 1024
        bytes_per_second = target_sr * 2 * 1  # 16-bit mono PCM
        max_chunk_duration = (max_chunk_bytes - 1000) / bytes_per_second  # headroom for WAV header

        # Chunk length is configurable via env, but never exceeds the safe max.
        base_chunk_duration = int(os.getenv("CHUNK_DURATION_S", 300))  # default 5 minutes
        chunk_duration = min(base_chunk_duration, max_chunk_duration)
        logger.info(f"Using chunk duration: {chunk_duration:.2f} seconds")

        num_chunks = int(np.ceil(duration / chunk_duration))
        logger.info(f"Number of chunks: {num_chunks}")

        transcription_tasks = []
        for i in range(num_chunks):
            offset = i * chunk_duration
            this_dur = min(chunk_duration, duration - offset)
            logger.info(f"Loading audio chunk {i+1} (offset: {offset:.2f}s, duration: {this_dur:.2f}s)")

            # sr=None keeps the native rate; we resample explicitly below.
            y_chunk, chunk_sr = librosa.load(str(file_path), sr=None, mono=True, offset=offset, duration=this_dur)

            if chunk_sr != target_sr:
                y_chunk = librosa.resample(y_chunk, orig_sr=chunk_sr, target_sr=target_sr)

            # Clip before scaling: resampling can overshoot [-1, 1] slightly,
            # and out-of-range values would wrap around in int16.
            pcm_chunk = (np.clip(y_chunk, -1.0, 1.0) * 32767).astype(np.int16)

            audio_segment = AudioSegment(
                pcm_chunk.tobytes(),
                frame_rate=target_sr,
                sample_width=2,
                channels=1
            )

            transcription_tasks.append(transcribe_chunk(i, audio_segment))

            # Drop the large float buffer promptly. Note the AudioSegment
            # itself is still referenced by the pending coroutine, so it is
            # only freed after that chunk's transcription runs.
            del y_chunk, pcm_chunk, audio_segment
            gc.collect()

        # Run all transcription tasks in parallel.
        logger.info(f"Running {len(transcription_tasks)} transcription tasks in parallel...")
        transcription_results = await asyncio.gather(*transcription_tasks)

        # gather() preserves submission order, but sort defensively by index.
        transcription_results.sort(key=lambda x: x[0])
        chunk_transcripts = [text for index, text in transcription_results]
        full_transcript = "\n".join(chunk_transcripts)

        if not full_transcript.strip():
            raise ValueError("Transcription result is empty.")

        # --- Chunked Analysis with Groq LLM ---
        logger.info("Starting chunked analysis with Groq LLM...")

        processing_tasks = [
            process_transcript_chunk(i, chunk_text)
            for i, chunk_text in enumerate(chunk_transcripts)
        ]
        processing_results = await asyncio.gather(*processing_tasks)
        processing_results.sort(key=lambda x: x[0])

        chunk_summaries = [summary for index, summary, actions in processing_results]
        all_action_items = []
        for index, summary, actions in processing_results:
            all_action_items.extend(actions)

        # Merge the per-segment summaries into one final summary.
        combined_summaries = "\n\n---\n\n".join([f"Segment {i+1}:\n{summary}" for i, summary in enumerate(chunk_summaries)])

        final_summary_completion = await asyncio.to_thread(
            groq_client.chat.completions.create,
            model="openai/gpt-oss-120b",
            messages=[{"role": "system", "content": FINAL_SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": combined_summaries}],
            temperature=0.2,
            reasoning_format="hidden",
            max_tokens=4096
        )
        final_summary = final_summary_completion.choices[0].message.content

        logger.info(f"Final analysis complete for task {task_id}.")

        final_result = {
            "transcript": full_transcript,
            "summary": final_summary,
            "action_items": all_action_items,
        }
        tasks_db[task_id] = {"status": "complete", "result": final_result}

    except Exception as e:
        logger.error(f"Error in pipeline for task {task_id}: {e}", exc_info=True)
        tasks_db[task_id] = {"status": "failed", "result": str(e)}
    finally:
        # Clean up the temporary audio file regardless of success/failure.
        if os.path.exists(file_path):
            os.remove(file_path)
            logger.info(f"Cleaned up temporary file: {file_path}")