Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| import argparse | |
| import os | |
| import random | |
| import time | |
| import glob | |
| import cv2 | |
| import numpy as np | |
| import torch | |
class PedestrianDetector:
    """Run one or more traced TorchScript detectors over an image.

    Every model receives the same input: the image resized with its aspect
    ratio preserved, zero-padded on the bottom/right to ``target_size`` and
    normalized with ``mean``/``std``.  Detections are reported in the pixel
    coordinates of the image passed in, as ``[score, x1, y1, x2, y2]`` rows.
    Horizontal-flip test-time augmentation (TTA) and grid tiling are optional.
    """

    def __init__(self,
                 model_paths,
                 score_threshold=0.3,
                 target_size=(800, 1333),
                 tta=False,
                 tile_grid=(1, 1),
                 nms_thr=0.5):
        """
        Args:
            model_paths (list[str] | str): paths to traced .pt models; a single
                path string is accepted and treated as a one-model ensemble
            score_threshold (float): minimum score to keep a box
            target_size (h, w): network input size
            tta (bool): if True, do horizontal-flip TTA
            tile_grid (rows, cols): if >1, split the image into that many tiles
            nms_thr (float): IoU threshold for merging overlapping detections (0 to disable)

        Raises:
            ValueError: if ``tile_grid`` is not two values >= 1, or a model
                path does not look like a traced model.
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(model_paths, str):
            model_paths = [model_paths]

        self.tile_grid = tuple(tile_grid)
        # Checked before any model is loaded: a zero here would only surface
        # later as a division by zero while computing tile sizes.
        if len(self.tile_grid) != 2 or min(self.tile_grid) < 1:
            raise ValueError(
                f"tile_grid must be (rows, cols) with both >= 1, got {tile_grid}")

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.score_threshold = score_threshold
        self.target_size = target_size
        self.tta = tta
        self.nms_thr = nms_thr
        self.models = [
            self._load_model(model_path)
            for model_path in model_paths
        ]
        # same normalization as used in training
        # NOTE(review): these values follow the common RGB ordering while cv2
        # loads images as BGR, and no channel swap is done before normalizing.
        # Confirm against the training/export pipeline.
        self.mean = np.array([123.675, 116.28, 103.53], dtype=np.float32)
        self.std = np.array([58.395, 57.12, 57.375], dtype=np.float32)

    def _load_model(self, model_path):
        """Load one TorchScript model onto ``self.device`` in eval mode.

        Raises:
            ValueError: if the path neither ends in ``.pt`` nor contains
                ``_traced``.
        """
        # Raised explicitly (not asserted) so the check survives `python -O`.
        if not (model_path.endswith('.pt') or '_traced' in model_path):
            raise ValueError(f"Expected a traced .pt model, got {model_path}")
        m = torch.jit.load(model_path, map_location=self.device)
        m.eval()
        return m.to(self.device)

    def _preprocess_image(self, image):
        """Resize, pad and normalize ``image`` into a 1xCxHxW float tensor.

        Returns:
            (tensor, scale): the network input on ``self.device`` and the
            resize factor needed to map boxes back to ``image`` coordinates.
        """
        h, w = image.shape[:2]
        target_h, target_w = self.target_size
        scale = min(target_h / h, target_w / w)
        # Clamp to one pixel: an extremely thin image (or tile) would
        # otherwise truncate to a 0-sized dimension and make cv2.resize fail.
        new_h = max(1, int(h * scale))
        new_w = max(1, int(w * scale))
        resized = cv2.resize(image, (new_w, new_h))
        # Pad only bottom/right so box coordinates need no offset, just 1/scale.
        padded = cv2.copyMakeBorder(
            resized, 0, target_h - new_h, 0, target_w - new_w,
            cv2.BORDER_CONSTANT, value=(0, 0, 0)
        )
        norm = (padded.astype(np.float32) - self.mean) / self.std
        tensor = torch.from_numpy(norm.transpose(2, 0, 1))[None].float().to(self.device)
        return tensor, scale

    def _postprocess_detections(self, output):
        """
        output from model is assumed to be (bboxes, _)
        where bboxes[0].cpu().numpy() is Nx5: [x1, y1, x2, y2, score]

        Returns the rows whose score is >= ``score_threshold`` as an (M,5)
        array (an empty (0,5) float32 array when nothing passes).
        """
        bboxes, _ = output
        b_np = bboxes[0].cpu().numpy()
        mask = b_np[:, 4] >= self.score_threshold
        if not mask.any():
            return np.zeros((0, 5), dtype=np.float32)
        return b_np[mask]  # shape (M,5): x1,y1,x2,y2,score

    def _rescale_bboxes(self, dets, scale):
        """Map (N,5) ``x1,y1,x2,y2,score`` boxes from network to image pixels.

        Returns a new array; the input is left unmodified.
        """
        if dets.shape[0] == 0:
            return dets
        rescaled = dets.copy()
        rescaled[:, :4] = rescaled[:, :4] / scale
        return rescaled

    @staticmethod
    def _nms(dets, iou_thr):
        """
        dets: np.ndarray (N,5) => [score, x1, y1, x2, y2]
        returns a subset of dets after non-maximum suppression

        Greedy NMS: boxes are visited by descending score and any remaining
        box whose IoU with the kept box exceeds ``iou_thr`` is dropped.
        Widths/heights use the inclusive-pixel ``+ 1`` convention.  Returns
        ``dets`` unchanged when it is empty or ``iou_thr <= 0``.

        Declared static because it uses no instance state; it stays callable
        as ``self._nms(dets, thr)``.
        """
        if dets.shape[0] == 0 or iou_thr <= 0:
            return dets
        scores = dets[:, 0]
        x1 = dets[:, 1]
        y1 = dets[:, 2]
        x2 = dets[:, 3]
        y2 = dets[:, 4]
        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        order = scores.argsort()[::-1]
        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            rest = order[1:]
            xx1 = np.maximum(x1[i], x1[rest])
            yy1 = np.maximum(y1[i], y1[rest])
            xx2 = np.minimum(x2[i], x2[rest])
            yy2 = np.minimum(y2[i], y2[rest])
            w = np.maximum(0.0, xx2 - xx1 + 1)
            h = np.maximum(0.0, yy2 - yy1 + 1)
            inter = w * h
            iou = inter / (areas[i] + areas[rest] - inter)
            # Indices in `iou` are relative to `rest`, hence the slice above.
            order = rest[np.where(iou <= iou_thr)[0]]
        return dets[keep]

    def _predict_simple(self, img):
        """
        Single-pass inference (no TTA, no tiling).
        Returns an (K,5) array of [score, x1, y1, x2, y2] rows holding the
        detections of every model, unmerged; (0,5) when nothing is found.
        """
        preds = []
        tensor, scale = self._preprocess_image(img)
        with torch.no_grad():
            for model in self.models:
                dets = self._postprocess_detections(model(tensor))  # (M,5) x1,y1,x2,y2,score
                if dets.shape[0] == 0:
                    # One silent model must not discard the others' boxes.
                    continue
                dets = self._rescale_bboxes(dets, scale)
                # reorder to [score, x1, y1, x2, y2]
                preds.append(dets[:, [4, 0, 1, 2, 3]])
        if not preds:
            return np.zeros((0, 5), dtype=np.float32)
        return np.concatenate(preds, axis=0)

    def _predict_tta(self, img):
        """
        Horizontal-flip augmentation. Merge original + flipped.
        Returns a list of [score, x1, y1, x2, y2].
        """
        w = img.shape[1]
        all_dets = []
        # 1) original
        det0 = self._predict_simple(img)
        if len(det0) > 0:
            all_dets.append(det0)
        # 2) horizontal flip (materialized: the slice is a negative-stride view)
        img_f = np.ascontiguousarray(img[:, ::-1])
        detf = self._predict_simple(img_f)
        if len(detf) > 0:
            detf = detf.copy()
            # Mirror x back into the unflipped frame; x1/x2 swap roles.
            x1 = detf[:, 1].copy()
            x2 = detf[:, 3].copy()
            detf[:, 1] = w - x2
            detf[:, 3] = w - x1
            # y coords & score unchanged
            all_dets.append(detf)
        if not all_dets:
            return []
        merged = np.vstack(all_dets)  # shape (K,5)
        if self.nms_thr > 0:
            merged = self._nms(merged, self.nms_thr)
        return merged.tolist()

    def _predict_tiles(self, img):
        """
        Split img into grid of tiles, optionally TTA each tile,
        then offset coordinates and merge with NMS.
        Returns a list of [score, x1, y1, x2, y2].
        """
        h, w = img.shape[:2]
        rows, cols = self.tile_grid
        tile_h = int(np.ceil(h / rows))
        tile_w = int(np.ceil(w / cols))
        all_dets = []
        for i in range(rows):
            y0 = i * tile_h
            y1 = min(y0 + tile_h, h)
            for j in range(cols):
                x0 = j * tile_w
                x1 = min(x0 + tile_w, w)
                tile = img[y0:y1, x0:x1]
                if tile.size == 0:
                    continue
                if self.tta:
                    dets_tile = self._predict_tta(tile)
                else:
                    dets_tile = self._predict_simple(tile)
                # Copy into a fresh (n,5) array; handles list or ndarray input.
                dets_tile = np.array(dets_tile, dtype=np.float32).reshape(-1, 5)
                if dets_tile.shape[0] == 0:
                    continue
                # offset each box from tile to full-image coordinates
                dets_tile[:, [1, 3]] += x0
                dets_tile[:, [2, 4]] += y0
                all_dets.append(dets_tile)
        if not all_dets:
            return []
        all_arr = np.concatenate(all_dets, axis=0)
        if self.nms_thr > 0:
            all_arr = self._nms(all_arr, self.nms_thr)
        return all_arr.tolist()

    def predict(self, image):
        """Detect on ``image`` and return a list of [score, x1, y1, x2, y2].

        Args:
            image (str | np.ndarray): a path readable by ``cv2.imread`` or an
                already-loaded image array.

        Raises:
            ValueError: if ``image`` is a path that cannot be loaded.
        """
        # load image
        if isinstance(image, str):
            img = cv2.imread(image)
            if img is None:
                raise ValueError(f"Could not load image: {image}")
        else:
            img = image
        # choose pipeline
        if self.tile_grid[0] > 1 or self.tile_grid[1] > 1:
            return self._predict_tiles(img)
        if self.tta:
            return self._predict_tta(img)
        dets = self._predict_simple(img)
        # With several models the same object is reported once per model;
        # merge those duplicates the same way the TTA/tiling paths do.
        if len(self.models) > 1 and self.nms_thr > 0:
            dets = self._nms(dets, self.nms_thr)
        # Always a list, matching the TTA/tiling paths.
        return dets.tolist()
def parse_args():
    """Build the command-line parser and return the parsed ``sys.argv`` options.

    Options: ``--input``, ``--model``, ``--score-thr``, ``--tta``,
    ``--tiles ROWS COLS`` and ``--nms-thr``.
    """
    parser = argparse.ArgumentParser(
        description='Simple MMPedestron Traced Model Inference with TTA & Tiling')

    # (flag, add_argument keyword options), registered in this order so the
    # generated --help output lists them the same way.
    option_specs = (
        ('--input', dict(
            help='Path to image or folder',
            default='/mnt/archive/person_drone/vtuav_coco/train_rgb_images')),
        ('--model', dict(
            help='Path to traced/exported model .pt',
            default='mmpedestron_onnx_mix_traced.pt')),
        ('--score-thr', dict(
            type=float, default=0.4,
            help='Score threshold')),
        ('--tta', dict(
            action='store_true',
            help='Enable test-time horizontal flip augmentation')),
        ('--tiles', dict(
            nargs=2, type=int, default=[1, 1],
            metavar=('ROWS', 'COLS'),
            help='Split image into ROWS×COLS tiles (e.g. 2 2)')),
        ('--nms-thr', dict(
            type=float, default=0.5,
            help='IoU threshold for NMS merging (<=0 to disable)')),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    return parser.parse_args()
def draw_detections(image, detections):
    """Return a copy of ``image`` with every detection drawn on it.

    Each detection is ``[score, x1, y1, x2, y2]``.  The box colour depends on
    the score (> 0.8, > 0.5, otherwise) and the score is written, with two
    decimals, on a filled label placed above the box's top-left corner.
    """
    canvas = image.copy()
    for detection in detections:
        score, left, top, right, bottom = detection
        left, top, right, bottom = (int(v) for v in (left, top, right, bottom))

        # Colour tuples are passed straight to cv2 (BGR channel order).
        box_color = (
            (0, 255, 0) if score > 0.8
            else (0, 165, 255) if score > 0.5
            else (0, 0, 255)
        )
        cv2.rectangle(canvas, (left, top), (right, bottom), box_color, 2)

        caption = f'{score:.2f}'
        text_w, text_h = cv2.getTextSize(caption, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
        # Filled background so the white caption stays readable.
        cv2.rectangle(canvas,
                      (left, top - text_h - 4),
                      (left + text_w, top),
                      box_color, -1)
        cv2.putText(canvas, caption, (left, top - 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                    (255, 255, 255), 1)
    return canvas
def find_image_files(input_path):
    """Collect the .jpg/.jpeg/.png files designated by ``input_path``.

    Args:
        input_path (str): a single image file or a directory to search
            recursively.

    Returns:
        list[str]: ``[input_path]`` for a matching file; for a directory, every
        matching file beneath it in shuffled order; ``[]`` when the path is
        neither, or when a single file has another extension.

    Extensions are matched case-insensitively in both branches.  One wildcard
    pattern is globbed and filtered by suffix, so mixed-case names such as
    ``x.Jpeg`` are found and no file can be listed twice on a case-insensitive
    filesystem.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')

    if os.path.isfile(input_path):
        if input_path.lower().endswith(image_suffixes):
            return [input_path]
        return []

    if os.path.isdir(input_path):
        # Escape the directory so characters like '[' in its name are literal.
        pattern = os.path.join(glob.escape(input_path), '**', '*')
        imgs = [
            path for path in glob.glob(pattern, recursive=True)
            if path.lower().endswith(image_suffixes) and os.path.isfile(path)
        ]
        random.shuffle(imgs)
        return imgs

    return []
