From acd79dbd829fd2f4878938483f1a6ffaf0ea86f9 Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Fri, 24 Jan 2020 14:48:26 -0800 Subject: [PATCH 1/7] fix: consume part of StreamingResponseIterator to support failure while under retry --- api_core/google/api_core/grpc_helpers.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/api_core/google/api_core/grpc_helpers.py b/api_core/google/api_core/grpc_helpers.py index 4d63beb36910..2a37cd456878 100644 --- a/api_core/google/api_core/grpc_helpers.py +++ b/api_core/google/api_core/grpc_helpers.py @@ -64,6 +64,18 @@ def error_remapped_callable(*args, **kwargs): class _StreamingResponseIterator(grpc.Call): def __init__(self, wrapped): self._wrapped = wrapped + self._stored_first_result = False + + # This iterator is used in a retry context, and returned outside after init. + # gRPC will not throw an exception until the stream is consumed, so we need + # to retrieve the first result, in order to fail, in order to trigger a retry. + try: + self._first_result = six.next(self._wrapped) + self._stored_first_result = True + except StopIteration: + # ignore stop iteration at this time. This should be handled outside of retry. + pass + def __iter__(self): """This iterator is also an iterable that returns itself.""" @@ -76,8 +88,12 @@ def next(self): protobuf.Message: A single response from the stream. """ try: - return six.next(self._wrapped) + if self._stored_first_result: + self._stored_first_result = False + return self._first_result + return six.next(self._wrapped) except grpc.RpcError as exc: + # If the stream has already returned data, we cannot recover here. six.raise_from(exceptions.from_grpc_error(exc), exc) # Alias needed for Python 2/3 support. From 21864410699099fa730e8ba20658fec89a0cdf91 Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Fri, 24 Jan 2020 15:24:41 -0800 Subject: [PATCH 2/7] fix: move to not having a flag for first result --- api_core/google/api_core/grpc_helpers.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/api_core/google/api_core/grpc_helpers.py b/api_core/google/api_core/grpc_helpers.py index 2a37cd456878..1b3e78954fe4 100644 --- a/api_core/google/api_core/grpc_helpers.py +++ b/api_core/google/api_core/grpc_helpers.py @@ -64,14 +64,12 @@ def error_remapped_callable(*args, **kwargs): class _StreamingResponseIterator(grpc.Call): def __init__(self, wrapped): self._wrapped = wrapped - self._stored_first_result = False # This iterator is used in a retry context, and returned outside after init. # gRPC will not throw an exception until the stream is consumed, so we need # to retrieve the first result, in order to fail, in order to trigger a retry. try: - self._first_result = six.next(self._wrapped) - self._stored_first_result = True + self._stored_first_result = six.next(self._wrapped) except StopIteration: # ignore stop iteration at this time. This should be handled outside of retry. pass @@ -88,10 +86,11 @@ def next(self): protobuf.Message: A single response from the stream. """ try: - if self._stored_first_result: - self._stored_first_result = False - return self._first_result - return six.next(self._wrapped) + if hasattr(self._stored_first_result): + result = self._stored_first_result + del self._stored_first_result + return result + return six.next(self._wrapped) except grpc.RpcError as exc: # If the stream has already returned data, we cannot recover here. six.raise_from(exceptions.from_grpc_error(exc), exc) From 1c36fded97f037cef53477570254f485c1a6cd6b Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Mon, 27 Jan 2020 11:27:11 -0800 Subject: [PATCH 3/7] fix: fix tests --- api_core/google/api_core/grpc_helpers.py | 6 +++++- api_core/tests/unit/test_grpc_helpers.py | 5 ++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/api_core/google/api_core/grpc_helpers.py b/api_core/google/api_core/grpc_helpers.py index 1b3e78954fe4..b15686c9cad4 100644 --- a/api_core/google/api_core/grpc_helpers.py +++ b/api_core/google/api_core/grpc_helpers.py @@ -70,6 +70,10 @@ def __init__(self, wrapped): # to retrieve the first result, in order to fail, in order to trigger a retry. try: self._stored_first_result = six.next(self._wrapped) + except TypeError: + # It is possible the wrappe method isn't an iterable (a grpc.Call + # for instance). If this happens don't store the first result. + pass except StopIteration: # ignore stop iteration at this time. This should be handled outside of retry. pass @@ -86,7 +90,7 @@ def next(self): protobuf.Message: A single response from the stream. """ try: - if hasattr(self._stored_first_result): + if hasattr(self, "_stored_first_result"): result = self._stored_first_result del self._stored_first_result return result diff --git a/api_core/tests/unit/test_grpc_helpers.py b/api_core/tests/unit/test_grpc_helpers.py index c37c3eedbe83..17f05a1f5064 100644 --- a/api_core/tests/unit/test_grpc_helpers.py +++ b/api_core/tests/unit/test_grpc_helpers.py @@ -88,6 +88,7 @@ def test_wrap_stream_iterable_iterface(): got_iterator = wrapped_callable() + callable_.assert_called_once_with() # Check each aliased method in the grpc.Call interface @@ -146,10 +147,8 @@ def test_wrap_stream_errors_iterator(): wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) - got_iterator = wrapped_callable(1, 2, three="four") - with pytest.raises(exceptions.ServiceUnavailable) as exc_info: - next(got_iterator) + got_iterator = wrapped_callable(1, 2, three="four") callable_.assert_called_once_with(1, 2, three="four") assert exc_info.value.response == grpc_error From d2271dd195d93a222675480a14d370e205884e3f Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Mon, 27 Jan 2020 13:23:35 -0800 Subject: [PATCH 4/7] fix: lint adjust --- api_core/google/api_core/grpc_helpers.py | 5 ++--- api_core/tests/unit/test_grpc_helpers.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/api_core/google/api_core/grpc_helpers.py b/api_core/google/api_core/grpc_helpers.py index b15686c9cad4..8e583af64c8b 100644 --- a/api_core/google/api_core/grpc_helpers.py +++ b/api_core/google/api_core/grpc_helpers.py @@ -66,7 +66,7 @@ def __init__(self, wrapped): self._wrapped = wrapped # This iterator is used in a retry context, and returned outside after init. - # gRPC will not throw an exception until the stream is consumed, so we need + # gRPC will not throw an exception until the stream is consumed, so we need # to retrieve the first result, in order to fail, in order to trigger a retry. try: self._stored_first_result = six.next(self._wrapped) @@ -78,7 +78,6 @@ def __init__(self, wrapped): # ignore stop iteration at this time. This should be handled outside of retry. pass - def __iter__(self): """This iterator is also an iterable that returns itself.""" return self @@ -96,7 +95,7 @@ def next(self): return result return six.next(self._wrapped) except grpc.RpcError as exc: - # If the stream has already returned data, we cannot recover here. + # If the stream has already returned data, we cannot recover here. six.raise_from(exceptions.from_grpc_error(exc), exc) # Alias needed for Python 2/3 support. diff --git a/api_core/tests/unit/test_grpc_helpers.py b/api_core/tests/unit/test_grpc_helpers.py index 17f05a1f5064..72d1439f8510 100644 --- a/api_core/tests/unit/test_grpc_helpers.py +++ b/api_core/tests/unit/test_grpc_helpers.py @@ -88,7 +88,6 @@ def test_wrap_stream_iterable_iterface(): got_iterator = wrapped_callable() - callable_.assert_called_once_with() # Check each aliased method in the grpc.Call interface @@ -148,7 +147,7 @@ def test_wrap_stream_errors_iterator(): wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) with pytest.raises(exceptions.ServiceUnavailable) as exc_info: - got_iterator = wrapped_callable(1, 2, three="four") + wrapped_callable(1, 2, three="four") callable_.assert_called_once_with(1, 2, three="four") assert exc_info.value.response == grpc_error From 49fc431c4e09d8802cdcb5cf45acf44232789194 Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Mon, 27 Jan 2020 14:05:54 -0800 Subject: [PATCH 5/7] fix: add unit test --- api_core/tests/unit/test_grpc_helpers.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/api_core/tests/unit/test_grpc_helpers.py b/api_core/tests/unit/test_grpc_helpers.py index 72d1439f8510..9ab117bea68a 100644 --- a/api_core/tests/unit/test_grpc_helpers.py +++ b/api_core/tests/unit/test_grpc_helpers.py @@ -129,6 +129,20 @@ def test_wrap_stream_errors_invocation(): assert exc_info.value.response == grpc_error +def test_wrap_stream_empty_iterator(): + expected_responses = [] + callable_ = mock.Mock(spec=["__call__"], return_value=iter(expected_responses)) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + + got_iterator = wrapped_callable() + + responses = list(got_iterator) + + callable_.assert_called_once_with() + assert responses == expected_responses + + class RpcResponseIteratorImpl(object): def __init__(self, exception): self._exception = exception From 135d9c0f22325e5823133f18d71f2ea7e19566ea Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Mon, 27 Jan 2020 14:53:22 -0800 Subject: [PATCH 6/7] fix: add unit test for failure in iteration --- api_core/tests/unit/test_grpc_helpers.py | 29 ++++++++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/api_core/tests/unit/test_grpc_helpers.py b/api_core/tests/unit/test_grpc_helpers.py index 9ab117bea68a..1fec64f7cec6 100644 --- a/api_core/tests/unit/test_grpc_helpers.py +++ b/api_core/tests/unit/test_grpc_helpers.py @@ -144,18 +144,21 @@ def test_wrap_stream_empty_iterator(): class RpcResponseIteratorImpl(object): - def __init__(self, exception): - self._exception = exception + def __init__(self, iterable): + self._iterable = iter(iterable) def next(self): - raise self._exception + next_item = next(self._iterable) + if isinstance(next_item, RpcErrorImpl): + raise next_item + return next_item __next__ = next -def test_wrap_stream_errors_iterator(): +def test_wrap_stream_errors_iterator_initialization(): grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE) - response_iter = RpcResponseIteratorImpl(grpc_error) + response_iter = RpcResponseIteratorImpl([grpc_error]) callable_ = mock.Mock(spec=["__call__"], return_value=response_iter) wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) @@ -167,6 +170,22 @@ def test_wrap_stream_errors_iterator(): assert exc_info.value.response == grpc_error +def test_wrap_stream_errors_during_iteration(): + grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE) + response_iter = RpcResponseIteratorImpl([1, grpc_error]) + callable_ = mock.Mock(spec=["__call__"], return_value=response_iter) + + wrapped_callable = grpc_helpers._wrap_stream_errors(callable_) + got_iterator = wrapped_callable(1, 2, three="four") + next(got_iterator) + + with pytest.raises(exceptions.ServiceUnavailable) as exc_info: + next(got_iterator) + + callable_.assert_called_once_with(1, 2, three="four") + assert exc_info.value.response == grpc_error + + @mock.patch("google.api_core.grpc_helpers._wrap_unary_errors") def test_wrap_errors_non_streaming(wrap_unary_errors): callable_ = mock.create_autospec(grpc.UnaryUnaryMultiCallable) From 0f77cb5642aa8570b54239b1c9844320f8a8e304 Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Mon, 27 Jan 2020 15:15:32 -0800 Subject: [PATCH 7/7] fix: typo in comment --- api_core/google/api_core/grpc_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api_core/google/api_core/grpc_helpers.py b/api_core/google/api_core/grpc_helpers.py index 8e583af64c8b..c47b09fdb883 100644 --- a/api_core/google/api_core/grpc_helpers.py +++ b/api_core/google/api_core/grpc_helpers.py @@ -71,7 +71,7 @@ def __init__(self, wrapped): try: self._stored_first_result = six.next(self._wrapped) except TypeError: - # It is possible the wrappe method isn't an iterable (a grpc.Call + # It is possible the wrapped method isn't an iterable (a grpc.Call # for instance). If this happens don't store the first result. pass except StopIteration: