import torch import spaces import gradio as gr import sys import platform import diffusers import transformers import psutil import os import time import traceback from diffusers import BitsAndBytesConfig as DiffusersBitsAndBytesConfig from diffusers import ZImagePipeline, AutoModel from transformers import BitsAndBytesConfig as TransformersBitsAndBytesConfig latent_history = [] # ============================================================ # LOGGING BUFFER # ============================================================ LOGS = "" def log(msg): global LOGS print(msg) LOGS += msg + "\n" return msg # ============================================================ # SYSTEM METRICS โ€” LIVE GPU + CPU MONITORING # ============================================================ def log_system_stats(tag=""): try: log(f"\n===== ๐Ÿ”ฅ SYSTEM STATS {tag} =====") # ============= GPU STATS ============= if torch.cuda.is_available(): allocated = torch.cuda.memory_allocated(0) / 1e9 reserved = torch.cuda.memory_reserved(0) / 1e9 total = torch.cuda.get_device_properties(0).total_memory / 1e9 free = total - allocated log(f"๐Ÿ’  GPU Total : {total:.2f} GB") log(f"๐Ÿ’  GPU Allocated : {allocated:.2f} GB") log(f"๐Ÿ’  GPU Reserved : {reserved:.2f} GB") log(f"๐Ÿ’  GPU Free : {free:.2f} GB") # ============= CPU STATS ============ cpu = psutil.cpu_percent() ram_used = psutil.virtual_memory().used / 1e9 ram_total = psutil.virtual_memory().total / 1e9 log(f"๐Ÿง  CPU Usage : {cpu}%") log(f"๐Ÿง  RAM Used : {ram_used:.2f} GB / {ram_total:.2f} GB") except Exception as e: log(f"โš ๏ธ Failed to log system stats: {e}") # ============================================================ # ENVIRONMENT INFO # ============================================================ log("===================================================") log("๐Ÿ” Z-IMAGE-TURBO DEBUGGING + LIVE METRIC LOGGER") log("===================================================\n") log(f"๐Ÿ“Œ PYTHON VERSION : {sys.version.replace(chr(10),' ')}") log(f"๐Ÿ“Œ PLATFORM : {platform.platform()}") log(f"๐Ÿ“Œ TORCH VERSION : {torch.__version__}") log(f"๐Ÿ“Œ TRANSFORMERS VERSION : {transformers.__version__}") log(f"๐Ÿ“Œ DIFFUSERS VERSION : {diffusers.__version__}") log(f"๐Ÿ“Œ CUDA AVAILABLE : {torch.cuda.is_available()}") log_system_stats("AT STARTUP") if not torch.cuda.is_available(): raise RuntimeError("โŒ CUDA Required") device = "cuda" gpu_id = 0 # ============================================================ # MODEL SETTINGS # ============================================================ model_cache = "./weights/" model_id = "Tongyi-MAI/Z-Image-Turbo" torch_dtype = torch.bfloat16 USE_CPU_OFFLOAD = False log("\n===================================================") log("๐Ÿง  MODEL CONFIGURATION") log("===================================================") log(f"Model ID : {model_id}") log(f"Model Cache Directory : {model_cache}") log(f"torch_dtype : {torch_dtype}") log(f"USE_CPU_OFFLOAD : {USE_CPU_OFFLOAD}") log_system_stats("BEFORE TRANSFORMER LOAD") # ============================================================ # LORA SETTINGS # ============================================================ # ============================================================ # FUNCTION TO CONVERT LATENTS TO IMAGE # ============================================================ def latent_to_image(latent): """ Convert a latent tensor to a PIL image using pipe.vae """ try: img_tensor = pipe.vae.decode(latent) img_tensor = (img_tensor / 2 + 0.5).clamp(0, 1) pil_img = T.ToPILImage()(img_tensor[0].cpu()) # <--- single image return pil_img except Exception as e: log(f"โš ๏ธ Failed to decode latent: {e}") # fallback blank image return Image.new("RGB", (latent.shape[-1]*8, latent.shape[-2]*8), color=(255,255,255)) # ============================================================ # SAFE TRANSFORMER INSPECTION # ============================================================ def inspect_transformer(model, name): log(f"\n๐Ÿ”๐Ÿ” FULL TRANSFORMER DEBUG DUMP: {name}") log("=" * 80) try: log(f"Model class : {model.__class__.__name__}") log(f"DType : {getattr(model, 'dtype', 'unknown')}") log(f"Device : {next(model.parameters()).device}") log(f"Requires Grad? : {any(p.requires_grad for p in model.parameters())}") # Check quantization if hasattr(model, "is_loaded_in_4bit"): log(f"4bit Quantization : {model.is_loaded_in_4bit}") if hasattr(model, "is_loaded_in_8bit"): log(f"8bit Quantization : {model.is_loaded_in_8bit}") # Find blocks candidates = ["transformer_blocks", "blocks", "layers", "encoder", "model"] blocks = None chosen_attr = None for attr in candidates: if hasattr(model, attr): blocks = getattr(model, attr) chosen_attr = attr break log(f"Block container attr : {chosen_attr}") if blocks is None: log("โš ๏ธ No valid block container found.") return if not hasattr(blocks, "__len__"): log("โš ๏ธ Blocks exist but not iterable.") return total = len(blocks) log(f"Total Blocks : {total}") log("-" * 80) # Inspect first N blocks N = min(20, total) for i in range(N): block = blocks[i] log(f"\n๐Ÿงฉ Block [{i}/{total-1}]") log(f"Class: {block.__class__.__name__}") # Print submodules for n, m in block.named_children(): log(f" โ”œโ”€ {n}: {m.__class__.__name__}") # Print attention related if hasattr(block, "attn"): attn = block.attn log(f" โ”œโ”€ Attention: {attn.__class__.__name__}") log(f" โ”‚ Heads : {getattr(attn, 'num_heads', 'unknown')}") log(f" โ”‚ Dim : {getattr(attn, 'hidden_size', 'unknown')}") log(f" โ”‚ Backend : {getattr(attn, 'attention_backend', 'unknown')}") # Device + dtype info try: dev = next(block.parameters()).device log(f" โ”œโ”€ Device : {dev}") except StopIteration: pass try: dt = next(block.parameters()).dtype log(f" โ”œโ”€ DType : {dt}") except StopIteration: pass log("\n๐Ÿ”š END TRANSFORMER DEBUG DUMP") log("=" * 80) except Exception as e: log(f"โŒ ERROR IN INSPECTOR: {e}") import torch import time # ---------- UTILITY ---------- def pretty_header(title): log("\n\n" + "=" * 80) log(f"๐ŸŽ›๏ธ {title}") log("=" * 80 + "\n") # ---------- MEMORY ---------- def get_vram(prefix=""): try: allocated = torch.cuda.memory_allocated() / 1024**2 reserved = torch.cuda.memory_reserved() / 1024**2 log(f"{prefix}Allocated VRAM : {allocated:.2f} MB") log(f"{prefix}Reserved VRAM : {reserved:.2f} MB") except: log(f"{prefix}VRAM: CUDA not available") # ---------- MODULE INSPECT ---------- def inspect_module(name, module): pretty_header(f"๐Ÿ”ฌ Inspecting {name}") try: log(f"๐Ÿ“ฆ Class : {module.__class__.__name__}") log(f"๐Ÿ”ข DType : {getattr(module, 'dtype', 'unknown')}") log(f"๐Ÿ’ป Device : {next(module.parameters()).device}") log(f"๐Ÿงฎ Params : {sum(p.numel() for p in module.parameters()):,}") # Quantization state if hasattr(module, "is_loaded_in_4bit"): log(f"โš™๏ธ 4-bit QLoRA : {module.is_loaded_in_4bit}") if hasattr(module, "is_loaded_in_8bit"): log(f"โš™๏ธ 8-bit load : {module.is_loaded_in_8bit}") # Attention backend (DiT) if hasattr(module, "set_attention_backend"): try: attn = getattr(module, "attention_backend", None) log(f"๐Ÿš€ Attention Backend: {attn}") except: pass # Search for blocks candidates = ["transformer_blocks", "blocks", "layers", "encoder", "model"] blocks = None chosen_attr = None for attr in candidates: if hasattr(module, attr): blocks = getattr(module, attr) chosen_attr = attr break log(f"\n๐Ÿ“š Block Container : {chosen_attr}") if blocks is None: log("โš ๏ธ No block structure found") return if not hasattr(blocks, "__len__"): log("โš ๏ธ Blocks exist but are not iterable") return total = len(blocks) log(f"๐Ÿ”ข Total Blocks : {total}\n") # Inspect first 15 blocks N = min(15, total) for i in range(N): blk = blocks[i] log(f"\n๐Ÿงฉ Block [{i}/{total-1}] โ€” {blk.__class__.__name__}") for n, m in blk.named_children(): log(f" โ”œโ”€ {n:<15} {m.__class__.__name__}") # Attention details if hasattr(blk, "attn"): a = blk.attn log(f" โ”œโ”€ Attention") log(f" โ”‚ Heads : {getattr(a, 'num_heads', 'unknown')}") log(f" โ”‚ Dim : {getattr(a, 'hidden_size', 'unknown')}") log(f" โ”‚ Backend : {getattr(a, 'attention_backend', 'unknown')}") # Device / dtype try: log(f" โ”œโ”€ Device : {next(blk.parameters()).device}") log(f" โ”œโ”€ DType : {next(blk.parameters()).dtype}") except StopIteration: pass get_vram(" โ–ถ ") except Exception as e: log(f"โŒ Module inspect error: {e}") # ---------- LORA INSPECTION ---------- def inspect_loras(pipe): pretty_header("๐Ÿงฉ LoRA ADAPTERS") try: if not hasattr(pipe, "lora_state_dict") and not hasattr(pipe, "adapter_names"): log("โš ๏ธ No LoRA system detected.") return if hasattr(pipe, "adapter_names"): names = pipe.adapter_names log(f"Available Adapters: {names}") if hasattr(pipe, "active_adapters"): log(f"Active Adapters : {pipe.active_adapters}") if hasattr(pipe, "lora_scale"): log(f"LoRA Scale : {pipe.lora_scale}") # LoRA modules if hasattr(pipe, "transformer") and hasattr(pipe.transformer, "modules"): for name, module in pipe.transformer.named_modules(): if "lora" in name.lower(): log(f" ๐Ÿ”ง LoRA Module: {name} ({module.__class__.__name__})") except Exception as e: log(f"โŒ LoRA inspect error: {e}") # ---------- PIPELINE INSPECTOR ---------- def debug_pipeline(pipe): pretty_header("๐Ÿš€ FULL PIPELINE DEBUGGING") try: log(f"Pipeline Class : {pipe.__class__.__name__}") log(f"Attention Impl : {getattr(pipe, 'attn_implementation', 'unknown')}") log(f"Device : {pipe.device}") except: pass get_vram("โ–ถ ") # Inspect TRANSFORMER if hasattr(pipe, "transformer"): inspect_module("Transformer", pipe.transformer) # Inspect TEXT ENCODER if hasattr(pipe, "text_encoder") and pipe.text_encoder is not None: inspect_module("Text Encoder", pipe.text_encoder) # Inspect UNET (if ZImage pipeline has it) if hasattr(pipe, "unet"): inspect_module("UNet", pipe.unet) # LoRA adapters inspect_loras(pipe) pretty_header("๐ŸŽ‰ END DEBUG REPORT") # ============================================================ # LOAD TRANSFORMER โ€” WITH LIVE STATS # ============================================================ log("\n===================================================") log("๐Ÿ”ง LOADING TRANSFORMER BLOCK") log("===================================================") log("๐Ÿ“Œ Logging memory before load:") log_system_stats("START TRANSFORMER LOAD") try: quant_cfg = DiffusersBitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch_dtype, bnb_4bit_use_double_quant=True, ) transformer = AutoModel.from_pretrained( model_id, cache_dir=model_cache, subfolder="transformer", quantization_config=quant_cfg, torch_dtype=torch_dtype, device_map=device, ) log("โœ… Transformer loaded successfully.") except Exception as e: log(f"โŒ Transformer load failed: {e}") transformer = None log_system_stats("AFTER TRANSFORMER LOAD") if transformer: inspect_transformer(transformer, "Transformer") # ============================================================ # LOAD TEXT ENCODER # ============================================================ log("\n===================================================") log("๐Ÿ”ง LOADING TEXT ENCODER") log("===================================================") log_system_stats("START TEXT ENCODER LOAD") try: quant_cfg2 = TransformersBitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch_dtype, bnb_4bit_use_double_quant=True, ) text_encoder = AutoModel.from_pretrained( model_id, cache_dir=model_cache, subfolder="text_encoder", quantization_config=quant_cfg2, torch_dtype=torch_dtype, device_map=device, ) log("โœ… Text encoder loaded successfully.") except Exception as e: log(f"โŒ Text encoder load failed: {e}") text_encoder = None log_system_stats("AFTER TEXT ENCODER LOAD") if text_encoder: inspect_transformer(text_encoder, "Text Encoder") # ============================================================ # BUILD PIPELINE # ============================================================ log("\n===================================================") log("๐Ÿ”ง BUILDING PIPELINE") log("===================================================") log_system_stats("START PIPELINE BUILD") try: pipe = ZImagePipeline.from_pretrained( model_id, transformer=transformer, text_encoder=text_encoder, torch_dtype=torch_dtype, ) # Prefer flash attention if supported try: if hasattr(pipe, "transformer") and hasattr(pipe.transformer, "set_attention_backend"): pipe.transformer.set_attention_backend("_flash_3") log("โœ… transformer.set_attention_backend('_flash_3') called") except Exception as _e: log(f"โš ๏ธ set_attention_backend failed: {_e}") # ๐Ÿšซ NO default LoRA here # ๐Ÿšซ NO fuse # ๐Ÿšซ NO unload pipe.to("cuda") log("โœ… Pipeline built successfully.") LOGS += log("Pipeline build completed.") + "\n" except Exception as e: log(f"โŒ Pipeline build failed: {e}") log(traceback.format_exc()) pipe = None log_system_stats("AFTER PIPELINE BUILD") # ----------------------------- # Monkey-patch prepare_latents (safe) # ----------------------------- if pipe is not None and hasattr(pipe, "prepare_latents"): original_prepare_latents = pipe.prepare_latents def logged_prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None): try: result_latents = original_prepare_latents(batch_size, num_channels_latents, height, width, dtype, device, generator, latents) log_msg = f"๐Ÿ”น prepare_latents called | shape={result_latents.shape}, dtype={result_latents.dtype}, device={result_latents.device}" if hasattr(self, "_latents_log"): self._latents_log.append(log_msg) else: self._latents_log = [log_msg] return result_latents except Exception as e: log(f"โš ๏ธ prepare_latents wrapper failed: {e}") raise # apply patch safely try: pipe.prepare_latents = logged_prepare_latents.__get__(pipe) log("โœ… prepare_latents monkey-patched") except Exception as e: log(f"โš ๏ธ Failed to attach prepare_latents patch: {e}") else: log("โŒ WARNING: Pipe not initialized or prepare_latents missing; skipping prepare_latents patch") from PIL import Image import torch # -------------------------- # Helper: Safe latent extractor # -------------------------- def safe_get_latents(pipe, height, width, generator, device, LOGS): """ Safely prepare latents for any ZImagePipeline variant. Returns latents tensor, logs issues instead of failing. """ try: # Determine number of channels num_channels = 4 # default fallback if hasattr(pipe, "unet") and hasattr(pipe.unet, "in_channels"): num_channels = pipe.unet.in_channels elif hasattr(pipe, "vae") and hasattr(pipe.vae, "latent_channels"): num_channels = pipe.vae.latent_channels # some pipelines define this LOGS.append(f"๐Ÿ”น Using num_channels={num_channels} for latents") latents = pipe.prepare_latents( batch_size=1, num_channels_latents=num_channels, height=height, width=width, dtype=torch.float32, device=device, generator=generator, ) LOGS.append(f"๐Ÿ”น Latents shape: {latents.shape}, dtype: {latents.dtype}, device: {latents.device}") return latents except Exception as e: LOGS.append(f"โš ๏ธ Latent extraction failed: {e}") # fallback: guess a safe shape fallback_channels = 16 # try standard default for ZImage pipelines latents = torch.randn((1, fallback_channels, height // 8, width // 8), generator=generator, device=device) LOGS.append(f"๐Ÿ”น Using fallback random latents shape: {latents.shape}") return latents # -------------------------- # Main generation function (kept exactly as your logic) # -------------------------- from huggingface_hub import HfApi, HfFolder import torch import os HF_REPO_ID = "rahul7star/Zstudio-latent" # Model repo HF_TOKEN = HfFolder.get_token() # Make sure you are logged in via `huggingface-cli login` def upload_latents_to_hf(latent_dict, filename="latents.pt"): local_path = f"/tmp/{filename}" torch.save(latent_dict, local_path) try: api = HfApi() api.upload_file( path_or_fileobj=local_path, path_in_repo=filename, repo_id=HF_REPO_ID, token=HF_TOKEN, repo_type="model" # since this is a model repo ) os.remove(local_path) return f"https://huggingface.co/{HF_REPO_ID}/resolve/main/{filename}" except Exception as e: os.remove(local_path) raise e import asyncio import torch from PIL import Image async def async_upload_latents(latent_dict, filename, LOGS): try: hf_url = await upload_latents_to_hf(latent_dict, filename=filename) # assume this can be async LOGS.append(f"๐Ÿ”น All preview latents uploaded: {hf_url}") except Exception as e: LOGS.append(f"โš ๏ธ Failed to upload all preview latents: {e}") # this code genetae all frame for latest GPU expseinve bt decide fails sp use this later @spaces.GPU def generate_image_all_latents(prompt, height, width, steps, seed, guidance_scale=0.0): LOGS = [] device = "cpu" # FORCE CPU generator = torch.Generator(device).manual_seed(int(seed)) placeholder = Image.new("RGB", (width, height), color=(255, 255, 255)) latent_gallery = [] final_gallery = [] last_four_latents = [] # we only upload 4 # -------------------------------------------------- # LATENT PREVIEW GENERATION (CPU MODE) # -------------------------------------------------- try: latents = safe_get_latents(pipe, height, width, generator, device, LOGS) latents = latents.to("cpu") # keep EVERYTHING CPU timestep_count = len(pipe.scheduler.timesteps) preview_every = max(1, timestep_count // 10) for i, t in enumerate(pipe.scheduler.timesteps): # -------------- decode latent preview -------------- try: with torch.no_grad(): latent_cpu = latents.to(pipe.vae.dtype) # match VAE dtype decoded = pipe.vae.decode(latent_cpu).sample # [1,3,H,W] decoded = (decoded / 2 + 0.5).clamp(0, 1) decoded = decoded[0].permute(1,2,0).cpu().numpy() latent_img = Image.fromarray((decoded * 255).astype("uint8")) except Exception: latent_img = placeholder LOGS.append("โš ๏ธ Latent preview decode failed.") latent_gallery.append(latent_img) # store last 4 latent states if len(last_four_latents) >= 4: last_four_latents.pop(0) last_four_latents.append(latents.cpu().clone()) # UI preview yields if i % preview_every == 0: yield None, latent_gallery, LOGS # -------------------------------------------------- # UPLOAD LAST 4 LATENTS (SYNC) # -------------------------------------------------- try: upload_dict = { "last_4_latents": last_four_latents, "prompt": prompt, "seed": seed } hf_url = upload_latents_to_hf( upload_dict, filename=f"latents_last4_{seed}.pt" ) LOGS.append(f"๐Ÿ”น Uploaded last 4 latents: {hf_url}") except Exception as e: LOGS.append(f"โš ๏ธ Failed to upload latents: {e}") except Exception as e: LOGS.append(f"โš ๏ธ Latent generation failed: {e}") latent_gallery.append(placeholder) yield None, latent_gallery, LOGS # -------------------------------------------------- # FINAL IMAGE - UNTOUCHED # -------------------------------------------------- try: output = pipe( prompt=prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance_scale, generator=generator, ) final_img = output.images[0] LOGS.append("โœ… Standard pipeline succeeded.") yield final_img, latent_gallery, LOGS except Exception as e2: LOGS.append(f"โŒ Standard pipeline failed: {e2}") yield placeholder, latent_gallery, LOGS @spaces.GPU def generate_imagenegative(prompt, height, width, steps, seed, guidance_scale=7.5): """ Generate image using ZImagePipeline with optional LoRA adapter. Shows step previews and final image. """ LOGS = [] generator = torch.Generator("cuda").manual_seed(int(seed)) placeholder = Image.new("RGB", (width, height), color=(255, 255, 255)) latent_gallery = [] final_gallery = [] # Determine active LoRA adapter active_adapter = None active_strength = 1.0 if loaded_loras: active_adapter = list(loaded_loras.keys())[-1] active_strength = loaded_loras[active_adapter + "_strength"] if loaded_loras.get(active_adapter + "_strength") else 1.0 pipe.set_adapters([active_adapter], [active_strength]) LOGS.append(f"๐Ÿงฉ Using LoRA adapter: {active_adapter} (strength={active_strength})") else: pipe.set_adapters([], []) LOGS.append("โšก No LoRA applied") try: # Generate small preview steps num_preview_steps = min(5, steps) for i in range(num_preview_steps): step = i + 1 try: preview_output = pipe( prompt=prompt, height=height // 4, # small preview width=width // 4, num_inference_steps=step, guidance_scale=guidance_scale, generator=generator, ) img = preview_output.images[0].resize((width, height)) latent_gallery.append(img) except Exception as e: LOGS.append(f"โš ๏ธ Preview step {step} failed: {e}") latent_gallery.append(placeholder) # --- Final image --- output = pipe( prompt=prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance_scale, generator=generator, ) final_img = output.images[0] final_gallery.append(final_img) latent_gallery.append(final_img) LOGS.append("โœ… Image generation completed.") yield final_img, latent_gallery, LOGS except Exception as e: LOGS.append(f"โŒ Generation failed: {e}") latent_gallery.append(placeholder) final_gallery.append(placeholder) yield placeholder, latent_gallery, LOGS @spaces.GPU def generate_image(prompt, height, width, steps, seed, guidance_scale=0.0): LOGS = [] device = "cuda" generator = torch.Generator(device).manual_seed(int(seed)) placeholder = Image.new("RGB", (width, height), color=(255, 255, 255)) latent_gallery = [] final_gallery = [] # --- Generate latent previews in a loop --- try: latents = safe_get_latents(pipe, height, width, generator, device, LOGS) # Convert latents to float32 if necessary if latents.dtype != torch.float32: latents = latents.float() # Loop for multiple previews before final image num_previews = min(10, steps) # show ~10 previews preview_steps = torch.linspace(0, 1, num_previews) for i, alpha in enumerate(preview_steps): try: with torch.no_grad(): # Simple noise interpolation for preview (simulate denoising progress) preview_latent = latents * alpha + torch.randn_like(latents) * (1 - alpha) # Decode to PIL latent_img_tensor = pipe.vae.decode(preview_latent).sample # [1,3,H,W] latent_img_tensor = (latent_img_tensor / 2 + 0.5).clamp(0, 1) latent_img_tensor = latent_img_tensor.cpu().permute(0, 2, 3, 1)[0] latent_img = Image.fromarray((latent_img_tensor.numpy() * 255).astype('uint8')) except Exception as e: LOGS.append(f"โš ๏ธ Latent preview decode failed: {e}") latent_img = placeholder latent_gallery.append(latent_img) yield None, latent_gallery, LOGS # update Gradio with intermediate preview # Save final latents to HF latent_dict = {"latents": latents.cpu(), "prompt": prompt, "seed": seed} try: hf_url = upload_latents_to_hf(latent_dict, filename=f"latents_{seed}.pt") LOGS.append(f"๐Ÿ”น Latents uploaded: {hf_url}") except Exception as e: LOGS.append(f"โš ๏ธ Failed to upload latents: {e}") except Exception as e: LOGS.append(f"โš ๏ธ Latent generation failed: {e}") latent_gallery.append(placeholder) yield None, latent_gallery, LOGS # --- Final image: untouched standard pipeline --- try: output = pipe( prompt=prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance_scale, generator=generator, ) final_img = output.images[0] final_gallery.append(final_img) latent_gallery.append(final_img) # fallback preview if needed LOGS.append("โœ… Standard pipeline succeeded.") yield final_img, latent_gallery, LOGS except Exception as e2: LOGS.append(f"โŒ Standard pipeline failed: {e2}") final_gallery.append(placeholder) latent_gallery.append(placeholder) yield placeholder, latent_gallery, LOGS # this is astable vesopn tha can gen final and a noise to latent @spaces.GPU def generate_image_verygood_realnoise(prompt, height, width, steps, seed, guidance_scale=0.0): LOGS = [] device = "cuda" generator = torch.Generator().manual_seed(int(seed)) placeholder = Image.new("RGB", (width, height), color=(255, 255, 255)) latent_gallery = [] final_gallery = [] # --- Generate latent previews --- try: latents = safe_get_latents(pipe, height, width, generator, device, LOGS) latents = latents.float() # keep float32 until decode num_previews = min(10, steps) preview_steps = torch.linspace(0, 1, num_previews) for alpha in preview_steps: try: with torch.no_grad(): # Simulate denoising progression like Z-Image Turbo preview_latent = latents * alpha + latents * 0 # optional: simple progression # Move to same device and dtype as VAE preview_latent = preview_latent.to(pipe.vae.device).to(pipe.vae.dtype) # Decode decoded = pipe.vae.decode(preview_latent, return_dict=False)[0] # Convert to PIL following same logic as final image decoded = (decoded / 2 + 0.5).clamp(0, 1) decoded = decoded.cpu().permute(0, 2, 3, 1).float().numpy() decoded = (decoded * 255).round().astype("uint8") latent_img = Image.fromarray(decoded[0]) except Exception as e: LOGS.append(f"โš ๏ธ Latent preview decode failed: {e}") latent_img = placeholder latent_gallery.append(latent_img) yield None, latent_gallery, LOGS except Exception as e: LOGS.append(f"โš ๏ธ Latent generation failed: {e}") latent_gallery.append(placeholder) yield None, latent_gallery, LOGS # --- Final image: untouched --- try: output = pipe( prompt=prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance_scale, generator=generator, ) final_img = output.images[0] final_gallery.append(final_img) latent_gallery.append(final_img) # fallback preview LOGS.append("โœ… Standard pipeline succeeded.") yield final_img, latent_gallery, LOGS except Exception as e2: LOGS.append(f"โŒ Standard pipeline failed: {e2}") final_gallery.append(placeholder) latent_gallery.append(placeholder) yield placeholder, latent_gallery, LOGS # DO NOT TOUCH this is astable vesopn tha can gen final and a noise to latent with latent upload to repo @spaces.GPU def generate_image_safe(prompt, height, width, steps, seed, guidance_scale=0.0): LOGS = [] device = "cuda" generator = torch.Generator(device).manual_seed(int(seed)) placeholder = Image.new("RGB", (width, height), color=(255, 255, 255)) latent_gallery = [] final_gallery = [] # --- Generate latent previews in a loop --- try: latents = safe_get_latents(pipe, height, width, generator, device, LOGS) # Convert latents to float32 if necessary if latents.dtype != torch.float32: latents = latents.float() # Loop for multiple previews before final image num_previews = min(10, steps) # show ~10 previews preview_steps = torch.linspace(0, 1, num_previews) for i, alpha in enumerate(preview_steps): try: with torch.no_grad(): # Simple noise interpolation for preview (simulate denoising progress) preview_latent = latents * alpha + torch.randn_like(latents) * (1 - alpha) # Decode to PIL latent_img_tensor = pipe.vae.decode(preview_latent).sample # [1,3,H,W] latent_img_tensor = (latent_img_tensor / 2 + 0.5).clamp(0, 1) latent_img_tensor = latent_img_tensor.cpu().permute(0, 2, 3, 1)[0] latent_img = Image.fromarray((latent_img_tensor.numpy() * 255).astype('uint8')) except Exception as e: LOGS.append(f"โš ๏ธ Latent preview decode failed: {e}") latent_img = placeholder latent_gallery.append(latent_img) yield None, latent_gallery, LOGS # update Gradio with intermediate preview # Save final latents to HF latent_dict = {"latents": latents.cpu(), "prompt": prompt, "seed": seed} try: hf_url = upload_latents_to_hf(latent_dict, filename=f"latents_{seed}.pt") LOGS.append(f"๐Ÿ”น Latents uploaded: {hf_url}") except Exception as e: LOGS.append(f"โš ๏ธ Failed to upload latents: {e}") except Exception as e: LOGS.append(f"โš ๏ธ Latent generation failed: {e}") latent_gallery.append(placeholder) yield None, latent_gallery, LOGS # --- Final image: untouched standard pipeline --- try: output = pipe( prompt=prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance_scale, generator=generator, ) final_img = output.images[0] final_gallery.append(final_img) latent_gallery.append(final_img) # fallback preview if needed LOGS.append("โœ… Standard pipeline succeeded.") yield final_img, latent_gallery, LOGS except Exception as e2: LOGS.append(f"โŒ Standard pipeline failed: {e2}") final_gallery.append(placeholder) latent_gallery.append(placeholder) yield placeholder, latent_gallery, LOGS import gradio as gr from huggingface_hub import list_repo_files, hf_hub_download import gradio as gr import os # ------------------------- # Helper: Recursive LoRA listing # ------------------------- from huggingface_hub import list_repo_files import gradio as gr from PIL import Image # ---------------------------- # LIST LoRA FILES HELPER # ---------------------------- # ---------------------------- # GRADIO UI # ---------------------------- # ------------------------- # Helper function # ------------------------- def list_loras_from_repo(repo_id: str): """ List all .safetensors files in a Hugging Face repo, including subfolders. Returns relative paths like 'Anime/retro_neo_noir_style_z_image_turbo.safetensors' """ try: all_files = list_repo_files(repo_id) safetensors_files = [f for f in all_files if f.endswith(".safetensors")] return safetensors_files except Exception as e: log(f"โŒ Failed to list repo files: {e}") return [] # Keep track of loaded adapters loaded_loras = {} # ------------------------- # Gradio UI # ------------------------- with gr.Blocks(title="Z-Image-Turbo") as demo: gr.Markdown("# ๐ŸŽจ Z-Image-Turbo (LoRA-enabled UI)") # ------------------------- # Tabs # ------------------------- with gr.Tabs(): # -------- Image & Latents -------- with gr.TabItem("Image & Latents"): with gr.Row(): with gr.Column(scale=1): prompt = gr.Textbox(label="Prompt", value="boat in Ocean") height = gr.Slider(256, 2048, value=1024, step=8, label="Height") width = gr.Slider(256, 2048, value=1024, step=8, label="Width") steps = gr.Slider(1, 50, value=20, step=1, label="Inference Steps") seed = gr.Number(value=42, label="Seed") run_btn = gr.Button("๐Ÿš€ Generate Image") with gr.Column(scale=1): final_image = gr.Image(label="Final Image") latent_gallery = gr.Gallery(label="Latent Steps", columns=4, height=256, preview=True) # -------- Logs -------- with gr.TabItem("Logs"): logs_box = gr.Textbox(label="Logs", lines=25, interactive=False) # ------------------------- # LoRA Controls # ------------------------- gr.Markdown("## ๐Ÿงฉ LoRA Controls") with gr.Row(): lora_repo = gr.Textbox(label="LoRA Repo (HF)", value="rahul7star/ZImageLora") lora_file = gr.Dropdown(label="LoRA file (.safetensors)", choices=[]) lora_strength = gr.Slider(0.0, 2.0, value=1.0, step=0.05, label="LoRA strength") with gr.Row(): refresh_lora_btn = gr.Button("๐Ÿ”„ Refresh LoRA List") apply_lora_btn = gr.Button("โœ… Apply LoRA") clear_lora_btn = gr.Button("โŒ Clear LoRA") # ------------------------- # Callbacks # ------------------------- def refresh_lora_list(repo_name): files = list_loras_from_repo(repo_name) if not files: log(f"โš ๏ธ No LoRA files found in {repo_name}") return gr.update(choices=[], value=None) log(f"๐Ÿ“ฆ Found {len(files)} LoRA files in {repo_name}") return gr.update(choices=files, value=files[0]) refresh_lora_btn.click(refresh_lora_list, inputs=[lora_repo], outputs=[lora_file]) def apply_lora(repo_name, lora_filename, strength): global pipe, loaded_loras if pipe is None: return "โŒ Pipeline not initialized" if not lora_filename: return "โš ๏ธ No LoRA file selected" adapter_name = f"ui_lora_{lora_filename.replace('/', '_').replace('.', '_')}" try: if adapter_name not in loaded_loras: pipe.load_lora_weights(repo_name, weight_name=lora_filename, adapter_name=adapter_name) loaded_loras[adapter_name] = lora_filename log(f"๐Ÿ“ฅ Loaded LoRA: {lora_filename}") pipe.set_adapters([adapter_name], [strength]) log(f"โœ… Applied LoRA adapter: {adapter_name} (strength={strength})") return f"LoRA applied: {lora_filename}" except Exception as e: log(f"โŒ Failed to apply LoRA: {e}") return f"Failed: {e}" apply_lora_btn.click(apply_lora, inputs=[lora_repo, lora_file, lora_strength], outputs=[logs_box]) def clear_lora(): global pipe if pipe is None: return "โŒ Pipeline not initialized" try: pipe.set_adapters([], []) log("๐Ÿงน LoRA cleared") return "LoRA cleared" except Exception as e: log(f"โŒ Failed to clear LoRA: {e}") return f"Failed: {e}" clear_lora_btn.click(clear_lora, outputs=[logs_box]) # ------------------------- # Run Generation # ------------------------- run_btn.click( generate_image, inputs=[prompt, height, width, steps, seed], outputs=[final_image, latent_gallery, logs_box] ) demo.launch()