gpu_symbol / detection.py
himipo's picture
first
4459426
# detection.py
from functools import lru_cache
from typing import List, Tuple
import os
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
import torchvision.transforms as T
# デバッグ出力制御フラグ(環境変数で制御)
DEBUG_DEIMV2 = os.getenv("DEBUG_DEIMV2", "0") == "1"
# YAMLConfigをインポート(engineパッケージ経由でレジストリをロード)
# モジュール登録のために、すべての必要なモジュールを明示的にインポート
# 重要: モジュールファイルを直接インポートすることで、@register()デコレータを確実に実行する
# まず、engineパッケージ全体をインポート
try:
import engine
from engine import YAMLConfig
except ImportError:
from engine.core.yaml_config import YAMLConfig
# 次に、すべての必要なモジュールファイルを直接インポート
# これにより、@register()デコレータが実行され、GLOBAL_CONFIGに登録される
try:
# Backboneモジュール
import engine.backbone.dinov3_adapter # DINOv3STAs
# DEIMモジュール(すべての重要なクラスを含む)
import engine.deim.hybrid_encoder # HybridEncoder - 必須
import engine.deim.deim_decoder # DEIMTransformer - 必須
import engine.deim.deim # DEIM - 必須
import engine.deim.postprocessor # PostProcessor - 必須
import engine.deim.matcher # HungarianMatcher
import engine.deim.deim_criterion # DEIMCriterion
# その他のモジュールもインポート(念のため)
import engine.deim
import engine.backbone
import engine.data
import engine.optim
except ImportError as e:
# インポートエラーは警告として出力(デバッグ用)
import warnings
warnings.warn(f"Some engine modules could not be imported: {e}")
# (x1, y1, x2, y2, label_name, score)
Detection = Tuple[float, float, float, float, str, float]
# ★ここを自分のファイル名に合わせる
MODEL_CONFIG_PATH = "configs/deimv2_floorplan.yaml"
MODEL_WEIGHTS_PATH = "models/best_stg2.pth"
def _get_device():
"""
ZeroGPU対応: デバイスを遅延決定する。
ZeroGPU環境では、import時点ではGPUが利用できないため、
この関数を呼び出した時点でデバイスを決定する。
"""
return torch.device("cuda" if torch.cuda.is_available() else "cpu")
# クラスID→記号名マッピング
# クラス名リスト: ["kanki", "kanki_shikaku", "kanki_regisuta", "window1", "window2", "door1", "door2", "bathtub1", "konro1", "sink1", "toilet1", "kasaikeihou1", "kasaikeihou2", "houi1", "houi2", "houi3"]
label_map = {
0: "kanki", # kanki
5: "door1",
6: "door2",
}
@lru_cache(maxsize=1)
def load_deimv2_model():
"""
HF Spaces 起動時に一度だけ呼ばれて、cfg + model + transform をキャッシュする。
もとの main(args) でやっていた処理をここに移植。
"""
import os
# モジュール登録の確認と強制インポート
# トップレベルでインポート済みのはずだが、念のため確認して再インポート
from engine.core.workspace import GLOBAL_CONFIG
# 必要なモジュールが登録されているか確認
required_modules = {
'HybridEncoder': 'engine.deim.hybrid_encoder',
'DEIMTransformer': 'engine.deim.deim_decoder',
'PostProcessor': 'engine.deim.postprocessor',
'DINOv3STAs': 'engine.backbone.dinov3_adapter',
'DEIM': 'engine.deim.deim',
}
missing_modules = {name: module_path for name, module_path in required_modules.items()
if name not in GLOBAL_CONFIG}
if missing_modules:
# まだ登録されていない場合は、強制的にインポート
import importlib
for name, module_path in missing_modules.items():
try:
importlib.import_module(module_path)
except ModuleNotFoundError as e:
# 依存関係の問題を明確に示す
missing_dep = str(e).split("'")[1] if "'" in str(e) else str(e)
raise RuntimeError(
f"Failed to import module {name} from {module_path} due to missing dependency: {missing_dep}. "
f"Please install it with: pip install {missing_dep}. "
f"Available registered modules: {list(GLOBAL_CONFIG.keys())[:20]}"
)
except Exception as e:
raise RuntimeError(
f"Failed to import and register module {name} from {module_path}: {e}. "
f"Available registered modules: {list(GLOBAL_CONFIG.keys())[:20]}"
)
# 再確認
still_missing = [name for name in required_modules.keys() if name not in GLOBAL_CONFIG]
if still_missing:
raise RuntimeError(
f"Modules still not registered after import: {still_missing}. "
f"Available registered modules: {list(GLOBAL_CONFIG.keys())}"
)
# パスの確認(絶対パスに変換)
config_path = os.path.abspath(MODEL_CONFIG_PATH)
weights_path = os.path.abspath(MODEL_WEIGHTS_PATH)
if not os.path.exists(config_path):
raise FileNotFoundError(f"設定ファイルが見つかりません: {config_path}")
if not os.path.exists(weights_path):
raise FileNotFoundError(f"モデルファイルが見つかりません: {weights_path}")
try:
cfg = YAMLConfig(config_path, resume=weights_path)
except Exception as e:
raise RuntimeError(f"YAMLConfigの初期化に失敗しました: {e}")
# もとのスクリプトと同じ処理
if 'HGNetv2' in cfg.yaml_cfg:
cfg.yaml_cfg['HGNetv2']['pretrained'] = False
# ZeroGPU対応: デバイスを遅延決定
device = _get_device()
print(f"[DEBUG] 使用デバイス: {device}")
try:
checkpoint = torch.load(weights_path, map_location=device)
if 'ema' in checkpoint:
state = checkpoint['ema']['module']
else:
state = checkpoint['model']
# 訓練時と設定のズレがある場合でも動かせるように緩めにロード
model_state = cfg.model.state_dict()
# デバッグ情報: チェックポイント内のデコーダー関連キーのリストアップ
decoder_keys_in_checkpoint = [k for k in state.keys() if 'decoder' in k.lower()]
print(f"[DEBUG] チェックポイント内のデコーダー関連キー数: {len(decoder_keys_in_checkpoint)}")
if decoder_keys_in_checkpoint:
print(f"[DEBUG] チェックポイント内のデコーダー関連キー(最初の20件): {decoder_keys_in_checkpoint[:20]}")
# デバッグ情報: モデル内のデコーダー関連キーのリストアップ
decoder_keys_in_model = [k for k in model_state.keys() if 'decoder' in k.lower()]
print(f"[DEBUG] モデル内のデコーダー関連キー数: {len(decoder_keys_in_model)}")
if decoder_keys_in_model:
print(f"[DEBUG] モデル内のデコーダー関連キー(最初の20件): {decoder_keys_in_model[:20]}")
# デバッグ情報: デコーダーパラメータの値の確認(読み込み前)
if decoder_keys_in_checkpoint:
first_decoder_key = decoder_keys_in_checkpoint[0]
if first_decoder_key in state:
print(f"[DEBUG] チェックポイント内のデコーダーパラメータ '{first_decoder_key}' の値の範囲: min={state[first_decoder_key].min():.6f}, max={state[first_decoder_key].max():.6f}, mean={state[first_decoder_key].mean():.6f}")
compatible_state = {}
skipped = []
for k, v in state.items():
if k in model_state and model_state[k].shape == v.shape:
compatible_state[k] = v
else:
skipped.append(k)
load_result = cfg.model.load_state_dict(compatible_state, strict=False)
# デバッグ情報: キーのマッチング確認
matched_decoder_keys = [k for k in decoder_keys_in_checkpoint if k in model_state and k in compatible_state]
unmatched_decoder_keys = [k for k in decoder_keys_in_checkpoint if k not in model_state or k not in compatible_state]
print(f"[DEBUG] マッチしたデコーダー関連キー数: {len(matched_decoder_keys)}")
print(f"[DEBUG] マッチしなかったデコーダー関連キー数: {len(unmatched_decoder_keys)}")
if unmatched_decoder_keys:
print(f"[DEBUG] マッチしなかったデコーダー関連キー(最初の20件): {unmatched_decoder_keys[:20]}")
# デバッグ情報: デコーダーパラメータの値の確認(読み込み後)
decoder_params_after = {k: v for k, v in cfg.model.named_parameters() if 'decoder' in k.lower()}
if decoder_params_after:
first_decoder_param_after = next(iter(decoder_params_after.values()))
print(f"[DEBUG] 読み込み後のデコーダーパラメータの値の範囲: min={first_decoder_param_after.min():.6f}, max={first_decoder_param_after.max():.6f}, mean={first_decoder_param_after.mean():.6f}")
# すべてのデコーダーパラメータの値の範囲も確認
all_decoder_values = torch.cat([p.flatten() for p in decoder_params_after.values()])
print(f"[DEBUG] 読み込み後の全デコーダーパラメータの値の範囲: min={all_decoder_values.min():.6f}, max={all_decoder_values.max():.6f}, mean={all_decoder_values.mean():.6f}")
# デバッグ情報: 読み込み統計
print(f"[DEBUG] チェックポイント読み込み統計:")
print(f" - チェックポイント内のキー数: {len(state)}")
print(f" - モデル内のキー数: {len(model_state)}")
print(f" - 読み込んだキー数: {len(compatible_state)}")
print(f" - 形状不一致でスキップ: {len(skipped)}")
print(f" - 読み込み後の欠落キー: {len(load_result.missing_keys) if load_result.missing_keys else 0}")
print(f" - 読み込み後の予期しないキー: {len(load_result.unexpected_keys) if load_result.unexpected_keys else 0}")
if skipped or load_result.missing_keys or load_result.unexpected_keys:
print("Warning: partial checkpoint load.")
if skipped:
print(f" shape-mismatched skipped keys: {len(skipped)}")
# 重要なキー(decoder, head関連)を優先表示
important_skipped = [k for k in skipped if any(x in k for x in ['decoder', 'head', 'class', 'bbox', 'query'])]
if important_skipped:
print(f" [重要] スキップされたキー(decoder/head関連): {important_skipped[:10]}")
if len(skipped) <= 20:
print(f" すべてのスキップされたキー: {skipped}")
else:
print(f" スキップされたキー(最初の20件): {skipped[:20]}")
if load_result.missing_keys:
print(f" missing keys after load: {len(load_result.missing_keys)}")
# 重要なキー(decoder, head関連)を優先表示
important_missing = [k for k in load_result.missing_keys if any(x in k for x in ['decoder', 'head', 'class', 'bbox', 'query'])]
if important_missing:
print(f" [重要] 欠落キー(decoder/head関連): {important_missing[:20]}")
if len(load_result.missing_keys) <= 30:
print(f" すべての欠落キー: {load_result.missing_keys}")
else:
print(f" 欠落キー(最初の30件): {list(load_result.missing_keys)[:30]}")
if load_result.unexpected_keys:
print(f" unexpected keys after load: {len(load_result.unexpected_keys)}")
if len(load_result.unexpected_keys) <= 20:
print(f" 予期しないキー: {load_result.unexpected_keys}")
else:
print(f" 予期しないキー(最初の20件): {load_result.unexpected_keys[:20]}")
else:
print(f"[DEBUG] モデル重みの読み込み: 成功 (読み込んだキー数: {len(compatible_state)})")
# デバッグ情報: モデルのパラメータ統計(読み込み後)
total_params = sum(p.numel() for p in cfg.model.parameters())
trainable_params = sum(p.numel() for p in cfg.model.parameters() if p.requires_grad)
print(f"[DEBUG] モデルパラメータ統計:")
print(f" - 総パラメータ数: {total_params:,}")
print(f" - 学習可能パラメータ数: {trainable_params:,}")
# デバッグ情報: デコーダーとヘッドのパラメータが初期化されているか確認
decoder_params = {k: v for k, v in cfg.model.named_parameters() if 'decoder' in k}
head_params = {k: v for k, v in cfg.model.named_parameters() if any(x in k for x in ['head', 'class', 'bbox'])}
print(f"[DEBUG] デコーダー/ヘッドパラメータ:")
print(f" - デコーダーパラメータ数: {len(decoder_params)}")
print(f" - ヘッドパラメータ数: {len(head_params)}")
if decoder_params:
# 最初のデコーダーパラメータの統計を確認
first_decoder_param = next(iter(decoder_params.values()))
print(f" - デコーダーパラメータの値の範囲: min={first_decoder_param.min():.6f}, max={first_decoder_param.max():.6f}, mean={first_decoder_param.mean():.6f}")
if head_params:
# 最初のヘッドパラメータの統計を確認
first_head_param = next(iter(head_params.values()))
print(f" - ヘッドパラメータの値の範囲: min={first_head_param.min():.6f}, max={first_head_param.max():.6f}, mean={first_head_param.mean():.6f}")
except Exception as e:
raise RuntimeError(f"モデルの重みの読み込みに失敗しました: {e}")
class Model(nn.Module):
def __init__(self, cfg, device):
super().__init__()
self.device = device
self.model = cfg.model.eval().to(device)
self.postprocessor = cfg.postprocessor.eval().to(device)
# デバッグ情報: モデルとポストプロセッサの設定
print(f"[DEBUG] モデル構築:")
print(f" - モデルタイプ: {type(self.model).__name__}")
print(f" - ポストプロセッサタイプ: {type(self.postprocessor).__name__}")
if hasattr(self.postprocessor, 'use_focal_loss'):
print(f" - use_focal_loss: {self.postprocessor.use_focal_loss}")
if hasattr(self.postprocessor, 'num_classes'):
print(f" - num_classes: {self.postprocessor.num_classes}")
if hasattr(self.postprocessor, 'num_top_queries'):
print(f" - num_top_queries: {self.postprocessor.num_top_queries}")
def forward(self, images, orig_target_sizes):
outputs = self.model(images)
outputs = self.postprocessor(outputs, orig_target_sizes)
return outputs
model = Model(cfg, device)
# eval_spatial_sizeが設定にない場合は、val_dataloaderのResizeサイズから取得
# デフォルトは640x640
if "eval_spatial_size" in cfg.yaml_cfg:
img_size = cfg.yaml_cfg["eval_spatial_size"]
else:
# val_dataloaderのtransformsから取得を試みる
val_transforms = cfg.yaml_cfg.get("val_dataloader", {}).get("dataset", {}).get("transforms", {}).get("ops", [])
img_size = 640 # デフォルト値
for op in val_transforms:
if isinstance(op, dict) and op.get("type") == "Resize":
size = op.get("size", [640, 640])
img_size = size[0] if isinstance(size, list) else size
break
vit_backbone = cfg.yaml_cfg.get('DINOv3STAs', False)
if vit_backbone:
transforms = T.Compose([
T.Resize(img_size),
T.ToTensor(),
T.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225],
),
])
else:
transforms = T.Compose([
T.Resize(img_size),
T.ToTensor(),
])
return model, transforms
def run_inference_single_tile(
model,
transforms,
tile_pil: Image.Image,
tile_x: int,
tile_y: int,
tile_w: int,
tile_h: int,
score_thresh: float = 0.8,
) -> List[Detection]:
"""
単一タイルに対する推論を実行し、元の画像座標系に変換して返す。
"""
tile_w_actual, tile_h_actual = tile_pil.size
# ZeroGPU対応: モデルからデバイスを動的に取得
device = next(model.model.parameters()).device
# デバッグ情報: タイルの基本情報(DEBUG_DEIMV2フラグで制御)
if DEBUG_DEIMV2:
is_first_few = tile_x < 2000 and tile_y < 2000 # 最初の数タイルのみ
if is_first_few:
print(f"[DEBUG] タイル処理開始: 座標({tile_x},{tile_y}), サイズ{tile_w}×{tile_h}")
print(f"[DEBUG] PIL画像サイズ: {tile_w_actual}×{tile_h_actual}")
# タイル画像の統計情報
tile_np = np.array(tile_pil)
print(f"[DEBUG] タイル画像値の範囲: min={tile_np.min()}, max={tile_np.max()}, mean={tile_np.mean():.2f}")
# タイルサイズをorig_sizeとして設定
orig_size = torch.tensor([[tile_w_actual, tile_h_actual]], device=device).float()
# 前処理
im_tensor = transforms(tile_pil).unsqueeze(0).to(device)
# デバッグ情報: リサイズ後の確認(DEBUG_DEIMV2フラグで制御)
if DEBUG_DEIMV2:
is_first_few = tile_x < 2000 and tile_y < 2000
if is_first_few:
print(f"[DEBUG] リサイズ後のテンソル形状: {im_tensor.shape}")
# リサイズ前後のサイズ比較
if hasattr(transforms, 'transforms'):
for t in transforms.transforms:
if isinstance(t, T.Resize):
print(f"[DEBUG] Resize設定: {t.size}")
break
print(f"[DEBUG] 前処理後のテンソル: shape={im_tensor.shape}, dtype={im_tensor.dtype}")
print(f"[DEBUG] テンソル値の範囲: min={im_tensor.min():.4f}, max={im_tensor.max():.4f}, mean={im_tensor.mean():.4f}")
print(f"[DEBUG] orig_size: {orig_size}")
with torch.no_grad():
# デバッグモードの場合のみ、モデルの生の出力を確認(推論を2回実行)
if DEBUG_DEIMV2:
model_outputs = model.model(im_tensor)
is_first_few = tile_x < 2000 and tile_y < 2000
if is_first_few:
print(f"[DEBUG] モデル生出力: pred_logits.shape={model_outputs['pred_logits'].shape}, pred_boxes.shape={model_outputs['pred_boxes'].shape}")
print(f"[DEBUG] pred_logits範囲: min={model_outputs['pred_logits'].min():.4f}, max={model_outputs['pred_logits'].max():.4f}, mean={model_outputs['pred_logits'].mean():.4f}")
# クラス別のlogitsの最大値を確認
logits_max_per_class = model_outputs['pred_logits'].max(dim=1)[0] # [1, 16]
print(f"[DEBUG] クラス別最大logits: {logits_max_per_class[0].cpu().numpy()}")
# sigmoid後のスコアも確認
scores_raw = torch.sigmoid(model_outputs['pred_logits'])
scores_max_per_class = scores_raw.max(dim=1)[0] # [1, 16]
print(f"[DEBUG] クラス別最大スコア(sigmoid後): {scores_max_per_class[0].cpu().numpy()}")
print(f"[DEBUG] pred_boxes範囲: min={model_outputs['pred_boxes'].min():.4f}, max={model_outputs['pred_boxes'].max():.4f}, mean={model_outputs['pred_boxes'].mean():.4f}")
print(f"[DEBUG] pred_boxes形状(cxcywh形式): 最初の5件={model_outputs['pred_boxes'][0, :5, :]}")
# ポストプロセッサ前の座標変換を確認
import torchvision.ops
bbox_pred_raw = torchvision.ops.box_convert(model_outputs['pred_boxes'], in_fmt='cxcywh', out_fmt='xyxy')
print(f"[DEBUG] cxcywh→xyxy変換後(正規化座標): 最初の5件={bbox_pred_raw[0, :5, :]}")
print(f"[DEBUG] xyxy変換後の範囲: min={bbox_pred_raw.min():.4f}, max={bbox_pred_raw.max():.4f}")
# 本番推論(1回のみ実行)
outputs = model(im_tensor, orig_size)
if not outputs or len(outputs) == 0:
if DEBUG_DEIMV2:
print(f"[DEBUG] モデル出力が空です")
return []
out = outputs[0]
labels = out['labels'].detach().cpu().numpy()
boxes = out['boxes'].detach().cpu().numpy()
scores = out['scores'].detach().cpu().numpy()
# デバッグ情報: ポストプロセッサ後の出力(DEBUG_DEIMV2フラグで制御)
if DEBUG_DEIMV2:
print(f"[DEBUG] ポストプロセッサ後: labels.shape={labels.shape}, boxes.shape={boxes.shape}, scores.shape={scores.shape}")
print(f"[DEBUG] boxes範囲: x1=[{boxes[:, 0].min():.1f}, {boxes[:, 0].max():.1f}], y1=[{boxes[:, 1].min():.1f}, {boxes[:, 1].max():.1f}], x2=[{boxes[:, 2].min():.1f}, {boxes[:, 2].max():.1f}], y2=[{boxes[:, 3].min():.1f}, {boxes[:, 3].max():.1f}]")
# デバッグ情報: タイルの検出結果詳細
print(f"[DEBUG] タイル検出結果(フィルタリング前): {len(scores)}件")
if len(scores) > 0:
print(f"[DEBUG] スコア範囲: min={scores.min():.4f}, max={scores.max():.4f}, mean={scores.mean():.4f}")
print(f"[DEBUG] スコア分布:")
print(f" - 0.0-0.3: {(scores < 0.3).sum()}件")
print(f" - 0.3-0.5: {((scores >= 0.3) & (scores < 0.5)).sum()}件")
print(f" - 0.5-0.7: {((scores >= 0.5) & (scores < 0.7)).sum()}件")
print(f" - 0.7-0.9: {((scores >= 0.7) & (scores < 0.9)).sum()}件")
print(f" - 0.9-1.0: {(scores >= 0.9).sum()}件")
# ラベルの分布
unique_labels, label_counts = np.unique(labels, return_counts=True)
print(f"[DEBUG] ラベル分布:")
for label_id, count in zip(unique_labels, label_counts):
label_name = label_map.get(int(label_id), f"class_{int(label_id)}")
print(f" - クラスID {int(label_id)} ({label_name}): {count}件")
# スコア閾値以上の検出数
above_thresh = scores >= score_thresh
print(f"[DEBUG] スコア閾値({score_thresh})以上の検出数: {above_thresh.sum()}件")
# 上位5件の詳細を表示
if len(scores) > 0:
top_indices = np.argsort(scores)[::-1][:5]
print(f"[DEBUG] 上位5件の検出結果:")
for rank, idx in enumerate(top_indices, 1):
label_id = int(labels[idx])
label_name = label_map.get(label_id, f"class_{label_id}")
score = float(scores[idx])
x1, y1, x2, y2 = boxes[idx]
print(f" [{rank}] {label_name}, スコア={score:.4f}, bbox=({x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f})")
detections: List[Detection] = []
filtered_by_thresh = 0
filtered_by_label = 0
for label, box, score in zip(labels, boxes, scores):
score = float(score)
if score < score_thresh:
filtered_by_thresh += 1
continue
x1, y1, x2, y2 = [float(v) for v in box.tolist()]
label_id = int(label)
label_name = label_map.get(label_id, f"class_{label_id}")
# label_mapに存在しないクラスもカウント(デバッグ用)
if label_id not in label_map:
filtered_by_label += 1
# タイル座標を元の画像座標に変換
x1_orig = x1 + tile_x
y1_orig = y1 + tile_y
x2_orig = x2 + tile_x
y2_orig = y2 + tile_y
detections.append((x1_orig, y1_orig, x2_orig, y2_orig, label_name, score))
if DEBUG_DEIMV2 and (filtered_by_thresh > 0 or filtered_by_label > 0):
print(f"[DEBUG] フィルタリング: スコア閾値で{filtered_by_thresh}件、label_mapで{filtered_by_label}件除外")
return detections
def run_inference(
image_np: np.ndarray,
score_thresh: float = 0.8,
tile_size: int = 640,
tile_overlap: int = 128,
) -> List[Detection]:
"""
タイル推論を実行する。
大きな画像をタイルに分割して推論し、結果を統合する。
Args:
image_np: RGB np.ndarray (H, W, 3)
score_thresh: スコア閾値
tile_size: タイルサイズ(デフォルト: 640)
tile_overlap: タイル間のオーバーラップ(デフォルト: 128)
Returns:
[(x1,y1,x2,y2,label_name,score), ...]
"""
try:
model, transforms = load_deimv2_model()
except Exception as e:
raise RuntimeError(f"モデルの読み込みに失敗しました: {e}")
try:
# numpy → PIL
if image_np.dtype != np.uint8:
if image_np.max() <= 1.0:
image_np = (image_np * 255).astype(np.uint8)
else:
image_np = image_np.astype(np.uint8)
im_pil = Image.fromarray(image_np).convert("RGB")
img_w, img_h = im_pil.size
if DEBUG_DEIMV2:
print(f"[DEBUG] ===== タイル推論開始 =====")
print(f"[DEBUG] 入力画像サイズ: {img_w}×{img_h} (ピクセル数: {img_w*img_h:,})")
print(f"[DEBUG] タイルサイズ: {tile_size}×{tile_size}")
print(f"[DEBUG] タイルオーバーラップ: {tile_overlap}px")
# タイルに分割
step = tile_size - tile_overlap
tiles = []
tile_coords = []
for y in range(0, img_h, step):
for x in range(0, img_w, step):
# タイルの範囲を計算
tile_x = x
tile_y = y
tile_x_end = min(x + tile_size, img_w)
tile_y_end = min(y + tile_size, img_h)
tile_w = tile_x_end - tile_x
tile_h = tile_y_end - tile_y
# タイルを切り出し
tile = im_pil.crop((tile_x, tile_y, tile_x_end, tile_y_end))
# デバッグ情報: タイルの確認(DEBUG_DEIMV2フラグで制御)
if DEBUG_DEIMV2 and len(tiles) < 3:
print(f"[DEBUG] タイル {len(tiles)+1} 詳細:")
print(f" - 切り出し範囲: ({tile_x}, {tile_y}) → ({tile_x_end}, {tile_y_end})")
print(f" - タイルサイズ: {tile_w}×{tile_h}")
print(f" - 実際のPIL画像サイズ: {tile.size}")
# タイル画像の統計情報
tile_np = np.array(tile)
print(f" - 画像値の範囲: min={tile_np.min()}, max={tile_np.max()}, mean={tile_np.mean():.2f}")
# タイル画像を保存(デバッグ用、最初の3タイルのみ)
try:
debug_dir = "debug_tiles"
os.makedirs(debug_dir, exist_ok=True)
tile.save(f"{debug_dir}/tile_{len(tiles)+1}_x{tile_x}_y{tile_y}.png")
print(f" - タイル画像を保存: {debug_dir}/tile_{len(tiles)+1}_x{tile_x}_y{tile_y}.png")
except Exception as e:
print(f" - タイル画像の保存に失敗: {e}")
tiles.append(tile)
tile_coords.append((tile_x, tile_y, tile_w, tile_h))
if DEBUG_DEIMV2:
print(f"[DEBUG] タイル数: {len(tiles)}")
# 各タイルに対して推論
all_detections = []
for i, (tile, (tile_x, tile_y, tile_w, tile_h)) in enumerate(zip(tiles, tile_coords)):
if DEBUG_DEIMV2:
print(f"[DEBUG] タイル {i+1}/{len(tiles)}: 座標({tile_x},{tile_y}), サイズ{tile_w}×{tile_h}")
tile_detections = run_inference_single_tile(
model, transforms, tile,
tile_x, tile_y, tile_w, tile_h,
score_thresh=score_thresh
)
if DEBUG_DEIMV2:
print(f"[DEBUG] 検出数: {len(tile_detections)}件")
all_detections.extend(tile_detections)
if DEBUG_DEIMV2:
print(f"[DEBUG] 総検出数(重複あり): {len(all_detections)}件")
if len(all_detections) == 0:
if DEBUG_DEIMV2:
print(f"[DEBUG] =========================")
return []
# 全検出結果の統計情報(DEBUG_DEIMV2フラグで制御)
if DEBUG_DEIMV2 and len(all_detections) > 0:
all_scores = [det[5] for det in all_detections] # scoreは6番目の要素
all_labels = [det[4] for det in all_detections] # label_nameは5番目の要素
print(f"[DEBUG] 全タイル統合後の統計:")
print(f" - スコア範囲: min={min(all_scores):.4f}, max={max(all_scores):.4f}, mean={sum(all_scores)/len(all_scores):.4f}")
# ラベルごとの集計
from collections import Counter
label_counter = Counter(all_labels)
print(f" - ラベル別検出数:")
for label_name, count in sorted(label_counter.items(), key=lambda x: -x[1]):
print(f" - {label_name}: {count}件")
# NMS(Non-Maximum Suppression)で重複検出をマージ
# クラスごとにNMSを適用(異なるクラス間の重複は許可)
from torchvision.ops import nms
# ZeroGPU対応: モデルからデバイスを動的に取得
device = next(model.model.parameters()).device
# クラスごとにグループ化
detections_by_class = {}
for det in all_detections:
x1, y1, x2, y2, label_name, score = det
if label_name not in detections_by_class:
detections_by_class[label_name] = []
detections_by_class[label_name].append((x1, y1, x2, y2, score))
if DEBUG_DEIMV2:
print(f"[DEBUG] NMS適用前: {len(detections_by_class)}クラス、合計{len(all_detections)}件")
merged_detections = []
nms_removed = 0
for label_name, boxes_scores in detections_by_class.items():
if len(boxes_scores) == 0:
continue
before_nms = len(boxes_scores)
# テンソルに変換
boxes_tensor = torch.tensor([[x1, y1, x2, y2] for x1, y1, x2, y2, _ in boxes_scores], device=device)
scores_tensor = torch.tensor([score for _, _, _, _, score in boxes_scores], device=device)
# NMS適用(IoU閾値: 0.4 - より厳しく重複を削除)
keep_indices = nms(boxes_tensor, scores_tensor, iou_threshold=0.4)
# マージ後の検出を追加
for idx in keep_indices.cpu().numpy():
x1, y1, x2, y2, score = boxes_scores[idx]
merged_detections.append((x1, y1, x2, y2, label_name, score))
after_nms = len(keep_indices)
removed = before_nms - after_nms
nms_removed += removed
if DEBUG_DEIMV2 and removed > 0:
print(f"[DEBUG] {label_name}: NMSで{removed}件削除 ({before_nms}{after_nms}件)")
if DEBUG_DEIMV2:
print(f"[DEBUG] NMS適用後: {len(merged_detections)}件 (合計{nms_removed}件削除)")
# 最終結果の統計
if len(merged_detections) > 0:
final_scores = [det[5] for det in merged_detections]
final_labels = [det[4] for det in merged_detections]
final_label_counter = Counter(final_labels)
print(f"[DEBUG] 最終検出結果:")
print(f" - 総検出数: {len(merged_detections)}件")
print(f" - スコア範囲: min={min(final_scores):.4f}, max={max(final_scores):.4f}, mean={sum(final_scores)/len(final_scores):.4f}")
print(f" - ラベル別:")
for label_name, count in sorted(final_label_counter.items(), key=lambda x: -x[1]):
print(f" - {label_name}: {count}件")
# 上位10件を表示
sorted_detections = sorted(merged_detections, key=lambda x: x[5], reverse=True)[:10]
print(f"[DEBUG] 上位10件の検出結果:")
for rank, (x1, y1, x2, y2, label_name, score) in enumerate(sorted_detections, 1):
print(f" [{rank}] {label_name}, スコア={score:.4f}, bbox=({x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f})")
print(f"[DEBUG] =========================")
return merged_detections
except Exception as e:
raise RuntimeError(f"推論の実行に失敗しました: {e}")