Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 51 additions & 77 deletions bittensor/core/async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@
from bittensor_wallet.utils import SS58_FORMAT
from numpy.typing import NDArray
from scalecodec import GenericCall
from scalecodec.base import RuntimeConfiguration
from scalecodec.type_registry import load_type_registry_preset
from substrateinterface.exceptions import SubstrateRequestException

from bittensor.core.chain_data import (
DelegateInfo,
custom_rpc_type_registry,
StakeInfo,
NeuronInfoLite,
NeuronInfo,
Expand Down Expand Up @@ -50,7 +47,6 @@
from bittensor.core.settings import version_as_int
from bittensor.utils import (
torch,
ss58_to_vec_u8,
format_error_message,
decode_hex_identity_dict,
validate_chain_endpoint,
Expand Down Expand Up @@ -498,17 +494,15 @@ async def get_delegates(
List of DelegateInfo objects, or an empty list if there are no delegates.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="DelegateInfoRuntimeApi",
method="get_delegates",
params=[],
block_hash=block_hash,
reuse_block=reuse_block,
)
if hex_bytes_result is not None:
return DelegateInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
else:
return []

return DelegateInfo.list_from_any(result) if result is not None else []

async def get_stake_info_for_coldkey(
self,
Expand All @@ -534,20 +528,19 @@ async def get_stake_info_for_coldkey(
Stake information is vital for account holders to assess their investment and participation in the network's
delegation and consensus processes.
"""
encoded_coldkey = ss58_to_vec_u8(coldkey_ss58)
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="StakeInfoRuntimeApi",
method="get_stake_info_for_coldkey",
params=[encoded_coldkey],
params=[coldkey_ss58],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return StakeInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return StakeInfo.list_from_any(result)

async def get_stake_for_coldkey_and_hotkey(
self,
Expand Down Expand Up @@ -585,11 +578,11 @@ async def query_runtime_api(
self,
runtime_api: str,
method: str,
params: Optional[Union[list[list[int]], dict[str, int], list[int]]],
params: Optional[Union[list[Any], dict[str, Any]]],
block: Optional[int] = None,
block_hash: Optional[str] = None,
reuse_block: bool = False,
) -> Optional[str]:
) -> Optional[Any]:
"""
Queries the runtime API of the Bittensor blockchain, providing a way to interact with the underlying runtime and
retrieve data encoded in Scale Bytes format. This function is essential for advanced users who need to
Expand All @@ -605,46 +598,17 @@ async def query_runtime_api(
reuse_block: Whether to reuse the last-used block hash. Do not set if using block_hash or block

Returns:
The Scale Bytes encoded result from the runtime API call, or `None` if the call fails.
The decoded result from the runtime API call, or `None` if the call fails.

This function enables access to the deeper layers of the Bittensor blockchain, allowing for detailed and
specific interactions with the network's runtime environment.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)

call_definition = TYPE_REGISTRY["runtime_api"][runtime_api]["methods"][method]

data = (
"0x"
if params is None
else await self.encode_params(
call_definition=call_definition, params=params
)
)
api_method = f"{runtime_api}_{method}"

json_result = await self.substrate.rpc_request(
method="state_call",
params=[api_method, data, block_hash] if block_hash else [api_method, data],
reuse_block_hash=reuse_block,
result = await self.substrate.runtime_call(
runtime_api, method, params, block_hash
)

if json_result is None:
return None

return_type = call_definition["type"]

as_scale_bytes = scalecodec.ScaleBytes(json_result["result"]) # type: ignore

rpc_runtime_config = RuntimeConfiguration()
rpc_runtime_config.update_type_registry(load_type_registry_preset("legacy"))
rpc_runtime_config.update_type_registry(custom_rpc_type_registry)

obj = rpc_runtime_config.create_scale_object(return_type, as_scale_bytes)
if obj.data.to_hex() == "0x0400": # RPC returned None result
return None

return obj.decode()
return result

async def get_balance(
self,
Expand Down Expand Up @@ -1002,18 +966,18 @@ async def neurons(
decentralized structure and the dynamics of its consensus and governance processes.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neurons",
params=[netuid],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return NeuronInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return NeuronInfo.list_from_any(result)

async def neurons_lite(
self,
Expand Down Expand Up @@ -1041,18 +1005,18 @@ async def neurons_lite(
of the network's decentralized structure and neuron dynamics.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neurons_lite",
params=[netuid],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return NeuronInfoLite.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return NeuronInfoLite.list_from_any(result)

async def get_neuron_for_pubkey_and_subnet(
self,
Expand Down Expand Up @@ -1092,16 +1056,20 @@ async def get_neuron_for_pubkey_and_subnet(
if uid is None:
return NeuronInfo.get_null_neuron()

params = [netuid, uid]
json_body = await self.substrate.rpc_request(
method="neuronInfo_getNeuron",
params=params,
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neuron",
params=[
netuid,
uid,
], # TODO check to see if this can accept more than one at a time
block_hash=block_hash,
)

if not (result := json_body.get("result", None)):
if not result:
return NeuronInfo.get_null_neuron()

return NeuronInfo.from_vec_u8(bytes(result))
return NeuronInfo.from_any(result)

async def neuron_for_uid(
self,
Expand Down Expand Up @@ -1137,16 +1105,20 @@ async def neuron_for_uid(
if reuse_block:
block_hash = self.substrate.last_block_hash

params = [netuid, uid, block_hash] if block_hash else [netuid, uid]
json_body = await self.substrate.rpc_request(
method="neuronInfo_getNeuron",
params=params, # custom rpc method
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neuron",
params=[
netuid,
uid,
],
block_hash=block_hash,
)
if not (result := json_body.get("result", None)):

if not result:
return NeuronInfo.get_null_neuron()

bytes_result = bytes(result)
return NeuronInfo.from_vec_u8(bytes_result)
return NeuronInfo.from_any(result)

async def get_delegated(
self,
Expand Down Expand Up @@ -1177,16 +1149,18 @@ async def get_delegated(
if (bh := await self._determine_block_hash(block, block_hash, reuse_block))
else (self.substrate.last_block_hash if reuse_block else None)
)
encoded_coldkey = ss58_to_vec_u8(coldkey_ss58)
json_body = await self.substrate.rpc_request(
method="delegateInfo_getDelegated",
params=([block_hash, encoded_coldkey] if block_hash else [encoded_coldkey]),

result = await self.query_runtime_api(
runtime_api="DelegateInfoRuntimeApi",
method="get_delegated",
params=[coldkey_ss58],
block_hash=block_hash,
)

if not (result := json_body.get("result")):
if not result:
return []

return DelegateInfo.delegated_list_from_vec_u8(bytes(result))
return DelegateInfo.delegated_list_from_any(result)

async def query_identity(
self,
Expand Down Expand Up @@ -1496,18 +1470,18 @@ async def get_subnet_hyperparameters(
they interact with the network's consensus and incentive mechanisms.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="SubnetInfoRuntimeApi",
method="get_subnet_hyperparams",
params=[netuid],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return SubnetHyperparameters.from_vec_u8(hex_to_bytes(hex_bytes_result))
return SubnetHyperparameters.from_any(result)

async def get_vote_data(
self,
Expand Down
2 changes: 1 addition & 1 deletion bittensor/core/chain_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
from .stake_info import StakeInfo
from .subnet_hyperparameters import SubnetHyperparameters
from .subnet_info import SubnetInfo
from .utils import custom_rpc_type_registry, decode_account_id, process_stake_data
from .utils import decode_account_id, process_stake_data

ProposalCallData = GenericCall
57 changes: 25 additions & 32 deletions bittensor/core/chain_data/delegate_info.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import bt_decode

from dataclasses import dataclass
from typing import Optional
from typing import Any, Optional

import bt_decode
import munch

from bittensor.core.chain_data.info_base import InfoBase
from bittensor.core.chain_data.utils import decode_account_id
from bittensor.utils import u16_normalized_float
from bittensor.utils.balance import Balance


@dataclass
class DelegateInfo:
class DelegateInfo(InfoBase):
"""
Dataclass for delegate information. For a lighter version of this class, see ``DelegateInfoLite``.

Expand Down Expand Up @@ -40,8 +42,7 @@ class DelegateInfo:
total_daily_return: Balance # Total daily return of the delegate

@classmethod
def from_vec_u8(cls, vec_u8: bytes) -> Optional["DelegateInfo"]:
decoded = bt_decode.DelegateInfo.decode(vec_u8)
def _fix_decoded(cls, decoded: "DelegateInfo") -> Optional["DelegateInfo"]:
hotkey = decode_account_id(decoded.delegate_ss58)
owner = decode_account_id(decoded.owner_ss58)
nominators = [
Expand All @@ -63,36 +64,14 @@ def from_vec_u8(cls, vec_u8: bytes) -> Optional["DelegateInfo"]:
@classmethod
def list_from_vec_u8(cls, vec_u8: bytes) -> list["DelegateInfo"]:
decoded = bt_decode.DelegateInfo.decode_vec(vec_u8)
results = []
for d in decoded:
hotkey = decode_account_id(d.delegate_ss58)
owner = decode_account_id(d.owner_ss58)
nominators = [
(decode_account_id(x), Balance.from_rao(y)) for x, y in d.nominators
]
total_stake = sum((x[1] for x in nominators)) if nominators else Balance(0)
results.append(
DelegateInfo(
hotkey_ss58=hotkey,
total_stake=total_stake,
nominators=nominators,
owner_ss58=owner,
take=u16_normalized_float(d.take),
validator_permits=d.validator_permits,
registrations=d.registrations,
return_per_1000=Balance.from_rao(d.return_per_1000),
total_daily_return=Balance.from_rao(d.total_daily_return),
)
)
return results
return [cls._fix_decoded(d) for d in decoded]

@classmethod
def delegated_list_from_vec_u8(
cls, vec_u8: bytes
def fix_delegated_list(
cls, delegated_list: list[tuple["DelegateInfo", Balance]]
) -> list[tuple["DelegateInfo", Balance]]:
decoded = bt_decode.DelegateInfo.decode_delegated(vec_u8)
results = []
for d, b in decoded:
for d, b in delegated_list:
nominators = [
(decode_account_id(x), Balance.from_rao(y)) for x, y in d.nominators
]
Expand All @@ -110,3 +89,17 @@ def delegated_list_from_vec_u8(
)
results.append((delegate, Balance.from_rao(b)))
return results

@classmethod
def delegated_list_from_vec_u8(
cls, vec_u8: bytes
) -> list[tuple["DelegateInfo", Balance]]:
decoded = bt_decode.DelegateInfo.decode_delegated(vec_u8)
return cls.fix_delegated_list(decoded)

@classmethod
def delegated_list_from_any(
cls, any_list: list[Any]
) -> list[tuple["DelegateInfo", Balance]]:
any_list = [munch.munchify(any_) for any_ in any_list]
return cls.fix_delegated_list(any_list)
Loading