import gradio as gr
import numpy as np
import random, json, spaces, torch, time
from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler
from transformers import AutoTokenizer, Qwen3ForCausalLM
from safetensors.torch import load_file
from utils import repo_utils, prompt_utils, image_utils
# from controlnet_aux.processor import Processor
from omegaconf import OmegaConf

# clone and move videox_fun
repo_utils.clone_repo_if_not_exists("https://github.com/aigc-apps/VideoX-Fun.git", "repos")
repo_utils.move_folder("repos/VideoX-Fun/videox_fun", "videox_fun")
repo_utils.move_folder("repos/VideoX-Fun/config", "config")

from videox_fun.pipeline import ZImageControlPipeline
from videox_fun.models import ZImageControlTransformer2DModel
from videox_fun.utils.utils import get_image_latent
from controlnet_aux.processor import Processor

# clone models
repo_utils.clone_repo_if_not_exists("https://huggingface.co/Tongyi-MAI/Z-Image-Turbo", "models")
repo_utils.clone_repo_if_not_exists("https://huggingface.co/alibaba-pai/Z-Image-Turbo-Fun-Controlnet-Union-2.0", "models")

MODEL_LOCAL = "models/Z-Image-Turbo/"
TRANSFORMER_LOCAL = "models/Z-Image-Turbo-Fun-Controlnet-Union-2.0/Z-Image-Turbo-Fun-Controlnet-Union-2.0.safetensors"
TRANSFORMER_CONFIG = "config/z_image/z_image_control_2.0.yaml"
TRANSFORMER_MERGED = "models/ZIT-Merged"

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 1280
DTYPE = torch.bfloat16

has_merged = repo_utils.check_dir_exist(TRANSFORMER_MERGED)

# load transformer
config = OmegaConf.load(TRANSFORMER_CONFIG)

# if not has_merged:
print('load transformer from base')
transformer = ZImageControlTransformer2DModel.from_pretrained(
    MODEL_LOCAL,
    subfolder="transformer",
    transformer_additional_kwargs=OmegaConf.to_container(config['transformer_additional_kwargs']),
).to("cuda", torch.bfloat16)
print('load state_dict')
state_dict = load_file(TRANSFORMER_LOCAL)
state_dict = state_dict["state_dict"] if "state_dict" in state_dict else state_dict
m, u = transformer.load_state_dict(state_dict, strict=False)
print(f"missing keys: {len(m)}, unexpected keys: {len(u)}")
transformer.save_pretrained(TRANSFORMER_MERGED)
# else:
#     print('load transformer from merged to bypass calculation')
#     transformer = ZImageControlTransformer2DModel.from_pretrained(
#         TRANSFORMER_MERGED,
#         transformer_additional_kwargs=OmegaConf.to_container(config['transformer_additional_kwargs']),
#     ).to("cuda", torch.bfloat16)
print("transformer ready.")

# load ZImageControlPipeline
vae = AutoencoderKL.from_pretrained(
    MODEL_LOCAL, subfolder="vae", device_map="cuda"
).to(DTYPE)
print("vae ready.")

tokenizer = AutoTokenizer.from_pretrained(
    MODEL_LOCAL, subfolder="tokenizer"
)
print("tokenizer ready.")

text_encoder = Qwen3ForCausalLM.from_pretrained(
    MODEL_LOCAL,
    subfolder="text_encoder",
    torch_dtype=DTYPE,
)

# scheduler = FlowMatchEulerDiscreteScheduler(num_train_timesteps=1000, shift=3)
scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained(
    MODEL_LOCAL, subfolder="scheduler"
)
print("scheduler ready.")

pipe = ZImageControlPipeline(
    vae=vae,
    tokenizer=tokenizer,
    text_encoder=text_encoder,
    transformer=transformer,
    scheduler=scheduler,
)
pipe.to("cuda", torch.bfloat16)
print("pipe ready.")


def prepare(edit_dict, prompt):
    # return edit_dict['background']
    if not prompt:
        prompt = "Ultra HD, 4K"
    output_image = image_utils.replace_transparent(edit_dict['layers'][0], (0, 0, 0))
    return output_image, prompt

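# Note: gr.ImageMask with type="pil" delivers an editor dict of the form
# {"background": PIL.Image, "layers": [RGBA PIL.Image, ...], "composite": PIL.Image}.
# `prepare` flattens the first painted layer onto black to produce the mask preview,
# and the run_button .then() chain below passes that mask, the original editor dict,
# and the (possibly defaulted) prompt on to `inference`.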

@spaces.GPU
def inference(
    prompt,
    negative_prompt,
    edit_dict,
    mask_image,
    control_context_scale=0.75,
    seed=42,
    randomize_seed=True,
    guidance_scale=1.5,
    num_inference_steps=8,
    progress=gr.Progress(track_tqdm=True),
):
    timestamp = time.time()
    print(f"timestamp: {timestamp}")

    # process image
    print("DEBUG: process image")
    if edit_dict is None or mask_image is None:
        print("Error: edit_dict or mask_image is empty.")
        return None, seed

    # rescale to prevent OOM
    input_image = edit_dict['background']
    input_image, width, height = image_utils.rescale_image(input_image, 1, 8, max_size=1280)
    sample_size = [height, width]

    print("DEBUG: inpaint_image")
    if input_image is not None:
        inpaint_image = get_image_latent(input_image, sample_size=sample_size)[:, :, 0]
    else:
        inpaint_image = torch.zeros([1, 3, sample_size[0], sample_size[1]])

    if mask_image is not None:
        mask_image, w, h = image_utils.rescale_image(mask_image, 1, 8, max_size=1280)
        mask_image = get_image_latent(mask_image, sample_size=sample_size)[:, :1, 0]
    else:
        mask_image = torch.ones([1, 1, sample_size[0], sample_size[1]]) * 255

    print("DEBUG: control_image_torch")
    processor = Processor('canny')
    control_image, w, h = image_utils.rescale_image(input_image, 1, 8, max_size=1280)
    control_image = control_image.resize((1024, 1024))
    control_image = processor(control_image, to_pil=True)
    control_image = control_image.resize((width, height))
    control_image_torch = get_image_latent(control_image, sample_size=sample_size)[:, :, 0]

    # generation
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    generator = torch.Generator().manual_seed(seed)

    output_image = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        height=height,
        width=width,
        generator=generator,
        guidance_scale=guidance_scale,
        image=inpaint_image,
        mask_image=mask_image,
        control_image=control_image_torch,
        num_inference_steps=num_inference_steps,
        control_context_scale=control_context_scale,
    ).images[0]

    return output_image, seed


def read_file(path: str) -> str:
    with open(path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content


css = """
#col-container {
    margin: 0 auto;
    max-width: 960px;
}
"""

with open('examples/0data.json', 'r') as file:
    examples = json.load(file)

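# Assumption about examples/0data.json (its contents are not shown here): gr.Examples below
# is wired to inputs=[edit_dict, prompt], so each entry is expected to be a two-element row,
# e.g. an image (path or editor dict) plus a prompt string, roughly
# [["examples/some_image.png", "a prompt"], ...]; the path and prompt are illustrative only.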

with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        with gr.Column():
            gr.HTML(read_file("static/header.html"))
        with gr.Row():
            with gr.Column():
                edit_dict = gr.ImageMask(
                    height=600,
                    sources=['upload', 'clipboard'],
                    type="pil",
                    brush=gr.Brush(
                        colors=["#FFFFFF"],
                        color_mode="fixed",
                        default_size=75
                    ),
                    label="Edit Image"
                )
                prompt = gr.Textbox(
                    label="Prompt",
                    show_label=False,
                    lines=2,
                    placeholder="Enter your prompt",
                    # container=False,
                )
                run_button = gr.Button("Generate", variant="primary")
                with gr.Accordion("Advanced Settings", open=False):
                    negative_prompt = gr.Textbox(
                        label="Negative prompt",
                        lines=2,
                        container=False,
                        placeholder="Enter your negative prompt",
                        value="blurry ugly bad"
                    )
                    # with gr.Row():
                    num_inference_steps = gr.Slider(
                        label="Steps",
                        minimum=1,
                        maximum=30,
                        step=1,
                        value=9,
                    )
                    control_context_scale = gr.Slider(
                        label="Context scale",
                        minimum=0.0,
                        maximum=1.0,
                        step=0.01,
                        value=0.40,
                    )
                    guidance_scale = gr.Slider(
                        label="Guidance scale",
                        minimum=0.0,
                        maximum=10.0,
                        step=0.1,
                        value=1.0,
                    )
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=MAX_SEED,
                        step=1,
                        value=42,
                    )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=False)
            with gr.Column():
                output_image = gr.Image(label="Generated image", show_label=False)
                # polished_prompt = gr.Textbox(label="Polished prompt", interactive=False)
                with gr.Accordion("Preprocessor data", open=False):
                    mask_image = gr.Image(
                        label="Generated Mask",
                        interactive=False,
                        type="pil",
                    )
        gr.Examples(examples=examples, inputs=[edit_dict, prompt])
        gr.Markdown(read_file("static/footer.md"))

    # edit_dict.upload(fn=lambda x: x, inputs=[mask_image], outputs=[input_image])
    run_button.click(
        fn=prepare,
        inputs=[edit_dict, prompt],
        outputs=[mask_image, prompt]
    ).then(
        fn=inference,
        inputs=[
            prompt,
            negative_prompt,
            edit_dict,
            mask_image,
            control_context_scale,
            seed,
            randomize_seed,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[output_image, seed],
    )

if __name__ == "__main__":
    demo.launch(mcp_server=True)