"""Gradio demo: turn an LDR part list (or a text prompt) into a generated LDR assembly.

The module builds the Gradio UI at import time (``demo``) and only launches the
server when executed as a script.
"""
import functools
import os
import shutil
import traceback
import uuid

# Normalize OMP threads for libgomp (HF Spaces sometimes inject invalid values).
# This runs BEFORE the heavy numeric imports below so that native libraries
# loaded by those imports see the corrected value rather than the invalid one.
_omp_env = os.getenv("OMP_NUM_THREADS", "")
if _omp_env and not _omp_env.isdigit():
    os.environ["OMP_NUM_THREADS"] = "4"

import gradio as gr
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import torch

# ZeroGPU Support - CRITICAL for HuggingFace Spaces
try:
    import spaces
    ZEROGPU_AVAILABLE = True
    print("✅ ZeroGPU support enabled")
except ImportError:
    print("⚠️ ZeroGPU not available - running in standard mode")
    ZEROGPU_AVAILABLE = False

    # Create dummy decorator for local development
    class spaces:
        @staticmethod
        def GPU(duration=60):
            def decorator(func):
                return func
            return decorator

from cube3d.render.render_bricks_safe import render_bricks_safe
from cube3d.training.engine import Engine, EngineFast
from cube3d.training.bert_infer import generate_tokens
from cube3d.training.utils import normalize_bboxs
from cube3d.training.process_single_ldr import process_ldr_data, process_ldr_flatten, logits2botldrpr
from cube3d.config import HF_CACHE_DIR

# Neural design generation for text-to-LEGO functionality
try:
    from clip_retrieval import get_retriever
    CLIP_AVAILABLE = True
except ImportError:
    print("⚠️ Text-to-design module not available. Text input feature will be disabled.")
    CLIP_AVAILABLE = False

# Lazy loading for GPU models (ZeroGPU requirement)
_retriever = None
_gpt_engine = None


@functools.lru_cache(maxsize=1)
def get_clip_retriever_cached():
    """Lazy load CLIP retriever (initialized only once, cached)."""
    print("🔧 Initializing CLIP retriever (one-time setup)...")
    retriever = get_retriever(data_root="data/1313个筛选车结构和对照渲染图")
    print(f"✅ CLIP retriever loaded ({retriever.features.shape[0]} designs)")
    return retriever


# NOTE: there is intentionally no cached engine - generate_ldr_gpu creates a
# fresh instance on each call to prevent state corruption.

# Make sure the temporary working directory exists.
TMP_DIR = "./tmp/ldr_processor_demo"
os.makedirs(TMP_DIR, exist_ok=True)


class MockFileStorage:
    """Minimal stand-in for a Gradio upload: only exposes the ``name`` path attribute."""

    def __init__(self, file_path):
        self.name = file_path  # mimic the file-path attribute Gradio uploads carry


def model_predict(ldr_content):
    """Build a placeholder LDR document from a list of part names.

    Every non-blank line of ``ldr_content`` is treated as a part name and is
    emitted as a type-1 LDR line using a fixed colour code and one of four
    fixed positions (cycled). A ``0 STEP`` line separates consecutive parts.

    Returns:
        The generated LDR text (lines joined with ``\\n``).
    """
    parts = [line.strip() for line in ldr_content.splitlines() if line.strip()]
    positions = [(120.0, 0, 180.0), (90.0, 0, 210.0), (90.0, 0, 180.0), (70.0, 0, 170.0)]
    color_code = 115
    result = []
    last_index = len(parts) - 1
    for i, part in enumerate(parts):
        pos = positions[i % len(positions)]
        result.append(f"1 {color_code} {pos[0]} {pos[1]} {pos[2]} 0 0 1 0 1 0 -1 0 0 {part}")
        if i < last_index:
            result.append("0 STEP")
    return "\n".join(result)


DEFAULT_PART_RENDER_PATH = "../data/car_1k/demos/example/part_ldr_1k_render/"
os.makedirs(DEFAULT_PART_RENDER_PATH, exist_ok=True)

