From f379bbabf33f239f49b0e1b7652b6212f3960910 Mon Sep 17 00:00:00 2001 From: Sanjay Pillai Date: Mon, 9 May 2022 01:21:20 +0530 Subject: [PATCH] update watch resource_version on BOOKMARK event --- kubernetes/base/watch/watch.py | 10 +++---- kubernetes/base/watch/watch_test.py | 42 ++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 71fd459191..d7d085f32c 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -96,11 +96,11 @@ def get_watch_argument_name(self, func): def unmarshal_event(self, data, return_type): js = json.loads(data) js['raw_object'] = js['object'] - # BOOKMARK event is treated the same as ERROR for a quick fix of - # decoding exception - # TODO: make use of the resource_version in BOOKMARK event for more - # efficient WATCH - if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': + if return_type and js['type'] != 'ERROR': + if js['type'] == 'BOOKMARK': + # treat BOOKMARK as a custom object, so that resource_version + # is updated + return_type = object obj = SimpleNamespace(data=json.dumps(js['raw_object'])) js['object'] = self._api_client.deserialize(obj, return_type) if hasattr(js['object'], 'metadata'): diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index f87a4ea8be..b341e61525 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -167,6 +167,42 @@ def get_values(*args, **kwargs): # more strict test with worse error message self.assertEqual(fake_api.get_namespaces.mock_calls, calls) + def test_watch_resource_version_with_bookmark(self): + """ + test the resource_version get updated for bookmark events + + """ + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + values = [ + '{"type": "ADDED", "object": {"metadata": {"name": "test1",' + '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + '{ "type": "BOOKMARK", "object": {"kind": "Pod", "apiVersion": "v1",' + '"metadata": {"resourceVersion": "3"} }}\n', + '{"type": "ADDED", "object": {"metadata": {"name": "test3",' + '"resourceVersion": "4"}, "spec": {}, "status": {}}}\n'] + + def get_values(*args, **kwargs): + return values + + fake_resp.stream = Mock( + side_effect=get_values) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + iterations = 3 + + for c, e in enumerate(w.stream(fake_api.get_namespaces, + resource_version="0")): + self.assertEqual(w.resource_version, + e['raw_object']['metadata']['resourceVersion']) + if c == len(values) * iterations: + w.stop() + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: @@ -263,10 +299,8 @@ def test_unmarshal_with_bookmark(self): '"metadata":{},"spec":{"containers":null}}},"status":{}}}', 'V1Job') self.assertEqual("BOOKMARK", event['type']) - # Watch.resource_version is *not* updated, as BOOKMARK is treated the - # same as ERROR for a quick fix of decoding exception, - # resource_version in BOOKMARK is *not* used at all. - self.assertEqual(None, w.resource_version) + + self.assertEqual("1", w.resource_version) def test_watch_with_exception(self): fake_resp = Mock()