From 8a54f03c6f1cb5387729c1798f755503e0f1c8d5 Mon Sep 17 00:00:00 2001
From: Prad Nelluru
Date: Mon, 28 Mar 2022 13:35:44 -0400
Subject: [PATCH 1/4] Process EOS/gRPC errors for ack/modack only when EOS is
 enabled; don't retry temporary errors for these RPCs when EOS is disabled.

---
 .../_protocol/streaming_pull_manager.py       |  87 ++++++--
 .../subscriber/test_streaming_pull_manager.py | 201 +++++++++++++++++-
 2 files changed, 265 insertions(+), 23 deletions(-)

diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 4d9097ff9..2773d910a 100644
--- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -159,7 +159,8 @@ def _process_requests(
     ack_reqs_dict: Dict[str, requests.AckRequest],
     errors_dict: Optional[Dict[str, str]],
 ):
-    """Process requests by referring to error_status and errors_dict.
+    """Process requests when exactly-once delivery is enabled by referring to
+    error_status and errors_dict.
 
     The errors returned by the server as `error_status` or in `errors_dict`
     are used to complete the request futures in `ack_reqs_dict` (with a success
@@ -167,6 +168,7 @@ def _process_requests(
     """
     requests_completed = []
     requests_to_retry = []
+
     for ack_id in ack_reqs_dict:
         # Handle special errors returned for ack/modack RPCs via the ErrorInfo
         # sidecar metadata when exactly-once delivery is enabled.
@@ -595,14 +597,23 @@ def send_unary_ack(
             error_status = _get_status(exc)
             ack_errors_dict = _get_ack_errors(exc)
         except exceptions.RetryError as exc:
-            status = status_pb2.Status()
-            # Choose a non-retriable error code so the futures fail with
-            # exceptions.
-            status.code = code_pb2.UNKNOWN
+            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
             # Makes sure to complete futures so they don't block forever.
-            _process_requests(status, ack_reqs_dict, None)
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    if exactly_once_delivery_enabled:
+                        e = AcknowledgeError(
+                            AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
+                        )
+                        req.future.set_exception(e)
+                    else:
+                        req.future.set_result(AcknowledgeStatus.SUCCESS)
+
             _LOGGER.debug(
-                "RetryError while sending unary RPC. Waiting on a transient "
+                "RetryError while sending ack RPC. Waiting on a transient "
                 "error resolution for too long, will now trigger shutdown.",
                 exc_info=False,
             )
@@ -611,9 +622,23 @@
             self._on_rpc_done(exc)
             raise
 
-        requests_completed, requests_to_retry = _process_requests(
-            error_status, ack_reqs_dict, ack_errors_dict
-        )
+        if self._exactly_once_delivery_enabled():
+            requests_completed, requests_to_retry = _process_requests(
+                error_status, ack_reqs_dict, ack_errors_dict
+            )
+        else:
+            requests_completed = []
+            requests_to_retry = []
+            # When exactly-once delivery is NOT enabled, acks/modacks are considered
+            # best-effort. So, they always succeed even if the RPC fails.
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    req.future.set_result(AcknowledgeStatus.SUCCESS)
+                requests_completed.append(req)
+
         return requests_completed, requests_to_retry
 
     def send_unary_modack(
@@ -651,14 +676,24 @@
             error_status = _get_status(exc)
             modack_errors_dict = _get_ack_errors(exc)
         except exceptions.RetryError as exc:
-            status = status_pb2.Status()
-            # Choose a non-retriable error code so the futures fail with
-            # exceptions.
-            status.code = code_pb2.UNKNOWN
+            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
             # Makes sure to complete futures so they don't block forever.
-            _process_requests(status, ack_reqs_dict, None)
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    if exactly_once_delivery_enabled:
+                        e = AcknowledgeError(
+                            AcknowledgeStatus.OTHER,
+                            "RetryError while sending modack RPC.",
+                        )
+                        req.future.set_exception(e)
+                    else:
+                        req.future.set_result(AcknowledgeStatus.SUCCESS)
+
             _LOGGER.debug(
-                "RetryError while sending unary RPC. Waiting on a transient "
+                "RetryError while sending modack RPC. Waiting on a transient "
                 "error resolution for too long, will now trigger shutdown.",
                 exc_info=False,
             )
@@ -667,9 +702,23 @@
             self._on_rpc_done(exc)
             raise
 
-        requests_completed, requests_to_retry = _process_requests(
-            error_status, ack_reqs_dict, modack_errors_dict
-        )
+        if self._exactly_once_delivery_enabled():
+            requests_completed, requests_to_retry = _process_requests(
+                error_status, ack_reqs_dict, modack_errors_dict
+            )
+        else:
+            requests_completed = []
+            requests_to_retry = []
+            # When exactly-once delivery is NOT enabled, acks/modacks are considered
+            # best-effort. So, they always succeed even if the RPC fails.
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    req.future.set_result(AcknowledgeStatus.SUCCESS)
+                requests_completed.append(req)
+
         return requests_completed, requests_to_retry
 
     def heartbeat(self) -> bool:
diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index e9554deda..c08ad05d4 100644
--- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -521,6 +521,36 @@ def test_send_unary_ack():
     )
 
 
+def test_send_unary_ack_exactly_once_disabled_with_futures():
+    manager = make_manager()
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future1,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future2,
+        ),
+    }
+    manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict)
+
+    manager._client.acknowledge.assert_called_once_with(
+        subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"]
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
 def test_send_unary_modack():
     manager = make_manager()
 
@@ -552,6 +582,43 @@ def test_send_unary_modack():
     )
 
 
+def test_send_unary_modack_exactly_once_disabled_with_futures():
+    manager = make_manager()
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    future3 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=future1),
+        "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=future2),
+        "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=future3),
+    }
+    manager.send_unary_modack(
+        modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"],
+        modify_deadline_seconds=[10, 20, 20],
+        ack_reqs_dict=ack_reqs_dict,
+    )
+
+    manager._client.modify_ack_deadline.assert_has_calls(
+        [
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id3"],
+                ack_deadline_seconds=10,
+            ),
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id4", "ack_id5"],
+                ack_deadline_seconds=20,
+            ),
+        ],
+        any_order=True,
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future3.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
 def test_send_unary_ack_api_call_error(caplog):
     caplog.set_level(logging.DEBUG)
 
@@ -606,10 +673,39 @@ def test_send_unary_modack_api_call_error(caplog):
     assert "The front fell off" in caplog.text
 
 
-def test_send_unary_ack_retry_error(caplog):
+def test_send_unary_ack_retry_error_no_futures(caplog):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.acknowledge.side_effect = error
+
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None,
+        ),
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_ack(
+            ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
+        )
+
+    assert "RetryError while sending ack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
+def test_send_unary_ack_retry_error_exactly_once_disabled_with_futures(caplog):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
 
     error = exceptions.RetryError(
         "Too long a transient error", cause=Exception("Out of time!")
     )
@@ -639,7 +735,47 @@
         ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
     )
 
-    assert "RetryError while sending unary RPC" in caplog.text
+    assert "RetryError while sending ack RPC" in caplog.text
     assert "signaled streaming pull manager shutdown" in caplog.text
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
+def test_send_unary_ack_retry_error_exactly_once_enabled_with_futures(caplog):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.acknowledge.side_effect = error
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future1,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future2,
+        ),
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_ack(
+            ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
+        )
+
+    assert "RetryError while sending ack RPC" in caplog.text
     assert "signaled streaming pull manager shutdown" in caplog.text
     assert isinstance(future1.exception(), subscriber_exceptions.AcknowledgeError)
     assert (
@@ -651,10 +787,67 @@
     )
 
 
-def test_send_unary_modack_retry_error(caplog):
+def test_send_unary_modack_retry_error_no_future(caplog):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.modify_ack_deadline.side_effect = error
+
+    ack_reqs_dict = {
+        "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=None)
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_modack(
+            modify_deadline_ack_ids=["ackid1"],
+            modify_deadline_seconds=[0],
+            ack_reqs_dict=ack_reqs_dict,
+        )
+
+    assert "RetryError while sending modack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
+def test_send_unary_modack_retry_error_exactly_once_delivery_disabled_with_futures(
+    caplog,
+):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.modify_ack_deadline.side_effect = error
+
+    future = futures.Future()
+    ack_reqs_dict = {
+        "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=future)
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_modack(
+            modify_deadline_ack_ids=["ackid1"],
+            modify_deadline_seconds=[0],
+            ack_reqs_dict=ack_reqs_dict,
+        )
+
+    assert "RetryError while sending modack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+    assert future.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
+def test_send_unary_modack_retry_error_exactly_once_delivery_enabled_with_futures(
+    caplog,
+):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
 
     error = exceptions.RetryError(
         "Too long a transient error", cause=Exception("Out of time!")
     )
@@ -672,7 +865,7 @@
         ack_reqs_dict=ack_reqs_dict,
     )
 
-    assert "RetryError while sending unary RPC" in caplog.text
+    assert "RetryError while sending modack RPC" in caplog.text
     assert "signaled streaming pull manager shutdown" in caplog.text
     assert isinstance(future.exception(), subscriber_exceptions.AcknowledgeError)
     assert (

From fad4f73f657b6021ddc3e92370f9fdb996f88e79 Mon Sep 17 00:00:00 2001
From: Prad Nelluru
Date: Mon, 28 Mar 2022 15:05:30 -0400
Subject: [PATCH 2/4] Add more tests for coverage

---
 .../subscriber/test_streaming_pull_manager.py | 140 +++++++++++++++++-
 1 file changed, 136 insertions(+), 4 deletions(-)

diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index c08ad05d4..8ee6bd290 100644
--- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -521,6 +521,37 @@ def test_send_unary_ack():
     )
 
 
+def test_send_unary_ack_exactly_once_enabled_with_futures():
+    manager = make_manager()
+    manager._exactly_once_enabled = True
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future1,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future2,
+        ),
+    }
+    manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict)
+
+    manager._client.acknowledge.assert_called_once_with(
+        subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"]
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
 def test_send_unary_ack_exactly_once_disabled_with_futures():
     manager = make_manager()
 
@@ -582,6 +613,44 @@ def test_send_unary_modack():
     )
 
 
+def test_send_unary_modack_exactly_once_enabled_with_futures():
+    manager = make_manager()
+    manager._exactly_once_enabled = True
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    future3 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=future1),
+        "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=future2),
+        "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=future3),
+    }
+    manager.send_unary_modack(
+        modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"],
+        modify_deadline_seconds=[10, 20, 20],
+        ack_reqs_dict=ack_reqs_dict,
+    )
+
+    manager._client.modify_ack_deadline.assert_has_calls(
+        [
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id3"],
+                ack_deadline_seconds=10,
+            ),
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id4", "ack_id5"],
+                ack_deadline_seconds=20,
+            ),
+        ],
+        any_order=True,
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future3.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
 def test_send_unary_modack_exactly_once_disabled_with_futures():
     manager = make_manager()
 
@@ -673,7 +742,7 @@ def test_send_unary_modack_api_call_error(caplog):
     assert "The front fell off" in caplog.text
 
 
-def test_send_unary_ack_retry_error_no_futures(caplog):
+def test_send_unary_ack_retry_error_exactly_once_disabled_no_futures(caplog):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
@@ -741,6 +810,42 @@
     assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
 
 
+def test_send_unary_ack_retry_error_exactly_once_enabled_no_futures(caplog):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.acknowledge.side_effect = error
+
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=None,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=None,
+        ),
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_ack(
+            ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
+        )
+
+    assert "RetryError while sending ack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
 def test_send_unary_ack_retry_error_exactly_once_enabled_with_futures(caplog):
     caplog.set_level(logging.DEBUG)
 
@@ -787,7 +892,7 @@
     )
 
 
-def test_send_unary_modack_retry_error_no_future(caplog):
+def test_send_unary_modack_retry_error_exactly_once_disabled_no_future(caplog):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
@@ -812,7 +917,7 @@
     assert "signaled streaming pull manager shutdown" in caplog.text
 
 
-def test_send_unary_modack_retry_error_exactly_once_delivery_disabled_with_futures(
+def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures(
     caplog,
 ):
     caplog.set_level(logging.DEBUG)
@@ -841,7 +946,34 @@
     assert future.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
 
 
-def test_send_unary_modack_retry_error_exactly_once_delivery_enabled_with_futures(
+def test_send_unary_modack_retry_error_exactly_once_enabled_no_futures(
+    caplog,
+):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.modify_ack_deadline.side_effect = error
+
+    ack_reqs_dict = {
+        "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=None)
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_modack(
+            modify_deadline_ack_ids=["ackid1"],
+            modify_deadline_seconds=[0],
+            ack_reqs_dict=ack_reqs_dict,
+        )
+
+    assert "RetryError while sending modack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
+def test_send_unary_modack_retry_error_exactly_once_enabled_with_futures(
     caplog,
 ):
     caplog.set_level(logging.DEBUG)

From 1159fd895e8d3c242856529741d41274ea5d88aa Mon Sep 17 00:00:00 2001
From: Prad Nelluru
Date: Tue, 29 Mar 2022 13:56:04 -0400
Subject: [PATCH 3/4] Reformat
 tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

---
 .../_protocol/streaming_pull_manager.py       |  1 -
 .../subscriber/test_streaming_pull_manager.py | 24 ++++---------------
 2 files changed, 5 insertions(+), 20 deletions(-)

diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 2773d910a..1a63e50f9 100644
--- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -168,7 +168,6 @@ def _process_requests(
     """
     requests_completed = []
     requests_to_retry = []
-
     for ack_id in ack_reqs_dict:
         # Handle special errors returned for ack/modack RPCs via the ErrorInfo
         # sidecar metadata when exactly-once delivery is enabled.
diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index 8ee6bd290..b19eee9b5 100644
--- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -823,18 +823,10 @@
 
     ack_reqs_dict = {
         "ack_id1": requests.AckRequest(
-            ack_id="ack_id1",
-            byte_size=0,
-            time_to_ack=20,
-            ordering_key="",
-            future=None,
+            ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None,
         ),
         "ack_id2": requests.AckRequest(
-            ack_id="ack_id2",
-            byte_size=0,
-            time_to_ack=20,
-            ordering_key="",
-            future=None,
+            ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None,
         ),
     }
     with pytest.raises(exceptions.RetryError):
@@ -909,9 +909,7 @@
     assert "signaled streaming pull manager shutdown" in caplog.text
 
 
-def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures(
-    caplog,
-):
+def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures(caplog,):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
@@ -936,9 +936,7 @@ def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures(caplog
     assert future.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
 
 
-def test_send_unary_modack_retry_error_exactly_once_enabled_no_futures(
-    caplog,
-):
+def test_send_unary_modack_retry_error_exactly_once_enabled_no_futures(caplog,):
    caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
@@ -961,9 +961,7 @@
     assert "signaled streaming pull manager shutdown" in caplog.text
 
 
-def test_send_unary_modack_retry_error_exactly_once_enabled_with_futures(
-    caplog,
-):
+def test_send_unary_modack_retry_error_exactly_once_enabled_with_futures(caplog,):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()

From 3de727184f64d7caefa85faf1050e7947c40fcc3 Mon Sep 17 00:00:00 2001
From: Prad Nelluru
Date: Tue, 29 Mar 2022 13:59:40 -0400
Subject: [PATCH 4/4] Reformat with new version of black

---
 .../subscriber/test_streaming_pull_manager.py | 36 +++++++++++++++----
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index b19eee9b5..8a1460951 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -755,10 +755,18 @@ def test_send_unary_ack_retry_error_exactly_once_disabled_no_futures(caplog): ack_reqs_dict = { "ack_id1": requests.AckRequest( - ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None, + ack_id="ack_id1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=None, ), "ack_id2": requests.AckRequest( - ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None, + ack_id="ack_id2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=None, ), } with pytest.raises(exceptions.RetryError): @@ -823,10 +831,18 @@ def test_send_unary_ack_retry_error_exactly_once_enabled_no_futures(caplog): ack_reqs_dict = { "ack_id1": requests.AckRequest( - ack_id="ack_id1", byte_size=0, time_to_ack=20, ordering_key="", future=None, + ack_id="ack_id1", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=None, ), "ack_id2": requests.AckRequest( - ack_id="ack_id2", byte_size=0, time_to_ack=20, ordering_key="", future=None, + ack_id="ack_id2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=None, ), } with pytest.raises(exceptions.RetryError): @@ -909,7 +925,9 @@ def test_send_unary_modack_retry_error_exactly_once_disabled_no_future(caplog): assert "signaled streaming pull manager shutdown" in caplog.text -def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures(caplog,): +def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures( + caplog, +): caplog.set_level(logging.DEBUG) manager, _, _, _, _, _ = make_running_manager() @@ -936,7 +954,9 @@ def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures(caplog assert future.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS -def test_send_unary_modack_retry_error_exactly_once_enabled_no_futures(caplog,): +def test_send_unary_modack_retry_error_exactly_once_enabled_no_futures( + caplog, +): caplog.set_level(logging.DEBUG) manager, _, _, _, _, _ = make_running_manager() @@ -961,7 +981,9 @@ def test_send_unary_modack_retry_error_exactly_once_enabled_no_futures(caplog,): assert "signaled streaming pull manager shutdown" in caplog.text -def test_send_unary_modack_retry_error_exactly_once_enabled_with_futures(caplog,): +def test_send_unary_modack_retry_error_exactly_once_enabled_with_futures( + caplog, +): caplog.set_level(logging.DEBUG) manager, _, _, _, _, _ = make_running_manager()
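
A note on the behavior this series implements, with an illustrative sketch. With exactly-once delivery disabled, acks and modacks are best-effort: even when the unary RPC fails (including a RetryError after retries are exhausted), any futures attached to the requests are completed with a success status, and the temporary error is not retried. With exactly-once delivery enabled, the same RetryError instead fails each future with AcknowledgeError(AcknowledgeStatus.OTHER, ...), so callers can observe the failure. The sketch below models only that dispatch; the enum, the request type, and the function are hypothetical stand-ins invented for illustration, not the library's API (the real logic lives in StreamingPullManager.send_unary_ack and send_unary_modack above).

import enum
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Dict, Optional


class AckStatus(enum.Enum):
    SUCCESS = enum.auto()
    OTHER = enum.auto()


class AckError(Exception):
    """Stand-in for the library's AcknowledgeError."""

    def __init__(self, status: AckStatus, info: str) -> None:
        super().__init__(info)
        self.status = status


@dataclass
class AckRequestSketch:
    ack_id: str
    future: Optional[Future]  # None when no caller is waiting on the result


def resolve_futures_on_retry_error(
    ack_reqs: Dict[str, AckRequestSketch],
    exactly_once_enabled: bool,
    rpc_name: str,
) -> None:
    """Complete per-request futures after a unary ack/modack RPC hits a RetryError."""
    for req in ack_reqs.values():
        # Futures may be present even with exactly-once delivery disabled,
        # in transition periods after the subscription setting is changed.
        if req.future is None:
            continue
        if exactly_once_enabled:
            # Exactly-once on: surface the failure so the caller can react.
            req.future.set_exception(
                AckError(AckStatus.OTHER, f"RetryError while sending {rpc_name} RPC.")
            )
        else:
            # Exactly-once off: acks/modacks are best-effort, so the future
            # reports success even though the RPC ultimately failed.
            req.future.set_result(AckStatus.SUCCESS)


if __name__ == "__main__":
    fut = Future()
    resolve_futures_on_retry_error(
        {"ack_id1": AckRequestSketch("ack_id1", fut)},
        exactly_once_enabled=False,
        rpc_name="ack",
    )
    assert fut.result() == AckStatus.SUCCESS

Run directly, the sketch exercises the best-effort path: the future reports success even though the RPC failed, which mirrors the test_send_unary_*_retry_error_exactly_once_disabled_with_futures tests above.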