# Ensure a visual placeholder exists to avoid broken images in the gallery
UNKNOWN_PART_IMG = os.path.join(DEFAULT_PART_RENDER_PATH, "unknown_part.png")
if not os.path.exists(UNKNOWN_PART_IMG):
    os.makedirs(os.path.dirname(UNKNOWN_PART_IMG), exist_ok=True)
    img = Image.new("RGB", (256, 256), color=(240, 240, 240))
    draw = ImageDraw.Draw(img)
    text = "No preview"
    draw.text((70, 120), text, fill=(120, 120, 120))
    img.save(UNKNOWN_PART_IMG)


def get_part_renderings(part_names):
    """Map part names to ``(image_path, part_name)`` pairs for the gallery.

    The image is looked up in ``DEFAULT_PART_RENDER_PATH`` as ``<base>.png``
    where ``<base>`` is the part name with ``.dat`` removed and ``/`` replaced
    by ``_``. Parts without a rendering fall back to ``UNKNOWN_PART_IMG``.
    """
    renderings = []
    for part in part_names:
        # e.g. "3001.dat" -> "<DEFAULT_PART_RENDER_PATH>/3001.png"; "/" is
        # replaced so the name cannot introduce a sub-directory.
        part_base = part.replace(".dat", "").replace("/", "_")
        part_render_path = os.path.join(DEFAULT_PART_RENDER_PATH, f"{part_base}.png")
        if not os.path.exists(part_render_path):
            part_render_path = UNKNOWN_PART_IMG
        renderings.append((part_render_path, part))  # (image path, part name)
    return renderings


def process_data(data, max_num_tokens=410):
    """Pad a 2-D array of part rows to a fixed number of rows.

    Padding rows are filled with ``-1``, except that their last column is set
    to ``1`` (flag label) and their second-to-last column to ``0``.

    Args:
        data: 2-D numpy array with at most ``max_num_tokens`` rows.
        max_num_tokens: Target number of rows after padding.

    Returns:
        A single-element list containing the padded array.

    Raises:
        ValueError: If ``data`` has more than ``max_num_tokens`` rows.
    """
    num_rows = data.shape[0]
    if num_rows > max_num_tokens:
        raise ValueError(
            f"LDR data has {num_rows} rows, exceeding the maximum of {max_num_tokens}"
        )

    padded = np.pad(
        data,
        ((0, max_num_tokens - num_rows), (0, 0)),
        'constant',
        constant_values=-1,
    )
    # Slice from num_rows (not num_rows - max_num_tokens): when the input is
    # already full-length the latter evaluates to 0 and would overwrite the
    # flag columns of every real row instead of touching no rows at all.
    padded[num_rows:, -1] = 1  # flag label
    padded[num_rows:, -2] = 0
    return [padded]


