Spaces:
Sleeping
Sleeping
| # pages/bushland_beacon.py | |
| import io | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| import streamlit as st | |
| from PIL import Image | |
| from utils.model_manager import get_model_manager, load_model | |
# =============== Page setup ===============
# Configure the page title, a wide layout, and an initially expanded sidebar.
st.set_page_config(
    page_title="Bushland Beacon ",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Two centred headings rendered as raw HTML: the product name, then this page's title.
_PAGE_HEADER_HTML = (
    "<h2 style='text-align:center;margin-top:0'>SAR-X<sup>ai</sup></h2>"
    "<h2 style='text-align:center;margin-top:0'>Bushland Beacon 🚨 </h2>"
)
st.markdown(_PAGE_HEADER_HTML, unsafe_allow_html=True)
# =============== Sidebar: custom menu + cards ===============
# NOTE(review): indentation was lost in the copy this was restored from; everything
# down to render_device_info() is assumed to sit inside the sidebar context — confirm.
with st.sidebar:
    # Custom navigation menu: detection pages first, then the tasking pages.
    for page_path, page_label in (
        ("app.py", "Home"),
        ("pages/bushland_beacon.py", "Bushland Beacon"),
        ("pages/lost_at_sea.py", "Lost at Sea"),
        ("pages/signal_watch.py", "Signal Watch"),
    ):
        st.page_link(page_path, label=page_label)
    st.markdown("---")
    for page_path, page_label in (
        ("pages/task_satellite.py", "Task Satellite"),
        ("pages/task_drone.py", "Task Drone"),
    ):
        st.page_link(page_path, label=page_label)
    st.markdown("---")

    # Simple "card" styling in the sidebar (injected as raw CSS).
    st.markdown(
        """
<style>
.sb-card {border:1px solid rgba(255,255,255,0.15); padding:14px; border-radius:8px; margin-bottom:16px;}
.sb-card h4 {margin:0 0 10px 0; font-weight:700;}
</style>
""",
        unsafe_allow_html=True,
    )

    # Image Detection card: uploader plus its trigger button.
    st.sidebar.header("Image Detection")
    img_file = st.file_uploader(
        "Upload an image",
        type=["jpg", "jpeg", "png"],
        key="img_up",
    )
    run_img = st.button("🔍 Run Image Detection", use_container_width=True)

    # Video Detection card: uploader plus its trigger button.
    st.sidebar.header("Video Detection")
    vid_file = st.file_uploader(
        "Upload a video",
        type=["mp4", "mov", "avi", "mkv"],
        key="vid_up",
    )
    run_vid = st.button("🎥 Run Video Detection", use_container_width=True)
    st.sidebar.markdown("---")

    # Parameters card (shared by image and video detection).
    st.sidebar.header("Parameters")
    conf_thr = st.slider("Minimum confidence threshold", 0.05, 0.95, 0.50, 0.01)
    st.sidebar.markdown("---")

    # Model manager: provides the model picker and the device-info panel.
    model_manager = get_model_manager()
    model_label, model_key = model_manager.render_model_selection(
        key_prefix="bushland_beacon"
    )
    st.sidebar.markdown("---")
    model_manager.render_device_info()
| # =============== Detection helpers =============== | |
def run_image_detection(uploaded_file, conf_thr: float = 0.5, model_key: str = "deim"):
    """Preview an uploaded image, run the selected detector on it, and show the result.

    Args:
        uploaded_file: Upload object whose ``getvalue()`` returns the raw image bytes.
        conf_thr: Forwarded to the model as ``min_confidence``.
        model_key: Identifier passed to ``load_model``.

    Failures while decoding the image or while running detection are reported
    through ``st.error`` instead of being raised.
    """
    # Stage 1: decode the upload and show it.
    try:
        raw_bytes = uploaded_file.getvalue()
        source_img = Image.open(io.BytesIO(raw_bytes))
        source_img = source_img.convert("RGB")
        st.image(source_img, caption="Uploaded Image", use_container_width=True)
    except Exception as load_err:
        st.error(f"Error loading image: {load_err}")
        return

    # Stage 2: load the model, run it, and show the annotated output.
    try:
        detector = load_model(model_key)
        with st.spinner("Running detection..."):
            result_img = detector.predict_image(source_img, min_confidence=conf_thr)
        st.subheader("🎯 Detection Results")
        st.image(result_img, caption="Detections", width="stretch")
    except Exception as detect_err:
        st.error(f"Error during detection: {detect_err}")
def _remove_quietly(path):
    """Delete ``path`` if it is set; cleanup is best-effort, so OS errors are ignored."""
    if path is None:
        return
    try:
        path.unlink()
    except OSError:
        # Already gone or still locked by the OS: nothing useful to do here.
        pass


def _annotate_frame(model, frame_bgr, conf_thr: float, model_key: str):
    """Return ``frame_bgr`` with detections drawn on it, as a BGR array."""
    if model_key == "deim":
        # This path hands the model an RGB PIL image and converts the
        # annotated result back to BGR for the OpenCV writer.
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        annotated_pil = model.predict_image(
            Image.fromarray(frame_rgb), min_confidence=conf_thr
        )
        return cv2.cvtColor(np.array(annotated_pil), cv2.COLOR_RGB2BGR)
    _, vis = model.predict_and_visualize(
        frame_bgr, min_confidence=conf_thr, show_score=True
    )
    return vis


def _write_annotated_video(src_path, dst_path, model, conf_thr: float, model_key: str):
    """Annotate every frame of ``src_path`` into ``dst_path``; return False if setup failed.

    Setup failures (input cannot be opened, writer cannot be created) are
    reported with ``st.error``. The capture and writer are always released.
    """
    preview_update_interval = 1  # Refresh the live preview every N frames.

    cap = cv2.VideoCapture(str(src_path))
    writer = None
    try:
        if not cap.isOpened():
            st.error("Failed to open the uploaded video.")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # NOTE(review): "mp4v" output may not play in every browser's video
        # element — confirm whether re-encoding is needed for st.video.
        writer = cv2.VideoWriter(
            str(dst_path), cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)
        )
        if not writer.isOpened():
            st.error("Failed to create the output video file.")
            return False

        frame_ph = st.empty()
        prog = st.progress(0.0, text="Processing…")

        frame_count = 0
        with st.spinner("Processing video with live preview…"):
            while True:
                ok, frame = cap.read()
                if not ok:
                    break

                vis = _annotate_frame(model, frame, conf_thr, model_key)

                frames_done = frame_count + 1
                if total_frames > 0:
                    # The reported frame count can be lower than the real one;
                    # clamp so the progress bar never receives a value > 1.0.
                    prog.progress(
                        min(frames_done / total_frames, 1.0),
                        text=f"Processing frame {frames_done}/{total_frames}...",
                    )
                else:
                    # Total unknown: show the running count only.
                    prog.progress(0.0, text=f"Processing frame {frames_done}...")

                # Throttled live preview.
                if frame_count % preview_update_interval == 0:
                    frame_ph.image(
                        cv2.cvtColor(vis, cv2.COLOR_BGR2RGB),
                        use_container_width=True,
                        output_format="JPEG",
                        channels="RGB",
                    )

                writer.write(vis)
                frame_count += 1
        return True
    finally:
        cap.release()
        if writer is not None:
            writer.release()


