import os
import subprocess
import tempfile

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm

from persistence import load_detection_data
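
# Expected JSON structure (inferred from the validation checks below; the file
# is assumed to be written by a companion persistence module):
# {
#     "video_metadata": {
#         "fps": <float>,
#         "total_frames": <int>,
#         "duration_sec": <float>,   # optional, used only for the stats summary
#         "detect_keyword": <str>    # optional, shown in the "gauge" style
#     },
#     "frame_detections": [
#         {"frame": <int>, "objects": [<detection>, ...]},
#         ...
#     ]
# }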


def create_frame_data(json_path):
    """Create frame-by-frame detection data for visualization."""
    try:
        data = load_detection_data(json_path)
        if not data:
            print("No data loaded from JSON file")
            return None
        if "video_metadata" not in data or "frame_detections" not in data:
            print("Invalid JSON structure: missing required fields")
            return None

        # Extract video metadata
        metadata = data["video_metadata"]
        if "fps" not in metadata or "total_frames" not in metadata:
            print("Invalid metadata: missing fps or total_frames")
            return None
        fps = metadata["fps"]
        total_frames = metadata["total_frames"]

        # Count detections per frame, skipping malformed entries
        frame_counts = {}
        for frame_data in data["frame_detections"]:
            if "frame" not in frame_data or "objects" not in frame_data:
                continue  # Skip invalid frame data
            frame_num = frame_data["frame"]
            frame_counts[frame_num] = len(frame_data["objects"])

        # Fill in missing frames with 0 detections
        for frame in range(total_frames):
            if frame not in frame_counts:
                frame_counts[frame] = 0

        if not frame_counts:
            print("No valid frame data found")
            return None

        # Convert to a DataFrame with per-frame counts and timestamps
        df = pd.DataFrame(list(frame_counts.items()), columns=["frame", "detections"])
        df["timestamp"] = df["frame"] / fps
        return df, metadata
    except Exception as e:
        print(f"Error creating frame data: {str(e)}")
        import traceback

        traceback.print_exc()
        return None


def generate_frame_image(df, frame_num, temp_dir, max_y):
    """Generate and save a single frame of the timeline visualization."""
    # Dark background with a monospace font for the terminal-style look
    plt.style.use('dark_background')
    plt.rcParams['font.family'] = 'monospace'
    plt.rcParams['font.monospace'] = ['DejaVu Sans Mono']

    plt.figure(figsize=(10, 6))

    # Plot the full series dimly, then overdraw the data up to the current frame
    current_data = df[df['frame'] <= frame_num]
    plt.plot(df['frame'], df['detections'], color='#1a1a1a', alpha=0.5)  # Dim background line
    plt.plot(current_data['frame'], current_data['detections'], color='#00ff41')  # Matrix green

    # Vertical line marking the current position
    plt.axvline(x=frame_num, color='#ff0000', linestyle='-', alpha=0.7)

    # Keep the axes consistent across frames
    plt.xlim(0, len(df) - 1)
    plt.ylim(0, max_y * 1.1)  # Add 10% headroom

    # Labels in Matrix green
    plt.title(f'FRAME {frame_num:04d} - DETECTIONS OVER TIME', color='#00ff41', pad=20)
    plt.xlabel('FRAME NUMBER', color='#00ff41')
    plt.ylabel('NUMBER OF DETECTIONS', color='#00ff41')

    # Current stats overlay in monospace
    current_detections = df[df['frame'] == frame_num]['detections'].iloc[0]
    plt.text(0.02, 0.98, f'CURRENT DETECTIONS: {current_detections:02d}',
             transform=plt.gca().transAxes, verticalalignment='top',
             color='#00ff41', family='monospace')

    # Style the grid and ticks
    plt.grid(True, color='#1a1a1a', linestyle='-', alpha=0.3)
    plt.tick_params(colors='#00ff41')

    # Save the frame as a PNG in the temporary directory
    frame_path = os.path.join(temp_dir, f'frame_{frame_num:05d}.png')
    plt.savefig(frame_path, bbox_inches='tight', dpi=100, facecolor='black', edgecolor='none')
    plt.close()
    return frame_path


def generate_gauge_frame(df, frame_num, temp_dir, detect_keyword="OBJECT"):
    """Generate a modern square-style binary gauge visualization frame."""
    # Dark background with a monospace font, matching the timeline style
    plt.style.use('dark_background')
    plt.rcParams['font.family'] = 'monospace'
    plt.rcParams['font.monospace'] = ['DejaVu Sans Mono']

    # Create figure with a 16:9 aspect ratio
    plt.figure(figsize=(16, 9))

    # Get the current detection state
    current_detections = df[df['frame'] == frame_num]['detections'].iloc[0]
    has_detection = current_detections > 0

    # The gauge is drawn with raw patches, so hide the axes
    plt.axis('off')

    # Pick colors and the indicator position based on the detection state
    if has_detection:
        color = '#00ff41'  # Matrix green for YES
        status = 'YES'
        indicator_pos = 0.8  # Right position
    else:
        color = '#ff0000'  # Red for NO
        status = 'NO'
        indicator_pos = 0.2  # Left position

    # Draw the background track
    background = plt.Rectangle((0.1, 0.3), 0.8, 0.2,
                               facecolor='#1a1a1a',
                               edgecolor='#333333',
                               linewidth=2)
    plt.gca().add_patch(background)

    # Draw the indicator ('none' suppresses the edge; None would fall back to
    # the style's default edge color)
    indicator_width = 0.05
    indicator = plt.Rectangle((indicator_pos - indicator_width / 2, 0.25),
                              indicator_width, 0.3,
                              facecolor=color,
                              edgecolor='none')
    plt.gca().add_patch(indicator)

    # Add tick marks at NO, CENTER, and YES
    tick_positions = [0.2, 0.5, 0.8]
    for x in tick_positions:
        plt.plot([x, x], [0.3, 0.5], color='#444444', linewidth=2)

    # Add YES/NO labels
    plt.text(0.8, 0.2, 'YES', color='#00ff41', fontsize=14,
             ha='center', va='center', family='monospace')
    plt.text(0.2, 0.2, 'NO', color='#ff0000', fontsize=14,
             ha='center', va='center', family='monospace')

    # Status box at the top with the detection keyword
    plt.text(0.5, 0.8, f'{detect_keyword.upper()} DETECTED?', color=color,
             fontsize=16, ha='center', va='center', family='monospace',
             bbox=dict(facecolor='#1a1a1a',
                       edgecolor=color,
                       linewidth=2,
                       pad=10))

    # Frame counter at the bottom
    plt.text(0.5, 0.1, f'FRAME: {frame_num:04d}', color='#00ff41',
             fontsize=14, ha='center', va='center', family='monospace')

    # Subtle grid lines for depth
    for x in np.linspace(0.2, 0.8, 7):
        plt.plot([x, x], [0.3, 0.5], color='#222222', linewidth=1, zorder=0)

    # Glow effect: progressively larger, fainter copies of the indicator
    for i in range(3):
        glow = plt.Rectangle((indicator_pos - (indicator_width + i * 0.01) / 2,
                              0.25 - i * 0.01),
                             indicator_width + i * 0.01,
                             0.3 + i * 0.02,
                             facecolor=color,
                             alpha=0.1 / (i + 1))
        plt.gca().add_patch(glow)

    # Set consistent plot limits
    plt.xlim(0, 1)
    plt.ylim(0, 1)

    # Save the frame with the 16:9 aspect ratio preserved
    frame_path = os.path.join(temp_dir, f'gauge_{frame_num:05d}.png')
    plt.savefig(frame_path,
                bbox_inches='tight',
                dpi=100,
                facecolor='black',
                edgecolor='none',
                pad_inches=0)
    plt.close()
    return frame_path
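
# Both renderers above write one PNG per frame and return its path; the driver
# below stitches those PNGs into an MP4 with OpenCV, then re-encodes the result
# with FFmpeg for web playback.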


def create_video_visualization(json_path, style="timeline"):
    """Create a video visualization of the detection data."""
    try:
        if not json_path:
            return None, "No JSON file provided"
        if not os.path.exists(json_path):
            return None, f"File not found: {json_path}"

        # Load and process data
        result = create_frame_data(json_path)
        if result is None:
            return None, "Failed to load detection data from JSON file"
        frame_data, metadata = result
        if len(frame_data) == 0:
            return None, "No frame data found in JSON file"

        total_frames = metadata["total_frames"]
        detect_keyword = metadata.get("detect_keyword", "OBJECT")  # Keyword shown in the gauge

        # Render every frame into a temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            max_y = frame_data['detections'].max()

            print("Generating frames...")
            frame_paths = []
            with tqdm(total=total_frames, desc="Generating frames") as pbar:
                for frame in range(total_frames):
                    try:
                        if style == "gauge":
                            frame_path = generate_gauge_frame(frame_data, frame, temp_dir, detect_keyword)
                        else:  # default to timeline
                            frame_path = generate_frame_image(frame_data, frame, temp_dir, max_y)
                        if frame_path and os.path.exists(frame_path):
                            frame_paths.append(frame_path)
                        else:
                            print(f"Warning: Failed to generate frame {frame}")
                        pbar.update(1)
                    except Exception as e:
                        print(f"Error generating frame {frame}: {str(e)}")
                        continue

            if not frame_paths:
                return None, "Failed to generate any frames"

            # Create output video path
            output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "outputs")
            os.makedirs(output_dir, exist_ok=True)
            output_video = os.path.join(output_dir, f"detection_visualization_{style}.mp4")

            # Intermediate file for the OpenCV pass
            base, ext = os.path.splitext(output_video)
            temp_output = f"{base}_temp{ext}"

            # First pass: stitch the PNG frames into a video with OpenCV
            print("Creating initial video...")
            # Get the frame size from the first image
            first_frame = cv2.imread(frame_paths[0])
            if first_frame is None:
                return None, f"Failed to read generated frame: {frame_paths[0]}"
            height, width = first_frame.shape[:2]

            out = cv2.VideoWriter(
                temp_output,
                cv2.VideoWriter_fourcc(*"mp4v"),
                metadata["fps"],
                (width, height),
            )
            # Only successfully generated frames are written, so size the
            # progress bar by frame_paths rather than total_frames
            with tqdm(total=len(frame_paths), desc="Creating video") as pbar:
                for frame_path in frame_paths:
                    frame = cv2.imread(frame_path)
                    out.write(frame)
                    pbar.update(1)
            out.release()

            # Second pass: re-encode with FFmpeg to a web-compatible H.264 MP4
            print("Converting to web format...")
            try:
                subprocess.run(
                    [
                        "ffmpeg",
                        "-y",
                        "-i",
                        temp_output,
                        "-c:v",
                        "libx264",
                        "-preset",
                        "medium",
                        "-crf",
                        "23",
                        "-movflags",
                        "+faststart",  # Better web playback
                        "-loglevel",
                        "error",
                        output_video,
                    ],
                    check=True,
                )
                os.remove(temp_output)  # Remove the intermediate file

                if not os.path.exists(output_video):
                    print(f"Warning: FFmpeg completed but output file not found at {output_video}")
                    return None, "Failed to create video"

                # Return video path and stats; fall back to fps/total_frames
                # when duration_sec is absent, since it was never validated
                duration_sec = metadata.get("duration_sec", total_frames / metadata["fps"])
                stats = f"""Video Stats:
FPS: {metadata['fps']}
Total Frames: {metadata['total_frames']}
Duration: {duration_sec:.2f} seconds
Max Detections in a Frame: {frame_data['detections'].max()}
Average Detections per Frame: {frame_data['detections'].mean():.2f}"""

                return output_video, stats

            except subprocess.CalledProcessError as e:
                print(f"Error running FFmpeg: {str(e)}")
                if os.path.exists(temp_output):
                    os.remove(temp_output)
                return None, f"Error creating visualization: {str(e)}"

    except Exception as e:
        print(f"Error creating video visualization: {str(e)}")
        import traceback

        traceback.print_exc()
        return None, f"Error creating visualization: {str(e)}"