Spaces:
Running
on
Zero
Running
on
Zero
| # detection.py | |
| from functools import lru_cache | |
| from typing import List, Tuple | |
| import os | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from PIL import Image | |
| import torchvision.transforms as T | |
| # デバッグ出力制御フラグ(環境変数で制御) | |
| DEBUG_DEIMV2 = os.getenv("DEBUG_DEIMV2", "0") == "1" | |
| # YAMLConfigをインポート(engineパッケージ経由でレジストリをロード) | |
| # モジュール登録のために、すべての必要なモジュールを明示的にインポート | |
| # 重要: モジュールファイルを直接インポートすることで、@register()デコレータを確実に実行する | |
| # まず、engineパッケージ全体をインポート | |
| try: | |
| import engine | |
| from engine import YAMLConfig | |
| except ImportError: | |
| from engine.core.yaml_config import YAMLConfig | |
| # 次に、すべての必要なモジュールファイルを直接インポート | |
| # これにより、@register()デコレータが実行され、GLOBAL_CONFIGに登録される | |
| try: | |
| # Backboneモジュール | |
| import engine.backbone.dinov3_adapter # DINOv3STAs | |
| # DEIMモジュール(すべての重要なクラスを含む) | |
| import engine.deim.hybrid_encoder # HybridEncoder - 必須 | |
| import engine.deim.deim_decoder # DEIMTransformer - 必須 | |
| import engine.deim.deim # DEIM - 必須 | |
| import engine.deim.postprocessor # PostProcessor - 必須 | |
| import engine.deim.matcher # HungarianMatcher | |
| import engine.deim.deim_criterion # DEIMCriterion | |
| # その他のモジュールもインポート(念のため) | |
| import engine.deim | |
| import engine.backbone | |
| import engine.data | |
| import engine.optim | |
| except ImportError as e: | |
| # インポートエラーは警告として出力(デバッグ用) | |
| import warnings | |
| warnings.warn(f"Some engine modules could not be imported: {e}") | |
| # (x1, y1, x2, y2, label_name, score) | |
| Detection = Tuple[float, float, float, float, str, float] | |
| # ★ここを自分のファイル名に合わせる | |
| MODEL_CONFIG_PATH = "configs/deimv2_floorplan.yaml" | |
| MODEL_WEIGHTS_PATH = "models/best_stg2.pth" | |
| def _get_device(): | |
| """ | |
| ZeroGPU対応: デバイスを遅延決定する。 | |
| ZeroGPU環境では、import時点ではGPUが利用できないため、 | |
| この関数を呼び出した時点でデバイスを決定する。 | |
| """ | |
| return torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| # クラスID→記号名マッピング | |
| # クラス名リスト: ["kanki", "kanki_shikaku", "kanki_regisuta", "window1", "window2", "door1", "door2", "bathtub1", "konro1", "sink1", "toilet1", "kasaikeihou1", "kasaikeihou2", "houi1", "houi2", "houi3"] | |
| label_map = { | |
| 0: "kanki", # kanki | |
| 5: "door1", | |
| 6: "door2", | |
| } | |
| def load_deimv2_model(): | |
| """ | |
| HF Spaces 起動時に一度だけ呼ばれて、cfg + model + transform をキャッシュする。 | |
| もとの main(args) でやっていた処理をここに移植。 | |
| """ | |
| import os | |
| # モジュール登録の確認と強制インポート | |
| # トップレベルでインポート済みのはずだが、念のため確認して再インポート | |
| from engine.core.workspace import GLOBAL_CONFIG | |
| # 必要なモジュールが登録されているか確認 | |
| required_modules = { | |
| 'HybridEncoder': 'engine.deim.hybrid_encoder', | |
| 'DEIMTransformer': 'engine.deim.deim_decoder', | |
| 'PostProcessor': 'engine.deim.postprocessor', | |
| 'DINOv3STAs': 'engine.backbone.dinov3_adapter', | |
| 'DEIM': 'engine.deim.deim', | |
| } | |
| missing_modules = {name: module_path for name, module_path in required_modules.items() | |
| if name not in GLOBAL_CONFIG} | |
| if missing_modules: | |
| # まだ登録されていない場合は、強制的にインポート | |
| import importlib | |
| for name, module_path in missing_modules.items(): | |
| try: | |
| importlib.import_module(module_path) | |
| except ModuleNotFoundError as e: | |
| # 依存関係の問題を明確に示す | |
| missing_dep = str(e).split("'")[1] if "'" in str(e) else str(e) | |
| raise RuntimeError( | |
| f"Failed to import module {name} from {module_path} due to missing dependency: {missing_dep}. " | |
| f"Please install it with: pip install {missing_dep}. " | |
| f"Available registered modules: {list(GLOBAL_CONFIG.keys())[:20]}" | |
| ) | |
| except Exception as e: | |
| raise RuntimeError( | |
| f"Failed to import and register module {name} from {module_path}: {e}. " | |
| f"Available registered modules: {list(GLOBAL_CONFIG.keys())[:20]}" | |
| ) | |
| # 再確認 | |
| still_missing = [name for name in required_modules.keys() if name not in GLOBAL_CONFIG] | |
| if still_missing: | |
| raise RuntimeError( | |
| f"Modules still not registered after import: {still_missing}. " | |
| f"Available registered modules: {list(GLOBAL_CONFIG.keys())}" | |
| ) | |
| # パスの確認(絶対パスに変換) | |
| config_path = os.path.abspath(MODEL_CONFIG_PATH) | |
| weights_path = os.path.abspath(MODEL_WEIGHTS_PATH) | |
| if not os.path.exists(config_path): | |
| raise FileNotFoundError(f"設定ファイルが見つかりません: {config_path}") | |
| if not os.path.exists(weights_path): | |
| raise FileNotFoundError(f"モデルファイルが見つかりません: {weights_path}") | |
| try: | |
| cfg = YAMLConfig(config_path, resume=weights_path) | |
| except Exception as e: | |
| raise RuntimeError(f"YAMLConfigの初期化に失敗しました: {e}") | |
| # もとのスクリプトと同じ処理 | |
| if 'HGNetv2' in cfg.yaml_cfg: | |
| cfg.yaml_cfg['HGNetv2']['pretrained'] = False | |
| # ZeroGPU対応: デバイスを遅延決定 | |
| device = _get_device() | |
| print(f"[DEBUG] 使用デバイス: {device}") | |
| try: | |
| checkpoint = torch.load(weights_path, map_location=device) | |
| if 'ema' in checkpoint: | |
| state = checkpoint['ema']['module'] | |
| else: | |
| state = checkpoint['model'] | |
| # 訓練時と設定のズレがある場合でも動かせるように緩めにロード | |
| model_state = cfg.model.state_dict() | |
| # デバッグ情報: チェックポイント内のデコーダー関連キーのリストアップ | |
| decoder_keys_in_checkpoint = [k for k in state.keys() if 'decoder' in k.lower()] | |
| print(f"[DEBUG] チェックポイント内のデコーダー関連キー数: {len(decoder_keys_in_checkpoint)}") | |
| if decoder_keys_in_checkpoint: | |
| print(f"[DEBUG] チェックポイント内のデコーダー関連キー(最初の20件): {decoder_keys_in_checkpoint[:20]}") | |
| # デバッグ情報: モデル内のデコーダー関連キーのリストアップ | |
| decoder_keys_in_model = [k for k in model_state.keys() if 'decoder' in k.lower()] | |
| print(f"[DEBUG] モデル内のデコーダー関連キー数: {len(decoder_keys_in_model)}") | |
| if decoder_keys_in_model: | |
| print(f"[DEBUG] モデル内のデコーダー関連キー(最初の20件): {decoder_keys_in_model[:20]}") | |
| # デバッグ情報: デコーダーパラメータの値の確認(読み込み前) | |
| if decoder_keys_in_checkpoint: | |
| first_decoder_key = decoder_keys_in_checkpoint[0] | |
| if first_decoder_key in state: | |
| print(f"[DEBUG] チェックポイント内のデコーダーパラメータ '{first_decoder_key}' の値の範囲: min={state[first_decoder_key].min():.6f}, max={state[first_decoder_key].max():.6f}, mean={state[first_decoder_key].mean():.6f}") | |
| compatible_state = {} | |
| skipped = [] | |
| for k, v in state.items(): | |
| if k in model_state and model_state[k].shape == v.shape: | |
| compatible_state[k] = v | |
| else: | |
| skipped.append(k) | |
| load_result = cfg.model.load_state_dict(compatible_state, strict=False) | |
| # デバッグ情報: キーのマッチング確認 | |
| matched_decoder_keys = [k for k in decoder_keys_in_checkpoint if k in model_state and k in compatible_state] | |
| unmatched_decoder_keys = [k for k in decoder_keys_in_checkpoint if k not in model_state or k not in compatible_state] | |
| print(f"[DEBUG] マッチしたデコーダー関連キー数: {len(matched_decoder_keys)}") | |
| print(f"[DEBUG] マッチしなかったデコーダー関連キー数: {len(unmatched_decoder_keys)}") | |
| if unmatched_decoder_keys: | |
| print(f"[DEBUG] マッチしなかったデコーダー関連キー(最初の20件): {unmatched_decoder_keys[:20]}") | |
| # デバッグ情報: デコーダーパラメータの値の確認(読み込み後) | |
| decoder_params_after = {k: v for k, v in cfg.model.named_parameters() if 'decoder' in k.lower()} | |
| if decoder_params_after: | |
| first_decoder_param_after = next(iter(decoder_params_after.values())) | |
| print(f"[DEBUG] 読み込み後のデコーダーパラメータの値の範囲: min={first_decoder_param_after.min():.6f}, max={first_decoder_param_after.max():.6f}, mean={first_decoder_param_after.mean():.6f}") | |
| # すべてのデコーダーパラメータの値の範囲も確認 | |
| all_decoder_values = torch.cat([p.flatten() for p in decoder_params_after.values()]) | |
| print(f"[DEBUG] 読み込み後の全デコーダーパラメータの値の範囲: min={all_decoder_values.min():.6f}, max={all_decoder_values.max():.6f}, mean={all_decoder_values.mean():.6f}") | |
| # デバッグ情報: 読み込み統計 | |
| print(f"[DEBUG] チェックポイント読み込み統計:") | |
| print(f" - チェックポイント内のキー数: {len(state)}") | |
| print(f" - モデル内のキー数: {len(model_state)}") | |
| print(f" - 読み込んだキー数: {len(compatible_state)}") | |
| print(f" - 形状不一致でスキップ: {len(skipped)}") | |
| print(f" - 読み込み後の欠落キー: {len(load_result.missing_keys) if load_result.missing_keys else 0}") | |
| print(f" - 読み込み後の予期しないキー: {len(load_result.unexpected_keys) if load_result.unexpected_keys else 0}") | |
| if skipped or load_result.missing_keys or load_result.unexpected_keys: | |
| print("Warning: partial checkpoint load.") | |
| if skipped: | |
| print(f" shape-mismatched skipped keys: {len(skipped)}") | |
| # 重要なキー(decoder, head関連)を優先表示 | |
| important_skipped = [k for k in skipped if any(x in k for x in ['decoder', 'head', 'class', 'bbox', 'query'])] | |
| if important_skipped: | |
| print(f" [重要] スキップされたキー(decoder/head関連): {important_skipped[:10]}") | |
| if len(skipped) <= 20: | |
| print(f" すべてのスキップされたキー: {skipped}") | |
| else: | |
| print(f" スキップされたキー(最初の20件): {skipped[:20]}") | |
| if load_result.missing_keys: | |
| print(f" missing keys after load: {len(load_result.missing_keys)}") | |
| # 重要なキー(decoder, head関連)を優先表示 | |
| important_missing = [k for k in load_result.missing_keys if any(x in k for x in ['decoder', 'head', 'class', 'bbox', 'query'])] | |
| if important_missing: | |
| print(f" [重要] 欠落キー(decoder/head関連): {important_missing[:20]}") | |
| if len(load_result.missing_keys) <= 30: | |
| print(f" すべての欠落キー: {load_result.missing_keys}") | |
| else: | |
| print(f" 欠落キー(最初の30件): {list(load_result.missing_keys)[:30]}") | |
| if load_result.unexpected_keys: | |
| print(f" unexpected keys after load: {len(load_result.unexpected_keys)}") | |
| if len(load_result.unexpected_keys) <= 20: | |
| print(f" 予期しないキー: {load_result.unexpected_keys}") | |
| else: | |
| print(f" 予期しないキー(最初の20件): {load_result.unexpected_keys[:20]}") | |
| else: | |
| print(f"[DEBUG] モデル重みの読み込み: 成功 (読み込んだキー数: {len(compatible_state)})") | |
| # デバッグ情報: モデルのパラメータ統計(読み込み後) | |
| total_params = sum(p.numel() for p in cfg.model.parameters()) | |
| trainable_params = sum(p.numel() for p in cfg.model.parameters() if p.requires_grad) | |
| print(f"[DEBUG] モデルパラメータ統計:") | |
| print(f" - 総パラメータ数: {total_params:,}") | |
| print(f" - 学習可能パラメータ数: {trainable_params:,}") | |
| # デバッグ情報: デコーダーとヘッドのパラメータが初期化されているか確認 | |
| decoder_params = {k: v for k, v in cfg.model.named_parameters() if 'decoder' in k} | |
| head_params = {k: v for k, v in cfg.model.named_parameters() if any(x in k for x in ['head', 'class', 'bbox'])} | |
| print(f"[DEBUG] デコーダー/ヘッドパラメータ:") | |
| print(f" - デコーダーパラメータ数: {len(decoder_params)}") | |
| print(f" - ヘッドパラメータ数: {len(head_params)}") | |
| if decoder_params: | |
| # 最初のデコーダーパラメータの統計を確認 | |
| first_decoder_param = next(iter(decoder_params.values())) | |
| print(f" - デコーダーパラメータの値の範囲: min={first_decoder_param.min():.6f}, max={first_decoder_param.max():.6f}, mean={first_decoder_param.mean():.6f}") | |
| if head_params: | |
| # 最初のヘッドパラメータの統計を確認 | |
| first_head_param = next(iter(head_params.values())) | |
| print(f" - ヘッドパラメータの値の範囲: min={first_head_param.min():.6f}, max={first_head_param.max():.6f}, mean={first_head_param.mean():.6f}") | |
| except Exception as e: | |
| raise RuntimeError(f"モデルの重みの読み込みに失敗しました: {e}") | |
| class Model(nn.Module): | |
| def __init__(self, cfg, device): | |
| super().__init__() | |
| self.device = device | |
| self.model = cfg.model.eval().to(device) | |
| self.postprocessor = cfg.postprocessor.eval().to(device) | |
| # デバッグ情報: モデルとポストプロセッサの設定 | |
| print(f"[DEBUG] モデル構築:") | |
| print(f" - モデルタイプ: {type(self.model).__name__}") | |
| print(f" - ポストプロセッサタイプ: {type(self.postprocessor).__name__}") | |
| if hasattr(self.postprocessor, 'use_focal_loss'): | |
| print(f" - use_focal_loss: {self.postprocessor.use_focal_loss}") | |
| if hasattr(self.postprocessor, 'num_classes'): | |
| print(f" - num_classes: {self.postprocessor.num_classes}") | |
| if hasattr(self.postprocessor, 'num_top_queries'): | |
| print(f" - num_top_queries: {self.postprocessor.num_top_queries}") | |
| def forward(self, images, orig_target_sizes): | |
| outputs = self.model(images) | |
| outputs = self.postprocessor(outputs, orig_target_sizes) | |
| return outputs | |
| model = Model(cfg, device) | |
| # eval_spatial_sizeが設定にない場合は、val_dataloaderのResizeサイズから取得 | |
| # デフォルトは640x640 | |
| if "eval_spatial_size" in cfg.yaml_cfg: | |
| img_size = cfg.yaml_cfg["eval_spatial_size"] | |
| else: | |
| # val_dataloaderのtransformsから取得を試みる | |
| val_transforms = cfg.yaml_cfg.get("val_dataloader", {}).get("dataset", {}).get("transforms", {}).get("ops", []) | |
| img_size = 640 # デフォルト値 | |
| for op in val_transforms: | |
| if isinstance(op, dict) and op.get("type") == "Resize": | |
| size = op.get("size", [640, 640]) | |
| img_size = size[0] if isinstance(size, list) else size | |
| break | |
| vit_backbone = cfg.yaml_cfg.get('DINOv3STAs', False) | |
| if vit_backbone: | |
| transforms = T.Compose([ | |
| T.Resize(img_size), | |
| T.ToTensor(), | |
| T.Normalize( | |
| mean=[0.485, 0.456, 0.406], | |
| std=[0.229, 0.224, 0.225], | |
| ), | |
| ]) | |
| else: | |
| transforms = T.Compose([ | |
| T.Resize(img_size), | |
| T.ToTensor(), | |
| ]) | |
| return model, transforms | |
| def run_inference_single_tile( | |
| model, | |
| transforms, | |
| tile_pil: Image.Image, | |
| tile_x: int, | |
| tile_y: int, | |
| tile_w: int, | |
| tile_h: int, | |
| score_thresh: float = 0.8, | |
| ) -> List[Detection]: | |
| """ | |
| 単一タイルに対する推論を実行し、元の画像座標系に変換して返す。 | |
| """ | |
| tile_w_actual, tile_h_actual = tile_pil.size | |
| # ZeroGPU対応: モデルからデバイスを動的に取得 | |
| device = next(model.model.parameters()).device | |
| # デバッグ情報: タイルの基本情報(DEBUG_DEIMV2フラグで制御) | |
| if DEBUG_DEIMV2: | |
| is_first_few = tile_x < 2000 and tile_y < 2000 # 最初の数タイルのみ | |
| if is_first_few: | |
| print(f"[DEBUG] タイル処理開始: 座標({tile_x},{tile_y}), サイズ{tile_w}×{tile_h}") | |
| print(f"[DEBUG] PIL画像サイズ: {tile_w_actual}×{tile_h_actual}") | |
| # タイル画像の統計情報 | |
| tile_np = np.array(tile_pil) | |
| print(f"[DEBUG] タイル画像値の範囲: min={tile_np.min()}, max={tile_np.max()}, mean={tile_np.mean():.2f}") | |
| # タイルサイズをorig_sizeとして設定 | |
| orig_size = torch.tensor([[tile_w_actual, tile_h_actual]], device=device).float() | |
| # 前処理 | |
| im_tensor = transforms(tile_pil).unsqueeze(0).to(device) | |
| # デバッグ情報: リサイズ後の確認(DEBUG_DEIMV2フラグで制御) | |
| if DEBUG_DEIMV2: | |
| is_first_few = tile_x < 2000 and tile_y < 2000 | |
| if is_first_few: | |
| print(f"[DEBUG] リサイズ後のテンソル形状: {im_tensor.shape}") | |
| # リサイズ前後のサイズ比較 | |
| if hasattr(transforms, 'transforms'): | |
| for t in transforms.transforms: | |
| if isinstance(t, T.Resize): | |
| print(f"[DEBUG] Resize設定: {t.size}") | |
| break | |
| print(f"[DEBUG] 前処理後のテンソル: shape={im_tensor.shape}, dtype={im_tensor.dtype}") | |
| print(f"[DEBUG] テンソル値の範囲: min={im_tensor.min():.4f}, max={im_tensor.max():.4f}, mean={im_tensor.mean():.4f}") | |
| print(f"[DEBUG] orig_size: {orig_size}") | |
| with torch.no_grad(): | |
| # デバッグモードの場合のみ、モデルの生の出力を確認(推論を2回実行) | |
| if DEBUG_DEIMV2: | |
| model_outputs = model.model(im_tensor) | |
| is_first_few = tile_x < 2000 and tile_y < 2000 | |
| if is_first_few: | |
| print(f"[DEBUG] モデル生出力: pred_logits.shape={model_outputs['pred_logits'].shape}, pred_boxes.shape={model_outputs['pred_boxes'].shape}") | |
| print(f"[DEBUG] pred_logits範囲: min={model_outputs['pred_logits'].min():.4f}, max={model_outputs['pred_logits'].max():.4f}, mean={model_outputs['pred_logits'].mean():.4f}") | |
| # クラス別のlogitsの最大値を確認 | |
| logits_max_per_class = model_outputs['pred_logits'].max(dim=1)[0] # [1, 16] | |
| print(f"[DEBUG] クラス別最大logits: {logits_max_per_class[0].cpu().numpy()}") | |
| # sigmoid後のスコアも確認 | |
| scores_raw = torch.sigmoid(model_outputs['pred_logits']) | |
| scores_max_per_class = scores_raw.max(dim=1)[0] # [1, 16] | |
| print(f"[DEBUG] クラス別最大スコア(sigmoid後): {scores_max_per_class[0].cpu().numpy()}") | |
| print(f"[DEBUG] pred_boxes範囲: min={model_outputs['pred_boxes'].min():.4f}, max={model_outputs['pred_boxes'].max():.4f}, mean={model_outputs['pred_boxes'].mean():.4f}") | |
| print(f"[DEBUG] pred_boxes形状(cxcywh形式): 最初の5件={model_outputs['pred_boxes'][0, :5, :]}") | |
| # ポストプロセッサ前の座標変換を確認 | |
| import torchvision.ops | |
| bbox_pred_raw = torchvision.ops.box_convert(model_outputs['pred_boxes'], in_fmt='cxcywh', out_fmt='xyxy') | |
| print(f"[DEBUG] cxcywh→xyxy変換後(正規化座標): 最初の5件={bbox_pred_raw[0, :5, :]}") | |
| print(f"[DEBUG] xyxy変換後の範囲: min={bbox_pred_raw.min():.4f}, max={bbox_pred_raw.max():.4f}") | |
| # 本番推論(1回のみ実行) | |
| outputs = model(im_tensor, orig_size) | |
| if not outputs or len(outputs) == 0: | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] モデル出力が空です") | |
| return [] | |
| out = outputs[0] | |
| labels = out['labels'].detach().cpu().numpy() | |
| boxes = out['boxes'].detach().cpu().numpy() | |
| scores = out['scores'].detach().cpu().numpy() | |
| # デバッグ情報: ポストプロセッサ後の出力(DEBUG_DEIMV2フラグで制御) | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] ポストプロセッサ後: labels.shape={labels.shape}, boxes.shape={boxes.shape}, scores.shape={scores.shape}") | |
| print(f"[DEBUG] boxes範囲: x1=[{boxes[:, 0].min():.1f}, {boxes[:, 0].max():.1f}], y1=[{boxes[:, 1].min():.1f}, {boxes[:, 1].max():.1f}], x2=[{boxes[:, 2].min():.1f}, {boxes[:, 2].max():.1f}], y2=[{boxes[:, 3].min():.1f}, {boxes[:, 3].max():.1f}]") | |
| # デバッグ情報: タイルの検出結果詳細 | |
| print(f"[DEBUG] タイル検出結果(フィルタリング前): {len(scores)}件") | |
| if len(scores) > 0: | |
| print(f"[DEBUG] スコア範囲: min={scores.min():.4f}, max={scores.max():.4f}, mean={scores.mean():.4f}") | |
| print(f"[DEBUG] スコア分布:") | |
| print(f" - 0.0-0.3: {(scores < 0.3).sum()}件") | |
| print(f" - 0.3-0.5: {((scores >= 0.3) & (scores < 0.5)).sum()}件") | |
| print(f" - 0.5-0.7: {((scores >= 0.5) & (scores < 0.7)).sum()}件") | |
| print(f" - 0.7-0.9: {((scores >= 0.7) & (scores < 0.9)).sum()}件") | |
| print(f" - 0.9-1.0: {(scores >= 0.9).sum()}件") | |
| # ラベルの分布 | |
| unique_labels, label_counts = np.unique(labels, return_counts=True) | |
| print(f"[DEBUG] ラベル分布:") | |
| for label_id, count in zip(unique_labels, label_counts): | |
| label_name = label_map.get(int(label_id), f"class_{int(label_id)}") | |
| print(f" - クラスID {int(label_id)} ({label_name}): {count}件") | |
| # スコア閾値以上の検出数 | |
| above_thresh = scores >= score_thresh | |
| print(f"[DEBUG] スコア閾値({score_thresh})以上の検出数: {above_thresh.sum()}件") | |
| # 上位5件の詳細を表示 | |
| if len(scores) > 0: | |
| top_indices = np.argsort(scores)[::-1][:5] | |
| print(f"[DEBUG] 上位5件の検出結果:") | |
| for rank, idx in enumerate(top_indices, 1): | |
| label_id = int(labels[idx]) | |
| label_name = label_map.get(label_id, f"class_{label_id}") | |
| score = float(scores[idx]) | |
| x1, y1, x2, y2 = boxes[idx] | |
| print(f" [{rank}] {label_name}, スコア={score:.4f}, bbox=({x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f})") | |
| detections: List[Detection] = [] | |
| filtered_by_thresh = 0 | |
| filtered_by_label = 0 | |
| for label, box, score in zip(labels, boxes, scores): | |
| score = float(score) | |
| if score < score_thresh: | |
| filtered_by_thresh += 1 | |
| continue | |
| x1, y1, x2, y2 = [float(v) for v in box.tolist()] | |
| label_id = int(label) | |
| label_name = label_map.get(label_id, f"class_{label_id}") | |
| # label_mapに存在しないクラスもカウント(デバッグ用) | |
| if label_id not in label_map: | |
| filtered_by_label += 1 | |
| # タイル座標を元の画像座標に変換 | |
| x1_orig = x1 + tile_x | |
| y1_orig = y1 + tile_y | |
| x2_orig = x2 + tile_x | |
| y2_orig = y2 + tile_y | |
| detections.append((x1_orig, y1_orig, x2_orig, y2_orig, label_name, score)) | |
| if DEBUG_DEIMV2 and (filtered_by_thresh > 0 or filtered_by_label > 0): | |
| print(f"[DEBUG] フィルタリング: スコア閾値で{filtered_by_thresh}件、label_mapで{filtered_by_label}件除外") | |
| return detections | |
| def run_inference( | |
| image_np: np.ndarray, | |
| score_thresh: float = 0.8, | |
| tile_size: int = 640, | |
| tile_overlap: int = 128, | |
| ) -> List[Detection]: | |
| """ | |
| タイル推論を実行する。 | |
| 大きな画像をタイルに分割して推論し、結果を統合する。 | |
| Args: | |
| image_np: RGB np.ndarray (H, W, 3) | |
| score_thresh: スコア閾値 | |
| tile_size: タイルサイズ(デフォルト: 640) | |
| tile_overlap: タイル間のオーバーラップ(デフォルト: 128) | |
| Returns: | |
| [(x1,y1,x2,y2,label_name,score), ...] | |
| """ | |
| try: | |
| model, transforms = load_deimv2_model() | |
| except Exception as e: | |
| raise RuntimeError(f"モデルの読み込みに失敗しました: {e}") | |
| try: | |
| # numpy → PIL | |
| if image_np.dtype != np.uint8: | |
| if image_np.max() <= 1.0: | |
| image_np = (image_np * 255).astype(np.uint8) | |
| else: | |
| image_np = image_np.astype(np.uint8) | |
| im_pil = Image.fromarray(image_np).convert("RGB") | |
| img_w, img_h = im_pil.size | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] ===== タイル推論開始 =====") | |
| print(f"[DEBUG] 入力画像サイズ: {img_w}×{img_h} (ピクセル数: {img_w*img_h:,})") | |
| print(f"[DEBUG] タイルサイズ: {tile_size}×{tile_size}") | |
| print(f"[DEBUG] タイルオーバーラップ: {tile_overlap}px") | |
| # タイルに分割 | |
| step = tile_size - tile_overlap | |
| tiles = [] | |
| tile_coords = [] | |
| for y in range(0, img_h, step): | |
| for x in range(0, img_w, step): | |
| # タイルの範囲を計算 | |
| tile_x = x | |
| tile_y = y | |
| tile_x_end = min(x + tile_size, img_w) | |
| tile_y_end = min(y + tile_size, img_h) | |
| tile_w = tile_x_end - tile_x | |
| tile_h = tile_y_end - tile_y | |
| # タイルを切り出し | |
| tile = im_pil.crop((tile_x, tile_y, tile_x_end, tile_y_end)) | |
| # デバッグ情報: タイルの確認(DEBUG_DEIMV2フラグで制御) | |
| if DEBUG_DEIMV2 and len(tiles) < 3: | |
| print(f"[DEBUG] タイル {len(tiles)+1} 詳細:") | |
| print(f" - 切り出し範囲: ({tile_x}, {tile_y}) → ({tile_x_end}, {tile_y_end})") | |
| print(f" - タイルサイズ: {tile_w}×{tile_h}") | |
| print(f" - 実際のPIL画像サイズ: {tile.size}") | |
| # タイル画像の統計情報 | |
| tile_np = np.array(tile) | |
| print(f" - 画像値の範囲: min={tile_np.min()}, max={tile_np.max()}, mean={tile_np.mean():.2f}") | |
| # タイル画像を保存(デバッグ用、最初の3タイルのみ) | |
| try: | |
| debug_dir = "debug_tiles" | |
| os.makedirs(debug_dir, exist_ok=True) | |
| tile.save(f"{debug_dir}/tile_{len(tiles)+1}_x{tile_x}_y{tile_y}.png") | |
| print(f" - タイル画像を保存: {debug_dir}/tile_{len(tiles)+1}_x{tile_x}_y{tile_y}.png") | |
| except Exception as e: | |
| print(f" - タイル画像の保存に失敗: {e}") | |
| tiles.append(tile) | |
| tile_coords.append((tile_x, tile_y, tile_w, tile_h)) | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] タイル数: {len(tiles)}") | |
| # 各タイルに対して推論 | |
| all_detections = [] | |
| for i, (tile, (tile_x, tile_y, tile_w, tile_h)) in enumerate(zip(tiles, tile_coords)): | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] タイル {i+1}/{len(tiles)}: 座標({tile_x},{tile_y}), サイズ{tile_w}×{tile_h}") | |
| tile_detections = run_inference_single_tile( | |
| model, transforms, tile, | |
| tile_x, tile_y, tile_w, tile_h, | |
| score_thresh=score_thresh | |
| ) | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] 検出数: {len(tile_detections)}件") | |
| all_detections.extend(tile_detections) | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] 総検出数(重複あり): {len(all_detections)}件") | |
| if len(all_detections) == 0: | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] =========================") | |
| return [] | |
| # 全検出結果の統計情報(DEBUG_DEIMV2フラグで制御) | |
| if DEBUG_DEIMV2 and len(all_detections) > 0: | |
| all_scores = [det[5] for det in all_detections] # scoreは6番目の要素 | |
| all_labels = [det[4] for det in all_detections] # label_nameは5番目の要素 | |
| print(f"[DEBUG] 全タイル統合後の統計:") | |
| print(f" - スコア範囲: min={min(all_scores):.4f}, max={max(all_scores):.4f}, mean={sum(all_scores)/len(all_scores):.4f}") | |
| # ラベルごとの集計 | |
| from collections import Counter | |
| label_counter = Counter(all_labels) | |
| print(f" - ラベル別検出数:") | |
| for label_name, count in sorted(label_counter.items(), key=lambda x: -x[1]): | |
| print(f" - {label_name}: {count}件") | |
| # NMS(Non-Maximum Suppression)で重複検出をマージ | |
| # クラスごとにNMSを適用(異なるクラス間の重複は許可) | |
| from torchvision.ops import nms | |
| # ZeroGPU対応: モデルからデバイスを動的に取得 | |
| device = next(model.model.parameters()).device | |
| # クラスごとにグループ化 | |
| detections_by_class = {} | |
| for det in all_detections: | |
| x1, y1, x2, y2, label_name, score = det | |
| if label_name not in detections_by_class: | |
| detections_by_class[label_name] = [] | |
| detections_by_class[label_name].append((x1, y1, x2, y2, score)) | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] NMS適用前: {len(detections_by_class)}クラス、合計{len(all_detections)}件") | |
| merged_detections = [] | |
| nms_removed = 0 | |
| for label_name, boxes_scores in detections_by_class.items(): | |
| if len(boxes_scores) == 0: | |
| continue | |
| before_nms = len(boxes_scores) | |
| # テンソルに変換 | |
| boxes_tensor = torch.tensor([[x1, y1, x2, y2] for x1, y1, x2, y2, _ in boxes_scores], device=device) | |
| scores_tensor = torch.tensor([score for _, _, _, _, score in boxes_scores], device=device) | |
| # NMS適用(IoU閾値: 0.4 - より厳しく重複を削除) | |
| keep_indices = nms(boxes_tensor, scores_tensor, iou_threshold=0.4) | |
| # マージ後の検出を追加 | |
| for idx in keep_indices.cpu().numpy(): | |
| x1, y1, x2, y2, score = boxes_scores[idx] | |
| merged_detections.append((x1, y1, x2, y2, label_name, score)) | |
| after_nms = len(keep_indices) | |
| removed = before_nms - after_nms | |
| nms_removed += removed | |
| if DEBUG_DEIMV2 and removed > 0: | |
| print(f"[DEBUG] {label_name}: NMSで{removed}件削除 ({before_nms}→{after_nms}件)") | |
| if DEBUG_DEIMV2: | |
| print(f"[DEBUG] NMS適用後: {len(merged_detections)}件 (合計{nms_removed}件削除)") | |
| # 最終結果の統計 | |
| if len(merged_detections) > 0: | |
| final_scores = [det[5] for det in merged_detections] | |
| final_labels = [det[4] for det in merged_detections] | |
| final_label_counter = Counter(final_labels) | |
| print(f"[DEBUG] 最終検出結果:") | |
| print(f" - 総検出数: {len(merged_detections)}件") | |
| print(f" - スコア範囲: min={min(final_scores):.4f}, max={max(final_scores):.4f}, mean={sum(final_scores)/len(final_scores):.4f}") | |
| print(f" - ラベル別:") | |
| for label_name, count in sorted(final_label_counter.items(), key=lambda x: -x[1]): | |
| print(f" - {label_name}: {count}件") | |
| # 上位10件を表示 | |
| sorted_detections = sorted(merged_detections, key=lambda x: x[5], reverse=True)[:10] | |
| print(f"[DEBUG] 上位10件の検出結果:") | |
| for rank, (x1, y1, x2, y2, label_name, score) in enumerate(sorted_detections, 1): | |
| print(f" [{rank}] {label_name}, スコア={score:.4f}, bbox=({x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f})") | |
| print(f"[DEBUG] =========================") | |
| return merged_detections | |
| except Exception as e: | |
| raise RuntimeError(f"推論の実行に失敗しました: {e}") | |