From cc1cd27b49fbe7fdba8f2f5e435955ae07b00c3c Mon Sep 17 00:00:00 2001 From: larkee Date: Tue, 4 Aug 2020 14:55:58 +1000 Subject: [PATCH 1/4] fix: resume iterator on EOS internal error --- google/cloud/spanner_v1/snapshot.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 0b5ee1d894..c7867a5b2f 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -20,7 +20,7 @@ from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector -from google.api_core.exceptions import ServiceUnavailable +from google.api_core.exceptions import ServiceUnavailable, InternalServerError import google.api_core.gapic_v1.method from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud.spanner_v1._helpers import _merge_query_options @@ -32,6 +32,9 @@ from google.cloud.spanner_v1.types import PartitionOptions from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = ( + "RST_STREAM", +) def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None): """Restart iteration after :exc:`.ServiceUnavailable`. @@ -55,6 +58,17 @@ def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=N with trace_call(trace_name, session, attributes): iterator = restart(resume_token=resume_token) continue + except InternalServerError as exc: + resumable_error = any( + resumable_message in exc.message + for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES + ) + if not resumable_error: + raise + del item_buffer[:] + with trace_call(trace_name, session, attributes): + iterator = restart(resume_token=resume_token) + continue if len(item_buffer) == 0: break From a572c2fd7412608ed3d98960be4f776f29281b30 Mon Sep 17 00:00:00 2001 From: larkee Date: Thu, 6 Aug 2020 12:13:54 +1000 Subject: [PATCH 2/4] fix: add additional stream resumption message --- google/cloud/spanner_v1/snapshot.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index c7867a5b2f..e42f309d98 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -34,8 +34,10 @@ _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = ( "RST_STREAM", + "Received unexpected EOS on DATA frame from server", ) + def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None): """Restart iteration after :exc:`.ServiceUnavailable`. From 2036af910cb15d82697626f5211aaa097b7473d3 Mon Sep 17 00:00:00 2001 From: larkee Date: Fri, 21 Aug 2020 15:43:16 +1000 Subject: [PATCH 3/4] test: add unit tests --- tests/unit/test_snapshot.py | 141 ++++++++++++++++++++++++++++++++++-- 1 file changed, 134 insertions(+), 7 deletions(-) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 5c53ee6a0e..8589a0c363 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -86,12 +86,35 @@ def test_iteration_w_raw_w_resume_tken(self): self.assertNoSpans() def test_iteration_w_raw_raising_unavailable_no_token(self): + from google.api_core.exceptions import ServiceUnavailable + + ITEMS = ( + self._make_item(0), + self._make_item(1, resume_token=RESUME_TOKEN), + self._make_item(2), + ) + before = _MockIterator(fail_after=True, error=ServiceUnavailable("testing")) + after = _MockIterator(*ITEMS) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + self.assertEqual(list(resumable), list(ITEMS)) + self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")]) + self.assertNoSpans() + + def test_iteration_w_raw_raising_retryable_internal_error_no_token(self): + from google.api_core.exceptions import InternalServerError + ITEMS = ( self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN), self._make_item(2), ) - before = _MockIterator(fail_after=True) + before = _MockIterator( + fail_after=True, + error=InternalServerError( + "Received unexpected EOS on DATA frame from server" + ), + ) after = _MockIterator(*ITEMS) restart = mock.Mock(spec=[], side_effect=[before, after]) resumable = self._call_fut(restart) @@ -99,11 +122,32 @@ def test_iteration_w_raw_raising_unavailable_no_token(self): self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")]) self.assertNoSpans() + def test_iteration_w_raw_raising_non_retryable_internal_error_no_token(self): + from google.api_core.exceptions import InternalServerError + + ITEMS = ( + self._make_item(0), + self._make_item(1, resume_token=RESUME_TOKEN), + self._make_item(2), + ) + before = _MockIterator(fail_after=True, error=InternalServerError("testing")) + after = _MockIterator(*ITEMS) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + with self.assertRaises(InternalServerError): + list(resumable) + self.assertEqual(restart.mock_calls, [mock.call()]) + self.assertNoSpans() + def test_iteration_w_raw_raising_unavailable(self): + from google.api_core.exceptions import ServiceUnavailable + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) SECOND = (self._make_item(2),) # discarded after 503 LAST = (self._make_item(3),) - before = _MockIterator(*(FIRST + SECOND), fail_after=True) + before = _MockIterator( + *(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing") + ) after = _MockIterator(*LAST) restart = mock.Mock(spec=[], side_effect=[before, after]) resumable = self._call_fut(restart) @@ -113,10 +157,53 @@ def test_iteration_w_raw_raising_unavailable(self): ) self.assertNoSpans() + def test_iteration_w_raw_raising_retryable_internal_error(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2),) # discarded after 503 + LAST = (self._make_item(3),) + before = _MockIterator( + *(FIRST + SECOND), + fail_after=True, + error=InternalServerError( + "Received unexpected EOS on DATA frame from server" + ) + ) + after = _MockIterator(*LAST) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + self.assertEqual(list(resumable), list(FIRST + LAST)) + self.assertEqual( + restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)] + ) + self.assertNoSpans() + + def test_iteration_w_raw_raising_non_retryable_internal_error(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2),) # discarded after 503 + LAST = (self._make_item(3),) + before = _MockIterator( + *(FIRST + SECOND), fail_after=True, error=InternalServerError("testing") + ) + after = _MockIterator(*LAST) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + with self.assertRaises(InternalServerError): + list(resumable) + self.assertEqual(restart.mock_calls, [mock.call()]) + self.assertNoSpans() + def test_iteration_w_raw_raising_unavailable_after_token(self): + from google.api_core.exceptions import ServiceUnavailable + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) SECOND = (self._make_item(2), self._make_item(3)) - before = _MockIterator(*FIRST, fail_after=True) + before = _MockIterator( + *FIRST, fail_after=True, error=ServiceUnavailable("testing") + ) after = _MockIterator(*SECOND) restart = mock.Mock(spec=[], side_effect=[before, after]) resumable = self._call_fut(restart) @@ -126,6 +213,43 @@ def test_iteration_w_raw_raising_unavailable_after_token(self): ) self.assertNoSpans() + def test_iteration_w_raw_raising_retryable_internal_error_after_token(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2), self._make_item(3)) + before = _MockIterator( + *FIRST, + fail_after=True, + error=InternalServerError( + "Received unexpected EOS on DATA frame from server" + ) + ) + after = _MockIterator(*SECOND) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + self.assertEqual(list(resumable), list(FIRST + SECOND)) + self.assertEqual( + restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)] + ) + self.assertNoSpans() + + def test_iteration_w_raw_raising_non_retryable_internal_error_after_token(self): + from google.api_core.exceptions import InternalServerError + + FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) + SECOND = (self._make_item(2), self._make_item(3)) + before = _MockIterator( + *FIRST, fail_after=True, error=InternalServerError("testing") + ) + after = _MockIterator(*SECOND) + restart = mock.Mock(spec=[], side_effect=[before, after]) + resumable = self._call_fut(restart) + with self.assertRaises(InternalServerError): + list(resumable) + self.assertEqual(restart.mock_calls, [mock.call()]) + self.assertNoSpans() + def test_iteration_w_span_creation(self): name = "TestSpan" extra_atts = {"test_att": 1} @@ -136,11 +260,15 @@ def test_iteration_w_span_creation(self): self.assertSpanAttributes(name, attributes=dict(BASE_ATTRIBUTES, test_att=1)) def test_iteration_w_multiple_span_creation(self): + from google.api_core.exceptions import ServiceUnavailable + if HAS_OPENTELEMETRY_INSTALLED: FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN)) SECOND = (self._make_item(2),) # discarded after 503 LAST = (self._make_item(3),) - before = _MockIterator(*(FIRST + SECOND), fail_after=True) + before = _MockIterator( + *(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing") + ) after = _MockIterator(*LAST) restart = mock.Mock(spec=[], side_effect=[before, after]) name = "TestSpan" @@ -1153,18 +1281,17 @@ class _MockIterator(object): def __init__(self, *values, **kw): self._iter_values = iter(values) self._fail_after = kw.pop("fail_after", False) + self._error = kw.pop("error", Exception) def __iter__(self): return self def __next__(self): - from google.api_core.exceptions import ServiceUnavailable - try: return next(self._iter_values) except StopIteration: if self._fail_after: - raise ServiceUnavailable("testing") + raise self._error raise next = __next__ From 59948825410fcdcb17fdc3314d51ef83dfc3ee19 Mon Sep 17 00:00:00 2001 From: larkee <31196561+larkee@users.noreply.github.com> Date: Mon, 24 Aug 2020 17:03:36 +1000 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Tres Seaver --- google/cloud/spanner_v1/snapshot.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index e42f309d98..42e71545d4 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -20,7 +20,8 @@ from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector -from google.api_core.exceptions import ServiceUnavailable, InternalServerError +from google.api_core.exceptions import InternalServerError +from google.api_core.exceptions import ServiceUnavailable import google.api_core.gapic_v1.method from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud.spanner_v1._helpers import _merge_query_options