From 416a7d3c916b029f98d318f3865de36d388ed402 Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Mon, 19 Jun 2017 17:07:31 -0400 Subject: [PATCH 1/9] Prevent circular reference between Monitor and Connection --- juju/client/connection.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/juju/client/connection.py b/juju/client/connection.py index 6f2f2a2a0..74da79123 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -8,6 +8,7 @@ import ssl import string import subprocess +import weakref import websockets from concurrent.futures import CancelledError from http.client import HTTPSConnection @@ -44,10 +45,10 @@ class Monitor: UNKNOWN = 'unknown' def __init__(self, connection): - self.connection = connection - self.close_called = asyncio.Event(loop=self.connection.loop) - self.receiver_stopped = asyncio.Event(loop=self.connection.loop) - self.pinger_stopped = asyncio.Event(loop=self.connection.loop) + self.connection = weakref.ref(connection) + self.close_called = asyncio.Event(loop=connection.loop) + self.receiver_stopped = asyncio.Event(loop=connection.loop) + self.pinger_stopped = asyncio.Event(loop=connection.loop) self.receiver_stopped.set() self.pinger_stopped.set() @@ -64,8 +65,13 @@ def status(self): """ + connection = self.connection() # DISCONNECTED: connection not yet open - if not self.connection.ws: + if not connection: + # the connection instance was destroyed before we were + # this should never happen + return self.DISCONNECTED + if not connection.ws: return self.DISCONNECTED if self.receiver_stopped.is_set(): return self.DISCONNECTED @@ -74,18 +80,18 @@ def status(self): # connection.close if not self.close_called.is_set() and self.receiver_stopped.is_set(): return self.ERROR - if not self.close_called.is_set() and not self.connection.ws.open: + if not self.close_called.is_set() and not connection.ws.open: # The check for self.receiver_stopped existing above guards # against the case where we're not open because we simply # haven't setup the connection yet. return self.ERROR # DISCONNECTED: cleanly disconnected. - if self.close_called.is_set() and not self.connection.ws.open: + if self.close_called.is_set() and not connection.ws.open: return self.DISCONNECTED # CONNECTED: everything is fine! - if self.connection.ws.open: + if connection.ws.open: return self.CONNECTED # UNKNOWN: We should never hit this state -- if we do, From 188ff6b150110f48ce80929123b90ef971e6ef19 Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Tue, 20 Jun 2017 10:10:40 -0400 Subject: [PATCH 2/9] Improve monitor status logic --- juju/client/connection.py | 46 ++++++++++++++------------------------- 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/juju/client/connection.py b/juju/client/connection.py index 74da79123..ededfa049 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -41,8 +41,8 @@ class Monitor: """ ERROR = 'error' CONNECTED = 'connected' + DISCONNECTING = 'disconnecting' DISCONNECTED = 'disconnected' - UNKNOWN = 'unknown' def __init__(self, connection): self.connection = weakref.ref(connection) @@ -64,40 +64,27 @@ def status(self): isn't usable until that receiver has been started. """ - connection = self.connection() - # DISCONNECTED: connection not yet open + + # the connection instance was destroyed but someone kept + # a separate reference to the monitor for some reason if not connection: - # the connection instance was destroyed before we were - # this should never happen return self.DISCONNECTED + + # connection cleanly disconnected or not yet opened if not connection.ws: return self.DISCONNECTED - if self.receiver_stopped.is_set(): - return self.DISCONNECTED - - # ERROR: Connection closed (or errored), but we didn't call - # connection.close - if not self.close_called.is_set() and self.receiver_stopped.is_set(): - return self.ERROR - if not self.close_called.is_set() and not connection.ws.open: - # The check for self.receiver_stopped existing above guards - # against the case where we're not open because we simply - # haven't setup the connection yet. - return self.ERROR - # DISCONNECTED: cleanly disconnected. - if self.close_called.is_set() and not connection.ws.open: - return self.DISCONNECTED + # close called but not yet complete + if self.close_called.is_set(): + return self.DISCONNECTING - # CONNECTED: everything is fine! - if connection.ws.open: - return self.CONNECTED + # connection closed uncleanly (we didn't call connection.close) + if self.receiver_stopped.is_set() or not connection.ws.open: + return self.ERROR - # UNKNOWN: We should never hit this state -- if we do, - # something went wrong with the logic above, and we do not - # know what state the connection is in. - return self.UNKNOWN + # everything is fine! + return self.CONNECTED class Connection: @@ -150,9 +137,7 @@ def __init__( @property def is_open(self): - if self.ws: - return self.ws.open - return False + return self.monitor.status == Monitor.CONNECTED def _get_ssl(self, cert=None): return ssl.create_default_context( @@ -183,6 +168,7 @@ async def close(self): await self.monitor.pinger_stopped.wait() await self.monitor.receiver_stopped.wait() await self.ws.close() + self.ws = None async def recv(self, request_id): if not self.is_open: From 7dc499e626d00270de578cb0dcb74db0e07183bc Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Tue, 20 Jun 2017 12:02:44 -0400 Subject: [PATCH 3/9] Stop watcher task cleanly on disconnect Instead of the watcher task blowing up with an exception when the connection is closed, it should exit cleanly and allow the Monitor to report the connection error. Fixes #147 --- juju/client/connection.py | 10 +++++----- juju/model.py | 3 +++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/juju/client/connection.py b/juju/client/connection.py index ededfa049..3971876a5 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -189,13 +189,13 @@ async def receiver(self): await self.messages.put(result['request-id'], result) except CancelledError: pass - except Exception as e: + except websockets.ConnectionClosed as e: await self.messages.put_all(e) - if isinstance(e, websockets.ConnectionClosed): - # ConnectionClosed is not really exceptional for us, - # but it may be for any pending message listeners - return + return + except Exception as e: log.exception("Error in receiver") + # make pending listeners aware of the error + await self.messages.put_all(e) raise finally: self.monitor.receiver_stopped.set() diff --git a/juju/model.py b/juju/model.py index 61905c921..cc8135240 100644 --- a/juju/model.py +++ b/juju/model.py @@ -14,6 +14,7 @@ from functools import partial from pathlib import Path +import websockets import yaml import theblues.charmstore import theblues.errors @@ -661,6 +662,8 @@ async def _start_watch(): self._watch_received.set() except CancelledError: pass + except websockets.ConnectionClosed: + log.error('Connection closed on watcher') except Exception: log.exception('Error in watcher') raise From 0596a62283b2b7c5c112c0675a35072c9e8a255e Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Tue, 20 Jun 2017 12:20:49 -0400 Subject: [PATCH 4/9] Raise ConnectionClosed exception in Model.block_until Rather than blocking indefinitely when waiting on a model change that will never happen if the connection gets closed out from under it, this makes `Model.block_until` raise a `websockets.ConnectionClosed` error so that it can be caught and dealt with by the client. --- juju/model.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/juju/model.py b/juju/model.py index cc8135240..2f958300b 100644 --- a/juju/model.py +++ b/juju/model.py @@ -551,6 +551,8 @@ async def block_until(self, *conditions, timeout=None, wait_period=0.5): """ async def _block(): while not all(c() for c in conditions): + if not (self.connection and self.connection.is_open): + raise websockets.ConnectionClosed(1006, 'no reason') await asyncio.sleep(wait_period, loop=self.loop) await asyncio.wait_for(_block(), timeout, loop=self.loop) From 0c80770da04b12e1c74d877604768eacde0f68c3 Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Tue, 20 Jun 2017 18:29:02 -0400 Subject: [PATCH 5/9] Automatically reconnect lost websocket connections The receiver or all_watcher should immediately reconnect a lost connection, or it will be reconnected automatically upon issuance of the next RPC call. However, there is still a small chance that the disconnect could happen between sending a API call and receiving the response, in which case a websockets.ConnectionClosed error will still be raised to the caller. These should be quite rare, though. --- juju/client/connection.py | 75 ++++++++++++++++++++-------- juju/model.py | 27 +++++++--- tests/base.py | 5 +- tests/integration/test_connection.py | 28 ++++++++++- tests/integration/test_model.py | 9 ++++ tests/unit/test_connection.py | 2 + 6 files changed, 114 insertions(+), 32 deletions(-) diff --git a/juju/client/connection.py b/juju/client/connection.py index 3971876a5..e362528e9 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -46,6 +46,7 @@ class Monitor: def __init__(self, connection): self.connection = weakref.ref(connection) + self.reconnecting = asyncio.Lock(loop=connection.loop) self.close_called = asyncio.Event(loop=connection.loop) self.receiver_stopped = asyncio.Event(loop=connection.loop) self.pinger_stopped = asyncio.Event(loop=connection.loop) @@ -113,6 +114,7 @@ def __init__( self, endpoint, uuid, username, password, cacert=None, macaroons=None, loop=None, max_frame_size=DEFAULT_FRAME_SIZE): self.endpoint = endpoint + self._endpoint = endpoint self.uuid = uuid if macaroons: self.macaroons = macaroons @@ -123,6 +125,7 @@ def __init__( self.username = username self.password = password self.cacert = cacert + self._cacert = cacert self.loop = loop or asyncio.get_event_loop() self.__request_id__ = 0 @@ -162,7 +165,7 @@ async def open(self): return self async def close(self): - if not self.is_open: + if not self.ws: return self.monitor.close_called.set() await self.monitor.pinger_stopped.wait() @@ -190,7 +193,12 @@ async def receiver(self): except CancelledError: pass except websockets.ConnectionClosed as e: + log.warning('Receiver: Connection closed, reconnecting') await self.messages.put_all(e) + # the reconnect has to be done as a task because the receiver will + # be cancelled by the reconnect and we don't want the reconnect + # to be aborted half-way through + self.loop.create_task(self.reconnect()) return except Exception as e: log.exception("Error in receiver") @@ -217,7 +225,7 @@ async def _do_ping(): pinger_facade = client.PingerFacade.from_connection(self) try: - while self.is_open: + while True: await utils.run_with_interrupt( _do_ping(), self.monitor.close_called, @@ -226,6 +234,7 @@ async def _do_ping(): break finally: self.monitor.pinger_stopped.set() + return async def rpc(self, msg, encoder=None): self.__request_id__ += 1 @@ -235,7 +244,19 @@ async def rpc(self, msg, encoder=None): if "version" not in msg: msg['version'] = self.facades[msg['type']] outgoing = json.dumps(msg, indent=2, cls=encoder) - await self.ws.send(outgoing) + for attempt in range(3): + try: + await self.ws.send(outgoing) + break + except websockets.ConnectionClosed: + if attempt == 2: + raise + log.warning('RPC: Connection closed, reconnecting') + # the reconnect has to be done in a separate task because, + # if it is triggered by the pinger, then this RPC call will + # be cancelled when the pinger is cancelled by the reconnect, + # and we don't want the reconnect to be aborted halfway through + await asyncio.wait([self.reconnect()], loop=self.loop) result = await self.recv(msg['request-id']) if not result: @@ -371,36 +392,48 @@ async def _try_endpoint(self, endpoint, cacert): await self.close() return success, result, new_endpoints - @classmethod - async def connect( - cls, endpoint, uuid, username, password, cacert=None, - macaroons=None, loop=None, max_frame_size=None): - """Connect to the websocket. - - If uuid is None, the connection will be to the controller. Otherwise it - will be to the model. - + async def reconnect(self): + """ Force a reconnection. """ - client = cls(endpoint, uuid, username, password, cacert, macaroons, - loop, max_frame_size) - endpoints = [(endpoint, cacert)] + if self.monitor.reconnecting.locked(): + return + async with self.monitor.reconnecting: + await self.close() + await self._connect() + + async def _connect(self): + endpoints = [(self._endpoint, self._cacert)] while endpoints: _endpoint, _cacert = endpoints.pop(0) - success, result, new_endpoints = await client._try_endpoint( + success, result, new_endpoints = await self._try_endpoint( _endpoint, _cacert) if success: break endpoints.extend(new_endpoints) else: # ran out of endpoints without a successful login - raise Exception("Couldn't authenticate to {}".format(endpoint)) + raise Exception("Couldn't authenticate to {}".format( + self._endpoint)) response = result['response'] - client.info = response.copy() - client.build_facades(response.get('facades', {})) - client.loop.create_task(client.pinger()) - client.monitor.pinger_stopped.clear() + self.info = response.copy() + self.build_facades(response.get('facades', {})) + self.loop.create_task(self.pinger()) + self.monitor.pinger_stopped.clear() + @classmethod + async def connect( + cls, endpoint, uuid, username, password, cacert=None, + macaroons=None, loop=None, max_frame_size=None): + """Connect to the websocket. + + If uuid is None, the connection will be to the controller. Otherwise it + will be to the model. + + """ + client = cls(endpoint, uuid, username, password, cacert, macaroons, + loop, max_frame_size) + await client._connect() return client @classmethod diff --git a/juju/model.py b/juju/model.py index 2f958300b..c5cc9a65e 100644 --- a/juju/model.py +++ b/juju/model.py @@ -646,15 +646,28 @@ def _watch(self): See :meth:`add_observer` to register an onchange callback. """ - async def _start_watch(): + async def _all_watcher(): try: allwatcher = client.AllWatcherFacade.from_connection( self.connection) while not self._watch_stopping.is_set(): - results = await utils.run_with_interrupt( - allwatcher.Next(), - self._watch_stopping, - self.loop) + try: + results = await utils.run_with_interrupt( + allwatcher.Next(), + self._watch_stopping, + self.loop) + except websockets.ConnectionClosed: + monitor = self.connection.monitor + if monitor.status == monitor.ERROR: + # closed unexpectedly, try to reopen + log.warning( + 'Watcher: connection closed, reopening') + await self.connection.reconnect() + del allwatcher.Id + continue + else: + # closed on request, go ahead and shutdown + break if self._watch_stopping.is_set(): break for delta in results.deltas: @@ -664,8 +677,6 @@ async def _start_watch(): self._watch_received.set() except CancelledError: pass - except websockets.ConnectionClosed: - log.error('Connection closed on watcher') except Exception: log.exception('Error in watcher') raise @@ -676,7 +687,7 @@ async def _start_watch(): self._watch_received.clear() self._watch_stopping.clear() self._watch_stopped.clear() - self.loop.create_task(_start_watch()) + self.loop.create_task(_all_watcher()) async def _notify_observers(self, delta, old_obj, new_obj): """Call observing callbacks, notifying them of a change in model state diff --git a/tests/base.py b/tests/base.py index 8ea51092d..e1ec45238 100644 --- a/tests/base.py +++ b/tests/base.py @@ -44,6 +44,9 @@ async def __aenter__(self): model_name = 'model-{}'.format(uuid.uuid4()) self.model = await self.controller.add_model(model_name) + # save the model UUID in case test closes model + self.model_uuid = self.model.info.uuid + # Ensure that we connect to the new model by default. This also # prevents failures if test was started with no current model. self._patch_cm = mock.patch.object(JujuData, 'current_model', @@ -55,7 +58,7 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc, tb): self._patch_cm.stop() await self.model.disconnect() - await self.controller.destroy_model(self.model.info.uuid) + await self.controller.destroy_model(self.model_uuid) await self.controller.disconnect() diff --git a/tests/integration/test_connection.py b/tests/integration/test_connection.py index 67dfb2e3c..290203d47 100644 --- a/tests/integration/test_connection.py +++ b/tests/integration/test_connection.py @@ -1,3 +1,4 @@ +import asyncio import pytest from juju.client.connection import Connection @@ -47,7 +48,7 @@ async def test_monitor_catches_error(event_loop): @pytest.mark.asyncio async def test_full_status(event_loop): async with base.CleanModel() as model: - app = await model.deploy( + await model.deploy( 'ubuntu-0', application_name='ubuntu', series='trusty', @@ -56,4 +57,27 @@ async def test_full_status(event_loop): c = client.ClientFacade.from_connection(model.connection) - status = await c.FullStatus(None) + await c.FullStatus(None) + + +@base.bootstrapped +@pytest.mark.asyncio +async def test_reconnect(event_loop): + async with base.CleanModel() as model: + conn = await Connection.connect( + model.connection.endpoint, + model.connection.uuid, + model.connection.username, + model.connection.password, + model.connection.cacert, + model.connection.macaroons, + model.connection.loop, + model.connection.max_frame_size) + try: + await asyncio.sleep(0.1) + assert conn.is_open + await conn.ws.close() + assert not conn.is_open + await model.block_until(lambda: conn.is_open, timeout=3) + finally: + await conn.close() diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py index 088dcd576..37f51c0dc 100644 --- a/tests/integration/test_model.py +++ b/tests/integration/test_model.py @@ -212,6 +212,15 @@ async def test_get_machines(event_loop): assert isinstance(result, list) +@base.bootstrapped +@pytest.mark.asyncio +async def test_watcher_reconnect(event_loop): + async with base.CleanModel() as model: + await model.connection.ws.close() + await asyncio.sleep(0.1) + assert model.connection.is_open + + # @base.bootstrapped # @pytest.mark.asyncio # async def test_grant(event_loop) diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index 340264ead..f69b8d6bc 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -1,3 +1,4 @@ +import asyncio import json import mock import pytest @@ -20,6 +21,7 @@ async def send(self, message): async def recv(self): if not self.responses: + await asyncio.sleep(1) # delay to give test time to finish raise ConnectionClosed(0, 'ran out of responses') return json.dumps(self.responses.popleft()) From 9db226f102785c8b9692143c0b00517040ad63fc Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Wed, 21 Jun 2017 15:04:08 -0400 Subject: [PATCH 6/9] Gracefully handle add_signal_handler failing in a thread in loop.run --- juju/loop.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/juju/loop.py b/juju/loop.py index 3720159df..4abedfcc3 100644 --- a/juju/loop.py +++ b/juju/loop.py @@ -20,7 +20,14 @@ def abort(): task.cancel() run._sigint = True - loop.add_signal_handler(signal.SIGINT, abort) + added = False + try: + loop.add_signal_handler(signal.SIGINT, abort) + added = True + except ValueError as e: + # add_signal_handler doesn't work in a thread + if 'main thread' not in str(e): + raise try: for step in steps: task = loop.create_task(step) @@ -31,4 +38,5 @@ def abort(): raise task.exception() return task.result() finally: - loop.remove_signal_handler(signal.SIGINT) + if added: + loop.remove_signal_handler(signal.SIGINT) From d22497ab848c6983fecc8b999bb7d529c5f54a28 Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Thu, 22 Jun 2017 10:17:04 -0400 Subject: [PATCH 7/9] Restart AllWatcher if controller stops it Fixes conjure-up/conjure-up#965 --- juju/model.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/juju/model.py b/juju/model.py index c5cc9a65e..051343e9b 100644 --- a/juju/model.py +++ b/juju/model.py @@ -656,6 +656,18 @@ async def _all_watcher(): allwatcher.Next(), self._watch_stopping, self.loop) + except JujuAPIError as e: + if 'watcher was stopped' not in str(e): + raise + # controller stopped our watcher for some reason + # (we never actually call AllWatcherFacade.Stop(), + # so there's no reason we should see this, but the + # controller does occasionally send it anyway; so + # if we get it, just start a new watcher) + log.warning( + 'Watcher: watcher stopped, restarting') + del allwatcher.Id + continue except websockets.ConnectionClosed: monitor = self.connection.monitor if monitor.status == monitor.ERROR: From bb114597c0e20797ae7e8f302c38b7a83920ccda Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Thu, 22 Jun 2017 11:11:25 -0400 Subject: [PATCH 8/9] Explicitly let the controller know we're stopping the watcher --- juju/model.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/juju/model.py b/juju/model.py index 051343e9b..7b86ba3b5 100644 --- a/juju/model.py +++ b/juju/model.py @@ -659,11 +659,14 @@ async def _all_watcher(): except JujuAPIError as e: if 'watcher was stopped' not in str(e): raise + if self._watch_stopping.is_set(): + # this shouldn't ever actually happen, because + # the event should trigger before the controller + # has a chance to tell us the watcher is stopped + # but handle it gracefully, just in case + break # controller stopped our watcher for some reason - # (we never actually call AllWatcherFacade.Stop(), - # so there's no reason we should see this, but the - # controller does occasionally send it anyway; so - # if we get it, just start a new watcher) + # but we're not actually stopping, so just restart it log.warning( 'Watcher: watcher stopped, restarting') del allwatcher.Id @@ -681,6 +684,7 @@ async def _all_watcher(): # closed on request, go ahead and shutdown break if self._watch_stopping.is_set(): + await allwatcher.Stop() break for delta in results.deltas: delta = get_entity_delta(delta) From 86b54982547ccaa52b64553fecb25a94050a6777 Mon Sep 17 00:00:00 2001 From: Cory Johns Date: Thu, 22 Jun 2017 15:09:59 -0400 Subject: [PATCH 9/9] Skip reconnect if close was requested --- juju/client/connection.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/juju/client/connection.py b/juju/client/connection.py index e362528e9..745739187 100644 --- a/juju/client/connection.py +++ b/juju/client/connection.py @@ -395,9 +395,10 @@ async def _try_endpoint(self, endpoint, cacert): async def reconnect(self): """ Force a reconnection. """ - if self.monitor.reconnecting.locked(): + monitor = self.monitor + if monitor.reconnecting.locked() or monitor.close_called.is_set(): return - async with self.monitor.reconnecting: + async with monitor.reconnecting: await self.close() await self._connect()