lucid-hf's picture
CI: deploy Docker/PDM Space
98a3af2 verified
#!/usr/bin/env python3
import argparse
import os
import random
import time
import glob
import cv2
import numpy as np
import torch
class PedestrianDetector:
    """Run one or more traced pedestrian-detection models on an image.

    Optionally applies horizontal-flip test-time augmentation (TTA) and/or
    grid tiling, merging the detections of all passes with NMS.
    ``predict`` always returns a list of ``[score, x1, y1, x2, y2]`` rows
    expressed in the pixel coordinates of the input image.
    """

    def __init__(self,
                 model_paths,
                 score_threshold=0.3,
                 target_size=(800, 1333),
                 tta=False,
                 tile_grid=(1, 1),
                 nms_thr=0.5):
        """
        Args:
            model_paths (str | sequence of str): path(s) to traced .pt
                model(s); a single path is treated as a one-element list
            score_threshold (float): minimum score to keep a box
            target_size (h, w): network input size
            tta (bool): if True, do horizontal-flip TTA
            tile_grid (rows, cols): if >1, split the image into that many tiles
            nms_thr (float): IoU threshold for merging overlapping detections (0 to disable)
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(model_paths, (str, os.PathLike)):
            model_paths = [model_paths]
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.score_threshold = score_threshold
        self.target_size = target_size
        self.tta = tta
        self.tile_grid = tuple(tile_grid)
        self.nms_thr = nms_thr
        self.models = [
            self._load_model(model_path)
            for model_path in model_paths
        ]
        # same normalization as used in training
        # NOTE(review): these look like RGB-ordered statistics, while images
        # read through cv2 are BGR and no channel swap is done here — confirm
        # this matches what the traced models expect.
        self.mean = np.array([123.675, 116.28, 103.53], dtype=np.float32)
        self.std = np.array([58.395, 57.12, 57.375], dtype=np.float32)

    def _load_model(self, model_path):
        """Load a TorchScript model onto ``self.device`` in eval mode.

        Raises:
            ValueError: if the path does not look like a traced model
                (explicit raise so the check survives ``python -O``).
        """
        model_path = os.fspath(model_path)
        if not (model_path.endswith('.pt') or '_traced' in model_path):
            raise ValueError(f"Expected a traced .pt model, got {model_path}")
        m = torch.jit.load(model_path, map_location=self.device)
        m.eval()
        return m.to(self.device)

    def _preprocess_image(self, image):
        """Resize (keeping aspect ratio), pad bottom/right, and normalize.

        Returns:
            (tensor, scale): a 1xCxHxW float tensor on ``self.device`` and the
            resize factor that was applied to the image.
        """
        h, w = image.shape[:2]
        scale = min(self.target_size[0] / h, self.target_size[1] / w)
        # Clamp to one pixel so extreme aspect ratios never request a
        # zero-sized resize.
        new_h = max(1, int(h * scale))
        new_w = max(1, int(w * scale))
        resized = cv2.resize(image, (new_w, new_h))
        pad_h = self.target_size[0] - new_h
        pad_w = self.target_size[1] - new_w
        padded = cv2.copyMakeBorder(
            resized, 0, pad_h, 0, pad_w,
            cv2.BORDER_CONSTANT, value=(0, 0, 0)
        )
        norm = (padded.astype(np.float32) - self.mean) / self.std
        tensor = torch.from_numpy(norm.transpose(2, 0, 1))[None].float().to(self.device)
        return tensor, scale

    def _postprocess_detections(self, output):
        """
        output from model is assumed to be (bboxes, _)
        where bboxes[0].cpu().numpy() is Nx5: [x1, y1, x2, y2, score]

        Returns the rows whose score is >= ``self.score_threshold``
        (an empty (0, 5) float32 array when none qualify).
        """
        bboxes, _ = output
        b_np = bboxes[0].cpu().numpy()
        mask = b_np[:, 4] >= self.score_threshold
        if not mask.any():
            return np.zeros((0, 5), dtype=np.float32)
        return b_np[mask]  # shape (M,5): x1,y1,x2,y2,score

    def _rescale_bboxes(self, dets, scale):
        """Map (N,5) [x1,y1,x2,y2,score] boxes back to original-image pixels.

        Returns a new array; the input is left untouched.
        """
        if dets.shape[0] == 0:
            return dets
        rescaled = dets.copy()
        rescaled[:, :4] = rescaled[:, :4] / scale
        return rescaled

    @staticmethod
    def _nms(dets, iou_thr):
        """
        dets: np.ndarray (N,5) => [score, x1, y1, x2, y2]
        returns a subset of dets after non-maximum suppression,
        ordered by descending score. ``iou_thr <= 0`` disables suppression.
        """
        if dets.shape[0] == 0 or iou_thr <= 0:
            return dets
        scores = dets[:, 0]
        x1 = dets[:, 1]
        y1 = dets[:, 2]
        x2 = dets[:, 3]
        y2 = dets[:, 4]
        # "+ 1" follows the inclusive-pixel box convention.
        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        order = scores.argsort()[::-1]
        keep = []
        while order.size > 0:
            i = order[0]
            rest = order[1:]
            keep.append(i)
            xx1 = np.maximum(x1[i], x1[rest])
            yy1 = np.maximum(y1[i], y1[rest])
            xx2 = np.minimum(x2[i], x2[rest])
            yy2 = np.minimum(y2[i], y2[rest])
            w = np.maximum(0.0, xx2 - xx1 + 1)
            h = np.maximum(0.0, yy2 - yy1 + 1)
            inter = w * h
            iou = inter / (areas[i] + areas[rest] - inter)
            # Keep only candidates that do not overlap the chosen box too much.
            order = rest[np.where(iou <= iou_thr)[0]]
        return dets[keep]

    def _run_models(self, tensor, scale):
        """Run every loaded model on ``tensor`` and pool their detections.

        A model that finds nothing is skipped rather than discarding what
        the other models found.

        Returns:
            np.ndarray of shape (K,5) with rows [score, x1, y1, x2, y2] in
            original-image pixels ((0,5) when nothing was detected).
        """
        preds = []
        with torch.no_grad():
            for model in self.models:
                dets = self._postprocess_detections(model(tensor))  # x1,y1,x2,y2,score
                if dets.shape[0] == 0:
                    continue
                dets = self._rescale_bboxes(dets, scale)
                # reorder to [score, x1, y1, x2, y2]
                preds.append(dets[:, [4, 0, 1, 2, 3]])
        if not preds:
            return np.zeros((0, 5), dtype=np.float32)
        return np.concatenate(preds, axis=0)

    def _predict_simple(self, img):
        """
        Single-pass inference (no TTA, no tiling).
        Returns an (N,5) array of [score, x1, y1, x2, y2]; N may be 0.
        """
        tensor, scale = self._preprocess_image(img)
        return self._run_models(tensor, scale)

    def _predict_tta(self, img):
        """
        Horizontal-flip augmentation. Merge original + flipped.
        Returns a list of [score, x1, y1, x2, y2].
        """
        w = img.shape[1]
        all_dets = []
        # 1) original
        det0 = self._predict_simple(img)
        if len(det0) > 0:
            all_dets.append(det0)
        # 2) horizontal flip (made contiguous: the flipped view has a
        #    negative stride)
        img_f = np.ascontiguousarray(img[:, ::-1, :])
        detf = self._predict_simple(img_f)
        if len(detf) > 0:
            detf = detf.copy()
            # Mirror x1/x2 back; y coords & score unchanged.
            detf[:, [1, 3]] = w - detf[:, [3, 1]]
            all_dets.append(detf)
        if not all_dets:
            return []
        merged = np.vstack(all_dets)  # shape (K,5)
        if self.nms_thr > 0:
            merged = self._nms(merged, self.nms_thr)
        return merged.tolist()

    def _predict_tiles(self, img):
        """
        Split img into grid of tiles, optionally TTA each tile,
        then offset coordinates and merge with NMS.
        Returns a list of [score, x1, y1, x2, y2].
        """
        h, w = img.shape[:2]
        rows, cols = self.tile_grid
        # Guard against a zero/negative grid, which would divide by zero.
        rows, cols = max(1, rows), max(1, cols)
        tile_h = int(np.ceil(h / rows))
        tile_w = int(np.ceil(w / cols))
        all_dets = []
        for i in range(rows):
            y0 = i * tile_h
            y1 = min(y0 + tile_h, h)
            for j in range(cols):
                x0 = j * tile_w
                x1 = min(x0 + tile_w, w)
                tile = img[y0:y1, x0:x1]
                if tile.size == 0:
                    continue
                if self.tta:
                    dets_tile = self._predict_tta(tile)
                else:
                    dets_tile = self._predict_simple(tile)
                # offset each box back into full-image coordinates
                for score, bx1, by1, bx2, by2 in dets_tile:
                    all_dets.append([score,
                                     bx1 + x0,
                                     by1 + y0,
                                     bx2 + x0,
                                     by2 + y0])
        if not all_dets:
            return []
        all_arr = np.array(all_dets, dtype=np.float32)
        if self.nms_thr > 0:
            all_arr = self._nms(all_arr, self.nms_thr)
        return all_arr.tolist()

    def predict(self, image):
        """Detect pedestrians in ``image``.

        Args:
            image: a path to an image file, or an already-loaded image array.

        Returns:
            list of [score, x1, y1, x2, y2] (empty list when nothing is found).

        Raises:
            ValueError: if the image cannot be loaded or is None.
        """
        # load image
        if isinstance(image, str):
            img = cv2.imread(image)
            if img is None:
                raise ValueError(f"Could not load image: {image}")
        else:
            img = image
        if img is None:
            raise ValueError("Could not load image: got None")
        # choose pipeline
        if self.tile_grid[0] > 1 or self.tile_grid[1] > 1:
            return self._predict_tiles(img)
        if self.tta:
            return self._predict_tta(img)
        dets = self._predict_simple(img)
        # With several models the pooled boxes overlap; merge them the same
        # way the TTA and tiled pipelines do.
        if len(self.models) > 1 and self.nms_thr > 0:
            dets = self._nms(dets, self.nms_thr)
        return dets.tolist()
def parse_args(argv=None):
    """Parse command-line options for the inference script.

    Args:
        argv: optional list of argument strings; when None (the default)
            argparse reads ``sys.argv[1:]``, as before.

    Returns:
        argparse.Namespace with ``input``, ``model``, ``score_thr``, ``tta``,
        ``tiles`` (a two-int list: rows, cols) and ``nms_thr``.
    """
    p = argparse.ArgumentParser(
        description='Simple MMPedestron Traced Model Inference with TTA & Tiling')
    p.add_argument('--input',
                   help='Path to image or folder',
                   default='/mnt/archive/person_drone/vtuav_coco/train_rgb_images')
    p.add_argument('--model',
                   help='Path to traced/exported model .pt',
                   default='mmpedestron_onnx_mix_traced.pt')
    p.add_argument('--score-thr', type=float, default=0.4,
                   help='Score threshold')
    p.add_argument('--tta', action='store_true',
                   help='Enable test-time horizontal flip augmentation')
    p.add_argument('--tiles', nargs=2, type=int, default=[1, 1],
                   metavar=('ROWS', 'COLS'),
                   help='Split image into ROWS×COLS tiles (e.g. 2 2)')
    p.add_argument('--nms-thr', type=float, default=0.5,
                   help='IoU threshold for NMS merging (<=0 to disable)')
    return p.parse_args(argv)
def draw_detections(image, detections):
img = image.copy()
for det in detections:
score, x1, y1, x2, y2 = det
x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
if score > 0.8:
color = (0, 255, 0)
elif score > 0.5:
color = (0, 165, 255)
else:
color = (0, 0, 255)
cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
lbl = f'{score:.2f}'
ts = cv2.getTextSize(lbl, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
cv2.rectangle(img,
(x1, y1 - ts[1] - 4),
(x1 + ts[0], y1),
color, -1)
cv2.putText(img, lbl, (x1, y1 - 2),
cv2.FONT_HERSHEY_SIMPLEX, 0.5,
(255, 255, 255), 1)
return img
def find_image_files(input_path):
    """Collect image files (.jpg/.jpeg/.png, any letter case) under a path.

    Args:
        input_path: a single image file or a directory searched recursively.

    Returns:
        list of paths. For a directory the list is shuffled in place with
        ``random.shuffle``; for a file it is a one-element list, or empty
        when the extension is not an image. Empty when the path does not
        exist.
    """
    image_exts = ('.jpg', '.jpeg', '.png')
    if os.path.isfile(input_path):
        return [input_path] if input_path.lower().endswith(image_exts) else []
    if not os.path.isdir(input_path):
        return []
    # One case-insensitive pass instead of one glob per spelling: this also
    # catches mixed-case suffixes and cannot list a file twice on
    # case-insensitive filesystems. The root is escaped so glob
    # metacharacters in the directory name are taken literally.
    pattern = os.path.join(glob.escape(input_path), '**', '*')
    imgs = [
        candidate
        for candidate in glob.glob(pattern, recursive=True)
        if candidate.lower().endswith(image_exts) and os.path.isfile(candidate)
    ]
    random.shuffle(imgs)
    return imgs
def process_image_batch(detector, image_files):
    """Run the detector on each image and show the result interactively.

    For every file this prints the timing and the first five detections,
    displays the annotated image, and waits for a key press. ESC stops the
    batch early. Unreadable images are reported and skipped. Any window
    that was opened is closed before returning, including on error.

    Args:
        detector: object exposing ``predict(image)`` that returns a sequence
            of [score, x1, y1, x2, y2].
        image_files: sequence of image paths.
    """
    total = len(image_files)
    win = 'img'
    window_opened = False
    try:
        for idx, path in enumerate(image_files, 1):
            print(f"\n[{idx}/{total}] {os.path.basename(path)}")
            img = cv2.imread(path)
            if img is None:
                print("  ERROR loading image, skipping")
                continue
            t0 = time.time()
            dets = detector.predict(img)
            t_ms = (time.time() - t0) * 1000
            print(f"  Inference: {t_ms:.1f} ms, {len(dets)} boxes")
            cv2.namedWindow(win, cv2.WINDOW_KEEPRATIO)
            window_opened = True
            vis = draw_detections(img, dets)
            # Print detection details (first 5)
            for j, det in enumerate(dets[:5]):
                score, x1, y1, x2, y2 = det
                print(f"    {j + 1}. conf={score:.3f}, bbox=[{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}]")
            cv2.imshow(win, vis)
            key = cv2.waitKey(0)
            if key == 27:  # ESC
                break
    finally:
        # Only touch the GUI backend if a window was actually created.
        if window_opened:
            cv2.destroyAllWindows()
def main():
    """CLI entry point: parse options, build the detector, run inference.

    A single input image is shown in its own window; several images go
    through ``process_image_batch``. Problems are printed instead of
    raised, since this is the top-level boundary of the script.
    """
    args = parse_args()
    if not os.path.exists(args.input):
        print(f"ERROR: input not found: {args.input}")
        return
    # Honour --model: the detector is built from the paths that were actually
    # validated and reported. Accept either one path or a list of paths.
    model_paths = [args.model] if isinstance(args.model, str) else list(args.model)
    for model_path in model_paths:
        if not os.path.exists(model_path):
            print(f"ERROR: model not found: {model_path}")
            return
    ims = find_image_files(args.input)
    if not ims:
        print("No images found.")
        return
    print("MMPedestron Inference with TTA & Tiling")
    print(f"Input: {args.input}")
    print(f"Model: {args.model}")
    print(f"Found {len(ims)} image(s).")
    print(f"TTA: {'enabled' if args.tta else 'disabled'}")
    print(f"Tiles: {args.tiles[0]}x{args.tiles[1]}")
    print(f"NMS threshold: {args.nms_thr}")
    try:
        detector = PedestrianDetector(
            model_paths=model_paths,
            score_threshold=args.score_thr,
            tta=args.tta,
            tile_grid=(args.tiles[0], args.tiles[1]),
            nms_thr=args.nms_thr
        )
        # single vs batch
        if len(ims) == 1:
            print(f"Processing single image: {os.path.basename(ims[0])}")
            img = cv2.imread(ims[0])
            if img is None:
                print(f"ERROR: could not load image: {ims[0]}")
                return
            start_time = time.time()
            dets = detector.predict(img)
            inference_time = (time.time() - start_time) * 1000
            print(f"Inference time: {inference_time:.1f} ms")
            print(f"Detected {len(dets)} boxes")
            # len() works for both lists and arrays; plain truthiness of a
            # multi-row array raises.
            if len(dets) > 0:
                vis = draw_detections(img, dets)
                cv2.imshow('Result', vis)
                cv2.waitKey(0)
                cv2.destroyAllWindows()
                for i, det in enumerate(dets[:5]):
                    score, x1, y1, x2, y2 = det
                    print(f"  {i + 1}. conf={score:.3f}, bbox=[{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}]")
            else:
                cv2.imshow('No Detections', img)
                cv2.waitKey(0)
                cv2.destroyAllWindows()
        else:
            print("Starting batch processing...")
            process_image_batch(detector, ims)
    except Exception as e:
        print(f"Error: {str(e)}")
        import traceback
        traceback.print_exc()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()