From eb98435b8fb06fc79a8d360bf31bb44b7187605f Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Wed, 29 Jan 2020 13:02:23 -0800 Subject: [PATCH 1/4] fix: fetch updated resume tokens for restarting of stream --- firestore/google/cloud/firestore_v1/watch.py | 14 ++++++++++---- firestore/tests/unit/v1/test_watch.py | 8 +++++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/firestore/google/cloud/firestore_v1/watch.py b/firestore/google/cloud/firestore_v1/watch.py index 2216acd4580a..670464357274 100644 --- a/firestore/google/cloud/firestore_v1/watch.py +++ b/firestore/google/cloud/firestore_v1/watch.py @@ -213,9 +213,9 @@ def __init__( self._closing = threading.Lock() self._closed = False - initial_request = firestore_pb2.ListenRequest( - database=self._firestore._database_string, add_target=self._targets - ) + self.resume_token = None + + initial_request = self._get_initial_request if ResumableBidiRpc is None: ResumableBidiRpc = self.ResumableBidiRpc # FBO unit tests @@ -252,13 +252,19 @@ def __init__( self.has_pushed = False # The server assigns and updates the resume token. - self.resume_token = None if BackgroundConsumer is None: # FBO unit tests BackgroundConsumer = self.BackgroundConsumer self._consumer = BackgroundConsumer(self._rpc, self.on_snapshot) self._consumer.start() + def _get_initial_request(self): + if self.resume_token is not None: + self._targets["resume_token"] = self.resume_token + return firestore_pb2.ListenRequest( + database=self._firestore._database_string, add_target=self._targets + ) + @property def is_active(self): """bool: True if this manager is actively streaming. diff --git a/firestore/tests/unit/v1/test_watch.py b/firestore/tests/unit/v1/test_watch.py index afd88b813081..b3369e66c3c0 100644 --- a/firestore/tests/unit/v1/test_watch.py +++ b/firestore/tests/unit/v1/test_watch.py @@ -209,7 +209,7 @@ def test_ctor(self): self.assertIs(inst._rpc.start_rpc, inst._api.transport.listen) self.assertIs(inst._rpc.should_recover, _should_recover) self.assertIs(inst._rpc.should_terminate, _should_terminate) - self.assertIsInstance(inst._rpc.initial_request, firestore_pb2.ListenRequest) + self.assertIsInstance(inst._rpc.initial_request(), firestore_pb2.ListenRequest) self.assertEqual(inst._rpc.metadata, DummyFirestore._rpc_metadata) def test__on_rpc_done(self): @@ -776,6 +776,12 @@ def test__reset_docs(self): self.assertEqual(inst.resume_token, None) self.assertFalse(inst.current) + def test_resume_token_sent_on_recovery(self): + inst = self._makeOne() + inst.resume_token = b"ABCD0123" + request = inst._get_initial_request() + self.assertEqual(request.add_target.resume_token, b"ABCD0123") + class DummyFirestoreStub(object): def Listen(self): # pragma: NO COVER From 19a7e5992b7a091b01bcfd5540baccfc6b9c1d6b Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Wed, 29 Jan 2020 15:42:34 -0800 Subject: [PATCH 2/4] fix: Improve DummyRPC to call initial_request when necessary --- firestore/tests/unit/v1/test_watch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/firestore/tests/unit/v1/test_watch.py b/firestore/tests/unit/v1/test_watch.py index b3369e66c3c0..fe02db104e16 100644 --- a/firestore/tests/unit/v1/test_watch.py +++ b/firestore/tests/unit/v1/test_watch.py @@ -209,7 +209,7 @@ def test_ctor(self): self.assertIs(inst._rpc.start_rpc, inst._api.transport.listen) self.assertIs(inst._rpc.should_recover, _should_recover) self.assertIs(inst._rpc.should_terminate, _should_terminate) - self.assertIsInstance(inst._rpc.initial_request(), firestore_pb2.ListenRequest) + self.assertIsInstance(inst._rpc.initial_request, firestore_pb2.ListenRequest) self.assertEqual(inst._rpc.metadata, DummyFirestore._rpc_metadata) def test__on_rpc_done(self): @@ -928,7 +928,7 @@ def __init__( self.start_rpc = start_rpc self.should_recover = should_recover self.should_terminate = should_terminate - self.initial_request = initial_request + self.initial_request = initial_request() self.metadata = metadata self.closed = False self.callbacks = [] From 3c170a362b830efb71098353a6e98dee0600e445 Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Thu, 30 Jan 2020 16:07:58 -0800 Subject: [PATCH 3/4] fix: rename method to reflect request is not initial only --- firestore/google/cloud/firestore_v1/watch.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/firestore/google/cloud/firestore_v1/watch.py b/firestore/google/cloud/firestore_v1/watch.py index 670464357274..1037322230d1 100644 --- a/firestore/google/cloud/firestore_v1/watch.py +++ b/firestore/google/cloud/firestore_v1/watch.py @@ -215,7 +215,7 @@ def __init__( self.resume_token = None - initial_request = self._get_initial_request + rpc_request = self._get_rpc_request if ResumableBidiRpc is None: ResumableBidiRpc = self.ResumableBidiRpc # FBO unit tests @@ -224,7 +224,7 @@ def __init__( self._api.transport.listen, should_recover=_should_recover, should_terminate=_should_terminate, - initial_request=initial_request, + initial_request=rpc_request, metadata=self._firestore._rpc_metadata, ) @@ -258,7 +258,7 @@ def __init__( self._consumer = BackgroundConsumer(self._rpc, self.on_snapshot) self._consumer.start() - def _get_initial_request(self): + def _get_rpc_request(self): if self.resume_token is not None: self._targets["resume_token"] = self.resume_token return firestore_pb2.ListenRequest( From a20b5d88fd0cbb3e9950d3ceb198a594b8aa90e6 Mon Sep 17 00:00:00 2001 From: Chris Wilcox Date: Thu, 30 Jan 2020 18:56:32 -0800 Subject: [PATCH 4/4] fix: adjust test to use new function name --- firestore/tests/unit/v1/test_watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firestore/tests/unit/v1/test_watch.py b/firestore/tests/unit/v1/test_watch.py index fe02db104e16..0778717bcc09 100644 --- a/firestore/tests/unit/v1/test_watch.py +++ b/firestore/tests/unit/v1/test_watch.py @@ -779,7 +779,7 @@ def test__reset_docs(self): def test_resume_token_sent_on_recovery(self): inst = self._makeOne() inst.resume_token = b"ABCD0123" - request = inst._get_initial_request() + request = inst._get_rpc_request() self.assertEqual(request.add_target.resume_token, b"ABCD0123")