From 562a3b3caaf30e60a84621cd26a4a3d547b79533 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Mon, 21 Mar 2022 22:42:17 -0400 Subject: [PATCH 1/8] fix: mypy errors --- .../_protocol/streaming_pull_manager.py | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index e098491fe..e0a508b81 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -20,7 +20,7 @@ import logging import threading import typing -from typing import Any, Callable, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union import uuid import grpc # type: ignore @@ -159,7 +159,7 @@ def _get_ack_errors( def _process_requests( error_status: Optional["status_pb2.Status"], - ack_reqs_dict: "containers.ScalarMap", + ack_reqs_dict: Dict[str, requests.AckRequest], errors_dict: Optional["containers.ScalarMap"], ): """Process requests by referring to error_status and errors_dict. @@ -174,7 +174,7 @@ def _process_requests( # Handle special errors returned for ack/modack RPCs via the ErrorInfo # sidecar metadata when exactly-once delivery is enabled. if errors_dict and ack_id in errors_dict: - exactly_once_error = errors_dict[ack_id] + exactly_once_error = str(errors_dict[ack_id]) if exactly_once_error.startswith("TRANSIENT_"): requests_to_retry.append(ack_reqs_dict[ack_id]) else: @@ -182,10 +182,10 @@ def _process_requests( exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None) else: exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error) - future = ack_reqs_dict[ack_id].future - future.set_exception(exc) - requests_completed.append(ack_reqs_dict[ack_id]) + if future is not None: + future.set_exception(exc) + requests_completed.append(ack_reqs_dict[ack_id]) # Temporary GRPC errors are retried elif ( error_status @@ -200,15 +200,18 @@ def _process_requests( exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) else: exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) - future = ack_reqs_dict[ack_id].future - future.set_exception(exc) - requests_completed.append(ack_reqs_dict[ack_id]) + if ack_id in ack_reqs_dict: + future = ack_reqs_dict[ack_id].future + if future is not None: + future.set_exception(exc) + requests_completed.append(ack_reqs_dict[ack_id]) # Since no error occurred, requests with futures are completed successfully. elif ack_reqs_dict[ack_id].future: future = ack_reqs_dict[ack_id].future # success - future.set_result(AcknowledgeStatus.SUCCESS) - requests_completed.append(ack_reqs_dict[ack_id]) + if future is not None: + future.set_result(AcknowledgeStatus.SUCCESS) + requests_completed.append(ack_reqs_dict[ack_id]) # All other requests are considered completed. else: requests_completed.append(ack_reqs_dict[ack_id]) From 02913e1425e6be54665c6e50154d9c46c90e229f Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Mon, 21 Mar 2022 23:05:43 -0400 Subject: [PATCH 2/8] fixing linter errors --- .../_protocol/streaming_pull_manager.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index e0a508b81..c2f64f6f9 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -184,8 +184,8 @@ def _process_requests( exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error) future = ack_reqs_dict[ack_id].future if future is not None: - future.set_exception(exc) - requests_completed.append(ack_reqs_dict[ack_id]) + future.set_exception(exc) + requests_completed.append(ack_reqs_dict[ack_id]) # Temporary GRPC errors are retried elif ( error_status @@ -201,17 +201,17 @@ def _process_requests( else: exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) if ack_id in ack_reqs_dict: - future = ack_reqs_dict[ack_id].future - if future is not None: - future.set_exception(exc) - requests_completed.append(ack_reqs_dict[ack_id]) + future = ack_reqs_dict[ack_id].future + if future is not None: + future.set_exception(exc) + requests_completed.append(ack_reqs_dict[ack_id]) # Since no error occurred, requests with futures are completed successfully. elif ack_reqs_dict[ack_id].future: future = ack_reqs_dict[ack_id].future # success if future is not None: - future.set_result(AcknowledgeStatus.SUCCESS) - requests_completed.append(ack_reqs_dict[ack_id]) + future.set_result(AcknowledgeStatus.SUCCESS) + requests_completed.append(ack_reqs_dict[ack_id]) # All other requests are considered completed. else: requests_completed.append(ack_reqs_dict[ack_id]) From 0c43b0ac8e30bb86352f6c0436cba83785fec75f Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Tue, 22 Mar 2022 14:51:32 -0400 Subject: [PATCH 3/8] addressing review comments --- .../_protocol/streaming_pull_manager.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index c2f64f6f9..8b374b2ca 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -141,9 +141,7 @@ def _get_status(exc: exceptions.GoogleAPICallError,) -> Optional["status_pb2.Sta return None -def _get_ack_errors( - exc: exceptions.GoogleAPICallError, -) -> Optional["containers.ScalarMap"]: +def _get_ack_errors(exc: exceptions.GoogleAPICallError,) -> Optional[Dict[str, str]]: status = _get_status(exc) if not status: _LOGGER.debug("Unable to get status of errored RPC.") @@ -160,7 +158,7 @@ def _get_ack_errors( def _process_requests( error_status: Optional["status_pb2.Status"], ack_reqs_dict: Dict[str, requests.AckRequest], - errors_dict: Optional["containers.ScalarMap"], + errors_dict: Optional[Dict[str, str]], ): """Process requests by referring to error_status and errors_dict. @@ -174,7 +172,7 @@ def _process_requests( # Handle special errors returned for ack/modack RPCs via the ErrorInfo # sidecar metadata when exactly-once delivery is enabled. if errors_dict and ack_id in errors_dict: - exactly_once_error = str(errors_dict[ack_id]) + exactly_once_error = errors_dict[ack_id] if exactly_once_error.startswith("TRANSIENT_"): requests_to_retry.append(ack_reqs_dict[ack_id]) else: @@ -185,7 +183,7 @@ def _process_requests( future = ack_reqs_dict[ack_id].future if future is not None: future.set_exception(exc) - requests_completed.append(ack_reqs_dict[ack_id]) + requests_completed.append(ack_reqs_dict[ack_id]) # Temporary GRPC errors are retried elif ( error_status @@ -200,18 +198,18 @@ def _process_requests( exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) else: exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) - if ack_id in ack_reqs_dict: - future = ack_reqs_dict[ack_id].future - if future is not None: - future.set_exception(exc) - requests_completed.append(ack_reqs_dict[ack_id]) + assert ack_reqs_dict[ack_id] is not None + future = ack_reqs_dict[ack_id].future + if future is not None: + future.set_exception(exc) + requests_completed.append(ack_reqs_dict[ack_id]) # Since no error occurred, requests with futures are completed successfully. elif ack_reqs_dict[ack_id].future: future = ack_reqs_dict[ack_id].future # success if future is not None: future.set_result(AcknowledgeStatus.SUCCESS) - requests_completed.append(ack_reqs_dict[ack_id]) + requests_completed.append(ack_reqs_dict[ack_id]) # All other requests are considered completed. else: requests_completed.append(ack_reqs_dict[ack_id]) From 86d16f7162712da61ce2d101a316fd6dd4ff2fca Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Tue, 22 Mar 2022 15:40:21 -0400 Subject: [PATCH 4/8] fixing test coverage --- .../_protocol/streaming_pull_manager.py | 6 +- .../subscriber/test_streaming_pull_manager.py | 65 +++++++++++++++++++ 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 8b374b2ca..4d9097ff9 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -49,7 +49,6 @@ if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1 import subscriber - from google.protobuf.internal import containers _LOGGER = logging.getLogger(__name__) @@ -198,7 +197,6 @@ def _process_requests( exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None) else: exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status)) - assert ack_reqs_dict[ack_id] is not None future = ack_reqs_dict[ack_id].future if future is not None: future.set_exception(exc) @@ -207,8 +205,8 @@ def _process_requests( elif ack_reqs_dict[ack_id].future: future = ack_reqs_dict[ack_id].future # success - if future is not None: - future.set_result(AcknowledgeStatus.SUCCESS) + assert future is not None + future.set_result(AcknowledgeStatus.SUCCESS) requests_completed.append(ack_reqs_dict[ack_id]) # All other requests are considered completed. else: diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 36f82b621..572a04b98 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1713,6 +1713,21 @@ def test_process_requests_no_errors(): assert not requests_to_retry +def test_process_requests_no_errors_no_future(): + # no errors so request and its future should be completed + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ) + } + errors_dict = {} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert requests_completed[0].ack_id == "ackid1" + assert not requests_to_retry + + def test_process_requests_permanent_error_raises_exception(): # a permanent error raises an exception future = futures.Future() @@ -1735,6 +1750,40 @@ def test_process_requests_permanent_error_raises_exception(): assert not requests_to_retry +def test_process_requests_permanent_error_other_raises_exception(): + # a permanent error raises an exception + future = futures.Future() + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future + ) + } + errors_dict = {"ackid1": "PERMANENT_FAILURE_OTHER"} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert requests_completed[0].ack_id == "ackid1" + with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info: + future.result() + assert exc_info.value.error_code == subscriber_exceptions.AcknowledgeStatus.OTHER + assert not requests_to_retry + + +def test_process_requests_permanent_error_other_raises_exception_no_future(): + # a permanent error raises an exception + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ) + } + errors_dict = {"ackid1": "PERMANENT_FAILURE_OTHER"} + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + None, ack_reqs_dict, errors_dict + ) + assert requests_completed[0].ack_id == "ackid1" + assert not requests_to_retry + + def test_process_requests_transient_error_returns_request_for_retrying(): # a transient error returns the request in `requests_to_retry` future = futures.Future() @@ -1872,6 +1921,22 @@ def test_process_requests_other_error_status_raises_exception(): assert not requests_to_retry +def test_process_requests_other_error_status_raises_exception_no_future(): + # an unrecognized error status raises an exception + ack_reqs_dict = { + "ackid1": requests.AckRequest( + ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None + ) + } + st = status_pb2.Status() + st.code = code_pb2.Code.OUT_OF_RANGE + requests_completed, requests_to_retry = streaming_pull_manager._process_requests( + st, ack_reqs_dict, None + ) + assert requests_completed[0].ack_id == "ackid1" + assert not requests_to_retry + + def test_process_requests_mixed_success_and_failure_acks(): # mixed success and failure (acks) future1 = futures.Future() From 09c27459eb7d13fabe4049595ac3427750f463a6 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Tue, 22 Mar 2022 20:06:31 -0400 Subject: [PATCH 5/8] fixing test comments --- .../pubsub_v1/subscriber/test_streaming_pull_manager.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 572a04b98..9c620fd5d 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1714,7 +1714,7 @@ def test_process_requests_no_errors(): def test_process_requests_no_errors_no_future(): - # no errors so request and its future should be completed + # no errors, request should be completed, even when future is None. ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None @@ -1770,7 +1770,7 @@ def test_process_requests_permanent_error_other_raises_exception(): def test_process_requests_permanent_error_other_raises_exception_no_future(): - # a permanent error raises an exception + # with a permanent error, request is completed even when future is None. ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None @@ -1922,7 +1922,8 @@ def test_process_requests_other_error_status_raises_exception(): def test_process_requests_other_error_status_raises_exception_no_future(): - # an unrecognized error status raises an exception + # with an unrecognized error status, requests are completed, even when + # future is None. ack_reqs_dict = { "ackid1": requests.AckRequest( ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None From 09c7632b0d72bc8dc480da3363ad31b07a7fd113 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Tue, 22 Mar 2022 20:44:00 -0400 Subject: [PATCH 6/8] fixing test comments --- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 9c620fd5d..76220795a 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -10,7 +10,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. +# limitations under the License. import functools import logging From 0d9cf22aa0873a374c26d443cd7ff29e4e596234 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Tue, 22 Mar 2022 20:50:45 -0400 Subject: [PATCH 7/8] linting for test --- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 76220795a..9c620fd5d 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -10,7 +10,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License. +# limitations under the License. import functools import logging From 7f4e2ee9c38594bffd72a9efd8929cc7f67dc808 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Wed, 23 Mar 2022 09:31:38 -0400 Subject: [PATCH 8/8] fixing test comment --- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 9c620fd5d..e9554deda 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1751,7 +1751,7 @@ def test_process_requests_permanent_error_raises_exception(): def test_process_requests_permanent_error_other_raises_exception(): - # a permanent error raises an exception + # a permanent error of other raises an exception future = futures.Future() ack_reqs_dict = { "ackid1": requests.AckRequest(