def process_image_batch(detector, image_files):
    """Run ``detector`` over ``image_files`` one by one, showing each result.

    For every readable image this prints the inference time, the number of
    boxes and the first five detections, then displays the annotated image and
    blocks until a key is pressed.  ESC stops the batch early.  Unreadable
    images are reported and skipped.

    Args:
        detector: object whose ``predict(img)`` returns a sequence of
            ``[score, x1, y1, x2, y2]`` detections.
        image_files (list[str]): image paths to process, in order.
    """
    total = len(image_files)
    win = 'img'
    window_opened = False
    try:
        for idx, path in enumerate(image_files, 1):
            print(f"\n[{idx}/{total}] {os.path.basename(path)}")
            img = cv2.imread(path)
            if img is None:
                print("  ERROR loading image, skipping")
                continue

            t0 = time.perf_counter()
            dets = detector.predict(img)
            t_ms = (time.perf_counter() - t0) * 1000
            print(f"  Inference: {t_ms:.1f} ms, {len(dets)} boxes")

            cv2.namedWindow(win, cv2.WINDOW_KEEPRATIO)
            window_opened = True
            vis = draw_detections(img, dets)

            # Print detection details (first 5)
            for j, det in enumerate(dets[:5], 1):
                score, x1, y1, x2, y2 = det
                print(f"    {j}. conf={score:.3f}, bbox=[{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}]")

            cv2.imshow(win, vis)
            # Mask to the low byte: some backends set higher bits on the key code.
            if cv2.waitKey(0) & 0xFF == 27:  # ESC
                break
    finally:
        # Close the window on normal completion, ESC, or an exception;
        # skipped when nothing was ever shown.
        if window_opened:
            cv2.destroyAllWindows()
