-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathworkflow_node.py
More file actions
402 lines (346 loc) · 13.9 KB
/
workflow_node.py
File metadata and controls
402 lines (346 loc) · 13.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
"""
Subworkflow: loads an inner workflow (UI or API format), infers its I/O from
Subworkflow Input / Subworkflow Output nodes, and executes it as a subgraph expansion.
"""
import logging
from comfy_api.latest import io
from .debug_utils import configure_logger
from .workflow_utils import (
list_workflow_files,
load_workflow_file,
load_workflow_url,
build_expansion,
build_modifier_source_expansion,
apply_control_after_generate,
validate_workflow_nodes_installed,
get_workflow_interface,
)
# Module-level logger shared by every node class in this file.
log = configure_logger("ComfyUI-Subworkflow")
class _UnboundedOutputTypes:
"""Returns '*' for any slot index so validation never IndexErrors on dynamic outputs."""
def __getitem__(self, idx):
return "*"
def __len__(self):
return 0
def __iter__(self):
return iter(())
class _UnboundedFalseSeq:
"""Infinite False iterator so merge_result_data zips correctly for any output count."""
def __getitem__(self, idx):
return False
def __len__(self):
return 0
def __iter__(self):
while True:
yield False
def _apply_primitive_overrides(data: dict, kwargs: dict) -> dict:
    """Substitute widget override values for unconnected INT/FLOAT inputs.

    The frontend sends one widget per numeric input, named swf_override_i,
    whose value is a dict {"use": bool, "val": number}. When the outer slot
    swf_in_i is unconnected (None) and the override switch is on, the widget
    value is written into a copy of *kwargs* so build_expansion uses it
    instead of the inner fallback node. The input *kwargs* is not mutated.
    """
    interface = get_workflow_interface(data)
    merged = dict(kwargs)
    for idx, meta in enumerate(interface["inputs"]):
        if meta.get("type") in ("INT", "FLOAT"):
            slot_name = f"swf_in_{idx}"
            if merged.get(slot_name) is not None:
                # A connected outer node always takes priority over the widget.
                continue
            switch = merged.get(f"swf_override_{idx}")
            if isinstance(switch, dict) and switch.get("use"):
                merged[slot_name] = switch.get("val")
    return merged
