From c24c3d0eed473cdc782eb14ab11f9ed10ad14f23 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 6 Feb 2025 16:02:30 +0200 Subject: [PATCH 01/14] Reuses the websocket for sync Substrate --- async_substrate_interface/sync_substrate.py | 128 +++++++++++--------- 1 file changed, 68 insertions(+), 60 deletions(-) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 0cb87d1..7f9ea85 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -15,6 +15,7 @@ ) from scalecodec.base import RuntimeConfigurationObject, ScaleBytes, ScaleType from websockets.sync.client import connect +from websockets.exceptions import ConnectionClosed from async_substrate_interface.errors import ( ExtrinsicNotFound, @@ -507,6 +508,7 @@ def __init__( self._metadata_cache = {} self.metadata_version_hex = "0x0f000000" # v15 self.reload_type_registry() + self.ws = self.connect(init=True) if not _mock: self.initialize() @@ -527,7 +529,7 @@ def initialize(self): self.initialized = True def __exit__(self, exc_type, exc_val, exc_tb): - pass + self.ws.close() @property def properties(self): @@ -562,6 +564,15 @@ def name(self): self._name = self.rpc_request("system_name", []).get("result") return self._name + def connect(self, init=False): + if init is True: + return connect(self.chain_endpoint, max_size=2**32) + else: + if not self.ws.close_code: + return self.ws + else: + return connect(self.chain_endpoint, max_size=2**32) + def get_storage_item(self, module: str, storage_function: str): if not self._metadata: self.init_runtime() @@ -1620,69 +1631,67 @@ def _make_rpc_request( _received = {} subscription_added = False - with connect(self.chain_endpoint, max_size=2**32) as ws: - item_id = 0 - for payload in payloads: - item_id += 1 - ws.send(json.dumps({**payload["payload"], **{"id": item_id}})) - request_manager.add_request(item_id, payload["id"]) - - while True: - try: - response = json.loads( - ws.recv(timeout=self.retry_timeout, decode=False) + ws = self.connect(init=False if attempt == 1 else True) + item_id = 0 + for payload in payloads: + item_id += 1 + ws.send(json.dumps({**payload["payload"], **{"id": item_id}})) + request_manager.add_request(item_id, payload["id"]) + + while True: + try: + response = json.loads(ws.recv(timeout=self.retry_timeout, decode=False)) + except (TimeoutError, ConnectionClosed): + if attempt >= self.max_retries: + logging.warning( + f"Timed out waiting for RPC requests {attempt} times. Exiting." ) - except TimeoutError: - if attempt >= self.max_retries: - logging.warning( - f"Timed out waiting for RPC requests {attempt} times. Exiting." - ) - raise SubstrateRequestException("Max retries reached.") - else: - return self._make_rpc_request( - payloads, + raise SubstrateRequestException("Max retries reached.") + else: + return self._make_rpc_request( + payloads, + value_scale_type, + storage_item, + result_handler, + attempt + 1, + ) + if "id" in response: + _received[response["id"]] = response + elif "params" in response: + _received[response["params"]["subscription"]] = response + else: + raise SubstrateRequestException(response) + for item_id in list(request_manager.response_map.keys()): + if item_id not in request_manager.responses or isinstance( + result_handler, Callable + ): + if response := _received.pop(item_id): + if ( + isinstance(result_handler, Callable) + and not subscription_added + ): + # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} + # with {subscription_id : payload_id} + try: + item_id = request_manager.overwrite_request( + item_id, response["result"] + ) + subscription_added = True + except KeyError: + raise SubstrateRequestException(str(response)) + decoded_response, complete = self._process_response( + response, + item_id, value_scale_type, storage_item, result_handler, - attempt + 1, ) - if "id" in response: - _received[response["id"]] = response - elif "params" in response: - _received[response["params"]["subscription"]] = response - else: - raise SubstrateRequestException(response) - for item_id in list(request_manager.response_map.keys()): - if item_id not in request_manager.responses or isinstance( - result_handler, Callable - ): - if response := _received.pop(item_id): - if ( - isinstance(result_handler, Callable) - and not subscription_added - ): - # handles subscriptions, overwrites the previous mapping of {item_id : payload_id} - # with {subscription_id : payload_id} - try: - item_id = request_manager.overwrite_request( - item_id, response["result"] - ) - subscription_added = True - except KeyError: - raise SubstrateRequestException(str(response)) - decoded_response, complete = self._process_response( - response, - item_id, - value_scale_type, - storage_item, - result_handler, - ) - request_manager.add_response( - item_id, decoded_response, complete - ) + request_manager.add_response( + item_id, decoded_response, complete + ) - if request_manager.is_complete: - break + if request_manager.is_complete: + break return request_manager.get_results() @@ -2874,9 +2883,8 @@ def close(self): """ Closes the substrate connection, and the websocket connection. """ - # TODO change this logic try: - self.ws.shutdown() + self.ws.close() except AttributeError: pass From 23d08a3922189b5481f12ae600c6834383711eab Mon Sep 17 00:00:00 2001 From: Cameron Fairchild Date: Thu, 6 Feb 2025 11:14:37 -0500 Subject: [PATCH 02/14] bump btdecode --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f2f4c9e..b8f4963 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "wheel", "asyncstdlib~=3.13.0", "bittensor-wallet>=2.1.3", - "bt-decode==v0.5.0-a0", + "bt-decode==v0.5.0-a1", "scalecodec~=1.2.11", "websockets>=14.1", "xxhash" From e60964403a77778fc1cb6ba6df062f6e54e6486a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 6 Feb 2025 19:52:19 +0200 Subject: [PATCH 03/14] Moved max_size to mixin for reuse --- async_substrate_interface/async_substrate.py | 2 +- async_substrate_interface/sync_substrate.py | 4 ++-- async_substrate_interface/types.py | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index efeb24b..7950c64 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -685,7 +685,7 @@ def __init__( self.ws = Websocket( url, options={ - "max_size": 2**32, + "max_size": self.ws_max_size, "write_limit": 2**16, }, ) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 7f9ea85..105ddde 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -566,12 +566,12 @@ def name(self): def connect(self, init=False): if init is True: - return connect(self.chain_endpoint, max_size=2**32) + return connect(self.chain_endpoint, max_size=self.ws_max_size) else: if not self.ws.close_code: return self.ws else: - return connect(self.chain_endpoint, max_size=2**32) + return connect(self.chain_endpoint, max_size=self.ws_max_size) def get_storage_item(self, module: str, storage_function: str): if not self._metadata: diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index b4c38ed..daaaafc 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -344,6 +344,7 @@ class SubstrateMixin(ABC): runtime_config: RuntimeConfigurationObject type_registry: Optional[dict] ss58_format: Optional[int] + ws_max_size = 2**32 @property def chain(self): From c473f85f07f6e84fc6be9a803a68b82b06b3f346 Mon Sep 17 00:00:00 2001 From: Cameron Fairchild Date: Thu, 6 Feb 2025 12:57:30 -0500 Subject: [PATCH 04/14] fallback to old method of decoding --- async_substrate_interface/sync_substrate.py | 149 +++++++++++++++++- async_substrate_interface/type_registry.py | 163 ++++++++++++++++++++ 2 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 async_substrate_interface/type_registry.py diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 0cb87d1..2423032 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -6,7 +6,13 @@ from bittensor_wallet.keypair import Keypair from bittensor_wallet.utils import SS58_FORMAT -from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string +from bt_decode import ( + MetadataV15, + PortableRegistry, + decode as decode_by_type_string, + AxonInfo as OldAxonInfo, + PrometheusInfo as OldPrometheusInfo, +) from scalecodec import ( GenericCall, GenericExtrinsic, @@ -31,6 +37,7 @@ ) from async_substrate_interface.utils import hex_to_bytes, json from async_substrate_interface.utils.storage import StorageKey +from async_substrate_interface.type_registry import _TYPE_REGISTRY ResultHandler = Callable[[dict, Any], tuple[dict, bool]] @@ -505,6 +512,8 @@ def __init__( ss58_format=self.ss58_format, implements_scale_info=True ) self._metadata_cache = {} + self._metadata_v15_cache = {} + self._old_metadata_v15 = None self.metadata_version_hex = "0x0f000000" # v15 self.reload_type_registry() if not _mock: @@ -593,6 +602,20 @@ def load_registry(self): ) self.registry = PortableRegistry.from_metadata_v15(self.metadata_v15) + def _load_registry_at_block(self, block_hash: str) -> MetadataV15: + # Should be called for any block that fails decoding. + # Possibly the metadata was different. + metadata_rpc_result = self.rpc_request( + "state_call", + ["Metadata_metadata_at_version", self.metadata_version_hex], + block_hash=block_hash, + ) + metadata_option_hex_str = metadata_rpc_result["result"] + metadata_option_bytes = bytes.fromhex(metadata_option_hex_str[2:]) + old_metadata = MetadataV15.decode_from_metadata_option(metadata_option_bytes) + + return old_metadata + def decode_scale( self, type_string: str, @@ -634,6 +657,7 @@ def _first_initialize_runtime(self): metadata = self.get_block_metadata() self._metadata = metadata self._metadata_cache[self.runtime_version] = self._metadata + self._metadata_v15_cache[self.runtime_version] = self.metadata_v15 self.runtime_version = runtime_info.get("specVersion") self.runtime_config.set_active_spec_version_id(self.runtime_version) self.transaction_version = runtime_info.get("transactionVersion") @@ -755,6 +779,30 @@ def get_runtime(block_hash, block_id) -> Runtime: self._metadata_cache[self.runtime_version] = self._metadata else: metadata = self._metadata + + if self.runtime_version in self._metadata_v15_cache: + # Get metadata v15 from cache + logging.debug( + "Retrieved metadata v15 for {} from memory".format( + self.runtime_version + ) + ) + metadata_v15 = self._old_metadata_v15 = self._metadata_v15_cache[ + self.runtime_version + ] + else: + metadata_v15 = self._old_metadata_v15 = self._load_registry_at_block( + block_hash=runtime_block_hash + ) + logging.debug( + "Retrieved metadata v15 for {} from Substrate node".format( + self.runtime_version + ) + ) + + # Update metadata v15 cache + self._metadata_v15_cache[self.runtime_version] = metadata_v15 + # Update type registry self.reload_type_registry(use_remote_preset=False, auto_discover=True) @@ -2202,6 +2250,61 @@ def get_chain_finalised_head(self): return response.get("result") + def _do_runtime_call_old( + self, + api: str, + method: str, + params: Optional[Union[list, dict]] = None, + block_hash: Optional[str] = None, + ) -> ScaleType: + runtime_call_def = _TYPE_REGISTRY["runtime_api"][api]["methods"][method] + + # Encode params + param_data = b"" + + if "encoder" in runtime_call_def: + param_data = runtime_call_def["encoder"](params) + else: + for idx, param in enumerate(runtime_call_def["params"]): + param_type_string = f"{param['type']}" + if isinstance(params, list): + param_data += self.encode_scale(param_type_string, params[idx]) + else: + if param["name"] not in params: + raise ValueError( + f"Runtime Call param '{param['name']}' is missing" + ) + + param_data += self.encode_scale( + param_type_string, params[param["name"]] + ) + + # RPC request + result_data = self.rpc_request( + "state_call", [f"{api}_{method}", param_data.hex(), block_hash] + ) + result_vec_u8_bytes = hex_to_bytes(result_data["result"]) + result_bytes = self.decode_scale("Vec", result_vec_u8_bytes) + + def _as_dict(obj): + as_dict = {} + for key in dir(obj): + if not key.startswith("_"): + val = getattr(obj, key) + if isinstance(val, (OldAxonInfo, OldPrometheusInfo)): + as_dict[key] = _as_dict(val) + else: + as_dict[key] = val + return as_dict + + # Decode result + # Get correct type + result_decoded = runtime_call_def["decoder"](bytes(result_bytes)) + as_dict = _as_dict(result_decoded) + result_obj = ScaleObj(as_dict) + + return result_obj + def runtime_call( self, api: str, @@ -2228,14 +2331,54 @@ def runtime_call( params = {} try: - metadata_v15 = self.metadata_v15.value() - apis = {entry["name"]: entry for entry in metadata_v15["apis"]} + if block_hash: + # Use old metadata v15 from init_runtime call + metadata_v15 = self._old_metadata_v15 + else: + metadata_v15 = self.metadata_v15 + + self.registry = PortableRegistry.from_metadata_v15(metadata_v15) + metadata_v15_value = metadata_v15.value() + + apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]} api_entry = apis[api] methods = {entry["name"]: entry for entry in api_entry["methods"]} runtime_call_def = methods[method] except KeyError: raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") + # Check if the output type is a Vec + # If so, call the API using the old method + output_type_def = [ + x + for x in metadata_v15_value["types"]["types"] + if x["id"] == runtime_call_def["output"] + ] + if output_type_def: + output_type_def = output_type_def[0] + + if "sequence" in output_type_def["type"]["def"]: + output_type_seq_def_id = output_type_def["type"]["def"]["sequence"][ + "type" + ] + output_type_seq_def = [ + x + for x in metadata_v15_value["types"]["types"] + if x["id"] == output_type_seq_def_id + ] + if output_type_seq_def: + output_type_seq_def = output_type_seq_def[0] + if ( + "primitive" in output_type_seq_def["type"]["def"] + and output_type_seq_def["type"]["def"]["primitive"] == "u8" + ): + # This is Vec + result = self._do_runtime_call_old( + api, method, params, block_hash + ) + + return result + if isinstance(params, list) and len(params) != len(runtime_call_def["inputs"]): raise ValueError( f"Number of parameter provided ({len(params)}) does not " diff --git a/async_substrate_interface/type_registry.py b/async_substrate_interface/type_registry.py new file mode 100644 index 0000000..0f224e8 --- /dev/null +++ b/async_substrate_interface/type_registry.py @@ -0,0 +1,163 @@ +from bt_decode import ( + NeuronInfo, + NeuronInfoLite, + DelegateInfo, + StakeInfo, + SubnetHyperparameters, + SubnetInfo, + SubnetInfoV2, + encode, +) +from scalecodec import ss58_encode + +_TYPE_REGISTRY: dict[str, dict] = { + "types": { + "Balance": "u64", # Need to override default u128 + }, + "runtime_api": { + "DelegateInfoRuntimeApi": { + "methods": { + "get_delegated": { + "params": [ + { + "name": "coldkey", + "type": "Vec", + }, + ], + "encoder": lambda addr: encode(ss58_encode(addr), "Vec"), + "type": "Vec", + "decoder": DelegateInfo.decode_delegated, + }, + "get_delegates": { + "params": [], + "type": "Vec", + "decoder": DelegateInfo.decode_vec, + }, + } + }, + "NeuronInfoRuntimeApi": { + "methods": { + "get_neuron_lite": { + "params": [ + { + "name": "netuid", + "type": "u16", + }, + { + "name": "uid", + "type": "u16", + }, + ], + "type": "Vec", + "decoder": NeuronInfoLite.decode, + }, + "get_neurons_lite": { + "params": [ + { + "name": "netuid", + "type": "u16", + }, + ], + "type": "Vec", + "decoder": NeuronInfoLite.decode_vec, + }, + "get_neuron": { + "params": [ + { + "name": "netuid", + "type": "u16", + }, + { + "name": "uid", + "type": "u16", + }, + ], + "type": "Vec", + "decoder": NeuronInfo.decode, + }, + "get_neurons": { + "params": [ + { + "name": "netuid", + "type": "u16", + }, + ], + "type": "Vec", + "decoder": NeuronInfo.decode_vec, + }, + } + }, + "StakeInfoRuntimeApi": { + "methods": { + "get_stake_info_for_coldkey": { + "params": [ + { + "name": "coldkey_account_vec", + "type": "Vec", + }, + ], + "type": "Vec", + "encoder": lambda addr: encode(ss58_encode(addr), "Vec"), + "decoder": StakeInfo.decode_vec, + }, + "get_stake_info_for_coldkeys": { + "params": [ + { + "name": "coldkey_account_vecs", + "type": "Vec>", + }, + ], + "type": "Vec", + "encoder": lambda addrs: encode( + [ss58_encode(addr) for addr in addrs], "Vec>" + ), + "decoder": StakeInfo.decode_vec_tuple_vec, + }, + }, + }, + "SubnetInfoRuntimeApi": { + "methods": { + "get_subnet_hyperparams": { + "params": [ + { + "name": "netuid", + "type": "u16", + }, + ], + "type": "Vec", + "decoder": SubnetHyperparameters.decode_option, + }, + "get_subnet_info": { + "params": [ + { + "name": "netuid", + "type": "u16", + }, + ], + "type": "Vec", + "decoder": SubnetInfo.decode_option, + }, + "get_subnet_info_v2": { + "params": [ + { + "name": "netuid", + "type": "u16", + }, + ], + "type": "Vec", + "decoder": SubnetInfoV2.decode_option, + }, + "get_subnets_info": { + "params": [], + "type": "Vec", + "decoder": SubnetInfo.decode_vec_option, + }, + "get_subnets_info_v2": { + "params": [], + "type": "Vec", + "decoder": SubnetInfo.decode_vec_option, + }, + } + }, + }, +} From a206f5986fef74e03fc244ac79b182e61e0acdc7 Mon Sep 17 00:00:00 2001 From: Cameron Fairchild Date: Thu, 6 Feb 2025 12:59:40 -0500 Subject: [PATCH 05/14] add debug log --- async_substrate_interface/sync_substrate.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 2423032..f3c376e 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -2257,6 +2257,9 @@ def _do_runtime_call_old( params: Optional[Union[list, dict]] = None, block_hash: Optional[str] = None, ) -> ScaleType: + logging.debug( + f"Decoding old runtime call: {api}.{method} with params: {params} at block hash: {block_hash}" + ) runtime_call_def = _TYPE_REGISTRY["runtime_api"][api]["methods"][method] # Encode params @@ -2301,6 +2304,7 @@ def _as_dict(obj): # Get correct type result_decoded = runtime_call_def["decoder"](bytes(result_bytes)) as_dict = _as_dict(result_decoded) + logging.debug("Decoded old runtime call result: ", as_dict) result_obj = ScaleObj(as_dict) return result_obj From 606ee2d1ecb67646a3665811ec3d2cdb0d9378f1 Mon Sep 17 00:00:00 2001 From: Cameron Fairchild Date: Thu, 6 Feb 2025 13:02:57 -0500 Subject: [PATCH 06/14] extract as helper --- async_substrate_interface/sync_substrate.py | 34 +++------------------ async_substrate_interface/utils/decoding.py | 26 ++++++++++++++++ 2 files changed, 30 insertions(+), 30 deletions(-) create mode 100644 async_substrate_interface/utils/decoding.py diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index f3c376e..eee6a0d 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -36,6 +36,7 @@ ScaleObj, ) from async_substrate_interface.utils import hex_to_bytes, json +from async_substrate_interface.utils.decoding import _determine_if_old_runtime_call from async_substrate_interface.utils.storage import StorageKey from async_substrate_interface.type_registry import _TYPE_REGISTRY @@ -2351,37 +2352,10 @@ def runtime_call( except KeyError: raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") - # Check if the output type is a Vec - # If so, call the API using the old method - output_type_def = [ - x - for x in metadata_v15_value["types"]["types"] - if x["id"] == runtime_call_def["output"] - ] - if output_type_def: - output_type_def = output_type_def[0] - - if "sequence" in output_type_def["type"]["def"]: - output_type_seq_def_id = output_type_def["type"]["def"]["sequence"][ - "type" - ] - output_type_seq_def = [ - x - for x in metadata_v15_value["types"]["types"] - if x["id"] == output_type_seq_def_id - ] - if output_type_seq_def: - output_type_seq_def = output_type_seq_def[0] - if ( - "primitive" in output_type_seq_def["type"]["def"] - and output_type_seq_def["type"]["def"]["primitive"] == "u8" - ): - # This is Vec - result = self._do_runtime_call_old( - api, method, params, block_hash - ) + if _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value): + result = self._do_runtime_call_old(api, method, params, block_hash) - return result + return result if isinstance(params, list) and len(params) != len(runtime_call_def["inputs"]): raise ValueError( diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py new file mode 100644 index 0000000..ce8a6f2 --- /dev/null +++ b/async_substrate_interface/utils/decoding.py @@ -0,0 +1,26 @@ +def _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value) -> bool: + # Check if the output type is a Vec + # If so, call the API using the old method + output_type_def = [ + x + for x in metadata_v15_value["types"]["types"] + if x["id"] == runtime_call_def["output"] + ] + if output_type_def: + output_type_def = output_type_def[0] + + if "sequence" in output_type_def["type"]["def"]: + output_type_seq_def_id = output_type_def["type"]["def"]["sequence"]["type"] + output_type_seq_def = [ + x + for x in metadata_v15_value["types"]["types"] + if x["id"] == output_type_seq_def_id + ] + if output_type_seq_def: + output_type_seq_def = output_type_seq_def[0] + if ( + "primitive" in output_type_seq_def["type"]["def"] + and output_type_seq_def["type"]["def"]["primitive"] == "u8" + ): + return True + return False From 470e8ee169f838e59f64f0736da3724b65120ae2 Mon Sep 17 00:00:00 2001 From: Cameron Fairchild Date: Thu, 6 Feb 2025 13:15:45 -0500 Subject: [PATCH 07/14] add async ver --- async_substrate_interface/async_substrate.py | 129 ++++++++++++++++++- 1 file changed, 126 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index efeb24b..875bd45 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -24,7 +24,13 @@ import asyncstdlib as a from bittensor_wallet.keypair import Keypair from bittensor_wallet.utils import SS58_FORMAT -from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string +from bt_decode import ( + MetadataV15, + PortableRegistry, + decode as decode_by_type_string, + AxonInfo as OldAxonInfo, + PrometheusInfo as OldPrometheusInfo, +) from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.types import ( GenericCall, @@ -49,7 +55,9 @@ Preprocessed, ) from async_substrate_interface.utils import hex_to_bytes, json +from async_substrate_interface.utils.decoding import _determine_if_old_runtime_call from async_substrate_interface.utils.storage import StorageKey +from async_substrate_interface.type_registry import _TYPE_REGISTRY if TYPE_CHECKING: from websockets.asyncio.client import ClientConnection @@ -706,6 +714,8 @@ def __init__( ss58_format=self.ss58_format, implements_scale_info=True ) self._metadata_cache = {} + self._metadata_v15_cache = {} + self._old_metadata_v15 = None self._nonces = {} self.metadata_version_hex = "0x0f000000" # v15 self.reload_type_registry() @@ -800,6 +810,20 @@ async def load_registry(self): ) self.registry = PortableRegistry.from_metadata_v15(self.metadata_v15) + async def _load_registry_at_block(self, block_hash: str) -> MetadataV15: + # Should be called for any block that fails decoding. + # Possibly the metadata was different. + metadata_rpc_result = await self.rpc_request( + "state_call", + ["Metadata_metadata_at_version", self.metadata_version_hex], + block_hash=block_hash, + ) + metadata_option_hex_str = metadata_rpc_result["result"] + metadata_option_bytes = bytes.fromhex(metadata_option_hex_str[2:]) + old_metadata = MetadataV15.decode_from_metadata_option(metadata_option_bytes) + + return old_metadata + async def _wait_for_registry(self, _attempt: int = 1, _retries: int = 3) -> None: async def _waiter(): while self.registry is None: @@ -890,6 +914,7 @@ async def _first_initialize_runtime(self): ) self._metadata = metadata self._metadata_cache[self.runtime_version] = self._metadata + self._metadata_v15_cache[self.runtime_version] = self.metadata_v15 self.runtime_version = runtime_info.get("specVersion") self.runtime_config.set_active_spec_version_id(self.runtime_version) self.transaction_version = runtime_info.get("transactionVersion") @@ -1015,6 +1040,30 @@ async def get_runtime(block_hash, block_id) -> Runtime: self._metadata_cache[self.runtime_version] = self._metadata else: metadata = self._metadata + + if self.runtime_version in self._metadata_v15_cache: + # Get metadata v15 from cache + logging.debug( + "Retrieved metadata v15 for {} from memory".format( + self.runtime_version + ) + ) + metadata_v15 = self._old_metadata_v15 = self._metadata_v15_cache[ + self.runtime_version + ] + else: + metadata_v15 = ( + self._old_metadata_v15 + ) = await self._load_registry_at_block(block_hash=runtime_block_hash) + logging.debug( + "Retrieved metadata v15 for {} from Substrate node".format( + self.runtime_version + ) + ) + + # Update metadata v15 cache + self._metadata_v15_cache[self.runtime_version] = metadata_v15 + # Update type registry self.reload_type_registry(use_remote_preset=False, auto_discover=True) @@ -2487,6 +2536,67 @@ async def get_chain_finalised_head(self): return response.get("result") + async def _do_runtime_call_old( + self, + api: str, + method: str, + params: Optional[Union[list, dict]] = None, + block_hash: Optional[str] = None, + ) -> ScaleType: + logging.debug( + f"Decoding old runtime call: {api}.{method} with params: {params} at block hash: {block_hash}" + ) + runtime_call_def = _TYPE_REGISTRY["runtime_api"][api]["methods"][method] + + # Encode params + param_data = b"" + + if "encoder" in runtime_call_def: + param_data = runtime_call_def["encoder"](params) + else: + for idx, param in enumerate(runtime_call_def["params"]): + param_type_string = f"{param['type']}" + if isinstance(params, list): + param_data += await self.encode_scale( + param_type_string, params[idx] + ) + else: + if param["name"] not in params: + raise ValueError( + f"Runtime Call param '{param['name']}' is missing" + ) + + param_data += await self.encode_scale( + param_type_string, params[param["name"]] + ) + + # RPC request + result_data = await self.rpc_request( + "state_call", [f"{api}_{method}", param_data.hex(), block_hash] + ) + result_vec_u8_bytes = hex_to_bytes(result_data["result"]) + result_bytes = await self.decode_scale("Vec", result_vec_u8_bytes) + + def _as_dict(obj): + as_dict = {} + for key in dir(obj): + if not key.startswith("_"): + val = getattr(obj, key) + if isinstance(val, (OldAxonInfo, OldPrometheusInfo)): + as_dict[key] = _as_dict(val) + else: + as_dict[key] = val + return as_dict + + # Decode result + # Get correct type + result_decoded = runtime_call_def["decoder"](bytes(result_bytes)) + as_dict = _as_dict(result_decoded) + logging.debug("Decoded old runtime call result: ", as_dict) + result_obj = ScaleObj(as_dict) + + return result_obj + async def runtime_call( self, api: str, @@ -2513,14 +2623,27 @@ async def runtime_call( params = {} try: - metadata_v15 = self.metadata_v15.value() - apis = {entry["name"]: entry for entry in metadata_v15["apis"]} + if block_hash: + # Use old metadata v15 from init_runtime call + metadata_v15 = self._old_metadata_v15 + else: + metadata_v15 = self.metadata_v15 + + self.registry = PortableRegistry.from_metadata_v15(metadata_v15) + metadata_v15_value = metadata_v15.value() + + apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]} api_entry = apis[api] methods = {entry["name"]: entry for entry in api_entry["methods"]} runtime_call_def = methods[method] except KeyError: raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") + if _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value): + result = await self._do_runtime_call_old(api, method, params, block_hash) + + return result + if isinstance(params, list) and len(params) != len(runtime_call_def["inputs"]): raise ValueError( f"Number of parameter provided ({len(params)}) does not " From 3d654c50f2536fcb710c23a4f5243bcd2a49d25e Mon Sep 17 00:00:00 2001 From: Cameron Fairchild Date: Thu, 6 Feb 2025 13:23:06 -0500 Subject: [PATCH 08/14] dont store in init first runtime --- async_substrate_interface/async_substrate.py | 1 - async_substrate_interface/sync_substrate.py | 1 - 2 files changed, 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 875bd45..13a461f 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -914,7 +914,6 @@ async def _first_initialize_runtime(self): ) self._metadata = metadata self._metadata_cache[self.runtime_version] = self._metadata - self._metadata_v15_cache[self.runtime_version] = self.metadata_v15 self.runtime_version = runtime_info.get("specVersion") self.runtime_config.set_active_spec_version_id(self.runtime_version) self.transaction_version = runtime_info.get("transactionVersion") diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index eee6a0d..f5c30db 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -658,7 +658,6 @@ def _first_initialize_runtime(self): metadata = self.get_block_metadata() self._metadata = metadata self._metadata_cache[self.runtime_version] = self._metadata - self._metadata_v15_cache[self.runtime_version] = self.metadata_v15 self.runtime_version = runtime_info.get("specVersion") self.runtime_config.set_active_spec_version_id(self.runtime_version) self.transaction_version = runtime_info.get("transactionVersion") From 89bce8cfcce3558d81cbee762e13f2f0eff6a404 Mon Sep 17 00:00:00 2001 From: Cameron Fairchild Date: Thu, 6 Feb 2025 13:48:02 -0500 Subject: [PATCH 09/14] change to helper --- async_substrate_interface/async_substrate.py | 18 +++++------------- async_substrate_interface/sync_substrate.py | 18 +++++------------- async_substrate_interface/utils/decoding.py | 18 ++++++++++++++++++ 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 13a461f..b65c6e0 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -55,7 +55,10 @@ Preprocessed, ) from async_substrate_interface.utils import hex_to_bytes, json -from async_substrate_interface.utils.decoding import _determine_if_old_runtime_call +from async_substrate_interface.utils.decoding import ( + _determine_if_old_runtime_call, + _bt_decode_to_dict_or_list, +) from async_substrate_interface.utils.storage import StorageKey from async_substrate_interface.type_registry import _TYPE_REGISTRY @@ -2576,21 +2579,10 @@ async def _do_runtime_call_old( result_vec_u8_bytes = hex_to_bytes(result_data["result"]) result_bytes = await self.decode_scale("Vec", result_vec_u8_bytes) - def _as_dict(obj): - as_dict = {} - for key in dir(obj): - if not key.startswith("_"): - val = getattr(obj, key) - if isinstance(val, (OldAxonInfo, OldPrometheusInfo)): - as_dict[key] = _as_dict(val) - else: - as_dict[key] = val - return as_dict - # Decode result # Get correct type result_decoded = runtime_call_def["decoder"](bytes(result_bytes)) - as_dict = _as_dict(result_decoded) + as_dict = _bt_decode_to_dict_or_list(result_decoded) logging.debug("Decoded old runtime call result: ", as_dict) result_obj = ScaleObj(as_dict) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index f5c30db..969dcc8 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -36,7 +36,10 @@ ScaleObj, ) from async_substrate_interface.utils import hex_to_bytes, json -from async_substrate_interface.utils.decoding import _determine_if_old_runtime_call +from async_substrate_interface.utils.decoding import ( + _determine_if_old_runtime_call, + _bt_decode_to_dict_or_list, +) from async_substrate_interface.utils.storage import StorageKey from async_substrate_interface.type_registry import _TYPE_REGISTRY @@ -2289,21 +2292,10 @@ def _do_runtime_call_old( result_vec_u8_bytes = hex_to_bytes(result_data["result"]) result_bytes = self.decode_scale("Vec", result_vec_u8_bytes) - def _as_dict(obj): - as_dict = {} - for key in dir(obj): - if not key.startswith("_"): - val = getattr(obj, key) - if isinstance(val, (OldAxonInfo, OldPrometheusInfo)): - as_dict[key] = _as_dict(val) - else: - as_dict[key] = val - return as_dict - # Decode result # Get correct type result_decoded = runtime_call_def["decoder"](bytes(result_bytes)) - as_dict = _as_dict(result_decoded) + as_dict = _bt_decode_to_dict_or_list(result_decoded) logging.debug("Decoded old runtime call result: ", as_dict) result_obj = ScaleObj(as_dict) diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py index ce8a6f2..8607699 100644 --- a/async_substrate_interface/utils/decoding.py +++ b/async_substrate_interface/utils/decoding.py @@ -1,3 +1,6 @@ +from bt_metadata import AxonInfo, PrometheusInfo + + def _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value) -> bool: # Check if the output type is a Vec # If so, call the API using the old method @@ -24,3 +27,18 @@ def _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value) -> bool ): return True return False + + +def _bt_decode_to_dict_or_list(obj) -> dict | list[dict]: + if isinstance(obj, list): + return [_bt_decode_to_dict_or_list(item) for item in obj] + + as_dict = {} + for key in dir(obj): + if not key.startswith("_"): + val = getattr(obj, key) + if isinstance(val, (AxonInfo, PrometheusInfo)): + as_dict[key] = _bt_decode_to_dict_or_list(val) + else: + as_dict[key] = val + return as_dict From 8c80c7f0c248209e2527c34ab55b117b236d0c1a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 6 Feb 2025 21:25:29 +0200 Subject: [PATCH 10/14] Fix name --- async_substrate_interface/utils/decoding.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py index 8607699..3162fe4 100644 --- a/async_substrate_interface/utils/decoding.py +++ b/async_substrate_interface/utils/decoding.py @@ -1,4 +1,4 @@ -from bt_metadata import AxonInfo, PrometheusInfo +from bt_decode import AxonInfo, PrometheusInfo def _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value) -> bool: From c82df029a0a42022a87458cef1c8ebeedcb7597d Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 6 Feb 2025 21:57:34 +0200 Subject: [PATCH 11/14] Fix cache --- async_substrate_interface/async_substrate.py | 19 ++++++++----------- async_substrate_interface/sync_substrate.py | 20 ++++++++------------ 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index b65c6e0..9440807 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -24,13 +24,7 @@ import asyncstdlib as a from bittensor_wallet.keypair import Keypair from bittensor_wallet.utils import SS58_FORMAT -from bt_decode import ( - MetadataV15, - PortableRegistry, - decode as decode_by_type_string, - AxonInfo as OldAxonInfo, - PrometheusInfo as OldPrometheusInfo, -) +from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.types import ( GenericCall, @@ -957,7 +951,10 @@ async def get_runtime(block_hash, block_id) -> Runtime: if ( (block_hash and block_hash == self.last_block_hash) or (block_id and block_id == self.block_id) - ) and self._metadata is not None: + ) and all( + x is not None + for x in [self._metadata, self._old_metadata_v15, self.metadata_v15] + ): return Runtime( self.chain, self.runtime_config, @@ -1003,9 +1000,9 @@ async def get_runtime(block_hash, block_id) -> Runtime: f"No runtime information for block '{block_hash}'" ) # Check if runtime state already set to current block - if ( - runtime_info.get("specVersion") == self.runtime_version - and self._metadata is not None + if runtime_info.get("specVersion") == self.runtime_version and all( + x is not None + for x in [self._metadata, self._old_metadata_v15, self.metadata_v15] ): return Runtime( self.chain, diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 969dcc8..297cd4c 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -6,13 +6,7 @@ from bittensor_wallet.keypair import Keypair from bittensor_wallet.utils import SS58_FORMAT -from bt_decode import ( - MetadataV15, - PortableRegistry, - decode as decode_by_type_string, - AxonInfo as OldAxonInfo, - PrometheusInfo as OldPrometheusInfo, -) +from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string from scalecodec import ( GenericCall, GenericExtrinsic, @@ -701,7 +695,10 @@ def get_runtime(block_hash, block_id) -> Runtime: if ( (block_hash and block_hash == self.last_block_hash) or (block_id and block_id == self.block_id) - ) and self._metadata is not None: + ) and all( + x is not None + for x in [self._metadata, self._old_metadata_v15, self.metadata_v15] + ): return Runtime( self.chain, self.runtime_config, @@ -743,9 +740,9 @@ def get_runtime(block_hash, block_id) -> Runtime: f"No runtime information for block '{block_hash}'" ) # Check if runtime state already set to current block - if ( - runtime_info.get("specVersion") == self.runtime_version - and self._metadata is not None + if runtime_info.get("specVersion") == self.runtime_version and all( + x is not None + for x in [self._metadata, self._old_metadata_v15, self.metadata_v15] ): return Runtime( self.chain, @@ -802,7 +799,6 @@ def get_runtime(block_hash, block_id) -> Runtime: self.runtime_version ) ) - # Update metadata v15 cache self._metadata_v15_cache[self.runtime_version] = metadata_v15 From 8a46fc4e6df1a1f6b26ea4a3735ac665c72f6b3c Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 6 Feb 2025 22:11:16 +0200 Subject: [PATCH 12/14] Add TODO for later. --- async_substrate_interface/async_substrate.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 9440807..8c6983e 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -1026,6 +1026,8 @@ async def get_runtime(block_hash, block_id) -> Runtime: self.runtime_version ] else: + # TODO when I get time, I'd like to add this and the metadata v15 as tasks with callbacks + # TODO to update the caches, but I don't have time now. metadata = self._metadata = await self.get_block_metadata( block_hash=runtime_block_hash, decode=True ) From f0f25dfdccfc99e3d4fce1c0146904ab67f72bd2 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 6 Feb 2025 22:13:50 +0200 Subject: [PATCH 13/14] Update bt-decode --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b8f4963..d3bd392 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "wheel", "asyncstdlib~=3.13.0", "bittensor-wallet>=2.1.3", - "bt-decode==v0.5.0-a1", + "bt-decode==v0.5.0-a2", "scalecodec~=1.2.11", "websockets>=14.1", "xxhash" From 9ea82fb63f8b726d31fe9de9ad0000148ad5f312 Mon Sep 17 00:00:00 2001 From: ibraheem-opentensor Date: Thu, 6 Feb 2025 14:06:12 -0800 Subject: [PATCH 14/14] Bumps version and changelog --- CHANGELOG.md | 5 +++++ pyproject.toml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e73e42..4797dc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.0.0rc11 /2025-02-06 +* Reuses the websocket for sync Substrate by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/29 +* Feat/metadata v15 cache by @camfairchild in https://github.com/opentensor/async-substrate-interface/pull/30 +* Backmerge main to staging rc10 by @ibraheem-opentensor in https://github.com/opentensor/async-substrate-interface/pull/31 + ## 1.0.0rc10 /2025-02-04 * Fixes decoding account ids for sync substrate diff --git a/pyproject.toml b/pyproject.toml index 02cbaed..c6281f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.0.0rc10" +version = "1.0.0rc11" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" }