Refactor: Deduplicate execute() and _execute_single() in TaskEngine #207

@FernandoCelmer

Summary

`TaskEngine` has two methods that contain nearly identical logic: `execute()` and `_execute_single()`. Both run the step and return the resulting context; the only difference is that `execute()` also assigns it to `self.task.current_context` before returning.

Current Code

# dotflow/core/engine.py

class TaskEngine:

    def execute(self):
        """Executes the task function and returns the context."""
        current_context = self.task.step(
            initial_context=self.task.initial_context,
            previous_context=self.task.previous_context,
            task=self.task,
        )

        if type(current_context.storage) not in self.VALID_OBJECTS:
            current_context = self._execution_with_class(
                class_instance=current_context.storage
            )

        self.task.current_context = current_context    # ← sets on task
        return current_context

    def _execute_single(self):
        """Executes the task function once, handling class-based steps."""
        current_context = self.task.step(
            initial_context=self.task.initial_context,
            previous_context=self.task.previous_context,
            task=self.task,
        )

        if type(current_context.storage) not in self.VALID_OBJECTS:
            current_context = self._execution_with_class(
                class_instance=current_context.storage
            )

        return current_context                         # ← returns only

The two methods are identical except for the `self.task.current_context = current_context` line.

`execute_with_retry()` uses `_execute_single()` and then sets the context itself:

def execute_with_retry(self):
    for attempt in range(1, max_attempts + 1):
        try:
            result = self._execute_single()       # calls _execute_single
            self.task.current_context = result     # sets context here
            return result
        # ... (except/retry handling omitted from this excerpt)

Proposed Fix

Keep `execute()`, but remove its duplicated body and have it delegate to `_execute_single()`:

class TaskEngine:

    def execute(self):
        """Executes the task function and sets the context on the task."""
        current_context = self._execute_single()
        self.task.current_context = current_context
        return current_context

    def _execute_single(self):
        """Executes the task function once. Returns the context without setting it on the task."""
        current_context = self.task.step(
            initial_context=self.task.initial_context,
            previous_context=self.task.previous_context,
            task=self.task,
        )

        if type(current_context.storage) not in self.VALID_OBJECTS:
            current_context = self._execution_with_class(
                class_instance=current_context.storage
            )

        return current_context

This keeps the public API unchanged while eliminating the duplicated logic.
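
To make the intended behavior concrete, here is a minimal, self-contained sketch of the delegation. `Context`, `Task`, the contents of `VALID_OBJECTS`, and the body of `_execution_with_class()` are hypothetical stand-ins, not dotflow's real implementations; only the shape of `execute()` and `_execute_single()` follows the proposal above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Context:
    """Hypothetical stand-in for dotflow's context object."""
    storage: Any = None


@dataclass
class Task:
    """Hypothetical stand-in exposing only the attributes the engine reads."""
    step: Callable[..., Context]
    initial_context: Optional[Context] = None
    previous_context: Optional[Context] = None
    current_context: Optional[Context] = None


class TaskEngine:
    # Assumed set of plain storage types; the real VALID_OBJECTS may differ.
    VALID_OBJECTS = (str, int, float, dict, list, type(None))

    def __init__(self, task: Task):
        self.task = task

    def _execution_with_class(self, class_instance: Any) -> Context:
        # Placeholder only: the real method runs a class-based step.
        return Context(storage=class_instance)

    def _execute_single(self) -> Context:
        """Runs the step once and returns the context; the task is not updated."""
        current_context = self.task.step(
            initial_context=self.task.initial_context,
            previous_context=self.task.previous_context,
            task=self.task,
        )
        if type(current_context.storage) not in self.VALID_OBJECTS:
            current_context = self._execution_with_class(
                class_instance=current_context.storage
            )
        return current_context

    def execute(self) -> Context:
        """Delegates to _execute_single() and records the context on the task."""
        current_context = self._execute_single()
        self.task.current_context = current_context
        return current_context


def double_step(initial_context, previous_context, task):
    # Hypothetical step function used only for this demonstration.
    return Context(storage=21 * 2)


task = Task(step=double_step)
engine = TaskEngine(task)

returned = engine._execute_single()
assert task.current_context is None      # the helper leaves the task untouched

result = engine.execute()
assert task.current_context is result    # the public method records the context
assert result.storage == returned.storage == 42
```

The assertions capture the contract the refactor should preserve: `_execute_single()` only returns, while `execute()` returns the same value and also stores it on the task.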

Checklist

  • Refactor `execute()` to delegate to `_execute_single()`
  • Verify all callers of `execute()` still work (used by `Execution` wrapper)
  • Verify `execute_with_retry()` still works
  • Ensure tests pass
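
For the `execute_with_retry()` item, a check along the following lines might be enough. This is a sketch under assumptions: `FlakyStep`, `MiniTask`, and `MiniEngine` are hypothetical stand-ins, and the `except` handling and the `max_attempts` attribute are guesses, since the excerpt in this issue does not show them.

```python
class FlakyStep:
    """Hypothetical step that fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self, **kwargs):
        self.calls += 1
        if self.calls <= self.failures:
            raise RuntimeError("transient failure")
        return {"storage": "ok"}


class MiniTask:
    """Hypothetical task holding only what the engine touches."""

    def __init__(self, step):
        self.step = step
        self.initial_context = None
        self.previous_context = None
        self.current_context = None


class MiniEngine:
    """Reduced engine; the retry policy here is an assumption."""

    def __init__(self, task, max_attempts=3):
        self.task = task
        self.max_attempts = max_attempts

    def _execute_single(self):
        return self.task.step(
            initial_context=self.task.initial_context,
            previous_context=self.task.previous_context,
            task=self.task,
        )

    def execute_with_retry(self):
        last_error = None
        for attempt in range(1, self.max_attempts + 1):
            try:
                result = self._execute_single()
                self.task.current_context = result  # set only on success
                return result
            except Exception as error:  # assumed handling, not from the source
                last_error = error
        raise last_error


# Succeeds on the third attempt and records the context on the task.
step = FlakyStep(failures=2)
task = MiniTask(step)
result = MiniEngine(task, max_attempts=3).execute_with_retry()
assert step.calls == 3
assert task.current_context is result

# Exhausts all attempts; the task's context is never set.
failing_task = MiniTask(FlakyStep(failures=5))
try:
    MiniEngine(failing_task, max_attempts=3).execute_with_retry()
    raised = False
except RuntimeError:
    raised = True
assert raised
assert failing_task.current_context is None
```

Because `_execute_single()` does not write to the task, a failed attempt cannot leave a partial context behind, which is the property the retry path relies on.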
