|
|
import os
|
|
|
import logging
|
|
|
from typing import List, Tuple
|
|
|
|
|
|
import torch
|
|
|
import numpy as np
|
|
|
from ultralytics import YOLO
|
|
|
|
|
|
|
|
|
import impact.core as core
|
|
|
from impact.core import SEG
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
from .utils_salia import (
|
|
|
NODE_DIR,
|
|
|
IMGSZ,
|
|
|
list_local_pt_files,
|
|
|
tensor_to_pil,
|
|
|
make_crop_region,
|
|
|
crop_image,
|
|
|
crop_ndarray2,
|
|
|
dilate_mask,
|
|
|
)
|
|
|
except ImportError:
|
|
|
|
|
|
from utils_salia import (
|
|
|
NODE_DIR,
|
|
|
IMGSZ,
|
|
|
list_local_pt_files,
|
|
|
tensor_to_pil,
|
|
|
make_crop_region,
|
|
|
crop_image,
|
|
|
crop_ndarray2,
|
|
|
dilate_mask,
|
|
|
)
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TRTYOLOBBoxDetector:
|
|
|
"""
|
|
|
BBOX_DETECTOR interface compatible with Impact Pack / FaceDetailer.
|
|
|
|
|
|
Required API:
|
|
|
- setAux(x)
|
|
|
- detect(image, threshold, dilation, crop_factor, drop_size=1, detailer_hook=None)
|
|
|
- detect_combined(image, threshold, dilation)
|
|
|
"""
|
|
|
|
|
|
def __init__(self, yolo_model: YOLO, device: str = "0"):
|
|
|
self.bbox_model = yolo_model
|
|
|
self.device = device or "0"
|
|
|
|
|
|
self.aux: str | None = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def setAux(self, x: str):
|
|
|
"""
|
|
|
Store auxiliary info (typically a class filter like 'face').
|
|
|
FaceDetailer calls setAux('face') before detect() and setAux(None) after.
|
|
|
"""
|
|
|
self.aux = x
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect(
|
|
|
self,
|
|
|
image: torch.Tensor,
|
|
|
threshold: float,
|
|
|
dilation: int,
|
|
|
crop_factor: float,
|
|
|
drop_size: int = 1,
|
|
|
detailer_hook=None,
|
|
|
) -> Tuple[Tuple[int, int], List[SEG]]:
|
|
|
"""
|
|
|
Main detection method used by FaceDetailer.
|
|
|
|
|
|
Args:
|
|
|
image: ComfyUI IMAGE tensor [B, H, W, C] in 0..1.
|
|
|
threshold: confidence threshold for detections.
|
|
|
dilation: mask dilation/erosion size in pixels (>0 dilate, <0 erode).
|
|
|
crop_factor: expansion factor for bbox when computing crop_region.
|
|
|
drop_size: minimum bbox width/height to keep.
|
|
|
detailer_hook: optional hook with post_crop_region / post_detection.
|
|
|
|
|
|
Returns:
|
|
|
SEGS tuple: ( (H, W), [SEG, SEG, ...] )
|
|
|
"""
|
|
|
|
|
|
if image.dim() != 4:
|
|
|
raise ValueError(
|
|
|
"[TRTYOLOBBoxDetector] Expected IMAGE tensor with 4 dims [B, H, W, C]."
|
|
|
)
|
|
|
|
|
|
|
|
|
if image.shape[0] != 1:
|
|
|
logger.warning(
|
|
|
"[TRTYOLOBBoxDetector] Batch > 1 detected; using only the first image for detection."
|
|
|
)
|
|
|
image = image[:1]
|
|
|
|
|
|
|
|
|
h, w = int(image.shape[1]), int(image.shape[2])
|
|
|
shape = (h, w)
|
|
|
|
|
|
|
|
|
pil_img = tensor_to_pil(image)
|
|
|
|
|
|
|
|
|
pred_list = self.bbox_model(pil_img, conf=threshold, device=self.device, verbose=False)
|
|
|
if len(pred_list) == 0:
|
|
|
return (shape, [])
|
|
|
|
|
|
pred = pred_list[0]
|
|
|
boxes = pred.boxes
|
|
|
if boxes is None or boxes.xyxy is None or boxes.xyxy.shape[0] == 0:
|
|
|
return (shape, [])
|
|
|
|
|
|
xyxy = boxes.xyxy.cpu().numpy()
|
|
|
confs = boxes.conf.cpu().numpy()
|
|
|
clses = boxes.cls.cpu().numpy().astype(int)
|
|
|
names = pred.names
|
|
|
|
|
|
seg_items: List[SEG] = []
|
|
|
|
|
|
for i in range(xyxy.shape[0]):
|
|
|
x1, y1, x2, y2 = xyxy[i]
|
|
|
score = float(confs[i])
|
|
|
cls_id = int(clses[i])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if isinstance(names, (list, tuple)):
|
|
|
label = names[cls_id] if 0 <= cls_id < len(names) else str(cls_id)
|
|
|
else:
|
|
|
|
|
|
label = names.get(cls_id, str(cls_id))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.aux and isinstance(self.aux, str):
|
|
|
if label.lower() != self.aux.lower():
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
box_w = x2 - x1
|
|
|
box_h = y2 - y1
|
|
|
if box_w <= drop_size or box_h <= drop_size:
|
|
|
continue
|
|
|
|
|
|
|
|
|
x1_i = max(int(np.floor(x1)), 0)
|
|
|
y1_i = max(int(np.floor(y1)), 0)
|
|
|
x2_i = min(int(np.ceil(x2)), w)
|
|
|
y2_i = min(int(np.ceil(y2)), h)
|
|
|
if x2_i <= x1_i or y2_i <= y1_i:
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mask = np.zeros((h, w), dtype=np.float32)
|
|
|
mask[y1_i:y2_i, x1_i:x2_i] = 1.0
|
|
|
|
|
|
|
|
|
|
|
|
if dilation:
|
|
|
mask = dilate_mask(mask, dilation)
|
|
|
|
|
|
|
|
|
item_bbox = [float(x1), float(y1), float(x2), float(y2)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
crop_region = make_crop_region(w, h, item_bbox, crop_factor)
|
|
|
if detailer_hook is not None and hasattr(detailer_hook, "post_crop_region"):
|
|
|
crop_region = detailer_hook.post_crop_region(w, h, item_bbox, crop_region)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cropped_image = crop_image(image, crop_region)
|
|
|
cropped_mask = crop_ndarray2(mask, crop_region)
|
|
|
|
|
|
|
|
|
seg = SEG(
|
|
|
cropped_image,
|
|
|
cropped_mask,
|
|
|
score,
|
|
|
crop_region,
|
|
|
item_bbox,
|
|
|
label,
|
|
|
None,
|
|
|
)
|
|
|
seg_items.append(seg)
|
|
|
|
|
|
segs = (shape, seg_items)
|
|
|
|
|
|
|
|
|
if detailer_hook is not None and hasattr(detailer_hook, "post_detection"):
|
|
|
segs = detailer_hook.post_detection(segs)
|
|
|
|
|
|
return segs
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_combined(
|
|
|
self,
|
|
|
image: torch.Tensor,
|
|
|
threshold: float,
|
|
|
dilation: int,
|
|
|
) -> torch.Tensor:
|
|
|
"""
|
|
|
Optional combined-mask API: returns a single MASK tensor covering all detections.
|
|
|
"""
|
|
|
shape, seg_list = self.detect(
|
|
|
image=image,
|
|
|
threshold=threshold,
|
|
|
dilation=dilation,
|
|
|
crop_factor=1.0,
|
|
|
drop_size=1,
|
|
|
detailer_hook=None,
|
|
|
)
|
|
|
return core.segs_to_combined_mask((shape, seg_list))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TRTYOLOEngineBuilder:
|
|
|
@classmethod
|
|
|
def INPUT_TYPES(cls):
|
|
|
pt_files = list_local_pt_files()
|
|
|
default_name = pt_files[0] if pt_files else "face.pt"
|
|
|
|
|
|
return {
|
|
|
"required": {
|
|
|
"pt_model_name": (
|
|
|
pt_files,
|
|
|
{
|
|
|
"default": default_name,
|
|
|
"tooltip": (
|
|
|
"Select a YOLO .pt file that lives in the SAME folder as this node file."
|
|
|
),
|
|
|
},
|
|
|
),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
RETURN_TYPES = ("STRING",)
|
|
|
RETURN_NAMES = ("engine_path",)
|
|
|
FUNCTION = "build"
|
|
|
CATEGORY = "ImpactPack/TensorRT"
|
|
|
|
|
|
def build(self, pt_model_name: str):
|
|
|
|
|
|
pt_path = os.path.join(NODE_DIR, pt_model_name)
|
|
|
if not os.path.isfile(pt_path):
|
|
|
raise FileNotFoundError(
|
|
|
f"[TRTYOLOEngineBuilder] .pt model not found: {pt_path}"
|
|
|
)
|
|
|
|
|
|
logger.info(
|
|
|
f"[TRTYOLOEngineBuilder] Exporting TensorRT engine from '{pt_path}' "
|
|
|
f"with imgsz={IMGSZ} (H,W), batch=1, half=True, device='0', exist_ok=True"
|
|
|
)
|
|
|
|
|
|
|
|
|
try:
|
|
|
result = YOLO(pt_path).export(
|
|
|
format="engine",
|
|
|
imgsz=IMGSZ,
|
|
|
half=True,
|
|
|
device="0",
|
|
|
exist_ok=True,
|
|
|
)
|
|
|
except TypeError:
|
|
|
|
|
|
result = YOLO(pt_path).export(
|
|
|
format="engine",
|
|
|
imgsz=IMGSZ,
|
|
|
half=True,
|
|
|
device="0",
|
|
|
)
|
|
|
|
|
|
|
|
|
if isinstance(result, (list, tuple)):
|
|
|
engine_path = result[0] if len(result) > 0 else ""
|
|
|
else:
|
|
|
engine_path = result
|
|
|
|
|
|
engine_path = str(engine_path)
|
|
|
|
|
|
if not engine_path:
|
|
|
raise RuntimeError(
|
|
|
"[TRTYOLOEngineBuilder] Engine export failed (empty output path)."
|
|
|
)
|
|
|
|
|
|
|
|
|
if not os.path.isabs(engine_path):
|
|
|
|
|
|
model_dir = os.path.dirname(pt_path)
|
|
|
candidate = os.path.join(model_dir, engine_path)
|
|
|
if os.path.isfile(candidate):
|
|
|
engine_path = candidate
|
|
|
else:
|
|
|
|
|
|
candidate = os.path.join(NODE_DIR, engine_path)
|
|
|
if os.path.isfile(candidate):
|
|
|
engine_path = candidate
|
|
|
|
|
|
|
|
|
logger.info(f"[TRTYOLOEngineBuilder] Export complete. Engine path: {engine_path}")
|
|
|
return (engine_path,)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TRTYOLOBBoxDetectorProvider:
|
|
|
@classmethod
|
|
|
def INPUT_TYPES(cls):
|
|
|
return {
|
|
|
"required": {
|
|
|
"engine_path": (
|
|
|
"STRING",
|
|
|
{
|
|
|
"default": "",
|
|
|
"tooltip": (
|
|
|
"Path to the TensorRT .engine file.\n"
|
|
|
"Can be an absolute path or relative to this node's folder.\n"
|
|
|
"Typically use the output of TRTYOLOEngineBuilder."
|
|
|
),
|
|
|
},
|
|
|
),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
RETURN_TYPES = ("BBOX_DETECTOR",)
|
|
|
RETURN_NAMES = ("bbox_detector",)
|
|
|
FUNCTION = "load"
|
|
|
CATEGORY = "ImpactPack/TensorRT"
|
|
|
|
|
|
def load(self, engine_path: str):
|
|
|
if not engine_path:
|
|
|
raise ValueError(
|
|
|
"[TRTYOLOBBoxDetectorProvider] 'engine_path' is empty. "
|
|
|
"Provide a valid path or connect from TRTYOLOEngineBuilder."
|
|
|
)
|
|
|
|
|
|
engine_path = engine_path.strip()
|
|
|
|
|
|
|
|
|
if not os.path.isabs(engine_path):
|
|
|
engine_path = os.path.join(NODE_DIR, engine_path)
|
|
|
|
|
|
if not os.path.isfile(engine_path):
|
|
|
raise FileNotFoundError(
|
|
|
f"[TRTYOLOBBoxDetectorProvider] Engine file not found: {engine_path}"
|
|
|
)
|
|
|
|
|
|
logger.info(
|
|
|
f"[TRTYOLOBBoxDetectorProvider] Loading YOLO TensorRT engine from '{engine_path}' on device '0'"
|
|
|
)
|
|
|
|
|
|
|
|
|
yolo_model = YOLO(engine_path)
|
|
|
detector = TRTYOLOBBoxDetector(yolo_model, device="0")
|
|
|
|
|
|
return (detector,)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
NODE_CLASS_MAPPINGS = {
|
|
|
"TRTYOLOEngineBuilder": TRTYOLOEngineBuilder,
|
|
|
"TRTYOLOBBoxDetectorProvider": TRTYOLOBBoxDetectorProvider,
|
|
|
}
|
|
|
|
|
|
NODE_DISPLAY_NAME_MAPPINGS = {
|
|
|
"TRTYOLOEngineBuilder": "TensorRT YOLO Engine Builder (1344x768)",
|
|
|
"TRTYOLOBBoxDetectorProvider": "TensorRT YOLO BBox Detector",
|
|
|
}
|
|
|
|