Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions examples/example_frbc_rm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import sys
import threading
import uuid
import signal
import datetime
Expand Down Expand Up @@ -159,20 +158,16 @@ def deactivate(self, conn: S2Connection) -> None:
provides_forecast=False,
provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1],
),
reconnect=True,
)


stop_event = threading.Event()


def stop(signal_num, _current_stack_frame):
print(f"Received signal {signal_num}. Will stop S2 connection.")
stop_event.set()
s2_conn.stop()


signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)

s2_conn.start_as_rm()
stop_event.wait()
s2_conn.stop()
119 changes: 81 additions & 38 deletions src/s2python/s2_connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import logging
import time
import threading
import uuid
from dataclasses import dataclass
Expand Down Expand Up @@ -180,6 +181,7 @@ def register_handler(self, msg_type: Type[S2Message], handler: S2MessageHandler)

class S2Connection: # pylint: disable=too-many-instance-attributes
url: str
reconnect: bool
reception_status_awaiter: ReceptionStatusAwaiter
ws: Optional[WSConnection]
s2_parser: S2Parser
Expand All @@ -195,16 +197,20 @@ class S2Connection: # pylint: disable=too-many-instance-attributes

_eventloop: asyncio.AbstractEventLoop
_stop_event: asyncio.Event
_restart_connection_event: asyncio.Event

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
url: str,
role: EnergyManagementRole,
control_types: List[S2ControlType],
asset_details: AssetDetails,
reconnect: bool = False,
) -> None:
self.url = url
self.reconnect = reconnect
self.reception_status_awaiter = ReceptionStatusAwaiter()
self.ws = None
self.s2_parser = S2Parser()

self._handlers = MessageHandlers()
Expand All @@ -221,14 +227,13 @@ def __init__(
self._handlers.register_handler(HandshakeResponse, self.handle_handshake_response_as_rm)

def start_as_rm(self) -> None:
self._thread = threading.Thread(target=self._run_eventloop, daemon=False)
self._thread.start()
logger.debug("Started eventloop thread!")
self._run_eventloop(self._run_as_rm())

def _run_eventloop(self) -> None:
def _run_eventloop(self, main_task: Awaitable[None]) -> None:
self._thread = threading.current_thread()
logger.debug("Starting eventloop")
try:
self._eventloop.run_until_complete(self._run_as_rm())
self._eventloop.run_until_complete(main_task)
except asyncio.CancelledError:
pass
logger.debug("S2 connection thread has stopped.")
Expand Down Expand Up @@ -256,52 +261,78 @@ async def _do_stop(self) -> None:

async def _run_as_rm(self) -> None:
logger.debug("Connecting as S2 resource manager.")
self._received_messages = asyncio.Queue()

self._stop_event = asyncio.Event()
await self.connect_ws()

background_tasks = []
background_tasks.append(self._eventloop.create_task(self._receive_messages()))
background_tasks.append(self._eventloop.create_task(self._handle_received_messages()))
first_run = True

async def wait_till_stop() -> None:
await self._stop_event.wait()
while (first_run or self.reconnect) and not self._stop_event.is_set():
first_run = False
self._restart_connection_event = asyncio.Event()
await self._connect_and_run()
time.sleep(1)

background_tasks.append(self._eventloop.create_task(wait_till_stop()))
logger.debug("Finished S2 connection eventloop.")

await self.connect_as_rm()
(done, pending) = await asyncio.wait(background_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
try:
await task
except asyncio.CancelledError:
pass
except websockets.ConnectionClosedError:
logger.info("The other party closed the websocket connection.c")
async def _connect_and_run(self) -> None:
self._received_messages = asyncio.Queue()
await self._connect_ws()
if self.ws:

for task in pending:
try:
task.cancel()
await task
except asyncio.CancelledError:
pass
async def wait_till_stop() -> None:
await self._stop_event.wait()

async def wait_till_connection_restart() -> None:
await self._restart_connection_event.wait()

background_tasks = [
self._eventloop.create_task(self._receive_messages()),
self._eventloop.create_task(wait_till_stop()),
self._eventloop.create_task(self._connect_as_rm()),
self._eventloop.create_task(wait_till_connection_restart()),
]

(done, pending) = await asyncio.wait(
background_tasks, return_when=asyncio.FIRST_COMPLETED
)
if self._current_control_type:
self._current_control_type.deactivate(self)
self._current_control_type = None

for task in done:
try:
await task
except asyncio.CancelledError:
pass
except (websockets.ConnectionClosedError, websockets.ConnectionClosedOK):
logger.info("The other party closed the websocket connection.")

for task in pending:
try:
task.cancel()
await task
except asyncio.CancelledError:
pass

if self.ws:
await self.ws.close()
await self.ws.wait_closed()
logger.debug("Finished S2 connection eventloop.")

async def connect_ws(self) -> None:
self.ws = await ws_connect(uri=self.url)
async def _connect_ws(self) -> None:
try:
self.ws = await ws_connect(uri=self.url)
except (EOFError, OSError) as e:
logger.info("Could not connect due to: %s", str(e))

async def connect_as_rm(self) -> None:
async def _connect_as_rm(self) -> None:
await self.send_msg_and_await_reception_status_async(
Handshake(
message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION]
)
)
logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.")

await self._handle_received_messages()

async def handle_handshake(
self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]
) -> None:
Expand Down Expand Up @@ -427,7 +458,11 @@ async def _send_and_forget(self, s2_msg: S2Message) -> None:

json_msg = s2_msg.to_json()
logger.debug("Sending message %s", json_msg)
await self.ws.send(json_msg)
try:
await self.ws.send(json_msg)
except websockets.ConnectionClosedError as e:
logger.error("Unable to send message %s due to %s", s2_msg, str(e))
self._restart_connection_event.set()

async def respond_with_reception_status(
self, subject_message_id: str, status: ReceptionStatusValues, diagnostic_label: str
Expand Down Expand Up @@ -458,9 +493,17 @@ async def send_msg_and_await_reception_status_async(
s2_msg.message_id, # type: ignore[attr-defined]
timeout_reception_status,
)
reception_status = await self.reception_status_awaiter.wait_for_reception_status(
s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined]
)
try:
reception_status = await self.reception_status_awaiter.wait_for_reception_status(
s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined]
)
except TimeoutError:
logger.error(
"Did not receive a reception status on time for %s",
s2_msg.message_id, # type: ignore[attr-defined]
)
self._stop_event.set()
raise

if reception_status.status != ReceptionStatusValues.OK and raise_on_error:
raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}")
Expand Down