
Bug: Async task execution creates new event loops with asyncio.run() #211

@FernandoCelmer

Summary

`Action._call_func()` handles async functions by calling `asyncio.run()`, which creates a new event loop on every invocation. When called from within an already running event loop, it falls back to running `asyncio.run()` in a worker thread from a thread pool. That workaround is fragile, adds overhead, and can cause subtle concurrency issues.

Current Code

# dotflow/core/action.py

import asyncio

class Action:
    def _call_func(self, is_async, *args, **kwargs):
        if is_async:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop and loop.is_running():
                # Workaround: run async func in a separate thread with its own event loop
                import concurrent.futures
                with concurrent.futures.ThreadPoolExecutor() as pool:
                    return pool.submit(
                        asyncio.run, self.func(*args, **kwargs)
                    ).result()
            return asyncio.run(self.func(*args, **kwargs))
        return self.func(*args, **kwargs)

Problems

1. `asyncio.run()` creates a new event loop every time

Each task invocation creates and destroys an event loop. This means:

  • No shared async resources (connections, sessions, semaphores) across tasks
  • Event loop creation overhead on every task execution
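  • `contextvars` set inside a task are discarded when its `asyncio.run()` returns, so neither later tasks nor the caller see them; on the thread-pool path, the caller's values are not visible inside the task either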
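
The first point is easy to confirm with plain `asyncio`, outside dotflow. In this sketch, two back-to-back `asyncio.run()` calls stand in for two async tasks; `current_loop` is an illustrative helper. Each call gets a fresh loop, and that loop is already closed by the time the call returns, so anything created on it (a session, a lock, a queue) is left behind on a dead loop.

```python
import asyncio


async def current_loop():
    # Return the event loop this coroutine is running on.
    return asyncio.get_running_loop()


# Two sequential calls, like the engine running two async tasks.
first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)    # False: each asyncio.run() created its own loop
print(first.is_closed())  # True: asyncio.run() closes its loop before returning
```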

2. Thread pool workaround is fragile

When a loop is already running, the code spawns a thread with its own `asyncio.run()`. This:

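  • Breaks `asyncio.Lock`, `asyncio.Event`, `asyncio.Queue`, etc.: each binds to the loop it first waits on (Python 3.10+), so waiting on it from the other loop raises `RuntimeError`
  • Cannot share connections or state with the parent loop's tasks
  • Adds thread synchronization overhead
  • Blocks the parent loop: its thread waits in `.result()` until the worker finishes, so if the async function waits on anything scheduled on the parent loop, the two deadlock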

3. Poor fit for Jupyter notebooks / IPython

Jupyter already runs an event loop, so `_call_func` always takes the thread-pool path there. It technically works, but it is inefficient, and the task cannot use anything bound to Jupyter's loop.
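
The cost of the fallback inside an already running loop, which is the situation in Jupyter, can be reproduced outside dotflow. In this sketch, `thread_pool_fallback` mirrors the workaround in `_call_func`, while `heartbeat` and `slow_task` are illustrative stand-ins for other work on the caller's loop and for the user's async task. Because the loop's own thread sits in `.result()` for the whole call, the heartbeat records no ticks in the meantime.

```python
import asyncio
import concurrent.futures


async def heartbeat(ticks):
    # Stands in for any other task on the caller's loop (e.g. the kernel's).
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def slow_task():
    await asyncio.sleep(0.2)
    return "done"


def thread_pool_fallback(coro):
    # Mirrors the current workaround: run the coroutine on a new loop in a worker thread.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        return pool.submit(asyncio.run, coro).result()


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0.05)  # let the heartbeat tick a few times
    before = len(ticks)
    result = thread_pool_fallback(slow_task())  # blocks this loop's thread
    missed = len(ticks) - before  # heartbeat could not run while blocked
    beat.cancel()
    return result, missed


outcome = asyncio.run(main())
print(outcome)  # ('done', 0)
```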

Example of breakage

import aiohttp
from dotflow import action

session = None

@action
async def init_session():
    global session
    session = aiohttp.ClientSession()  # bound to THIS event loop

@action
async def fetch_data():
    # This runs in a DIFFERENT event loop (a new asyncio.run() call),
    # and init_session's loop has already been closed by then
    async with session.get("https://api.example.com") as resp:
        # Typically fails with a RuntimeError: the session belongs to that dead loop
        return await resp.json()

Proposed Fix

Short-term: use `loop.run_until_complete()` when a loop exists

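import asyncio
import threading

_local = threading.local()  # hypothetical: holds one reusable loop per thread


def _reusable_loop():
    # Hypothetical helper: create the loop once per thread, then reuse it
    loop = getattr(_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _local.loop = loop
    return loop


def _call_func(self, is_async, *args, **kwargs):
    if not is_async:
        return self.func(*args, **kwargs)

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is None:
        # No running loop in this thread. get_running_loop() never returns a
        # stopped loop, so keep our own and reuse it instead of asyncio.run()
        return _reusable_loop().run_until_complete(self.func(*args, **kwargs))

    # Loop is running (e.g., Jupyter or another async context).
    # nest_asyncio patches it so run_until_complete() can re-enter it
    import nest_asyncio
    nest_asyncio.apply(loop)
    return loop.run_until_complete(self.func(*args, **kwargs))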

Long-term: native async engine

Add an `AsyncTaskEngine` that runs within an event loop, avoiding the sync/async boundary entirely:

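from contextlib import asynccontextmanager

class AsyncTaskEngine:
    @asynccontextmanager
    async def start(self):
        ...  # set up task state on the current loop
        yield self
        ...  # tear down

    async def execute(self):
        result = await self.task.step(...)
        ...

# Usage (everything runs inside one loop, e.g. asyncio.run(run_workflow_async(tasks))):
async def run_workflow_async(tasks):
    for task in tasks:
        engine = AsyncTaskEngine(task=task, ...)
        async with engine.start():
            await engine.execute()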

This would be opt-in and wouldn't replace the sync engine.
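
To make the long-term idea concrete, here is a minimal runnable sketch. It assumes a hypothetical `Task` whose `step()` is a coroutine receiving the previous task's result; neither class reflects dotflow's actual internals. The engine's `start()` is an async context manager, and a single `asyncio.run()` at the top drives every task, so they all share one loop.

```python
import asyncio
from contextlib import asynccontextmanager


class Task:
    """Hypothetical task: wraps a coroutine function (not dotflow's real Task)."""

    def __init__(self, task_id, func):
        self.task_id = task_id
        self.func = func

    async def step(self, previous):
        return await self.func(previous)


class AsyncTaskEngine:
    """Hypothetical engine that runs one task on the caller's event loop."""

    def __init__(self, task, previous=None):
        self.task = task
        self.previous = previous
        self.finished = False

    @asynccontextmanager
    async def start(self):
        # Setup (e.g. marking the task as in progress) would go here.
        try:
            yield self
        finally:
            self.finished = True  # teardown runs even if execute() raised

    async def execute(self):
        return await self.task.step(self.previous)


async def run_workflow_async(tasks):
    previous = None
    for task in tasks:
        engine = AsyncTaskEngine(task=task, previous=previous)
        async with engine.start():
            previous = await engine.execute()
    return previous


loops = set()


async def add_one(value):
    loops.add(asyncio.get_running_loop())  # record which loop ran this step
    await asyncio.sleep(0)
    return (value or 0) + 1


# One asyncio.run() drives the whole workflow, so every task shares one loop.
final = asyncio.run(run_workflow_async([Task(n, add_one) for n in range(1, 4)]))

print(final)       # 3
print(len(loops))  # 1
```

Because every step awaits on the same loop, an object such as an `aiohttp.ClientSession` created in an early task would remain usable in later ones.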

Checklist

  • Evaluate whether `nest_asyncio` is an acceptable dependency
  • Short-term: improve `_call_func` to reuse existing event loops when possible
  • Long-term: design `AsyncTaskEngine` for native async workflow execution
  • Add tests for async tasks called from sync context
  • Add tests for async tasks called from within a running event loop
  • Document async task limitations and best practices

Metadata

Assignees: No one assigned

Labels: 0.15.0 (Introduced in v0.15.0), bug (Something isn't working)
