From 7222fa532f3cfbaef109fe6f72826d544efdd6ac Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Mon, 30 Sep 2024 16:57:43 +0200 Subject: [PATCH 1/3] Implemented optional reconnect for S2Connection. --- src/s2python/s2_connection.py | 113 +++++++++++++++++++++++----------- 1 file changed, 76 insertions(+), 37 deletions(-) diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py index d766593..b65cfcf 100644 --- a/src/s2python/s2_connection.py +++ b/src/s2python/s2_connection.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import time import threading import uuid from dataclasses import dataclass @@ -180,6 +181,7 @@ def register_handler(self, msg_type: Type[S2Message], handler: S2MessageHandler) class S2Connection: # pylint: disable=too-many-instance-attributes url: str + reconnect: bool reception_status_awaiter: ReceptionStatusAwaiter ws: Optional[WSConnection] s2_parser: S2Parser @@ -202,9 +204,12 @@ def __init__( role: EnergyManagementRole, control_types: List[S2ControlType], asset_details: AssetDetails, + reconnect: bool = False, ) -> None: self.url = url + self.reconnect = reconnect self.reception_status_awaiter = ReceptionStatusAwaiter() + self.ws = None self.s2_parser = S2Parser() self._handlers = MessageHandlers() @@ -221,14 +226,13 @@ def __init__( self._handlers.register_handler(HandshakeResponse, self.handle_handshake_response_as_rm) def start_as_rm(self) -> None: - self._thread = threading.Thread(target=self._run_eventloop, daemon=False) - self._thread.start() - logger.debug("Started eventloop thread!") + self._run_eventloop(self._run_as_rm()) - def _run_eventloop(self) -> None: + def _run_eventloop(self, main_task: Awaitable[None]) -> None: + self._thread = threading.current_thread() logger.debug("Starting eventloop") try: - self._eventloop.run_until_complete(self._run_as_rm()) + self._eventloop.run_until_complete(main_task) except asyncio.CancelledError: pass logger.debug("S2 connection thread has stopped.") @@ -256,45 +260,69 @@ async def _do_stop(self) -> None: async def _run_as_rm(self) -> None: logger.debug("Connecting as S2 resource manager.") - self._received_messages = asyncio.Queue() + self._stop_event = asyncio.Event() - await self.connect_ws() - background_tasks = [] - background_tasks.append(self._eventloop.create_task(self._receive_messages())) - background_tasks.append(self._eventloop.create_task(self._handle_received_messages())) + first_run = True - async def wait_till_stop() -> None: - await self._stop_event.wait() + while (first_run or self.reconnect) and not self._stop_event.is_set(): + first_run = False + self._restart_connection_event = asyncio.Event() + await self._connect_and_run() + time.sleep(1) - background_tasks.append(self._eventloop.create_task(wait_till_stop())) + logger.debug("Finished S2 connection eventloop.") - await self.connect_as_rm() - (done, pending) = await asyncio.wait(background_tasks, return_when=asyncio.FIRST_COMPLETED) - for task in done: - try: - await task - except asyncio.CancelledError: - pass - except websockets.ConnectionClosedError: - logger.info("The other party closed the websocket connection.c") + async def _connect_and_run(self) -> None: + self._received_messages = asyncio.Queue() + await self._connect_ws() + if self.ws: - for task in pending: - try: - task.cancel() - await task - except asyncio.CancelledError: - pass + async def wait_till_stop() -> None: + await self._stop_event.wait() + + async def wait_till_connection_restart() -> None: + await self._restart_connection_event.wait() + + background_tasks = [ + self._eventloop.create_task(self._receive_messages()), + self._eventloop.create_task(wait_till_stop()), + self._eventloop.create_task(self._connect_as_rm()), + self._eventloop.create_task(wait_till_connection_restart()), + ] + + (done, pending) = await asyncio.wait( + background_tasks, return_when=asyncio.FIRST_COMPLETED + ) + if self._current_control_type: + self._current_control_type.deactivate(self) + self._current_control_type = None + + for task in done: + try: + await task + except asyncio.CancelledError: + pass + except (websockets.ConnectionClosedError, websockets.ConnectionClosedOK): + logger.info("The other party closed the websocket connection.") + + for task in pending: + try: + task.cancel() + await task + except asyncio.CancelledError: + pass - if self.ws: await self.ws.close() await self.ws.wait_closed() - logger.debug("Finished S2 connection eventloop.") - async def connect_ws(self) -> None: - self.ws = await ws_connect(uri=self.url) + async def _connect_ws(self) -> None: + try: + self.ws = await ws_connect(uri=self.url) + except (EOFError, OSError) as e: + logger.info("Could not connect due to: %s", str(e)) - async def connect_as_rm(self) -> None: + async def _connect_as_rm(self) -> None: await self.send_msg_and_await_reception_status_async( Handshake( message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION] @@ -302,6 +330,8 @@ async def connect_as_rm(self) -> None: ) logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.") + await self._handle_received_messages() + async def handle_handshake( self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] ) -> None: @@ -427,7 +457,11 @@ async def _send_and_forget(self, s2_msg: S2Message) -> None: json_msg = s2_msg.to_json() logger.debug("Sending message %s", json_msg) - await self.ws.send(json_msg) + try: + await self.ws.send(json_msg) + except websockets.ConnectionClosedError as e: + logger.error("Unable to send message %s due to %s", s2_msg, str(e)) + self._restart_connection_event.set() async def respond_with_reception_status( self, subject_message_id: str, status: ReceptionStatusValues, diagnostic_label: str @@ -458,9 +492,14 @@ async def send_msg_and_await_reception_status_async( s2_msg.message_id, # type: ignore[attr-defined] timeout_reception_status, ) - reception_status = await self.reception_status_awaiter.wait_for_reception_status( - s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined] - ) + try: + reception_status = await self.reception_status_awaiter.wait_for_reception_status( + s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined] + ) + except TimeoutError: + logger.error("Did not receive a reception status on time for %s", s2_msg.message_id) + self._stop_event.set() + raise if reception_status.status != ReceptionStatusValues.OK and raise_on_error: raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}") From f6ac2e5c8cff7f42aa2bfe2dba7a4bceedbbc2fc Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Mon, 30 Sep 2024 17:02:21 +0200 Subject: [PATCH 2/3] Change FRBC example to use new reconnect and new threading model. --- examples/example_frbc_rm.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/examples/example_frbc_rm.py b/examples/example_frbc_rm.py index b189b1d..ac72028 100644 --- a/examples/example_frbc_rm.py +++ b/examples/example_frbc_rm.py @@ -159,20 +159,16 @@ def deactivate(self, conn: S2Connection) -> None: provides_forecast=False, provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], ), + reconnect=True, ) -stop_event = threading.Event() - - def stop(signal_num, _current_stack_frame): print(f"Received signal {signal_num}. Will stop S2 connection.") - stop_event.set() + s2_conn.stop() signal.signal(signal.SIGINT, stop) signal.signal(signal.SIGTERM, stop) s2_conn.start_as_rm() -stop_event.wait() -s2_conn.stop() From 185c8f8d1866537a23f6a73bcc13485a21f5e93f Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Mon, 30 Sep 2024 17:06:00 +0200 Subject: [PATCH 3/3] Fix linting and typing issues. --- examples/example_frbc_rm.py | 1 - src/s2python/s2_connection.py | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/example_frbc_rm.py b/examples/example_frbc_rm.py index ac72028..aea2337 100644 --- a/examples/example_frbc_rm.py +++ b/examples/example_frbc_rm.py @@ -1,6 +1,5 @@ import logging import sys -import threading import uuid import signal import datetime diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py index b65cfcf..188ecc7 100644 --- a/src/s2python/s2_connection.py +++ b/src/s2python/s2_connection.py @@ -197,8 +197,9 @@ class S2Connection: # pylint: disable=too-many-instance-attributes _eventloop: asyncio.AbstractEventLoop _stop_event: asyncio.Event + _restart_connection_event: asyncio.Event - def __init__( + def __init__( # pylint: disable=too-many-arguments self, url: str, role: EnergyManagementRole, @@ -497,7 +498,10 @@ async def send_msg_and_await_reception_status_async( s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined] ) except TimeoutError: - logger.error("Did not receive a reception status on time for %s", s2_msg.message_id) + logger.error( + "Did not receive a reception status on time for %s", + s2_msg.message_id, # type: ignore[attr-defined] + ) self._stop_event.set() raise