# Source: lucid-hf Space — "CI: deploy Docker/PDM Space" (commit 7042293, verified)
# pages/bushland_beacon.py
import io
import tempfile
import time
from pathlib import Path
import cv2
import numpy as np
import streamlit as st
from PIL import Image
from utils.model_manager import get_model_manager, load_model
# =============== Page setup ===============
st.set_page_config(
    page_title="Bushland Beacon ", layout="wide", initial_sidebar_state="expanded"
)
# Two stacked, centred headings; raw HTML is needed for the <sup> branding.
st.markdown(
    "<h2 style='text-align:center;margin-top:0'>SAR-X<sup>ai</sup></h2>"
    "<h2 style='text-align:center;margin-top:0'>Bushland Beacon 🚨 </h2>",
    unsafe_allow_html=True,
)
# =============== Sidebar: custom menu + cards ===============
with st.sidebar:
    # Hand-rolled navigation links (same set on every page of the app).
    st.page_link("app.py", label="Home")
    st.page_link("pages/bushland_beacon.py", label="Bushland Beacon")
    st.page_link("pages/lost_at_sea.py", label="Lost at Sea")
    st.page_link("pages/signal_watch.py", label="Signal Watch")
    st.markdown("---")
    st.page_link("pages/task_satellite.py", label="Task Satellite")
    st.page_link("pages/task_drone.py", label="Task Drone")
    st.markdown("---")
    # Simple "card" styling in the sidebar
    # NOTE(review): the .sb-card class is defined but no element below uses it —
    # presumably kept for future card markup; confirm before removing.
    st.markdown(
        """
    <style>
    .sb-card {border:1px solid rgba(255,255,255,0.15); padding:14px; border-radius:8px; margin-bottom:16px;}
    .sb-card h4 {margin:0 0 10px 0; font-weight:700;}
    </style>
    """,
        unsafe_allow_html=True,
    )
    # Image Detection card
    st.sidebar.header("Image Detection")
    img_file = st.file_uploader(
        "Upload an image", type=["jpg", "jpeg", "png"], key="img_up"
    )
    run_img = st.button("🔍 Run Image Detection", use_container_width=True)
    # Video Detection card
    st.sidebar.header("Video Detection")
    vid_file = st.file_uploader(
        "Upload a video", type=["mp4", "mov", "avi", "mkv"], key="vid_up"
    )
    run_vid = st.button("🎥 Run Video Detection", use_container_width=True)
    st.sidebar.markdown("---")
    # Parameters card (shared)
    st.sidebar.header("Parameters")
    # Shared confidence threshold for both image and video detection.
    conf_thr = st.slider("Minimum confidence threshold", 0.05, 0.95, 0.50, 0.01)
    st.sidebar.markdown("---")
    # Get model manager instance
    model_manager = get_model_manager()
    # Render model selection UI; model_key is what load_model() consumes below.
    model_label, model_key = model_manager.render_model_selection(
        key_prefix="bushland_beacon"
    )
    st.sidebar.markdown("---")
    # Render device information
    model_manager.render_device_info()
# =============== Detection helpers ===============
def run_image_detection(uploaded_file, conf_thr: float = 0.5, model_key: str = "deim"):
    """Run single-image detection and render the annotated result.

    Parameters
    ----------
    uploaded_file :
        Streamlit ``UploadedFile`` from ``st.file_uploader`` (jpg/jpeg/png).
    conf_thr : float
        Minimum confidence for a detection to be drawn.
    model_key : str
        Model identifier understood by ``utils.model_manager.load_model``.
    """
    # Decode the upload; surface a user-facing error on corrupt/unreadable input.
    try:
        data = uploaded_file.getvalue()
        img = Image.open(io.BytesIO(data)).convert("RGB")
        st.image(img, caption="Uploaded Image", use_container_width=True)
    except Exception as e:
        st.error(f"Error loading image: {e}")
        return
    try:
        model = load_model(model_key)
        with st.spinner("Running detection..."):
            annotated = model.predict_image(img, min_confidence=conf_thr)
        st.subheader("🎯 Detection Results")
        # Fix: was width="stretch" — unified with use_container_width=True,
        # the API used by every other st.image call in this file.
        st.image(annotated, caption="Detections", use_container_width=True)
    except Exception as e:
        st.error(f"Error during detection: {e}")
