diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 68d95db4c..4b3998344 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -109,6 +109,17 @@ class Options: "--wallet.hotkey", help="Hotkey of the wallet", ) + wallet_ss58_address = typer.Option( + None, + "--wallet-name", + "--name", + "--wallet_name", + "--wallet.name", + "--address", + "--ss58", + "--ss58-address", + help="SS58 address or wallet name to check. Leave empty to be prompted.", + ) wallet_hotkey_ss58 = typer.Option( None, "--hotkey", @@ -684,6 +695,12 @@ def __init__(self): self.wallet_app.command( "swap-hotkey", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"] )(self.wallet_swap_hotkey) + self.wallet_app.command( + "swap-coldkey", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"] + )(self.wallet_swap_coldkey) + self.wallet_app.command( + "swap-check", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"] + )(self.wallet_check_ck_swap) self.wallet_app.command( "regen-coldkey", rich_help_panel=HELP_PANELS["WALLET"]["SECURITY"] )(self.wallet_regen_coldkey) @@ -2306,28 +2323,92 @@ def wallet_new_coldkey( def wallet_check_ck_swap( self, - wallet_name: Optional[str] = Options.wallet_name, + wallet_ss58_address: Optional[str] = Options.wallet_ss58_address, wallet_path: Optional[str] = Options.wallet_path, wallet_hotkey: Optional[str] = Options.wallet_hotkey, + scheduled_block: Optional[int] = typer.Option( + None, + "--block", + help="Block number where the swap was scheduled", + ), + show_all: bool = typer.Option( + False, + "--all", + "-a", + help="Show all pending coldkey swaps", + ), network: Optional[list[str]] = Options.network, quiet: bool = Options.quiet, verbose: bool = Options.verbose, ): """ - Check the status of your scheduled coldkey swap. + Check the status of scheduled coldkey swaps. USAGE - Users should provide the old coldkey wallet to check the swap status. + This command can be used in three ways: + 1. Show all pending swaps (--all) + 2. Check status of a specific wallet's swap or SS58 address + 3. Check detailed swap status with block number (--block) - EXAMPLE + EXAMPLES + + Show all pending swaps: + [green]$[/green] btcli wallet swap-check --all - [green]$[/green] btcli wallet check_coldkey_swap + Check specific wallet's swap: + [green]$[/green] btcli wallet swap-check --wallet-name my_wallet + + Check swap using SS58 address: + [green]$[/green] btcli wallet swap-check --ss58 5DkQ4... + + Check swap details with block number: + [green]$[/green] btcli wallet swap-check --wallet-name my_wallet --block 12345 """ self.verbosity_handler(quiet, verbose) - wallet = self.wallet_ask(wallet_name, wallet_path, wallet_hotkey) self.initialize_chain(network) - return self._run_command(wallets.check_coldkey_swap(wallet, self.subtensor)) + + if show_all: + return self._run_command( + wallets.check_swap_status(self.subtensor, None, None) + ) + + if not wallet_ss58_address: + wallet_ss58_address = Prompt.ask( + "Enter [blue]wallet name[/blue] or [blue]SS58 address[/blue] [dim](leave blank to show all pending swaps)[/dim]" + ) + if not wallet_ss58_address: + return self._run_command( + wallets.check_swap_status(self.subtensor, None, None) + ) + + if is_valid_ss58_address(wallet_ss58_address): + ss58_address = wallet_ss58_address + else: + wallet = self.wallet_ask( + wallet_ss58_address, + wallet_path, + wallet_hotkey, + ask_for=[WO.NAME, WO.PATH], + validate=WV.WALLET, + ) + ss58_address = wallet.coldkeypub.ss58_address + + if not scheduled_block: + block_input = Prompt.ask( + "[blue]Enter the block number[/blue] where the swap was scheduled [dim](optional, press enter to skip)[/dim]", + default="", + ) + if block_input: + try: + scheduled_block = int(block_input) + except ValueError: + print_error("Invalid block number") + raise typer.Exit() + + return self._run_command( + wallets.check_swap_status(self.subtensor, ss58_address, scheduled_block) + ) def wallet_create_wallet( self, @@ -2762,6 +2843,91 @@ def wallet_sign( return self._run_command(wallets.sign(wallet, message, use_hotkey)) + def wallet_swap_coldkey( + self, + wallet_name: Optional[str] = Options.wallet_name, + wallet_path: Optional[str] = Options.wallet_path, + wallet_hotkey: Optional[str] = Options.wallet_hotkey, + new_wallet_or_ss58: Optional[str] = typer.Option( + None, + "--new-coldkey", + "--new-coldkey-ss58", + "--new-wallet", + "--new", + help="SS58 address of the new coldkey that will replace the current one.", + ), + network: Optional[list[str]] = Options.network, + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + force_swap: bool = typer.Option( + False, + "--force", + "-f", + "--force-swap", + help="Force the swap even if the new coldkey is already scheduled for a swap.", + ), + ): + """ + Schedule a coldkey swap for a wallet. + + This command allows you to schedule a coldkey swap for a wallet. You can either provide a new wallet name, or SS58 address. + + EXAMPLES + + [green]$[/green] btcli wallet schedule-coldkey-swap --new-wallet my_new_wallet + + [green]$[/green] btcli wallet schedule-coldkey-swap --new-coldkey-ss58 5Dk...X3q + """ + self.verbosity_handler(quiet, verbose) + + if not wallet_name: + wallet_name = Prompt.ask( + "Enter the [blue]wallet name[/blue] which you want to swap the coldkey for", + default=self.config.get("wallet_name") or defaults.wallet.name, + ) + wallet = self.wallet_ask( + wallet_name, + wallet_path, + wallet_hotkey, + ask_for=[WO.NAME], + validate=WV.WALLET, + ) + console.print( + f"\nWallet selected to swap the [blue]coldkey[/blue] from: \n" + f"[dark_sea_green3]{wallet}[/dark_sea_green3]\n" + ) + + if not new_wallet_or_ss58: + new_wallet_or_ss58 = Prompt.ask( + "Enter the [blue]new wallet name[/blue] or [blue]SS58 address[/blue] of the new coldkey", + ) + + if is_valid_ss58_address(new_wallet_or_ss58): + new_wallet_coldkey_ss58 = new_wallet_or_ss58 + else: + new_wallet_name = new_wallet_or_ss58 + new_wallet = self.wallet_ask( + new_wallet_name, + wallet_path, + wallet_hotkey, + ask_for=[WO.NAME], + validate=WV.WALLET, + ) + console.print( + f"\nNew wallet to swap the [blue]coldkey[/blue] to: \n" + f"[dark_sea_green3]{new_wallet}[/dark_sea_green3]\n" + ) + new_wallet_coldkey_ss58 = new_wallet.coldkeypub.ss58_address + + return self._run_command( + wallets.schedule_coldkey_swap( + wallet=wallet, + subtensor=self.initialize_chain(network), + new_coldkey_ss58=new_wallet_coldkey_ss58, + force_swap=force_swap, + ) + ) + def stake_list( self, network: Optional[list[str]] = Options.network, diff --git a/bittensor_cli/src/__init__.py b/bittensor_cli/src/__init__.py index 1ecd29a33..7011dfd47 100644 --- a/bittensor_cli/src/__init__.py +++ b/bittensor_cli/src/__init__.py @@ -33,6 +33,10 @@ class Constants: "latent-lite": latent_lite_entrypoint, "subvortex": subvortex_entrypoint, } + genesis_block_hash_map = { + "finney": "0x2f0555cc76fc2840a25a6ea3b9637146806f1f44b090c175ffde2a7e5ab36c03", + "test": "0x8f9cf856bf558a14440e75569c9e58594757048d7b3a84b5d25f6bd978263105", + } delegates_detail_url = "https://raw.githubusercontent.com/opentensor/bittensor-delegates/main/public/delegates.json" diff --git a/bittensor_cli/src/bittensor/subtensor_interface.py b/bittensor_cli/src/bittensor/subtensor_interface.py index dba786369..a19dc8b85 100644 --- a/bittensor_cli/src/bittensor/subtensor_interface.py +++ b/bittensor_cli/src/bittensor/subtensor_interface.py @@ -1501,3 +1501,53 @@ async def get_stake_fee( ) return Balance.from_rao(result) + + async def get_scheduled_coldkey_swap( + self, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> Optional[list[str]]: + """ + Queries the chain to fetch the list of coldkeys that are scheduled for a swap. + + :param block_hash: Block hash at which to perform query. + :param reuse_block: Whether to reuse the last-used block hash. + + :return: A list of SS58 addresses of the coldkeys that are scheduled for a coldkey swap. + """ + result = await self.substrate.query_map( + module="SubtensorModule", + storage_function="ColdkeySwapScheduled", + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + + keys_pending_swap = [] + async for ss58, _ in result: + keys_pending_swap.append(decode_account_id(ss58)) + return keys_pending_swap + + async def get_coldkey_swap_schedule_duration( + self, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> int: + """ + Retrieves the duration (in blocks) required for a coldkey swap to be executed. + + Args: + block_hash: The hash of the blockchain block number for the query. + reuse_block: Whether to reuse the last-used blockchain block hash. + + Returns: + int: The number of blocks required for the coldkey swap schedule duration. + """ + result = await self.query( + module="SubtensorModule", + storage_function="ColdkeySwapScheduleDuration", + params=[], + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + + return result diff --git a/bittensor_cli/src/commands/wallets.py b/bittensor_cli/src/commands/wallets.py index 82b1253ac..61d0bcb6f 100644 --- a/bittensor_cli/src/commands/wallets.py +++ b/bittensor_cli/src/commands/wallets.py @@ -14,8 +14,9 @@ from rich.table import Column, Table from rich.tree import Tree from rich.padding import Padding +from rich.prompt import Confirm -from bittensor_cli.src import COLOR_PALETTE +from bittensor_cli.src import COLOR_PALETTE, COLORS, Constants from bittensor_cli.src.bittensor import utils from bittensor_cli.src.bittensor.balances import Balance from bittensor_cli.src.bittensor.chain_data import ( @@ -44,6 +45,8 @@ millify_tao, unlock_key, WalletLike, + blocks_to_duration, + decode_account_id, ) @@ -1472,3 +1475,258 @@ async def sign(wallet: Wallet, message: str, use_hotkey: str): signed_message = keypair.sign(message.encode("utf-8")).hex() console.print("[dark_sea_green3]Message signed successfully:") console.print(signed_message) + + +async def schedule_coldkey_swap( + wallet: Wallet, + subtensor: SubtensorInterface, + new_coldkey_ss58: str, + force_swap: bool = False, +) -> bool: + """Schedules a coldkey swap operation to be executed at a future block. + + Args: + wallet (Wallet): The wallet initiating the coldkey swap + subtensor (SubtensorInterface): Connection to the Bittensor network + new_coldkey_ss58 (str): SS58 address of the new coldkey + force_swap (bool, optional): Whether to force the swap even if the new coldkey is already scheduled for a swap. Defaults to False. + Returns: + bool: True if the swap was scheduled successfully, False otherwise + """ + if not is_valid_ss58_address(new_coldkey_ss58): + print_error(f"Invalid SS58 address format: {new_coldkey_ss58}") + return False + + scheduled_coldkey_swap = await subtensor.get_scheduled_coldkey_swap() + if wallet.coldkeypub.ss58_address in scheduled_coldkey_swap: + print_error( + f"Coldkey {wallet.coldkeypub.ss58_address} is already scheduled for a swap." + ) + console.print("[dim]Use the force_swap (--force) flag to override this.[/dim]") + if not force_swap: + return False + else: + console.print( + "[yellow]Continuing with the swap due to force_swap flag.[/yellow]\n" + ) + + prompt = ( + "You are [red]swapping[/red] your [blue]coldkey[/blue] to a new address.\n" + f"Current ss58: [{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]\n" + f"New ss58: [{COLORS.G.CK}]{new_coldkey_ss58}[/{COLORS.G.CK}]\n" + "Are you sure you want to continue?" + ) + if not Confirm.ask(prompt): + return False + + if not unlock_key(wallet).success: + return False + + block_pre_call, call = await asyncio.gather( + subtensor.substrate.get_block_number(), + subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function="schedule_swap_coldkey", + call_params={ + "new_coldkey": new_coldkey_ss58, + }, + ), + ) + + with console.status(":satellite: Scheduling coldkey swap on-chain..."): + success, err_msg = await subtensor.sign_and_send_extrinsic( + call, + wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + ) + block_post_call = await subtensor.substrate.get_block_number() + + if not success: + print_error(f"Failed to schedule coldkey swap: {err_msg}") + return False + + console.print( + ":white_heavy_check_mark: [green]Successfully scheduled coldkey swap" + ) + + swap_info = await find_coldkey_swap_extrinsic( + subtensor=subtensor, + start_block=block_pre_call, + end_block=block_post_call, + wallet_ss58=wallet.coldkeypub.ss58_address, + ) + + if not swap_info: + console.print( + "[yellow]Warning: Could not find the swap extrinsic in recent blocks" + ) + return True + + console.print( + "\n[green]Coldkey swap details:[/green]" + f"\nBlock number: {swap_info['block_num']}" + f"\nOriginal address: [{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]" + f"\nDestination address: [{COLORS.G.CK}]{swap_info['dest_coldkey']}[/{COLORS.G.CK}]" + f"\nThe swap will be completed at block: [green]{swap_info['execution_block']}[/green]" + f"\n[dim]You can provide the block number to `btcli wallet swap-check`[/dim]" + ) + + +async def find_coldkey_swap_extrinsic( + subtensor: SubtensorInterface, + start_block: int, + end_block: int, + wallet_ss58: str, +) -> dict: + """Search for a coldkey swap event in a range of blocks. + + Args: + subtensor: SubtensorInterface for chain queries + start_block: Starting block number to search + end_block: Ending block number to search (inclusive) + wallet_ss58: SS58 address of the signing wallet + + Returns: + dict: Contains the following keys if found: + - block_num: Block number where swap was scheduled + - dest_coldkey: SS58 address of destination coldkey + - execution_block: Block number when swap will execute + Empty dict if not found + """ + + current_block, genesis_block = await asyncio.gather( + subtensor.substrate.get_block_number(), + subtensor.substrate.get_block_hash(0) + ) + if ( + current_block - start_block > 300 + and genesis_block == Constants.genesis_block_hash_map["finney"] + ): + console.print("Querying archive node for coldkey swap events...") + await subtensor.substrate.close() + subtensor = SubtensorInterface("archive") + + block_hashes = await asyncio.gather( + *[ + subtensor.substrate.get_block_hash(block_num) + for block_num in range(start_block, end_block + 1) + ] + ) + block_events = await asyncio.gather( + *[ + subtensor.substrate.get_events(block_hash=block_hash) + for block_hash in block_hashes + ] + ) + + for block_num, events in zip(range(start_block, end_block + 1), block_events): + for event in events: + if ( + event.get("event", {}).get("module_id") == "SubtensorModule" + and event.get("event", {}).get("event_id") == "ColdkeySwapScheduled" + ): + attributes = event["event"].get("attributes", {}) + old_coldkey = decode_account_id(attributes["old_coldkey"][0]) + + if old_coldkey == wallet_ss58: + return { + "block_num": block_num, + "dest_coldkey": decode_account_id(attributes["new_coldkey"][0]), + "execution_block": attributes["execution_block"], + } + + return {} + + +async def check_swap_status( + subtensor: SubtensorInterface, + origin_ss58: Optional[str] = None, + expected_block_number: Optional[int] = None, +) -> None: + """ + Check the status of a coldkey swap. + + Args: + subtensor: Connection to the network + origin_ss58: The SS58 address of the original coldkey + block_number: Optional block number where the swap was scheduled + """ + scheduled_swaps = await subtensor.get_scheduled_coldkey_swap() + + if not origin_ss58: + if not scheduled_swaps: + console.print("[yellow]No pending coldkey swaps found.[/yellow]") + return + + table = Table( + Column( + "Original Coldkey", + justify="Left", + style=COLOR_PALETTE["GENERAL"]["SUBHEADING_MAIN"], + no_wrap=True, + ), + Column("Status", style="dark_sea_green3"), + title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Pending Coldkey Swaps\n", + show_header=True, + show_edge=False, + header_style="bold white", + border_style="bright_black", + style="bold", + title_justify="center", + show_lines=False, + pad_edge=True, + ) + + for coldkey in scheduled_swaps: + table.add_row(coldkey, "Pending") + + console.print(table) + console.print( + "\n[dim]Tip: Check specific swap details by providing the original coldkey SS58 address and the block number.[/dim]" + ) + return + + is_pending = origin_ss58 in scheduled_swaps + + if not is_pending: + console.print( + f"[red]No pending swap found for coldkey:[/red] [{COLORS.G.CK}]{origin_ss58}[/{COLORS.G.CK}]" + ) + return + + console.print( + f"[green]Found pending swap for coldkey:[/green] [{COLORS.G.CK}]{origin_ss58}[/{COLORS.G.CK}]" + ) + + if expected_block_number is None: + return + + swap_info = await find_coldkey_swap_extrinsic( + subtensor=subtensor, + start_block=expected_block_number, + end_block=expected_block_number, + wallet_ss58=origin_ss58, + ) + + if not swap_info: + console.print( + f"[yellow]Warning: Could not find swap extrinsic at block {expected_block_number}[/yellow]" + ) + return + + current_block = await subtensor.substrate.get_block_number() + remaining_blocks = swap_info["execution_block"] - current_block + + if remaining_blocks <= 0: + console.print("[green]Swap period has completed![/green]") + return + + console.print( + "\n[green]Coldkey swap details:[/green]" + f"\nScheduled at block: {swap_info['block_num']}" + f"\nOriginal address: [{COLORS.G.CK}]{origin_ss58}[/{COLORS.G.CK}]" + f"\nDestination address: [{COLORS.G.CK}]{swap_info['dest_coldkey']}[/{COLORS.G.CK}]" + f"\nCompletion block: {swap_info['execution_block']}" + f"\nTime remaining: {blocks_to_duration(remaining_blocks)}" + )