Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions kubernetes/base/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ def get_watch_argument_name(self, func):
def unmarshal_event(self, data, return_type):
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
if return_type and js['type'] != 'ERROR':
if js['type'] == 'BOOKMARK':
# treat BOOKMARK as a custom object, so that resource_version
# is updated
return_type = object
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
Expand Down
42 changes: 38 additions & 4 deletions kubernetes/base/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,42 @@ def get_values(*args, **kwargs):
# more strict test with worse error message
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)

def test_watch_resource_version_with_bookmark(self):
"""
test the resource_version get updated for bookmark events

"""
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
values = [
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
'{ "type": "BOOKMARK", "object": {"kind": "Pod", "apiVersion": "v1",'
'"metadata": {"resourceVersion": "3"} }}\n',
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "4"}, "spec": {}, "status": {}}}\n']

def get_values(*args, **kwargs):
return values

fake_resp.stream = Mock(
side_effect=get_values)

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
iterations = 3

for c, e in enumerate(w.stream(fake_api.get_namespaces,
resource_version="0")):
self.assertEqual(w.resource_version,
e['raw_object']['metadata']['resourceVersion'])
if c == len(values) * iterations:
w.stop()

def test_watch_stream_twice(self):
w = Watch(float)
for step in ['first', 'second']:
Expand Down Expand Up @@ -263,10 +299,8 @@ def test_unmarshal_with_bookmark(self):
'"metadata":{},"spec":{"containers":null}}},"status":{}}}',
'V1Job')
self.assertEqual("BOOKMARK", event['type'])
# Watch.resource_version is *not* updated, as BOOKMARK is treated the
# same as ERROR for a quick fix of decoding exception,
# resource_version in BOOKMARK is *not* used at all.
self.assertEqual(None, w.resource_version)

self.assertEqual("1", w.resource_version)

def test_watch_with_exception(self):
fake_resp = Mock()
Expand Down