def run_video_detection(vid_bytes, conf_thr: float = 0.5, model_key: str = "deim"):
    """Run frame-by-frame detection over an uploaded video with a live preview.

    Writes an annotated copy of the video to a temp file, shows a throttled
    preview image and a progress bar while processing, then offers the result
    for playback and download.

    Parameters
    ----------
    vid_bytes : bytes
        Raw bytes of the uploaded video file.
    conf_thr : float
        Minimum confidence for a detection to be drawn.
    model_key : str
        Model identifier understood by ``utils.model_manager.load_model``.
        "deim" models consume PIL RGB images; other models consume raw BGR
        frames via ``predict_and_visualize``.
    """
    # Persist the upload to disk — cv2.VideoCapture needs a real file path.
    tmp_in = Path(tempfile.gettempdir()) / f"in_{int(time.time())}.mp4"
    with open(tmp_in, "wb") as f:
        f.write(vid_bytes)
    model = load_model(model_key)
    # Set up video capture for preview
    cap = cv2.VideoCapture(str(tmp_in))
    if not cap.isOpened():
        cap.release()  # fix: release the handle on the failure path too
        st.error("Failed to open the uploaded video.")
        return
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # some containers report 0 fps
    # NOTE: CAP_PROP_FRAME_COUNT is an estimate for many codecs — treat as hint.
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    # Set up preview and progress
    frame_ph = st.empty()
    prog = st.progress(0.0, text="Processing…")
    # Set up video writer for output
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    tmp_out = Path(tempfile.gettempdir()) / f"out_{int(time.time())}.mp4"
    writer = cv2.VideoWriter(str(tmp_out), cv2.VideoWriter_fourcc(*"mp4v"), fps, (W, H))
    if not writer.isOpened():
        # Fix: previously a codec/container failure went undetected and the
        # loop silently produced an unplayable output file.
        cap.release()
        st.error("Failed to create the output video writer.")
        return
    frame_count = 0
    last_preview_update = 0
    preview_update_interval = 1  # update the preview every N frames (fix: comment said 5)
    try:
        with st.spinner("Processing video with live preview…"):
            while True:
                ok, frame = cap.read()
                if not ok:
                    break
                # Process frame with model
                if model_key == "deim":
                    # DEIM path expects a PIL RGB image and returns one.
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    annotated_pil = model.predict_image(
                        Image.fromarray(frame_rgb), min_confidence=conf_thr
                    )
                    vis = cv2.cvtColor(np.array(annotated_pil), cv2.COLOR_RGB2BGR)
                else:
                    _, vis = model.predict_and_visualize(
                        frame, min_confidence=conf_thr, show_score=True
                    )
                # Update progress; clamp to 1.0 because the frame-count
                # property can under-report and st.progress raises on > 1.0.
                progress = (
                    min(frame_count / total_frames, 1.0) if total_frames > 0 else 0
                )
                prog.progress(
                    progress,
                    text=f"Processing frame {frame_count + 1}/{total_frames}...",
                )
                # Update preview (throttled to prevent freezing)
                if (frame_count - last_preview_update) >= preview_update_interval:
                    frame_ph.image(
                        cv2.cvtColor(vis, cv2.COLOR_BGR2RGB),
                        use_container_width=True,
                        output_format="JPEG",
                        channels="RGB",
                    )
                    last_preview_update = frame_count
                # Write frame to output video
                writer.write(vis)
                frame_count += 1
        # Fix: the in-loop ratio tops out below 1.0; mark the bar complete.
        prog.progress(1.0, text=f"Processed {frame_count} frames")
    except Exception as exc:
        st.error(f"Video detection failed: {exc}")
        return
    finally:
        cap.release()
        writer.release()
        # Fix: the input temp file was never removed; the output must survive
        # for st.video / the download button below.
        tmp_in.unlink(missing_ok=True)
    st.success("Done!")
    # Check if output file exists before trying to display it
    if tmp_out.exists():
        st.video(str(tmp_out))
        with open(tmp_out, "rb") as f:
            st.download_button(
                "Download processed video",
                data=f.read(),
                file_name=tmp_out.name,
                mime="video/mp4",
            )
    else:
        st.error("Video processing completed but output file was not created.")
# =============== Main: hook up actions ===============
# Each button triggers its pipeline only when a matching file was uploaded;
# otherwise the user gets a gentle nudge.
if run_img:
    if img_file is not None:
        run_image_detection(img_file, conf_thr=conf_thr, model_key=model_key)
    else:
        st.warning("Please upload an image first.")
if run_vid:
    if vid_file is not None:
        run_video_detection(
            vid_bytes=vid_file.read(), conf_thr=conf_thr, model_key=model_key
        )
    else:
        st.warning("Please upload a video first.")