def main():
    """Command-line entry point: parse options, build the detector, run it.

    Validates the input and model paths, lists the images, prints the run
    configuration, then either processes a single image (showing the result
    until a key is pressed) or hands the whole list to
    ``process_image_batch``.  Problems are reported on stdout and the function
    returns instead of raising.
    """
    args = parse_args()

    if not os.path.exists(args.input):
        print(f"ERROR: input not found: {args.input}")
        return

    # Load exactly the model(s) named by --model, i.e. the ones validated and
    # printed below.  A list is tolerated in case the option yields several.
    model_paths = [args.model] if isinstance(args.model, str) else list(args.model)
    for model_path in model_paths:
        if not os.path.exists(model_path):
            print(f"ERROR: model not found: {model_path}")
            return

    ims = find_image_files(args.input)
    if not ims:
        print("No images found.")
        return

    print("MMPedestron Inference with TTA & Tiling")
    print(f"Input: {args.input}")
    print(f"Model: {', '.join(model_paths)}")
    print(f"Found {len(ims)} image(s).")
    print(f"TTA: {'enabled' if args.tta else 'disabled'}")
    print(f"Tiles: {args.tiles[0]}x{args.tiles[1]}")
    print(f"NMS threshold: {args.nms_thr}")

    try:
        detector = PedestrianDetector(
            model_paths=model_paths,
            score_threshold=args.score_thr,
            tta=args.tta,
            tile_grid=(args.tiles[0], args.tiles[1]),
            nms_thr=args.nms_thr
        )

        # single vs batch
        if len(ims) > 1:
            print("Starting batch processing...")
            process_image_batch(detector, ims)
            return

        print(f"Processing single image: {os.path.basename(ims[0])}")
        img = cv2.imread(ims[0])
        if img is None:
            # cv2.imread signals failure by returning None, not by raising.
            print(f"ERROR: could not load image: {ims[0]}")
            return

        start_time = time.time()
        dets = detector.predict(img)
        inference_time = (time.time() - start_time) * 1000
        print(f"Inference time: {inference_time:.1f} ms")
        print(f"Detected {len(dets)} boxes")

        # len() rather than truthiness: the result may be a NumPy array, whose
        # truth value is ambiguous.
        if len(dets) > 0:
            # Details are printed before the blocking window is shown.
            for i, det in enumerate(dets[:5]):
                score, x1, y1, x2, y2 = det
                print(f"  {i + 1}. conf={score:.3f}, bbox=[{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}]")
            title, shown = 'Result', draw_detections(img, dets)
        else:
            title, shown = 'No Detections', img

        cv2.imshow(title, shown)
        cv2.waitKey(0)
        cv2.destroyAllWindows()
    except Exception as e:
        # Top-level boundary: report the failure with its traceback.
        print(f"Error: {str(e)}")
        import traceback
        traceback.print_exc()
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()