import gradio as gr
import os
import uuid
import shutil
import functools
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import torch
# Normalize OMP threads for libgomp (HF Spaces sometimes inject invalid values)
_omp_env = os.getenv("OMP_NUM_THREADS", "")
if _omp_env and not _omp_env.isdigit():
os.environ["OMP_NUM_THREADS"] = "4"
# ZeroGPU Support - CRITICAL for HuggingFace Spaces
try:
import spaces
ZEROGPU_AVAILABLE = True
print("✅ ZeroGPU support enabled")
except ImportError:
print("⚠️ ZeroGPU not available - running in standard mode")
ZEROGPU_AVAILABLE = False
    # Create a no-op stand-in for the spaces.GPU decorator so local development still works
class spaces:
@staticmethod
def GPU(duration=60):
def decorator(func):
return func
return decorator
#from cube3d.render.render_bricks import render_bricks
from cube3d.render.render_bricks_safe import render_bricks_safe
from cube3d.training.engine import Engine, EngineFast
from cube3d.training.bert_infer import generate_tokens
from cube3d.training.utils import normalize_bboxs
from cube3d.training.process_single_ldr import process_ldr_data, process_ldr_flatten, logits2botldrpr
from cube3d.config import HF_CACHE_DIR
# Neural design generation for text-to-LEGO functionality
try:
from clip_retrieval import get_retriever
CLIP_AVAILABLE = True
except ImportError:
print("⚠️ Text-to-design module not available. Text input feature will be disabled.")
CLIP_AVAILABLE = False
# Lazy loading for GPU models (ZeroGPU requirement)
_retriever = None
_gpt_engine = None
@functools.lru_cache(maxsize=1)
def get_clip_retriever_cached():
"""Lazy load CLIP retriever (initialized only once, cached)"""
print("🔧 Initializing CLIP retriever (one-time setup)...")
retriever = get_retriever(data_root="data/1313个筛选车结构和对照渲染图")
print(f"✅ CLIP retriever loaded ({retriever.features.shape[0]} designs)")
return retriever
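# Note: functools.lru_cache(maxsize=1) means the retriever above is built exactly once;
# every later call returns the cached instance, so repeated text queries stay cheap.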
# Removed cached engine - creates fresh instance each time to prevent state corruption
# Ensure the temporary directory exists (remote server path)
TMP_DIR = "./tmp/ldr_processor_demo"
os.makedirs(TMP_DIR, exist_ok=True)
class MockFileStorage:
def __init__(self, file_path):
        self.name = file_path  # key: mimic the uploaded file's path attribute, matching Gradio's File object
# Model prediction function (original logic preserved)
def model_predict(ldr_content):
parts = [line.strip() for line in ldr_content.splitlines() if line.strip()]
positions = [(120.0, 0, 180.0), (90.0, 0, 210.0), (90.0, 0, 180.0), (70.0, 0, 170.0)]
color_code = 115
result = []
for i, part in enumerate(parts):
pos = positions[i % len(positions)]
part_line = f"1 {color_code} {pos[0]} {pos[1]} {pos[2]} 0 0 1 0 1 0 -1 0 0 {part}"
result.append(part_line)
if i < len(parts) - 1:
result.append("0 STEP")
return "\n".join(result)
DEFAULT_PART_RENDER_PATH = "../data/car_1k/demos/example/part_ldr_1k_render/"
os.makedirs(DEFAULT_PART_RENDER_PATH, exist_ok=True)
# Ensure a visual placeholder exists to avoid broken images in the gallery
UNKNOWN_PART_IMG = os.path.join(DEFAULT_PART_RENDER_PATH, "unknown_part.png")
if not os.path.exists(UNKNOWN_PART_IMG):
os.makedirs(os.path.dirname(UNKNOWN_PART_IMG), exist_ok=True)
img = Image.new("RGB", (256, 256), color=(240, 240, 240))
draw = ImageDraw.Draw(img)
text = "No preview"
draw.text((70, 120), text, fill=(120, 120, 120))
img.save(UNKNOWN_PART_IMG)
def get_part_renderings(part_names):
renderings = []
for part in part_names:
        # Build the render image path for this part (the file name is assumed to
        # match the part name, with a .png suffix).
        # Example: part "3001.dat" → "<DEFAULT_PART_RENDER_PATH>/3001.png"
        part_base = part.replace(".dat", "").replace("/", "_")  # strip ".dat" and replace path separators
part_render_path = os.path.join(DEFAULT_PART_RENDER_PATH, f"{part_base}.png")
        # If no rendering exists for this part, fall back to the "unknown part" placeholder image
        if not os.path.exists(part_render_path):
            part_render_path = UNKNOWN_PART_IMG
        renderings.append((part_render_path, part))  # (image path, part name)
return renderings
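# Usage sketch (hypothetical part names): if only 3001.png exists under
# DEFAULT_PART_RENDER_PATH, get_part_renderings(["3001.dat", "wheel.dat"]) returns
# [("<render dir>/3001.png", "3001.dat"), (UNKNOWN_PART_IMG, "wheel.dat")].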
def process_data(data):
max_num_tokens = 410
processed_data = []
    def padding(data, max_len=300):
        # Pad the sequence to max_len rows with -1, then mark the padded rows:
        # last column (flag label) = 1, second-to-last column = 0.
        pad_data = np.pad(data, ((0, max_len - data.shape[0]), (0, 0)), 'constant', constant_values=-1)
        pad_data[data.shape[0] - max_len:, -1] = 1  # flag label on padded rows
        pad_data[data.shape[0] - max_len:, -2] = 0
        return pad_data
processed_data.append(padding(data, max_num_tokens))
return processed_data
# Process the uploaded LDR file (original logic preserved, with enhanced exception handling)
def process_ldr_file(file, process_for_model=True):
"""
Process LDR file for display and optionally for model inference
Args:
file: File object with .name attribute pointing to LDR file
process_for_model: If True, convert to numerical format for ML model (requires label mapping).
If False, skip numerical conversion (only extract parts for visualization).
Returns:
Tuple of (renderings, part_list, status, process_ldr_data, None, None)
"""
if not file:
return None, None, "Please upload an LDR file", None, None, None
# Read LDR content
with open(file.name, 'r') as f:
ldr_content = f.read()
# Extract part names for visualization (always needed)
part_names = []
for line in ldr_content.splitlines():
stripped_line = line.strip()
        if stripped_line:  # skip empty lines
            parts = stripped_line.split()
            # Keep only LDraw "type 1" lines: first token is '1' and the line has enough fields
            if len(parts) > 0 and parts[0] == '1' and len(parts) >= 12:
                part_name = parts[-1].lower()  # last column is the part file name; lowercase it
part_names.append(part_name)
renderings = get_part_renderings(part_names)
part_list = "\n".join(part_names)
# Conditionally process for ML model (requires label mapping)
print(f"🔍 [DEBUG] process_ldr_file: process_for_model = {process_for_model}")
if process_for_model:
print(f"🔍 [DEBUG] Opening LDR file: {file.name}")
with open(file.name, 'r') as f:
lines = f.readlines()
print(f"🔍 [DEBUG] Read {len(lines)} lines from LDR file")
print(f"🔍 [DEBUG] Calling process_ldr_flatten...")
ldr_data, _ = process_ldr_flatten(lines)
print(f"🔍 [DEBUG] process_ldr_flatten returned: type={type(ldr_data)}, shape={ldr_data.shape if hasattr(ldr_data, 'shape') else 'N/A'}")
        # Sort parts by position columns (primary key: column -4, then -5, then -3)
        # so the token sequence fed to the model has a deterministic order
        sort_cols = ldr_data[:, [-4, -5, -3]]
        sort_idx = np.lexsort((sort_cols[:, 2], sort_cols[:, 1], sort_cols[:, 0]))
        ldr_data = ldr_data[sort_idx]
print(f"🔍 [DEBUG] Calling process_data...")
process_ldr_data = process_data(ldr_data)
print(f"🔍 [DEBUG] process_data returned: type={type(process_ldr_data)}, value={'None' if process_ldr_data is None else 'data'}")
else:
# Skip numerical conversion - not needed for visualization
process_ldr_data = None
print(f"🔍 [DEBUG] Skipping numerical conversion (process_for_model=False)")
print(f"🔍 [DEBUG] Final process_ldr_data: {'None' if process_ldr_data is None else 'has data'}")
return renderings, part_list, f"File loaded, {len(part_names)} valid parts identified", process_ldr_data, None, None
# except Exception as e:
# return None, None, f"File processing failed: {str(e)}", None, None
# Process LDR from file system path (for text-generated designs)
def process_ldr_from_path(ldr_path, process_for_model=False):
"""
Process LDR file from file system path (not Gradio upload)
Args:
ldr_path: Absolute path to LDR file
process_for_model: If True, convert to numerical format for ML model.
If False (default), skip numerical conversion for visualization-only.
Returns:
Tuple of (renderings, part_list, status, process_ldr_data, None, None)
"""
if not os.path.exists(ldr_path):
return None, None, f"LDR file not found: {ldr_path}", None, None, None
# Create a mock file object to reuse process_ldr_file logic
class MockFile:
def __init__(self, path):
self.name = path
mock_file = MockFile(ldr_path)
return process_ldr_file(mock_file, process_for_model=process_for_model)
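# Usage sketch (hypothetical path): process_ldr_from_path("tmp/car_1.ldr") returns the same
# 6-tuple as process_ldr_file, with process_ldr_data left as None because process_for_model
# defaults to False (visualization only).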
# Unified input handler: supports both file upload and text query
def unified_input_handler(file, text_query):
"""
Unified input handler for both file upload and text description
Priority:
1. If file is uploaded, use it
2. If text is provided, use CLIP retrieval
3. Otherwise, show error
"""
# Case 1: File upload (original flow)
if file is not None:
return process_ldr_file(file)
# Case 2: Text query (neural generation)
elif text_query and text_query.strip():
if not CLIP_AVAILABLE:
return None, None, "❌ Text-to-LEGO feature is not available (generation module not loaded)", None, None, None
try:
# Generate LDR design from text
query = text_query.strip()
print(f"🎨 Generating design from: {query}")
# Lazy load CLIP retriever (cached)
retriever = get_clip_retriever_cached()
result = retriever.get_best_match(query)
if result is None or not result.get("ldr_exists", True):
return None, None, f"❌ Could not generate design for '{query}'", None, None, None
ldr_path = result["ldr_path"]
confidence = result["similarity"]
car_id = result["car_id"]
print(f"✅ Found reference design: car_{car_id} (confidence: {confidence:.3f})")
# Process the LDR design for GPT model (WITH numerical conversion)
renderings, part_list, status, process_ldr_data, _, _ = process_ldr_from_path(
ldr_path,
process_for_model=True # Enable label mapping for GPT generation
)
# Check if numerical conversion succeeded
if process_ldr_data is None:
return None, None, f"❌ Failed to convert LDR to model format (missing label mappings)", None, None, None
# Generate new LDR using GPT model (GPU-accelerated)
new_ldr_filename = f"generated_{uuid.uuid4()}.ldr"
new_ldr_path = os.path.join(TMP_DIR, new_ldr_filename)
predicted_ldr_lines = generate_ldr_gpu(process_ldr_data, new_ldr_path)
# Render the GPT-generated LDR file
print(f"🎨 Rendering GPT-generated LEGO design...")
render_filename = f"generated_{uuid.uuid4()}.png"
render_path = os.path.join(TMP_DIR, render_filename)
render_bricks_safe(new_ldr_path, render_path)
rendered_image = render_path
# Update status message with generation info
enhanced_status = f"✨ Generated from car_{car_id} (confidence: {confidence*100:.1f}%)\n🤖 GPT model created new assembly sequence\n{status}"
# Read generated LDR content for display
with open(new_ldr_path, 'r', encoding='utf-8') as f:
ldr_text = f.read()
return renderings, part_list, enhanced_status, process_ldr_data, ldr_text, rendered_image
except Exception as e:
import traceback
error_msg = f"❌ Design generation failed: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return None, None, error_msg, None, None, None
# Case 3: No input
else:
return None, None, "⚠️ Please upload an LDR file OR enter a text description", None, None, None
import traceback  # used to print full stack traces in error handlers
@spaces.GPU(duration=120) # GPT generation can take up to 120 seconds
def generate_ldr_gpu(ldr_content, ldr_path):
"""
Generate LDR file using GPT model (GPU-accelerated)
This function is decorated with @spaces.GPU to enable GPU allocation
on HuggingFace ZeroGPU Spaces. The engine is loaded lazily and cached.
Args:
ldr_content: Numerical LDR data (numpy array)
ldr_path: Output path for generated LDR file
Returns:
List of predicted LDR lines
"""
print("🤖 Running GPT model to generate new assembly sequence...")
print(" Using CUDA graphs, this will take some time to warmup and capture the graph.")
    # Decoding hyperparameters carried over from the original demo
    stride = 5       # logits are sampled every 5 positions (see the ::stride slices below)
    rot_num = 24     # size of the rotation class slice (used as :rot_num+1 below)
    bert_shift = 1
    shift = 0
# Prepare checkpoint paths (3 separate weight files as per original demo.zip design)
config_path = os.path.join(os.path.dirname(__file__), 'cube3d/configs/open_model_v0.5.yaml')
# Detect HuggingFace Spaces environment
is_hf_space = os.getenv("SPACE_ID") is not None
if is_hf_space:
# HF Spaces: Use pre-downloaded weights from build-time cache
from huggingface_hub import hf_hub_download
print("📂 Loading pre-cached model weights from build...")
# Load base GPT model (7.17 GB, pre-downloaded during build)
gpt_ckpt_path = hf_hub_download(
repo_id="0xZohar/object-assembler-models",
filename="shape_gpt.safetensors",
cache_dir=HF_CACHE_DIR,
local_files_only=True
)
print(f" ✓ Base GPT model loaded from cache")
# Load shape tokenizer (1.09 GB, pre-downloaded during build)
shape_ckpt_path = hf_hub_download(
repo_id="0xZohar/object-assembler-models",
filename="shape_tokenizer.safetensors",
cache_dir=HF_CACHE_DIR,
local_files_only=True
)
print(f" ✓ Shape tokenizer loaded from cache")
# Load fine-tuned adapter (1.68 GB, pre-downloaded during build)
save_gpt_ckpt_path = hf_hub_download(
repo_id="0xZohar/object-assembler-models",
filename="save_shape_cars_whole_p_rot_scratch_4mask_randp.safetensors",
cache_dir=HF_CACHE_DIR,
local_files_only=True
)
print(f" ✓ Fine-tuned adapter loaded from cache")
else:
# Local environment: Use local paths (matching original demo.zip structure)
gpt_ckpt_path = 'temp_weights/shape_gpt.safetensors'
shape_ckpt_path = 'temp_weights/shape_tokenizer.safetensors'
save_gpt_ckpt_path = '/private/tmp/demo_extracted/demo/code/model_weights/save_shape_cars_whole_p_rot_scratch_4mask_randp.safetensors'
# Create fresh engine instance (fixes state corruption from caching)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
engine = EngineFast(
config_path, gpt_ckpt_path, shape_ckpt_path, save_gpt_ckpt_path,
device=device,
mode='test'
)
print(" Compiled the graph.")
targets_source = torch.from_numpy(ldr_content[0]).to(device).unsqueeze(0)
targets = targets_source.clone()
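    # Two-pass decoding (as read from the code below): the first generate_tokens pass
    # produces logits whose rotation slice (:rot_num+1) is argmax-decoded and written into
    # the rotation channel of targets (column -7); the second pass regenerates conditioned
    # on those rotations, and the rotation logits from pass one are copied back into the
    # pass-two logits before everything is decoded into LDR lines.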
logits, inputs_ids, strategy, mask, cut_idx = generate_tokens(
engine,
'',
targets,
None,
None,
False,
0.9,
None,
1,
'test'
)
targets = targets_source.clone()
targets[:,shift:,-7] = logits[:,1:-3:stride,:rot_num+1].permute(0, 2, 1).argmax(dim=1)
logits_x, inputs_ids, strategy, mask, cut_idx = generate_tokens(
engine,
'',
targets,
None,
None,
False,
0.9,
None,
0,
'test'
)
logits_x[:,1+bert_shift:-3:stride,:rot_num+1] = logits[:,1+bert_shift:-3:stride,:rot_num+1]
predict_ldr = logits2botldrpr(logits_x[0].cpu().detach().numpy(), inputs_ids[0].cpu().detach().numpy(), stride, 0, output_file=ldr_path)
print(f"✅ GPT generated {len(predict_ldr)} parts")
return predict_ldr
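# Note: logits2botldrpr is called with output_file=ldr_path, so the generated .ldr appears
# to be written to disk as a side effect; unified_input_handler relies on this when it reads
# new_ldr_path back for display and rendering.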
# CPU wrapper function for predict_and_render (non-GPU operations)
def predict_and_render(ldr_content):
"""
Predict and render LDR file (orchestrator function)
This function handles non-GPU operations (file I/O, rendering)
and calls GPU-accelerated functions when needed.
"""
if not ldr_content:
return "Please upload an LDR file first", None, None
ldr_filename = f"{uuid.uuid4()}.ldr"
ldr_path = os.path.join(TMP_DIR, ldr_filename)
# Call GPU-accelerated function
predicted_ldr = generate_ldr_gpu(ldr_content, ldr_path)
    # Render the new LDR file
render_filename = f"{uuid.uuid4()}.png"
render_path = os.path.join(TMP_DIR, render_filename)
render_bricks_safe(ldr_path, render_path)
return predicted_ldr, ldr_path, render_path
    #except Exception as e:
    #    error_msg = f"Type: {type(e).__name__}, message: {str(e)}, traceback: {traceback.format_exc()}"
    #    return f"Prediction failed: {error_msg}", None, None
# Clean up temporary files (original logic preserved)
def clean_temp_files():
    try:
        shutil.rmtree(TMP_DIR)
        os.makedirs(TMP_DIR, exist_ok=True)
        return "Temporary files cleaned"
    except Exception as e:
        return f"Cleanup failed: {str(e)}"
#gr.Blocks.set_language("en")
_DESCRIPTION = '''
* **Option 1**: Upload an LDR file with part names
* **Option 2**: Describe your desired LEGO design in text (e.g., "red sports car")
* Generate a 3D assembly plan in LDR format
'''
with gr.Blocks(
title="ObjectAssembler: Assemble Your Object with Diverse Components",
) as demo:
gr.Markdown("ObjectAssembler: Assemble Your Object with Diverse Components")
gr.Markdown(_DESCRIPTION)
original_ldr = gr.State("")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Input Method")
ldr_file = gr.File(
label="Upload LDR File",
file_types=[".ldr"],
)
gr.Markdown("**— OR —**")
text_input = gr.Textbox(
label="Describe Your Design",
placeholder="e.g., red sports car, blue police car, yellow construction vehicle...",
lines=2
)
upload_btn = gr.Button("Load Input", variant="secondary")
predict_btn = gr.Button("Generate New LDR & Render", variant="primary")
clean_btn = gr.Button("Clean Temporary Files", variant="stop")
status_msg = gr.Textbox(label="Status Info", interactive=False)
gr.Markdown("### Original Part List")
part_list = gr.Textbox(lines=6, label="Part Names", interactive=False)
with gr.Column(scale=2):
gr.Markdown("### Part Preview")
part_renderings = gr.Gallery(
label="Part List Visualization",
columns=[6],
rows=[2],
object_fit="contain",
height="auto"
)
gr.Markdown("### Generated LDR Content")
predicted_ldr = gr.Textbox(lines=8, label="New LDR Format", interactive=False)
gr.Markdown("### Rendering Result")
render_result = gr.Image(label="Part Assembly Visualization", height=300)
ldr_download = gr.File(label="Download New LDR File")
    # Event bindings
upload_btn.click(
fn=unified_input_handler,
inputs=[ldr_file, text_input],
outputs=[part_renderings, part_list, status_msg, original_ldr, predicted_ldr, render_result]
)
predict_btn.click(
fn=predict_and_render,
inputs=[original_ldr],
outputs=[predicted_ldr, ldr_download, render_result]
)
clean_btn.click(
fn=clean_temp_files,
inputs=[],
outputs=[status_msg]
)
# Server launch configuration (Hugging Face Spaces compatible)
if __name__ == "__main__":
    # Detect whether we are running inside a Hugging Face Space
    is_hf_space = os.getenv("SPACE_ID") is not None
    print("\n" + "="*50)
    print("🚀 LEGO 3D assembly sequence generation system starting...")
print("="*50)
# ZeroGPU: Models are loaded lazily (on first use) to avoid CUDA initialization at startup
if CLIP_AVAILABLE:
print("✅ CLIP text-to-design feature enabled (lazy loading)")
print(" Models will be initialized on first use")
else:
print("⚠️ CLIP module not available - text-to-LEGO disabled")
if ZEROGPU_AVAILABLE:
print("✅ ZeroGPU support enabled - GPU allocation on demand")
else:
print("⚠️ Running in standard mode (no ZeroGPU)")
if is_hf_space:
print("🌐 运行环境: Hugging Face Spaces")
# Hugging Face Spaces Docker SDK 需要显式指定端口
demo.queue()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
allowed_paths=[os.path.abspath(DEFAULT_PART_RENDER_PATH)]
)
    else:
        import threading
        import time
        print("💻 Runtime environment: local server")
        # Launch Gradio in a background thread so the main thread is not blocked
        def launch_gradio():
            try:
                demo.queue()  # enable request queueing
                demo.launch(
                    server_name="0.0.0.0",     # listen on all interfaces
                    server_port=8080,          # use port 8080 to avoid conflicts
                    share=False,               # no temporary public share link
                    quiet=False,               # keep log output for debugging
                    show_error=True,           # surface errors for debugging
                    debug=False,               # debug mode disabled
                    inbrowser=False,           # do not auto-open a browser
                    prevent_thread_lock=True,  # return instead of blocking this thread
                    allowed_paths=[
                        os.path.abspath(DEFAULT_PART_RENDER_PATH)  # use an absolute path
                    ]
                )
            except Exception as e:
                print(f"Warning during launch (safe to ignore): {e}")
                print("Server is running at http://0.0.0.0:8080")
        # Start Gradio
        thread = threading.Thread(target=launch_gradio, daemon=False)
        thread.start()
        # Keep the main thread alive
        print("📍 URL: http://localhost:8080")
        print("🔧 Blender: installed (3.6.18)")
        print("🤖 Model weights: loaded (1.6GB)")
        print("📁 Example files: examples/ldr_file/")
        print("="*50)
        print("\nPress Ctrl+C to stop the server\n")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nShutting down the server...")
            exit(0)
# test_ldr_path = "../data/car_1k/demos/example/ldr_filter_truck_abnormal_rot_expand_trans_mid_final/modified_car_1_rot.ldr"
# mock_file = MockFileStorage(test_ldr_path)
# renderings, part_list, _, ldr_content, _, _ = process_ldr_file(mock_file)
# # if result:
# #     print(f"Debug result: {result}")
# # else:
# #     print("Debugging failed")
# predict_and_render(ldr_content)