From 9bbc5db5b4678be31cd61603880f821632f5a89e Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:52:33 +0100 Subject: [PATCH 01/12] change comment --- Lib/queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/queue.py b/Lib/queue.py index f6af7cb6df5cffc..f08dbd47f188ee1 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -245,6 +245,10 @@ def shutdown(self, immediate=False): if immediate: self.shutdown_state = _queue_shutdown_immediate self.not_empty.notify_all() + # set self.unfinished_tasks to 0 + # to break the loop in 'self.join()' + # when quits from `wait()` + self.unfinished_tasks = 0 self.all_tasks_done.notify_all() else: self.shutdown_state = _queue_shutdown From f9f2c0629a5dd283e64f804f88a14d7e4d4655bd Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:57:21 +0100 Subject: [PATCH 02/12] call to self._finished.set() in order to release all joined tasks/coros --- Lib/asyncio/queues.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a869993a1de3fe2..5b58f753a0fac7d 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -8,6 +8,7 @@ ) import collections +import enum import heapq from types import GenericAlias @@ -30,9 +31,10 @@ class QueueShutDown(Exception): pass -_queue_alive = "alive" -_queue_shutdown = "shutdown" -_queue_shutdown_immediate = "shutdown-immediate" +class _QueueState(enum.Enum): + ALIVE = "alive" + SHUTDOWN = "shutdown" + SHUTDOWN_IMMEDIATE = "shutdown-immediate" class Queue(mixins._LoopBoundMixin): @@ -58,7 +60,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) - self.shutdown_state = _queue_alive + self._shutdown_state = _QueueState.ALIVE # These three are overridable in subclasses. @@ -99,6 +101,8 @@ def _format(self): result += f' _putters[{len(self._putters)}]' if self._unfinished_tasks: result += f' tasks={self._unfinished_tasks}' + if self._shutdown_state is not _QueueState.ALIVE: + result += f' shutdown={self._shutdown_state.value}' return result def qsize(self): @@ -131,7 +135,7 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown while self.full(): putter = self._get_loop().create_future() @@ -152,7 +156,7 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown return self.put_nowait(item) @@ -161,7 +165,7 @@ def put_nowait(self, item): If no free slot is immediately available, raise QueueFull. """ - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown if self.full(): raise QueueFull @@ -175,10 +179,10 @@ async def get(self): If queue is empty, wait until an item is available. """ - if self.shutdown_state == _queue_shutdown_immediate: + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise QueueShutDown while self.empty(): - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) @@ -198,7 +202,7 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self.shutdown_state == _queue_shutdown_immediate: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown return self.get_nowait() @@ -208,10 +212,10 @@ def get_nowait(self): Return an item if one is immediately available, else raise QueueEmpty. """ if self.empty(): - if self.shutdown_state != _queue_alive: + if self._shutdown_state is not _QueueState.ALIVE: raise QueueShutDown raise QueueEmpty - elif self.shutdown_state == _queue_shutdown_immediate: + elif self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise QueueShutDown item = self._get() self._wakeup_next(self._putters) @@ -258,17 +262,20 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The QueueShutDown exception is raised. """ if immediate: - self.shutdown_state = _queue_shutdown_immediate + self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None) else: - self.shutdown_state = _queue_shutdown + self._shutdown_state = _QueueState.SHUTDOWN while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None) + # Release 'joined' tasks/coros + self._finished.set() + class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). From 7491ef1ff9e36213333e650cd6c5c398174c16df Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:59:19 +0100 Subject: [PATCH 03/12] add unitests to `shutdwon` method --- Lib/test/test_asyncio/test_queues.py | 95 ++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 418c3fe618d89b8..718b1ef5c776340 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -560,6 +560,101 @@ async def test_immediate(self): except asyncio.QueueShutDown: pass + async def test_shutdown_repr(self): + q = self.q_class() + q.shutdown() + self.assertIn("shutdown", repr(q)) + + q = self.q_class() + q.shutdown(immediate=True) + self.assertIn("shutdown-immediate", repr(q)) + + async def test_get_shutdown_immediate(self): + results = [] + maxsize = 2 + delay = 1e-3 + + async def get_once(q): + try: + msg = await q.get() + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return True + + async def shutdown(q, delay, immediate): + await asyncio.sleep(delay) + q.shutdown(immediate) + return True + + q = self.q_class(maxsize) + t = [asyncio.create_task(get_once(q)) for _ in range(maxsize)] + t += [asyncio.create_task(shutdown(q, delay, True))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*maxsize) + + + async def test_put_shutdown(self): + maxsize = 2 + results = [] + go = asyncio.Event() + + async def put_twice(q, go, msg): + await q.put(msg) + await go.wait() + try: + await q.put(msg+maxsize) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def shutdown(q, go, immediate): + q.shutdown(immediate) + go.set() + + q = self.q_class(maxsize) + t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] + t += [asyncio.create_task(shutdown(q, go, False))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*maxsize) + + + async def test_put_and_join_shutdown(self): + maxsize = 2 + results = [] + go = asyncio.Event() + + async def put_twice(q, go, msg): + await q.put(msg) + await go.wait() + try: + await q.put(msg+100) + results.append(False) + except asyncio.QueueShutDown: + results.append(True) + return msg + + async def shutdown(q, go, immediate): + q.shutdown(immediate) + go.set() + + async def join(q, delay): + await go.wait() + await q.join() + results.append(True) + return True + + q = self.q_class(maxsize) + t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] + t += [asyncio.create_task(shutdown(q, go, True)), + asyncio.create_task(join(q, go))] + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*(maxsize+1)) + class QueueShutdownTests( _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase From dd22c6b53ab99cb2f25f0f4f29e65cb973717198 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:59:38 +0100 Subject: [PATCH 04/12] add unitests to `shutdwon` method --- Lib/test/test_queue.py | 187 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 354299b9a5b16a6..7b377864308dda7 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -276,6 +276,193 @@ def test_shutdown_immediate(self): except self.queue.ShutDown: pass + def test_get_shutdown(self): + q = self.type2test(2) + results = [] + go = threading.Event() + + def get_once(q, go): + try: + go.wait() + msg = q.get() + results.append(False) + except self.queue.ShutDown: + results.append(True) + return True + + tests = ( + (get_once, (q, go)), + (get_once, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + q.shutdown() + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_put_shutdown(self): + q = self.type2test(2) + results = [] + go = threading.Event() + + def put_twice(q, msg, go): + q.put(msg) + go.wait() + try: + q.put(msg) + results.append(False) + except self.queue.ShutDown: + results.append(True) + return msg + + tests = ( + (put_twice, (q, 100, go)), + (put_twice, (q, 200, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + q.shutdown() + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def _join_shutdown(self, immediate): + q = self.type2test() + results = [] + go = threading.Event() + + def join(q, go): + go.wait() + q.join() + results.append(True) + + tests = ( + (join, (q, go)), + (join, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + go.set() + q.shutdown(immediate) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_join_shutdown_immediate(self): + return self._join_shutdown(True) + + def test_join_shutdown(self): + return self._join_shutdown(False) + + def _put_and_join_shutdown(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + + def put_twice(q, msg, go): + q.put(msg) + go.wait() + try: + q.put(msg) + results.append(False) + except self.queue.ShutDown: + results.append(True) + return msg + + def join(q, go): + go.wait() + q.join() + results.append(True) + + tests = ( + (put_twice, (q, 100, go)), + (put_twice, (q, 200, go)), + (join, (q, go)), + (join, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + go.set() + q.shutdown(immediate) + if not immediate: + self.assertTrue(q.unfinished_tasks, 2) + for i in range(2): + thread = threading.Thread(target=q.task_done) + thread.start() + threads.append(thread) + + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_put_and_join_shutdown_immediate(self): + return self._put_and_join_shutdown(True) + + def test_put_and_join_shutdown(self): + return self._put_and_join_shutdown(False) + + def _get_and_join_shutdown(self, immediate): + q = self.type2test() + results = [] + go = threading.Event() + + def get_once(q, go): + try: + go.wait() + msg = q.get() + results.append(False) + except self.queue.ShutDown: + results.append(True) + return True + + def join(q, go): + go.wait() + q.join() + results.append(True) + + tests = ( + (get_once, (q, go)), + (get_once, (q, go)), + (join, (q, go)), + (join, (q, go)), + ) + threads = [] + for f, params in tests: + thread = threading.Thread(target=f, args=params) + thread.start() + threads.append(thread) + go.set() + q.shutdown(immediate) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(tests)) + + def test_get_and_join_shutdown_immediate(self): + return self._get_and_join_shutdown(True) + + def test_get_and_join_shutdown(self): + return self._get_and_join_shutdown(False) + class QueueTest(BaseQueueTestMixin): def setUp(self): From 52393064797dc37ff34939b88165c4118600effb Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 18:17:02 +0100 Subject: [PATCH 05/12] replace global state variable with an enum `_QueueState` --- Lib/queue.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index f08dbd47f188ee1..e3c2f01fd57df81 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -1,5 +1,6 @@ '''A multi-producer, multi-consumer queue.''' +import enum import threading import types from collections import deque @@ -29,9 +30,12 @@ class ShutDown(Exception): '''Raised when put/get with shut-down queue.''' -_queue_alive = "alive" -_queue_shutdown = "shutdown" -_queue_shutdown_immediate = "shutdown-immediate" +class _QueueState(enum.Enum): + ALIVE = "alive" + SHUTDOWN = "shutdown" + SHUTDOWN_IMMEDIATE = "shutdown_immediate" + +E class Queue: @@ -64,7 +68,7 @@ def __init__(self, maxsize=0): self.unfinished_tasks = 0 # Queue shut-down state - self.shutdown_state = _queue_alive + self.shutdown_state = _QueueState.ALIVE def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -99,7 +103,7 @@ def join(self): ''' with self.all_tasks_done: while self.unfinished_tasks: - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: return self.all_tasks_done.wait() @@ -144,7 +148,7 @@ def put(self, item, block=True, timeout=None): is immediately available, else raise the Full exception ('timeout' is ignored in that case). ''' - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown with self.not_full: if self.maxsize > 0: @@ -154,7 +158,7 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -165,7 +169,7 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown self._put(item) self.unfinished_tasks += 1 @@ -182,35 +186,35 @@ def get(self, block=True, timeout=None): available, else raise the Empty exception ('timeout' is ignored in that case). ''' - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise ShutDown with self.not_empty: if not block: if not self._qsize(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown raise Empty elif timeout is None: while not self._qsize(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown self.not_empty.wait() - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self.shutdown_state != _queue_alive: + if self.shutdown_state is not _QueueState.ALIVE: raise ShutDown - if self.shutdown_state == _queue_shutdown_immediate: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: raise ShutDown item = self._get() self.not_full.notify() @@ -243,7 +247,7 @@ def shutdown(self, immediate=False): ''' with self.mutex: if immediate: - self.shutdown_state = _queue_shutdown_immediate + self.shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE self.not_empty.notify_all() # set self.unfinished_tasks to 0 # to break the loop in 'self.join()' @@ -251,7 +255,7 @@ def shutdown(self, immediate=False): self.unfinished_tasks = 0 self.all_tasks_done.notify_all() else: - self.shutdown_state = _queue_shutdown + self.shutdown_state = _QueueState.SHUTDOWN self.not_full.notify_all() # Override these methods to implement other queue organizations From 4b127b6376b887a04d975898d24a8284ff0dac44 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 19:21:10 +0100 Subject: [PATCH 06/12] replace global state variable with an enum `_QueueState` - erase E just line below --- Lib/queue.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index e3c2f01fd57df81..9481a91feb4ff6e 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -35,8 +35,6 @@ class _QueueState(enum.Enum): SHUTDOWN = "shutdown" SHUTDOWN_IMMEDIATE = "shutdown_immediate" -E - class Queue: '''Create a queue object with a given maximum size. From 6402de7ff7c0a9186d8b4c72a572c2b1af0cefaa Mon Sep 17 00:00:00 2001 From: Duprat Date: Sat, 11 Feb 2023 17:02:37 +0100 Subject: [PATCH 07/12] simplify and unify tests --- Lib/test/test_asyncio/test_queues.py | 91 +++++++++++++++++++--------- 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 718b1ef5c776340..a1a6cd3e0c9c7b4 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -570,11 +570,12 @@ async def test_shutdown_repr(self): self.assertIn("shutdown-immediate", repr(q)) async def test_get_shutdown_immediate(self): + q = self.q_class() results = [] - maxsize = 2 - delay = 1e-3 + go = asyncio.Event() - async def get_once(q): + async def get_once(q, go): + await go.wait() try: msg = await q.get() results.append(False) @@ -582,29 +583,36 @@ async def get_once(q): results.append(True) return True - async def shutdown(q, delay, immediate): - await asyncio.sleep(delay) + async def shutdown(q, go, immediate): q.shutdown(immediate) + go.set() return True - q = self.q_class(maxsize) - t = [asyncio.create_task(get_once(q)) for _ in range(maxsize)] - t += [asyncio.create_task(shutdown(q, delay, True))] + tasks = ( + (get_once, (q, go)), + (get_once, (q, go)), + ) + t = [] + for coro, params in tasks: + t.append(asyncio.create_task(coro(*params))) + t.append(asyncio.create_task(shutdown(q, go, True))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*maxsize) + self.assertEqual(results, [True]*len(tasks)) - async def test_put_shutdown(self): - maxsize = 2 + async def _put_shutdown(self, immediate): + q = self.q_class(2) results = [] go = asyncio.Event() + await q.put("Y") + await q.put("D") + # queue fulled - async def put_twice(q, go, msg): - await q.put(msg) + async def put_once(q, go, msg): await go.wait() try: - await q.put(msg+maxsize) + await q.put(msg) results.append(False) except asyncio.QueueShutDown: results.append(True) @@ -614,24 +622,37 @@ async def shutdown(q, go, immediate): q.shutdown(immediate) go.set() - q = self.q_class(maxsize) - t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] - t += [asyncio.create_task(shutdown(q, go, False))] + tasks = ( + (put_once, (q, go, 100)), + (put_once, (q, go, 200)), + ) + t = [] + for coro, params in tasks: + t.append(asyncio.create_task(coro(*params))) + t.append(asyncio.create_task(shutdown(q, go, immediate))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*maxsize) + self.assertEqual(results, [True]*len(tasks)) + async def test_put_shutdown(self): + return await self._put_shutdown(False) - async def test_put_and_join_shutdown(self): - maxsize = 2 + async def test_put_shutdown_immediate(self): + return await self._put_shutdown(True) + + + async def _put_and_join_shutdown(self, immediate): + q = self.q_class(2) results = [] go = asyncio.Event() + await q.put("Y") + await q.put("D") + # queue fulled - async def put_twice(q, go, msg): - await q.put(msg) + async def put_once(q, go, msg): await go.wait() try: - await q.put(msg+100) + await q.put(msg) results.append(False) except asyncio.QueueShutDown: results.append(True) @@ -641,19 +662,31 @@ async def shutdown(q, go, immediate): q.shutdown(immediate) go.set() - async def join(q, delay): + async def join(q, go): await go.wait() await q.join() results.append(True) return True - q = self.q_class(maxsize) - t = [asyncio.create_task(put_twice(q, go, i+1)) for i in range(maxsize)] - t += [asyncio.create_task(shutdown(q, go, True)), - asyncio.create_task(join(q, go))] + tasks = ( + (put_once, (q, go, 'E')), + (put_once, (q, go, 'W')), + (join, (q, go)), + (join, (q, go)), + ) + t = [] + for coro, params in tasks: + t.append(asyncio.create_task(coro(*params))) + t.append(asyncio.create_task(shutdown(q, go, immediate))) res = await asyncio.gather(*t) - self.assertEqual(results, [True]*(maxsize+1)) + self.assertEqual(results, [True]*len(tasks)) + + async def test_put_and_join_shutdown(self): + return await self._put_and_join_shutdown(False) + + async def test_put_and_join_shutdown_immediate(self): + return await self._put_and_join_shutdown(True) class QueueShutdownTests( From be9588bfece569ee5cb8e2932b41495b80bd11c0 Mon Sep 17 00:00:00 2001 From: Duprat Date: Sat, 11 Feb 2023 17:02:53 +0100 Subject: [PATCH 08/12] simplify and unify tests --- Lib/test/test_queue.py | 54 +++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 7b377864308dda7..55a02e82e157959 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -282,20 +282,20 @@ def test_get_shutdown(self): go = threading.Event() def get_once(q, go): + go.wait() try: - go.wait() msg = q.get() results.append(False) except self.queue.ShutDown: results.append(True) return True - tests = ( + thrds = ( (get_once, (q, go)), (get_once, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -304,15 +304,17 @@ def get_once(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_put_shutdown(self): q = self.type2test(2) results = [] go = threading.Event() + q.put("Y") + q.put("D") + # queue fulled - def put_twice(q, msg, go): - q.put(msg) + def put_once(q, msg, go): go.wait() try: q.put(msg) @@ -321,12 +323,12 @@ def put_twice(q, msg, go): results.append(True) return msg - tests = ( - (put_twice, (q, 100, go)), - (put_twice, (q, 200, go)), + thrds = ( + (put_once, (q, 100, go)), + (put_once, (q, 200, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -335,7 +337,7 @@ def put_twice(q, msg, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def _join_shutdown(self, immediate): q = self.type2test() @@ -347,12 +349,12 @@ def join(q, go): q.join() results.append(True) - tests = ( + thrds = ( (join, (q, go)), (join, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -361,7 +363,7 @@ def join(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_join_shutdown_immediate(self): return self._join_shutdown(True) @@ -373,9 +375,11 @@ def _put_and_join_shutdown(self, immediate): q = self.type2test(2) results = [] go = threading.Event() + q.put("Y") + q.put("D") + # queue fulled - def put_twice(q, msg, go): - q.put(msg) + def put_once(q, msg, go): go.wait() try: q.put(msg) @@ -389,14 +393,14 @@ def join(q, go): q.join() results.append(True) - tests = ( - (put_twice, (q, 100, go)), - (put_twice, (q, 200, go)), + thrds = ( + (put_once, (q, 100, go)), + (put_once, (q, 200, go)), (join, (q, go)), (join, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -412,7 +416,7 @@ def join(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_put_and_join_shutdown_immediate(self): return self._put_and_join_shutdown(True) @@ -426,8 +430,8 @@ def _get_and_join_shutdown(self, immediate): go = threading.Event() def get_once(q, go): + go.wait() try: - go.wait() msg = q.get() results.append(False) except self.queue.ShutDown: @@ -439,14 +443,14 @@ def join(q, go): q.join() results.append(True) - tests = ( + thrds = ( (get_once, (q, go)), (get_once, (q, go)), (join, (q, go)), (join, (q, go)), ) threads = [] - for f, params in tests: + for f, params in thrds: thread = threading.Thread(target=f, args=params) thread.start() threads.append(thread) @@ -455,7 +459,7 @@ def join(q, go): for t in threads: t.join() - self.assertEqual(results, [True]*len(tests)) + self.assertEqual(results, [True]*len(thrds)) def test_get_and_join_shutdown_immediate(self): return self._get_and_join_shutdown(True) From 3613f5d4a2c600ab096d37bf3990a1c2b2fde8d6 Mon Sep 17 00:00:00 2001 From: Duprat Date: Mon, 13 Feb 2023 17:34:55 +0100 Subject: [PATCH 09/12] add `_shutdown_state` to tuples of `__getstate__` and `__setstate__`, add an empty `shutdown` method --- Lib/multiprocessing/queues.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 26b212d6a50610f..bfaf81e2dc66163 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -65,11 +65,13 @@ def __init__(self, maxsize=0, *, ctx): def __getstate__(self): context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) = state + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) = state self._reset() def _after_fork(self): @@ -159,6 +161,9 @@ def get_nowait(self): def put_nowait(self, obj): return self.put(obj, False) + def shutdown(self, immediate=True): + pass + def close(self): self._closed = True close = self._close From 06775bb94ecff21c3d83a473bc8db592a7f8a0c4 Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 15 Feb 2023 15:15:27 +0100 Subject: [PATCH 10/12] Update initial tests with `self.assertRaises` Add an unittest `test_shutdown`transition --- Lib/test/test_asyncio/test_queues.py | 34 +++++++++++++++------------- Lib/test/test_queue.py | 34 +++++++++++++++------------- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index a1a6cd3e0c9c7b4..7c882f62ef1038b 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -528,37 +528,25 @@ class _QueueShutdownTestMixin: async def test_empty(self): q = self.q_class() q.shutdown() - try: + with self.assertRaises(asyncio.QueueShutDown): await q.put("data") - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass - try: + with self.assertRaises(asyncio.QueueShutDown): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_nonempty(self): q = self.q_class() q.put_nowait("data") q.shutdown() await q.get() - try: + with self.assertRaises(asyncio.QueueShutDown): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_immediate(self): q = self.q_class() q.put_nowait("data") q.shutdown(immediate=True) - try: + with self.assertRaises(asyncio.QueueShutDown): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass async def test_shutdown_repr(self): q = self.q_class() @@ -569,6 +557,20 @@ async def test_shutdown_repr(self): q.shutdown(immediate=True) self.assertIn("shutdown-immediate", repr(q)) + async def test_shutdown_transition(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.q_class() + self.assertEqual("alive", q._shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown", q._shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q._shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown-immediate", q._shutdown_state.value) + async def test_get_shutdown_immediate(self): q = self.q_class() results = [] diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 55a02e82e157959..b25453f9203b5e6 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -244,37 +244,39 @@ def test_shrinking_queue(self): def test_shutdown_empty(self): q = self.type2test() q.shutdown() - try: + with self.assertRaises(self.queue.ShutDown): q.put("data") - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_nonempty(self): q = self.type2test() q.put("data") q.shutdown() q.get() - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass def test_shutdown_immediate(self): q = self.type2test() q.put("data") q.shutdown(immediate=True) - try: + with self.assertRaises(self.queue.ShutDown): q.get() - self.fail("Didn't appear to shut-down queue") - except self.queue.ShutDown: - pass + + def test_shutdown_transition(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.type2test() + self.assertEqual("alive", q.shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown", q.shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q.shutdown_state.value) + + q.shutdown(immediate=False) + self.assertEqual("shutdown-immediate", q.shutdown_state.value) def test_get_shutdown(self): q = self.type2test(2) From 6f0101567e493b88d4272fcb3f8e2c55afc63510 Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 15 Feb 2023 15:18:11 +0100 Subject: [PATCH 11/12] integration of shudown transition in `shutdown` method Change "shutdown_immediate" to "shutdown-immediate" --- Lib/asyncio/queues.py | 7 +++++-- Lib/queue.py | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 5b58f753a0fac7d..f4e1ba58866e532 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -261,6 +261,9 @@ def shutdown(self, immediate=False): All blocked callers of put() will be unblocked, and also get() and join() if 'immediate'. The QueueShutDown exception is raised. """ + if self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: + return + if immediate: self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE while self._getters: @@ -268,12 +271,12 @@ def shutdown(self, immediate=False): if not getter.done(): getter.set_result(None) else: - self._shutdown_state = _QueueState.SHUTDOWN + self._shutdown_state = _QueueState.SHUTDOWN while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None) - # Release 'joined' tasks/coros + # Release 'blocked' tasks/coros via `.join()` self._finished.set() diff --git a/Lib/queue.py b/Lib/queue.py index 9481a91feb4ff6e..933f18062050726 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -33,7 +33,7 @@ class ShutDown(Exception): class _QueueState(enum.Enum): ALIVE = "alive" SHUTDOWN = "shutdown" - SHUTDOWN_IMMEDIATE = "shutdown_immediate" + SHUTDOWN_IMMEDIATE = "shutdown-immediate" class Queue: @@ -244,6 +244,9 @@ def shutdown(self, immediate=False): and join() if 'immediate'. The ShutDown exception is raised. ''' with self.mutex: + if self.shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE: + return + if immediate: self.shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE self.not_empty.notify_all() From ff9895dd212a9d51a0d7bcfbca9d223305b69344 Mon Sep 17 00:00:00 2001 From: Duprat Date: Wed, 15 Feb 2023 15:45:50 +0100 Subject: [PATCH 12/12] Set `test_shutdown` prefix to all unittests --- Lib/test/test_asyncio/test_queues.py | 22 +++++++++--------- Lib/test/test_queue.py | 34 ++++++++++++++-------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 7c882f62ef1038b..f2c329c94b79973 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -571,7 +571,7 @@ async def test_shutdown_transition(self): q.shutdown() self.assertEqual("shutdown-immediate", q._shutdown_state.value) - async def test_get_shutdown_immediate(self): + async def test_shutdown_immediate_get(self): q = self.q_class() results = [] go = asyncio.Event() @@ -603,7 +603,7 @@ async def shutdown(q, go, immediate): self.assertEqual(results, [True]*len(tasks)) - async def _put_shutdown(self, immediate): + async def _shutdown_put(self, immediate): q = self.q_class(2) results = [] go = asyncio.Event() @@ -636,14 +636,14 @@ async def shutdown(q, go, immediate): self.assertEqual(results, [True]*len(tasks)) - async def test_put_shutdown(self): - return await self._put_shutdown(False) + async def test_shutdown_put(self): + return await self._shutdown_put(False) - async def test_put_shutdown_immediate(self): - return await self._put_shutdown(True) + async def test_shutdown_immediate_put(self): + return await self._shutdown_put(True) - async def _put_and_join_shutdown(self, immediate): + async def _shutdown_put_and_join(self, immediate): q = self.q_class(2) results = [] go = asyncio.Event() @@ -684,11 +684,11 @@ async def join(q, go): self.assertEqual(results, [True]*len(tasks)) - async def test_put_and_join_shutdown(self): - return await self._put_and_join_shutdown(False) + async def test_shutdown_put_and_join(self): + return await self._shutdown_put_and_join(False) - async def test_put_and_join_shutdown_immediate(self): - return await self._put_and_join_shutdown(True) + async def test_shutdown_immediate_put_and_join(self): + return await self._shutdown_put_and_join(True) class QueueShutdownTests( diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index b25453f9203b5e6..a2814ac40f2e181 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -278,7 +278,7 @@ def test_shutdown_transition(self): q.shutdown(immediate=False) self.assertEqual("shutdown-immediate", q.shutdown_state.value) - def test_get_shutdown(self): + def test_shutdown_get(self): q = self.type2test(2) results = [] go = threading.Event() @@ -308,7 +308,7 @@ def get_once(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_put_shutdown(self): + def test_shutdown_put(self): q = self.type2test(2) results = [] go = threading.Event() @@ -341,7 +341,7 @@ def put_once(q, msg, go): self.assertEqual(results, [True]*len(thrds)) - def _join_shutdown(self, immediate): + def _shutdown_join(self, immediate): q = self.type2test() results = [] go = threading.Event() @@ -367,13 +367,13 @@ def join(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_join_shutdown_immediate(self): - return self._join_shutdown(True) + def test_shutdown_immediate_join(self): + return self._shutdown_join(True) - def test_join_shutdown(self): - return self._join_shutdown(False) + def test_shutdown_join(self): + return self._shutdown_join(False) - def _put_and_join_shutdown(self, immediate): + def _shutdown_put_and_join(self, immediate): q = self.type2test(2) results = [] go = threading.Event() @@ -420,13 +420,13 @@ def join(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_put_and_join_shutdown_immediate(self): - return self._put_and_join_shutdown(True) + def test_shutdown_immediate_put_and_join(self): + return self._shutdown_put_and_join(True) - def test_put_and_join_shutdown(self): - return self._put_and_join_shutdown(False) + def test_shutdown_put_and_join(self): + return self._shutdown_put_and_join(False) - def _get_and_join_shutdown(self, immediate): + def _shutdown_get_and_join(self, immediate): q = self.type2test() results = [] go = threading.Event() @@ -463,11 +463,11 @@ def join(q, go): self.assertEqual(results, [True]*len(thrds)) - def test_get_and_join_shutdown_immediate(self): - return self._get_and_join_shutdown(True) + def test_shutdown_immediate_get_and_join(self): + return self._shutdown_get_and_join(True) - def test_get_and_join_shutdown(self): - return self._get_and_join_shutdown(False) + def test__shutdown_get_and_join(self): + return self._shutdown_get_and_join(False) class QueueTest(BaseQueueTestMixin):