def process_ldr_file(file, process_for_model=True):
    """Process an LDR file for display and optionally for model inference.

    Args:
        file: File object with a ``.name`` attribute pointing to the LDR file.
        process_for_model: If True, convert to numerical format for the ML
            model (requires label mapping). If False, skip numerical
            conversion (only extract parts for visualization).

    Returns:
        Tuple of (renderings, part_list, status, process_ldr_data, None, None).
    """
    if not file:
        return None, None, "Please upload an LDR file", None, None, None

    # Read the file once; the raw lines are reused for the model conversion.
    with open(file.name, 'r') as f:
        lines = f.readlines()
    ldr_content = "".join(lines)

    # Extract part names for visualization (always needed)
    part_names = []
    for line in ldr_content.splitlines():
        tokens = line.strip().split()
        # Only lines whose first column is '1' and that have at least 12
        # columns are collected; the part name is the last column.
        if len(tokens) >= 12 and tokens[0] == '1':
            part_names.append(tokens[-1].lower())

    renderings = get_part_renderings(part_names)
    part_list = "\n".join(part_names)

    # Conditionally process for ML model (requires label mapping)
    print(f"🔍 [DEBUG] process_ldr_file: process_for_model = {process_for_model}")
    if process_for_model:
        print(f"🔍 [DEBUG] Opening LDR file: {file.name}")
        print(f"🔍 [DEBUG] Read {len(lines)} lines from LDR file")

        print(f"🔍 [DEBUG] Calling process_ldr_flatten...")
        ldr_data, _ = process_ldr_flatten(lines)
        print(f"🔍 [DEBUG] process_ldr_flatten returned: type={type(ldr_data)}, shape={ldr_data.shape if hasattr(ldr_data, 'shape') else 'N/A'}")

        # Sort rows: primary key column -4, then column -5, then column -3
        # (np.lexsort treats the LAST key as the primary one).
        sort_cols = ldr_data[:, [-4, -5, -3]]
        sort_idx = np.lexsort((sort_cols[:, 2], sort_cols[:, 1], sort_cols[:, 0]))
        ldr_data = ldr_data[sort_idx]

        print(f"🔍 [DEBUG] Calling process_data...")
        model_input = process_data(ldr_data)
        print(f"🔍 [DEBUG] process_data returned: type={type(model_input)}, value={'None' if model_input is None else 'data'}")
    else:
        # Skip numerical conversion - not needed for visualization
        model_input = None
        print(f"🔍 [DEBUG] Skipping numerical conversion (process_for_model=False)")

    print(f"🔍 [DEBUG] Final process_ldr_data: {'None' if model_input is None else 'has data'}")
    return renderings, part_list, f"File loaded, {len(part_names)} valid parts identified", model_input, None, None


def process_ldr_from_path(ldr_path, process_for_model=False):
    """Process an LDR file from a file system path (not a Gradio upload).

    Args:
        ldr_path: Path to the LDR file.
        process_for_model: If True, convert to numerical format for the ML
            model. If False (default), skip numerical conversion.

    Returns:
        Tuple of (renderings, part_list, status, process_ldr_data, None, None).
    """
    if not os.path.exists(ldr_path):
        return None, None, f"LDR file not found: {ldr_path}", None, None, None

    # Wrap the path in the module's upload stand-in to reuse process_ldr_file.
    return process_ldr_file(MockFileStorage(ldr_path), process_for_model=process_for_model)


def unified_input_handler(file, text_query):
    """Handle both input methods: file upload and text description.

    Priority:
        1. If a file is uploaded, use it.
        2. If text is provided, use CLIP retrieval followed by GPT generation.
        3. Otherwise, return an error status.

    Returns:
        Tuple of (renderings, part_list, status, model_input, ldr_text,
        rendered_image); the last two are None for the file-upload path and
        on any failure.
    """
    # Case 1: File upload (original flow)
    if file is not None:
        return process_ldr_file(file)

    # Case 3: No input
    if not (text_query and text_query.strip()):
        return None, None, "⚠️ Please upload an LDR file OR enter a text description", None, None, None

    # Case 2: Text query (neural generation)
    if not CLIP_AVAILABLE:
        return None, None, "❌ Text-to-LEGO feature is not available (generation module not loaded)", None, None, None

    try:
        query = text_query.strip()
        print(f"🎨 Generating design from: {query}")

        # Lazy load CLIP retriever (cached)
        retriever = get_clip_retriever_cached()
        result = retriever.get_best_match(query)
        if result is None or not result.get("ldr_exists", True):
            return None, None, f"❌ Could not generate design for '{query}'", None, None, None

        ldr_path = result["ldr_path"]
        confidence = result["similarity"]
        car_id = result["car_id"]
        print(f"✅ Found reference design: car_{car_id} (confidence: {confidence:.3f})")

        # Process the LDR design for GPT model (WITH numerical conversion)
        renderings, part_list, status, model_input, _, _ = process_ldr_from_path(
            ldr_path,
            process_for_model=True,  # Enable label mapping for GPT generation
        )

        # Check if numerical conversion succeeded
        if model_input is None:
            return None, None, "❌ Failed to convert LDR to model format (missing label mappings)", None, None, None

        # Generate new LDR using GPT model (GPU-accelerated)
        new_ldr_path = os.path.join(TMP_DIR, f"generated_{uuid.uuid4()}.ldr")
        generate_ldr_gpu(model_input, new_ldr_path)

        # Render the GPT-generated LDR file
        print(f"🎨 Rendering GPT-generated LEGO design...")
        render_path = os.path.join(TMP_DIR, f"generated_{uuid.uuid4()}.png")
        render_bricks_safe(new_ldr_path, render_path)
        rendered_image = render_path

        # Update status message with generation info
        enhanced_status = f"✨ Generated from car_{car_id} (confidence: {confidence*100:.1f}%)\n🤖 GPT model created new assembly sequence\n{status}"

        # Read generated LDR content for display
        with open(new_ldr_path, 'r', encoding='utf-8') as f:
            ldr_text = f.read()

        return renderings, part_list, enhanced_status, model_input, ldr_text, rendered_image
    except Exception as e:
        # UI boundary: surface the failure in the status box instead of crashing.
        error_msg = f"❌ Design generation failed: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        return None, None, error_msg, None, None, None


