Spaces:
Sleeping
Sleeping
| import base64 | |
| import hashlib | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| import streamlit as st | |
| import torch | |
| from PIL import Image | |
| from utils.model_manager import get_model_manager, load_model | |
| import os | |
| # Choose one of these: | |
| # A) Env var: APP_ENV=prod on your server / cloud | |
| APP_ENV = os.getenv("natsar", "local").lower() | |
| # B) Or secrets: put env="prod" in .streamlit/secrets.toml on your server | |
| # APP_ENV = st.secrets.get("env", "local").lower() | |
| IS_LOCAL = True | |
| # … later, where you currently render your Deploy controls … | |
| if IS_LOCAL: | |
| st.markdown(""" | |
| <style> | |
| /* Hide the toolbar and header (which creates the black bar) */ | |
| [data-testid="stToolbar"] { display: none !important; } | |
| header[data-testid="stHeader"] { display: none !important; } | |
| /* (Legacy fallbacks) */ | |
| #MainMenu { visibility: hidden; } | |
| footer { visibility: hidden; } | |
| /* Remove the empty space the header leaves behind */ | |
| [data-testid="stAppViewContainer"] { padding-top: 0 !important; } | |
| [data-testid="stAppViewContainer"] .main .block-container { padding-top: 0 !important; } | |
| /* Give the sidebar a little breathing room at the top */ | |
| section[data-testid="stSidebar"] > div:first-child { padding-top: 0.5rem; } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| def _image_to_data_url(path: str) -> str: | |
| p = Path(path) | |
| if not p.is_absolute(): | |
| p = Path(__file__).parent / p | |
| mime = "image/png" if p.suffix.lower() == ".png" else "image/jpeg" | |
| b64 = base64.b64encode(p.read_bytes()).decode() | |
| return f"data:{mime};base64,{b64}" | |
| # --- Page setup --- | |
| st.markdown(""" | |
| <style> | |
| .block-container { padding-top: 1.2rem; padding-bottom: 0.6rem; max-width: 1400px; } | |
| h2, h3, h4 { margin: 0.4rem 0; } | |
| textarea { min-height: 70px !important; } | |
| /* Light outline on inputs */ | |
| .stSelectbox > div, .stTextInput > div, .stNumberInput > div, .stDateInput > div, .stTextArea > div { | |
| border: 1px solid #D0D0D0; border-radius: 6px; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| st.set_page_config(page_title="signal watch", layout="wide") | |
| st.markdown( | |
| "<h2 style='text-align:center;margin-top:0;font-size:4.0em;color:yellow'>SAR-X<sup>ai</h2>" | |
| "<h2 style='text-align:center;margin-top:0'>Signal Watch 👁️</h2>", | |
| unsafe_allow_html=True, | |
| ) | |
| # Get model manager instance | |
| model_manager = get_model_manager() | |
| # =============== Sidebar: custom menu + cards =============== | |
| with st.sidebar: | |
| logo_data_url = _image_to_data_url("../resources/images/lucid_insights_logo.png") | |
| st.markdown( | |
| f""" | |
| <div style="text-align:left; margin: 0 0 0.8rem 0.25rem;"> | |
| <img src="{logo_data_url}" alt="Lucid Insights" style="width:160px; height:auto; margin-bottom:0.5rem;"> | |
| </div> | |
| """, | |
| unsafe_allow_html=True, | |
| ) | |
| st.markdown("---") | |
| st.page_link("app.py", label="Home") | |
| #st.page_link("pages/lost_at_sea.py", label="Lost at Sea") | |
| st.page_link("pages/signal_watch.py", label="Signal Watch") | |
| st.page_link("pages/bushland_beacon.py", label="Bushland Beacon") | |
| st.page_link("pages/misc_find.py", label="Misc Finder") | |
| st.markdown("---") | |
| st.page_link("pages/task_drone.py", label="Task Drone") | |
| st.page_link("pages/task_satellite.py", label="Task Satellite") | |
| st.page_link("pages/information.py", label="Survival Information") | |
| st.markdown("---") | |
| # ---------- Sidebar: Video feed ---------- | |
| st.header("Video feed") | |
| VIDEO_DIR = Path(__file__).resolve().parents[1] / "resources" / "videos" / "raw" | |
| youtube_urls = { | |
| "Sydney Australia Stream": "https://www.youtube.com/watch?v=HRg1gJi6yqc", | |
| "Tokyo Live Stream": "https://www.youtube.com/watch?v=cH7VBI4QQzA", | |
| "Los Angeles Live Stream": "https://www.youtube.com/watch?v=3LXQWU67Ufk", | |
| "Time Square Live Stream": "https://www.youtube.com/watch?v=rnXIjl_Rzy4", | |
| "Dublin Live Stream": "https://www.youtube.com/watch?v=u4UZ4UvZXrg", | |
| "Abbey Road Live Stream": "https://www.youtube.com/watch?v=57w2gYXjRic", | |
| "Australian Bush Drone": "https://www.youtube.com/watch?v=L38iUHJ2KJ8", | |
| } | |
| video_map = {} | |
| if VIDEO_DIR.exists(): | |
| for p in sorted(VIDEO_DIR.glob("*")): | |
| if p.suffix.lower() in {".mp4", ".mov", ".avi", ".mkv"}: | |
| video_map[p.name] = str(p) | |
| live_stream_options = [ | |
| "Sydney Australia Stream", | |
| "Tokyo Live Stream", | |
| "Los Angeles Live Stream", | |
| "Time Square Live Stream", | |
| "Dublin Live Stream", | |
| "Abbey Road Live Stream" | |
| ] | |
| # Include Webcam at the top of the list (commented out in your original) | |
| source_options = live_stream_options + list(video_map.keys()) | |
| #source_options = ["Webcam"] + live_stream_options + list(video_map.keys()) | |
| src_choice = st.selectbox("Video Source", source_options, index=0) | |
| # Webcam controls (shown only when Webcam is selected) | |
| webcam_index = 0 | |
| webcam_size = (1280, 720) | |
| if src_choice == "Webcam": | |
| webcam_index = st.number_input("Camera index", min_value=0, max_value=10, value=0, step=1) | |
| res_label = st.selectbox( | |
| "Resolution", | |
| ["1920x1080", "1280x720", "640x480"], | |
| index=1, | |
| help="Select a capture resolution supported by your camera." | |
| ) | |
| w, h = map(int, res_label.split("x")) | |
| webcam_size = (w, h) | |
| # Time limit options for live streams | |
| max_seconds = None # Always unlimited mode | |
| st.info("⚠️ Video Stream is running in unlimited mode — use Stop to end") | |
| # == Controls: Play & Detect on one row; Stop underneath == | |
| row_play, row_detect = st.columns(2) | |
| with row_play: | |
| play_only = st.button("▶️ Play", key="btn_play", use_container_width=True, help="Play raw video without detection") | |
| with row_detect: | |
| run_detection = st.button("🔎 Detect", key="btn_detect", use_container_width=True) | |
| stop_detection = st.button("🛑 Stop", key="btn_stop", use_container_width=True, help="Stop current stream") | |
| st.markdown("---") | |
| # Parameters card (shared) | |
| st.header("Parameters") | |
| confidence_threshold = st.slider("Minimum Confidence", 0.0, 1.0, 0.5, 0.01) | |
| device = model_manager.device | |
| default_preview_fps = 30 if device == "cuda" else 6 | |
| preview_fps_limit = st.slider( | |
| "Preview FPS Limit", | |
| 1, | |
| 60, | |
| default_preview_fps, | |
| help="Lower this if the live preview stutters on CPU-only hosts.", | |
| ) | |
| default_stride = 1 if device == "cuda" else 2 | |
| frame_stride = st.slider( | |
| "Frame Stride", | |
| 1, | |
| 5, | |
| default_stride, | |
| help="Higher values skip frames to keep inference responsive on slower hardware.", | |
| ) | |
| # --- NEW: live toggle for drawing boxes --- | |
| st.markdown("### Visualisation") | |
| st.toggle( | |
| "Show bounding boxes", | |
| value=True, | |
| key="show_boxes", | |
| help="Toggle overlays on/off while the stream is running.", | |
| ) | |
| st.markdown("---") | |
| # Render model selection UI | |
| model_label, model_key = model_manager.render_model_selection( | |
| key_prefix="signal_watch" | |
| ) | |
| st.markdown("---") | |
| # Render device information | |
| model_manager.render_device_info() | |
| # YouTube cookies (optional) | |
| cookies_file = ( | |
| Path(__file__).resolve().parents[1] | |
| / "resources" | |
| / "cookies" | |
| / "www.youtube.com_cookies (1).txt" | |
| ) | |
| # Process cookies file path or uploaded file | |
| if cookies_file is not None: | |
| try: | |
| # Handle both Path objects and Streamlit UploadedFile objects | |
| if hasattr(cookies_file, "getvalue"): | |
| # Streamlit UploadedFile object | |
| cookie_bytes = cookies_file.getvalue() | |
| if cookie_bytes: | |
| cookie_hash = hashlib.sha256(cookie_bytes).hexdigest()[:16] | |
| cookie_path_obj = Path(tempfile.gettempdir()) / f"yt_cookies_{cookie_hash}.txt" | |
| cookie_path_obj.write_bytes(cookie_bytes) | |
| cookies_path = str(cookie_path_obj) | |
| else: | |
| cookies_path = None | |
| st.warning("Uploaded cookies file is empty; ignoring it.") | |
| else: | |
| # Path object - use directly | |
| if cookies_file.exists(): | |
| cookies_path = str(cookies_file) | |
| else: | |
| cookies_path = None | |
| st.warning(f"Cookies file not found: {cookies_file}") | |
| except Exception as exc: | |
| cookies_path = None | |
| st.error(f"Failed to process cookies file: {exc!s}") | |
| else: | |
| cookies_path = None | |
| def find_ytdlp_executable() -> str | None: | |
| """Find the yt-dlp executable, checking multiple possible locations.""" | |
| ytdlp_path = shutil.which("yt-dlp") | |
| if ytdlp_path: | |
| return ytdlp_path | |
| venv_paths = [ | |
| Path(sys.executable).parent / "yt-dlp", | |
| Path(sys.executable).parent.parent / "bin" / "yt-dlp", | |
| Path(__file__).parents[3] / ".venv" / "bin" / "yt-dlp", | |
| Path(__file__).parents[3] / ".venv" / "Scripts" / "yt-dlp.exe", # Windows | |
| ] | |
| for path in venv_paths: | |
| if path.exists() and path.is_file(): | |
| return str(path) | |
| return None | |
| def is_youtube_url(url: str) -> bool: | |
| youtube_patterns = [ | |
| r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=", | |
| r"(?:https?://)?(?:www\.)?youtu\.be/", | |
| r"(?:https?://)?(?:www\.)?youtube\.com/live/", | |
| r"(?:https?://)?(?:www\.)?youtube\.com/embed/", | |
| ] | |
| return any(re.search(pattern, url) for pattern in youtube_patterns) | |
| def check_ytdlp_available() -> bool: | |
| ytdlp_path = find_ytdlp_executable() | |
| if not ytdlp_path: | |
| return False | |
| try: | |
| result = subprocess.run( | |
| [ytdlp_path, "--version"], | |
| check=False, | |
| capture_output=True, | |
| text=True, | |
| timeout=5, | |
| ) | |
| return result.returncode == 0 | |
| except (FileNotFoundError, subprocess.TimeoutExpired): | |
| return False | |
| def is_live_youtube_stream(url: str) -> bool: | |
| if not is_youtube_url(url): | |
| return False | |
| ytdlp_path = find_ytdlp_executable() | |
| if not ytdlp_path: | |
| return False | |
| try: | |
| cmd = [ytdlp_path, "--dump-json", "--no-playlist", "--no-warnings", url] | |
| result = subprocess.run( | |
| cmd, check=False, capture_output=True, text=True, timeout=15 | |
| ) | |
| if result.returncode == 0: | |
| import json | |
| video_info = json.loads(result.stdout) | |
| return video_info.get("is_live", False) | |
| except Exception: | |
| pass | |
| return False | |
| def extract_youtube_stream_url(url: str, cookies_path: str | None = None) -> str | None: | |
| ytdlp_path = find_ytdlp_executable() | |
| if not ytdlp_path: | |
| st.error("yt-dlp is not installed or not available in PATH.") | |
| st.info("Please install yt-dlp: `pip install yt-dlp` or `brew install yt-dlp`") | |
| return None | |
| if not check_ytdlp_available(): | |
| st.error("yt-dlp found but not working properly.") | |
| return None | |
| try: | |
| with st.spinner("Extracting YouTube stream URL..."): | |
| format_options = ["best[height<=720]", "best[height<=480]", "best", "worst"] | |
| if cookies_path and Path(cookies_path).exists(): | |
| for fmt in format_options: | |
| cmd = [ | |
| ytdlp_path, "--get-url", "--format", fmt, "--no-playlist", | |
| "--no-warnings", "--cookies", cookies_path, url, | |
| ] | |
| result = subprocess.run( | |
| cmd, check=False, capture_output=True, text=True, timeout=30 | |
| ) | |
| if result.returncode == 0: | |
| stream_url = result.stdout.strip() | |
| if stream_url and stream_url.startswith("http"): | |
| st.success(f"Successfully extracted YouTube stream URL! (format: {fmt})") | |
| return stream_url | |
| else: | |
| st.warning(f"yt-dlp failed with format {fmt}: {result.stderr}") | |
| except subprocess.TimeoutExpired: | |
| st.error("Timeout while extracting YouTube stream URL. The video might be unavailable.") | |
| except FileNotFoundError: | |
| st.error("yt-dlp not found. Please install it: pip install yt-dlp") | |
| except Exception as e: | |
| st.error(f"Error extracting YouTube stream URL: {e!s}") | |
| return None | |
| def load_model_cached(model_key: str): | |
| """Cached model loading function.""" | |
| return load_model(model_key) | |
| # ---------- Main UI logic ---------- | |
| # Initialise session state for stop functionality | |
| if "stop_processing" not in st.session_state: | |
| st.session_state.stop_processing = False | |
| def resolve_video_source( | |
| src_choice: str, | |
| cookies_path: str | None = None, | |
| webcam_index: int = 0, | |
| ): | |
| """Resolve the video source, extracting YouTube stream URLs if needed.""" | |
| if src_choice == "Webcam": | |
| return int(webcam_index) | |
| if src_choice in youtube_urls: | |
| youtube_url = youtube_urls[src_choice] | |
| st.info(f"🎥 Processing live stream: {src_choice}") | |
| stream_url = extract_youtube_stream_url(youtube_url, cookies_path) | |
| if stream_url: | |
| st.success(f"✅ Live stream ready: {src_choice}") | |
| return stream_url | |
| else: | |
| st.error(f"❌ Failed to access live stream: {src_choice}") | |
| return None | |
| # Check if it's a local video file | |
| if src_choice in video_map: | |
| return video_map[src_choice] | |
| return None | |
| def run_video_feed_detection( | |
| cap_source, | |
| conf_thr: float, | |
| stride: int, | |
| model_key: str, | |
| preview_enabled: bool = True, | |
| preview_max_fps: float | None = None, | |
| webcam_size: tuple[int, int] | None = None, # for webcam | |
| show_boxes_default: bool = True, # --- NEW: initial value; loop will live-read session toggle | |
| ): | |
| try: | |
| model = load_model_cached(model_key) | |
| except Exception as exc: | |
| st.error(str(exc)) | |
| return | |
| # Special handling flags | |
| is_youtube = isinstance(cap_source, str) and is_youtube_url(cap_source) | |
| is_webcam = isinstance(cap_source, int) | |
| cap = cv2.VideoCapture(cap_source) | |
| # For webcam, set resolution & reduce buffering | |
| if is_webcam: | |
| cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # minimal buffer for low latency | |
| if webcam_size: | |
| cap.set(cv2.CAP_PROP_FRAME_WIDTH, webcam_size[0]) | |
| cap.set(cv2.CAP_PROP_FRAME_HEIGHT, webcam_size[1]) | |
| cap.set(cv2.CAP_PROP_FPS, 30) | |
| cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) | |
| if not cap.isOpened(): | |
| st.error("Failed to open the selected source.") | |
| return | |
| try: | |
| test_ret, test_frame = cap.read() | |
| if not test_ret or test_frame is None: | |
| st.error("Video source is not providing frames. Please try a different stream.") | |
| cap.release() | |
| return | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
| except Exception as e: | |
| st.error(f"Error testing video source: {e}") | |
| cap.release() | |
| return | |
| if is_youtube or isinstance(cap_source, str): | |
| cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) | |
| cap.set(cv2.CAP_PROP_FPS, 25) | |
| cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) | |
| # Determine if local file | |
| is_file = False | |
| if isinstance(cap_source, str): | |
| if cap_source.startswith(("http://", "https://", "rtsp://")): | |
| is_file = False | |
| else: | |
| try: | |
| is_file = Path(cap_source).exists() | |
| except (OSError, ValueError): | |
| is_file = False | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 | |
| total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) | |
| stride = max(1, int(stride)) | |
| frame_ph = st.empty() if preview_enabled else None | |
| prog = st.progress(0.0, text="Processing…") | |
| start_t = time.time() | |
| i = 0 | |
| last_preview_update = 0 | |
| preview_update_interval = max(1, int(30 / (preview_max_fps or 10))) # Update every N frames | |
| writer = None | |
| out_path = None | |
| if is_file: | |
| W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| out_path = Path(tempfile.gettempdir()) / f"out_{int(time.time())}.mp4" | |
| writer = cv2.VideoWriter(str(out_path), cv2.VideoWriter_fourcc(*"mp4v"), fps, (W, H)) | |
| frame_interval = 1.0 / fps if fps and fps > 0 else 0.0 | |
| if not is_file and preview_max_fps and preview_max_fps <= 6: | |
| max_frame_skip = 15 | |
| else: | |
| max_frame_skip = 5 | |
| progress_update_seconds = 0.0 if is_file else ( | |
| 0.7 if not preview_max_fps else max(0.4, 1.0 / preview_max_fps) | |
| ) | |
| last_progress_time = start_t | |
| try: | |
| while True: | |
| if st.session_state.get("stop_processing", False): | |
| st.info("🛑 Stopping detection...") | |
| break | |
| ok, frame = cap.read() | |
| if not ok: | |
| if is_youtube or not is_file: | |
| if i < 10: | |
| time.sleep(0.5) | |
| continue | |
| break | |
| if i % max(1, stride) != 0: | |
| i += 1 | |
| continue | |
| # --- NEW: sample the live toggle each loop (defaults to given value) --- | |
| show_boxes = st.session_state.get("show_boxes", show_boxes_default) | |
| # Inference | |
| inference_start = time.time() | |
| if model_key == "deim" and hasattr(model, "annotate_frame_bgr"): | |
| vis = model.annotate_frame_bgr(frame, min_confidence=conf_thr) | |
| elif model_key == "deim": | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| annotated_pil = model.predict_image( | |
| Image.fromarray(frame_rgb), min_confidence=conf_thr | |
| ) | |
| vis = cv2.cvtColor(np.array(annotated_pil), cv2.COLOR_RGB2BGR) | |
| else: | |
| # Many models return (preds, vis). We only need the visualised frame. | |
| _, vis = model.predict_and_visualize( | |
| frame, min_confidence=conf_thr, show_score=True | |
| ) | |
| inference_elapsed = time.time() - inference_start | |
| # --- NEW: hide boxes by showing the raw frame if toggle is off --- | |
| if not show_boxes: | |
| vis = frame # present original frame (no overlays) | |
| # Drop buffered frames if we're slower than source FPS | |
| if not is_file and frame_interval > 0.0 and inference_elapsed > frame_interval: | |
| frames_to_drop = min(int(inference_elapsed / frame_interval) - 1, max_frame_skip) | |
| for _ in range(frames_to_drop): | |
| if not cap.grab(): | |
| break | |
| loop_now = time.time() | |
| # Progress | |
| if is_file or (loop_now - last_progress_time) >= progress_update_seconds: | |
| progress = i / total if total > 0 else 0 | |
| prog.progress( | |
| progress, | |
| text=f"Processing frame {i + 1}/{total if total > 0 else '∞'}...", | |
| ) | |
| last_progress_time = loop_now | |
| # Preview (throttled) | |
| if frame_ph is not None and (i - last_preview_update) >= preview_update_interval: | |
| try: | |
| display_frame = cv2.cvtColor(vis, cv2.COLOR_BGR2RGB) | |
| height, width = display_frame.shape[:2] | |
| if is_youtube or not is_file: | |
| if width > 720: | |
| scale = 720 / width | |
| new_width = 720 | |
| new_height = int(height * scale) | |
| display_frame = cv2.resize(display_frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR) | |
| elif width > 1280: | |
| scale = 1280 / width | |
| new_width = 1280 | |
| new_height = int(height * scale) | |
| display_frame = cv2.resize(display_frame, (new_width, new_height)) | |
| frame_ph.image( | |
| display_frame, | |
| use_container_width=True, | |
| output_format="JPEG", | |
| channels="RGB", | |
| ) | |
| last_preview_update = i | |
| except Exception as e: | |
| if i % 30 == 0: | |
| st.warning(f"Display update failed: {e}") | |
| last_preview_update = i | |
| if writer is not None: | |
| writer.write(vis) | |
| i += 1 | |
| if is_file and total: | |
| prog.progress(min(i / total, 1.0), text=f"Processing… {i}/{total}") | |
| else: | |
| elapsed = time.time() - start_t | |
| stream_type = "YouTube" if is_youtube else ("Live" if not is_file else "File") | |
| if max_seconds is None: | |
| prog.progress(0.0, text=f"{stream_type}… {int(elapsed)}s (unlimited)") | |
| else: | |
| prog.progress( | |
| min(elapsed / max(1, max_seconds), 1.0), | |
| text=f"{stream_type}… {int(elapsed)}s", | |
| ) | |
| if elapsed >= max_seconds: | |
| return | |
| if not is_file: | |
| elapsed = time.time() - start_t | |
| if elapsed > 0: | |
| current_fps = i / elapsed | |
| if current_fps > 15: | |
| time.sleep(0.1) | |
| elif current_fps < 8: | |
| time.sleep(0.02) | |
| if i % 100 == 0 and is_youtube: | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| except Exception as e: | |
| st.error(f"Error during processing: {e!s}") | |
| finally: | |
| cap.release() | |
| if writer is not None: | |
| writer.release() | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| st.session_state.stop_processing = False | |
| stream_type = "YouTube" if is_youtube else ("Live" if not is_file else "File") | |
| st.success(f"Done processing {stream_type} stream!") | |
| if out_path and out_path.exists(): | |
| st.video(str(out_path)) | |
| with open(out_path, "rb") as f: | |
| st.download_button( | |
| "Download processed video", | |
| data=f.read(), | |
| file_name="detections.mp4", | |
| mime="video/mp4", | |
| ) | |
| # === play-only (no detection) === | |
| def play_video_only(src_choice: str, webcam_index: int, webcam_size: tuple[int, int]): | |
| """ | |
| Play raw video without running any model. | |
| - For YouTube/live or local files: st.video(...) | |
| - For Webcam: OpenCV capture loop until Stop pressed. | |
| """ | |
| # YouTube/live or local file -> use st.video for simplest playback | |
| if src_choice in youtube_urls: | |
| st.info("▶ Playing YouTube stream (no detection). Use ⏹️ Stop to end.") | |
| st.video(youtube_urls[src_choice]) | |
| return | |
| if src_choice in video_map: | |
| st.info("▶ Playing local file (no detection). Use ⏹️ Stop to end.") | |
| st.video(video_map[src_choice]) | |
| return | |
| # Webcam fallback -> simple OpenCV loop | |
| if src_choice == "Webcam": | |
| st.info("▶ Playing webcam (no detection). Use ⏹️ Stop to end.") | |
| ph = st.empty() | |
| cap = cv2.VideoCapture(int(webcam_index)) | |
| cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) | |
| cap.set(cv2.CAP_PROP_FRAME_WIDTH, webcam_size[0]) | |
| cap.set(cv2.CAP_PROP_FRAME_HEIGHT, webcam_size[1]) | |
| cap.set(cv2.CAP_PROP_FPS, 30) | |
| cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")) | |
| if not cap.isOpened(): | |
| st.error("Failed to open webcam.") | |
| return | |
| try: | |
| while True: | |
| if st.session_state.get("stop_processing", False): | |
| st.info("🛑 Stopping playback...") | |
| break | |
| ok, frame = cap.read() | |
| if not ok: | |
| time.sleep(0.05) | |
| continue | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| ph.image(frame_rgb, channels="RGB", use_container_width=True, output_format="JPEG") | |
| time.sleep(0.01) | |
| except Exception as e: | |
| st.error(f"Playback error: {e!s}") | |
| finally: | |
| cap.release() | |
| st.session_state.stop_processing = False | |
| st.success("Stopped webcam playback.") | |
| else: | |
| st.warning("Please choose a valid source to play.") | |
| # Stop/Run/Play controls | |
| if 'stop_processing' not in st.session_state: | |
| st.session_state.stop_processing = False | |
| if stop_detection: | |
| st.session_state.stop_processing = True | |
| st.info("🛑 Stop signal sent. Playback/detection will stop after current frame.") | |
| if 'play_only' not in locals(): | |
| play_only = False # safety for environments that re-run blocks | |
| if play_only: | |
| st.session_state.stop_processing = False | |
| # Use *display* URLs for simple playback (no extraction/model) | |
| play_video_only(src_choice, webcam_index=webcam_index, webcam_size=webcam_size) | |
| elif 'run_detection' in locals() and run_detection: | |
| st.session_state.stop_processing = False | |
| src = resolve_video_source(src_choice, cookies_path, webcam_index=webcam_index) | |
| # Note: 0 (webcam) is a valid source; don't treat it as falsey | |
| if src is None and src != 0: | |
| st.warning("Please select a valid source (pick a file or enter a URL).") | |
| else: | |
| run_video_feed_detection( | |
| cap_source=src, | |
| conf_thr=confidence_threshold, | |
| stride=frame_stride, | |
| model_key=model_key, | |
| preview_enabled=True, | |
| preview_max_fps=float(preview_fps_limit), | |
| webcam_size=webcam_size if src_choice == "Webcam" else None, | |
| show_boxes_default=st.session_state.get("show_boxes", True), # NEW | |
| ) | |
| else: | |
| st.info("Pick a source from the sidebar and click **🔎 Detect** for detections or **▶ Play** for raw playback.") | |