def run_video_detection(vid_bytes, conf_thr: float = 0.5, model_key: str = "deim"):
    """Run detection on an uploaded video, preview it live, and offer the result.

    Args:
        vid_bytes: Raw bytes of the uploaded video.
        conf_thr: Forwarded to the model as ``min_confidence``.
        model_key: Identifier passed to ``load_model``; ``"deim"`` selects the
            PIL-based prediction path, anything else the
            ``predict_and_visualize`` path.

    Errors are reported through ``st.error`` rather than raised. Both
    temporary files are removed before returning.
    """
    tmp_in = None
    tmp_out = None
    try:
        # Uniquely named temp files: timestamp-based names collide when two
        # runs start within the same second.
        with tempfile.NamedTemporaryFile(
            prefix="in_", suffix=".mp4", delete=False
        ) as src:
            tmp_in = Path(src.name)
            src.write(vid_bytes)
        with tempfile.NamedTemporaryFile(
            prefix="out_", suffix=".mp4", delete=False
        ) as dst:
            tmp_out = Path(dst.name)

        try:
            model = load_model(model_key)
            completed = _write_annotated_video(
                tmp_in, tmp_out, model, conf_thr, model_key
            )
        except Exception as exc:
            st.error(f"Video detection failed: {exc}")
            return
        if not completed:
            return

        st.success("Done!")

        # The placeholder output file always exists, so also require content.
        if tmp_out.exists() and tmp_out.stat().st_size > 0:
            # Read once and hand the bytes to both widgets so the temp file
            # can be deleted afterwards.
            video_data = tmp_out.read_bytes()
            st.video(video_data)
            st.download_button(
                "Download processed video",
                data=video_data,
                file_name=tmp_out.name,
                mime="video/mp4",
            )
        else:
            st.error("Video processing completed but output file was not created.")
    finally:
        for leftover in (tmp_in, tmp_out):
            _remove_quietly(leftover)
# =============== Main: hook up actions ===============
# Each sidebar button triggers its detection run; warn when nothing was uploaded.
if run_img:
    if img_file is None:
        st.warning("Please upload an image first.")
    else:
        run_image_detection(img_file, conf_thr=conf_thr, model_key=model_key)

if run_vid:
    if vid_file is None:
        st.warning("Please upload a video first.")
    else:
        # getvalue() returns the whole buffer regardless of the current read
        # position (read() would return only what is left after any earlier
        # read), and matches how run_image_detection reads its upload.
        run_video_detection(
            vid_bytes=vid_file.getvalue(), conf_thr=conf_thr, model_key=model_key
        )