@spaces.GPU(duration=120)  # GPT generation can take up to 120 seconds
def generate_ldr_gpu(ldr_content, ldr_path):
    """Generate an LDR file using the GPT model (GPU-accelerated).

    Decorated with ``@spaces.GPU`` to enable GPU allocation on HuggingFace
    ZeroGPU Spaces. A fresh ``EngineFast`` instance is created on every call
    (it is deliberately NOT cached, to avoid state corruption between runs).

    Args:
        ldr_content: Numerical LDR data; element 0 is converted with
            ``torch.from_numpy`` and used as the model target.
        ldr_path: Output path passed to ``logits2botldrpr`` as ``output_file``.

    Returns:
        The value returned by ``logits2botldrpr`` (the predicted LDR parts).
    """
    print("🤖 Running GPT model to generate new assembly sequence...")
    print(" Using CUDA graphs, this will take some time to warmup and capture the graph.")

    stride = 5
    rot_num = 24
    bert_shift = 1
    shift = 0

    # Prepare checkpoint paths (3 separate weight files as per original demo.zip design)
    config_path = os.path.join(os.path.dirname(__file__), 'cube3d/configs/open_model_v0.5.yaml')

    # Detect HuggingFace Spaces environment
    is_hf_space = os.getenv("SPACE_ID") is not None

    if is_hf_space:
        # HF Spaces: Use pre-downloaded weights from build-time cache
        from huggingface_hub import hf_hub_download
        print("📂 Loading pre-cached model weights from build...")

        # Load base GPT model (7.17 GB, pre-downloaded during build)
        gpt_ckpt_path = hf_hub_download(
            repo_id="0xZohar/object-assembler-models",
            filename="shape_gpt.safetensors",
            cache_dir=HF_CACHE_DIR,
            local_files_only=True
        )
        print(f" ✓ Base GPT model loaded from cache")

        # Load shape tokenizer (1.09 GB, pre-downloaded during build)
        shape_ckpt_path = hf_hub_download(
            repo_id="0xZohar/object-assembler-models",
            filename="shape_tokenizer.safetensors",
            cache_dir=HF_CACHE_DIR,
            local_files_only=True
        )
        print(f" ✓ Shape tokenizer loaded from cache")

        # Load fine-tuned adapter (1.68 GB, pre-downloaded during build)
        save_gpt_ckpt_path = hf_hub_download(
            repo_id="0xZohar/object-assembler-models",
            filename="save_shape_cars_whole_p_rot_scratch_4mask_randp.safetensors",
            cache_dir=HF_CACHE_DIR,
            local_files_only=True
        )
        print(f" ✓ Fine-tuned adapter loaded from cache")
    else:
        # Local environment: Use local paths (matching original demo.zip structure)
        gpt_ckpt_path = 'temp_weights/shape_gpt.safetensors'
        shape_ckpt_path = 'temp_weights/shape_tokenizer.safetensors'
        save_gpt_ckpt_path = '/private/tmp/demo_extracted/demo/code/model_weights/save_shape_cars_whole_p_rot_scratch_4mask_randp.safetensors'

    # Create fresh engine instance (fixes state corruption from caching)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    engine = EngineFast(
        config_path,
        gpt_ckpt_path,
        shape_ckpt_path,
        save_gpt_ckpt_path,
        device=device,
        mode='test'
    )
    print(" Compiled the graph.")

    targets_source = torch.from_numpy(ldr_content[0]).to(device).unsqueeze(0)
    targets = targets_source.clone()

    # First pass; its logits supply the rotation classes used below.
    logits, inputs_ids, strategy, mask, cut_idx = generate_tokens(
        engine,
        '',
        targets,
        None,
        None,
        False,
        0.9,
        None,
        1,
        'test'
    )

    # Write the arg-max rotation class from the first pass into column -7 of
    # a fresh copy of the targets, then run the second pass on it.
    targets = targets_source.clone()
    targets[:,shift:,-7] = logits[:,1:-3:stride,:rot_num+1].permute(0, 2, 1).argmax(dim=1)

    logits_x, inputs_ids, strategy, mask, cut_idx = generate_tokens(
        engine,
        '',
        targets,
        None,
        None,
        False,
        0.9,
        None,
        0,
        'test'
    )

    # Keep the first-pass rotation logits in the final prediction.
    logits_x[:,1+bert_shift:-3:stride,:rot_num+1] = logits[:,1+bert_shift:-3:stride,:rot_num+1]

    predict_ldr = logits2botldrpr(logits_x[0].cpu().detach().numpy(), inputs_ids[0].cpu().detach().numpy(), stride, 0, output_file=ldr_path)

    print(f"✅ GPT generated {len(predict_ldr)} parts")
    return predict_ldr


