Skip to content

Conversation

@liunux4odoo
Copy link
Contributor

@liunux4odoo liunux4odoo commented Sep 20, 2024

As discussed in #3 , I rewrite ProcessJob.run to support sync generators.
Please review if this approach is appropriate, we can rewrite other Job.run methods and support async generators too.

example code:

import typing as t
import asyncio

from executor.engine import Engine, ProcessJob


def func(n: int) -> t.List[int]:
    import time

    time.sleep(1)
    return list(range(n))


def gen(n: int) -> t.Generator[int, None, None]:
    import time

    for i in range(n):
        time.sleep(0.5)
        print(f"sync yield from executor: {i}")
        yield i


async def gen_async(n: int) -> t.AsyncGenerator[int, None]:
    import asyncio

    for i in range(n):
        await asyncio.sleep(0.5)
        print(f"async yield from executor: {i}")
        yield i


async def main():
    with Engine() as engine:
        job1 = ProcessJob(func, (5,))
        engine.submit(job1)
        engine.wait()
        print(job1.result())

        job2 = ProcessJob(gen, (5,))
        engine.submit(job2)
        engine.wait()
        for x in job2.result():
            print(f"main received: {x}")

        job3 = ProcessJob(gen_async, (5,))
        engine.submit(job3)
        engine.wait()
        async for x in job3.result():
            print(f"main received: {x}")

if __name__ == "__main__":
    asyncio.run(main())

@Nanguage Nanguage marked this pull request as ready for review September 20, 2024 08:58
@Nanguage Nanguage merged commit c2e7da4 into Nanguage:master Sep 20, 2024
@Nanguage
Copy link
Owner

Thank you for your contribution, it's a very clever implementation.

@liunux4odoo
Copy link
Contributor Author

Thank you for your contribution, it's a very clever implementation.

Glad to contribute.

I have been searching for useful concurrent frameworks for quite some time, and this is the lightest and most practical one I have seen.
I will create another PR later to make ThreadJob support generators too. I'm not familiar with Dask, it's implementation need more research.

@Nanguage
Copy link
Owner

I am trying to support generators at the base class level of Job, making the implementation of process a special case.

@liunux4odoo
Copy link
Contributor Author

OK, expecting official implementation.

I tried LocalJob and ThreadJob, it seems the generator job works well. The generator object could be returned directly from them without pickle.

@Nanguage
Copy link
Owner

I have defined the run_generator method in the base class of Job and made the implementation of ProcessJob a specific case of it.

async def run(self):
"""Run the job."""
if inspect.isgeneratorfunction(self.func) or inspect.isasyncgenfunction(self.func): # noqa: E501
return await self.run_generator()
else:
return await self.run_function()
async def run_function(self): # pragma: no cover
"""Run the job as a function."""
msg = f"{type(self).__name__} does not implement " \
"run_function method."
raise NotImplementedError(msg)
async def run_generator(self): # pragma: no cover
"""Run the job as a generator."""
msg = f"{type(self).__name__} does not implement " \
"run_generator method."
raise NotImplementedError(msg)

I think you can build upon this and incorporate your implementation of LocalJob and ThreadJob.

Additionally, some management of the lifecycle has been added. Currently, the status of the corresponding job will only be set to done after completing the iteration. like this:

with Engine() as engine:
def gen():
for i in range(10):
yield i
job = ProcessJob(gen)
await engine.submit_async(job)
await job.join()
assert job.status == "running"
g = job.result()
assert list(g) == list(range(10))
assert job.status == "done"
async def gen_async(n):
for i in range(n):
yield i
job = ProcessJob(gen_async, (10,))
await engine.submit_async(job)
await job.join()
res = []
async for i in job.result():
assert job.status == "running"
res.append(i)
assert job.status == "done"
assert res == list(range(10))

More details see 389af29

@liunux4odoo
Copy link
Contributor Author

Ok, I would like to try it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants