# detection.py from functools import lru_cache from typing import List, Tuple import os import numpy as np import torch import torch.nn as nn from PIL import Image import torchvision.transforms as T # デバッグ出力制御フラグ(環境変数で制御) DEBUG_DEIMV2 = os.getenv("DEBUG_DEIMV2", "0") == "1" # YAMLConfigをインポート(engineパッケージ経由でレジストリをロード) # モジュール登録のために、すべての必要なモジュールを明示的にインポート # 重要: モジュールファイルを直接インポートすることで、@register()デコレータを確実に実行する # まず、engineパッケージ全体をインポート try: import engine from engine import YAMLConfig except ImportError: from engine.core.yaml_config import YAMLConfig # 次に、すべての必要なモジュールファイルを直接インポート # これにより、@register()デコレータが実行され、GLOBAL_CONFIGに登録される try: # Backboneモジュール import engine.backbone.dinov3_adapter # DINOv3STAs # DEIMモジュール(すべての重要なクラスを含む) import engine.deim.hybrid_encoder # HybridEncoder - 必須 import engine.deim.deim_decoder # DEIMTransformer - 必須 import engine.deim.deim # DEIM - 必須 import engine.deim.postprocessor # PostProcessor - 必須 import engine.deim.matcher # HungarianMatcher import engine.deim.deim_criterion # DEIMCriterion # その他のモジュールもインポート(念のため) import engine.deim import engine.backbone import engine.data import engine.optim except ImportError as e: # インポートエラーは警告として出力(デバッグ用) import warnings warnings.warn(f"Some engine modules could not be imported: {e}") # (x1, y1, x2, y2, label_name, score) Detection = Tuple[float, float, float, float, str, float] # ★ここを自分のファイル名に合わせる MODEL_CONFIG_PATH = "configs/deimv2_floorplan.yaml" MODEL_WEIGHTS_PATH = "models/best_stg2.pth" def _get_device(): """ ZeroGPU対応: デバイスを遅延決定する。 ZeroGPU環境では、import時点ではGPUが利用できないため、 この関数を呼び出した時点でデバイスを決定する。 """ return torch.device("cuda" if torch.cuda.is_available() else "cpu") # クラスID→記号名マッピング # クラス名リスト: ["kanki", "kanki_shikaku", "kanki_regisuta", "window1", "window2", "door1", "door2", "bathtub1", "konro1", "sink1", "toilet1", "kasaikeihou1", "kasaikeihou2", "houi1", "houi2", "houi3"] label_map = { 0: "kanki", # kanki 5: "door1", 6: "door2", } @lru_cache(maxsize=1) def load_deimv2_model(): """ HF Spaces 起動時に一度だけ呼ばれて、cfg + model + transform をキャッシュする。 もとの main(args) でやっていた処理をここに移植。 """ import os # モジュール登録の確認と強制インポート # トップレベルでインポート済みのはずだが、念のため確認して再インポート from engine.core.workspace import GLOBAL_CONFIG # 必要なモジュールが登録されているか確認 required_modules = { 'HybridEncoder': 'engine.deim.hybrid_encoder', 'DEIMTransformer': 'engine.deim.deim_decoder', 'PostProcessor': 'engine.deim.postprocessor', 'DINOv3STAs': 'engine.backbone.dinov3_adapter', 'DEIM': 'engine.deim.deim', } missing_modules = {name: module_path for name, module_path in required_modules.items() if name not in GLOBAL_CONFIG} if missing_modules: # まだ登録されていない場合は、強制的にインポート import importlib for name, module_path in missing_modules.items(): try: importlib.import_module(module_path) except ModuleNotFoundError as e: # 依存関係の問題を明確に示す missing_dep = str(e).split("'")[1] if "'" in str(e) else str(e) raise RuntimeError( f"Failed to import module {name} from {module_path} due to missing dependency: {missing_dep}. " f"Please install it with: pip install {missing_dep}. " f"Available registered modules: {list(GLOBAL_CONFIG.keys())[:20]}" ) except Exception as e: raise RuntimeError( f"Failed to import and register module {name} from {module_path}: {e}. " f"Available registered modules: {list(GLOBAL_CONFIG.keys())[:20]}" ) # 再確認 still_missing = [name for name in required_modules.keys() if name not in GLOBAL_CONFIG] if still_missing: raise RuntimeError( f"Modules still not registered after import: {still_missing}. " f"Available registered modules: {list(GLOBAL_CONFIG.keys())}" ) # パスの確認(絶対パスに変換) config_path = os.path.abspath(MODEL_CONFIG_PATH) weights_path = os.path.abspath(MODEL_WEIGHTS_PATH) if not os.path.exists(config_path): raise FileNotFoundError(f"設定ファイルが見つかりません: {config_path}") if not os.path.exists(weights_path): raise FileNotFoundError(f"モデルファイルが見つかりません: {weights_path}") try: cfg = YAMLConfig(config_path, resume=weights_path) except Exception as e: raise RuntimeError(f"YAMLConfigの初期化に失敗しました: {e}") # もとのスクリプトと同じ処理 if 'HGNetv2' in cfg.yaml_cfg: cfg.yaml_cfg['HGNetv2']['pretrained'] = False # ZeroGPU対応: デバイスを遅延決定 device = _get_device() print(f"[DEBUG] 使用デバイス: {device}") try: checkpoint = torch.load(weights_path, map_location=device) if 'ema' in checkpoint: state = checkpoint['ema']['module'] else: state = checkpoint['model'] # 訓練時と設定のズレがある場合でも動かせるように緩めにロード model_state = cfg.model.state_dict() # デバッグ情報: チェックポイント内のデコーダー関連キーのリストアップ decoder_keys_in_checkpoint = [k for k in state.keys() if 'decoder' in k.lower()] print(f"[DEBUG] チェックポイント内のデコーダー関連キー数: {len(decoder_keys_in_checkpoint)}") if decoder_keys_in_checkpoint: print(f"[DEBUG] チェックポイント内のデコーダー関連キー(最初の20件): {decoder_keys_in_checkpoint[:20]}") # デバッグ情報: モデル内のデコーダー関連キーのリストアップ decoder_keys_in_model = [k for k in model_state.keys() if 'decoder' in k.lower()] print(f"[DEBUG] モデル内のデコーダー関連キー数: {len(decoder_keys_in_model)}") if decoder_keys_in_model: print(f"[DEBUG] モデル内のデコーダー関連キー(最初の20件): {decoder_keys_in_model[:20]}") # デバッグ情報: デコーダーパラメータの値の確認(読み込み前) if decoder_keys_in_checkpoint: first_decoder_key = decoder_keys_in_checkpoint[0] if first_decoder_key in state: print(f"[DEBUG] チェックポイント内のデコーダーパラメータ '{first_decoder_key}' の値の範囲: min={state[first_decoder_key].min():.6f}, max={state[first_decoder_key].max():.6f}, mean={state[first_decoder_key].mean():.6f}") compatible_state = {} skipped = [] for k, v in state.items(): if k in model_state and model_state[k].shape == v.shape: compatible_state[k] = v else: skipped.append(k) load_result = cfg.model.load_state_dict(compatible_state, strict=False) # デバッグ情報: キーのマッチング確認 matched_decoder_keys = [k for k in decoder_keys_in_checkpoint if k in model_state and k in compatible_state] unmatched_decoder_keys = [k for k in decoder_keys_in_checkpoint if k not in model_state or k not in compatible_state] print(f"[DEBUG] マッチしたデコーダー関連キー数: {len(matched_decoder_keys)}") print(f"[DEBUG] マッチしなかったデコーダー関連キー数: {len(unmatched_decoder_keys)}") if unmatched_decoder_keys: print(f"[DEBUG] マッチしなかったデコーダー関連キー(最初の20件): {unmatched_decoder_keys[:20]}") # デバッグ情報: デコーダーパラメータの値の確認(読み込み後) decoder_params_after = {k: v for k, v in cfg.model.named_parameters() if 'decoder' in k.lower()} if decoder_params_after: first_decoder_param_after = next(iter(decoder_params_after.values())) print(f"[DEBUG] 読み込み後のデコーダーパラメータの値の範囲: min={first_decoder_param_after.min():.6f}, max={first_decoder_param_after.max():.6f}, mean={first_decoder_param_after.mean():.6f}") # すべてのデコーダーパラメータの値の範囲も確認 all_decoder_values = torch.cat([p.flatten() for p in decoder_params_after.values()]) print(f"[DEBUG] 読み込み後の全デコーダーパラメータの値の範囲: min={all_decoder_values.min():.6f}, max={all_decoder_values.max():.6f}, mean={all_decoder_values.mean():.6f}") # デバッグ情報: 読み込み統計 print(f"[DEBUG] チェックポイント読み込み統計:") print(f" - チェックポイント内のキー数: {len(state)}") print(f" - モデル内のキー数: {len(model_state)}") print(f" - 読み込んだキー数: {len(compatible_state)}") print(f" - 形状不一致でスキップ: {len(skipped)}") print(f" - 読み込み後の欠落キー: {len(load_result.missing_keys) if load_result.missing_keys else 0}") print(f" - 読み込み後の予期しないキー: {len(load_result.unexpected_keys) if load_result.unexpected_keys else 0}") if skipped or load_result.missing_keys or load_result.unexpected_keys: print("Warning: partial checkpoint load.") if skipped: print(f" shape-mismatched skipped keys: {len(skipped)}") # 重要なキー(decoder, head関連)を優先表示 important_skipped = [k for k in skipped if any(x in k for x in ['decoder', 'head', 'class', 'bbox', 'query'])] if important_skipped: print(f" [重要] スキップされたキー(decoder/head関連): {important_skipped[:10]}") if len(skipped) <= 20: print(f" すべてのスキップされたキー: {skipped}") else: print(f" スキップされたキー(最初の20件): {skipped[:20]}") if load_result.missing_keys: print(f" missing keys after load: {len(load_result.missing_keys)}") # 重要なキー(decoder, head関連)を優先表示 important_missing = [k for k in load_result.missing_keys if any(x in k for x in ['decoder', 'head', 'class', 'bbox', 'query'])] if important_missing: print(f" [重要] 欠落キー(decoder/head関連): {important_missing[:20]}") if len(load_result.missing_keys) <= 30: print(f" すべての欠落キー: {load_result.missing_keys}") else: print(f" 欠落キー(最初の30件): {list(load_result.missing_keys)[:30]}") if load_result.unexpected_keys: print(f" unexpected keys after load: {len(load_result.unexpected_keys)}") if len(load_result.unexpected_keys) <= 20: print(f" 予期しないキー: {load_result.unexpected_keys}") else: print(f" 予期しないキー(最初の20件): {load_result.unexpected_keys[:20]}") else: print(f"[DEBUG] モデル重みの読み込み: 成功 (読み込んだキー数: {len(compatible_state)})") # デバッグ情報: モデルのパラメータ統計(読み込み後) total_params = sum(p.numel() for p in cfg.model.parameters()) trainable_params = sum(p.numel() for p in cfg.model.parameters() if p.requires_grad) print(f"[DEBUG] モデルパラメータ統計:") print(f" - 総パラメータ数: {total_params:,}") print(f" - 学習可能パラメータ数: {trainable_params:,}") # デバッグ情報: デコーダーとヘッドのパラメータが初期化されているか確認 decoder_params = {k: v for k, v in cfg.model.named_parameters() if 'decoder' in k} head_params = {k: v for k, v in cfg.model.named_parameters() if any(x in k for x in ['head', 'class', 'bbox'])} print(f"[DEBUG] デコーダー/ヘッドパラメータ:") print(f" - デコーダーパラメータ数: {len(decoder_params)}") print(f" - ヘッドパラメータ数: {len(head_params)}") if decoder_params: # 最初のデコーダーパラメータの統計を確認 first_decoder_param = next(iter(decoder_params.values())) print(f" - デコーダーパラメータの値の範囲: min={first_decoder_param.min():.6f}, max={first_decoder_param.max():.6f}, mean={first_decoder_param.mean():.6f}") if head_params: # 最初のヘッドパラメータの統計を確認 first_head_param = next(iter(head_params.values())) print(f" - ヘッドパラメータの値の範囲: min={first_head_param.min():.6f}, max={first_head_param.max():.6f}, mean={first_head_param.mean():.6f}") except Exception as e: raise RuntimeError(f"モデルの重みの読み込みに失敗しました: {e}") class Model(nn.Module): def __init__(self, cfg, device): super().__init__() self.device = device self.model = cfg.model.eval().to(device) self.postprocessor = cfg.postprocessor.eval().to(device) # デバッグ情報: モデルとポストプロセッサの設定 print(f"[DEBUG] モデル構築:") print(f" - モデルタイプ: {type(self.model).__name__}") print(f" - ポストプロセッサタイプ: {type(self.postprocessor).__name__}") if hasattr(self.postprocessor, 'use_focal_loss'): print(f" - use_focal_loss: {self.postprocessor.use_focal_loss}") if hasattr(self.postprocessor, 'num_classes'): print(f" - num_classes: {self.postprocessor.num_classes}") if hasattr(self.postprocessor, 'num_top_queries'): print(f" - num_top_queries: {self.postprocessor.num_top_queries}") def forward(self, images, orig_target_sizes): outputs = self.model(images) outputs = self.postprocessor(outputs, orig_target_sizes) return outputs model = Model(cfg, device) # eval_spatial_sizeが設定にない場合は、val_dataloaderのResizeサイズから取得 # デフォルトは640x640 if "eval_spatial_size" in cfg.yaml_cfg: img_size = cfg.yaml_cfg["eval_spatial_size"] else: # val_dataloaderのtransformsから取得を試みる val_transforms = cfg.yaml_cfg.get("val_dataloader", {}).get("dataset", {}).get("transforms", {}).get("ops", []) img_size = 640 # デフォルト値 for op in val_transforms: if isinstance(op, dict) and op.get("type") == "Resize": size = op.get("size", [640, 640]) img_size = size[0] if isinstance(size, list) else size break vit_backbone = cfg.yaml_cfg.get('DINOv3STAs', False) if vit_backbone: transforms = T.Compose([ T.Resize(img_size), T.ToTensor(), T.Normalize( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], ), ]) else: transforms = T.Compose([ T.Resize(img_size), T.ToTensor(), ]) return model, transforms def run_inference_single_tile( model, transforms, tile_pil: Image.Image, tile_x: int, tile_y: int, tile_w: int, tile_h: int, score_thresh: float = 0.8, ) -> List[Detection]: """ 単一タイルに対する推論を実行し、元の画像座標系に変換して返す。 """ tile_w_actual, tile_h_actual = tile_pil.size # ZeroGPU対応: モデルからデバイスを動的に取得 device = next(model.model.parameters()).device # デバッグ情報: タイルの基本情報(DEBUG_DEIMV2フラグで制御) if DEBUG_DEIMV2: is_first_few = tile_x < 2000 and tile_y < 2000 # 最初の数タイルのみ if is_first_few: print(f"[DEBUG] タイル処理開始: 座標({tile_x},{tile_y}), サイズ{tile_w}×{tile_h}") print(f"[DEBUG] PIL画像サイズ: {tile_w_actual}×{tile_h_actual}") # タイル画像の統計情報 tile_np = np.array(tile_pil) print(f"[DEBUG] タイル画像値の範囲: min={tile_np.min()}, max={tile_np.max()}, mean={tile_np.mean():.2f}") # タイルサイズをorig_sizeとして設定 orig_size = torch.tensor([[tile_w_actual, tile_h_actual]], device=device).float() # 前処理 im_tensor = transforms(tile_pil).unsqueeze(0).to(device) # デバッグ情報: リサイズ後の確認(DEBUG_DEIMV2フラグで制御) if DEBUG_DEIMV2: is_first_few = tile_x < 2000 and tile_y < 2000 if is_first_few: print(f"[DEBUG] リサイズ後のテンソル形状: {im_tensor.shape}") # リサイズ前後のサイズ比較 if hasattr(transforms, 'transforms'): for t in transforms.transforms: if isinstance(t, T.Resize): print(f"[DEBUG] Resize設定: {t.size}") break print(f"[DEBUG] 前処理後のテンソル: shape={im_tensor.shape}, dtype={im_tensor.dtype}") print(f"[DEBUG] テンソル値の範囲: min={im_tensor.min():.4f}, max={im_tensor.max():.4f}, mean={im_tensor.mean():.4f}") print(f"[DEBUG] orig_size: {orig_size}") with torch.no_grad(): # デバッグモードの場合のみ、モデルの生の出力を確認(推論を2回実行) if DEBUG_DEIMV2: model_outputs = model.model(im_tensor) is_first_few = tile_x < 2000 and tile_y < 2000 if is_first_few: print(f"[DEBUG] モデル生出力: pred_logits.shape={model_outputs['pred_logits'].shape}, pred_boxes.shape={model_outputs['pred_boxes'].shape}") print(f"[DEBUG] pred_logits範囲: min={model_outputs['pred_logits'].min():.4f}, max={model_outputs['pred_logits'].max():.4f}, mean={model_outputs['pred_logits'].mean():.4f}") # クラス別のlogitsの最大値を確認 logits_max_per_class = model_outputs['pred_logits'].max(dim=1)[0] # [1, 16] print(f"[DEBUG] クラス別最大logits: {logits_max_per_class[0].cpu().numpy()}") # sigmoid後のスコアも確認 scores_raw = torch.sigmoid(model_outputs['pred_logits']) scores_max_per_class = scores_raw.max(dim=1)[0] # [1, 16] print(f"[DEBUG] クラス別最大スコア(sigmoid後): {scores_max_per_class[0].cpu().numpy()}") print(f"[DEBUG] pred_boxes範囲: min={model_outputs['pred_boxes'].min():.4f}, max={model_outputs['pred_boxes'].max():.4f}, mean={model_outputs['pred_boxes'].mean():.4f}") print(f"[DEBUG] pred_boxes形状(cxcywh形式): 最初の5件={model_outputs['pred_boxes'][0, :5, :]}") # ポストプロセッサ前の座標変換を確認 import torchvision.ops bbox_pred_raw = torchvision.ops.box_convert(model_outputs['pred_boxes'], in_fmt='cxcywh', out_fmt='xyxy') print(f"[DEBUG] cxcywh→xyxy変換後(正規化座標): 最初の5件={bbox_pred_raw[0, :5, :]}") print(f"[DEBUG] xyxy変換後の範囲: min={bbox_pred_raw.min():.4f}, max={bbox_pred_raw.max():.4f}") # 本番推論(1回のみ実行) outputs = model(im_tensor, orig_size) if not outputs or len(outputs) == 0: if DEBUG_DEIMV2: print(f"[DEBUG] モデル出力が空です") return [] out = outputs[0] labels = out['labels'].detach().cpu().numpy() boxes = out['boxes'].detach().cpu().numpy() scores = out['scores'].detach().cpu().numpy() # デバッグ情報: ポストプロセッサ後の出力(DEBUG_DEIMV2フラグで制御) if DEBUG_DEIMV2: print(f"[DEBUG] ポストプロセッサ後: labels.shape={labels.shape}, boxes.shape={boxes.shape}, scores.shape={scores.shape}") print(f"[DEBUG] boxes範囲: x1=[{boxes[:, 0].min():.1f}, {boxes[:, 0].max():.1f}], y1=[{boxes[:, 1].min():.1f}, {boxes[:, 1].max():.1f}], x2=[{boxes[:, 2].min():.1f}, {boxes[:, 2].max():.1f}], y2=[{boxes[:, 3].min():.1f}, {boxes[:, 3].max():.1f}]") # デバッグ情報: タイルの検出結果詳細 print(f"[DEBUG] タイル検出結果(フィルタリング前): {len(scores)}件") if len(scores) > 0: print(f"[DEBUG] スコア範囲: min={scores.min():.4f}, max={scores.max():.4f}, mean={scores.mean():.4f}") print(f"[DEBUG] スコア分布:") print(f" - 0.0-0.3: {(scores < 0.3).sum()}件") print(f" - 0.3-0.5: {((scores >= 0.3) & (scores < 0.5)).sum()}件") print(f" - 0.5-0.7: {((scores >= 0.5) & (scores < 0.7)).sum()}件") print(f" - 0.7-0.9: {((scores >= 0.7) & (scores < 0.9)).sum()}件") print(f" - 0.9-1.0: {(scores >= 0.9).sum()}件") # ラベルの分布 unique_labels, label_counts = np.unique(labels, return_counts=True) print(f"[DEBUG] ラベル分布:") for label_id, count in zip(unique_labels, label_counts): label_name = label_map.get(int(label_id), f"class_{int(label_id)}") print(f" - クラスID {int(label_id)} ({label_name}): {count}件") # スコア閾値以上の検出数 above_thresh = scores >= score_thresh print(f"[DEBUG] スコア閾値({score_thresh})以上の検出数: {above_thresh.sum()}件") # 上位5件の詳細を表示 if len(scores) > 0: top_indices = np.argsort(scores)[::-1][:5] print(f"[DEBUG] 上位5件の検出結果:") for rank, idx in enumerate(top_indices, 1): label_id = int(labels[idx]) label_name = label_map.get(label_id, f"class_{label_id}") score = float(scores[idx]) x1, y1, x2, y2 = boxes[idx] print(f" [{rank}] {label_name}, スコア={score:.4f}, bbox=({x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f})") detections: List[Detection] = [] filtered_by_thresh = 0 filtered_by_label = 0 for label, box, score in zip(labels, boxes, scores): score = float(score) if score < score_thresh: filtered_by_thresh += 1 continue x1, y1, x2, y2 = [float(v) for v in box.tolist()] label_id = int(label) label_name = label_map.get(label_id, f"class_{label_id}") # label_mapに存在しないクラスもカウント(デバッグ用) if label_id not in label_map: filtered_by_label += 1 # タイル座標を元の画像座標に変換 x1_orig = x1 + tile_x y1_orig = y1 + tile_y x2_orig = x2 + tile_x y2_orig = y2 + tile_y detections.append((x1_orig, y1_orig, x2_orig, y2_orig, label_name, score)) if DEBUG_DEIMV2 and (filtered_by_thresh > 0 or filtered_by_label > 0): print(f"[DEBUG] フィルタリング: スコア閾値で{filtered_by_thresh}件、label_mapで{filtered_by_label}件除外") return detections def run_inference( image_np: np.ndarray, score_thresh: float = 0.8, tile_size: int = 640, tile_overlap: int = 128, ) -> List[Detection]: """ タイル推論を実行する。 大きな画像をタイルに分割して推論し、結果を統合する。 Args: image_np: RGB np.ndarray (H, W, 3) score_thresh: スコア閾値 tile_size: タイルサイズ(デフォルト: 640) tile_overlap: タイル間のオーバーラップ(デフォルト: 128) Returns: [(x1,y1,x2,y2,label_name,score), ...] """ try: model, transforms = load_deimv2_model() except Exception as e: raise RuntimeError(f"モデルの読み込みに失敗しました: {e}") try: # numpy → PIL if image_np.dtype != np.uint8: if image_np.max() <= 1.0: image_np = (image_np * 255).astype(np.uint8) else: image_np = image_np.astype(np.uint8) im_pil = Image.fromarray(image_np).convert("RGB") img_w, img_h = im_pil.size if DEBUG_DEIMV2: print(f"[DEBUG] ===== タイル推論開始 =====") print(f"[DEBUG] 入力画像サイズ: {img_w}×{img_h} (ピクセル数: {img_w*img_h:,})") print(f"[DEBUG] タイルサイズ: {tile_size}×{tile_size}") print(f"[DEBUG] タイルオーバーラップ: {tile_overlap}px") # タイルに分割 step = tile_size - tile_overlap tiles = [] tile_coords = [] for y in range(0, img_h, step): for x in range(0, img_w, step): # タイルの範囲を計算 tile_x = x tile_y = y tile_x_end = min(x + tile_size, img_w) tile_y_end = min(y + tile_size, img_h) tile_w = tile_x_end - tile_x tile_h = tile_y_end - tile_y # タイルを切り出し tile = im_pil.crop((tile_x, tile_y, tile_x_end, tile_y_end)) # デバッグ情報: タイルの確認(DEBUG_DEIMV2フラグで制御) if DEBUG_DEIMV2 and len(tiles) < 3: print(f"[DEBUG] タイル {len(tiles)+1} 詳細:") print(f" - 切り出し範囲: ({tile_x}, {tile_y}) → ({tile_x_end}, {tile_y_end})") print(f" - タイルサイズ: {tile_w}×{tile_h}") print(f" - 実際のPIL画像サイズ: {tile.size}") # タイル画像の統計情報 tile_np = np.array(tile) print(f" - 画像値の範囲: min={tile_np.min()}, max={tile_np.max()}, mean={tile_np.mean():.2f}") # タイル画像を保存(デバッグ用、最初の3タイルのみ) try: debug_dir = "debug_tiles" os.makedirs(debug_dir, exist_ok=True) tile.save(f"{debug_dir}/tile_{len(tiles)+1}_x{tile_x}_y{tile_y}.png") print(f" - タイル画像を保存: {debug_dir}/tile_{len(tiles)+1}_x{tile_x}_y{tile_y}.png") except Exception as e: print(f" - タイル画像の保存に失敗: {e}") tiles.append(tile) tile_coords.append((tile_x, tile_y, tile_w, tile_h)) if DEBUG_DEIMV2: print(f"[DEBUG] タイル数: {len(tiles)}") # 各タイルに対して推論 all_detections = [] for i, (tile, (tile_x, tile_y, tile_w, tile_h)) in enumerate(zip(tiles, tile_coords)): if DEBUG_DEIMV2: print(f"[DEBUG] タイル {i+1}/{len(tiles)}: 座標({tile_x},{tile_y}), サイズ{tile_w}×{tile_h}") tile_detections = run_inference_single_tile( model, transforms, tile, tile_x, tile_y, tile_w, tile_h, score_thresh=score_thresh ) if DEBUG_DEIMV2: print(f"[DEBUG] 検出数: {len(tile_detections)}件") all_detections.extend(tile_detections) if DEBUG_DEIMV2: print(f"[DEBUG] 総検出数(重複あり): {len(all_detections)}件") if len(all_detections) == 0: if DEBUG_DEIMV2: print(f"[DEBUG] =========================") return [] # 全検出結果の統計情報(DEBUG_DEIMV2フラグで制御) if DEBUG_DEIMV2 and len(all_detections) > 0: all_scores = [det[5] for det in all_detections] # scoreは6番目の要素 all_labels = [det[4] for det in all_detections] # label_nameは5番目の要素 print(f"[DEBUG] 全タイル統合後の統計:") print(f" - スコア範囲: min={min(all_scores):.4f}, max={max(all_scores):.4f}, mean={sum(all_scores)/len(all_scores):.4f}") # ラベルごとの集計 from collections import Counter label_counter = Counter(all_labels) print(f" - ラベル別検出数:") for label_name, count in sorted(label_counter.items(), key=lambda x: -x[1]): print(f" - {label_name}: {count}件") # NMS(Non-Maximum Suppression)で重複検出をマージ # クラスごとにNMSを適用(異なるクラス間の重複は許可) from torchvision.ops import nms # ZeroGPU対応: モデルからデバイスを動的に取得 device = next(model.model.parameters()).device # クラスごとにグループ化 detections_by_class = {} for det in all_detections: x1, y1, x2, y2, label_name, score = det if label_name not in detections_by_class: detections_by_class[label_name] = [] detections_by_class[label_name].append((x1, y1, x2, y2, score)) if DEBUG_DEIMV2: print(f"[DEBUG] NMS適用前: {len(detections_by_class)}クラス、合計{len(all_detections)}件") merged_detections = [] nms_removed = 0 for label_name, boxes_scores in detections_by_class.items(): if len(boxes_scores) == 0: continue before_nms = len(boxes_scores) # テンソルに変換 boxes_tensor = torch.tensor([[x1, y1, x2, y2] for x1, y1, x2, y2, _ in boxes_scores], device=device) scores_tensor = torch.tensor([score for _, _, _, _, score in boxes_scores], device=device) # NMS適用(IoU閾値: 0.4 - より厳しく重複を削除) keep_indices = nms(boxes_tensor, scores_tensor, iou_threshold=0.4) # マージ後の検出を追加 for idx in keep_indices.cpu().numpy(): x1, y1, x2, y2, score = boxes_scores[idx] merged_detections.append((x1, y1, x2, y2, label_name, score)) after_nms = len(keep_indices) removed = before_nms - after_nms nms_removed += removed if DEBUG_DEIMV2 and removed > 0: print(f"[DEBUG] {label_name}: NMSで{removed}件削除 ({before_nms}→{after_nms}件)") if DEBUG_DEIMV2: print(f"[DEBUG] NMS適用後: {len(merged_detections)}件 (合計{nms_removed}件削除)") # 最終結果の統計 if len(merged_detections) > 0: final_scores = [det[5] for det in merged_detections] final_labels = [det[4] for det in merged_detections] final_label_counter = Counter(final_labels) print(f"[DEBUG] 最終検出結果:") print(f" - 総検出数: {len(merged_detections)}件") print(f" - スコア範囲: min={min(final_scores):.4f}, max={max(final_scores):.4f}, mean={sum(final_scores)/len(final_scores):.4f}") print(f" - ラベル別:") for label_name, count in sorted(final_label_counter.items(), key=lambda x: -x[1]): print(f" - {label_name}: {count}件") # 上位10件を表示 sorted_detections = sorted(merged_detections, key=lambda x: x[5], reverse=True)[:10] print(f"[DEBUG] 上位10件の検出結果:") for rank, (x1, y1, x2, y2, label_name, score) in enumerate(sorted_detections, 1): print(f" [{rank}] {label_name}, スコア={score:.4f}, bbox=({x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f})") print(f"[DEBUG] =========================") return merged_detections except Exception as e: raise RuntimeError(f"推論の実行に失敗しました: {e}")