diff --git a/bittensor/core/async_subtensor.py b/bittensor/core/async_subtensor.py index da7d75a82f..c4503b4142 100644 --- a/bittensor/core/async_subtensor.py +++ b/bittensor/core/async_subtensor.py @@ -973,6 +973,47 @@ async def get_block_hash(self, block: Optional[int] = None) -> str: else: return await self.substrate.get_chain_head() + async def get_parents( + self, + hotkey: str, + netuid: int, + block: Optional[int] = None, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> list[tuple[float, str]]: + """ + This method retrieves the parent of a given hotkey and netuid. It queries the SubtensorModule's ParentKeys + storage function to get the children and formats them before returning as a tuple. + + Arguments: + hotkey: The child hotkey SS58. + netuid: The netuid value. + block: The block number for which the children are to be retrieved. + block_hash: The hash of the block to retrieve the subnet unique identifiers from. + reuse_block: Whether to reuse the last-used block hash. + + Returns: + A list of formatted parents [(proportion, parent)] + """ + block_hash = await self.determine_block_hash(block, block_hash, reuse_block) + parents = await self.substrate.query( + module="SubtensorModule", + storage_function="ParentKeys", + params=[hotkey, netuid], + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + if parents: + formatted_parents = [] + for proportion, parent in parents.value: + # Convert U64 to int + formatted_child = decode_account_id(parent[0]) + normalized_proportion = u64_normalized_float(proportion) + formatted_parents.append((normalized_proportion, formatted_child)) + return formatted_parents + + return [] + async def get_children( self, hotkey: str, diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index 406295e330..bb0806159d 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -751,6 +751,38 @@ def get_hyperparameter( return getattr(result, "value", result) + def get_parents( + self, hotkey: str, netuid: int, block: Optional[int] = None + ) -> list[tuple[float, str]]: + """ + This method retrieves the parent of a given hotkey and netuid. It queries the SubtensorModule's ParentKeys + storage function to get the children and formats them before returning as a tuple. + + Arguments: + hotkey: The child hotkey SS58. + netuid: The netuid. + block: The block number for which the children are to be retrieved. + + Returns: + A list of formatted parents [(proportion, parent)] + """ + parents = self.substrate.query( + module="SubtensorModule", + storage_function="ParentKeys", + params=[hotkey, netuid], + block_hash=self.determine_block_hash(block), + ) + if parents: + formatted_parents = [] + for proportion, parent in parents.value: + # Convert U64 to int + formatted_child = decode_account_id(parent[0]) + normalized_proportion = u64_normalized_float(proportion) + formatted_parents.append((normalized_proportion, formatted_child)) + return formatted_parents + + return [] + def get_children( self, hotkey: str, netuid: int, block: Optional[int] = None ) -> tuple[bool, list[tuple[float, str]], str]: diff --git a/bittensor/core/subtensor_api/subnets.py b/bittensor/core/subtensor_api/subnets.py index 962f761d1e..ddeedaf1fa 100644 --- a/bittensor/core/subtensor_api/subnets.py +++ b/bittensor/core/subtensor_api/subnets.py @@ -14,6 +14,7 @@ def __init__(self, subtensor: Union["_Subtensor", "_AsyncSubtensor"]): self.bonds = subtensor.bonds self.difficulty = subtensor.difficulty self.get_all_subnets_info = subtensor.get_all_subnets_info + self.get_parents = subtensor.get_parents self.get_children = subtensor.get_children self.get_children_pending = subtensor.get_children_pending self.get_current_weight_commit_info = subtensor.get_current_weight_commit_info diff --git a/bittensor/core/subtensor_api/utils.py b/bittensor/core/subtensor_api/utils.py index 0ca9b1234a..0ddd28bcc7 100644 --- a/bittensor/core/subtensor_api/utils.py +++ b/bittensor/core/subtensor_api/utils.py @@ -36,6 +36,7 @@ def add_legacy_methods(subtensor: "SubtensorApi"): subtensor.get_balance = subtensor._subtensor.get_balance subtensor.get_balances = subtensor._subtensor.get_balances subtensor.get_block_hash = subtensor._subtensor.get_block_hash + subtensor.get_parents = subtensor._subtensor.get_parents subtensor.get_children = subtensor._subtensor.get_children subtensor.get_children_pending = subtensor._subtensor.get_children_pending subtensor.get_commitment = subtensor._subtensor.get_commitment diff --git a/tests/e2e_tests/test_hotkeys.py b/tests/e2e_tests/test_hotkeys.py index 3a699f08d0..8bd8cfbd63 100644 --- a/tests/e2e_tests/test_hotkeys.py +++ b/tests/e2e_tests/test_hotkeys.py @@ -69,7 +69,7 @@ def test_hotkeys(subtensor, alice_wallet, dave_wallet): ) is True ) - logging.console.success(f"✅ Test [green]test_hotkeys[/green] passed") + logging.console.success("✅ Test [green]test_hotkeys[/green] passed") @pytest.mark.asyncio @@ -276,6 +276,10 @@ async def test_children(local_chain, subtensor, alice_wallet, bob_wallet, dave_w assert success is True assert children == [(1.0, bob_wallet.hotkey.ss58_address)] + parent_ = subtensor.get_parents(bob_wallet.hotkey.ss58_address, dave_subnet_netuid) + + assert parent_ == [(1.0, alice_wallet.hotkey.ss58_address)] + # pending queue is empty pending, cooldown = subtensor.get_children_pending( alice_wallet.hotkey.ss58_address, diff --git a/tests/unit_tests/test_async_subtensor.py b/tests/unit_tests/test_async_subtensor.py index a023be1c62..a723f4ed83 100644 --- a/tests/unit_tests/test_async_subtensor.py +++ b/tests/unit_tests/test_async_subtensor.py @@ -1981,6 +1981,90 @@ async def test_get_children_substrate_request_exception(subtensor, mocker): assert result == (False, [], "Formatted error message") +@pytest.mark.asyncio +async def test_get_parents_success(subtensor, mocker): + """Tests get_parents when parents are successfully retrieved and formatted.""" + # Preps + fake_hotkey = "valid_hotkey" + fake_netuid = 1 + fake_parents = mocker.Mock( + value=[ + (1000, ["parent_key_1"]), + (2000, ["parent_key_2"]), + ] + ) + + mocked_query = mocker.AsyncMock(return_value=fake_parents) + subtensor.substrate.query = mocked_query + + mocked_decode_account_id = mocker.Mock( + side_effect=["decoded_parent_key_1", "decoded_parent_key_2"] + ) + mocker.patch.object(async_subtensor, "decode_account_id", mocked_decode_account_id) + + expected_formatted_parents = [ + (u64_normalized_float(1000), "decoded_parent_key_1"), + (u64_normalized_float(2000), "decoded_parent_key_2"), + ] + + # Call + result = await subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid) + + # Asserts + mocked_query.assert_called_once_with( + block_hash=None, + module="SubtensorModule", + storage_function="ParentKeys", + params=[fake_hotkey, fake_netuid], + reuse_block_hash=False, + ) + mocked_decode_account_id.assert_has_calls( + [mocker.call("parent_key_1"), mocker.call("parent_key_2")] + ) + assert result == expected_formatted_parents + + +@pytest.mark.asyncio +async def test_get_parents_no_parents(subtensor, mocker): + """Tests get_parents when there are no parents to retrieve.""" + # Preps + fake_hotkey = "valid_hotkey" + fake_netuid = 1 + fake_parents = [] + + mocked_query = mocker.AsyncMock(return_value=fake_parents) + subtensor.substrate.query = mocked_query + + # Call + result = await subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid) + + # Asserts + mocked_query.assert_called_once_with( + block_hash=None, + module="SubtensorModule", + storage_function="ParentKeys", + params=[fake_hotkey, fake_netuid], + reuse_block_hash=False, + ) + assert result == [] + + +@pytest.mark.asyncio +async def test_get_parents_substrate_request_exception(subtensor, mocker): + """Tests get_parents when SubstrateRequestException is raised.""" + # Preps + fake_hotkey = "valid_hotkey" + fake_netuid = 1 + fake_exception = async_subtensor.SubstrateRequestException("Test Exception") + + mocked_query = mocker.AsyncMock(side_effect=fake_exception) + subtensor.substrate.query = mocked_query + + # Call + with pytest.raises(async_subtensor.SubstrateRequestException): + await subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid) + + @pytest.mark.asyncio async def test_get_children_pending(mock_substrate, subtensor): mock_substrate.query.return_value.value = [ diff --git a/tests/unit_tests/test_subtensor.py b/tests/unit_tests/test_subtensor.py index 2499db8248..11fa933425 100644 --- a/tests/unit_tests/test_subtensor.py +++ b/tests/unit_tests/test_subtensor.py @@ -3746,3 +3746,67 @@ def test_get_next_epoch_start_block(mocker, subtensor, call_return, expected): ) subtensor.tempo.assert_called_once_with(netuid=netuid, block=block) assert result == expected + + +def test_get_parents_success(subtensor, mocker): + """Tests get_parents when parents are successfully retrieved and formatted.""" + # Preps + fake_hotkey = "valid_hotkey" + fake_netuid = 1 + fake_parents = mocker.Mock( + value=[ + (1000, ["parent_key_1"]), + (2000, ["parent_key_2"]), + ] + ) + + mocked_query = mocker.MagicMock(return_value=fake_parents) + subtensor.substrate.query = mocked_query + + mocked_decode_account_id = mocker.Mock( + side_effect=["decoded_parent_key_1", "decoded_parent_key_2"] + ) + mocker.patch.object(subtensor_module, "decode_account_id", mocked_decode_account_id) + + expected_formatted_parents = [ + (u64_normalized_float(1000), "decoded_parent_key_1"), + (u64_normalized_float(2000), "decoded_parent_key_2"), + ] + + # Call + result = subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid) + + # Asserts + mocked_query.assert_called_once_with( + block_hash=None, + module="SubtensorModule", + storage_function="ParentKeys", + params=[fake_hotkey, fake_netuid], + ) + mocked_decode_account_id.assert_has_calls( + [mocker.call("parent_key_1"), mocker.call("parent_key_2")] + ) + assert result == expected_formatted_parents + + +def test_get_parents_no_parents(subtensor, mocker): + """Tests get_parents when there are no parents to retrieve.""" + # Preps + fake_hotkey = "valid_hotkey" + fake_netuid = 1 + fake_parents = [] + + mocked_query = mocker.MagicMock(return_value=fake_parents) + subtensor.substrate.query = mocked_query + + # Call + result = subtensor.get_parents(hotkey=fake_hotkey, netuid=fake_netuid) + + # Asserts + mocked_query.assert_called_once_with( + block_hash=None, + module="SubtensorModule", + storage_function="ParentKeys", + params=[fake_hotkey, fake_netuid], + ) + assert result == []