def predict_and_render(ldr_content):
    """Predict and render an LDR file (orchestrator function).

    Handles the non-GPU work (temp-file naming, rendering) and delegates the
    model inference to ``generate_ldr_gpu``.

    Args:
        ldr_content: Numerical LDR data produced by ``process_ldr_file``;
            a falsy value means nothing has been loaded yet.

    Returns:
        Tuple of (predicted_ldr, ldr_path, render_path), or an error message
        followed by two ``None`` values when no input is available.
    """
    if not ldr_content:
        return "Please upload an LDR file first", None, None

    ldr_path = os.path.join(TMP_DIR, f"{uuid.uuid4()}.ldr")

    # Call GPU-accelerated function
    predicted_ldr = generate_ldr_gpu(ldr_content, ldr_path)

    # Render the newly generated LDR
    render_path = os.path.join(TMP_DIR, f"{uuid.uuid4()}.png")
    render_bricks_safe(ldr_path, render_path)

    return predicted_ldr, ldr_path, render_path


def clean_temp_files():
    """Delete and recreate ``TMP_DIR``; return a status message for the UI."""
    try:
        # A missing directory is not an error: just recreate it below.
        if os.path.isdir(TMP_DIR):
            shutil.rmtree(TMP_DIR)
        os.makedirs(TMP_DIR, exist_ok=True)
        return "临时文件已清理"
    except Exception as e:
        return f"清理失败: {str(e)}"


_DESCRIPTION = '''
* **Option 1**: Upload an LDR file with part names
* **Option 2**: Describe your desired LEGO design in text (e.g., "red sports car")
* Generate a 3D assembly plan in LDR format
'''

