Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions src/diffusers/modular_pipelines/modular_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1707,6 +1707,8 @@ def __init__(
_blocks_class_name=self._blocks.__class__.__name__ if self._blocks is not None else None
)

self._pretrained_model_name_or_path = pretrained_model_name_or_path

@property
def default_call_parameters(self) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -2254,6 +2256,11 @@ def update_components(self, **kwargs):
new_component_spec = current_component_spec
if hasattr(self, name) and getattr(self, name) is not None:
logger.warning(f"ModularPipeline.update_components: setting {name} to None (spec unchanged)")
elif (
current_component_spec.default_creation_method == "from_pretrained"
and getattr(component, "_diffusers_load_id", None) is None
):
new_component_spec = ComponentSpec(name=name, type_hint=type(component))
else:
new_component_spec = ComponentSpec.from_component(name, component)

Expand Down Expand Up @@ -2325,17 +2332,49 @@ def load_components(self, names: list[str] | str | None = None, **kwargs):
elif "default" in value:
# check if the default is specified
component_load_kwargs[key] = value["default"]
# Only pass trust_remote_code to components from the same repo as the pipeline.
# When a user passes trust_remote_code=True, they intend to trust code from the
# pipeline's repo, not from external repos referenced in modular_model_index.json.
trust_remote_code_stripped = False
if (
"trust_remote_code" in component_load_kwargs
and self._pretrained_model_name_or_path is not None
and spec.pretrained_model_name_or_path != self._pretrained_model_name_or_path
):
component_load_kwargs.pop("trust_remote_code")
trust_remote_code_stripped = True

if not spec.pretrained_model_name_or_path:
logger.info(f"Skipping component `{name}`: no pretrained model path specified.")
continue

try:
components_to_register[name] = spec.load(**component_load_kwargs)
except Exception:
logger.warning(
f"\nFailed to create component {name}:\n"
f"- Component spec: {spec}\n"
f"- load() called with kwargs: {component_load_kwargs}\n"
"If this component is not required for your workflow you can safely ignore this message.\n\n"
"Traceback:\n"
f"{traceback.format_exc()}"
)
tb = traceback.format_exc()
if trust_remote_code_stripped and "trust_remote_code" in tb:
warning_msg = (
f"Failed to load component `{name}` from external repository "
f"`{spec.pretrained_model_name_or_path}`.\n\n"
f"`trust_remote_code=True` was not forwarded to `{name}` because it comes from "
f"a different repository than the pipeline (`{self._pretrained_model_name_or_path}`). "
f"For safety, `trust_remote_code` is only forwarded to components from the same "
f"repository as the pipeline.\n\n"
f"You need to load this component manually with `trust_remote_code=True` and pass it "
f"to the pipeline via `pipe.update_components()`. For example, if it is a custom model:\n\n"
f' {name} = AutoModel.from_pretrained("{spec.pretrained_model_name_or_path}", trust_remote_code=True)\n'
f" pipe.update_components({name}={name})\n"
)
Comment on lines +2364 to +2367
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is perfect, thank you!

else:
warning_msg = (
f"Failed to create component {name}:\n"
f"- Component spec: {spec}\n"
f"- load() called with kwargs: {component_load_kwargs}\n"
"If this component is not required for your workflow you can safely ignore this message.\n\n"
"Traceback:\n"
f"{tb}"
)
logger.warning(warning_msg)

# Register all components at once
self.register_components(**components_to_register)
Expand Down
42 changes: 42 additions & 0 deletions tests/modular_pipelines/test_modular_pipelines_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,18 @@ def test_load_components_selective_loading(self):
assert pipe.unet is not None
assert getattr(pipe, "vae", None) is None

