Make it easier to farm out work #685

@Nikratio

I have a bunch of work items that need processing. This involves calling expensive functions that release the GIL, so I want to use worker threads. Ideally, I would like to write code like this:

limiter = trio.CapacityLimiter(max_workers)
with something_something:
  for item in worklist:
    await trio.run_in_worker_thread(do_work, item, limiter=limiter)

This should go through worklist, keeping all max_workers threads busy. At the end of the with block, all workers should have completed.

Sadly, there seems to be no something_something that I could use for the with block, nor is there a run_in_worker_thread that is not synchronous. So I think what I would need to write instead is something like:

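import trio

END_SENTINEL = object()  # marker telling a worker to exit

async def worker_loop(q):
    while True:
        item = await q.get()  # Queue.get() is async and must be awaited
        if item is END_SENTINEL:
            break
        await trio.run_sync_in_worker_thread(do_work, item)

queue = trio.Queue(1)
async with trio.open_nursery() as nursery:  # nurseries are async context managers
    for _ in range(max_workers):
        # worker_loop is already an async function, so it can be started directly
        nursery.start_soon(worker_loop, queue)
    for item in worklist:
        await queue.put(item)
    # one sentinel per worker, so every loop terminates
    for _ in range(max_workers):
        await queue.put(END_SENTINEL)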

This is rather verbose. I was wondering if Trio might be able to offer something more concise, along the lines of the first (currently hypothetical) code?
