diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 892d56fa5..96415d7c0 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -313,7 +313,7 @@ def get_creation_data( seed: Optional[str], json: Optional[str], json_password: Optional[str], -) -> tuple[str, str, str, str]: +) -> tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: """ Determines which of the key creation elements have been supplied, if any. If None have been supplied, prompts to user, and determines what they've supplied. Returns all elements in a tuple. diff --git a/bittensor_cli/src/__init__.py b/bittensor_cli/src/__init__.py index 0d3e67de5..81ee77477 100644 --- a/bittensor_cli/src/__init__.py +++ b/bittensor_cli/src/__init__.py @@ -139,168 +139,6 @@ class WalletValidationTypes(Enum): "types": { "Balance": "u64", # Need to override default u128 }, - "runtime_api": { - "DelegateInfoRuntimeApi": { - "methods": { - "get_delegated": { - "params": [ - { - "name": "coldkey", - "type": "Vec", - }, - ], - "type": "Vec", - }, - "get_delegates": { - "params": [], - "type": "Vec", - }, - } - }, - "NeuronInfoRuntimeApi": { - "methods": { - "get_neuron_lite": { - "params": [ - { - "name": "netuid", - "type": "u16", - }, - { - "name": "uid", - "type": "u16", - }, - ], - "type": "Vec", - }, - "get_neurons_lite": { - "params": [ - { - "name": "netuid", - "type": "u16", - }, - ], - "type": "Vec", - }, - "get_neuron": { - "params": [ - { - "name": "netuid", - "type": "u16", - }, - { - "name": "uid", - "type": "u16", - }, - ], - "type": "Vec", - }, - "get_neurons": { - "params": [ - { - "name": "netuid", - "type": "u16", - }, - ], - "type": "Vec", - }, - } - }, - "StakeInfoRuntimeApi": { - "methods": { - "get_stake_info_for_coldkey": { - "params": [ - { - "name": "coldkey_account_vec", - "type": "Vec", - }, - ], - "type": "Vec", - }, - "get_stake_info_for_coldkeys": { - "params": [ - { - "name": "coldkey_account_vecs", - "type": "Vec>", - }, - ], - "type": "Vec", - }, - }, - }, - "ValidatorIPRuntimeApi": { - "methods": { - "get_associated_validator_ip_info_for_subnet": { - "params": [ - { - "name": "netuid", - "type": "u16", - }, - ], - "type": "Vec", - }, - }, - }, - "SubnetInfoRuntimeApi": { - "methods": { - "get_subnet_hyperparams": { - "params": [ - { - "name": "netuid", - "type": "u16", - }, - ], - "type": "Vec", - }, - "get_subnet_info": { - "params": [ - { - "name": "netuid", - "type": "u16", - }, - ], - "type": "Vec", - }, - "get_subnets_info": { - "params": [], - "type": "Vec", - }, - } - }, - "SubnetRegistrationRuntimeApi": { - "methods": {"get_network_registration_cost": {"params": [], "type": "u64"}} - }, - "ColdkeySwapRuntimeApi": { - "methods": { - "get_scheduled_coldkey_swap": { - "params": [ - { - "name": "coldkey_account_vec", - "type": "Vec", - }, - ], - "type": "Vec", - }, - "get_remaining_arbitration_period": { - "params": [ - { - "name": "coldkey_account_vec", - "type": "Vec", - }, - ], - "type": "Vec", - }, - "get_coldkey_swap_destinations": { - "params": [ - { - "name": "coldkey_account_vec", - "type": "Vec", - }, - ], - "type": "Vec", - }, - } - }, - }, } NETWORK_EXPLORER_MAP = { diff --git a/bittensor_cli/src/bittensor/async_substrate_interface.py b/bittensor_cli/src/bittensor/async_substrate_interface.py index af54b531f..3515020aa 100644 --- a/bittensor_cli/src/bittensor/async_substrate_interface.py +++ b/bittensor_cli/src/bittensor/async_substrate_interface.py @@ -4,15 +4,23 @@ from collections import defaultdict from dataclasses import dataclass from hashlib import blake2b +from itertools import chain from typing import Optional, Any, Union, Callable, Awaitable, cast, TYPE_CHECKING +from types import SimpleNamespace +from bt_decode import ( + PortableRegistry, + decode as decode_by_type_string, + encode as encode_by_type_string, + MetadataV15, +) from async_property import async_property -from bt_decode import PortableRegistry, decode as decode_by_type_string, MetadataV15 from bittensor_wallet import Keypair from scalecodec import GenericExtrinsic from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.type_registry import load_type_registry_preset from scalecodec.types import GenericCall +from bittensor_wallet import Keypair from substrateinterface.exceptions import ( SubstrateRequestException, ExtrinsicNotFound, @@ -22,6 +30,8 @@ from websockets.asyncio.client import connect from websockets.exceptions import ConnectionClosed +from .utils import bytes_from_hex_string_result, encode_account_id + from bittensor_cli.src.bittensor.utils import hex_to_bytes if TYPE_CHECKING: @@ -38,6 +48,33 @@ def timeout_handler(signum, frame): raise TimeoutException("Operation timed out") +class DictWithValue(dict): + value: Any + + def __init__(self, value: Any = None): + super().__init__() + self.value = value + + def __getitem__(self, key: Union[str, int]): + result = super().get(key) + if not result and isinstance(key, int): + # if the key is not found, return the key at the given index + return list(chain.from_iterable(self.items()))[key] + return result + + @classmethod + def from_dict(cls, dict_: dict): + inst = cls() + # recursively convert all values to DictWithValue + for key, value in dict_.items(): + if isinstance(value, dict): + value = cls.from_dict(value) + inst[key] = value + inst.value = dict_ + + return inst + + class ExtrinsicReceipt: """ Object containing information of submitted extrinsic. Block hash where extrinsic is included is required @@ -756,6 +793,7 @@ async def retrieve(self, item_id: int) -> Optional[dict]: class AsyncSubstrateInterface: runtime = None registry: Optional[PortableRegistry] = None + metadata_v15: Optional[MetadataV15] = None def __init__( self, @@ -801,6 +839,7 @@ def __init__( self.transaction_version = None self.metadata = None self.metadata_version_hex = "0x0f000000" # v15 + self.metadata_v15 = None async def __aenter__(self): await self.initialize() @@ -821,6 +860,64 @@ async def initialize(self): async def __aexit__(self, exc_type, exc_val, exc_tb): pass + @staticmethod + def _type_registry_to_scale_info_types( + registry_types: list[dict[str, Any]], + ) -> list[dict[str, Any]]: + scale_info_types = [] + for type_entry in registry_types: + new_type_entry = DictWithValue(value=type_entry) + if ( + "variant" in type_entry["type"]["def"] + and len(type_entry["type"]["def"]["variant"]) == 0 + ): + type_entry["type"]["def"]["variant"] = { + "variants": [] + } # add empty variants field to variant type if empty + + for key, value in type_entry.items(): + if isinstance(value, dict): + entry = DictWithValue.from_dict(value) + else: + entry = SimpleNamespace(value=value) + new_type_entry[key] = entry + + scale_info_types.append(new_type_entry) + + return scale_info_types + + @staticmethod + def _type_id_to_name(ty_id: int) -> str: + type_string = f"scale_info::{ty_id}" + + return type_string + + def _type_registry_apis_to_runtime_api( + self, apis: list[dict[str, Any]] + ) -> dict[str, Any]: + runtime_api = {} + for api in apis: + api_name = api["name"] + methods = api["methods"] + + runtime_api[api_name] = { + "methods": { + method["name"]: { + "description": "\n".join(method["docs"]), + "params": [ + { + "name": input["name"], + "type": self._type_id_to_name(input["ty"]), + } + for input in method["inputs"] + ], + "type": self._type_id_to_name(method["output"]), + } + for method in methods + } + } + return runtime_api + @property def chain(self): """ @@ -855,6 +952,7 @@ async def load_registry(self): metadata_option_bytes = bytes.fromhex(metadata_option_hex_str[2:]) metadata_v15 = MetadataV15.decode_from_metadata_option(metadata_option_bytes) self.registry = PortableRegistry.from_metadata_v15(metadata_v15) + self.metadata_v15 = metadata_v15 async def decode_scale( self, type_string, scale_bytes: bytes, return_scale_obj=False @@ -881,6 +979,38 @@ async def decode_scale( obj = decode_by_type_string(type_string, self.registry, scale_bytes) return obj + async def encode_scale(self, type_string, value: Any) -> bytes: + """ + Helper function to encode arbitrary objects according to given RUST type_string + (e.g. BlockNumber). + + Parameters + ---------- + type_string + value + + Returns + ------- + bytes: encoded SCALE bytes + + """ + if value is None: + result = b"\x00" + else: + if type_string == "scale_info::0": # Is an AccountId + # encode string into AccountId + ## AccountId is a composite type with one, unnamed field + return encode_account_id(value) + if isinstance(value, ScaleType): + if value.data.data is not None: + # Already encoded + return bytes(value.data.data) + else: + value = value.value # Unwrap the value of the type + + result = bytes(encode_by_type_string(type_string, self.registry, value)) + return result + async def init_runtime( self, block_hash: Optional[str] = None, block_id: Optional[int] = None ) -> Runtime: @@ -2176,13 +2306,13 @@ async def get_chain_finalised_head(self): return response.get("result") - async def runtime_call( + async def runtime_call_wait_to_decode( self, api: str, method: str, params: Optional[Union[list, dict]] = None, block_hash: Optional[str] = None, - ) -> ScaleType: + ) -> tuple[str, bytes]: """ Calls a runtime API method @@ -2191,7 +2321,7 @@ async def runtime_call( :param params: List of parameters needed to call the runtime API :param block_hash: Hash of the block at which to make the runtime API call - :return: ScaleType from the runtime call + :return: Tuple of the runtime call type and the result bytes """ await self.init_runtime() @@ -2199,57 +2329,68 @@ async def runtime_call( params = {} try: - runtime_call_def = self.runtime_config.type_registry["runtime_api"][api][ - "methods" - ][method] - runtime_api_types = self.runtime_config.type_registry["runtime_api"][ - api - ].get("types", {}) + metadata_v15 = self.metadata_v15.value() + apis = {entry["name"]: entry for entry in metadata_v15["apis"]} + api_entry = apis[api] + methods = {entry["name"]: entry for entry in api_entry["methods"]} + runtime_call_def = methods[method] except KeyError: raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") - if isinstance(params, list) and len(params) != len(runtime_call_def["params"]): + if isinstance(params, list) and len(params) != len(runtime_call_def["inputs"]): raise ValueError( f"Number of parameter provided ({len(params)}) does not " - f"match definition {len(runtime_call_def['params'])}" + f"match definition {len(runtime_call_def['inputs'])}" ) - # Add runtime API types to registry - self.runtime_config.update_type_registry_types(runtime_api_types) - runtime = Runtime( - self.chain, - self.runtime_config, - self.metadata, - self.type_registry, - ) - # Encode params - param_data = ScaleBytes(bytes()) - for idx, param in enumerate(runtime_call_def["params"]): - scale_obj = runtime.runtime_config.create_scale_object(param["type"]) + param_data = b"" + for idx, param in enumerate(runtime_call_def["inputs"]): + param_type_string = f'scale_info::{param["ty"]}' if isinstance(params, list): - param_data += scale_obj.encode(params[idx]) + param_data += await self.encode_scale(param_type_string, params[idx]) else: if param["name"] not in params: raise ValueError(f"Runtime Call param '{param['name']}' is missing") - param_data += scale_obj.encode(params[param["name"]]) + param_data += await self.encode_scale( + param_type_string, params[param["name"]] + ) # RPC request result_data = await self.rpc_request( - "state_call", [f"{api}_{method}", str(param_data), block_hash] + "state_call", [f"{api}_{method}", param_data.hex(), block_hash] ) - # Decode result - # TODO update this to use bt-decode - result_obj = runtime.runtime_config.create_scale_object( - runtime_call_def["type"] - ) - result_obj.decode( - ScaleBytes(result_data["result"]), - check_remaining=self.config.get("strict_scale_decode"), + output_type_string = f'scale_info::{runtime_call_def["output"]}' + + return output_type_string, bytes_from_hex_string_result(result_data["result"]) + + async def runtime_call( + self, + api: str, + method: str, + params: Optional[Union[list, dict]] = None, + block_hash: Optional[str] = None, + ) -> ScaleType: + """ + Calls a runtime API method + + :param api: Name of the runtime API e.g. 'TransactionPaymentApi' + :param method: Name of the method e.g. 'query_fee_details' + :param params: List of parameters needed to call the runtime API + :param block_hash: Hash of the block at which to make the runtime API call + + :return: ScaleType from the runtime call + """ + # Get the runtime call type and result bytes + runtime_call_type, result_bytes = await self.runtime_call_wait_to_decode( + api, method, params, block_hash ) + # Decode the result bytes + result_obj = await self.decode_scale(runtime_call_type, result_bytes) + return result_obj async def get_account_nonce(self, account_address: str) -> int: @@ -2263,7 +2404,7 @@ async def get_account_nonce(self, account_address: str) -> int: nonce_obj = await self.runtime_call( "AccountNonceApi", "account_nonce", [account_address] ) - return nonce_obj.value + return nonce_obj async def get_metadata_constant(self, module_name, constant_name, block_hash=None): """ @@ -2356,14 +2497,17 @@ async def get_payment_info( extrinsic = await self.create_signed_extrinsic( call=call, keypair=keypair, signature=signature ) - extrinsic_len = self.runtime_config.create_scale_object("u32") - extrinsic_len.encode(len(extrinsic.data)) + extrinsic_len = len(extrinsic.data) - result = await self.runtime_call( - "TransactionPaymentApi", "query_info", [extrinsic, extrinsic_len] + result = ( + await self.runtime_call( # Needs the call hex, not the extrinsic object + "TransactionPaymentApi", + "query_info", + [extrinsic, extrinsic_len], + ) ) - return result.value + return result async def query( self, diff --git a/bittensor_cli/src/bittensor/chain_data.py b/bittensor_cli/src/bittensor/chain_data.py index 73f41b1fb..06ac49980 100644 --- a/bittensor_cli/src/bittensor/chain_data.py +++ b/bittensor_cli/src/bittensor/chain_data.py @@ -1,18 +1,14 @@ +from abc import abstractmethod from dataclasses import dataclass -from typing import Optional +from typing import Any, Optional, Union import bt_decode import netaddr -from scalecodec.utils.ss58 import ss58_encode +import munch from bittensor_cli.src.bittensor.balances import Balance from bittensor_cli.src.bittensor.networking import int_to_ip -from bittensor_cli.src.bittensor.utils import SS58_FORMAT, u16_normalized_float - - -def decode_account_id(account_id_bytes: tuple): - # Convert the AccountId bytes to a Base64 string - return ss58_encode(bytes(account_id_bytes).hex(), SS58_FORMAT) +from bittensor_cli.src.bittensor.utils import u16_normalized_float, decode_account_id def process_stake_data(stake_data): @@ -62,7 +58,32 @@ def from_neuron_info(cls, neuron_info: dict) -> "AxonInfo": @dataclass -class SubnetHyperparameters: +class InfoBase: + """Base dataclass for info objects.""" + + @abstractmethod + def _fix_decoded(self, decoded: Any) -> "InfoBase": + raise NotImplementedError( + "This is an abstract method and must be implemented in a subclass." + ) + + @classmethod + def from_any(cls, any_: Any) -> "InfoBase": + return cls._fix_decoded(any_) + + @classmethod + def list_from_any(cls, any_list: list[Any]) -> list["InfoBase"]: + return [cls.from_any(any_) for any_ in any_list] + + def __getitem__(self, item): + return getattr(self, item) + + def get(self, item, default=None): + return getattr(self, item, default) + + +@dataclass +class SubnetHyperparameters(InfoBase): """Dataclass for subnet hyperparameters.""" rho: int @@ -94,61 +115,74 @@ class SubnetHyperparameters: liquid_alpha_enabled: bool @classmethod - def from_vec_u8(cls, vec_u8: bytes) -> Optional["SubnetHyperparameters"]: - decoded = bt_decode.SubnetHyperparameters.decode(vec_u8) + def _fix_decoded( + cls, decoded: Union[dict, "SubnetHyperparameters"] + ) -> "SubnetHyperparameters": return SubnetHyperparameters( - rho=decoded.rho, - kappa=decoded.kappa, - immunity_period=decoded.immunity_period, - min_allowed_weights=decoded.min_allowed_weights, - max_weight_limit=decoded.max_weights_limit, - tempo=decoded.tempo, - min_difficulty=decoded.min_difficulty, - max_difficulty=decoded.max_difficulty, - weights_version=decoded.weights_version, - weights_rate_limit=decoded.weights_rate_limit, - adjustment_interval=decoded.adjustment_interval, - activity_cutoff=decoded.activity_cutoff, - registration_allowed=decoded.registration_allowed, - target_regs_per_interval=decoded.target_regs_per_interval, - min_burn=decoded.min_burn, - max_burn=decoded.max_burn, - bonds_moving_avg=decoded.bonds_moving_avg, - max_regs_per_block=decoded.max_regs_per_block, - serving_rate_limit=decoded.serving_rate_limit, - max_validators=decoded.max_validators, - adjustment_alpha=decoded.adjustment_alpha, - difficulty=decoded.difficulty, - commit_reveal_weights_interval=decoded.commit_reveal_weights_interval, - commit_reveal_weights_enabled=decoded.commit_reveal_weights_enabled, - alpha_high=decoded.alpha_high, - alpha_low=decoded.alpha_low, - liquid_alpha_enabled=decoded.liquid_alpha_enabled, + rho=decoded.get("rho"), + kappa=decoded.get("kappa"), + immunity_period=decoded.get("immunity_period"), + min_allowed_weights=decoded.get("min_allowed_weights"), + max_weight_limit=decoded.get("max_weights_limit"), + tempo=decoded.get("tempo"), + min_difficulty=decoded.get("min_difficulty"), + max_difficulty=decoded.get("max_difficulty"), + weights_version=decoded.get("weights_version"), + weights_rate_limit=decoded.get("weights_rate_limit"), + adjustment_interval=decoded.get("adjustment_interval"), + activity_cutoff=decoded.get("activity_cutoff"), + registration_allowed=decoded.get("registration_allowed"), + target_regs_per_interval=decoded.get("target_regs_per_interval"), + min_burn=decoded.get("min_burn"), + max_burn=decoded.get("max_burn"), + bonds_moving_avg=decoded.get("bonds_moving_avg"), + max_regs_per_block=decoded.get("max_regs_per_block"), + serving_rate_limit=decoded.get("serving_rate_limit"), + max_validators=decoded.get("max_validators"), + adjustment_alpha=decoded.get("adjustment_alpha"), + difficulty=decoded.get("difficulty"), + commit_reveal_weights_interval=decoded.get( + "commit_reveal_weights_interval" + ), + commit_reveal_weights_enabled=decoded.get("commit_reveal_weights_enabled"), + alpha_high=decoded.get("alpha_high"), + alpha_low=decoded.get("alpha_low"), + liquid_alpha_enabled=decoded.get("liquid_alpha_enabled"), ) + @classmethod + def from_vec_u8(cls, vec_u8: bytes) -> Optional["SubnetHyperparameters"]: + decoded = bt_decode.SubnetHyperparameters.decode(vec_u8) + return cls._fix_decoded(decoded) + @dataclass -class StakeInfo: +class StakeInfo(InfoBase): """Dataclass for stake info.""" hotkey_ss58: str # Hotkey address coldkey_ss58: str # Coldkey address stake: Balance # Stake for the hotkey-coldkey pair + @classmethod + def _fix_decoded(cls, decoded: Any) -> "StakeInfo": + hotkey = decode_account_id(decoded.get("hotkey")) + coldkey = decode_account_id(decoded.get("coldkey")) + stake = Balance.from_rao(decoded.get("stake")) + + return StakeInfo(hotkey, coldkey, stake) + + @classmethod + def from_any(cls, any_: Any) -> "StakeInfo": + return cls._fix_decoded(any_) + @classmethod def list_from_vec_u8(cls, vec_u8: bytes) -> list["StakeInfo"]: """ Returns a list of StakeInfo objects from a `vec_u8`. """ decoded = bt_decode.StakeInfo.decode_vec(vec_u8) - results = [] - for d in decoded: - hotkey = decode_account_id(d.hotkey) - coldkey = decode_account_id(d.coldkey) - stake = Balance.from_rao(d.stake) - results.append(StakeInfo(hotkey, coldkey, stake)) - - return results + return [cls._fix_decoded(d) for d in decoded] @dataclass @@ -170,7 +204,7 @@ def fix_decoded_values(cls, prometheus_info_decoded: dict) -> "PrometheusInfo": @dataclass -class NeuronInfo: +class NeuronInfo(InfoBase): """Dataclass for neuron metadata.""" hotkey: str @@ -241,58 +275,64 @@ def get_null_neuron() -> "NeuronInfo": return neuron @classmethod - def from_vec_u8(cls, vec_u8: bytes) -> "NeuronInfo": - n = bt_decode.NeuronInfo.decode(vec_u8) + def _fix_decoded(cls, decoded: Any) -> "NeuronInfo": + n = decoded + stake_dict = process_stake_data(n.stake) total_stake = sum(stake_dict.values()) if stake_dict else Balance(0) axon_info = n.axon_info - coldkey = decode_account_id(n.coldkey) - hotkey = decode_account_id(n.hotkey) + coldkey = decode_account_id(n.get("coldkey")) + hotkey = decode_account_id(n.get("hotkey")) return NeuronInfo( hotkey=hotkey, coldkey=coldkey, - uid=n.uid, - netuid=n.netuid, - active=n.active, + uid=n.get("uid"), + netuid=n.get("netuid"), + active=n.get("active"), stake=total_stake, stake_dict=stake_dict, total_stake=total_stake, - rank=u16_normalized_float(n.rank), - emission=n.emission / 1e9, - incentive=u16_normalized_float(n.incentive), - consensus=u16_normalized_float(n.consensus), - trust=u16_normalized_float(n.trust), - validator_trust=u16_normalized_float(n.validator_trust), - dividends=u16_normalized_float(n.dividends), - last_update=n.last_update, - validator_permit=n.validator_permit, - weights=[[e[0], e[1]] for e in n.weights], - bonds=[[e[0], e[1]] for e in n.bonds], - pruning_score=n.pruning_score, + rank=u16_normalized_float(n.rget("ank")), + emission=n.get("emission") / 1e9, + incentive=u16_normalized_float(n.get("incentive")), + consensus=u16_normalized_float(n.get("consensus")), + trust=u16_normalized_float(n.get("trust")), + validator_trust=u16_normalized_float(n.get("validator_trust")), + dividends=u16_normalized_float(n.get("dividends")), + last_update=n.get("last_update"), + validator_permit=n.get("validator_permit"), + weights=[[e[0], e[1]] for e in n.get("weights")], + bonds=[[e[0], e[1]] for e in n.get("bonds")], + pruning_score=n.get("pruning_score"), prometheus_info=PrometheusInfo( - block=n.prometheus_info.block, - version=n.prometheus_info.version, - ip=str(netaddr.IPAddress(n.prometheus_info.ip)), - port=n.prometheus_info.port, - ip_type=n.prometheus_info.ip_type, + block=n.get("prometheus_info").get("block"), + version=n.get("prometheus_info").get("version"), + ip=str(netaddr.IPAddress(n.get("prometheus_info").get("ip"))), + port=n.get("prometheus_info").get("port"), + ip_type=n.get("prometheus_info").get("ip_type"), ), axon_info=AxonInfo( - version=axon_info.version, - ip=str(netaddr.IPAddress(axon_info.ip)), - port=axon_info.port, - ip_type=axon_info.ip_type, - placeholder1=axon_info.placeholder1, - placeholder2=axon_info.placeholder2, - protocol=axon_info.protocol, + version=axon_info.get("version"), + ip=str(netaddr.IPAddress(axon_info.get("ip"))), + port=axon_info.get("port"), + ip_type=axon_info.get("ip_type"), + placeholder1=axon_info.get("placeholder1"), + placeholder2=axon_info.get("placeholder2"), + protocol=axon_info.get("protocol"), hotkey=hotkey, coldkey=coldkey, ), is_null=False, ) + @classmethod + def from_vec_u8(cls, vec_u8: bytes) -> "NeuronInfo": + n = bt_decode.NeuronInfo.decode(vec_u8) + return cls._fix_decoded(n) + @dataclass -class NeuronInfoLite: +class NeuronInfoLite(InfoBase): """Dataclass for neuron metadata, but without the weights and bonds.""" hotkey: str @@ -345,75 +385,77 @@ def get_null_neuron() -> "NeuronInfoLite": ) return neuron + @classmethod + def _fix_decoded(cls, decoded: Union[dict, "NeuronInfoLite"]) -> "NeuronInfoLite": + active = decoded.get("active") + axon_info = decoded.get("axon_info") + coldkey = decode_account_id(decoded.get("coldkey")) + consensus = decoded.get("consensus") + dividends = decoded.get("dividends") + emission = decoded.get("emission") + hotkey = decode_account_id(decoded.get("hotkey")) + incentive = decoded.get("incentive") + last_update = decoded.get("last_update") + netuid = decoded.get("netuid") + prometheus_info = decoded.get("prometheus_info") + pruning_score = decoded.get("pruning_score") + rank = decoded.get("rank") + stake_dict = process_stake_data(decoded.get("stake")) + stake = sum(stake_dict.values()) if stake_dict else Balance(0) + trust = decoded.get("trust") + uid = decoded.get("uid") + validator_permit = decoded.get("validator_permit") + validator_trust = decoded.get("validator_trust") + + neuron = cls( + active=active, + axon_info=AxonInfo( + version=axon_info.version, + ip=str(netaddr.IPAddress(axon_info.ip)), + port=axon_info.port, + ip_type=axon_info.ip_type, + placeholder1=axon_info.placeholder1, + placeholder2=axon_info.placeholder2, + protocol=axon_info.protocol, + hotkey=hotkey, + coldkey=coldkey, + ), + coldkey=coldkey, + consensus=u16_normalized_float(consensus), + dividends=u16_normalized_float(dividends), + emission=emission / 1e9, + hotkey=hotkey, + incentive=u16_normalized_float(incentive), + last_update=last_update, + netuid=netuid, + prometheus_info=PrometheusInfo( + version=prometheus_info.version, + ip=str(netaddr.IPAddress(prometheus_info.ip)), + port=prometheus_info.port, + ip_type=prometheus_info.ip_type, + block=prometheus_info.block, + ), + pruning_score=pruning_score, + rank=u16_normalized_float(rank), + stake_dict=stake_dict, + stake=stake, + total_stake=stake, + trust=u16_normalized_float(trust), + uid=uid, + validator_permit=validator_permit, + validator_trust=u16_normalized_float(validator_trust), + ) + + return neuron + @classmethod def list_from_vec_u8(cls, vec_u8: bytes) -> list["NeuronInfoLite"]: decoded = bt_decode.NeuronInfoLite.decode_vec(vec_u8) - results = [] - for item in decoded: - active = item.active - axon_info = item.axon_info - coldkey = decode_account_id(item.coldkey) - consensus = item.consensus - dividends = item.dividends - emission = item.emission - hotkey = decode_account_id(item.hotkey) - incentive = item.incentive - last_update = item.last_update - netuid = item.netuid - prometheus_info = item.prometheus_info - pruning_score = item.pruning_score - rank = item.rank - stake_dict = process_stake_data(item.stake) - stake = sum(stake_dict.values()) if stake_dict else Balance(0) - trust = item.trust - uid = item.uid - validator_permit = item.validator_permit - validator_trust = item.validator_trust - results.append( - NeuronInfoLite( - active=active, - axon_info=AxonInfo( - version=axon_info.version, - ip=str(netaddr.IPAddress(axon_info.ip)), - port=axon_info.port, - ip_type=axon_info.ip_type, - placeholder1=axon_info.placeholder1, - placeholder2=axon_info.placeholder2, - protocol=axon_info.protocol, - hotkey=hotkey, - coldkey=coldkey, - ), - coldkey=coldkey, - consensus=u16_normalized_float(consensus), - dividends=u16_normalized_float(dividends), - emission=emission / 1e9, - hotkey=hotkey, - incentive=u16_normalized_float(incentive), - last_update=last_update, - netuid=netuid, - prometheus_info=PrometheusInfo( - version=prometheus_info.version, - ip=str(netaddr.IPAddress(prometheus_info.ip)), - port=prometheus_info.port, - ip_type=prometheus_info.ip_type, - block=prometheus_info.block, - ), - pruning_score=pruning_score, - rank=u16_normalized_float(rank), - stake_dict=stake_dict, - stake=stake, - total_stake=stake, - trust=u16_normalized_float(trust), - uid=uid, - validator_permit=validator_permit, - validator_trust=u16_normalized_float(validator_trust), - ) - ) - return results + return [cls._fix_decoded(d) for d in decoded] @dataclass -class DelegateInfo: +class DelegateInfo(InfoBase): """ Dataclass for delegate information. For a lighter version of this class, see :func:`DelegateInfoLite`. @@ -444,15 +486,15 @@ class DelegateInfo: total_daily_return: Balance # Total daily return of the delegate @classmethod - def from_vec_u8(cls, vec_u8: bytes) -> Optional["DelegateInfo"]: - decoded = bt_decode.DelegateInfo.decode(vec_u8) + def _fix_decoded(cls, decoded: "DelegateInfo") -> "DelegateInfo": + # TODO check if this is hotkey_ss58 or delegate_ss58 from bt-decode hotkey = decode_account_id(decoded.delegate_ss58) owner = decode_account_id(decoded.owner_ss58) nominators = [ (decode_account_id(x), Balance.from_rao(y)) for x, y in decoded.nominators ] total_stake = sum((x[1] for x in nominators)) if nominators else Balance(0) - return DelegateInfo( + return cls( hotkey_ss58=hotkey, total_stake=total_stake, nominators=nominators, @@ -464,39 +506,22 @@ def from_vec_u8(cls, vec_u8: bytes) -> Optional["DelegateInfo"]: total_daily_return=Balance.from_rao(decoded.total_daily_return), ) + @classmethod + def from_vec_u8(cls, vec_u8: bytes) -> Optional["DelegateInfo"]: + decoded = bt_decode.DelegateInfo.decode(vec_u8) + return cls._fix_decoded(decoded) + @classmethod def list_from_vec_u8(cls, vec_u8: bytes) -> list["DelegateInfo"]: decoded = bt_decode.DelegateInfo.decode_vec(vec_u8) - results = [] - for d in decoded: - hotkey = decode_account_id(d.delegate_ss58) - owner = decode_account_id(d.owner_ss58) - nominators = [ - (decode_account_id(x), Balance.from_rao(y)) for x, y in d.nominators - ] - total_stake = sum((x[1] for x in nominators)) if nominators else Balance(0) - results.append( - DelegateInfo( - hotkey_ss58=hotkey, - total_stake=total_stake, - nominators=nominators, - owner_ss58=owner, - take=u16_normalized_float(d.take), - validator_permits=d.validator_permits, - registrations=d.registrations, - return_per_1000=Balance.from_rao(d.return_per_1000), - total_daily_return=Balance.from_rao(d.total_daily_return), - ) - ) - return results + return [cls._fix_decoded(d) for d in decoded] @classmethod - def delegated_list_from_vec_u8( - cls, vec_u8: bytes + def _fix_delegated_list( + cls, delegated_list: list[tuple["DelegateInfo", Balance]] ) -> list[tuple["DelegateInfo", Balance]]: - decoded = bt_decode.DelegateInfo.decode_delegated(vec_u8) results = [] - for d, b in decoded: + for d, b in delegated_list: nominators = [ (decode_account_id(x), Balance.from_rao(y)) for x, y in d.nominators ] @@ -515,9 +540,22 @@ def delegated_list_from_vec_u8( results.append((delegate, Balance.from_rao(b))) return results + @classmethod + def delegated_list_from_vec_u8( + cls, vec_u8: bytes + ) -> list[tuple["DelegateInfo", Balance]]: + decoded = bt_decode.DelegateInfo.decode_delegated(vec_u8) + return cls._fix_delegated_list(decoded) + + @classmethod + def delegated_list_from_any( + cls, any_list: list[Union[tuple["DelegateInfo", Balance], tuple[dict, Balance]]] + ) -> list[tuple["DelegateInfo", Balance]]: + return cls._fix_delegated_list(any_list) + @dataclass -class SubnetInfo: +class SubnetInfo(InfoBase): """Dataclass for subnet info.""" netuid: int @@ -539,194 +577,34 @@ class SubnetInfo: burn: Balance owner_ss58: str + @classmethod + def _fix_decoded(cls, decoded: "SubnetInfo") -> "SubnetInfo": + d = decoded + return SubnetInfo( + netuid=d.netuid, + rho=d.rho, + kappa=d.kappa, + difficulty=d.difficulty, + immunity_period=d.immunity_period, + max_allowed_validators=d.max_allowed_validators, + min_allowed_weights=d.min_allowed_weights, + max_weight_limit=d.max_weights_limit, + scaling_law_power=d.scaling_law_power, + subnetwork_n=d.subnetwork_n, + max_n=d.max_allowed_uids, + blocks_since_epoch=d.blocks_since_last_step, + tempo=d.tempo, + modality=d.network_modality, + connection_requirements={ + str(int(netuid)): u16_normalized_float(int(req)) + for (netuid, req) in d.network_connect + }, + emission_value=d.emission_values, + burn=Balance.from_rao(d.burn), + owner_ss58=decode_account_id(d.owner), + ) + @classmethod def list_from_vec_u8(cls, vec_u8: bytes) -> list["SubnetInfo"]: decoded = bt_decode.SubnetInfo.decode_vec_option(vec_u8) - result = [] - for d in decoded: - result.append( - SubnetInfo( - netuid=d.netuid, - rho=d.rho, - kappa=d.kappa, - difficulty=d.difficulty, - immunity_period=d.immunity_period, - max_allowed_validators=d.max_allowed_validators, - min_allowed_weights=d.min_allowed_weights, - max_weight_limit=d.max_weights_limit, - scaling_law_power=d.scaling_law_power, - subnetwork_n=d.subnetwork_n, - max_n=d.max_allowed_uids, - blocks_since_epoch=d.blocks_since_last_step, - tempo=d.tempo, - modality=d.network_modality, - connection_requirements={ - str(int(netuid)): u16_normalized_float(int(req)) - for (netuid, req) in d.network_connect - }, - emission_value=d.emission_values, - burn=Balance.from_rao(d.burn), - owner_ss58=decode_account_id(d.owner), - ) - ) - return result - - -custom_rpc_type_registry = { - "types": { - "SubnetInfo": { - "type": "struct", - "type_mapping": [ - ["netuid", "Compact"], - ["rho", "Compact"], - ["kappa", "Compact"], - ["difficulty", "Compact"], - ["immunity_period", "Compact"], - ["max_allowed_validators", "Compact"], - ["min_allowed_weights", "Compact"], - ["max_weights_limit", "Compact"], - ["scaling_law_power", "Compact"], - ["subnetwork_n", "Compact"], - ["max_allowed_uids", "Compact"], - ["blocks_since_last_step", "Compact"], - ["tempo", "Compact"], - ["network_modality", "Compact"], - ["network_connect", "Vec<[u16; 2]>"], - ["emission_values", "Compact"], - ["burn", "Compact"], - ["owner", "AccountId"], - ], - }, - "DelegateInfo": { - "type": "struct", - "type_mapping": [ - ["delegate_ss58", "AccountId"], - ["take", "Compact"], - ["nominators", "Vec<(AccountId, Compact)>"], - ["owner_ss58", "AccountId"], - ["registrations", "Vec>"], - ["validator_permits", "Vec>"], - ["return_per_1000", "Compact"], - ["total_daily_return", "Compact"], - ], - }, - "NeuronInfo": { - "type": "struct", - "type_mapping": [ - ["hotkey", "AccountId"], - ["coldkey", "AccountId"], - ["uid", "Compact"], - ["netuid", "Compact"], - ["active", "bool"], - ["axon_info", "axon_info"], - ["prometheus_info", "PrometheusInfo"], - ["stake", "Vec<(AccountId, Compact)>"], - ["rank", "Compact"], - ["emission", "Compact"], - ["incentive", "Compact"], - ["consensus", "Compact"], - ["trust", "Compact"], - ["validator_trust", "Compact"], - ["dividends", "Compact"], - ["last_update", "Compact"], - ["validator_permit", "bool"], - ["weights", "Vec<(Compact, Compact)>"], - ["bonds", "Vec<(Compact, Compact)>"], - ["pruning_score", "Compact"], - ], - }, - "NeuronInfoLite": { - "type": "struct", - "type_mapping": [ - ["hotkey", "AccountId"], - ["coldkey", "AccountId"], - ["uid", "Compact"], - ["netuid", "Compact"], - ["active", "bool"], - ["axon_info", "axon_info"], - ["prometheus_info", "PrometheusInfo"], - ["stake", "Vec<(AccountId, Compact)>"], - ["rank", "Compact"], - ["emission", "Compact"], - ["incentive", "Compact"], - ["consensus", "Compact"], - ["trust", "Compact"], - ["validator_trust", "Compact"], - ["dividends", "Compact"], - ["last_update", "Compact"], - ["validator_permit", "bool"], - ["pruning_score", "Compact"], - ], - }, - "axon_info": { - "type": "struct", - "type_mapping": [ - ["block", "u64"], - ["version", "u32"], - ["ip", "u128"], - ["port", "u16"], - ["ip_type", "u8"], - ["protocol", "u8"], - ["placeholder1", "u8"], - ["placeholder2", "u8"], - ], - }, - "PrometheusInfo": { - "type": "struct", - "type_mapping": [ - ["block", "u64"], - ["version", "u32"], - ["ip", "u128"], - ["port", "u16"], - ["ip_type", "u8"], - ], - }, - "IPInfo": { - "type": "struct", - "type_mapping": [ - ["ip", "Compact"], - ["ip_type_and_protocol", "Compact"], - ], - }, - "StakeInfo": { - "type": "struct", - "type_mapping": [ - ["hotkey", "AccountId"], - ["coldkey", "AccountId"], - ["stake", "Compact"], - ], - }, - "SubnetHyperparameters": { - "type": "struct", - "type_mapping": [ - ["rho", "Compact"], - ["kappa", "Compact"], - ["immunity_period", "Compact"], - ["min_allowed_weights", "Compact"], - ["max_weights_limit", "Compact"], - ["tempo", "Compact"], - ["min_difficulty", "Compact"], - ["max_difficulty", "Compact"], - ["weights_version", "Compact"], - ["weights_rate_limit", "Compact"], - ["adjustment_interval", "Compact"], - ["activity_cutoff", "Compact"], - ["registration_allowed", "bool"], - ["target_regs_per_interval", "Compact"], - ["min_burn", "Compact"], - ["max_burn", "Compact"], - ["bonds_moving_avg", "Compact"], - ["max_regs_per_block", "Compact"], - ["serving_rate_limit", "Compact"], - ["max_validators", "Compact"], - ["adjustment_alpha", "Compact"], - ["difficulty", "Compact"], - ["commit_reveal_weights_interval", "Compact"], - ["commit_reveal_weights_enabled", "bool"], - ["alpha_high", "Compact"], - ["alpha_low", "Compact"], - ["liquid_alpha_enabled", "bool"], - ], - }, - } -} + return [cls._fix_decoded(d) for d in decoded] diff --git a/bittensor_cli/src/bittensor/extrinsics/transfer.py b/bittensor_cli/src/bittensor/extrinsics/transfer.py index 127fb4880..d1efd6bba 100644 --- a/bittensor_cli/src/bittensor/extrinsics/transfer.py +++ b/bittensor_cli/src/bittensor/extrinsics/transfer.py @@ -64,14 +64,13 @@ async def get_transfer_fee() -> Balance: call=call, keypair=wallet.coldkeypub ) except SubstrateRequestException as e: - payment_info = {"partialFee": int(2e7)} # assume 0.02 Tao + payment_info = {"partial_fee": int(2e7)} # assume 0.02 Tao err_console.print( - f":cross_mark: [red]Failed to get payment info[/red]:[bold white]\n" - f" {format_error_message(e)}[/bold white]\n" - f" Defaulting to default transfer fee: {payment_info['partialFee']}" + f":cross_mark: [red]Failed to get payment info[/red]:\n" + f" [bold white]{format_error_message(e)}[/bold white]\n" + f" Defaulting to default transfer fee: {payment_info['partial_fee']}" ) - - return Balance.from_rao(payment_info["partialFee"]) + return Balance.from_rao(payment_info["partial_fee"]) async def do_transfer() -> tuple[bool, str, str]: """ diff --git a/bittensor_cli/src/bittensor/subtensor_interface.py b/bittensor_cli/src/bittensor/subtensor_interface.py index 8f97e58de..9aab09697 100644 --- a/bittensor_cli/src/bittensor/subtensor_interface.py +++ b/bittensor_cli/src/bittensor/subtensor_interface.py @@ -7,8 +7,6 @@ from bittensor_wallet.utils import SS58_FORMAT import scalecodec from scalecodec import GenericCall -from scalecodec.base import RuntimeConfiguration -from scalecodec.type_registry import load_type_registry_preset from substrateinterface.exceptions import SubstrateRequestException import typer @@ -18,7 +16,6 @@ ) from bittensor_cli.src.bittensor.chain_data import ( DelegateInfo, - custom_rpc_type_registry, StakeInfo, NeuronInfoLite, NeuronInfo, @@ -29,13 +26,11 @@ from bittensor_cli.src.bittensor.balances import Balance from bittensor_cli.src import Constants, defaults, TYPE_REGISTRY from bittensor_cli.src.bittensor.utils import ( - ss58_to_vec_u8, format_error_message, console, err_console, decode_hex_identity_dict, validate_chain_endpoint, - hex_to_bytes, ) @@ -207,16 +202,14 @@ async def get_delegates( :return: List of DelegateInfo objects, or an empty list if there are no delegates. """ - hex_bytes_result = await self.query_runtime_api( + result = await self.query_runtime_api( runtime_api="DelegateInfoRuntimeApi", method="get_delegates", params=[], block_hash=block_hash, ) - if hex_bytes_result is not None: - return DelegateInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result)) - else: - return [] + + return DelegateInfo.list_from_any(result) if result is not None else [] async def get_stake_info_for_coldkey( self, @@ -237,20 +230,18 @@ async def get_stake_info_for_coldkey( Stake information is vital for account holders to assess their investment and participation in the network's delegation and consensus processes. """ - encoded_coldkey = ss58_to_vec_u8(coldkey_ss58) - - hex_bytes_result = await self.query_runtime_api( + result = await self.query_runtime_api( runtime_api="StakeInfoRuntimeApi", method="get_stake_info_for_coldkey", - params=[encoded_coldkey], + params=[coldkey_ss58], block_hash=block_hash, reuse_block=reuse_block, ) - if hex_bytes_result is None: + if result is None: return [] - return StakeInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result)) + return StakeInfo.list_from_any(result) async def get_stake_for_coldkey_and_hotkey( self, hotkey_ss58: str, coldkey_ss58: str, block_hash: Optional[str] @@ -277,7 +268,7 @@ async def query_runtime_api( params: Optional[Union[list[list[int]], list[int], dict[str, int]]], block_hash: Optional[str] = None, reuse_block: Optional[bool] = False, - ) -> Optional[str]: + ) -> Optional[Any]: """ Queries the runtime API of the Bittensor blockchain, providing a way to interact with the underlying runtime and retrieve data encoded in Scale Bytes format. This function is essential for advanced users @@ -289,43 +280,44 @@ async def query_runtime_api( :param block_hash: The hash of the blockchain block number at which to perform the query. :param reuse_block: Whether to reuse the last-used block hash. - :return: The Scale Bytes encoded result from the runtime API call, or ``None`` if the call fails. + :return: The decoded result from the runtime API call, or ``None`` if the call fails. This function enables access to the deeper layers of the Bittensor blockchain, allowing for detailed and specific interactions with the network's runtime environment. """ - call_definition = TYPE_REGISTRY["runtime_api"][runtime_api]["methods"][method] - - data = ( - "0x" - if params is None - else await self.encode_params( - call_definition=call_definition, params=params - ) - ) - api_method = f"{runtime_api}_{method}" - - json_result = await self.substrate.rpc_request( - method="state_call", - params=[api_method, data, block_hash] if block_hash else [api_method, data], + result = await self.substrate.runtime_call( + runtime_api, method, params, block_hash ) - if json_result is None: - return None - - return_type = call_definition["type"] + return result - as_scale_bytes = scalecodec.ScaleBytes(json_result["result"]) # type: ignore + async def query_runtime_api_wait_to_decode( + self, + runtime_api: str, + method: str, + params: Optional[Union[list[list[int]], dict[str, int]]], + block_hash: Optional[str] = None, + reuse_block: Optional[bool] = False, + ) -> tuple[str, bytes]: + """ + Queries the runtime API of the Bittensor blockchain, providing a way to interact with the underlying + runtime and retrieve data encoded in Scale Bytes format. This function is essential for advanced users + who need to interact with specific runtime methods and decode complex data types. - rpc_runtime_config = RuntimeConfiguration() - rpc_runtime_config.update_type_registry(load_type_registry_preset("legacy")) - rpc_runtime_config.update_type_registry(custom_rpc_type_registry) + :param runtime_api: The name of the runtime API to query. + :param method: The specific method within the runtime API to call. + :param params: The parameters to pass to the method call. + :param block_hash: The hash of the blockchain block number at which to perform the query. + :param reuse_block: Whether to reuse the last-used block hash. - obj = rpc_runtime_config.create_scale_object(return_type, as_scale_bytes) - if obj.data.to_hex() == "0x0400": # RPC returned None result - return None + :return: Tuple of the runtime call type and the result bytes - return obj.decode() + This function enables access to the deeper layers of the Bittensor blockchain, allowing for detailed + and specific interactions with the network's runtime environment. + """ + return await self.substrate.runtime_call_wait_to_decode( + runtime_api, method, params, block_hash + ) async def get_balance( self, @@ -632,7 +624,7 @@ async def neurons_lite( This function offers a quick overview of the neuron population within a subnet, facilitating efficient analysis of the network's decentralized structure and neuron dynamics. """ - hex_bytes_result = await self.query_runtime_api( + result = await self.query_runtime_api( runtime_api="NeuronInfoRuntimeApi", method="get_neurons_lite", params=[ @@ -642,10 +634,10 @@ async def neurons_lite( reuse_block=reuse_block, ) - if hex_bytes_result is None: + if result is None: return [] - return NeuronInfoLite.list_from_vec_u8(hex_to_bytes(hex_bytes_result)) + return NeuronInfoLite.list_from_any(result) async def neuron_for_uid( self, uid: Optional[int], netuid: int, block_hash: Optional[str] = None @@ -668,17 +660,20 @@ async def neuron_for_uid( if uid is None: return NeuronInfo.get_null_neuron() - params = [netuid, uid, block_hash] if block_hash else [netuid, uid] - json_body = await self.substrate.rpc_request( - method="neuronInfo_getNeuron", - params=params, # custom rpc method + result = await self.query_runtime_api( + runtime_api="NeuronInfoRuntimeApi", + method="get_neuron", + params=[ + netuid, + uid, + ], # TODO check to see if this can accept more than one at a time + block_hash=block_hash, ) - if not (result := json_body.get("result", None)): + if not result: return NeuronInfo.get_null_neuron() - bytes_result = bytes(result) - return NeuronInfo.from_vec_u8(bytes_result) + return NeuronInfo.from_any(result) async def get_delegated( self, @@ -705,16 +700,18 @@ async def get_delegated( if block_hash else (self.substrate.last_block_hash if reuse_block else None) ) - encoded_coldkey = ss58_to_vec_u8(coldkey_ss58) - json_body = await self.substrate.rpc_request( - method="delegateInfo_getDelegated", - params=([block_hash, encoded_coldkey] if block_hash else [encoded_coldkey]), + + result = await self.query_runtime_api( + runtime_api="DelegateInfoRuntimeApi", + method="get_delegated", + params=[coldkey_ss58], + block_hash=block_hash, ) - if not (result := json_body.get("result")): + if not result: return [] - return DelegateInfo.delegated_list_from_vec_u8(bytes(result)) + return DelegateInfo.delegated_list_from_any(result) async def query_identity( self, @@ -962,17 +959,17 @@ async def get_subnet_hyperparameters( Understanding the hyperparameters is crucial for comprehending how subnets are configured and managed, and how they interact with the network's consensus and incentive mechanisms. """ - hex_bytes_result = await self.query_runtime_api( + result = await self.query_runtime_api( runtime_api="SubnetInfoRuntimeApi", method="get_subnet_hyperparams", params=[netuid], block_hash=block_hash, ) - if hex_bytes_result is None: + if not result: return [] - return SubnetHyperparameters.from_vec_u8(hex_to_bytes(hex_bytes_result)) + return SubnetHyperparameters.from_any(result) async def get_vote_data( self, diff --git a/bittensor_cli/src/bittensor/utils.py b/bittensor_cli/src/bittensor/utils.py index 71d5e12ac..cade09d7c 100644 --- a/bittensor_cli/src/bittensor/utils.py +++ b/bittensor_cli/src/bittensor/utils.py @@ -12,6 +12,7 @@ from bittensor_wallet.utils import SS58_FORMAT from bittensor_wallet.errors import KeyFileError, PasswordError from bittensor_wallet import utils +import bt_decode from jinja2 import Template from markupsafe import Markup import numpy as np @@ -20,6 +21,7 @@ import scalecodec from scalecodec.base import RuntimeConfiguration from scalecodec.type_registry import load_type_registry_preset +from scalecodec.utils.ss58 import ss58_encode, ss58_decode import typer @@ -376,15 +378,36 @@ def is_valid_bittensor_address_or_public_key(address: Union[str, bytes]) -> bool return False -def decode_scale_bytes(return_type, scale_bytes, custom_rpc_type_registry): - """Decodes a ScaleBytes object using our type registry and return type""" - rpc_runtime_config = RuntimeConfiguration() - rpc_runtime_config.update_type_registry(load_type_registry_preset("legacy")) - rpc_runtime_config.update_type_registry(custom_rpc_type_registry) - obj = rpc_runtime_config.create_scale_object(return_type, scale_bytes) - if obj.data.to_hex() == "0x0400": # RPC returned None result +def decode_scale_bytes( + return_type: str, + scale_bytes: Union["scalecodec.ScaleBytes", bytes], + custom_rpc_type_registry: Union[str, "bt_decode.PortableRegistry"], +) -> Any: + """ + Decodes a ScaleBytes object using our type registry and return type + + :param return_type: the type string to decode the scale bytes to + :param scale_bytes: the scale bytes to decode (either a scalecodec.ScaleBytes or bytes) + :param custom_rpc_type_registry: contains the type registry + + :return: the decoded object + """ + if isinstance(custom_rpc_type_registry, str): + portable_registry = bt_decode.PortableRegistry.from_json( + custom_rpc_type_registry + ) + else: + portable_registry = custom_rpc_type_registry + + if isinstance(scale_bytes, scalecodec.ScaleBytes): + as_bytes = bytes(scale_bytes.data) + else: + as_bytes = bytes(scale_bytes) + + if as_bytes.hex() == "0x0400": # RPC returned None result return None - return obj.decode() + + return bt_decode.decode(return_type, portable_registry, as_bytes) def ss58_address_to_bytes(ss58_address: str) -> bytes: @@ -486,7 +509,7 @@ def format_error_message(error_message: Union[dict, Exception]) -> str: elif all(x in d for x in ["code", "message", "data"]): new_error_message = d break - except ValueError: + except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError): pass if new_error_message is None: return_val = " ".join(error_message.args) @@ -1010,3 +1033,21 @@ def hex_to_bytes(hex_str: str) -> bytes: else: bytes_result = bytes.fromhex(hex_str) return bytes_result + + +def bytes_from_hex_string_result(hex_string_result: str) -> bytes: + if hex_string_result.startswith("0x"): + hex_string_result = hex_string_result[2:] + + return bytes.fromhex(hex_string_result) + + +def decode_account_id(account_id_bytes: Union[tuple[int], tuple[tuple[int]]]): + if isinstance(account_id_bytes, tuple) and isinstance(account_id_bytes[0], tuple): + account_id_bytes = account_id_bytes[0] + # Convert the AccountId bytes to a Base64 string + return ss58_encode(bytes(account_id_bytes).hex(), SS58_FORMAT) + + +def encode_account_id(ss58_address: str) -> bytes: + return bytes.fromhex(ss58_decode(ss58_address, SS58_FORMAT)) diff --git a/bittensor_cli/src/commands/root.py b/bittensor_cli/src/commands/root.py index 1dde73df1..38181fd1e 100644 --- a/bittensor_cli/src/commands/root.py +++ b/bittensor_cli/src/commands/root.py @@ -38,7 +38,6 @@ print_verbose, get_metadata_table, render_table, - ss58_to_vec_u8, update_metadata_table, group_subnets, unlock_key, @@ -390,15 +389,17 @@ async def set_take_extrinsic( async def _get_delegate_by_hotkey(ss58: str) -> Optional[DelegateInfo]: """Retrieves the delegate info for a given hotkey's ss58 address""" - encoded_hotkey = ss58_to_vec_u8(ss58) - json_body = await subtensor.substrate.rpc_request( - method="delegateInfo_getDelegate", # custom rpc method - params=([encoded_hotkey, subtensor.substrate.last_block_hash]), + result = await subtensor.query_runtime_api( + runtime_api="DelegateInfoRuntimeApi", + method="get_delegate", + params=[ss58], + block_hash=subtensor.substrate.last_block_hash, ) - if not (result := json_body.get("result", None)): + + if result is None: return None - else: - return DelegateInfo.from_vec_u8(bytes(result)) + + return DelegateInfo.from_any(result) # Calculate u16 representation of the take take_u16 = int(take * 0xFFFF) diff --git a/bittensor_cli/src/commands/stake/children_hotkeys.py b/bittensor_cli/src/commands/stake/children_hotkeys.py index fc913faa9..66f25d3e3 100644 --- a/bittensor_cli/src/commands/stake/children_hotkeys.py +++ b/bittensor_cli/src/commands/stake/children_hotkeys.py @@ -299,11 +299,7 @@ async def get_total_stake_for_hk(hotkey: str, parent: bool = False): params=[hotkey], reuse_block_hash=True, ) - stake = ( - Balance.from_rao(_result) - if _result is not None - else Balance(0) - ) + stake = Balance.from_rao(_result) if _result is not None else Balance(0) if parent: console.print( f"\nYour Hotkey: [bright_magenta]{hotkey}[/bright_magenta] | Total Stake: [dark_orange]{stake}t[/dark_orange]\n", diff --git a/bittensor_cli/src/commands/wallets.py b/bittensor_cli/src/commands/wallets.py index 9a8b0bfac..6ee7328c7 100644 --- a/bittensor_cli/src/commands/wallets.py +++ b/bittensor_cli/src/commands/wallets.py @@ -12,6 +12,7 @@ from bittensor_wallet import Wallet from bittensor_wallet.errors import KeyFileError from bittensor_wallet.keyfile import Keyfile +from bt_decode import PortableRegistry from fuzzywuzzy import fuzz from rich import box from rich.align import Align @@ -42,6 +43,7 @@ RAO_PER_TAO, console, convert_blocks_to_time, + decode_scale_bytes, err_console, print_error, print_verbose, @@ -1142,34 +1144,34 @@ def _map_hotkey_to_neurons( async def _fetch_neuron_for_netuid( netuid: int, subtensor: SubtensorInterface -) -> tuple[int, Optional[str]]: +) -> tuple[int, Optional[tuple[str, bytes]]]: """ Retrieves all neurons for a specified netuid :param netuid: the netuid to query :param subtensor: the SubtensorInterface to make the query - :return: the original netuid, and a mapping of the neurons to their NeuronInfoLite objects + :return: the original netuid and a tuple of the neurons type and the encoded hex-bytes """ async def neurons_lite_for_uid(uid: int) -> Optional[str]: block_hash = subtensor.substrate.last_block_hash - hex_bytes_result = await subtensor.query_runtime_api( + type_string, bytes_result = await subtensor.query_runtime_api_wait_to_decode( runtime_api="NeuronInfoRuntimeApi", method="get_neurons_lite", params=[uid], block_hash=block_hash, ) - return hex_bytes_result + return type_string, bytes_result - neurons = await neurons_lite_for_uid(uid=netuid) - return netuid, neurons + result = await neurons_lite_for_uid(uid=netuid) + return netuid, result async def _fetch_all_neurons( netuids: list[int], subtensor -) -> list[tuple[int, Optional[str]]]: +) -> list[tuple[int, Optional[tuple[str, bytes]]]]: """Retrieves all neurons for each of the specified netuids""" return list( await asyncio.gather( @@ -1179,19 +1181,25 @@ async def _fetch_all_neurons( def _process_neurons_for_netuids( - netuids_with_all_neurons_hex_bytes: list[tuple[int, Optional[str]]], + netuids_with_all_neurons_bytes: list[tuple[int, Optional[tuple[str, bytes]]]], + custom_rpc_type_registry: PortableRegistry, ) -> list[tuple[int, list[NeuronInfoLite]]]: """ - Decode a list of hex-bytes neurons with their respective netuid + Decode a list of hex-bytes neurons (and typestring) with their respective netuid :param netuids_with_all_neurons_hex_bytes: netuids with hex-bytes neurons :return: netuids mapped to decoded neurons """ all_results = [ - (netuid, NeuronInfoLite.list_from_vec_u8(hex_to_bytes(result))) + ( + netuid, + NeuronInfoLite.list_from_any( + decode_scale_bytes(result[0], result[1], custom_rpc_type_registry) + ), + ) if result else (netuid, []) - for netuid, result in netuids_with_all_neurons_hex_bytes + for netuid, result in netuids_with_all_neurons_bytes ] return all_results @@ -1199,9 +1207,15 @@ def _process_neurons_for_netuids( async def _get_neurons_for_netuids( subtensor: SubtensorInterface, netuids: list[int], hot_wallets: list[str] ) -> list[tuple[int, list["NeuronInfoLite"], Optional[str]]]: - all_neurons_hex_bytes = await _fetch_all_neurons(netuids, subtensor) + all_neurons_bytes = await _fetch_all_neurons(netuids, subtensor) - all_processed_neurons = _process_neurons_for_netuids(all_neurons_hex_bytes) + if not subtensor.substrate.registry: + subtensor.substrate.initialize() + + custom_rpc_type_registry: PortableRegistry = subtensor.substrate.registry + all_processed_neurons = _process_neurons_for_netuids( + all_neurons_bytes, custom_rpc_type_registry + ) return [ _map_hotkey_to_neurons(neurons, hot_wallets, netuid) for netuid, neurons in all_processed_neurons diff --git a/requirements.txt b/requirements.txt index 4aa89c36e..395541726 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,5 @@ substrate-interface~=1.7.9 typer~=0.12 websockets>=14.1 bittensor-wallet>=2.1.3 -bt-decode==0.4.0 \ No newline at end of file +# TODO remove munch and setuptools +bt-decode==0.4.0