class BaseSubworkflow(io.ComfyNode):
    """
    Shared execution behavior for subworkflow nodes that load workflow JSON from
    different sources.

    Subclasses implement the three `_source_*` hooks; this base class handles
    caching, lazy dynamic-input resolution, and subgraph expansion.
    """

    @classmethod
    def validate_inputs(cls, **kwargs):
        # Accept any incoming types for dynamic swf_in_* inputs.
        return True

    @classmethod
    def check_lazy_status(cls, **kwargs):
        """Return the dynamic swf_in_* inputs that still need evaluation.

        Fix: always return a list (possibly empty) instead of falling through
        to an implicit None, so callers get a consistent return type.
        """
        missing_inputs = [
            name for name, value in kwargs.items()
            if name.startswith("swf_in_") and value is None
        ]
        if missing_inputs:
            log.debug("[Subworkflow] waiting for dynamic input(s) %s", missing_inputs)
        return missing_inputs

    @classmethod
    def fingerprint_inputs(cls, *args, **kwargs):
        # The inner workflow can change between runs by file reload or by cached
        # control-after-generate mutations, even when outer inputs are unchanged.
        # NaN never compares equal to itself, so the node is never cache-hit.
        return float("NaN")

    # Cache of loaded inner workflows keyed by _source_cache_key().
    # Class attribute on the base, so it is shared across all subclasses.
    _loaded_workflows: dict[str, dict] = {}

    @classmethod
    def _source_label(cls, **kwargs) -> str:
        """Human-readable label for log messages (e.g. filename or URL)."""
        raise NotImplementedError

    @classmethod
    def _source_cache_key(cls, **kwargs) -> str:
        """Stable key identifying the source (and any loading options)."""
        raise NotImplementedError

    @classmethod
    def _load_source(cls, **kwargs) -> dict:
        """Load and return the inner workflow JSON as a dict."""
        raise NotImplementedError

    @classmethod
    def _get_workflow_data(cls, reload_each_execution: bool, **kwargs) -> dict:
        """Return the inner workflow dict, loading or reloading it as needed."""
        cache_key = cls._source_cache_key(**kwargs)
        label = cls._source_label(**kwargs)
        if reload_each_execution or cache_key not in cls._loaded_workflows:
            reason = "reload_each_execution enabled" if reload_each_execution else "cache miss"
            log.info("[Subworkflow] loading inner workflow %r (%s)", label, reason)
            cls._loaded_workflows[cache_key] = cls._load_source(**kwargs)
        else:
            log.debug("[Subworkflow] reusing cached inner workflow %r", label)
        return cls._loaded_workflows[cache_key]

    @classmethod
    def _execute_source(cls, reload_each_execution=True, **kwargs):
        """Load, validate, and expand the inner workflow as a subgraph."""
        label = cls._source_label(**kwargs)
        log.debug(
            "[Subworkflow] executing inner workflow %r (reload_each_execution=%s, dynamic_inputs=%s)",
            label,
            reload_each_execution,
            sorted(k for k in kwargs if k.startswith("swf_in_")),
        )
        data = cls._get_workflow_data(reload_each_execution, **kwargs)
        validate_workflow_nodes_installed(data)
        effective_inputs = _apply_primitive_overrides(data, kwargs)
        output_refs, graph = build_expansion(data, effective_inputs)
        return cls._finalize_execution(output_refs, graph, data, reload_each_execution)

    @classmethod
    def _finalize_execution(cls, output_refs, graph, data: dict, reload_each_execution: bool):
        """Apply control-after-generate (cached mode only) and emit the expansion."""
        if not reload_each_execution:
            # Only mutate the cached copy; a reloaded workflow starts fresh anyway.
            apply_control_after_generate(data)
        return io.NodeOutput(*output_refs, expand=graph.finalize())
def _subworkflow_outputs():
return []
def _reload_input():
    """Build the shared reload/keep-loaded toggle used by every loader node."""
    toggle_options = {
        "display_name": "at execution",
        "default": True,
        "label_on": "reload",
        "label_off": "keep loaded",
    }
    return io.Boolean.Input("reload_each_execution", **toggle_options)
class Subworkflow(BaseSubworkflow):
    """
    Selects a saved workflow, exposes its Subworkflow Input nodes as inputs and
    its Subworkflow Output nodes as outputs, then executes the inner workflow as
    a transparent subgraph when the outer workflow runs.
    """

    @classmethod
    def define_schema(cls):
        node_inputs = [
            io.Combo.Input("workflow", options=list_workflow_files()),
            _reload_input(),
        ]
        return io.Schema(
            node_id="SWF_Subworkflow",
            display_name="Subworkflow",
            category="Subworkflow",
            description=(
                "Executes a selected workflow as an expandable subworkflow, "
                "using Subworkflow Input and Subworkflow Output boundary nodes."
            ),
            inputs=node_inputs,
            outputs=_subworkflow_outputs(),
            enable_expand=True,
            accept_all_inputs=True,
        )

    @classmethod
    def _source_label(cls, workflow: str, **kwargs) -> str:
        # The filename itself is the most useful log label.
        return workflow

    @classmethod
    def _source_cache_key(cls, workflow: str, **kwargs) -> str:
        # "file:" prefix keeps file-backed entries distinct from URL-backed ones.
        return f"file:{workflow}"

    @classmethod
    def _load_source(cls, workflow: str, **kwargs) -> dict:
        if not workflow:
            raise ValueError("No workflow selected. Choose a workflow file from the dropdown.")
        try:
            return load_workflow_file(workflow)
        except FileNotFoundError:
            # Re-raise as a user-facing error; suppress the chained traceback.
            raise ValueError(f"Workflow file not found: {workflow!r}") from None

    @classmethod
    def execute(cls, workflow: str, reload_each_execution=True, **kwargs):
        # All real work happens in the shared base-class pipeline.
        return cls._execute_source(
            workflow=workflow,
            reload_each_execution=reload_each_execution,
            **kwargs,
        )
