From a084915bc9af4dba418faba8566b7d3a8c88fcf4 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 21 Nov 2024 22:58:50 +0200 Subject: [PATCH 1/7] Updated Subtensor to use a websocket object that we create and control. Updated AsyncSubstrate to use the updated async websocket client, bumped websockets version req to 14.1. --- bittensor/core/subtensor.py | 18 ++++++++++------- bittensor/utils/async_substrate_interface.py | 20 ++++++++----------- bittensor/utils/networking.py | 2 +- requirements/prod.txt | 2 +- .../test_subtensor_integration.py | 2 +- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index 656a513afe..ade08e2c33 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -5,7 +5,6 @@ import argparse import copy -import socket import ssl from typing import Union, Optional, TypedDict, Any @@ -18,6 +17,7 @@ from scalecodec.type_registry import load_type_registry_preset from scalecodec.types import ScaleType from substrateinterface.base import QueryMapResult, SubstrateInterface +from websockets.sync import client as ws_client from bittensor.core import settings from bittensor.core.axon import Axon @@ -132,6 +132,7 @@ def __init__( _mock: bool = False, log_verbose: bool = False, connection_timeout: int = 600, + websocket: Optional[ws_client.ClientConnection] = None, ) -> None: """ Initializes a Subtensor interface for interacting with the Bittensor blockchain. @@ -147,6 +148,7 @@ def __init__( _mock (bool): If set to ``True``, uses a mocked connection for testing purposes. Default is ``False``. log_verbose (bool): Whether to enable verbose logging. If set to ``True``, detailed log information about the connection and network operations will be provided. Default is ``True``. connection_timeout (int): The maximum time in seconds to keep the connection alive. Default is ``600``. + websocket: websockets sync (threading) client object connected to the network. This initialization sets up the connection to the specified Bittensor network, allowing for various blockchain operations such as neuron registration, stake management, and setting weights. """ @@ -183,6 +185,7 @@ def __init__( self.log_verbose = log_verbose self._connection_timeout = connection_timeout self.substrate: "SubstrateInterface" = None + self.websocket = websocket self._get_substrate() def __str__(self) -> str: @@ -205,22 +208,23 @@ def _get_substrate(self): """Establishes a connection to the Substrate node using configured parameters.""" try: # Set up params. + if not self.websocket: + self.websocket = ws_client.connect( + self.chain_endpoint, + open_timeout=self._connection_timeout, + max_size=2**32, + ) self.substrate = SubstrateInterface( ss58_format=settings.SS58_FORMAT, use_remote_preset=True, - url=self.chain_endpoint, type_registry=settings.TYPE_REGISTRY, + websocket=self.websocket, ) if self.log_verbose: logging.debug( f"Connected to {self.network} network and {self.chain_endpoint}." ) - try: - self.substrate.websocket.settimeout(self._connection_timeout) - except (AttributeError, TypeError, socket.error, OSError) as e: - logging.warning(f"Error setting timeout: {e}") - except (ConnectionRefusedError, ssl.SSLError) as error: logging.error( f"Could not connect to {self.network} network with {self.chain_endpoint} chain endpoint.", diff --git a/bittensor/utils/async_substrate_interface.py b/bittensor/utils/async_substrate_interface.py index c3af691952..77362ee052 100644 --- a/bittensor/utils/async_substrate_interface.py +++ b/bittensor/utils/async_substrate_interface.py @@ -6,11 +6,9 @@ from hashlib import blake2b from typing import Optional, Any, Union, Callable, Awaitable, cast -import websockets from async_property import async_property from bittensor_wallet import Keypair from bt_decode import PortableRegistry, decode as decode_by_type_string, MetadataV15 -from packaging import version from scalecodec import GenericExtrinsic from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.type_registry import load_type_registry_preset @@ -21,6 +19,8 @@ BlockNotFound, ) from substrateinterface.storage import StorageKey +from websockets.asyncio.client import ClientConnection, connect +from websockets.exceptions import ConnectionClosed ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]] @@ -622,7 +622,7 @@ def __init__( # TODO allow setting max concurrent connections and rpc subscriptions per connection # TODO reconnection logic self.ws_url = ws_url - self.ws: Optional[websockets.WebSocketClientProtocol] = None + self.ws: Optional[ClientConnection] = None self.id = 0 self.max_subscriptions = max_subscriptions self.max_connections = max_connections @@ -650,7 +650,7 @@ async def __aenter__(self): async def _connect(self): self.ws = await asyncio.wait_for( - websockets.connect(self.ws_url, **self._options), timeout=10 + connect(self.ws_url, **self._options), timeout=10 ) async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -693,9 +693,7 @@ async def shutdown(self): async def _recv(self) -> None: try: - response = json.loads( - await cast(websockets.WebSocketClientProtocol, self.ws).recv() - ) + response = json.loads(await cast(ClientProtocol, self.ws).recv()) async with self._lock: self._open_subscriptions -= 1 if "id" in response: @@ -704,7 +702,7 @@ async def _recv(self) -> None: self._received[response["params"]["subscription"]] = response else: raise KeyError(response) - except websockets.ConnectionClosed: + except ConnectionClosed: raise except KeyError as e: raise e @@ -715,7 +713,7 @@ async def _start_receiving(self): await self._recv() except asyncio.CancelledError: pass - except websockets.ConnectionClosed: + except ConnectionClosed: # TODO try reconnect, but only if it's needed raise @@ -732,7 +730,7 @@ async def send(self, payload: dict) -> int: try: await self.ws.send(json.dumps({**payload, **{"id": original_id}})) return original_id - except websockets.ConnectionClosed: + except ConnectionClosed: raise async def retrieve(self, item_id: int) -> Optional[dict]: @@ -773,8 +771,6 @@ def __init__( "max_size": 2**32, "write_limit": 2**16, } - if version.parse(websockets.__version__) < version.parse("14.0"): - options.update({"read_limit": 2**16}) self.ws = Websocket(chain_endpoint, options=options) self._lock = asyncio.Lock() self.last_block_hash: Optional[str] = None diff --git a/bittensor/utils/networking.py b/bittensor/utils/networking.py index 7524b353f5..e0ce8c2bce 100644 --- a/bittensor/utils/networking.py +++ b/bittensor/utils/networking.py @@ -165,7 +165,7 @@ def ensure_connected(func): def is_connected(substrate) -> bool: """Check if the substrate connection is active.""" - sock = substrate.websocket.sock + sock = substrate.websocket.socket return ( sock is not None and sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0 diff --git a/requirements/prod.txt b/requirements/prod.txt index d084b5e37a..b50ae9c2af 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -23,5 +23,5 @@ python-Levenshtein scalecodec==1.2.11 substrate-interface~=1.7.9 uvicorn -websockets>12.0 +websockets>=14.1 bittensor-wallet>=2.1.0 diff --git a/tests/integration_tests/test_subtensor_integration.py b/tests/integration_tests/test_subtensor_integration.py index bacb340f2c..ebe07fdf56 100644 --- a/tests/integration_tests/test_subtensor_integration.py +++ b/tests/integration_tests/test_subtensor_integration.py @@ -75,7 +75,7 @@ def test_network_overrides(self): config1.subtensor.chain_endpoint = None # Mock network calls - with patch("substrateinterface.SubstrateInterface.connect_websocket"): + with patch("websockets.sync.client.connect"): with patch("substrateinterface.SubstrateInterface.reload_type_registry"): print(bittensor.Subtensor, type(bittensor.Subtensor)) # Choose network arg over config From c5f3edb7a0db0dea3d4dead48658ef1c79b3ed1e Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 21 Nov 2024 23:10:45 +0200 Subject: [PATCH 2/7] Typo --- bittensor/utils/async_substrate_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bittensor/utils/async_substrate_interface.py b/bittensor/utils/async_substrate_interface.py index 77362ee052..1a92bd8508 100644 --- a/bittensor/utils/async_substrate_interface.py +++ b/bittensor/utils/async_substrate_interface.py @@ -693,7 +693,7 @@ async def shutdown(self): async def _recv(self) -> None: try: - response = json.loads(await cast(ClientProtocol, self.ws).recv()) + response = json.loads(await cast(ClientConnection, self.ws).recv()) async with self._lock: self._open_subscriptions -= 1 if "id" in response: From 557acf87f108b803df9f04ed2b5c6a9162f47359 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 21 Nov 2024 23:49:56 +0200 Subject: [PATCH 3/7] fix test --- tests/unit_tests/test_subtensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_tests/test_subtensor.py b/tests/unit_tests/test_subtensor.py index fa7e190dc5..c7636a65f2 100644 --- a/tests/unit_tests/test_subtensor.py +++ b/tests/unit_tests/test_subtensor.py @@ -1904,7 +1904,7 @@ def test_connect_with_substrate(mocker): """Ensure re-connection is non called when using an alive substrate.""" # Prep fake_substrate = mocker.MagicMock() - fake_substrate.websocket.sock.getsockopt.return_value = 0 + fake_substrate.websocket.socket.getsockopt.return_value = 0 mocker.patch.object( subtensor_module, "SubstrateInterface", return_value=fake_substrate ) From 911412555fb250254831bf68e4555c5f54547c84 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 22 Nov 2024 00:39:50 +0200 Subject: [PATCH 4/7] fix test --- tests/unit_tests/test_subtensor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit_tests/test_subtensor.py b/tests/unit_tests/test_subtensor.py index c7636a65f2..dd31173083 100644 --- a/tests/unit_tests/test_subtensor.py +++ b/tests/unit_tests/test_subtensor.py @@ -2145,6 +2145,7 @@ def test_networks_during_connection(mocker): """Test networks during_connection.""" # Preps subtensor_module.SubstrateInterface = mocker.Mock() + mocker.patch("websockets.sync.client.connect") # Call for network in list(settings.NETWORK_MAP.keys()) + ["undefined"]: sub = Subtensor(network) From ca5d4fd70bbf1bde359619de63c4a37678b05c63 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 22 Nov 2024 00:45:57 +0200 Subject: [PATCH 5/7] Moved ClientConnection to TYPE_CHECKING import --- bittensor/utils/async_substrate_interface.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bittensor/utils/async_substrate_interface.py b/bittensor/utils/async_substrate_interface.py index 1a92bd8508..b95fae3f11 100644 --- a/bittensor/utils/async_substrate_interface.py +++ b/bittensor/utils/async_substrate_interface.py @@ -4,7 +4,7 @@ from collections import defaultdict from dataclasses import dataclass from hashlib import blake2b -from typing import Optional, Any, Union, Callable, Awaitable, cast +from typing import Optional, Any, Union, Callable, Awaitable, cast, TYPE_CHECKING from async_property import async_property from bittensor_wallet import Keypair @@ -19,9 +19,12 @@ BlockNotFound, ) from substrateinterface.storage import StorageKey -from websockets.asyncio.client import ClientConnection, connect +from websockets.asyncio.client import connect from websockets.exceptions import ConnectionClosed +if TYPE_CHECKING: + from websockets.asyncio.client import ClientConnection + ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]] From d3ce88b92623b1b4ce334946b7d74b0b19d72614 Mon Sep 17 00:00:00 2001 From: Benjamin Himes <37844818+thewhaleking@users.noreply.github.com> Date: Fri, 22 Nov 2024 00:46:13 +0200 Subject: [PATCH 6/7] Update bittensor/utils/async_substrate_interface.py Co-authored-by: Roman <167799377+roman-opentensor@users.noreply.github.com> --- bittensor/utils/async_substrate_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bittensor/utils/async_substrate_interface.py b/bittensor/utils/async_substrate_interface.py index b95fae3f11..982e8dfc96 100644 --- a/bittensor/utils/async_substrate_interface.py +++ b/bittensor/utils/async_substrate_interface.py @@ -625,7 +625,7 @@ def __init__( # TODO allow setting max concurrent connections and rpc subscriptions per connection # TODO reconnection logic self.ws_url = ws_url - self.ws: Optional[ClientConnection] = None + self.ws: Optional["ClientConnection"] = None self.id = 0 self.max_subscriptions = max_subscriptions self.max_connections = max_connections From de51ca39fddf591a10122c5144e79c4afe8e6983 Mon Sep 17 00:00:00 2001 From: Benjamin Himes <37844818+thewhaleking@users.noreply.github.com> Date: Fri, 22 Nov 2024 00:46:28 +0200 Subject: [PATCH 7/7] Update bittensor/core/subtensor.py Co-authored-by: Roman <167799377+roman-opentensor@users.noreply.github.com> --- bittensor/core/subtensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index ade08e2c33..8e4be9dd29 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -148,7 +148,7 @@ def __init__( _mock (bool): If set to ``True``, uses a mocked connection for testing purposes. Default is ``False``. log_verbose (bool): Whether to enable verbose logging. If set to ``True``, detailed log information about the connection and network operations will be provided. Default is ``True``. connection_timeout (int): The maximum time in seconds to keep the connection alive. Default is ``600``. - websocket: websockets sync (threading) client object connected to the network. + websocket (websockets.sync.client.ClientConnection): websockets sync (threading) client object connected to the network. This initialization sets up the connection to the specified Bittensor network, allowing for various blockchain operations such as neuron registration, stake management, and setting weights. """