From 3c9c12083eabf84d1538d9ab06e5c6b2dcf4ab16 Mon Sep 17 00:00:00 2001 From: grzgrzgrz3 Date: Thu, 25 May 2017 16:22:57 +0200 Subject: [PATCH] [3.5] bpo-30414: multiprocessing.Queue._feed do not break from main loop on exc (GH-1683) * bpo-30414: multiprocesing.Queue._feed do not break from main loop on exc Queue background running thread was not handling exceptions correctly. Any exception occurred inside thread (putting unpickable object) cause feeder to finish running. After that every message put into queue is silently ignored. * bpo-30414: multiprocesing.Queue._feed do not break from main loop on exc Queue background running thread was not handling exceptions correctly. Any exception occurred inside thread (putting unpickable object) cause feeder to finish running. After that every message put into queue is silently ignored. (cherry picked from commit bc50f03db4f58c869b78e98468e374d7e61f1227) --- Lib/multiprocessing/queues.py | 22 ++++++++++------------ Lib/test/_test_multiprocessing.py | 14 ++++++++++++++ Misc/NEWS | 3 +++ 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 40ae10a88c1508..2c888c00e7c298 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -221,8 +221,8 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): else: wacquire = None - try: - while 1: + while 1: + try: nacquire() try: if not buffer: @@ -249,21 +249,19 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): wrelease() except IndexError: pass - except Exception as e: - if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: - return - # Since this runs in a daemon thread the resources it uses - # may be become unusable while the process is cleaning up. - # We ignore errors which happen after the process has - # started to cleanup. - try: + except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return + # Since this runs in a daemon thread the resources it uses + # may be become unusable while the process is cleaning up. + # We ignore errors which happen after the process has + # started to cleanup. if is_exiting(): info('error in queue thread: %s', e) + return else: import traceback traceback.print_exc() - except Exception: - pass _sentinel = object() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 040212889486ec..b24e7d6c32df1d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -752,6 +752,20 @@ def test_timeout(self): # Windows (usually 15.6 ms) self.assertGreaterEqual(delta, 0.170) + def test_queue_feeder_donot_stop_onexc(self): + # bpo-30414: verify feeder handles exceptions correctly + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + class NotSerializable(object): + def __reduce__(self): + raise AttributeError + with test.support.captured_stderr(): + q = self.Queue() + q.put(NotSerializable()) + q.put(True) + self.assertTrue(q.get(timeout=0.1)) + # # # diff --git a/Misc/NEWS b/Misc/NEWS index 23eed2a9b0f464..5aed87a30a72d4 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -56,6 +56,9 @@ Extension Modules Library ------- +- bpo-30414: multiprocessing.Queue._feed background running + thread do not break from main loop on exception. + - bpo-30003: Fix handling escape characters in HZ codec. Based on patch by Ma Lin.