From 9bbc5db5b4678be31cd61603880f821632f5a89e Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:52:33 +0100 Subject: [PATCH 01/15] change comment --- Lib/queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/queue.py b/Lib/queue.py index f6af7cb6df5cffc..f08dbd47f188ee1 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -245,6 +245,10 @@ def shutdown(self, immediate=False): if immediate: self.shutdown_state = _queue_shutdown_immediate self.not_empty.notify_all() + # set self.unfinished_tasks to 0 + # to break the loop in 'self.join()' + # when quits from `wait()` + self.unfinished_tasks = 0 self.all_tasks_done.notify_all() else: self.shutdown_state = _queue_shutdown From f9f2c0629a5dd283e64f804f88a14d7e4d4655bd Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:57:21 +0100 Subject: [PATCH 02/15] call to self._finished.set() in order to release all joined tasks/coros --- Lib/asyncio/queues.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a869993a1de3fe2..5b58f753a0fac7d 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -8,6 +8,7 @@ ) import collections +import enum import heapq from types import GenericAlias @@ -30,9 +31,10 @@ class QueueShutDown(Exception): pass -_queue_alive = "alive" -_queue_shutdown = "shutdown" -_queue_shutdown_immediate = "shutdown-immediate" +class _QueueState(enum.Enum): + ALIVE = "alive" + SHUTDOWN = "shutdown" + SHUTDOWN_IMMEDIATE = "shutdown-immediate" class Queue(mixins._LoopBoundMixin): @@ -58,7 +60,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) - self.shutdown_state = _queue_alive + self._shutdown_state = _QueueState.ALIVE # These three are overridable in subclasses. @@ -99,6 +101,8 @@ def _format(self): result += f' _putters[{len(self._putters)}]' if self._unfinished_tasks: result += f' tasks={self._unfinished_tasks}' + if self._shutdown_state is not _QueueState.ALIVE: + result += f' shutdown={self._shutdown_state.value}' return result def qsize(self): @@ -131,7 +135,7 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown while self.full(): putter = self._get_loop().create_future() @@ -152,7 +156,7 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown return self.put_nowait(item) @@ -161,7 +165,7 @@ def put_nowait(self, item): If no free slot is immediately available, raise QueueFull. """ - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown if self.full(): raise QueueFull @@ -175,10 +179,10 @@ async def get(self): If queue is empty, wait until an item is available. """ - if self.shutdown_state == _queue_shutdown_immediate: + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise QueueShutDown while self.empty(): - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) @@ -198,7 +202,7 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self.shutdown_state == _queue_shutdown_immediate: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown return self.get_nowait() @@ -208,10 +212,10 @@ def get_nowait(self): Return an item if one is immediately available, else raise QueueEmpty. """ if self.empty(): - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown raise QueueEmpty - elif self.shutdown_state == _queue_shutdown_immediate: + elif self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise QueueShutDown item = self._get() self._wakeup_next(self._putters) @@ -258,17 +262,20 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The QueueShutDown exception is raised. """ if immediate: - self.shutdown_state = _queue_shutdown_immediate + self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None) else: - self.shutdown_state = _queue_shutdown + self._shutdown_state = _QueueState.SHUTDOWN while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None) + # Release 'joined' tasks/coros + self._finished.set() + class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). From 7491ef1ff9e36213333e650cd6c5c398174c16df Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:59:19 +0100 Subject: [PATCH 03/15] add unitests to `shutdwon` method --- Lib/test/test_asyncio/test_queues.py | 95 ++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 418c3fe618d89b8..718b1ef5c776340 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -560,6 +560,101 @@ async def test_immediate(self): except asyncio.QueueShutDown: pass + async def test_shutdown_repr(self): + q = self.q_class() + q.shutdown() + self.assertIn("shutdown", repr(q)) + + q = self.q_class() + q.shutdown(immediate=True) + self.assertIn("shutdown-immediate", repr(q)) + + async def test_get_shutdown_immediate(self): + results = [] + maxsize = 2 + delay = 1e-3 + + async def get_once(q): + try: + msg = await q.get() + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return True + + async def shutdown(q, delay, immediate): + await asyncio.sleep(delay) + q.shutdown(immediate) + return True + + q = self.q_class(maxsize) + t = [asyncio.create_task(get_once(q)) for _ in range(maxsize)] + t += [asyncio.create_task(shutdown(q, delay, True))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*maxsize) + + + async def test_put_shutdown(self): + maxsize = 2 + results = [] + go = asyncio.Event() + + async def put_twice(q, go, msg): + await q.put(msg) + await go.wait() + try: + await q.put(msg+maxsize) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def shutdown(q, go, immediate): + q.shutdown(immediate) + go.set() + + q = self.q_class(maxsize) + t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] + t += [asyncio.create_task(shutdown(q, go, False))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*maxsize) + + + async def test_put_and_join_shutdown(self): + maxsize = 2 + results = [] + go = asyncio.Event() + + async def put_twice(q, go, msg): + await q.put(msg) + await go.wait() + try: + await q.put(msg+100) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def shutdown(q, go, immediate): + q.shutdown(immediate) + go.set() + + async def join(q, delay): + await go.wait() + await q.join() + results.append(True) + return True + + q = self.q_class(maxsize) + t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] + t += [asyncio.create_task(shutdown(q, go, True)), + asyncio.create_task(join(q, go))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*(maxsize+1)) + class QueueShutdownTests( _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase From dd22c6b53ab99cb2f25f0f4f29e65cb973717198 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:59:38 +0100 Subject: [PATCH 04/15] add unitests to `shutdwon` method --- Lib/test/test_queue.py | 187 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 354299b9a5b16a6..7b377864308dda7 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -276,6 +276,193 @@ def test_shutdown_immediate(self): except self.queue.ShutDown: pass + def test_get_shutdown(self): + q = self.type2test(2) + results = [] + go = threading.Event() + + def get_once(q, go): + try: + go.wait() + msg = q.get() + results.append(False) + except self.queue.ShutDown: + results.append(True) + return True + + tests = ( + (get_once, (q, go)), + (get_once, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + q.shutdown() + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_put_shutdown(self): + q = self.type2test(2) + results = [] + go = threading.Event() + + def put_twice(q, msg, go): + q.put(msg) + go.wait() + try: + q.put(msg) + results.append(False) + except self.queue.ShutDown: + results.append(True) + return msg + + tests = ( + (put_twice, (q, 100, go)), + (put_twice, (q, 200, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + q.shutdown() + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def _join_shutdown(self, immediate): + q = self.type2test() + results = [] + go = threading.Event() + + def join(q, go): + go.wait() + q.join() + results.append(True) + + tests = ( + (join, (q, go)), + (join, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + go.set() + q.shutdown(immediate) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_join_shutdown_immediate(self): + return self._join_shutdown(True) + + def test_join_shutdown(self): + return self._join_shutdown(False) + + def _put_and_join_shutdown(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + + def put_twice(q, msg, go): + q.put(msg) + go.wait() + try: + q.put(msg) + results.append(False) + except self.queue.ShutDown: + results.append(True) + return msg + + def join(q, go): + go.wait() + q.join() + results.append(True) + + tests = ( + (put_twice, (q, 100, go)), + (put_twice, (q, 200, go)), + (join, (q, go)), + (join, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + go.set() + q.shutdown(immediate) + if not immediate: + self.assertTrue(q.unfinished_tasks, 2) + for i in range(2): + thread = threading.Thread(target=q.task_done) + thread.start() + threads.append(thread) + + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_put_and_join_shutdown_immediate(self): + return self._put_and_join_shutdown(True) + + def test_put_and_join_shutdown(self): + return self._put_and_join_shutdown(False) + + def _get_and_join_shutdown(self, immediate): + q = self.type2test() + results = [] + go = threading.Event() + + def get_once(q, go): + try: + go.wait() + msg = q.get() + results.append(False) + except self.queue.ShutDown: + results.append(True) + return True + + def join(q, go): + go.wait() + q.join() + results.append(True) + + tests = ( + (get_once, (q, go)), + (get_once, (q, go)), + (join, (q, go)), + (join, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + go.set() + q.shutdown(immediate) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_get_and_join_shutdown_immediate(self): + return self._get_and_join_shutdown(True) + + def test_get_and_join_shutdown(self): + return self._get_and_join_shutdown(False) + class QueueTest(BaseQueueTestMixin): def setUp(self): From 52393064797dc37ff34939b88165c4118600effb Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 18:17:02 +0100 Subject: [PATCH 05/15] replace global state variable with an enum `_QueueState` --- Lib/queue.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index f08dbd47f188ee1..e3c2f01fd57df81 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -1,5 +1,6 @@ '''A multi-producer, multi-consumer queue.''' +import enum import threading import types from collections import deque @@ -29,9 +30,12 @@ class ShutDown(Exception): '''Raised when put/get with shut-down queue.''' -_queue_alive = "alive" -_queue_shutdown = "shutdown" -_queue_shutdown_immediate = "shutdown-immediate" +class _QueueState(enum.Enum): + ALIVE = "alive" + SHUTDOWN = "shutdown" + SHUTDOWN_IMMEDIATE = "shutdown_immediate" + +E class Queue: @@ -64,7 +68,7 @@ def __init__(self, maxsize=0): self.unfinished_tasks = 0 # Queue shut-down state - self.shutdown_state = _queue_alive + self.shutdown_state = _QueueState.ALIVE def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -99,7 +103,7 @@ def join(self): ''' with self.all_tasks_done: while self.unfinished_tasks: - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: return self.all_tasks_done.wait() @@ -144,7 +148,7 @@ def put(self, item, block=True, timeout=None): is immediately available, else raise the Full exception ('timeout' is ignored in that case). ''' - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown with self.not_full: if self.maxsize > 0: @@ -154,7 +158,7 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -165,7 +169,7 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown self._put(item) self.unfinished_tasks += 1 @@ -182,35 +186,35 @@ def get(self, block=True, timeout=None): available, else raise the Empty exception ('timeout' is ignored in that case). ''' - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise ShutDown with self.not_empty: if not block: if not self._qsize(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown raise Empty elif timeout is None: while not self._qsize(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown self.not_empty.wait() - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise ShutDown item = self._get() self.not_full.notify() @@ -243,7 +247,7 @@ def shutdown(self, immediate=False): ''' with self.mutex: if immediate: - self.shutdown_state = _queue_shutdown_immediate + self.shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE self.not_empty.notify_all() # set self.unfinished_tasks to 0 # to break the loop in 'self.join()' @@ -251,7 +255,7 @@ def shutdown(self, immediate=False): self.unfinished_tasks = 0 self.all_tasks_done.notify_all() else: - self.shutdown_state = _queue_shutdown + self.shutdown_state = _QueueState.SHUTDOWN self.not_full.notify_all() # Override these methods to implement other queue organizations From 4b127b6376b887a04d975898d24a8284ff0dac44 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 19:21:10 +0100 Subject: [PATCH 06/15] replace global state variable with an enum `_QueueState` - erase E just line below --- Lib/queue.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index e3c2f01fd57df81..9481a91feb4ff6e 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -35,8 +35,6 @@ class _QueueState(enum.Enum): SHUTDOWN = "shutdown" SHUTDOWN_IMMEDIATE = "shutdown_immediate" -E - class Queue: '''Create a queue object with a given maximum size. From 6402de7ff7c0a9186d8b4c72a572c2b1af0cefaa Mon Sep 17 00:00:00 2001 From: Duprat Date: Sat, 11 Feb 2023 17:02:37 +0100 Subject: [PATCH 07/15] simplify and unify tests --- Lib/test/test_asyncio/test_queues.py | 91 +++++++++++++++++++--------- 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 718b1ef5c776340..a1a6cd3e0c9c7b4 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -570,11 +570,12 @@ async def test_shutdown_repr(self): self.assertIn("shutdown-immediate", repr(q)) async def test_get_shutdown_immediate(self): + q = self.q_class() results = [] - maxsize = 2 - delay = 1e-3 + go = asyncio.Event() - async def get_once(q): + async def get_once(q, go): + await go.wait() try: msg = await q.get() results.append(False) @@ -582,29 +583,36 @@ async def get_once(q): results.append(True) return True - async def shutdown(q, delay, immediate): - await asyncio.sleep(delay) + async def shutdown(q, go, immediate): q.shutdown(immediate) + go.set() return True - q = self.q_class(maxsize) - t = [asyncio.create_task(get_once(q)) for _ in range(maxsize)] - t += [asyncio.create_task(shutdown(q, delay, True))] + tasks = ( + (get_once, (q, go)), + (get_once, (q, go)), + ) + t = [] + for coro, params in tasks: + t.append(asyncio.create_task(coro(*params))) + t.append(asyncio.create_task(shutdown(q, go, True))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*maxsize) + self.assertEqual(results, [True]*len(tasks)) - async def test_put_shutdown(self): - maxsize = 2 + async def _put_shutdown(self, immediate): + q = self.q_class(2) results = [] go = asyncio.Event() + await q.put("Y") + await q.put("D") + # queue fulled - async def put_twice(q, go, msg): - await q.put(msg) + async def put_once(q, go, msg): await go.wait() try: - await q.put(msg+maxsize) + await q.put(msg) results.append(False) except asyncio.QueueShutDown: results.append(True) @@ -614,24 +622,37 @@ async def shutdown(q, go, immediate): q.shutdown(immediate) go.set() - q = self.q_class(maxsize) - t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] - t += [asyncio.create_task(shutdown(q, go, False))] + tasks = ( + (put_once, (q, go, 100)), + (put_once, (q, go, 200)), + ) + t = [] + for coro, params in tasks: + t.append(asyncio.create_task(coro(*params))) + t.append(asyncio.create_task(shutdown(q, go, immediate))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*maxsize) + self.assertEqual(results, [True]*len(tasks)) + async def test_put_shutdown(self): + return await self._put_shutdown(False) - async def test_put_and_join_shutdown(self): - maxsize = 2 + async def test_put_shutdown_immediate(self): + return await self._put_shutdown(True) + + + async def _put_and_join_shutdown(self, immediate): + q = self.q_class(2) results = [] go = asyncio.Event() + await q.put("Y") + await q.put("D") + # queue fulled - async def put_twice(q, go, msg): - await q.put(msg) + async def put_once(q, go, msg): await go.wait() try: - await q.put(msg+100) + await q.put(msg) results.append(False) except asyncio.QueueShutDown: results.append(True) @@ -641,19 +662,31 @@ async def shutdown(q, go, immediate): q.shutdown(immediate) go.set() - async def join(q, delay): + async def join(q, go): await go.wait() await q.join() results.append(True) return True - q = self.q_class(maxsize) - t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] - t += [asyncio.create_task(shutdown(q, go, True)), - asyncio.create_task(join(q, go))] + tasks = ( + (put_once, (q, go, 'E')), + (put_once, (q, go, 'W')), + (join, (q, go)), + (join, (q, go)), + ) + t = [] + for coro, params in tasks: + t.append(asyncio.create_task(coro(*params))) + t.append(asyncio.create_task(shutdown(q, go, immediate))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*(maxsize+1)) + self.assertEqual(results, [True]*len(tasks)) + + async def test_put_and_join_shutdown(self): + return await self._put_and_join_shutdown(False) + + async def test_put_and_join_shutdown_immediate(self): + return await self._put_and_join_shutdown(True) class QueueShutdownTests( From be9588bfece569ee5cb8e2932b41495b80bd11c0 Mon Sep 17 00:00:00 2001 From: Duprat Date: Sat, 11 Feb 2023 17:02:53 +0100 Subject: [PATCH 08/15] simplify and unify tests --- Lib/test/test_queue.py | 54 +++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 7b377864308dda7..55a02e82e157959 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -282,20 +282,20 @@ def test_get_shutdown(self): go = threading.Event() def get_once(q, go): + go.wait() try: - go.wait() msg = q.get() results.append(False) except self.queue.ShutDown: results.append(True) return True - tests = ( + thrds = ( (get_once, (q, go)), (get_once, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -304,15 +304,17 @@ def get_once(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_put_shutdown(self): q = self.type2test(2) results = [] go = threading.Event() + q.put("Y") + q.put("D") + # queue fulled - def put_twice(q, msg, go): - q.put(msg) + def put_once(q, msg, go): go.wait() try: q.put(msg) @@ -321,12 +323,12 @@ def put_twice(q, msg, go): results.append(True) return msg - tests = ( - (put_twice, (q, 100, go)), - (put_twice, (q, 200, go)), + thrds = ( + (put_once, (q, 100, go)), + (put_once, (q, 200, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -335,7 +337,7 @@ def put_twice(q, msg, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def _join_shutdown(self, immediate): q = self.type2test() @@ -347,12 +349,12 @@ def join(q, go): q.join() results.append(True) - tests = ( + thrds = ( (join, (q, go)), (join, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -361,7 +363,7 @@ def join(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_join_shutdown_immediate(self): return self._join_shutdown(True) @@ -373,9 +375,11 @@ def _put_and_join_shutdown(self, immediate): q = self.type2test(2) results = [] go = threading.Event() + q.put("Y") + q.put("D") + # queue fulled - def put_twice(q, msg, go): - q.put(msg) + def put_once(q, msg, go): go.wait() try: q.put(msg) @@ -389,14 +393,14 @@ def join(q, go): q.join() results.append(True) - tests = ( - (put_twice, (q, 100, go)), - (put_twice, (q, 200, go)), + thrds = ( + (put_once, (q, 100, go)), + (put_once, (q, 200, go)), (join, (q, go)), (join, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -412,7 +416,7 @@ def join(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_put_and_join_shutdown_immediate(self): return self._put_and_join_shutdown(True) @@ -426,8 +430,8 @@ def _get_and_join_shutdown(self, immediate): go = threading.Event() def get_once(q, go): + go.wait() try: - go.wait() msg = q.get() results.append(False) except self.queue.ShutDown: @@ -439,14 +443,14 @@ def join(q, go): q.join() results.append(True) - tests = ( + thrds = ( (get_once, (q, go)), (get_once, (q, go)), (join, (q, go)), (join, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -455,7 +459,7 @@ def join(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_get_and_join_shutdown_immediate(self): return self._get_and_join_shutdown(True) From 3613f5d4a2c600ab096d37bf3990a1c2b2fde8d6 Mon Sep 17 00:00:00 2001 From: Duprat Date: Mon, 13 Feb 2023 17:34:55 +0100 Subject: [PATCH 09/15] add `_shutdown_state` to tuples of `__getstate__` and `__setstate__`, add an empty `shutdown` method --- Lib/multiprocessing/queues.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 26b212d6a50610f..bfaf81e2dc66163 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -65,11 +65,13 @@ def __init__(self, maxsize=0, *, ctx): def __getstate__(self): context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) = state + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) = state self._reset() def _after_fork(self): @@ -159,6 +161,9 @@ def get_nowait(self): def put_nowait(self, obj): return self.put(obj, False) + def shutdown(self, immediate=True): + pass + def close(self): self._closed = True close = self._close From 06775bb94ecff21c3d83a473bc8db592a7f8a0c4 Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 15 Feb 2023 15:15:27 +0100 Subject: [PATCH 10/15] Update initial tests with `self.assertRaises` Add an unittest `test_shutdown`transition --- Lib/test/test_asyncio/test_queues.py | 34 +++++++++++++++------------- Lib/test/test_queue.py | 34 +++++++++++++++------------- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index a1a6cd3e0c9c7b4..7c882f62ef1038b 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -528,37 +528,25 @@ class _QueueShutdownTestMixin: async def test_empty(self): q = self.q_class() q.shutdown() - try: + with self.assertRaises(asyncio.QueueShutDown): await q.put("data") - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass - try: + with self.assertRaises(asyncio.QueueShutDown): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_nonempty(self): q = self.q_class() q.put_nowait("data") q.shutdown() await q.get() - try: + with self.assertRaises(asyncio.QueueShutDown): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_immediate(self): q = self.q_class() q.put_nowait("data") q.shutdown(immediate=True) - try: + with self.assertRaises(asyncio.QueueShutDown): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_shutdown_repr(self): q = self.q_class() @@ -569,6 +557,20 @@ async def test_shutdown_repr(self): q.shutdown(immediate=True) self.assertIn("shutdown-immediate", repr(q)) + async def test_shutdown_transition(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.q_class() + self.assertEqual("alive", q._shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown", q._shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q._shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown-immediate", q._shutdown_state.value) + async def test_get_shutdown_immediate(self): q = self.q_class() results = [] diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 55a02e82e157959..b25453f9203b5e6 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -244,37 +244,39 @@ def test_shrinking_queue(self): def test_shutdown_empty(self): q = self.type2test() q.shutdown() - try: + with self.assertRaises(self.queue.ShutDown): q.put("data") - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_nonempty(self): q = self.type2test() q.put("data") q.shutdown() q.get() - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_immediate(self): q = self.type2test() q.put("data") q.shutdown(immediate=True) - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass + + def test_shutdown_transition(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.type2test() + self.assertEqual("alive", q.shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown", q.shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q.shutdown_state.value) + + q.shutdown(immediate=False) + self.assertEqual("shutdown-immediate", q.shutdown_state.value) def test_get_shutdown(self): q = self.type2test(2) From 6f0101567e493b88d4272fcb3f8e2c55afc63510 Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 15 Feb 2023 15:18:11 +0100 Subject: [PATCH 11/15] integration of shudown transition in `shutdown` method Change "shutdown_immediate" to "shutdown-immediate" --- Lib/asyncio/queues.py | 7 +++++-- Lib/queue.py | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 5b58f753a0fac7d..f4e1ba58866e532 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -261,6 +261,9 @@ def shutdown(self, immediate=False): All blocked callers of put() will be unblocked, and also get() and join() if 'immediate'. The QueueShutDown exception is raised. """ + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: + return + if immediate: self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE while self._getters: @@ -268,12 +271,12 @@ def shutdown(self, immediate=False): if not getter.done(): getter.set_result(None) else: - self._shutdown_state = _QueueState.SHUTDOWN + self._shutdown_state = _QueueState.SHUTDOWN while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None) - # Release 'joined' tasks/coros + # Release 'blocked' tasks/coros via `.join()` self._finished.set() diff --git a/Lib/queue.py b/Lib/queue.py index 9481a91feb4ff6e..933f18062050726 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -33,7 +33,7 @@ class ShutDown(Exception): class _QueueState(enum.Enum): ALIVE = "alive" SHUTDOWN = "shutdown" - SHUTDOWN_IMMEDIATE = "shutdown_immediate" + SHUTDOWN_IMMEDIATE = "shutdown-immediate" class Queue: @@ -244,6 +244,9 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The ShutDown exception is raised. ''' with self.mutex: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: + return + if immediate: self.shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE self.not_empty.notify_all() From ff9895dd212a9d51a0d7bcfbca9d223305b69344 Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 15 Feb 2023 15:45:50 +0100 Subject: [PATCH 12/15] Set `test_shutdown` prefix to all unittests --- Lib/test/test_asyncio/test_queues.py | 22 +++++++++--------- Lib/test/test_queue.py | 34 ++++++++++++++-------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 7c882f62ef1038b..f2c329c94b79973 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -571,7 +571,7 @@ async def test_shutdown_transition(self): q.shutdown() self.assertEqual("shutdown-immediate", q._shutdown_state.value) - async def test_get_shutdown_immediate(self): + async def test_shutdown_immediate_get(self): q = self.q_class() results = [] go = asyncio.Event() @@ -603,7 +603,7 @@ async def shutdown(q, go, immediate): self.assertEqual(results, [True]*len(tasks)) - async def _put_shutdown(self, immediate): + async def _shutdown_put(self, immediate): q = self.q_class(2) results = [] go = asyncio.Event() @@ -636,14 +636,14 @@ async def shutdown(q, go, immediate): self.assertEqual(results, [True]*len(tasks)) - async def test_put_shutdown(self): - return await self._put_shutdown(False) + async def test_shutdown_put(self): + return await self._shutdown_put(False) - async def test_put_shutdown_immediate(self): - return await self._put_shutdown(True) + async def test_shutdown_immediate_put(self): + return await self._shutdown_put(True) - async def _put_and_join_shutdown(self, immediate): + async def _shutdown_put_and_join(self, immediate): q = self.q_class(2) results = [] go = asyncio.Event() @@ -684,11 +684,11 @@ async def join(q, go): self.assertEqual(results, [True]*len(tasks)) - async def test_put_and_join_shutdown(self): - return await self._put_and_join_shutdown(False) + async def test_shutdown_put_and_join(self): + return await self._shutdown_put_and_join(False) - async def test_put_and_join_shutdown_immediate(self): - return await self._put_and_join_shutdown(True) + async def test_shutdown_immediate_put_and_join(self): + return await self._shutdown_put_and_join(True) class QueueShutdownTests( diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index b25453f9203b5e6..a2814ac40f2e181 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -278,7 +278,7 @@ def test_shutdown_transition(self): q.shutdown(immediate=False) self.assertEqual("shutdown-immediate", q.shutdown_state.value) - def test_get_shutdown(self): + def test_shutdown_get(self): q = self.type2test(2) results = [] go = threading.Event() @@ -308,7 +308,7 @@ def get_once(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_put_shutdown(self): + def test_shutdown_put(self): q = self.type2test(2) results = [] go = threading.Event() @@ -341,7 +341,7 @@ def put_once(q, msg, go): self.assertEqual(results, [True]*len(thrds)) - def _join_shutdown(self, immediate): + def _shutdown_join(self, immediate): q = self.type2test() results = [] go = threading.Event() @@ -367,13 +367,13 @@ def join(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_join_shutdown_immediate(self): - return self._join_shutdown(True) + def test_shutdown_immediate_join(self): + return self._shutdown_join(True) - def test_join_shutdown(self): - return self._join_shutdown(False) + def test_shutdown_join(self): + return self._shutdown_join(False) - def _put_and_join_shutdown(self, immediate): + def _shutdown_put_and_join(self, immediate): q = self.type2test(2) results = [] go = threading.Event() @@ -420,13 +420,13 @@ def join(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_put_and_join_shutdown_immediate(self): - return self._put_and_join_shutdown(True) + def test_shutdown_immediate_put_and_join(self): + return self._shutdown_put_and_join(True) - def test_put_and_join_shutdown(self): - return self._put_and_join_shutdown(False) + def test_shutdown_put_and_join(self): + return self._shutdown_put_and_join(False) - def _get_and_join_shutdown(self, immediate): + def _shutdown_get_and_join(self, immediate): q = self.type2test() results = [] go = threading.Event() @@ -463,11 +463,11 @@ def join(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_get_and_join_shutdown_immediate(self): - return self._get_and_join_shutdown(True) + def test_shutdown_immediate_get_and_join(self): + return self._shutdown_get_and_join(True) - def test_get_and_join_shutdown(self): - return self._get_and_join_shutdown(False) + def test__shutdown_get_and_join(self): + return self._shutdown_get_and_join(False) class QueueTest(BaseQueueTestMixin): From d42433eed43e86424437936537c59f8eab5d49e2 Mon Sep 17 00:00:00 2001 From: Duprat Date: Tue, 28 Feb 2023 17:45:08 +0100 Subject: [PATCH 13/15] asyncio.queue: refactoring of tests, add new tests, last updates and corrections --- Lib/asyncio/queues.py | 16 +- Lib/test/test_asyncio/test_queues.py | 335 ++++++++++++++++++++------- 2 files changed, 264 insertions(+), 87 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index f4e1ba58866e532..095f8ad0d2c3a7d 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -202,7 +202,7 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self._shutdown_state is not _QueueState.ALIVE: + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise QueueShutDown return self.get_nowait() @@ -235,6 +235,8 @@ def task_done(self): Raises ValueError if called more times than there were items placed in the queue. """ + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: + raise QueueShutDown if self._unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self._unfinished_tasks -= 1 @@ -249,8 +251,12 @@ async def join(self): indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. """ + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: + raise QueueShutDown if self._unfinished_tasks > 0: await self._finished.wait() + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: + raise QueueShutDown def shutdown(self, immediate=False): """Shut-down the queue, making queue gets and puts raise. @@ -263,21 +269,21 @@ def shutdown(self, immediate=False): """ if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: return - + # here _shutdown_state is ALIVE or SHUTDOWN if immediate: self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None) - else: + # Release 'blocked' tasks/coros via `.join()` + self._finished.set() + elif self._shutdown_state is _QueueState.ALIVE: # here self._shutdown_state = _QueueState.SHUTDOWN while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None) - # Release 'blocked' tasks/coros via `.join()` - self._finished.set() class PriorityQueue(Queue): diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index f2c329c94b79973..16b30a5126feda3 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -525,12 +525,114 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa class _QueueShutdownTestMixin: q_class = None + async def _get(self, q, go, results): + await go.wait() + try: + msg = await q.get() + results.append(True) + return msg + except asyncio.QueueShutDown: + results.append(False) + return False + + async def _get_shutdown(self, q, go, results): + await go.wait() + try: + msg = await q.get() + results.append(False) + return msg + except asyncio.QueueShutDown: + results.append(True) + return False + + async def _get_nowait(self, q, go, results): + await go.wait() + try: + msg = q.get_nowait() + results.append(True) + return msg + except asyncio.QueueShutDown: + results.append(False) + return False + + async def _get_task_done(self, q, go, results): + await go.wait() + try: + msg = await q.get() + q.task_done() + results.append(True) + return msg + except asyncio.QueueShutDown: + results.append(False) + return False + + async def _get_nowait_shutdown(self, q, go, results): + await go.wait() + try: + msg = q.get_nowait() + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return True + + async def _put_shutdown(self, q, go, msg, results): + await go.wait() + try: + await q.put(msg) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def _put_nowait_shutdown(self, q, go, msg, results): + await go.wait() + try: + q.put_nowait(msg) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def _shutdown(self, q, go, immediate): + await asyncio.sleep(0.001) + q.shutdown(immediate) + await asyncio.sleep(0.001) + go.set() + await asyncio.sleep(0.001) + + async def _join(self, q, go, results): + await go.wait() + try: + await q.join() + results.append(True) + return True + except asyncio.QueueShutDown: + results.append(False) + return False + + async def _join_shutdown(self, q, go, results): + await go.wait() + try: + await q.join() + results.append(False) + return False + except asyncio.QueueShutDown: + results.append(True) + return True + except asyncio.CancelledError: + results.append(True) + raise + async def test_empty(self): q = self.q_class() q.shutdown() - with self.assertRaises(asyncio.QueueShutDown): + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.put("data") - with self.assertRaises(asyncio.QueueShutDown): + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() async def test_nonempty(self): @@ -538,18 +640,24 @@ async def test_nonempty(self): q.put_nowait("data") q.shutdown() await q.get() - with self.assertRaises(asyncio.QueueShutDown): + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() async def test_immediate(self): q = self.q_class() q.put_nowait("data") q.shutdown(immediate=True) - with self.assertRaises(asyncio.QueueShutDown): + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() async def test_shutdown_repr(self): q = self.q_class() + self.assertNotIn("alive", repr(q)) + q.shutdown() self.assertIn("shutdown", repr(q)) @@ -557,7 +665,7 @@ async def test_shutdown_repr(self): q.shutdown(immediate=True) self.assertIn("shutdown-immediate", repr(q)) - async def test_shutdown_transition(self): + async def test_shutdown_allowed_transitions(self): # allowed transitions would be from alive via shutdown to immediate q = self.q_class() self.assertEqual("alive", q._shutdown_state.value) @@ -568,73 +676,142 @@ async def test_shutdown_transition(self): q.shutdown(immediate=True) self.assertEqual("shutdown-immediate", q._shutdown_state.value) - q.shutdown() - self.assertEqual("shutdown-immediate", q._shutdown_state.value) + q.shutdown(immediate=False) + self.assertNotEqual("shutdown", q._shutdown_state.value) - async def test_shutdown_immediate_get(self): - q = self.q_class() + async def _shutdown_putters(self, immediate): + delay = 0.001 + q = self.q_class(2) results = [] - go = asyncio.Event() + await q.put("E") + await q.put("W") + # queue full + t = asyncio.create_task(q.put("Y")) + await asyncio.sleep(delay) + self.assertTrue(len(q._putters) == 1) + with self.assertRaises(asyncio.QueueShutDown): + # here `t` raises a QueueShuDown + q.shutdown(immediate) + await t + self.assertTrue(not q._putters) + + async def test_shutdown_putters_deque(self): + return await self._shutdown_putters(False) - async def get_once(q, go): - await go.wait() - try: - msg = await q.get() - results.append(False) - except asyncio.QueueShutDown: - results.append(True) - return True + async def test_shutdown_immediate_putters_deque(self): + return await self._shutdown_putters(True) - async def shutdown(q, go, immediate): + async def _shutdown_getters(self, immediate): + delay = 0.001 + q = self.q_class(1) + results = [] + await q.put("Y") + # queue full + asyncio.create_task(q.get()) + await asyncio.sleep(delay) + t = asyncio.create_task(q.get()) + await asyncio.sleep(delay) + self.assertTrue(len(q._getters) == 1) + if immediate: + # here `t` raises a QueueShuDown + with self.assertRaises(asyncio.QueueShutDown): + q.shutdown(immediate) + await t + self.assertTrue(not q._getters) + else: + # here `t` is always pending q.shutdown(immediate) - go.set() - return True + await asyncio.sleep(delay) + self.assertTrue(q._getters) - tasks = ( - (get_once, (q, go)), - (get_once, (q, go)), - ) + async def test_shutdown_getters_deque(self): + return await self._shutdown_getters(False) + + async def test_shutdown_immediate_getters_deque(self): + return await self._shutdown_getters(True) + + async def _shutdown_get_nowait(self, immediate): + q = self.q_class(2) + results = [] + go = asyncio.Event() + await q.put("Y") + await q.put("D") + nb = q.qsize() + # queue full + + if immediate: + coros = ( + (self._get_nowait_shutdown(q, go, results)), + (self._get_nowait_shutdown(q, go, results)), + ) + else: + coros = ( + (self._get_nowait(q, go, results)), + (self._get_nowait(q, go, results)), + ) t = [] - for coro, params in tasks: - t.append(asyncio.create_task(coro(*params))) - t.append(asyncio.create_task(shutdown(q, go, True))) + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*len(tasks)) + self.assertEqual(results, [True]*len(coros)) + self.assertEqual(len(q._putters), 0) + if immediate: + self.assertEqual(len(q._getters), 0) + self.assertEqual(q._unfinished_tasks, nb) + async def test_shutdown_get_nowait(self): + return await self._shutdown_get_nowait(False) - async def _shutdown_put(self, immediate): + async def test_shutdown_immediate_get_nowait(self): + return await self._shutdown_get_nowait(True) + + async def test_shutdown_get_task_done_join(self, immediate=False): q = self.q_class(2) results = [] go = asyncio.Event() await q.put("Y") await q.put("D") - # queue fulled + self.assertEqual(q._unfinished_tasks, q.qsize()) - async def put_once(q, go, msg): - await go.wait() - try: - await q.put(msg) - results.append(False) - except asyncio.QueueShutDown: - results.append(True) - return msg + # queue full - async def shutdown(q, go, immediate): - q.shutdown(immediate) - go.set() + coros = ( + (self._get_task_done(q, go, results)), + (self._get_task_done(q, go, results)), + (self._join(q, go, results)), + (self._join(q, go, results)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, False))) + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + self.assertIn(t[0].result(), "YD") + self.assertIn(t[1].result(), "YD") + self.assertNotEqual(t[0].result(), t[1].result()) + self.assertEqual(q._unfinished_tasks, 0) + + async def _shutdown_put(self, immediate): + q = self.q_class() + results = [] + go = asyncio.Event() + # queue not empty - tasks = ( - (put_once, (q, go, 100)), - (put_once, (q, go, 200)), + coros = ( + (self._put_shutdown(q, go, "Y", results)), + (self._put_nowait_shutdown(q, go, "D", results)), ) t = [] - for coro, params in tasks: - t.append(asyncio.create_task(coro(*params))) - t.append(asyncio.create_task(shutdown(q, go, immediate))) + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*len(tasks)) + self.assertEqual(results, [True]*len(coros)) async def test_shutdown_put(self): return await self._shutdown_put(False) @@ -642,53 +819,47 @@ async def test_shutdown_put(self): async def test_shutdown_immediate_put(self): return await self._shutdown_put(True) - - async def _shutdown_put_and_join(self, immediate): + async def _shutdown_put_join(self, immediate): q = self.q_class(2) results = [] go = asyncio.Event() await q.put("Y") await q.put("D") + nb = q.qsize() # queue fulled - async def put_once(q, go, msg): - await go.wait() - try: - await q.put(msg) - results.append(False) - except asyncio.QueueShutDown: - results.append(True) - return msg - - async def shutdown(q, go, immediate): - q.shutdown(immediate) - go.set() - - async def join(q, go): - await go.wait() - await q.join() - results.append(True) - return True + async def _cancel_join_task(q, delay, t): + await asyncio.sleep(delay) + t.cancel() + await asyncio.sleep(0) + q._finished.set() - tasks = ( - (put_once, (q, go, 'E')), - (put_once, (q, go, 'W')), - (join, (q, go)), - (join, (q, go)), + coros = ( + (self._put_shutdown(q, go, "E", results)), + (self._put_nowait_shutdown(q, go, "W", results)), + (self._join_shutdown(q, go, results)), ) t = [] - for coro, params in tasks: - t.append(asyncio.create_task(coro(*params))) - t.append(asyncio.create_task(shutdown(q, go, immediate))) - res = await asyncio.gather(*t) - - self.assertEqual(results, [True]*len(tasks)) + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) + if not immediate: + # Here calls `join` is a blocking operation + # so wait for a delay and cancel this blocked task + t.append(asyncio.create_task(_cancel_join_task(q, 0.01, t[2]))) + with self.assertRaises(asyncio.CancelledError) as e: + await asyncio.gather(*t) + else: + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + self.assertTrue(q._finished.is_set()) async def test_shutdown_put_and_join(self): - return await self._shutdown_put_and_join(False) + return await self._shutdown_put_join(False) async def test_shutdown_immediate_put_and_join(self): - return await self._shutdown_put_and_join(True) + return await self._shutdown_put_join(True) class QueueShutdownTests( From 53078bbf9804ac8674c191af604fcd5f6a2d2f08 Mon Sep 17 00:00:00 2001 From: Duprat Date: Tue, 28 Feb 2023 18:28:59 +0100 Subject: [PATCH 14/15] first version working --- Lib/multiprocessing/queues.py | 37 ++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index bfaf81e2dc66163..a629de6125eee0a 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -56,7 +56,7 @@ def __init__(self, maxsize=0, *, ctx): self._ignore_epipe = False self._reset() self._shutdown_state = context._default_context.Value( - ctypes.c_uint8, lock=self._rlock + ctypes.c_uint8, _queue_alive, lock=True ) if sys.platform != 'win32': @@ -102,18 +102,16 @@ def put(self, obj, block=True, timeout=None): raise Full with self._notempty: - if self._shutdown_state.value != _queue_alive: - raise ShutDown if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() def get(self, block=True, timeout=None): - if self._shutdown_state.value == _queue_shutdown_immediate: - raise ShutDown if self._closed: raise ValueError(f"Queue {self!r} is closed") + if self._shutdown_state.value != _queue_alive: + raise ShutDown if block and timeout is None: with self._rlock: if self._shutdown_state.value != _queue_alive: @@ -132,7 +130,7 @@ def get(self, block=True, timeout=None): if self._shutdown_state.value != _queue_alive: raise ShutDown raise Empty - if self._shutdown_state.value != _queue_alive : + if self._shutdown_state.value != _queue_alive: raise ShutDown elif not self._poll(): raise Empty @@ -162,7 +160,15 @@ def put_nowait(self, obj): return self.put(obj, False) def shutdown(self, immediate=True): - pass + with self._shutdown_state.get_lock(): + if self._shutdown_state.value == _queue_shutdown_immediate: + return + if immediate: + self._shutdown_state.value = _queue_shutdown_immediate + with self._notempty: + self._notempty.notify_all() # cf from @EpicWink + else: + self._shutdown_state.value = _queue_shutdown def close(self): self._closed = True @@ -335,6 +341,8 @@ def __setstate__(self, state): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") + if self._shutdown_state.value != _queue_alive: + return if not self._sem.acquire(block, timeout): raise Full @@ -347,6 +355,8 @@ def put(self, obj, block=True, timeout=None): def task_done(self): with self._cond: + if self._shutdown_state.value != _queue_alive: + raise ShutDown if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): @@ -354,10 +364,19 @@ def task_done(self): def join(self): with self._cond: - if self._shutdown_state.value == _queue_shutdown_immediate: - return + if self._shutdown_state.value != _queue_alive: + raise ShutDown if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() + if self._shutdown_state.value == _queue_shutdown_immediate: + raise ShutDown + + def shutdown(self, immediate=True): + initial_shutdown = self._shutdown_state.value + super().shutdown(immediate) + if initial_shutdown == _queue_alive: + with self._cond: + self._cond.notify_all() # here to check YD # # Simplified Queue type -- really just a locked pipe From 0075039bdf0fbff57e5e5ca1263adb1e9f5f9e75 Mon Sep 17 00:00:00 2001 From: Duprat Date: Tue, 28 Feb 2023 19:00:59 +0100 Subject: [PATCH 15/15] Some corrections --- Lib/multiprocessing/queues.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index a629de6125eee0a..9d76d8d389c6518 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -110,11 +110,11 @@ def put(self, obj, block=True, timeout=None): def get(self, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") - if self._shutdown_state.value != _queue_alive: + if self._shutdown_state.value == _queue_shutdown_immediate: raise ShutDown if block and timeout is None: with self._rlock: - if self._shutdown_state.value != _queue_alive: + if self._shutdown_state.value == _queue_shutdown_immediate: raise ShutDown res = self._recv_bytes() self._sem.release() @@ -127,10 +127,10 @@ def get(self, block=True, timeout=None): if block: timeout = deadline - time.monotonic() if not self._poll(timeout): - if self._shutdown_state.value != _queue_alive: + if self._shutdown_state.value == _queue_shutdown_immediate: raise ShutDown raise Empty - if self._shutdown_state.value != _queue_alive: + if self._shutdown_state.value == _queue_shutdown_immediate: raise ShutDown elif not self._poll(): raise Empty @@ -138,7 +138,7 @@ def get(self, block=True, timeout=None): self._sem.release() finally: self._rlock.release() - if self._shutdown_state.value == _queue_shutdown: + if self._shutdown_state.value == _queue_shutdown_immediate: raise ShutDown # unserialize the data after having released the lock return _ForkingPickler.loads(res) @@ -342,7 +342,7 @@ def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") if self._shutdown_state.value != _queue_alive: - return + raise ShutDown if not self._sem.acquire(block, timeout): raise Full @@ -355,7 +355,7 @@ def put(self, obj, block=True, timeout=None): def task_done(self): with self._cond: - if self._shutdown_state.value != _queue_alive: + if self._shutdown_state.value == _queue_shutdown_immediate: raise ShutDown if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times')