From f9c6228d87ed95d35f61e364611e0714d30a13a3 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 22 May 2025 17:12:00 +0200 Subject: [PATCH 1/2] =?UTF-8?q?We=20occasionally=20get=20incorrect=20timeo?= =?UTF-8?q?uts=20with=20AsyncSubstrateInterface,=20and=20this=20is=20large?= =?UTF-8?q?ly=20due=20to=20the=20calculation=20of=20time=20=E2=80=94=20`lo?= =?UTF-8?q?op.time()`=20is=20not=20necessarily=20`time.time()`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- async_substrate_interface/async_substrate.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 94bda13..a4c6847 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -531,13 +531,21 @@ def __init__( self._open_subscriptions = 0 self._options = options if options else {} self.last_received = time.time() + self.last_sent = time.time() async def __aenter__(self): async with self._lock: self._in_use += 1 await self.connect() + now = asyncio.get_running_loop().time() + self.last_received = now + self.last_sent = now return self + @staticmethod + async def loop_time() -> float: + return asyncio.get_running_loop().time() + async def connect(self, force=False): if self._exit_task: self._exit_task.cancel() @@ -594,7 +602,7 @@ async def _recv(self) -> None: try: # TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic response = json.loads(await self.ws.recv(decode=False)) - self.last_received = time.time() + self.last_received = await self.loop_time() async with self._lock: # note that these 'subscriptions' are all waiting sent messages which have not received # responses, and thus are not the same as RPC 'subscriptions', which are unique @@ -630,12 +638,12 @@ async def send(self, payload: dict) -> int: Returns: id: the internal ID of the request (incremented int) """ - # async with self._lock: original_id = get_next_id() # self._open_subscriptions += 1 await self.max_subscriptions.acquire() try: await self.ws.send(json.dumps({**payload, **{"id": original_id}})) + self.last_sent = await self.loop_time() return original_id except (ConnectionClosed, ssl.SSLError, EOFError): async with self._lock: @@ -2120,7 +2128,11 @@ async def _make_rpc_request( if request_manager.is_complete: break - if time.time() - self.ws.last_received >= self.retry_timeout: + if ( + (current_time := await self.ws.loop_time()) - self.ws.last_received + >= self.retry_timeout + and current_time - self.ws.last_sent >= self.retry_timeout + ): if attempt >= self.max_retries: logger.warning( f"Timed out waiting for RPC requests {attempt} times. Exiting." From 11461fffa93c72d5169377caa46ce156c37245b0 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 22 May 2025 18:44:50 +0200 Subject: [PATCH 2/2] Ensure everything uses loop time rather than time.time --- async_substrate_interface/async_substrate.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index a4c6847..f991ea0 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -9,6 +9,7 @@ import logging import ssl import time +import warnings from hashlib import blake2b from typing import ( Optional, @@ -530,16 +531,21 @@ def __init__( self._exit_task = None self._open_subscriptions = 0 self._options = options if options else {} - self.last_received = time.time() - self.last_sent = time.time() + try: + now = asyncio.get_running_loop().time() + except RuntimeError: + warnings.warn( + "You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. " + "Verify this is intended." + ) + now = asyncio.new_event_loop().time() + self.last_received = now + self.last_sent = now async def __aenter__(self): async with self._lock: self._in_use += 1 await self.connect() - now = asyncio.get_running_loop().time() - self.last_received = now - self.last_sent = now return self @staticmethod @@ -547,6 +553,9 @@ async def loop_time() -> float: return asyncio.get_running_loop().time() async def connect(self, force=False): + now = await self.loop_time() + self.last_received = now + self.last_sent = now if self._exit_task: self._exit_task.cancel() if not self._initialized or force: