import os

import cv2
import numpy as np
import torch
from ultralytics import YOLO
from sort import Sort
import gradio as gr
# Load the YOLOv12x model
MODEL_PATH = "yolov12x.pt"
model = YOLO(MODEL_PATH)

# COCO dataset class ID for "truck"
TRUCK_CLASS_ID = 7
# The SORT tracker is created inside count_unique_trucks() so that each
# analysis starts from a clean tracking state.
# Minimum confidence threshold for detections (lowered for better recall)
CONFIDENCE_THRESHOLD = 0.4

# Minimum movement in pixels before an already-tracked truck is considered to
# have moved; used to avoid duplicate counts of stationary trucks
DISTANCE_THRESHOLD = 50

# Map filename keywords to a sampling interval in seconds
TIME_INTERVALS = {
    "one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
    "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10, "eleven": 11,
}
def determine_time_interval(video_filename):
    """Determine the frame-sampling interval (in seconds) from filename keywords."""
    print(f"Checking filename: {video_filename}")  # Debugging
    for keyword, interval in TIME_INTERVALS.items():
        if keyword in video_filename:
            print(f"Matched keyword: {keyword} -> Interval: {interval}")  # Debugging
            return interval
    print("No keyword match, using default interval: 5")  # Debugging
    return 5  # Default interval in seconds
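# For example, a file named "five_minute_clip.mp4" (hypothetical name) matches
# the keyword "five" and yields an interval of 5 seconds. Note that the
# substring match is loose: "ten" would also match inside a name like
# "tension.mp4".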
def count_unique_trucks(video_path):
    """Count unique trucks in a video using YOLOv12x detection and SORT tracking."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return {"Error": "Unable to open video file."}

    # Reset per-video state at the start of each analysis
    unique_truck_ids = set()
    truck_history = {}

    # FPS of the video (may be reported as 0 for some containers)
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Extract the filename from the path and lowercase it for keyword matching
    video_filename = os.path.basename(video_path).lower()

    # Determine the dynamic sampling interval from filename keywords
    time_interval = determine_time_interval(video_filename)

    # Total number of frames in the video
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Sample one frame per interval, but never skip more than half the video;
    # the max(1, ...) guard prevents a zero modulus when fps is reported as 0
    frame_skip = max(1, min(fps * time_interval, total_frames // 2))

    frame_count = 0

    # Create a fresh tracker so no state leaks between analyses
    tracker = Sort()
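    # Worked example (hypothetical numbers): a 30 fps video whose name contains
    # "five" gives frame_skip = min(30 * 5, total_frames // 2), i.e. one
    # sampled frame every 150 frames unless the clip is shorter than 300 frames.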
    while True:
        ret, frame = cap.read()
        if not ret:
            break  # End of video

        frame_count += 1
        if frame_count % frame_skip != 0:
            continue  # Skip frames between sampling points

        # Run YOLOv12x inference on the sampled frame
        results = model(frame, verbose=False)
        detections = []
        for result in results:
            for box in result.boxes:
                class_id = int(box.cls.item())  # Predicted class ID
                confidence = float(box.conf.item())  # Confidence score

                # Keep only sufficiently confident truck detections
                if class_id == TRUCK_CLASS_ID and confidence > CONFIDENCE_THRESHOLD:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])  # Bounding box corners
                    detections.append([x1, y1, x2, y2, confidence])
        # Debugging: inspect raw detections
        print(f"Frame {frame_count}: Detections -> {detections}")

        # SORT expects an (N, 5) array of [x1, y1, x2, y2, score]; pass an
        # empty array when nothing was detected so track ages still advance
        # rather than skipping the update step entirely
        if len(detections) > 0:
            tracked_objects = tracker.update(np.array(detections))
        else:
            tracked_objects = tracker.update(np.empty((0, 5)))

        # Debugging: inspect tracked objects
        print(f"Frame {frame_count}: Tracked Objects -> {tracked_objects}")
        for obj in tracked_objects:
            truck_id = int(obj[4])  # Unique ID assigned by SORT
            x1, y1, x2, y2 = obj[:4]  # Bounding box coordinates
            truck_center = ((x1 + x2) / 2, (y1 + y2) / 2)  # Center of the box

            if truck_id in truck_history:
                # Known truck: count it only if it has moved significantly,
                # then store the new position for the next comparison
                last_position = truck_history[truck_id]["position"]
                distance = np.linalg.norm(np.array(truck_center) - np.array(last_position))
                if distance > DISTANCE_THRESHOLD:
                    unique_truck_ids.add(truck_id)
                truck_history[truck_id]["position"] = truck_center
            else:
                # New truck: record it and count it
                truck_history[truck_id] = {
                    "frame_count": frame_count,
                    "position": truck_center,
                }
                unique_truck_ids.add(truck_id)
    cap.release()
    return {"Total Unique Trucks": len(unique_truck_ids)}
# Gradio UI wrapper
def analyze_video(video_file):
    result = count_unique_trucks(video_file)
    return "\n".join(f"{key}: {value}" for key, value in result.items())

# Define the Gradio interface
iface = gr.Interface(
    fn=analyze_video,
    inputs=gr.Video(label="Upload Video"),
    outputs=gr.Textbox(label="Analysis Result"),
    title="YOLOv12x Unique Truck Counter",
    description="Upload a video to count unique trucks using YOLOv12x and SORT tracking.",
)

# Launch the Gradio app
if __name__ == "__main__":
    iface.launch()
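    # When running outside Hugging Face Spaces, launch() also accepts
    # share=True to expose a temporary public URL:
    #   iface.launch(share=True)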