Skip to content
This repository was archived by the owner on Feb 7, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 65 additions & 66 deletions generative/inferers/inferer.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,15 +381,17 @@ def __call__(
if self.ldm_latent_shape is not None:
latent = self.ldm_resizer(latent)

call = partial(super().__call__, seg = seg) if \
isinstance(diffusion_model, SPADEDiffusionModelUNet) else super().__call__
call = super().__call__
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
call = partial(super().__call__, seg=seg)

prediction = call(
inputs=latent,
diffusion_model=diffusion_model,
noise=noise,
timesteps=timesteps,
condition=condition,
mode=mode
mode=mode,
)
return prediction

Expand Down Expand Up @@ -432,9 +434,9 @@ def sample(
"labels for each must be compatible. "
)

sample = (
partial(super().sample, seg=seg) if isinstance(diffusion_model, SPADEDiffusionModelUNet) else super().sample
)
sample = super().sample
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
sample = partial(super().sample, seg=seg)

outputs = sample(
input_noise=input_noise,
Expand All @@ -456,19 +458,19 @@ def sample(
latent = self.autoencoder_resizer(latent)
latent_intermediates = [self.autoencoder_resizer(l) for l in latent_intermediates]

image = autoencoder_model.decode_stage_2_outputs(latent / self.scale_factor)
decode = autoencoder_model.decode_stage_2_outputs
if isinstance(autoencoder_model, SPADEAutoencoderKL):
decode = partial(autoencoder_model.decode_stage_2_outputs, seg=seg)

image = decode(latent / self.scale_factor)

if save_intermediates:
intermediates = []
for latent_intermediate in latent_intermediates:
decode = autoencoder_model.decode_stage_2_outputs
if isinstance(autoencoder_model, SPADEAutoencoderKL):
intermediates.append(
autoencoder_model.decode_stage_2_outputs(latent_intermediate / self.scale_factor, seg=seg)
)
else:
intermediates.append(
autoencoder_model.decode_stage_2_outputs(latent_intermediate / self.scale_factor)
)
decode = partial(autoencoder_model.decode_stage_2_outputs, seg=seg)
intermediates.append(decode(latent_intermediate / self.scale_factor))
return image, intermediates

else:
Expand Down Expand Up @@ -521,11 +523,9 @@ def get_likelihood(
if self.ldm_latent_shape is not None:
latents = self.ldm_resizer(latents)

get_likelihood = (
partial(super().get_likelihood, seg=seg)
if isinstance(diffusion_model, SPADEDiffusionModelUNet)
else super().get_likelihood
)
get_likelihood = super().get_likelihood
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
get_likelihood = partial(super().get_likelihood, seg=seg)

outputs = get_likelihood(
inputs=latents,
Expand Down Expand Up @@ -596,13 +596,11 @@ def __call__(
noisy_image = torch.cat([noisy_image, condition], dim=1)
condition = None

diffusion_model = (
partial(diffusion_model, seg=seg)
if isinstance(diffusion_model, SPADEDiffusionModelUNet)
else diffusion_model
)
diffuse = diffusion_model
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
diffuse = partial(diffusion_model, seg = seg)

prediction = diffusion_model(
prediction = diffuse(
x=noisy_image,
timesteps=timesteps,
context=condition,
Expand Down Expand Up @@ -658,22 +656,21 @@ def sample(
x=image, timesteps=torch.Tensor((t,)).to(input_noise.device), controlnet_cond=cn_cond
)
# 2. predict noise model_output
diffusion_model = (
partial(diffusion_model, seg=seg)
if isinstance(diffusion_model, SPADEDiffusionModelUNet)
else diffusion_model
)
diffuse = diffusion_model
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
diffuse = partial(diffusion_model, seg=seg)

if mode == "concat":
model_input = torch.cat([image, conditioning], dim=1)
model_output = diffusion_model(
model_output = diffuse(
model_input,
timesteps=torch.Tensor((t,)).to(input_noise.device),
context=None,
down_block_additional_residuals=down_block_res_samples,
mid_block_additional_residual=mid_block_res_sample,
)
else:
model_output = diffusion_model(
model_output = diffuse(
image,
timesteps=torch.Tensor((t,)).to(input_noise.device),
context=conditioning,
Expand Down Expand Up @@ -747,22 +744,21 @@ def get_likelihood(
x=noisy_image, timesteps=torch.Tensor((t,)).to(inputs.device), controlnet_cond=cn_cond
)

diffusion_model = (
partial(diffusion_model, seg=seg)
if isinstance(diffusion_model, SPADEDiffusionModelUNet)
else diffusion_model
)
diffuse = diffusion_model
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
diffuse = partial(diffusion_model, seg = seg)

if mode == "concat":
noisy_image = torch.cat([noisy_image, conditioning], dim=1)
model_output = diffusion_model(
model_output = diffuse(
noisy_image,
timesteps=timesteps,
context=None,
down_block_additional_residuals=down_block_res_samples,
mid_block_additional_residual=mid_block_res_sample,
)
else:
model_output = diffusion_model(
model_output = diffuse(
x=noisy_image,
timesteps=timesteps,
context=conditioning,
Expand Down Expand Up @@ -836,7 +832,6 @@ def get_likelihood(
else:
return total_kl


class ControlNetLatentDiffusionInferer(ControlNetDiffusionInferer):
"""
ControlNetLatentDiffusionInferer takes a stage 1 model (VQVAE or AutoencoderKL), diffusion model, controlnet,
Expand Down Expand Up @@ -905,8 +900,10 @@ def __call__(
if cn_cond.shape[2:] != latent.shape[2:]:
cn_cond = F.interpolate(cn_cond, latent.shape[2:])

call = partial(super().__call__, seg = seg) if \
isinstance(diffusion_model, SPADEDiffusionModelUNet) else super().__call__
call = super().__call__
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
call = partial(super().__call__, seg=seg)

prediction = call(
inputs=latent,
diffusion_model=diffusion_model,
Expand All @@ -915,7 +912,7 @@ def __call__(
timesteps=timesteps,
cn_cond=cn_cond,
condition=condition,
mode=mode
mode=mode,
)

return prediction
Expand Down Expand Up @@ -966,20 +963,21 @@ def sample(
if cn_cond.shape[2:] != input_noise.shape[2:]:
cn_cond = F.interpolate(cn_cond, input_noise.shape[2:])

sample = partial(super().sample, seg = seg) if \
isinstance(diffusion_model, SPADEDiffusionModelUNet) else super().sample
sample = super().sample
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
sample = partial(super().sample, seg=seg)

outputs = sample(
input_noise=input_noise,
diffusion_model=diffusion_model,
controlnet=controlnet,
cn_cond=cn_cond,
scheduler=scheduler,
save_intermediates=save_intermediates,
intermediate_steps=intermediate_steps,
conditioning=conditioning,
mode=mode,
verbose=verbose,
input_noise=input_noise,
diffusion_model=diffusion_model,
controlnet=controlnet,
cn_cond=cn_cond,
scheduler=scheduler,
save_intermediates=save_intermediates,
intermediate_steps=intermediate_steps,
conditioning=conditioning,
mode=mode,
verbose=verbose,
)

if save_intermediates:
Expand All @@ -991,19 +989,19 @@ def sample(
latent = self.autoencoder_resizer(latent)
latent_intermediates = [self.autoencoder_resizer(l) for l in latent_intermediates]

image = autoencoder_model.decode_stage_2_outputs(latent / self.scale_factor)
decode = autoencoder_model.decode_stage_2_outputs
if isinstance(autoencoder_model, SPADEAutoencoderKL):
decode = partial(autoencoder_model.decode_stage_2_outputs, seg=seg)

image = decode(latent / self.scale_factor)

if save_intermediates:
intermediates = []
for latent_intermediate in latent_intermediates:
decode = autoencoder_model.decode_stage_2_outputs
if isinstance(autoencoder_model, SPADEAutoencoderKL):
intermediates.append(
autoencoder_model.decode_stage_2_outputs(latent_intermediate / self.scale_factor), seg=seg
)
else:
intermediates.append(
autoencoder_model.decode_stage_2_outputs(latent_intermediate / self.scale_factor)
)
decode = partial(autoencoder_model.decode_stage_2_outputs, seg=seg)
intermediates.append(decode(latent_intermediate / self.scale_factor))
return image, intermediates

else:
Expand Down Expand Up @@ -1064,8 +1062,10 @@ def get_likelihood(
if self.ldm_latent_shape is not None:
latents = self.ldm_resizer(latents)

get_likelihood = partial(super().get_likelihood, seg = seg) if \
isinstance(diffusion_model, SPADEDiffusionModelUNet) else super().get_likelihood
get_likelihood = super().get_likelihood
if isinstance(diffusion_model, SPADEDiffusionModelUNet):
get_likelihood = partial(super().get_likelihood, seg=seg)

outputs = get_likelihood(
inputs=latents,
diffusion_model=diffusion_model,
Expand All @@ -1085,7 +1085,6 @@ def get_likelihood(
outputs = (outputs[0], intermediates)
return outputs


class VQVAETransformerInferer(Inferer):
"""
Class to perform inference with a VQVAE + Transformer model.
Expand Down