def test_load_components_selective_loading_incremental(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also test if pipe doesn't have other expected components (like scheduler)?

"""Loading a subset of components should not affect already-loaded components."""
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")

pipe.load_components(names="unet", torch_dtype=torch.float32)
pipe.load_components(names="text_encoder", torch_dtype=torch.float32)

assert hasattr(pipe, "unet")
assert pipe.unet is not None
assert hasattr(pipe, "text_encoder")
assert pipe.text_encoder is not None

def test_load_components_skips_invalid_pretrained_path(self):
pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")

Expand Down Expand Up @@ -749,6 +761,36 @@ def test_save_pretrained_roundtrip_with_local_model(self, tmp_path):
for key in original_state_dict:
assert torch.equal(original_state_dict[key], loaded_state_dict[key]), f"Mismatch in {key}"

def test_save_pretrained_updates_index_for_model_with_no_load_id(self, tmp_path):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool test!

"""testing the workflow of update the pipeline with a custom model and save the pipeline,
the modular_model_index.json should point to the save directory."""
import json

from diffusers import UNet2DConditionModel
Comment on lines +767 to +769
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit): maybe we could shift all the imports to the top.


pipe = ModularPipeline.from_pretrained("hf-internal-testing/tiny-stable-diffusion-xl-pipe")
pipe.load_components(torch_dtype=torch.float32)

unet = UNet2DConditionModel.from_pretrained(
"hf-internal-testing/tiny-stable-diffusion-xl-pipe", subfolder="unet"
)
assert not hasattr(unet, "_diffusers_load_id")

pipe.update_components(unet=unet)

save_dir = str(tmp_path / "my-pipeline")
pipe.save_pretrained(save_dir)

with open(os.path.join(save_dir, "modular_model_index.json")) as f:
index = json.load(f)

_library, _cls, unet_spec = index["unet"]
assert unet_spec["pretrained_model_name_or_path"] == save_dir
assert unet_spec["subfolder"] == "unet"

_library, _cls, vae_spec = index["vae"]
assert vae_spec["pretrained_model_name_or_path"] == "hf-internal-testing/tiny-stable-diffusion-xl-pipe"

def test_save_pretrained_overwrite_modular_index(self, tmp_path):
"""With overwrite_modular_index=True, all component references should point to the save directory."""
import json
Expand Down
150 changes: 150 additions & 0 deletions tests/modular_pipelines/test_modular_pipelines_custom_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,156 @@ def test_custom_block_supported_components(self):
assert len(pipe.components) == 1
assert pipe.component_names[0] == "transformer"

def test_trust_remote_code_not_propagated_to_external_repo(self):
"""When a modular pipeline repo references a component from an external repo that has custom
code (auto_map in config), calling load_components(trust_remote_code=True) should NOT
propagate trust_remote_code to that external component. The external component should fail
to load."""

from diffusers import ModularPipeline

CUSTOM_MODEL_CODE = (
"import torch\n"
"from diffusers import ModelMixin, ConfigMixin\n"
"from diffusers.configuration_utils import register_to_config\n"
"\n"
"class CustomModel(ModelMixin, ConfigMixin):\n"
" @register_to_config\n"
" def __init__(self, hidden_size=8):\n"
" super().__init__()\n"
" self.linear = torch.nn.Linear(hidden_size, hidden_size)\n"
"\n"
" def forward(self, x):\n"
" return self.linear(x)\n"
)

with tempfile.TemporaryDirectory() as external_repo_dir, tempfile.TemporaryDirectory() as pipeline_repo_dir:
# Step 1: Create an external model repo with custom code (requires trust_remote_code)
with open(os.path.join(external_repo_dir, "modeling.py"), "w") as f:
f.write(CUSTOM_MODEL_CODE)

config = {
"_class_name": "CustomModel",
"_diffusers_version": "0.0.0",
"auto_map": {"AutoModel": "modeling.CustomModel"},
"hidden_size": 8,
}
with open(os.path.join(external_repo_dir, "config.json"), "w") as f:
json.dump(config, f)

torch.save({}, os.path.join(external_repo_dir, "diffusion_pytorch_model.bin"))

# Step 2: Create a custom block that references the external repo.
# Define both the class (for direct use) and its code string (for block.py).
class ExternalRefBlock(ModularPipelineBlocks):
@property
def expected_components(self):
return [
ComponentSpec(
"custom_model",
AutoModel,
pretrained_model_name_or_path=external_repo_dir,
)
]

@property
def inputs(self) -> List[InputParam]:
return [InputParam("prompt", type_hint=str, required=True)]

