diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 9ad6c89df..570c14893 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -7216,6 +7216,12 @@ def stake_set_claim_type( None, help="Claim type: 'keep' or 'swap'. If not provided, you'll be prompted to choose.", ), + netuids: Optional[str] = typer.Option( + None, + "--netuids", + "-n", + help="Netuids to select. Supports ranges and comma-separated values, e.g., '1-5,10,20-30'.", + ), wallet_name: Optional[str] = Options.wallet_name, wallet_path: Optional[str] = Options.wallet_path, wallet_hotkey: Optional[str] = Options.wallet_hotkey, @@ -7233,12 +7239,15 @@ def stake_set_claim_type( [bold]Claim Types:[/bold] • [green]Swap[/green]: Future Root Alpha Emissions are swapped to TAO and added to root stake (default) • [yellow]Keep[/yellow]: Future Root Alpha Emissions are kept as Alpha tokens + • [cyan]Keep Specific[/cyan]: Keep specific subnets as Alpha, swap others to TAO. You can use this type by selecting the netuids. USAGE: - [green]$[/green] btcli stake claim - [green]$[/green] btcli stake claim keep - [green]$[/green] btcli stake claim swap + [green]$[/green] btcli stake claim [cyan](Full wizard)[/cyan] + [green]$[/green] btcli stake claim keep [cyan](Keep all subnets)[/cyan] + [green]$[/green] btcli stake claim swap [cyan](Swap all subnets)[/cyan] + [green]$[/green] btcli stake claim keep --netuids 1-5,10,20-30 [cyan](Keep specific subnets)[/cyan] + [green]$[/green] btcli stake claim swap --netuids 1-30 [cyan](Swap specific subnets)[/cyan] With specific wallet: @@ -7246,16 +7255,6 @@ def stake_set_claim_type( """ self.verbosity_handler(quiet, verbose, json_output) - if claim_type is not None: - claim_type_normalized = claim_type.capitalize() - if claim_type_normalized not in ["Keep", "Swap"]: - err_console.print( - f":cross_mark: [red]Invalid claim type '{claim_type}'. Must be 'keep' or 'swap'.[/red]" - ) - raise typer.Exit() - else: - claim_type_normalized = None - wallet = self.wallet_ask( wallet_name, wallet_path, @@ -7266,7 +7265,8 @@ def stake_set_claim_type( claim_stake.set_claim_type( wallet=wallet, subtensor=self.initialize_chain(network), - claim_type=claim_type_normalized, + claim_type=claim_type, + netuids=netuids, prompt=prompt, json_output=json_output, ) diff --git a/bittensor_cli/src/bittensor/subtensor_interface.py b/bittensor_cli/src/bittensor/subtensor_interface.py index 054d67f7a..190251979 100644 --- a/bittensor_cli/src/bittensor/subtensor_interface.py +++ b/bittensor_cli/src/bittensor/subtensor_interface.py @@ -652,6 +652,26 @@ async def subnet_exists( ) return result + async def total_networks( + self, block_hash: Optional[str] = None, reuse_block: bool = False + ) -> int: + """ + Returns the total number of subnets in the Bittensor network. + + :param block_hash: The hash of the blockchain block number at which to check the subnet existence. + :param reuse_block: Whether to reuse the last-used block hash. + + :return: The total number of subnets in the network. + """ + result = await self.query( + module="SubtensorModule", + storage_function="TotalNetworks", + params=[], + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + return result + async def get_subnet_state( self, netuid: int, block_hash: Optional[str] = None ) -> Optional["SubnetState"]: @@ -1832,13 +1852,14 @@ async def get_coldkey_claim_type( coldkey_ss58: str, block_hash: Optional[str] = None, reuse_block: bool = False, - ) -> str: + ) -> dict: """ Retrieves the root claim type for a specific coldkey. Root claim types control how staking emissions are handled on the ROOT network (subnet 0): - "Swap": Future Root Alpha Emissions are swapped to TAO at claim time and added to your root stake - "Keep": Future Root Alpha Emissions are kept as Alpha + - "KeepSubnets": Specific subnets kept as Alpha, rest swapped to TAO Args: coldkey_ss58: The SS58 address of the coldkey to query. @@ -1846,7 +1867,10 @@ async def get_coldkey_claim_type( reuse_block: Whether to reuse the last-used blockchain block hash. Returns: - str: The root claim type for the coldkey ("Swap" or "Keep"). + dict: Claim type information in one of these formats: + - {"type": "Swap"} + - {"type": "Keep"} + - {"type": "KeepSubnets", "subnets": [1, 5, 10, ...]} """ result = await self.query( module="SubtensorModule", @@ -1857,14 +1881,22 @@ async def get_coldkey_claim_type( ) if result is None: - return "Swap" - return next(iter(result.keys())) + return {"type": "Swap"} + + claim_type_key = next(iter(result.keys())) + + if claim_type_key == "KeepSubnets": + subnets_data = result["KeepSubnets"]["subnets"] + subnet_list = sorted([subnet for subnet in subnets_data[0]]) + return {"type": "KeepSubnets", "subnets": subnet_list} + else: + return {"type": claim_type_key} async def get_all_coldkeys_claim_type( self, block_hash: Optional[str] = None, reuse_block: bool = False, - ) -> dict[str, str]: + ) -> dict[str, dict]: """ Retrieves all root claim types for all coldkeys in the network. @@ -1873,7 +1905,7 @@ async def get_all_coldkeys_claim_type( reuse_block: Whether to reuse the last-used blockchain block hash. Returns: - dict[str, str]: A dictionary mapping coldkey SS58 addresses to their root claim type ("Keep" or "Swap"). + dict[str, dict]: Mapping of coldkey SS58 addresses to claim type dicts """ result = await self.substrate.query_map( module="SubtensorModule", @@ -1884,10 +1916,20 @@ async def get_all_coldkeys_claim_type( ) root_claim_types = {} - async for coldkey, claim_type in result: + async for coldkey, claim_type_data in result: coldkey_ss58 = decode_account_id(coldkey[0]) - claim_type = next(iter(claim_type.value.keys())) - root_claim_types[coldkey_ss58] = claim_type + + claim_type_key = next(iter(claim_type_data.value.keys())) + + if claim_type_key == "KeepSubnets": + subnets_data = claim_type_data.value["KeepSubnets"]["subnets"] + subnet_list = sorted([subnet for subnet in subnets_data[0]]) + root_claim_types[coldkey_ss58] = { + "type": "KeepSubnets", + "subnets": subnet_list, + } + else: + root_claim_types[coldkey_ss58] = {"type": claim_type_key} return root_claim_types diff --git a/bittensor_cli/src/bittensor/utils.py b/bittensor_cli/src/bittensor/utils.py index f8e322f01..94ae015ba 100644 --- a/bittensor_cli/src/bittensor/utils.py +++ b/bittensor_cli/src/bittensor/utils.py @@ -1051,6 +1051,58 @@ def group_subnets(registrations): return ", ".join(ranges) +def parse_subnet_range(input_str: str, total_subnets: int) -> list[int]: + """ + Parse subnet range input like "1-24, 30-40, 5". + + Args: + input_str: Comma-separated list of subnets and ranges + Examples: "1-5", "1,2,3", "1-5, 10, 20-25" + total_subnets: Total number of subnets available + + Returns: + Sorted list of unique subnet IDs + + Raises: + ValueError: If input format is invalid + + Examples: + >>> parse_subnet_range("1-5, 10") + [1, 2, 3, 4, 5, 10] + >>> parse_subnet_range("5, 3, 1") + [1, 3, 5] + """ + subnets = set() + parts = [p.strip() for p in input_str.split(",") if p.strip()] + for part in parts: + if "-" in part: + try: + start, end = part.split("-", 1) + start_num = int(start.strip()) + end_num = int(end.strip()) + + if start_num > end_num: + raise ValueError(f"Invalid range '{part}': start must be ≤ end") + + if end_num - start_num > total_subnets: + raise ValueError( + f"Range '{part}' is not valid (total of {total_subnets} subnets)" + ) + + subnets.update(range(start_num, end_num + 1)) + except ValueError as e: + if "invalid literal" in str(e): + raise ValueError(f"Invalid range '{part}': must be 'start-end'") + raise + else: + try: + subnets.add(int(part)) + except ValueError: + raise ValueError(f"Invalid subnet ID '{part}': must be a number") + + return sorted(subnets) + + def validate_chain_endpoint(endpoint_url) -> tuple[bool, str]: parsed = urlparse(endpoint_url) if parsed.scheme not in ("ws", "wss"): diff --git a/bittensor_cli/src/commands/stake/claim.py b/bittensor_cli/src/commands/stake/claim.py index 67147a82c..daf2aae62 100644 --- a/bittensor_cli/src/commands/stake/claim.py +++ b/bittensor_cli/src/commands/stake/claim.py @@ -4,6 +4,7 @@ from bittensor_wallet import Wallet from rich.prompt import Confirm, Prompt +from rich.panel import Panel from rich.table import Table, Column from rich import box @@ -16,6 +17,8 @@ print_extrinsic_id, json_console, millify_tao, + group_subnets, + parse_subnet_range, ) if TYPE_CHECKING: @@ -26,6 +29,7 @@ async def set_claim_type( wallet: Wallet, subtensor: "SubtensorInterface", claim_type: Optional[str] = None, + netuids: Optional[str] = None, prompt: bool = True, json_output: bool = False, ) -> tuple[bool, str, Optional[str]]: @@ -35,11 +39,13 @@ async def set_claim_type( Root claim types control how staking emissions are handled on the ROOT network (subnet 0): - "Swap": Future Root Alpha Emissions are swapped to TAO at claim time and added to root stake - "Keep": Future Root Alpha Emissions are kept as Alpha tokens + - "KeepSubnets": Specific subnets kept as Alpha, rest swapped to TAO Args: wallet: Bittensor wallet object subtensor: SubtensorInterface object claim_type: Optional claim type ("Keep" or "Swap"). If None, user will be prompted. + netuids: Optional string of subnet IDs (e.g., "1-5,10,20-30"). Will be parsed internally. prompt: Whether to prompt for user confirmation json_output: Whether to output JSON @@ -50,45 +56,100 @@ async def set_claim_type( - Optional[str]: Extrinsic identifier if successful """ - current_type = await subtensor.get_coldkey_claim_type( - coldkey_ss58=wallet.coldkeypub.ss58_address + if claim_type is not None: + claim_type = claim_type.capitalize() + if claim_type not in ["Keep", "Swap"]: + msg = f"Invalid claim type: {claim_type}. Use 'Keep' or 'Swap', or omit for interactive mode." + err_console.print(f"[red]{msg}[/red]") + if json_output: + json_console.print(json.dumps({"success": False, "message": msg})) + return False, msg, None + + current_claim_info, all_netuids = await asyncio.gather( + subtensor.get_coldkey_claim_type(coldkey_ss58=wallet.coldkeypub.ss58_address), + subtensor.get_all_subnet_netuids(), ) + all_subnets = sorted([n for n in all_netuids if n != 0]) + + selected_netuids = None + if netuids is not None: + try: + selected_netuids = parse_subnet_range( + netuids, total_subnets=len(all_subnets) + ) + except ValueError as e: + msg = f"Invalid netuid format: {e}" + err_console.print(f"[red]{msg}[/red]") + if json_output: + json_console.print(json.dumps({"success": False, "message": msg})) + return False, msg, None claim_table = Table( + Column("[bold white]Coldkey", style=COLORS.GENERAL.COLDKEY, justify="left"), Column( - "[bold white]Coldkey", - style=COLORS.GENERAL.COLDKEY, - justify="left", - ), - Column( - "[bold white]Root Claim Type", - style=COLORS.GENERAL.SUBHEADING, - justify="center", + "[bold white]Current Type", style=COLORS.GENERAL.SUBHEADING, justify="left" ), show_header=True, - show_footer=False, - show_edge=True, border_style="bright_black", box=box.SIMPLE, - pad_edge=False, - width=None, - title=f"\n[{COLORS.GENERAL.HEADER}]Current root claim type:[/{COLORS.GENERAL.HEADER}]", + title=f"\n[{COLORS.GENERAL.HEADER}]Current Root Claim Type[/{COLORS.GENERAL.HEADER}]", ) claim_table.add_row( - wallet.coldkeypub.ss58_address, f"[yellow]{current_type}[/yellow]" + wallet.coldkeypub.ss58_address, + _format_claim_type_display(current_claim_info, all_subnets), ) console.print(claim_table) - new_type = ( - claim_type - if claim_type - else Prompt.ask( - "Select new root claim type", choices=["Swap", "Keep"], default=current_type - ) - ) - if new_type == current_type: - msg = f"Root claim type is already set to '{current_type}'. No change needed." - console.print(f"[yellow]{msg}[/yellow]") + # Full wizard + if claim_type is None and selected_netuids is None: + new_claim_info = await _ask_for_claim_types(wallet, subtensor, all_subnets) + if new_claim_info is None: + msg = "Operation cancelled." + console.print(f"[yellow]{msg}[/yellow]") + if json_output: + json_console.print( + json.dumps( + { + "success": False, + "message": msg, + "extrinsic_identifier": None, + } + ) + ) + return False, msg, None + + # Keep netuids passed thru the cli and assume Keep type + elif claim_type is None and selected_netuids is not None: + new_claim_info = {"type": "KeepSubnets", "subnets": selected_netuids} + + else: + # Netuids passed with Keep type + if selected_netuids is not None and claim_type == "Keep": + new_claim_info = {"type": "KeepSubnets", "subnets": selected_netuids} + + # Netuids passed with Swap type + elif selected_netuids is not None and claim_type == "Swap": + keep_subnets = [n for n in all_subnets if n not in selected_netuids] + invalid = [n for n in selected_netuids if n not in all_subnets] + if invalid: + msg = f"Invalid subnets (not available): {group_subnets(invalid)}" + err_console.print(msg) + if json_output: + json_console.print(json.dumps({"success": False, "message": msg})) + return False, msg, None + + if not keep_subnets: + new_claim_info = {"type": "Swap"} + elif set(keep_subnets) == set(all_subnets): + new_claim_info = {"type": "Keep"} + else: + new_claim_info = {"type": "KeepSubnets", "subnets": keep_subnets} + else: + new_claim_info = {"type": claim_type} + + if _claim_types_equal(current_claim_info, new_claim_info): + msg = f"Claim type already set to {_format_claim_type_display(new_claim_info)}. \nNo change needed." + console.print(msg) if json_output: json_console.print( json.dumps( @@ -96,8 +157,6 @@ async def set_claim_type( "success": True, "message": msg, "extrinsic_identifier": None, - "old_type": current_type, - "new_type": current_type, } ) ) @@ -105,59 +164,33 @@ async def set_claim_type( if prompt: console.print( - f"\n[bold]Changing root claim type from '{current_type}' -> '{new_type}'[/bold]\n" - ) - - if new_type == "Swap": - console.print( - "[yellow]Note:[/yellow] With 'Swap', future root alpha emissions will be swapped to TAO and added to root stake." - ) - else: - console.print( - "[yellow]Note:[/yellow] With 'Keep', future root alpha emissions will be kept as Alpha tokens." + Panel( + f"[{COLORS.GENERAL.HEADER}]Confirm Claim Type Change[/{COLORS.GENERAL.HEADER}]\n\n" + f"FROM: {_format_claim_type_display(current_claim_info, all_subnets)}\n\n" + f"TO: {_format_claim_type_display(new_claim_info, all_subnets)}" ) + ) - if not Confirm.ask("\nDo you want to proceed?"): + if not Confirm.ask("\nProceed with this change?"): msg = "Operation cancelled." console.print(f"[yellow]{msg}[/yellow]") if json_output: - json_console.print( - json.dumps( - { - "success": False, - "message": msg, - "extrinsic_identifier": None, - "old_type": current_type, - "new_type": new_type, - } - ) - ) + json_console.print(json.dumps({"success": False, "message": msg})) return False, msg, None if not (unlock := unlock_key(wallet)).success: msg = f"Failed to unlock wallet: {unlock.message}" err_console.print(f":cross_mark: [red]{msg}[/red]") if json_output: - json_console.print( - json.dumps( - { - "success": False, - "message": msg, - "extrinsic_identifier": None, - "old_type": current_type, - "new_type": new_type, - } - ) - ) + json_console.print(json.dumps({"success": False, "message": msg})) return False, msg, None - with console.status( - f":satellite: Setting root claim type to '{new_type}'...", spinner="earth" - ): + with console.status(":satellite: Setting root claim type...", spinner="earth"): + claim_type_param = _prepare_claim_type_args(new_claim_info) call = await subtensor.substrate.compose_call( call_module="SubtensorModule", call_function="set_root_claim_type", - call_params={"new_root_claim_type": new_type}, + call_params={"new_root_claim_type": claim_type_param}, ) success, err_msg, ext_receipt = await subtensor.sign_and_send_extrinsic( call, wallet @@ -165,7 +198,7 @@ async def set_claim_type( if success: ext_id = await ext_receipt.get_extrinsic_identifier() - msg = f"Successfully set root claim type to '{new_type}'" + msg = "Successfully changed claim type" console.print(f":white_heavy_check_mark: [green]{msg}[/green]") await print_extrinsic_id(ext_receipt) if json_output: @@ -175,28 +208,15 @@ async def set_claim_type( "success": True, "message": msg, "extrinsic_identifier": ext_id, - "old_type": current_type, - "new_type": new_type, } ) ) return True, msg, ext_id - else: - msg = f"Failed to set root claim type: {err_msg}" + msg = f"Failed to set claim type: {err_msg}" err_console.print(f":cross_mark: [red]{msg}[/red]") if json_output: - json_console.print( - json.dumps( - { - "success": False, - "message": msg, - "extrinsic_identifier": None, - "old_type": current_type, - "new_type": new_type, - } - ) - ) + json_console.print(json.dumps({"success": False, "message": msg})) return False, msg, None @@ -479,3 +499,239 @@ def _print_claimable_table( first_row = False console.print(table) + + +async def _ask_for_claim_types( + wallet: Wallet, + subtensor: "SubtensorInterface", + all_subnets: list, +) -> Optional[dict]: + """ + Interactive prompts for claim type selection. + + Flow: + 1. Ask "Keep or Swap?" + 2. Ask "All subnets?" + - If yes → return simple type (Keep or Swap) + - If no → enter subnet selection + + Returns: + dict: Selected claim type, or None if cancelled + """ + + console.print("\n") + console.print( + Panel( + f"[{COLORS.GENERAL.HEADER}]Root Claim Type Selection[/{COLORS.GENERAL.HEADER}]\n\n" + "Configure how your root network emissions are claimed.\n\n" + "[yellow]Options:[/yellow]\n" + " • [green]Swap[/green] - Convert emissions to TAO\n" + " • [green]Keep[/green] - Keep emissions as Alpha\n" + " • [green]Keep Specific[/green] - Keep selected subnets, swap others\n", + ) + ) + + primary_choice = Prompt.ask( + "\nSelect new root claim type", + choices=["keep", "swap", "cancel"], + default="cancel", + ) + if primary_choice == "cancel": + return None + + apply_to_all = Confirm.ask( + f"\nSet {primary_choice.capitalize()} to ALL subnets?", default=True + ) + + if apply_to_all: + return {"type": primary_choice.capitalize()} + + if primary_choice == "keep": + console.print( + "\nYou can select which subnets to KEEP as Alpha (others will be swapped to TAO).\n" + ) + else: + console.print( + "\nYou can select which subnets to SWAP to TAO (others will be kept as Alpha).\n" + ) + + return await _prompt_claim_netuids( + wallet, subtensor, all_subnets, mode=primary_choice + ) + + +async def _prompt_claim_netuids( + wallet: Wallet, + subtensor: "SubtensorInterface", + all_subnets: list, + mode: str = "keep", +) -> Optional[dict]: + """ + Interactive subnet selection. + + Args: + mode: "keep" to select subnets to keep as Alpha, "swap" to select subnets to swap to TAO + + Returns: + dict: KeepSubnets claim type or None if cancelled + """ + + if not all_subnets: + console.print("[yellow]No subnets available.[/yellow]") + return {"type": "Swap"} + + if mode == "keep": + action = "KEEP as Alpha" + else: + action = "SWAP to TAO" + + console.print( + Panel( + f"[{COLORS.GENERAL.HEADER}]Subnet Selection[/{COLORS.GENERAL.HEADER}]\n\n" + f"[bold]Available subnets:[/bold] {group_subnets(sorted(all_subnets))}\n" + f"[dim]Total: {len(all_subnets)} subnets[/dim]\n\n" + "[yellow]Input examples:[/yellow]\n" + " • [cyan]1-10[/cyan] - Range from 1 to 10\n" + " • [cyan]1, 5, 10[/cyan] - Specific subnets\n" + " • [cyan]1-10, 20-30, 50[/cyan] - Mixed" + ) + ) + + while True: + subnet_input = Prompt.ask( + f"\nEnter subnets to {action} [dim]{group_subnets(sorted(all_subnets))}", + default="", + ) + + if not subnet_input.strip(): + err_console.print("[red]No subnets entered. Please try again.[/red]") + continue + + try: + selected = parse_subnet_range(subnet_input, total_subnets=len(all_subnets)) + invalid = [s for s in selected if s not in all_subnets] + if invalid: + err_console.print( + f"[red]Invalid subnets (not available): {group_subnets(invalid)}[/red]" + ) + err_console.print("[yellow]Please try again.[/yellow]") + continue + + if mode == "keep": + keep_subnets = selected + else: + keep_subnets = [n for n in all_subnets if n not in selected] + + if _preview_subnet_selection(keep_subnets, all_subnets): + if not keep_subnets: + return {"type": "Swap"} + elif set(keep_subnets) == set(all_subnets): + return {"type": "Keep"} + else: + return {"type": "KeepSubnets", "subnets": keep_subnets} + else: + console.print( + "[yellow]Selection cancelled. Starting over...[/yellow]\n" + ) + return await _prompt_claim_netuids( + wallet, subtensor, all_subnets, mode=mode + ) + + except ValueError as e: + err_console.print( + f"Invalid subnet selection: {e}\n[yellow]Please try again." + ) + + +def _preview_subnet_selection(keep_subnets: list[int], all_subnets: list[int]) -> bool: + """Show preview and ask for confirmation.""" + + swap_subnets = [n for n in all_subnets if n not in keep_subnets] + preview_content = ( + f"[{COLORS.GENERAL.HEADER}]Preview Your Selection[/{COLORS.GENERAL.HEADER}]\n\n" + ) + + if keep_subnets: + preview_content += ( + f"[green]✓ Keep as Alpha:[/green] {group_subnets(keep_subnets)}\n" + f"[dim] ({len(keep_subnets)} subnet{'s' if len(keep_subnets) != 1 else ''})[/dim]" + ) + else: + preview_content += "[dim]No subnets kept as Alpha[/dim]" + + if swap_subnets: + preview_content += ( + f"\n\n[yellow]⟳ Swap to TAO:[/yellow] {group_subnets(swap_subnets)}\n" + f"[dim] ({len(swap_subnets)} subnet{'s' if len(swap_subnets) != 1 else ''})[/dim]" + ) + else: + preview_content += "\n\n[dim]No subnets swapped to TAO[/dim]" + + console.print(Panel(preview_content)) + + return Confirm.ask("\nIs this correct?", default=True) + + +def _format_claim_type_display( + claim_info: dict, all_subnets: Optional[list[int]] = None +) -> str: + """ + Format claim type for human-readable display. + + Args: + claim_info: Claim type information dict + all_subnets: Optional list of all available subnets (for showing swap info) + """ + + claim_type = claim_info["type"] + if claim_type == "Swap": + return "[yellow]Swap All[/yellow]" + + elif claim_type == "Keep": + return "[dark_sea_green3]Keep All[/dark_sea_green3]" + + elif claim_type == "KeepSubnets": + subnets = claim_info["subnets"] + subnet_display = group_subnets(subnets) + + result = ( + f"[cyan]Keep Specific[/cyan]\n[green] ✓ Keep:[/green] {subnet_display}" + ) + if all_subnets: + swap_subnets = [n for n in all_subnets if n not in subnets] + if swap_subnets: + swap_display = group_subnets(swap_subnets) + result += f"\n[yellow] ⟳ Swap:[/yellow] {swap_display}" + + return result + else: + return "[red]Unknown[/red]" + + +def _claim_types_equal(claim1: dict, claim2: dict) -> bool: + """Check if two claim type configs are equivalent.""" + + if claim1["type"] != claim2["type"]: + return False + + if claim1["type"] == "KeepSubnets": + subnets1 = sorted(claim1.get("subnets", [])) + subnets2 = sorted(claim2.get("subnets", [])) + return subnets1 == subnets2 + + return True + + +def _prepare_claim_type_args(claim_info: dict) -> dict: + """Convert claim type arguments for chain call""" + + claim_type = claim_info["type"] + if claim_type == "Swap": + return {"Swap": None} + elif claim_type == "Keep": + return {"Keep": None} + elif claim_type == "KeepSubnets": + subnets = claim_info["subnets"] + return {"KeepSubnets": {"subnets": subnets}} + else: + raise ValueError(f"Unknown claim type: {claim_type}") diff --git a/bittensor_cli/src/commands/subnets/subnets.py b/bittensor_cli/src/commands/subnets/subnets.py index c2346880b..4db9b3dd3 100644 --- a/bittensor_cli/src/commands/subnets/subnets.py +++ b/bittensor_cli/src/commands/subnets/subnets.py @@ -49,6 +49,64 @@ # helpers and extrinsics +def format_claim_type_for_root(claim_info: dict, total_subnets: int) -> str: + """ + Format claim type for root network metagraph. + + Args: + claim_info: Claim type dict {"type": "...", "subnets": [...]} + total_subnets: Total number of subnets in network (excluding netuid 0) + + Returns: + Formatted string showing keep/swap counts + + Examples: + {"type": "Keep"} → "Keep all" + {"type": "Swap"} → "Swap all" + {"type": "KeepSubnets", "subnets": [1,2,3]} → "Keep (3), Swap (54)" + """ + claim_type = claim_info.get("type", "Swap") + + if claim_type == "Keep": + return "Keep all" + elif claim_type == "Swap": + return "Swap all" + else: + keep_subnets = claim_info.get("subnets", []) + keep_count = len(keep_subnets) + swap_count = total_subnets - keep_count + return f"Keep ({keep_count}), Swap ({swap_count})" + + +def format_claim_type_for_subnet(claim_info: dict, current_netuid: int) -> str: + """ + Format claim type for specific subnet metagraph. + Shows whether THIS subnet's emissions are kept or swapped. + + Args: + claim_info: Claim type dict {"type": "...", "subnets": [...]} + current_netuid: The netuid being viewed + + Returns: + "Keep" if this subnet is kept, "Swap" if swapped + + Examples: + {"type": "Keep"}, netuid=5 → "Keep" + {"type": "Swap"}, netuid=5 → "Swap" + {"type": "KeepSubnets", "subnets": [1,5,10]}, netuid=5 → "Keep" + {"type": "KeepSubnets", "subnets": [1,5,10]}, netuid=3 → "Swap" + """ + claim_type = claim_info.get("type", "Swap") + + if claim_type == "Keep": + return "Keep" + elif claim_type == "Swap": + return "Swap" + else: + keep_subnets = claim_info.get("subnets", []) + return "Keep" if current_netuid in keep_subnets else "Swap" + + async def register_subnetwork_extrinsic( subtensor: "SubtensorInterface", wallet: Wallet, @@ -1077,7 +1135,9 @@ async def show_root(): ) coldkey_ss58 = root_state.coldkeys[idx] - claim_type = root_claim_types.get(coldkey_ss58, "Swap") + claim_type_info = root_claim_types.get(coldkey_ss58, {"type": "Swap"}) + total_subnets = len([n for n in all_subnets if n != 0]) + claim_type = format_claim_type_for_root(claim_type_info, total_subnets) sorted_rows.append( ( @@ -1151,7 +1211,8 @@ async def show_root(): - Emission: The emission accrued to this hotkey across all subnets every block measured in TAO. - Hotkey: The hotkey ss58 address. - Coldkey: The coldkey ss58 address. - - Root Claim: The root claim type for this coldkey. 'Swap' converts Alpha to TAO every epoch. 'Keep' keeps Alpha emissions. + - Root Claim: The root claim type for this coldkey. 'Swap' converts Alpha to TAO every epoch. 'Keep' keeps Alpha emissions. + 'Keep (count)' indicates how many subnets this coldkey is keeping Alpha emissions for. """ ) if delegate_selection: @@ -1326,10 +1387,12 @@ async def show_subnet( # Get claim type for this coldkey if applicable TAO stake coldkey_ss58 = metagraph_info.coldkeys[idx] + claim_type_info = {"type": "Swap"} # Default + claim_type = "-" + if tao_stake.tao > 0: - claim_type = root_claim_types.get(coldkey_ss58, "Swap") - else: - claim_type = "-" + claim_type_info = root_claim_types.get(coldkey_ss58, {"type": "Swap"}) + claim_type = format_claim_type_for_subnet(claim_type_info, netuid_) rows.append( ( @@ -1370,7 +1433,12 @@ async def show_subnet( "hotkey": metagraph_info.hotkeys[idx], "coldkey": metagraph_info.coldkeys[idx], "identity": uid_identity, - "claim_type": claim_type, + "claim_type": claim_type_info.get("type") + if tao_stake.tao > 0 + else None, + "claim_type_subnets": claim_type_info.get("subnets") + if claim_type_info.get("type") == "KeepSubnets" + else None, } )