From a453b25642f01329e2d6ae41f1b95d6420680ca6 Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 02:25:21 +0300 Subject: [PATCH 1/4] Added suport contextvars for loop.run_in_executor --- Lib/asyncio/base_events.py | 15 +++++++++++-- Lib/test/test_asyncio/test_events.py | 33 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index dc0ca3f02b9bf6..74934b52e0b38d 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -16,6 +16,8 @@ import collections import collections.abc import concurrent.futures +import contextvars +import functools import heapq import itertools import logging @@ -728,7 +730,7 @@ def call_soon_threadsafe(self, callback, *args, context=None): self._write_to_self() return handle - def run_in_executor(self, executor, func, *args): + def run_in_executor(self, executor, func, *args, context=None): self._check_closed() if self._debug: self._check_callback(func, 'run_in_executor') @@ -737,8 +739,17 @@ def run_in_executor(self, executor, func, *args): if executor is None: executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor + + if context is None: + context = contextvars.copy_context() + + if args: + fn = functools.partial(func, *args) + else: + fn = func + return futures.wrap_future( - executor.submit(func, *args), loop=self) + executor.submit(context.run, fn), loop=self) def set_default_executor(self, executor): self._default_executor = executor diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 11cd950df1cedb..8f1438d260f364 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2,6 +2,7 @@ import collections.abc import concurrent.futures +import contextvars import functools import io import os @@ -369,6 +370,38 @@ def run(): time.sleep(0.4) self.assertFalse(called) + def test_run_in_executor_no_context(self): + def run(): + return foo.get() + + foo = contextvars.ContextVar('foo') + foo.set('bar') + f = self.loop.run_in_executor(None, run) + res = self.loop.run_until_complete(f) + self.assertEqual(res, 'bar') + + def test_run_in_executor_context(self): + def run(): + return foo.get() + + foo = contextvars.ContextVar('foo') + foo.set('bar') + context = contextvars.copy_context() + f = self.loop.run_in_executor(None, run, context=context) + res = self.loop.run_until_complete(f) + self.assertEqual(res, 'bar') + + def test_run_in_executor_context_args(self): + def run(arg): + return (arg, foo.get()) + + foo = contextvars.ContextVar('foo') + foo.set('bar') + context = contextvars.copy_context() + f = self.loop.run_in_executor(None, run, 'yo', context=context) + res = self.loop.run_until_complete(f) + self.assertEqual(res, ('yo', 'bar')) + def test_reader_callback(self): r, w = socket.socketpair() r.setblocking(False) From 6fbe93fc04e551ccb0a0b231056394a6d08b2cc8 Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 02:37:25 +0300 Subject: [PATCH 2/4] Added new entry. --- .../NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst diff --git a/Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst b/Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst new file mode 100644 index 00000000000000..28c0a9c504da4a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst @@ -0,0 +1 @@ +Added support of contextvars for BaseEventLoop.run_in_executor From 20e139d5e69cedd0cbc8a4b481e47a7a2a87ea4d Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 02:47:02 +0300 Subject: [PATCH 3/4] Set contextvars on top of the module. --- Lib/test/test_asyncio/test_events.py | 29 +++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 8f1438d260f364..fbd4bcdfac4711 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -37,6 +37,9 @@ from test.test_asyncio import utils as test_utils from test import support +foo_ctx = contextvars.ContextVar('foo') +foo_ctx.set('bar') + def tearDownModule(): asyncio.set_event_loop_policy(None) @@ -370,22 +373,32 @@ def run(): time.sleep(0.4) self.assertFalse(called) + def test_run_in_executor_hierarchy(self): + def run(): + foo_ctx.set('foo') + res = foo_ctx.get() + self.assertEqual(res, 'foo') + return res + + f = self.loop.run_in_executor(None, run) + res = self.loop.run_until_complete(f) + self.assertEqual(res, 'foo') + + res = foo_ctx.get() + self.assertEqual(res, 'bar') + def test_run_in_executor_no_context(self): def run(): - return foo.get() + return foo_ctx.get() - foo = contextvars.ContextVar('foo') - foo.set('bar') f = self.loop.run_in_executor(None, run) res = self.loop.run_until_complete(f) self.assertEqual(res, 'bar') def test_run_in_executor_context(self): def run(): - return foo.get() + return foo_ctx.get() - foo = contextvars.ContextVar('foo') - foo.set('bar') context = contextvars.copy_context() f = self.loop.run_in_executor(None, run, context=context) res = self.loop.run_until_complete(f) @@ -393,10 +406,8 @@ def run(): def test_run_in_executor_context_args(self): def run(arg): - return (arg, foo.get()) + return (arg, foo_ctx.get()) - foo = contextvars.ContextVar('foo') - foo.set('bar') context = contextvars.copy_context() f = self.loop.run_in_executor(None, run, 'yo', context=context) res = self.loop.run_until_complete(f) From ae42bf584020a49c2bf0fc7a761776281efe2a6c Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 12:18:13 +0300 Subject: [PATCH 4/4] Simplify run_in_executor partial wrapper --- Lib/asyncio/base_events.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 74934b52e0b38d..deb5069321604b 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -744,12 +744,10 @@ def run_in_executor(self, executor, func, *args, context=None): context = contextvars.copy_context() if args: - fn = functools.partial(func, *args) - else: - fn = func + func = functools.partial(func, *args) return futures.wrap_future( - executor.submit(context.run, fn), loop=self) + executor.submit(context.run, func), loop=self) def set_default_executor(self, executor): self._default_executor = executor