"""Streamlit demo app: run object detection on an uploaded image or video.

The page shows an introduction, a sidebar with a confidence slider and two
uploaders (image / video), and renders the annotated result below the
original media.
"""

import os
import subprocess
import tempfile
import time
from glob import glob
from pathlib import Path
from shutil import rmtree, which
from typing import Optional

import streamlit as st
from deim_model import DeimHgnetV2MDrone
# NOTE(review): wildcard import kept as-is because this file cannot tell which
# names other code relies on; it stays *before* the PIL import so that `Image`
# below still resolves to PIL's, exactly as in the original import order.
from model import *  # noqa: F401,F403
from PIL import Image

# Directory the model is asked to write annotated videos into (see the
# `target_dir_name="annotated_video"` argument passed to `predict_video`).
_ANNOTATED_DIR = "./annotated_video"
# Base name shared by the temporary input file and the expected output file.
_VIDEO_STEM = "temp_video"
# Container formats considered when falling back to "newest file in the dir".
_VIDEO_PATTERNS = ("*.mp4", "*.avi", "*.mov", "*.mkv", "*.webm")

_INTRO_MARKDOWN = """
The SatSense demo app simplifies annotating images and videos taken by satellites.
It employs cutting-edge object detection models to automatically analyze and recognize
various objects in satellite imagery, including vehicles and ships.

#### How to get started

1. **Upload Satellite Imagery:** Use the sidebar to upload your satellite imagery media files for analysis.
2. **Review Identified Objects:** Explore the annotated objects marked by the model.

#### Tips for usage

1. Please clear any existing uploads in the sidebar before uploading a new file.
2. For optimal results, please upload clear and high-resolution satellite media files.
3. [Location SA Map Viewer](https://location.sa.gov.au/viewer/) provides satellite imagery that can be used as image input.

SatSense simplifies the process of annotating satellite imagery and allows you to export the annotated media files.
Start annotating and discovering objects of interest effortlessly!

***Note:** In its current MVP stage, the SatSense demo offers a glimpse into the world of automatic object detection in satellite imagery.
Your feedback can help shape its future improvements!*
"""


def _clear_stale_outputs(annotated_dir: str, stem: str) -> None:
    """Remove `<stem>.*` files left in *annotated_dir* by earlier runs.

    Without this, an output from a previous upload (for example an mp4
    produced by an earlier ffmpeg conversion) can match the preferred
    candidate paths and be displayed instead of the fresh result.
    """
    for leftover in glob(os.path.join(annotated_dir, f"{stem}.*")):
        if not os.path.isfile(leftover):
            continue
        try:
            os.remove(leftover)
        except OSError:
            # Best effort: a file we cannot delete must not block detection.
            continue


def _find_annotated_video(annotated_dir: str, uploaded_ext: str) -> Optional[str]:
    """Return the path of the annotated video, or ``None`` if none exists.

    Looks first for `temp_video` with the uploaded extension, then `.mp4`,
    then `.avi`; if none of those exist, falls back to the most recently
    modified video file in *annotated_dir*.
    """
    preferred = (
        os.path.join(annotated_dir, f"{_VIDEO_STEM}{uploaded_ext}"),
        os.path.join(annotated_dir, f"{_VIDEO_STEM}.mp4"),
        os.path.join(annotated_dir, f"{_VIDEO_STEM}.avi"),
    )
    for candidate in preferred:
        if os.path.exists(candidate):
            return candidate

    discovered = []
    for pattern in _VIDEO_PATTERNS:
        discovered.extend(glob(os.path.join(annotated_dir, pattern)))
    if not discovered:
        return None
    return max(discovered, key=os.path.getmtime)