class SubworkflowFromURL(BaseSubworkflow):
    """
    Loads a workflow JSON document from an HTTP(S) URL and executes it as a
    transparent subgraph.
    """

    @classmethod
    def define_schema(cls):
        log.debug("[Subworkflow] SubworkflowFromURL define_schema called")
        verify_toggle = io.Boolean.Input(
            "verify_ssl",
            display_name="verify SSL",
            default=True,
            label_on="verify",
            label_off="skip",
        )
        return io.Schema(
            node_id="SWF_SubworkflowFromURL",
            display_name="Subworkflow (from URL)",
            category="Subworkflow",
            description=(
                "Executes a workflow loaded from an HTTP(S) URL as an expandable "
                "subworkflow, using Subworkflow Input and Subworkflow Output "
                "boundary nodes."
            ),
            inputs=[
                io.String.Input("url", multiline=False, default=""),
                verify_toggle,
                _reload_input(),
            ],
            outputs=_subworkflow_outputs(),
            enable_expand=True,
            accept_all_inputs=True,
        )

    @classmethod
    def _source_label(cls, url: str, **kwargs) -> str:
        log.debug("[Subworkflow] SubworkflowFromURL _source_label url=%r", url)
        # The URL itself serves as the log label.
        return url

    @classmethod
    def _source_cache_key(cls, url: str, verify_ssl: bool = True, **kwargs) -> str:
        stripped = url.strip()
        log.debug(
            "[Subworkflow] SubworkflowFromURL _source_cache_key url=%r stripped=%r verify_ssl=%s",
            url,
            stripped,
            verify_ssl,
        )
        # verify_ssl is part of the key so toggling it forces a fresh fetch.
        return f"url:{stripped}:verify_ssl:{bool(verify_ssl)}"

    @classmethod
    def _load_source(cls, url: str, verify_ssl: bool = True, **kwargs) -> dict:
        log.debug(
            "[Subworkflow] SubworkflowFromURL _load_source url=%r verify_ssl=%s kwargs_keys=%s",
            url,
            verify_ssl,
            sorted(kwargs.keys()),
        )
        if not url:
            log.warning("[Subworkflow] SubworkflowFromURL _load_source missing URL")
            raise ValueError("No workflow URL specified.")
        return load_workflow_url(url.strip(), verify_ssl=bool(verify_ssl))

    @classmethod
    def execute(cls, url: str, verify_ssl=True, reload_each_execution=True, **kwargs):
        log.debug(
            "[Subworkflow] SubworkflowFromURL execute called url=%r verify_ssl=%s reload_each_execution=%s dynamic_inputs=%s",
            url,
            verify_ssl,
            reload_each_execution,
            sorted(k for k in kwargs if k.startswith("swf_in_")),
        )
        # Delegate to the shared base-class pipeline.
        return cls._execute_source(
            url=url,
            verify_ssl=verify_ssl,
            reload_each_execution=reload_each_execution,
            **kwargs,
        )
class SubworkflowModifierSource(BaseSubworkflow):
    """
    Exposes a modifier-source slot from an inner workflow as a separate outer node.
    This breaks the outer dependency cycle by moving the pre-modifier value onto
    its own node, while the main Subworkflow keeps only the modifier input.
    """

    @classmethod
    def define_schema(cls):
        node_inputs = [
            io.Combo.Input("workflow", options=list_workflow_files()),
        ]
        return io.Schema(
            node_id="SWF_SubworkflowModifierSource",
            display_name="Subworkflow Modifier Source",
            category="Subworkflow",
            description=(
                "Exposes all modifier source slots from a selected inner workflow. "
                "Use this together with the main Subworkflow node to break outer "
                "modifier dependency loops."
            ),
            inputs=node_inputs,
            outputs=_subworkflow_outputs(),
            enable_expand=True,
            accept_all_inputs=True,
        )

    @classmethod
    def _source_label(cls, workflow: str, **kwargs) -> str:
        # Log label is simply the selected filename.
        return workflow

    @classmethod
    def _source_cache_key(cls, workflow: str, **kwargs) -> str:
        # Same key scheme as the main Subworkflow node, so the cache is shared.
        return f"file:{workflow}"

    @classmethod
    def _load_source(cls, workflow: str, **kwargs) -> dict:
        if not workflow:
            raise ValueError("No workflow selected. Choose a workflow file from the dropdown.")
        try:
            return load_workflow_file(workflow)
        except FileNotFoundError:
            # Re-raise as a user-facing error; suppress the chained traceback.
            raise ValueError(f"Workflow file not found: {workflow!r}") from None

    @classmethod
    def execute(cls, workflow: str, **kwargs):
        # Always reload so the exposed slots track file edits.
        reload_each_execution = True
        data = cls._get_workflow_data(reload_each_execution, workflow=workflow, **kwargs)
        validate_workflow_nodes_installed(data)
        output_refs, graph = build_modifier_source_expansion(data, kwargs)
        return cls._finalize_execution(output_refs, graph, data, reload_each_execution)
