diff --git a/bittensor/_cli/__init__.py b/bittensor/_cli/__init__.py index 6525929768..8e655d60cb 100644 --- a/bittensor/_cli/__init__.py +++ b/bittensor/_cli/__init__.py @@ -26,7 +26,7 @@ import bittensor import torch -from rich.prompt import Confirm, Prompt +from rich.prompt import Confirm, Prompt, PromptBase from . import cli_impl @@ -823,6 +823,36 @@ def check_overview_config( config: 'bittensor.Config' ): wallet_name = Prompt.ask("Enter wallet name", default = bittensor.defaults.wallet.name) config.wallet.name = str(wallet_name) + def _check_for_cuda_reg_config( config: 'bittensor.Config' ) -> None: + """Checks, when CUDA is available, if the user would like to register with their CUDA device.""" + if torch.cuda.is_available(): + if config.subtensor.register.cuda.get('use_cuda') is None: + # Ask about cuda registration only if a CUDA device is available. + cuda = Confirm.ask("Detected CUDA device, use CUDA for registration?\n") + config.subtensor.register.cuda.use_cuda = cuda + + # Only ask about which CUDA device if the user has more than one CUDA device. + if config.subtensor.register.cuda.use_cuda and config.subtensor.register.cuda.get('dev_id') is None and torch.cuda.device_count() > 0: + devices: List[str] = [str(x) for x in range(torch.cuda.device_count())] + device_names: List[str] = [torch.cuda.get_device_name(x) for x in range(torch.cuda.device_count())] + console.print("Available CUDA devices:") + choices_str: str = "" + for i, device in enumerate(devices): + choices_str += (" {}: {}\n".format(device, device_names[i])) + console.print(choices_str) + dev_id = IntListPrompt.ask("Which GPU(s) would you like to use? Please list one, or comma-separated", choices=devices, default='All') + if dev_id == 'All': + dev_id = list(range(torch.cuda.device_count())) + else: + try: + # replace the commas with spaces then split over whitespace., + # then strip the whitespace and convert to ints. + dev_id = [int(dev_id.strip()) for dev_id in dev_id.replace(',', ' ').split()] + except ValueError: + console.error(":cross_mark:[red]Invalid GPU device[/red] [bold white]{}[/bold white]\nAvailable CUDA devices:{}".format(dev_id, choices_str)) + sys.exit(1) + config.subtensor.register.cuda.dev_id = dev_id + def check_register_config( config: 'bittensor.Config' ): if config.subtensor.get('network') == bittensor.defaults.subtensor.network and not config.no_prompt: config.subtensor.network = Prompt.ask("Enter subtensor network", choices=bittensor.__networks__, default = bittensor.defaults.subtensor.network) @@ -835,27 +865,8 @@ def check_register_config( config: 'bittensor.Config' ): hotkey = Prompt.ask("Enter hotkey name", default = bittensor.defaults.wallet.hotkey) config.wallet.hotkey = str(hotkey) - if not config.no_prompt and config.subtensor.register.cuda.use_cuda == bittensor.defaults.subtensor.register.cuda.use_cuda: - # Ask about cuda registration only if a CUDA device is available. - if torch.cuda.is_available(): - cuda = Confirm.ask("Detected CUDA device, use CUDA for registration?\n") - config.subtensor.register.cuda.use_cuda = cuda - # Only ask about which CUDA device if the user has more than one CUDA device. - if cuda and config.subtensor.register.cuda.get('dev_id') is None and torch.cuda.device_count() > 0: - devices: List[str] = [str(x) for x in range(torch.cuda.device_count())] - device_names: List[str] = [torch.cuda.get_device_name(x) for x in range(torch.cuda.device_count())] - console.print("Available CUDA devices:") - choices_str: str = "" - for i, device in enumerate(devices): - choices_str += (" {}: {}\n".format(device, device_names[i])) - console.print(choices_str) - dev_id = Prompt.ask("Which GPU would you like to use?", choices=devices, default=str(bittensor.defaults.subtensor.register.cuda.dev_id)) - try: - dev_id = int(dev_id) - except ValueError: - console.error(":cross_mark:[red]Invalid GPU device[/red] [bold white]{}[/bold white]\nAvailable CUDA devices:{}".format(dev_id, choices_str)) - sys.exit(1) - config.subtensor.register.cuda.dev_id = dev_id + if not config.no_prompt: + cli._check_for_cuda_reg_config(config) def check_new_coldkey_config( config: 'bittensor.Config' ): if config.wallet.get('name') == bittensor.defaults.wallet.name and not config.no_prompt: @@ -931,6 +942,10 @@ def check_run_config( config: 'bittensor.Config' ): if 'server' in config.model and not config.no_prompt: synapse = Prompt.ask('Enter synapse', choices = list(bittensor.synapse.__synapses_types__), default = 'All') config.synapse = synapse + + # Don't need to ask about registration if they don't want to reregister the wallet. + if config.wallet.get('reregister', bittensor.defaults.wallet.reregister) and not config.no_prompt: + cli._check_for_cuda_reg_config(config) def check_help_config( config: 'bittensor.Config'): if config.model == 'None': @@ -941,3 +956,13 @@ def check_update_config( config: 'bittensor.Config'): if not config.no_prompt: answer = Prompt.ask('This will update the local bittensor package', choices = ['Y','N'], default = 'Y') config.answer = answer + +class IntListPrompt(PromptBase): + """ Prompt for a list of integers. """ + + def check_choice( self, value: str ) -> bool: + assert self.choices is not None + # check if value is a valid choice or all the values in a list of ints are valid choices + return value == "All" or \ + value in self.choices or \ + all( val.strip() in self.choices for val in value.replace(',', ' ').split( )) diff --git a/bittensor/_cli/cli_impl.py b/bittensor/_cli/cli_impl.py index bdce4358dc..bdc4744e1a 100644 --- a/bittensor/_cli/cli_impl.py +++ b/bittensor/_cli/cli_impl.py @@ -309,7 +309,7 @@ def unstake( self ): if not self.config.no_prompt: if not Confirm.ask("Do you want to unstake from the following keys:\n" + \ "".join([ - f" [bold white]- {wallet.hotkey_str}: {amount.tao}𝜏[/bold white]\n" for wallet, amount in zip(final_wallets, final_amounts) + f" [bold white]- {wallet.hotkey_str}: {amount}𝜏[/bold white]\n" for wallet, amount in zip(final_wallets, final_amounts) ]) ): return None diff --git a/bittensor/_subtensor/__init__.py b/bittensor/_subtensor/__init__.py index 8c0be7c88f..3b0c870671 100644 --- a/bittensor/_subtensor/__init__.py +++ b/bittensor/_subtensor/__init__.py @@ -15,22 +15,16 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. import argparse +import copy import os -import random -import time -import psutil -import subprocess -from sys import platform - import bittensor -import copy +from loguru import logger from substrateinterface import SubstrateInterface +from torch.cuda import is_available as is_cuda_available -from . import subtensor_impl -from . import subtensor_mock +from . import subtensor_impl, subtensor_mock -from loguru import logger logger = logger.opt(colors=True) __type_registery__ = { @@ -193,8 +187,9 @@ def add_args(cls, parser: argparse.ArgumentParser, prefix: str = None ): parser.add_argument('--' + prefix_str + 'subtensor.register.num_processes', '-n', dest='subtensor.register.num_processes', help="Number of processors to use for registration", type=int, default=bittensor.defaults.subtensor.register.num_processes) parser.add_argument('--' + prefix_str + 'subtensor.register.update_interval', '--' + prefix_str + 'subtensor.register.cuda.update_interval', '--' + prefix_str + 'cuda.update_interval', '-u', help="The number of nonces to process before checking for next block during registration", type=int, default=bittensor.defaults.subtensor.register.update_interval) # registration args. Used for register and re-register and anything that calls register. - parser.add_argument( '--' + prefix_str + 'subtensor.register.cuda.use_cuda', '--' + prefix_str + 'cuda', '--' + prefix_str + 'cuda.use_cuda', default=bittensor.defaults.subtensor.register.cuda.use_cuda, help='''Set true to use CUDA.''', action='store_true', required=False ) - parser.add_argument( '--' + prefix_str + 'subtensor.register.cuda.dev_id', '--' + prefix_str + 'cuda.dev_id', type=int, default=argparse.SUPPRESS, help='''Set the CUDA device id. Goes by the order of speed. (i.e. 0 is the fastest).''', required=False ) + parser.add_argument( '--' + prefix_str + 'subtensor.register.cuda.use_cuda', '--' + prefix_str + 'cuda', '--' + prefix_str + 'cuda.use_cuda', default=argparse.SUPPRESS, help='''Set true to use CUDA.''', action='store_true', required=False ) + parser.add_argument( '--' + prefix_str + 'subtensor.register.cuda.dev_id', '--' + prefix_str + 'cuda.dev_id', type=int, nargs='+', default=argparse.SUPPRESS, help='''Set the CUDA device id(s). Goes by the order of speed. (i.e. 0 is the fastest).''', required=False ) + parser.add_argument( '--' + prefix_str + 'subtensor.register.cuda.TPB', '--' + prefix_str + 'cuda.TPB', type=int, default=bittensor.defaults.subtensor.register.cuda.TPB, help='''Set the number of Threads Per Block for CUDA.''', required=False ) except argparse.ArgumentError: @@ -215,7 +210,7 @@ def add_defaults(cls, defaults ): defaults.subtensor.register.update_interval = os.getenv('BT_SUBTENSOR_REGISTER_UPDATE_INTERVAL') if os.getenv('BT_SUBTENSOR_REGISTER_UPDATE_INTERVAL') != None else 50_000 defaults.subtensor.register.cuda = bittensor.Config() - defaults.subtensor.register.cuda.dev_id = 0 + defaults.subtensor.register.cuda.dev_id = [0] defaults.subtensor.register.cuda.use_cuda = False defaults.subtensor.register.cuda.TPB = 256 @@ -223,6 +218,18 @@ def add_defaults(cls, defaults ): def check_config( config: 'bittensor.Config' ): assert config.subtensor #assert config.subtensor.network != None + if config.subtensor.get('register') and config.subtensor.register.get('cuda'): + assert all((isinstance(x, int) or isinstance(x, str) and x.isnumeric() ) for x in config.subtensor.register.cuda.get('dev_id', [])) + + if config.subtensor.register.cuda.get('use_cuda', False): + try: + import cubit + except ImportError: + raise ImportError('CUDA registration is enabled but cubit is not installed. Please install cubit.') + + if not is_cuda_available(): + raise RuntimeError('CUDA registration is enabled but no CUDA devices are detected.') + @staticmethod def determine_chain_endpoint(network: str): diff --git a/bittensor/_subtensor/subtensor_impl.py b/bittensor/_subtensor/subtensor_impl.py index 0feb96ab16..5da7dd1232 100644 --- a/bittensor/_subtensor/subtensor_impl.py +++ b/bittensor/_subtensor/subtensor_impl.py @@ -445,7 +445,7 @@ def register ( prompt: bool = False, max_allowed_attempts: int = 3, cuda: bool = False, - dev_id: int = 0, + dev_id: Union[List[int], int] = 0, TPB: int = 256, num_processes: Optional[int] = None, update_interval: Optional[int] = None, @@ -465,11 +465,11 @@ def register ( max_allowed_attempts (int): Maximum number of attempts to register the wallet. cuda (bool): - If true, the wallet should be registered on the cuda device. - dev_id (int): - The cuda device id. + If true, the wallet should be registered using CUDA device(s). + dev_id (Union[List[int], int]): + The CUDA device id to use, or a list of device ids. TPB (int): - The number of threads per block (cuda). + The number of threads per block (CUDA). num_processes (int): The number of processes to use to register. update_interval (int): @@ -504,73 +504,75 @@ def register ( else: pow_result = bittensor.utils.create_pow( self, wallet, num_processes=num_processes, update_interval=update_interval) - # pow failed - if not pow_result: - # might be registered already - if (wallet.is_registered( self )): - bittensor.__console__.print(":white_heavy_check_mark: [green]Registered[/green]") - return True - - # pow successful, proceed to submit pow to chain for registration - else: - with bittensor.__console__.status(":satellite: Submitting POW..."): - # check if pow result is still valid - while bittensor.utils.POWNotStale(self, pow_result): - with self.substrate as substrate: - # create extrinsic call - call = substrate.compose_call( - call_module='SubtensorModule', - call_function='register', - call_params={ - 'block_number': pow_result['block_number'], - 'nonce': pow_result['nonce'], - 'work': bittensor.utils.hex_bytes_to_u8_list( pow_result['work'] ), - 'hotkey': wallet.hotkey.ss58_address, - 'coldkey': wallet.coldkeypub.ss58_address - } - ) - extrinsic = substrate.create_signed_extrinsic( call = call, keypair = wallet.hotkey ) - response = substrate.submit_extrinsic( extrinsic, wait_for_inclusion=wait_for_inclusion, wait_for_finalization=wait_for_finalization ) - - # We only wait here if we expect finalization. - if not wait_for_finalization and not wait_for_inclusion: - bittensor.__console__.print(":white_heavy_check_mark: [green]Sent[/green]") + # pow failed + if not pow_result: + # might be registered already + if (wallet.is_registered( self )): + bittensor.__console__.print(":white_heavy_check_mark: [green]Registered[/green]") + return True + + # pow successful, proceed to submit pow to chain for registration + else: + with bittensor.__console__.status(":satellite: Submitting POW..."): + # check if pow result is still valid + while bittensor.utils.POWNotStale(self, pow_result): + with self.substrate as substrate: + # create extrinsic call + call = substrate.compose_call( + call_module='SubtensorModule', + call_function='register', + call_params={ + 'block_number': pow_result['block_number'], + 'nonce': pow_result['nonce'], + 'work': bittensor.utils.hex_bytes_to_u8_list( pow_result['work'] ), + 'hotkey': wallet.hotkey.ss58_address, + 'coldkey': wallet.coldkeypub.ss58_address + } + ) + extrinsic = substrate.create_signed_extrinsic( call = call, keypair = wallet.hotkey ) + response = substrate.submit_extrinsic( extrinsic, wait_for_inclusion=wait_for_inclusion, wait_for_finalization=wait_for_finalization ) + + # We only wait here if we expect finalization. + if not wait_for_finalization and not wait_for_inclusion: + bittensor.__console__.print(":white_heavy_check_mark: [green]Sent[/green]") + return True + + # process if registration successful, try again if pow is still valid + response.process_events() + if not response.is_success: + if 'key is already registered' in response.error_message: + # Error meant that the key is already registered. + bittensor.__console__.print(":white_heavy_check_mark: [green]Already Registered[/green]") + return True + + bittensor.__console__.print(":cross_mark: [red]Failed[/red]: error:{}".format(response.error_message)) + time.sleep(0.5) + + # Successful registration, final check for neuron and pubkey + else: + bittensor.__console__.print(":satellite: Checking Balance...") + neuron = self.neuron_for_pubkey( wallet.hotkey.ss58_address ) + if not neuron.is_null: + bittensor.__console__.print(":white_heavy_check_mark: [green]Registered[/green]") return True - - # process if registration successful, try again if pow is still valid - response.process_events() - if not response.is_success: - if 'key is already registered' in response.error_message: - # Error meant that the key is already registered. - bittensor.__console__.print(":white_heavy_check_mark: [green]Already Registered[/green]") - return True - - bittensor.__console__.print(":cross_mark: [red]Failed[/red]: error:{}".format(response.error_message)) - time.sleep(0.5) - - # Successful registration, final check for neuron and pubkey else: - bittensor.__console__.print(":satellite: Checking Balance...") - neuron = self.neuron_for_pubkey( wallet.hotkey.ss58_address ) - if not neuron.is_null: - bittensor.__console__.print(":white_heavy_check_mark: [green]Registered[/green]") - return True - else: - # neuron not found, try again - bittensor.__console__.print(":cross_mark: [red]Unknown error. Neuron not found.[/red]") - continue - else: - # Exited loop because pow is no longer valid. - bittensor.__console__.print( "[red]POW is stale.[/red]" ) - return False - if attempts < max_allowed_attempts: - #Failed registration, retry pow - attempts += 1 - bittensor.__console__.print( ":satellite: Failed registration, retrying pow ...({}/{})".format(attempts, max_allowed_attempts)) - else: - # Failed to register after max attempts. - bittensor.__console__.print( "[red]No more attempts.[/red]" ) - return False + # neuron not found, try again + bittensor.__console__.print(":cross_mark: [red]Unknown error. Neuron not found.[/red]") + continue + else: + # Exited loop because pow is no longer valid. + bittensor.__console__.print( "[red]POW is stale.[/red]" ) + # Try again. + continue + + if attempts < max_allowed_attempts: + #Failed registration, retry pow + attempts += 1 + bittensor.__console__.print( ":satellite: Failed registration, retrying pow ...({}/{})".format(attempts, max_allowed_attempts)) + else: + # Failed to register after max attempts. + bittensor.__console__.print( "[red]No more attempts.[/red]" ) + return False def serve ( self, diff --git a/bittensor/utils/__init__.py b/bittensor/utils/__init__.py index 3a5b353b8d..ef448484e2 100644 --- a/bittensor/utils/__init__.py +++ b/bittensor/utils/__init__.py @@ -1,5 +1,4 @@ import binascii -import datetime import hashlib import math import multiprocessing @@ -9,7 +8,7 @@ import time from dataclasses import dataclass from queue import Empty -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union import backoff import bittensor @@ -20,7 +19,12 @@ from substrateinterface import Keypair from substrateinterface.utils import ss58 -from .register_cuda import reset_cuda, solve_cuda +from .register_cuda import solve_cuda + + +class CUDAException(Exception): + """An exception raised when an error occurs in the CUDA environment.""" + pass def indexed_values_to_dataframe ( @@ -140,7 +144,7 @@ class POWSolution: difficulty: int seal: bytes -class Solver(multiprocessing.Process): +class SolverBase(multiprocessing.Process): """ A process that solves the registration PoW problem. @@ -188,7 +192,7 @@ class Solver(multiprocessing.Process): proc_num: int num_proc: int update_interval: int - best_queue: multiprocessing.Queue + best_queue: Optional[multiprocessing.Queue] time_queue: multiprocessing.Queue solution_queue: multiprocessing.Queue newBlockEvent: multiprocessing.Event @@ -216,6 +220,10 @@ def __init__(self, proc_num, num_proc, update_interval, best_queue, time_queue, self.stopEvent = stopEvent self.limit = limit + def run(self): + raise NotImplementedError("SolverBase is an abstract class") + +class Solver(SolverBase): def run(self): block_number: int block_bytes: bytes @@ -250,6 +258,72 @@ def run(self): nonce_start += self.update_interval * self.num_proc nonce_end += self.update_interval * self.num_proc +class CUDASolver(SolverBase): + dev_id: int + TPB: int + + def __init__(self, proc_num, num_proc, update_interval, time_queue, solution_queue, stopEvent, curr_block, curr_block_num, curr_diff, check_block, limit, dev_id: int, TPB: int): + super().__init__(proc_num, num_proc, update_interval, None, time_queue, solution_queue, stopEvent, curr_block, curr_block_num, curr_diff, check_block, limit) + self.dev_id = dev_id + self.TPB = TPB + + def run(self): + block_number: int + block_bytes: bytes + block_difficulty: int + nonce_limit = int(math.pow(2,64)) - 1 + + # Start at random nonce + nonce_start = self.TPB * self.update_interval * self.proc_num + random.randint( 0, nonce_limit ) + nonce_end = nonce_start + self.update_interval * self.TPB + while not self.stopEvent.is_set(): + if self.newBlockEvent.is_set(): + with self.check_block: + block_number = self.curr_block_num.value + block_bytes = bytes(self.curr_block) + block_difficulty = registration_diff_unpack(self.curr_diff) + + self.newBlockEvent.clear() + # reset nonces to start from random point + nonce_start = self.update_interval * self.proc_num + random.randint( 0, nonce_limit ) + nonce_end = nonce_start + self.update_interval + + # Do a block of nonces + solution, time = solve_for_nonce_block_cuda(self, nonce_start, self.update_interval, block_bytes, block_difficulty, self.limit, block_number, self.dev_id, self.TPB) + if solution is not None: + self.solution_queue.put(solution) + + # Send time + self.time_queue.put_nowait(time) + + nonce_start += self.update_interval * self.num_proc + nonce_start = nonce_start % nonce_limit + nonce_end += self.update_interval * self.num_proc + + +def solve_for_nonce_block_cuda(solver: CUDASolver, nonce_start: int, update_interval: int, block_bytes: bytes, difficulty: int, limit: int, block_number: int, dev_id: int, TPB: int) -> Tuple[Optional[POWSolution], int]: + start = time.time() + + solution, seal = solve_cuda(nonce_start, + update_interval, + TPB, + block_bytes, + block_number, + difficulty, + limit, + dev_id) + + if (solution != -1): + # Check if solution is valid + # Attempt to reset CUDA device + #reset_cuda() + + #print(f"{solver.proc_num} on cuda:{solver.dev_id} found a solution: {solution}, {block_number}, {str(block_bytes)}, {str(seal)}, {difficulty}") + # Found a solution, save it. + return POWSolution(solution, block_number, difficulty, seal), time.time() - start + + return None, time.time() - start + def solve_for_nonce_block(solver: Solver, nonce_start: int, nonce_end: int, block_bytes: bytes, difficulty: int, limit: int, block_number: int) -> Tuple[Optional[POWSolution], int]: best_local = float('inf') @@ -297,6 +371,12 @@ def update_curr_block(curr_diff: multiprocessing.Array, curr_block: multiprocess curr_block[i] = block_bytes[i] registration_diff_pack(diff, curr_diff) +def get_cpu_count(): + try: + return len(os.sched_getaffinity(0)) + except AttributeError: + # OSX does not have sched_getaffinity + return os.cpu_count() def solve_for_difficulty_fast( subtensor, wallet, num_processes: Optional[int] = None, update_interval: Optional[int] = None ) -> Optional[POWSolution]: """ @@ -317,7 +397,7 @@ def solve_for_difficulty_fast( subtensor, wallet, num_processes: Optional[int] = """ if num_processes == None: # get the number of allowed processes for this process - num_processes = len(os.sched_getaffinity(0)) + num_processes = min(1, get_cpu_count()) if update_interval is None: update_interval = 50_000 @@ -401,12 +481,11 @@ def solve_for_difficulty_fast( subtensor, wallet, num_processes: Optional[int] = # Get times for each solver time_total = 0 num_time = 0 - while time_queue.qsize() > 0: + + for _ in solvers: try: - time_ = time_queue.get_nowait() - time_total += time_ + time_total += time_queue.get_nowait() num_time += 1 - except Empty: break @@ -416,7 +495,7 @@ def solve_for_difficulty_fast( subtensor, wallet, num_processes: Optional[int] = itrs_per_sec = update_interval*num_processes / time_avg # get best solution from each solver using the best_queue - while best_queue.qsize() > 0: + for _ in solvers: try: num, seal = best_queue.get_nowait() if num < best_number: @@ -449,12 +528,12 @@ def get_human_readable(num, suffix="H"): return f"{num:.1f}Y{suffix}" def millify(n: int): - millnames = ['',' K',' M',' B',' T'] + millnames = ['',' K',' M',' B',' T', 'q', 'Q'] n = float(n) millidx = max(0,min(len(millnames)-1, int(math.floor(0 if n == 0 else math.log10(abs(n))/3)))) - return '{:.0f}{}'.format(n / 10**(3 * millidx), millnames[millidx]) + return '{:.4f}{}'.format(n / 10**(3 * millidx), millnames[millidx]) @backoff.on_exception(backoff.constant, Exception, @@ -468,7 +547,8 @@ def get_block_with_retry(subtensor: 'bittensor.Subtensor') -> Tuple[int, int, by raise Exception("Network error. Could not connect to substrate to get block hash") return block_number, difficulty, block_hash -def solve_for_difficulty_fast_cuda( subtensor: 'bittensor.Subtensor', wallet: 'bittensor.Wallet', update_interval: int = 50_000, TPB: int = 512, dev_id: int = 0 ) -> Optional[POWSolution]: + +def solve_for_difficulty_fast_cuda( subtensor: 'bittensor.Subtensor', wallet: 'bittensor.Wallet', update_interval: int = 50_000, TPB: int = 512, dev_id: Union[List[int], int] = 0, use_kernel_launch_optimization: bool = False ) -> Optional[POWSolution]: """ Solves the registration fast using CUDA Args: @@ -480,79 +560,138 @@ def solve_for_difficulty_fast_cuda( subtensor: 'bittensor.Subtensor', wallet: 'b The number of nonces to try before checking for more blocks TPB: int The number of threads per block. CUDA param that should match the GPU capability - dev_id: int - The CUDA device ID to execute the registration on + dev_id: Union[List[int], int] + The CUDA device IDs to execute the registration on, either a single device or a list of devices """ - if not torch.cuda.is_available(): - raise Exception("CUDA not available") + if isinstance(dev_id, int): + dev_id = [dev_id] + elif dev_id is None: + dev_id = [0] if update_interval is None: update_interval = 50_000 - - block_number, difficulty, block_hash = get_block_with_retry(subtensor) - block_bytes = block_hash.encode('utf-8')[2:] - - nonce = 0 + + if not torch.cuda.is_available(): + raise Exception("CUDA not available") + limit = int(math.pow(2,256)) - 1 - start_time = time.time() console = bittensor.__console__ status = console.status("Solving") + + # Set mp start to use spawn so CUDA doesn't complain + multiprocessing.set_start_method('spawn') + + curr_block = multiprocessing.Array('h', 64, lock=True) # byte array + curr_block_num = multiprocessing.Value('i', 0, lock=True) # int + curr_diff = multiprocessing.Array('Q', [0, 0], lock=True) # [high, low] + + def update_curr_block(block_number: int, block_bytes: bytes, diff: int, lock: multiprocessing.Lock): + with lock: + curr_block_num.value = block_number + for i in range(64): + curr_block[i] = block_bytes[i] + registration_diff_pack(diff, curr_diff) + + status.start() + + # Establish communication queues + stopEvent = multiprocessing.Event() + stopEvent.clear() + solution_queue = multiprocessing.Queue() + time_queue = multiprocessing.Queue() + check_block = multiprocessing.Lock() + + # Start consumers + num_processes = len(dev_id) + ## Create one consumer per GPU + solvers = [ CUDASolver(i, num_processes, update_interval, time_queue, solution_queue, stopEvent, curr_block, curr_block_num, curr_diff, check_block, limit, dev_id[i], TPB) + for i in range(num_processes) ] + + # Get first block + block_number = subtensor.get_current_block() + difficulty = subtensor.difficulty + block_hash = subtensor.substrate.get_block_hash( block_number ) + while block_hash == None: + block_hash = subtensor.substrate.get_block_hash( block_number ) + block_bytes = block_hash.encode('utf-8')[2:] + old_block_number = block_number + # Set to current block + update_curr_block(block_number, block_bytes, difficulty, check_block) + + # Set new block events for each solver to start + for w in solvers: + w.newBlockEvent.set() + + for w in solvers: + w.start() # start the solver processes - solution = -1 start_time = time.time() - interval_time = start_time + time_since = 0.0 + solution = None + itrs_per_sec = 0 + while not wallet.is_registered(subtensor): + # Wait until a solver finds a solution + try: + solution = solution_queue.get(block=True, timeout=0.15) + if solution is not None: + break + except Empty: + # No solution found, try again + pass - status.start() - while solution == -1 and not wallet.is_registered(subtensor): - solution, seal = solve_cuda(nonce, - update_interval, - TPB, - block_bytes, - block_number, - difficulty, - limit, - dev_id) - - if (solution != -1): - # Attempt to reset CUDA device - reset_cuda() - status.stop() - new_bn = subtensor.get_current_block() - print(f"Found solution for bn: {block_number}; Newest: {new_bn}") - return POWSolution(solution, block_number, difficulty, seal) - - nonce += (TPB * update_interval) - if (nonce >= int(math.pow(2,63))): - nonce = 0 - itrs_per_sec = (TPB * update_interval) / (time.time() - interval_time) - interval_time = time.time() - - block_number, difficulty, block_hash = get_block_with_retry(subtensor) - block_bytes = block_hash.encode('utf-8')[2:] + # check for new block + block_number = subtensor.get_current_block() + if block_number != old_block_number: + old_block_number = block_number + # update block information + block_hash = subtensor.substrate.get_block_hash( block_number) + while block_hash == None: + block_hash = subtensor.substrate.get_block_hash( block_number) + block_bytes = block_hash.encode('utf-8')[2:] + difficulty = subtensor.difficulty + update_curr_block(block_number, block_bytes, difficulty, check_block) + # Set new block events for each solver + for w in solvers: + w.newBlockEvent.set() + + # Get times for each solver + time_total = 0 + num_time = 0 + for _ in solvers: + try: + time_ = time_queue.get_nowait() + time_total += time_ + num_time += 1 + + except Empty: + break + + if num_time > 0: + time_avg = time_total / num_time + itrs_per_sec = TPB*update_interval*num_processes / time_avg + time_since = time.time() - start_time + message = f"""Solving - time spent: {datetime.timedelta(seconds=time.time() - start_time)} - Nonce: [bold white]{nonce}[/bold white] + time spent: {time_since} Difficulty: [bold white]{millify(difficulty)}[/bold white] - Iters: [bold white]{get_human_readable(int(itrs_per_sec), "H")}/s[/bold white] + Iters: [bold white]{get_human_readable(int(itrs_per_sec), 'H')}/s[/bold white] Block: [bold white]{block_number}[/bold white] Block_hash: [bold white]{block_hash.encode('utf-8')}[/bold white]""" status.update(message.replace(" ", "")) - - # exited while, found_solution contains the nonce or wallet is registered - if solution == -1: # didn't find solution - reset_cuda() - status.stop() - return None - else: - reset_cuda() - # Shouldn't get here + # exited while, found_solution contains the nonce or wallet is registered + if solution is not None: + stopEvent.set() # stop all other processes status.stop() - return None -def create_pow( subtensor, wallet, cuda: bool = False, dev_id: int = 0, tpb: int = 256, num_processes: int = None, update_interval: int = None ) -> Optional[Dict[str, Any]]: + return solution + + status.stop() + return None + +def create_pow( subtensor, wallet, cuda: bool = False, dev_id: Union[List[int], int] = 0, tpb: int = 256, num_processes: int = None, update_interval: int = None) -> Optional[Dict[str, Any]]: if cuda: solution: POWSolution = solve_for_difficulty_fast_cuda( subtensor, wallet, dev_id=dev_id, TPB=tpb, update_interval=update_interval ) else: diff --git a/bittensor/utils/register_cuda.py b/bittensor/utils/register_cuda.py index f64f4777b4..086f1f3637 100644 --- a/bittensor/utils/register_cuda.py +++ b/bittensor/utils/register_cuda.py @@ -6,6 +6,9 @@ import numpy as np from Crypto.Hash import keccak +from contextlib import redirect_stdout +import io + def solve_cuda(nonce_start: np.int64, update_interval: np.int64, TPB: int, block_bytes: bytes, bn: int, difficulty: int, limit: int, dev_id: int = 0) -> Tuple[np.int64, bytes]: """ @@ -66,7 +69,6 @@ def create_seal_hash( block_bytes:bytes, nonce:int ) -> bytes: solution = cubit.solve_cuda(TPB, nonce_start, update_interval, upper_bytes, block_bytes, dev_id) # 0 is first GPU seal = None if solution != -1: - print(f"Checking solution: {solution} for bn: {bn}") seal = create_seal_hash(block_bytes, solution) if seal_meets_difficulty(seal, difficulty): return solution, seal @@ -85,3 +87,24 @@ def reset_cuda(): raise ImportError("Please install cubit") cubit.reset_cuda() + +def log_cuda_errors() -> str: + """ + Logs any CUDA errors. + """ + try: + import cubit + except ImportError: + raise ImportError("Please install cubit") + + f = io.StringIO() + with redirect_stdout(f): + cubit.log_cuda_errors() + + s = f.getvalue() + + return s + + + + diff --git a/setup.py b/setup.py index d4a9723bcd..50771f802c 100644 --- a/setup.py +++ b/setup.py @@ -70,6 +70,6 @@ ], python_requires='>=3.7', extras_requires={ - 'cubit': ['cubit>=1.0.5 @ git+https://github.com/opentensor/cubit.git'] + 'cubit': ['cubit>=1.1.0 @ git+https://github.com/opentensor/cubit.git'] } ) diff --git a/tests/integration_tests/test_subtensor.py b/tests/integration_tests/test_subtensor.py index 8c1ae1967e..9d84284329 100644 --- a/tests/integration_tests/test_subtensor.py +++ b/tests/integration_tests/test_subtensor.py @@ -16,18 +16,19 @@ # DEALINGS IN THE SOFTWARE. -import multiprocessing -from unittest.mock import patch +import random +import time +import unittest +from queue import Empty as QueueEmpty +from unittest.mock import MagicMock, patch + import bittensor import pytest -import unittest -import time -import random -from unittest.mock import MagicMock +from bittensor._subtensor.subtensor_mock import mock_subtensor from bittensor.utils.balance import Balance -from bittensor.utils import Solver, update_curr_block from substrateinterface import Keypair -from bittensor._subtensor.subtensor_mock import mock_subtensor + + class TestSubtensor(unittest.TestCase): def setUp(self): self.subtensor = bittensor.subtensor( network = 'nobunaga' ) @@ -404,8 +405,8 @@ def process_events(self): with patch('bittensor.Subtensor.difficulty'): # patch solution queue to return None with patch('multiprocessing.queues.Queue.get', return_value=None) as mock_queue_get: - # patch time queue size check - with patch('multiprocessing.queues.Queue.qsize', return_value=0): + # patch time queue get to raise Empty exception + with patch('multiprocessing.queues.Queue.get_nowait', side_effect=QueueEmpty) as mock_queue_get_nowait: wallet = bittensor.wallet(_mock=True) wallet.is_registered = MagicMock( side_effect=is_registered_return_values ) @@ -491,6 +492,46 @@ def process_events(self): assert self.subtensor.register(wallet=wallet,) == False assert bittensor.utils.create_pow.call_count == 3 + def test_registration_stale_then_continue( self ): + # verifty that after a stale solution, the solve will continue without exiting + + class ExitEarly(Exception): + pass + + mock_not_stale = MagicMock( + side_effect = [False, True] + ) + + mock_substrate_enter = MagicMock( + side_effect=ExitEarly() + ) + + mock_subtensor_self = MagicMock( + neuron_for_pubkey = MagicMock( return_value = MagicMock(is_null = True) ), # not registered + substrate=MagicMock( + __enter__ = mock_substrate_enter + ) + ) + + mock_wallet = MagicMock() + + mock_create_pow = MagicMock( + return_value = MagicMock() + ) + + + with patch('bittensor.utils.create_pow', mock_create_pow): + with patch('bittensor.utils.POWNotStale', mock_not_stale): + # should create a pow and check if it is stale + # then should create a new pow and check if it is stale + # then should enter substrate and exit early because of test + with pytest.raises(ExitEarly): + bittensor.Subtensor.register(mock_subtensor_self, mock_wallet) + assert mock_create_pow.call_count == 2 # must try another pow after stale + assert mock_not_stale.call_count == 2 + assert mock_substrate_enter.call_count == 1 # only tries to submit once, then exits + + def test_subtensor_mock(): mock_subtensor.kill_global_mock_process() sub = bittensor.subtensor(_mock=True) @@ -575,4 +616,4 @@ def test_subtensor_mock_functions(): if __name__ == "__main__": sub = TestSubtensor() sub.setUp() - sub.test_registration_partly_failed() \ No newline at end of file + sub.test_registration_partly_failed() diff --git a/tests/unit_tests/bittensor_tests/utils/test_utils.py b/tests/unit_tests/bittensor_tests/utils/test_utils.py index 5d0643bc08..fb748013fe 100644 --- a/tests/unit_tests/bittensor_tests/utils/test_utils.py +++ b/tests/unit_tests/bittensor_tests/utils/test_utils.py @@ -10,6 +10,7 @@ import random import torch import multiprocessing +from types import SimpleNamespace from sys import platform from substrateinterface.base import Keypair @@ -346,6 +347,58 @@ def test_pow_not_stale_diff_block_number_too_old(self): assert not bittensor.utils.POWNotStale(mock_subtensor, mock_solution) +def test_pow_called_for_cuda(): + class MockException(Exception): + pass + mock_compose_call = MagicMock(side_effect=MockException) + + mock_subtensor = bittensor.subtensor(_mock=True) + mock_subtensor.neuron_for_pubkey=MagicMock(is_null=True) + mock_subtensor.substrate = MagicMock( + __enter__= MagicMock(return_value=MagicMock( + compose_call=mock_compose_call + )), + __exit__ = MagicMock(return_value=None), + ) + + mock_wallet = SimpleNamespace( + hotkey=SimpleNamespace( + ss58_address='' + ), + coldkeypub=SimpleNamespace( + ss58_address='' + ) + ) + + mock_result = { + "block_number": 1, + 'nonce': random.randint(0, pow(2, 32)), + 'work': b'\x00' * 64, + } + + with patch('bittensor.utils.POWNotStale', return_value=True) as mock_pow_not_stale: + with patch('torch.cuda.is_available', return_value=True) as mock_cuda_available: + with patch('bittensor.utils.create_pow', return_value=mock_result) as mock_create_pow: + with patch('bittensor.utils.hex_bytes_to_u8_list', return_value=b''): + + # Should exit early + with pytest.raises(MockException): + mock_subtensor.register(mock_wallet, cuda=True, prompt=False) + + mock_pow_not_stale.assert_called_once() + mock_create_pow.assert_called_once() + mock_cuda_available.assert_called_once() + + call0 = mock_pow_not_stale.call_args + assert call0[0][0] == mock_subtensor + assert call0[0][1] == mock_result + + mock_compose_call.assert_called_once() + call1 = mock_compose_call.call_args + assert call1[1]['call_function'] == 'register' + call_params = call1[1]['call_params'] + assert call_params['nonce'] == mock_result['nonce'] + if __name__ == "__main__": test_solve_for_difficulty_fast_registered_already() \ No newline at end of file