@property
def intermediate_inputs(self) -> List[InputParam]:
return []

@property
def intermediate_outputs(self) -> List[OutputParam]:
return [OutputParam("output", type_hint=str)]

def __call__(self, components, state: PipelineState) -> PipelineState:
block_state = self.get_block_state(state)
block_state.output = "test"
self.set_block_state(state, block_state)
return components, state

EXTERNAL_REF_BLOCK_CODE_STR = (
"from typing import List\n"
"from diffusers import AutoModel\n"
"from diffusers.modular_pipelines import (\n"
" ComponentSpec,\n"
" InputParam,\n"
" ModularPipelineBlocks,\n"
" OutputParam,\n"
" PipelineState,\n"
")\n"
"\n"
"class ExternalRefBlock(ModularPipelineBlocks):\n"
" @property\n"
" def expected_components(self):\n"
" return [\n"
" ComponentSpec(\n"
' "custom_model",\n'
" AutoModel,\n"
f' pretrained_model_name_or_path="{external_repo_dir}",\n'
" )\n"
" ]\n"
"\n"
" @property\n"
" def inputs(self) -> List[InputParam]:\n"
' return [InputParam("prompt", type_hint=str, required=True)]\n'
"\n"
" @property\n"
" def intermediate_inputs(self) -> List[InputParam]:\n"
" return []\n"
"\n"
" @property\n"
" def intermediate_outputs(self) -> List[OutputParam]:\n"
' return [OutputParam("output", type_hint=str)]\n'
"\n"
" def __call__(self, components, state: PipelineState) -> PipelineState:\n"
" block_state = self.get_block_state(state)\n"
' block_state.output = "test"\n'
" self.set_block_state(state, block_state)\n"
" return components, state\n"
)

# Save the block config, write block.py, then load back via from_pretrained
block = ExternalRefBlock()
block.save_pretrained(pipeline_repo_dir)

# auto_map will reference the module name derived from ExternalRefBlock.__module__,
# which is "test_modular_pipelines_custom_blocks". Write the code file with that name.
code_path = os.path.join(pipeline_repo_dir, "test_modular_pipelines_custom_blocks.py")
with open(code_path, "w") as f:
f.write(EXTERNAL_REF_BLOCK_CODE_STR)

block = ModularPipelineBlocks.from_pretrained(pipeline_repo_dir, trust_remote_code=True)
pipe = block.init_pipeline()
pipe.save_pretrained(pipeline_repo_dir)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also inspect the modular_model_index.json file here (checking if the repo paths were serialized as expected)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we can load it back, we are fine (we loaded it back next and verified the path too)


# Step 3: Load the pipeline from the saved directory.
loaded_pipe = ModularPipeline.from_pretrained(pipeline_repo_dir, trust_remote_code=True)

assert loaded_pipe._pretrained_model_name_or_path == pipeline_repo_dir
assert loaded_pipe._component_specs["custom_model"].pretrained_model_name_or_path == external_repo_dir
assert getattr(loaded_pipe, "custom_model", None) is None

# Step 4a: load_components WITHOUT trust_remote_code.
# It should still fail
loaded_pipe.load_components()
assert getattr(loaded_pipe, "custom_model", None) is None

# Step 4b: load_components with trust_remote_code=True.
# trust_remote_code should be stripped for the external component, so it fails.
# The warning should contain guidance about manually loading with trust_remote_code.
loaded_pipe.load_components(trust_remote_code=True)
assert getattr(loaded_pipe, "custom_model", None) is None

# Step 4c: Manually load with AutoModel and update_components — this should work.
from diffusers import AutoModel

custom_model = AutoModel.from_pretrained(external_repo_dir, trust_remote_code=True)
loaded_pipe.update_components(custom_model=custom_model)
assert getattr(loaded_pipe, "custom_model", None) is not None

def test_custom_block_loads_from_hub(self):
repo_id = "hf-internal-testing/tiny-modular-diffusers-block"
block = ModularPipelineBlocks.from_pretrained(repo_id, trust_remote_code=True)
Expand Down