with gr.Blocks(
    title="ObjectAssembler: Assemble Your Object with Diverse Components",
) as demo:

    gr.Markdown("ObjectAssembler: Assemble Your Object with Diverse Components")
    gr.Markdown(_DESCRIPTION)

    # Holds the numerical model input between "Load Input" and "Generate".
    original_ldr = gr.State("")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Input Method")
            ldr_file = gr.File(
                label="Upload LDR File",
                file_types=[".ldr"],
            )
            gr.Markdown("**— OR —**")
            text_input = gr.Textbox(
                label="Describe Your Design",
                placeholder="e.g., red sports car, blue police car, yellow construction vehicle...",
                lines=2
            )
            upload_btn = gr.Button("Load Input", variant="secondary")
            predict_btn = gr.Button("Generate New LDR & Render", variant="primary")
            clean_btn = gr.Button("Clean Temporary Files", variant="stop")

            status_msg = gr.Textbox(label="Status Info", interactive=False)

            gr.Markdown("### Original Part List")
            part_list = gr.Textbox(lines=6, label="Part Names", interactive=False)

        with gr.Column(scale=2):
            gr.Markdown("### Part Preview")
            part_renderings = gr.Gallery(
                label="Part List Visualization",
                columns=[6],
                rows=[2],
                object_fit="contain",
                height="auto"
            )

            gr.Markdown("### Generated LDR Content")
            predicted_ldr = gr.Textbox(lines=8, label="New LDR Format", interactive=False)

            gr.Markdown("### Rendering Result")
            render_result = gr.Image(label="Part Assembly Visualization", height=300)
            ldr_download = gr.File(label="Download New LDR File")

    # Event bindings
    upload_btn.click(
        fn=unified_input_handler,
        inputs=[ldr_file, text_input],
        outputs=[part_renderings, part_list, status_msg, original_ldr, predicted_ldr, render_result]
    )

    predict_btn.click(
        fn=predict_and_render,
        inputs=[original_ldr],
        outputs=[predicted_ldr, ldr_download, render_result]
    )

    clean_btn.click(
        fn=clean_temp_files,
        inputs=[],
        outputs=[status_msg]
    )

# Server launch configuration (Hugging Face Spaces compatible)
if __name__ == "__main__":
    # Detect whether we are running inside a Hugging Face Space
    is_hf_space = os.getenv("SPACE_ID") is not None

    print("\n" + "="*50)
    print("🚀 LEGO 3D建模序列生成系统启动中...")
    print("="*50)

    # ZeroGPU: Models are loaded lazily (on first use) to avoid CUDA initialization at startup
    if CLIP_AVAILABLE:
        print("✅ CLIP text-to-design feature enabled (lazy loading)")
        print(" Models will be initialized on first use")
    else:
        print("⚠️ CLIP module not available - text-to-LEGO disabled")

    if ZEROGPU_AVAILABLE:
        print("✅ ZeroGPU support enabled - GPU allocation on demand")
    else:
        print("⚠️ Running in standard mode (no ZeroGPU)")

    if is_hf_space:
        print("🌐 运行环境: Hugging Face Spaces")
        # The Hugging Face Spaces Docker SDK needs an explicit port
        demo.queue()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            allowed_paths=[os.path.abspath(DEFAULT_PART_RENDER_PATH)]
        )
    else:
        import threading
        import time

        print("💻 运行环境: 本地服务器")

        # Launch in a background thread so the main thread is not blocked
        def launch_gradio():
            try:
                demo.queue()  # enable the request queue
                demo.launch(
                    server_name="0.0.0.0",     # listen on all interfaces
                    server_port=8080,          # 8080 to avoid port conflicts
                    share=False,               # no public temporary link
                    quiet=False,               # keep log output for debugging
                    show_error=True,           # surface errors in the UI
                    debug=False,               # debug mode off
                    inbrowser=False,           # do not open a browser automatically
                    prevent_thread_lock=True,  # return instead of blocking the thread
                    allowed_paths=[
                        os.path.abspath(DEFAULT_PART_RENDER_PATH)  # must be absolute
                    ]
                )
            except Exception as e:
                print(f"启动时出现警告(可忽略): {e}")
                print("服务器已在 http://0.0.0.0:8080 上运行")

        # Start Gradio
        thread = threading.Thread(target=launch_gradio, daemon=False)
        thread.start()

        print(f"📍 访问地址: http://localhost:8080")
        print(f"🔧 Blender: 已安装 (3.6.18)")
        print(f"🤖 模型权重: 已加载 (1.6GB)")
        print(f"📁 示例文件: examples/ldr_file/")
        print("="*50)
        print("\n按 Ctrl+C 停止服务器\n")

        # Keep the main thread alive until interrupted
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n正在关闭服务器...")
            # SystemExit directly: the `exit` builtin is a site helper and is
            # not guaranteed to exist in every interpreter configuration.
            raise SystemExit(0)