import os
import tempfile
import time
from glob import glob
from pathlib import Path
from shutil import which

import streamlit as st
from PIL import Image

from deim_model import DeimHgnetV2MDrone
from model import *
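

# Streamlit entry point for the SatSense demo: media uploaded via the sidebar is run
# through the detection model and displayed with annotations.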
def main():
    model = DeimHgnetV2MDrone()
    minimum_confidence_threshold = 0.5

    st.set_page_config(page_title="SatSense Demo")
    st.title(":satellite: SatSense Demo")
    st.markdown(
        """
        The SatSense demo app simplifies annotating images and videos taken by satellites.
        It employs cutting-edge object detection models to automatically analyze and recognize
        various objects in satellite imagery, including vehicles and ships.

        #### How to get started

        1. **Upload Satellite Imagery:** Use the sidebar to upload your satellite media files
           for analysis.
        2. **Review Identified Objects:** Explore the annotated objects marked by the model.

        #### Tips for usage

        1. Please clear any existing uploads in the sidebar before uploading a new file.
        2. For optimal results, please upload clear and high-resolution satellite media files.
        3. [Location SA Map Viewer](https://location.sa.gov.au/viewer/) provides satellite
           imagery that can be used as image input.

        SatSense simplifies the process of annotating satellite imagery and allows you to
        export the annotated media files. Start annotating and discovering objects of
        interest effortlessly!

        ***Note:** In its current MVP stage, the SatSense demo offers a glimpse into the
        world of automatic object detection in satellite imagery. Your feedback can help
        shape its future improvements!*
        """
    )
    # Sidebar to set the minimum confidence threshold
    st.sidebar.header("Parameters")
    minimum_confidence_threshold = st.sidebar.slider(
        "Minimum confidence threshold",
        min_value=0.0,
        max_value=1.0,
        step=0.1,
        value=minimum_confidence_threshold,
        format="%.1f",
    )
    st.sidebar.markdown("---")

    # Sidebar for image detection
    st.sidebar.header("Image Detection")
    uploaded_image = st.sidebar.file_uploader(
        "Upload an image", type=["jpg", "jpeg", "png"]
    )
    st.sidebar.markdown("---")

    # Sidebar for video detection
    st.sidebar.header("Video Detection")
    uploaded_video = st.sidebar.file_uploader(
        "Upload a video", type=["mp4", "avi", "mov"]
    )
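
    # Image pipeline: display the upload, run detection, and show the annotated result.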
    if uploaded_image:
        st.markdown("---")
        st.write("")
        st.markdown("#### Uploaded image")
        image = Image.open(uploaded_image)
        st.image(image, use_column_width=True)
        st.write("")
        st.write("")

        with st.spinner("Processing..."):
            annotated_image = model.predict_image(
                image, min_confidence=minimum_confidence_threshold
            )
        st.markdown("#### Annotated image")
        st.image(annotated_image, use_column_width=True)
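
    # Video pipeline: persist the upload, run detection, then locate and (if needed)
    # convert the annotated output to MP4 before displaying it.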
    if uploaded_video:
        st.markdown("---")
        st.write("")

        temp_dir = tempfile.mkdtemp()
        # Preserve the uploaded extension to maximize compatibility with OpenCV/YOLO
        uploaded_ext = Path(uploaded_video.name).suffix.lower() or ".mp4"
        temp_video_path = os.path.join(temp_dir, f"temp_video{uploaded_ext}")

        annotated_dir = "./annotated_video"
        os.makedirs(annotated_dir, exist_ok=True)
        annotated_video_path_input_ext = os.path.join(
            annotated_dir, f"temp_video{uploaded_ext}"
        )
        annotated_video_path_mp4 = os.path.join(annotated_dir, "temp_video.mp4")

        st.markdown("#### Uploaded video")
        uploaded_video_bytes = uploaded_video.getvalue()
        st.video(uploaded_video_bytes)
        st.write("")
        st.write("")

        progress_bar = st.progress(0.3, text="Performing object detection...")
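        # Write the upload to a temp file on disk so the model can read it as a regular video.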
        with open(temp_video_path, "wb") as video_file:
            video_file.write(uploaded_video_bytes)
        model.predict_video(
            temp_video_path,
            min_confidence=minimum_confidence_threshold,
            target_dir_name="annotated_video",
        )
        # Resolve the actual saved annotated video. Ultralytics may write .avi even if the input is .mp4.
        final_video_path = None
        preferred_candidates = [
            annotated_video_path_input_ext,
            annotated_video_path_mp4,
            os.path.join(annotated_dir, "temp_video.avi"),
        ]
        for cand in preferred_candidates:
            if os.path.exists(cand):
                final_video_path = cand
                break
        if final_video_path is None:
            # Fall back to the most recently written video file in the output directory.
            candidates = []
            for pattern in ("*.mp4", "*.avi", "*.mov", "*.mkv", "*.webm"):
                candidates.extend(glob(os.path.join(annotated_dir, pattern)))
            if candidates:
                final_video_path = max(candidates, key=os.path.getmtime)
            else:
                progress_bar.empty()
                st.error(
                    "Annotated video not found after detection. Please try again or check the logs."
                )
                return
        # If the annotated output isn't MP4, try converting with ffmpeg when available.
        if Path(final_video_path).suffix.lower() != ".mp4":
            progress_bar.progress(0.67, text="Converting video format...")
            if which("ffmpeg"):
                import subprocess

                try:
                    subprocess.run(
                        [
                            "ffmpeg",
                            "-y",
                            "-i",
                            final_video_path,
                            "-c:v",
                            "libx264",
                            "-pix_fmt",
                            "yuv420p",
                            "-crf",
                            "23",
                            "-preset",
                            "veryfast",
                            "-an",
                            annotated_video_path_mp4,
                        ],
                        check=True,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT,
                    )
                    final_video_path = annotated_video_path_mp4
                except Exception:
                    st.warning(
                        "ffmpeg failed to convert the video. Attempting to display the original format."
                    )
            else:
                st.info(
                    "Install ffmpeg to enable conversion to MP4 (e.g. `brew install ffmpeg` on macOS) or use the provided Dockerfile."
                )
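
        # Detection (and any conversion) is complete; show the annotated result.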
        progress_bar.progress(1.0, text="Done!")
        time.sleep(1)
        progress_bar.empty()

        st.markdown("#### Annotated video")
        # Read the annotated file and let Streamlit infer the format from the bytes.
        with open(final_video_path, "rb") as annotated_video_file:
            annotated_video_bytes = annotated_video_file.read()
        st.video(annotated_video_bytes)

    st.markdown("---")
    st.markdown("Demo built by [Lucid Insights Pty Ltd](https://lucidinsights.com.au).")


if __name__ == "__main__":
    main()