def _convert_to_mp4(source_path: str, target_path: str) -> bool:
    """Re-encode *source_path* to H.264 mp4 at *target_path* using ffmpeg.

    Returns ``True`` on success and ``False`` when ffmpeg exits with an error
    or cannot be started. Audio is dropped (`-an`).
    """
    command = [
        "ffmpeg",
        "-y",
        "-i",
        source_path,
        "-c:v",
        "libx264",
        "-pix_fmt",
        "yuv420p",
        "-crf",
        "23",
        "-preset",
        "veryfast",
        "-an",
        target_path,
    ]
    try:
        subprocess.run(
            command,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except (subprocess.CalledProcessError, OSError):
        return False
    return True


def _render_image_section(model, uploaded_image, min_confidence: float) -> None:
    """Show the uploaded image followed by the model's annotated version."""
    st.markdown("---")
    st.write("")
    st.markdown("#### Uploaded image")
    image = Image.open(uploaded_image)
    st.image(image, use_column_width=True)
    st.write("")
    st.write("")

    with st.spinner("Processing..."):
        annotated_image = model.predict_image(image, min_confidence=min_confidence)

    st.markdown("#### Annotated image")
    st.image(annotated_image, use_column_width=True)


def _render_video_section(model, uploaded_video, min_confidence: float) -> bool:
    """Show the uploaded video, run detection on it and show the result.

    Returns ``False`` when no annotated video could be located (an error has
    already been shown to the user), ``True`` otherwise.
    """
    st.markdown("---")
    st.write("")

    # Keep the uploaded extension for the temporary input file; default to
    # mp4 when the upload has no suffix.
    uploaded_ext = Path(uploaded_video.name).suffix.lower() or ".mp4"
    os.makedirs(_ANNOTATED_DIR, exist_ok=True)
    annotated_video_path_mp4 = os.path.join(_ANNOTATED_DIR, f"{_VIDEO_STEM}.mp4")

    st.markdown("#### Uploaded video")
    uploaded_video_bytes = uploaded_video.getvalue()
    st.video(uploaded_video_bytes)
    st.write("")
    st.write("")

    progress_bar = st.progress(0.3, text="Performing object detection...")

    # Drop outputs of earlier runs so the lookup below cannot pick one of them.
    _clear_stale_outputs(_ANNOTATED_DIR, _VIDEO_STEM)

    temp_dir = tempfile.mkdtemp()
    try:
        temp_video_path = os.path.join(temp_dir, f"{_VIDEO_STEM}{uploaded_ext}")
        with open(temp_video_path, "wb") as video_file:
            video_file.write(uploaded_video_bytes)
        model.predict_video(
            temp_video_path,
            min_confidence=min_confidence,
            target_dir_name="annotated_video",
        )
    finally:
        # mkdtemp() leaves cleanup to the caller; the input copy is only
        # needed while predict_video runs.
        # NOTE(review): assumes predict_video is synchronous - confirm.
        rmtree(temp_dir, ignore_errors=True)

    # The saved file may not share the input's extension, so search for it.
    final_video_path = _find_annotated_video(_ANNOTATED_DIR, uploaded_ext)
    if final_video_path is None:
        progress_bar.empty()
        st.error(
            "Annotated video not found after detection. Please try again or check logs."
        )
        return False

    # If the annotated output isn't mp4, try converting with ffmpeg if available.
    if Path(final_video_path).suffix.lower() != ".mp4":
        progress_bar.progress(0.67, text="Converting video format...")
        if which("ffmpeg"):
            if _convert_to_mp4(final_video_path, annotated_video_path_mp4):
                final_video_path = annotated_video_path_mp4
            else:
                st.warning(
                    "ffmpeg failed to convert the video. Attempting to display original format."
                )
        else:
            st.info(
                "Install ffmpeg to enable conversion to mp4 (e.g. `brew install ffmpeg` on macOS) or use the provided Dockerfile."
            )

    progress_bar.progress(1.0, text="Done!")
    time.sleep(1)  # leave the "Done!" state visible briefly before clearing
    progress_bar.empty()

    st.markdown("#### Annotated video")
    with open(final_video_path, "rb") as annotated_video_file:
        annotated_video_bytes = annotated_video_file.read()
    # Pass raw bytes and let Streamlit infer the format.
    st.video(annotated_video_bytes)
    return True


def main():
    """Build the SatSense demo page and process whatever the user uploaded."""
    model = DeimHgnetV2MDrone()
    minimum_confidence_threshold = 0.5

    st.set_page_config(page_title="SatSense Demo")
    st.title(":satellite: SatSense Demo")
    st.markdown(_INTRO_MARKDOWN)

    # Sidebar to set minimum confidence threshold
    st.sidebar.header("Parameters")
    minimum_confidence_threshold = st.sidebar.slider(
        "Minimum confidence threshold",
        min_value=0.0,
        max_value=1.0,
        step=0.1,
        value=minimum_confidence_threshold,
        format="%.1f",
    )
    st.sidebar.markdown("---")

    # Sidebar for image detection
    st.sidebar.header("Image Detection")
    uploaded_image = st.sidebar.file_uploader(
        "Upload an image", type=["jpg", "jpeg", "png"]
    )
    st.sidebar.markdown("---")

    # Sidebar for video detection
    st.sidebar.header("Video Detection")
    uploaded_video = st.sidebar.file_uploader(
        "Upload a video", type=["mp4", "avi", "mov"]
    )

    if uploaded_image:
        _render_image_section(model, uploaded_image, minimum_confidence_threshold)

    if uploaded_video:
        if not _render_video_section(
            model, uploaded_video, minimum_confidence_threshold
        ):
            # Same as before: stop rendering once the error has been shown.
            return

    st.markdown("---")
    st.markdown("Demo built by [Lucid Insights Pty Ltd](https://lucidinsights.com.au).")


if __name__ == "__main__":
    main()