Skip to content
180 changes: 173 additions & 7 deletions bittensor_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ class Options:
"--wallet.hotkey",
help="Hotkey of the wallet",
)
wallet_ss58_address = typer.Option(
None,
"--wallet-name",
"--name",
"--wallet_name",
"--wallet.name",
"--address",
"--ss58",
"--ss58-address",
help="SS58 address or wallet name to check. Leave empty to be prompted.",
)
wallet_hotkey_ss58 = typer.Option(
None,
"--hotkey",
Expand Down Expand Up @@ -684,6 +695,12 @@ def __init__(self):
self.wallet_app.command(
"swap-hotkey", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"]
)(self.wallet_swap_hotkey)
self.wallet_app.command(
"swap-coldkey", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"]
)(self.wallet_swap_coldkey)
self.wallet_app.command(
"swap-check", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"]
)(self.wallet_check_ck_swap)
self.wallet_app.command(
"regen-coldkey", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"]
)(self.wallet_regen_coldkey)
Expand Down Expand Up @@ -2306,28 +2323,92 @@ def wallet_new_coldkey(

def wallet_check_ck_swap(
self,
wallet_name: Optional[str] = Options.wallet_name,
wallet_ss58_address: Optional[str] = Options.wallet_ss58_address,
wallet_path: Optional[str] = Options.wallet_path,
wallet_hotkey: Optional[str] = Options.wallet_hotkey,
scheduled_block: Optional[int] = typer.Option(
None,
"--block",
help="Block number where the swap was scheduled",
),
show_all: bool = typer.Option(
False,
"--all",
"-a",
help="Show all pending coldkey swaps",
),
network: Optional[list[str]] = Options.network,
quiet: bool = Options.quiet,
verbose: bool = Options.verbose,
):
"""
Check the status of your scheduled coldkey swap.
Check the status of scheduled coldkey swaps.

USAGE

Users should provide the old coldkey wallet to check the swap status.
This command can be used in three ways:
1. Show all pending swaps (--all)
2. Check status of a specific wallet's swap or SS58 address
3. Check detailed swap status with block number (--block)

EXAMPLE
EXAMPLES

Show all pending swaps:
[green]$[/green] btcli wallet swap-check --all

[green]$[/green] btcli wallet check_coldkey_swap
Check specific wallet's swap:
[green]$[/green] btcli wallet swap-check --wallet-name my_wallet

Check swap using SS58 address:
[green]$[/green] btcli wallet swap-check --ss58 5DkQ4...

Check swap details with block number:
[green]$[/green] btcli wallet swap-check --wallet-name my_wallet --block 12345
"""
self.verbosity_handler(quiet, verbose)
wallet = self.wallet_ask(wallet_name, wallet_path, wallet_hotkey)
self.initialize_chain(network)
return self._run_command(wallets.check_coldkey_swap(wallet, self.subtensor))

if show_all:
return self._run_command(
wallets.check_swap_status(self.subtensor, None, None)
)

if not wallet_ss58_address:
wallet_ss58_address = Prompt.ask(
"Enter [blue]wallet name[/blue] or [blue]SS58 address[/blue] [dim](leave blank to show all pending swaps)[/dim]"
)
if not wallet_ss58_address:
return self._run_command(
wallets.check_swap_status(self.subtensor, None, None)
)

if is_valid_ss58_address(wallet_ss58_address):
ss58_address = wallet_ss58_address
else:
wallet = self.wallet_ask(
wallet_ss58_address,
wallet_path,
wallet_hotkey,
ask_for=[WO.NAME, WO.PATH],
validate=WV.WALLET,
)
ss58_address = wallet.coldkeypub.ss58_address

if not scheduled_block:
block_input = Prompt.ask(
"[blue]Enter the block number[/blue] where the swap was scheduled [dim](optional, press enter to skip)[/dim]",
default="",
)
if block_input:
try:
scheduled_block = int(block_input)
except ValueError:
print_error("Invalid block number")
raise typer.Exit()

return self._run_command(
wallets.check_swap_status(self.subtensor, ss58_address, scheduled_block)
)

def wallet_create_wallet(
self,
Expand Down Expand Up @@ -2762,6 +2843,91 @@ def wallet_sign(

return self._run_command(wallets.sign(wallet, message, use_hotkey))

def wallet_swap_coldkey(
self,
wallet_name: Optional[str] = Options.wallet_name,
wallet_path: Optional[str] = Options.wallet_path,
wallet_hotkey: Optional[str] = Options.wallet_hotkey,
new_wallet_or_ss58: Optional[str] = typer.Option(
None,
"--new-coldkey",
"--new-coldkey-ss58",
"--new-wallet",
"--new",
help="SS58 address of the new coldkey that will replace the current one.",
),
network: Optional[list[str]] = Options.network,
quiet: bool = Options.quiet,
verbose: bool = Options.verbose,
force_swap: bool = typer.Option(
False,
"--force",
"-f",
"--force-swap",
help="Force the swap even if the new coldkey is already scheduled for a swap.",
),
):
"""
Schedule a coldkey swap for a wallet.

This command allows you to schedule a coldkey swap for a wallet. You can either provide a new wallet name, or SS58 address.

EXAMPLES

[green]$[/green] btcli wallet schedule-coldkey-swap --new-wallet my_new_wallet

[green]$[/green] btcli wallet schedule-coldkey-swap --new-coldkey-ss58 5Dk...X3q
"""
self.verbosity_handler(quiet, verbose)

if not wallet_name:
wallet_name = Prompt.ask(
"Enter the [blue]wallet name[/blue] which you want to swap the coldkey for",
default=self.config.get("wallet_name") or defaults.wallet.name,
)
wallet = self.wallet_ask(
wallet_name,
wallet_path,
wallet_hotkey,
ask_for=[WO.NAME],
validate=WV.WALLET,
)
console.print(
f"\nWallet selected to swap the [blue]coldkey[/blue] from: \n"
f"[dark_sea_green3]{wallet}[/dark_sea_green3]\n"
)

if not new_wallet_or_ss58:
new_wallet_or_ss58 = Prompt.ask(
"Enter the [blue]new wallet name[/blue] or [blue]SS58 address[/blue] of the new coldkey",
)

if is_valid_ss58_address(new_wallet_or_ss58):
new_wallet_coldkey_ss58 = new_wallet_or_ss58
else:
new_wallet_name = new_wallet_or_ss58
new_wallet = self.wallet_ask(
new_wallet_name,
wallet_path,
wallet_hotkey,
ask_for=[WO.NAME],
validate=WV.WALLET,
)
console.print(
f"\nNew wallet to swap the [blue]coldkey[/blue] to: \n"
f"[dark_sea_green3]{new_wallet}[/dark_sea_green3]\n"
)
new_wallet_coldkey_ss58 = new_wallet.coldkeypub.ss58_address

return self._run_command(
wallets.schedule_coldkey_swap(
wallet=wallet,
subtensor=self.initialize_chain(network),
new_coldkey_ss58=new_wallet_coldkey_ss58,
force_swap=force_swap,
)
)

def stake_list(
self,
network: Optional[list[str]] = Options.network,
Expand Down
4 changes: 4 additions & 0 deletions bittensor_cli/src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class Constants:
"latent-lite": latent_lite_entrypoint,
"subvortex": subvortex_entrypoint,
}
genesis_block_hash_map = {
"finney": "0x2f0555cc76fc2840a25a6ea3b9637146806f1f44b090c175ffde2a7e5ab36c03",
"test": "0x8f9cf856bf558a14440e75569c9e58594757048d7b3a84b5d25f6bd978263105",
}
delegates_detail_url = "https://raw.githubusercontent.com/opentensor/bittensor-delegates/main/public/delegates.json"


Expand Down
50 changes: 50 additions & 0 deletions bittensor_cli/src/bittensor/subtensor_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1501,3 +1501,53 @@ async def get_stake_fee(
)

return Balance.from_rao(result)

async def get_scheduled_coldkey_swap(
self,
block_hash: Optional[str] = None,
reuse_block: bool = False,
) -> Optional[list[str]]:
"""
Queries the chain to fetch the list of coldkeys that are scheduled for a swap.

:param block_hash: Block hash at which to perform query.
:param reuse_block: Whether to reuse the last-used block hash.

:return: A list of SS58 addresses of the coldkeys that are scheduled for a coldkey swap.
"""
result = await self.substrate.query_map(
module="SubtensorModule",
storage_function="ColdkeySwapScheduled",
block_hash=block_hash,
reuse_block_hash=reuse_block,
)

keys_pending_swap = []
async for ss58, _ in result:
keys_pending_swap.append(decode_account_id(ss58))
return keys_pending_swap

async def get_coldkey_swap_schedule_duration(
self,
block_hash: Optional[str] = None,
reuse_block: bool = False,
) -> int:
"""
Retrieves the duration (in blocks) required for a coldkey swap to be executed.

Args:
block_hash: The hash of the blockchain block number for the query.
reuse_block: Whether to reuse the last-used blockchain block hash.

Returns:
int: The number of blocks required for the coldkey swap schedule duration.
"""
result = await self.query(
module="SubtensorModule",
storage_function="ColdkeySwapScheduleDuration",
params=[],
block_hash=block_hash,
reuse_block_hash=reuse_block,
)

return result
Loading
Loading