Refactor: Manager and Flow execute in __init__ — prevents dry-run and control #210

@FernandoCelmer


Summary

Both `Manager.__init__()` and `Flow.__init__()` execute the entire workflow/strategy during object construction. This makes it impossible to inspect, validate, or control execution before it starts.

Current Code

Manager

# dotflow/core/workflow.py
class Manager:
    def __init__(self, tasks, on_success, on_failure, mode, keep_going, workflow_id, resume, config):
        # ... setup ...

        execution = getattr(self, mode)

        # Executes the entire workflow right here, in __init__:
        self.tasks = execution(
            tasks=tasks,
            workflow_id=self.workflow_id,
            ignore=keep_going,
            groups=groups,
            resume=resume,
        )

        if mode != TypeExecution.BACKGROUND:
            self._callback_workflow(tasks=self.tasks)

Flow (ABC)

# dotflow/abc/flow.py
class Flow(ABC):
    def __init__(self, tasks, workflow_id, ignore, groups, resume):
        # ... setup ...

        self.setup_queue()
        self.run()  # ← executes in __init__

Problems

1. No control between creation and execution

manager = Manager(tasks=tasks, mode="sequential")
# At this point, the workflow has ALREADY finished executing.
# There's no way to:
# - Inspect the task list before execution
# - Validate configuration
# - Add last-minute tasks
# - Do a dry-run

2. Constructor side effects are an anti-pattern

Object construction (`__init__`) should set up the object's state, not trigger execution. Constructors with side effects are:

  • Hard to test (you can't create an instance without triggering full execution)
  • Hard to extend (a subclass `__init__` must account for execution happening inside `super().__init__()`)
  • Surprising to users who expect `Manager(tasks)` to prepare, not execute
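
The "hard to extend" point can be reproduced without dotflow. The sketch below uses hypothetical stand-ins (`EagerManager`, `TimedManager`) that only mimic a constructor which executes its tasks; it is not the library's code.

```python
class EagerManager:
    """Hypothetical stand-in for a class that executes inside __init__."""

    def __init__(self, tasks):
        self.tasks = tasks
        self.results = self.execute()  # side effect during construction

    def execute(self):
        return [task() for task in self.tasks]


class TimedManager(EagerManager):
    """Subclass that needs its own state before execution starts."""

    def __init__(self, tasks, label):
        super().__init__(tasks)  # execution happens here...
        self.label = label       # ...before this attribute exists

    def execute(self):
        # self.label is not set yet when the base __init__ calls execute().
        return [f"{self.label}: {task()}" for task in self.tasks]


try:
    TimedManager(tasks=[lambda: 1], label="demo")
    error = None
except AttributeError as exc:
    error = exc
```

The subclass can only avoid the `AttributeError` by assigning `self.label` before calling `super().__init__()`, which is exactly the kind of ordering constraint the bullet describes.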

3. DotFlow uses `functools.partial` to work around this

# dotflow/core/dotflow.py
class DotFlow:
    def __init__(self, config=None, workflow_id=None):
        # Uses partial to delay Manager execution:
        self.start = partial(Manager, tasks=self.task.queue, ...)

`DotFlow` wraps `Manager` in a `partial` because calling `Manager()` directly would execute immediately. The `partial` exists only to work around execution happening in `__init__`.
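
To illustrate why `partial` delays execution here, a minimal sketch with a hypothetical `EagerManager` stand-in (not dotflow's `Manager`): `functools.partial` only stores the callable and its arguments, so nothing is constructed, and therefore nothing runs, until the partial itself is called.

```python
from functools import partial

calls = []  # records each time the stand-in manager actually executes


class EagerManager:
    """Hypothetical stand-in: constructing it runs the tasks immediately."""

    def __init__(self, tasks, mode="sequential"):
        self.mode = mode
        self.results = [task() for task in tasks]
        calls.append(mode)


tasks = [lambda: "a", lambda: "b"]

# partial() stores the class and keyword arguments; no instance exists yet.
start = partial(EagerManager, tasks=tasks)
calls_before = list(calls)

# Calling the partial constructs the manager, which executes the tasks.
manager = start(mode="parallel")
```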

Proposed Fix

Separate construction from execution:

Manager

from datetime import datetime
from uuid import uuid4

class Manager:
    def __init__(self, tasks, on_success, on_failure, mode, keep_going, workflow_id, resume, config):
        self.tasks = tasks
        self.on_success = on_success
        self.on_failure = on_failure
        self.mode = mode
        self.keep_going = keep_going
        self.workflow_id = workflow_id or uuid4()
        self.resume = resume
        self.config = config
        self.started = None

    def run(self):
        """Execute the workflow. Call this explicitly after construction."""
        self.started = datetime.now()

        if self.config:
            self.config.tracer.start_workflow(...)

        groups = grouper(tasks=self.tasks)
        execution = getattr(self, self.mode)

        self.tasks = execution(
            tasks=self.tasks,
            workflow_id=self.workflow_id,
            ignore=self.keep_going,
            groups=groups,
            resume=self.resume,
        )

        if self.mode != TypeExecution.BACKGROUND:
            self._callback_workflow(tasks=self.tasks)

        return self.tasks

Flow (ABC)

class Flow(ABC):
    def __init__(self, tasks, workflow_id, ignore, groups, resume):
        self.tasks = tasks
        self.workflow_id = workflow_id
        self.ignore = ignore
        self.groups = groups
        self.resume = resume
        self.setup_queue()
        # run() is NOT called here anymore

    @abstractmethod
    def run(self):
        ...
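
With `run()` removed from the constructor, each strategy is built first and executed by its caller. A minimal sketch of that shape follows, assuming a simplified `Flow` with fewer parameters than the real one. `Sequential` here is a toy implementation and the module-level `sequential()` helper is a hypothetical stand-in for what `Manager.sequential()` would do.

```python
from abc import ABC, abstractmethod


class Flow(ABC):
    """Simplified stand-in: the real Flow takes more parameters."""

    def __init__(self, tasks):
        self.tasks = tasks
        self.queue = []
        self.setup_queue()  # cheap setup only; no execution

    def setup_queue(self):
        self.queue = []

    @abstractmethod
    def run(self):
        ...


class Sequential(Flow):
    """Toy strategy: runs tasks one after another."""

    def run(self):
        for task in self.tasks:
            self.queue.append(task())
        return self.queue


def sequential(tasks):
    """Construct the strategy, then execute it explicitly."""
    flow = Sequential(tasks=tasks)  # nothing has run yet
    return flow.run()


flow = Sequential(tasks=[lambda: 1, lambda: 2])
pending = list(flow.queue)  # still empty: construction did not execute
results = sequential([lambda: 1, lambda: 2])
```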

Updated DotFlow

class DotFlow:
    def start(self, mode="sequential", **kwargs):
        manager = Manager(
            tasks=self.task.queue,
            workflow_id=self.workflow_id,
            config=self._config,
            mode=mode,
            **kwargs,
        )
        return manager.run()  # explicit execution

New possibilities

# Dry-run / validation:
manager = Manager(tasks=tasks, mode="sequential")
print(f"Will execute {len(manager.tasks)} tasks in {manager.mode} mode")
manager.run()

# Conditional execution:
manager = Manager(tasks=tasks, mode="parallel")
if all_dependencies_ready():
    manager.run()

Backward Compatibility

This is a breaking change for users who call `Manager()` directly. However:

  • `DotFlow.start()` is the primary public API — updating the `partial` to call `.run()` maintains compatibility for most users
  • `Manager` is documented as an internal class, not a primary user-facing API
  • A deprecation warning can be added to `__init__` if it detects tasks being passed (to ease migration)
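
One possible reading of the deprecation bullet, sketched with a hypothetical transitional `Manager` (simplified signature, not dotflow's actual code): when tasks are passed, `__init__` warns that execution now requires an explicit `.run()`.

```python
import warnings


class Manager:
    """Hypothetical transitional Manager; not dotflow's actual code."""

    def __init__(self, tasks=None, mode="sequential"):
        self.tasks = tasks or []
        self.mode = mode
        if tasks:
            # Callers that relied on construct-and-execute must add .run().
            warnings.warn(
                "Manager no longer executes in __init__; call .run() explicitly.",
                DeprecationWarning,
                stacklevel=2,  # point the warning at the caller's line
            )

    def run(self):
        return [task() for task in self.tasks]


# Capture the warning so the example is observable without console output.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    manager = Manager(tasks=[lambda: "done"])

results = manager.run()
```

Python hides `DeprecationWarning` by default unless it is triggered from code in `__main__`, so `FutureWarning` may be a better fit if the message is meant for end users rather than library developers.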

Checklist

  • Move execution logic from `Manager.__init__` to `Manager.run()`
  • Move `self.run()` call out of `Flow.__init__`
  • Update strategy classes (`Sequential`, `Parallel`, etc.) — caller must call `.run()` after construction
  • Update `Manager.sequential()`, `.parallel()`, etc. to call `.run()` on the strategy
  • Update `DotFlow.start` to call `Manager().run()`
  • Add deprecation warning if needed
  • Update tests
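
For the test update, the key new guarantee is that construction alone does not run any task. A minimal sketch of such a check, using a simplified stand-in `Manager` (the real constructor takes more arguments) and `unittest.mock.Mock` as the task:

```python
from unittest.mock import Mock


class Manager:
    """Simplified stand-in with the proposed construct-then-run split."""

    def __init__(self, tasks, mode="sequential"):
        self.tasks = tasks
        self.mode = mode

    def run(self):
        return [task() for task in self.tasks]


task = Mock(return_value="ok")

manager = Manager(tasks=[task])
called_after_init = task.called  # construction must not invoke the task

results = manager.run()
called_after_run = task.called   # run() is what invokes it
```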