class SubworkflowModifierSourceFromURL(BaseSubworkflow):
    """
    Exposes all modifier source slots from a workflow loaded from an HTTP(S)
    URL. URL-based counterpart of SubworkflowModifierSource.
    """

    @classmethod
    def define_schema(cls):
        return io.Schema(
            node_id="SWF_SubworkflowModifierSourceFromURL",
            display_name="Subworkflow Modifier Source (from URL)",
            category="Subworkflow",
            description=(
                "Exposes all modifier source slots from a workflow loaded from an "
                "HTTP(S) URL."
            ),
            inputs=[
                io.String.Input("url", multiline=False, default=""),
                io.Boolean.Input(
                    "verify_ssl",
                    display_name="verify SSL",
                    default=True,
                    label_on="verify",
                    label_off="skip",
                ),
            ],
            outputs=_subworkflow_outputs(),
            enable_expand=True,
            accept_all_inputs=True,
        )

    @classmethod
    def _source_label(cls, url: str, **kwargs) -> str:
        """Use the URL itself as the log label."""
        return url

    @classmethod
    def _source_cache_key(cls, url: str, verify_ssl: bool = True, **kwargs) -> str:
        """Cache key includes verify_ssl so toggling it forces a fresh fetch."""
        return f"url:{url.strip()}:verify_ssl:{bool(verify_ssl)}"

    @classmethod
    def _load_source(cls, url: str, verify_ssl: bool = True, **kwargs) -> dict:
        """Fetch the workflow JSON from *url*; raise ValueError when blank."""
        if not url:
            raise ValueError("No workflow URL specified.")
        return load_workflow_url(url.strip(), verify_ssl=bool(verify_ssl))

    @classmethod
    def execute(cls, url: str, verify_ssl: bool = True, **kwargs):
        """Expand the modifier-source slots of the remote workflow.

        Fix: verify_ssl now defaults to True, matching SubworkflowFromURL.execute
        and this class's own _load_source/_source_cache_key defaults. Callers
        that pass it explicitly are unaffected (backward compatible).
        """
        # Always reload so the exposed slots track remote edits.
        reload_each_execution = True
        data = cls._get_workflow_data(
            reload_each_execution,
            url=url,
            verify_ssl=verify_ssl,
            **kwargs,
        )
        validate_workflow_nodes_installed(data)
        output_refs, graph = build_modifier_source_expansion(data, kwargs)
        return cls._finalize_execution(output_refs, graph, data, reload_each_execution)
_DYNAMIC_RETURN = _UnboundedOutputTypes()

# Pre-set so GET_SCHEMA() doesn't overwrite with the empty list that outputs=[] produces.
# Returning '*' for any slot index lets validation accept connections to any output slot.
_SUBWORKFLOW_NODE_CLASSES = (
    Subworkflow,
    SubworkflowFromURL,
    SubworkflowModifierSource,
    SubworkflowModifierSourceFromURL,
)
for _node_cls in _SUBWORKFLOW_NODE_CLASSES:
    _node_cls._RETURN_TYPES = _DYNAMIC_RETURN
    _node_cls._RETURN_NAMES = []
    _node_cls._OUTPUT_IS_LIST = _UnboundedFalseSeq()
    _node_cls._OUTPUT_TOOLTIPS = []