diff --git a/bittensor/_axon/axon_impl.py b/bittensor/_axon/axon_impl.py index f059ac891a..cd171f83cd 100644 --- a/bittensor/_axon/axon_impl.py +++ b/bittensor/_axon/axon_impl.py @@ -111,9 +111,6 @@ def __init__( self.synapse_timeouts = synapse_timeouts self.prometheus_level = prometheus_level self.stats = self._init_stats() - self.started = None - self.optimizer_step = None - self.started = None # -- Priority @@ -250,6 +247,8 @@ def finalize_codes_stats_and_logs( message = None): # === Logging request.synapses [ index ].return_code = synapse_codes[ index ] # Set synapse wire proto codes. request.synapses [ index ].message = synapse_messages[ index ] # Set synapse wire proto message + if synapse_is_response [index]: + self.update_stats_for_request(request,synapse_codes[ index ]) bittensor.logging.rpc_log ( axon = True, forward = True, @@ -482,6 +481,7 @@ def finalize_codes_stats_and_logs(): # === Logging request.synapses [ index ].return_code = synapse_codes[ index ] # Set synapse wire proto codes. 
request.synapses [ index ].message = synapse_messages[ index ] # Set synapse wire proto message + bittensor.logging.rpc_log ( axon = True, forward = False, @@ -685,58 +685,7 @@ def default_forward_callback(self, inputs_x:torch.FloatTensor, synapses=[], hotk return response_tensors, response_codes, response_messages def default_backward_callback(self, inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses=[] ): - """ - The default backward callback when no callback is attached: Is used to call specific synapse functions - - Args: - inputs_x (:obj:`torch.FloatTensor`, `required`): - The inputs that will be passed to the synapse functions - grads_dy (:obj:`torch.FloatTensor`, `required`): - The gradients that will be passed to the synapse functions - synapses (:obj: list of bittensor.proto.SynapseArgs, 'Optional') - The proto message that contains additional args for individual synapse functions - - Returns: - response_tensors: (:obj: list of bittensor.proto.Tensor, `required`): - serialized tensor response from the nucleus call or None. - response_codes: (:obj: list of bittensor.proto.ReturnCode, `required`) - return code associated with forward call i.e. Success of Timeout. 
- response_messages: (:obj: list of strings, `required`) - return message associated with synapse call - """ - # --- initialize response variables --- - response_tensors = [] - response_codes = [] - response_messages = [] - - # --- calling attached synapses --- - with torch.enable_grad() and torch.autograd.set_detect_anomaly(True): - for index, synapse in enumerate(synapses): - try: - if synapse.synapse_type in self.synapse_callbacks and self.synapse_callbacks[synapse.synapse_type] != None: - message, model_output, response_tensor = self.synapse_callbacks[synapse.synapse_type](inputs_x[index], synapse) - torch.autograd.backward ( - tensors = [ response_tensor ], - grad_tensors = [ grads_dy[index] ], - retain_graph=True - ) - response_tensors.append(None) - response_codes.append(bittensor.proto.ReturnCode.Success) - response_messages.append('Success') - else: - response_tensors.append(None) - response_codes.append(bittensor.proto.ReturnCode.NotImplemented) - response_messages.append('Not Implemented') - except Exception as e: - # --- Exception Hit in Synapse --- - response_tensors.append(None) - response_codes.append(bittensor.proto.ReturnCode.UnknownException) - response_messages.append(str(e)) - - if self.optimizer_step != None: - self.optimizer_step() - - return response_tensors, response_codes, response_messages + raise Exception('No Backward Function Attached') def attach_forward_callback(self, forward_callback: Callable[ [str, torch.Tensor, int], torch.Tensor ]): """ Assigns the forward_callback. 
@@ -832,88 +781,53 @@ def stop(self) -> 'Axon': return self - def check(self): - r""" Checks axon's forward and backward callbacks - """ - pubkey = self.wallet.hotkey.ss58_address - if self.forward_callback != None: - bittensor.axon.check_forward_callback(self.forward_callback,index,pubkey) - - if self.backward_callback != None: - bittensor.axon.check_backward_callback(backward,index,pubkey) - return self - def _init_stats(self): return SimpleNamespace( - # Queries per second. - qps = stat_utils.EventsPerSecondRollingAverage( 0, 0.01 ), # Total requests. total_requests = 0, - # Total bytes recieved per second. - total_in_bytes = 0, - # Total bytes responded per second. - total_out_bytes = 0, - # Bytes recieved per second. - avg_in_bytes_per_second = stat_utils.AmountPerSecondRollingAverage( 0, 0.01 ), - # Bytes responded per second. - avg_out_bytes_per_second = stat_utils.AmountPerSecondRollingAverage( 0, 0.01 ), + # Total Codes. + total_codes = { + bittensor.proto.ReturnCode.Name(1):0, + bittensor.proto.ReturnCode.Name(2):0, + }, + # Total Successes. + total_successes = 0, # Requests per pubkey. requests_per_pubkey = {}, # Success per pubkey. successes_per_pubkey = {}, - # Query time per pubkey. - query_times_per_pubkey = {}, - # Queries per second per pubkey. - qps_per_pubkey = {}, # Codes recieved per pubkey. - codes_per_pubkey = {}, - # Bytes recieved per pubkey. - avg_in_bytes_per_pubkey = {}, - # Bytes sent per pubkey. - avg_out_bytes_per_pubkey = {} + codes_per_pubkey = {} ) - #TODO: Replace/update axon and dendrite stats - def update_stats_for_request(self, request, response, time, code): + def update_stats_for_request(self, request, code): r""" Updates statistics for this request and response. Args: requests ( bittensor.proto.TensorMessage, `required`): The request. - response ( bittensor.proto.TensorMessage, `required`): - The response. time (:type:`float`, `required`): Length of call in seconds. 
code (:obj:`bittensor.proto.ReturnCode, `required`) Return code associated with the call i.e. Success of Timeout. """ - self.stats.qps.event() self.stats.total_requests += 1 - self.stats.total_in_bytes += sys.getsizeof(request) - self.stats.total_out_bytes += sys.getsizeof(response) - self.stats.avg_in_bytes_per_second.event( float(sys.getsizeof(request)) ) - self.stats.avg_out_bytes_per_second.event( float(sys.getsizeof(response)) ) pubkey = request.hotkey - if pubkey not in self.stats.requests_per_pubkey: - self.stats.requests_per_pubkey[ pubkey ] = 0 - self.stats.successes_per_pubkey[ pubkey ] = 0 - self.stats.query_times_per_pubkey[ pubkey ] = stat_utils.AmountPerSecondRollingAverage(0, 0.05) - self.stats.qps_per_pubkey[ pubkey ] = stat_utils.EventsPerSecondRollingAverage(0, 0.05) - self.stats.codes_per_pubkey[ pubkey ] = dict([(k,0) for k in bittensor.proto.ReturnCode.keys()]) - self.stats.avg_in_bytes_per_pubkey[ pubkey ] = stat_utils.AmountPerSecondRollingAverage(0, 0.01) - self.stats.avg_out_bytes_per_pubkey[ pubkey ] = stat_utils.AmountPerSecondRollingAverage(0, 0.01) + + self.stats.requests_per_pubkey.setdefault(pubkey, 0) + self.stats.successes_per_pubkey.setdefault(pubkey, 0) + self.stats.codes_per_pubkey.setdefault(pubkey, {}) + self.stats.total_codes.setdefault(bittensor.proto.ReturnCode.Name( code ), 0) # Add values. 
self.stats.requests_per_pubkey[ pubkey ] += 1 self.stats.successes_per_pubkey[ pubkey ] += 1 if code == 1 else 0 - self.stats.query_times_per_pubkey[ pubkey ].event( float(time) ) - self.stats.avg_in_bytes_per_pubkey[ pubkey ].event( float(sys.getsizeof(request)) ) - self.stats.avg_out_bytes_per_pubkey[ pubkey ].event( float(sys.getsizeof(response)) ) - self.stats.qps_per_pubkey[ pubkey ].event() - try: - if bittensor.proto.ReturnCode.Name( code ) in self.stats.codes_per_pubkey[ pubkey ].keys(): - self.stats.codes_per_pubkey[ pubkey ][bittensor.proto.ReturnCode.Name( code )] += 1 - except: - pass + self.stats.total_successes += 1 if code == 1 else 0 + + self.stats.codes_per_pubkey[ pubkey ].setdefault(bittensor.proto.ReturnCode.Name( code ), 0) + self.stats.codes_per_pubkey[ pubkey ][bittensor.proto.ReturnCode.Name( code )] += 1 + self.stats.total_codes[bittensor.proto.ReturnCode.Name( code )] += 1 + + def to_dataframe ( self, metagraph ): r""" Return a stats info as a pandas dataframe indexed by the metagraph or pubkey if not existend. @@ -926,7 +840,7 @@ def to_dataframe ( self, metagraph ): # Reindex the pubkey to uid if metagraph is present. 
try: index = [ metagraph.hotkeys.index(pubkey) for pubkey in self.stats.requests_per_pubkey.keys() if pubkey in metagraph.hotkeys ] - columns = [ 'axon_n_requested', 'axon_n_success', 'axon_query_time','axon_avg_inbytes','axon_avg_outbytes', 'axon_qps' ] + columns = [ 'axon_n_requested', 'axon_n_success' ] dataframe = pandas.DataFrame(columns = columns, index = index) for pubkey in self.stats.requests_per_pubkey.keys(): if pubkey in metagraph.hotkeys: @@ -934,41 +848,10 @@ def to_dataframe ( self, metagraph ): dataframe.loc[ uid ] = pandas.Series( { 'axon_n_requested': int(self.stats.requests_per_pubkey[pubkey]), 'axon_n_success': int(self.stats.requests_per_pubkey[pubkey]), - 'axon_query_time': float(self.stats.query_times_per_pubkey[pubkey].get()), - 'axon_avg_inbytes': float(self.stats.avg_in_bytes_per_pubkey[pubkey].get()), - 'axon_avg_outbytes': float(self.stats.avg_out_bytes_per_pubkey[pubkey].get()), - 'axon_qps': float(self.stats.qps_per_pubkey[pubkey].get()) } ) dataframe['uid'] = dataframe.index return dataframe except Exception as e: bittensor.logging.error(prefix='failed axon.to_dataframe()', sufix=str(e)) - return pandas.DataFrame() - - def to_wandb( self ): - r""" Return a dictionary of axon stat info for wandb logging - Args: - metagraph: (bittensor.Metagraph): - If not None, indexes the wandb data using int uids rather than string pubkeys. 
- Return: - wandb_info (:obj:`Dict`) - """ - try: - avg_query_time = 0.0 - for pubkey in self.stats.query_times_per_pubkey: - avg_query_time += self.stats.query_times_per_pubkey[pubkey].get() / len( self.stats.query_times_per_pubkey ) - # ---- Axon summary for wandb - wandb_data = { - 'axon/qps': self.stats.qps.get(), - 'axon/avg_query_time': avg_query_time, - 'axon/total_requests': self.stats.total_requests, - 'axon/total_in_bytes' : self.stats.total_in_bytes, - 'axon/total_out_bytes' : self.stats.total_out_bytes, - 'axon/avg_in_bytes_per_second' : self.stats.avg_in_bytes_per_second.get(), - 'axon/avg_out_bytes_per_second' : self.stats.avg_out_bytes_per_second.get(), - } - return wandb_data - except Exception as e: - bittensor.logging.error(prefix='failed during axon.to_wandb()', sufix=str(e)) - return {} \ No newline at end of file + return pandas.DataFrame() \ No newline at end of file diff --git a/bittensor/_neuron/text/core_server/__init__.py b/bittensor/_neuron/text/core_server/__init__.py index da67f28133..e8a7141460 100644 --- a/bittensor/_neuron/text/core_server/__init__.py +++ b/bittensor/_neuron/text/core_server/__init__.py @@ -24,9 +24,26 @@ import bittensor import os - +import torch from .nucleus_impl import server -from .run import serve +from prometheus_client import Counter, Gauge, Histogram, Summary, Info, CollectorRegistry +from threading import Lock +from loguru import logger; logger = logger.opt(colors=True) +import time + +from datetime import datetime,timedelta +import wandb +import pandas + +# Torch +import torch +import torch.nn.functional as F +from torch.nn.utils.rnn import pad_sequence +from torch.nn.utils import clip_grad_norm_ + +from rich import print +from rich.console import Console +from rich.style import Style class neuron: r""" @@ -54,9 +71,9 @@ class neuron: seq2seq (:obj:bittensor.metagraph, `optional`): seq2seq synapse control synapse_list (:obj:list of int, `optional`): - + - Examples:: + Examples:: >>> subtensor = 
bittensor.subtensor(network='nakamoto') >>> server = bittensor.neuron.text.core_server.neuron(subtensor=subtensor) >>> server.run() @@ -68,13 +85,16 @@ def __init__( wallet: 'bittensor.wallet' = None, axon: 'bittensor.axon' = None, metagraph: 'bittensor.metagraph' = None, + model: 'bittensor.neurons.text.core_server.server' = None, lasthidden = None, causallm = None, causallmnext = None, seq2seq = None, synapse_list = None, ): - if config == None: config = server.config() + if config is None: + config = server.config() + config = config; if synapse_list != None: @@ -101,36 +121,47 @@ def __init__( config.neuron.seq2seq = seq2seq if seq2seq != None else config.neuron.seq2seq self.check_config( config ) + self.config = config + bittensor.logging ( config = config, logging_dir = config.neuron.full_path, ) + # Init prometheus. # By default we pick the prometheus port to be axon.port - 1000 so that we can match port to server. bittensor.prometheus ( config = config, port = config.prometheus.port if config.axon.port == bittensor.defaults.axon.port else config.axon.port - 1000 ) - - self.model = server(config = config) - self.config = config + # --- Setup prometheus summaries. 
+ # These will not be posted if the user passes --prometheus.level OFF + registry = CollectorRegistry() + self.prometheus_counters = Counter('neuron_counters', 'Counter sumamries for the running server-miner.', ['neuron_counters_name'], registry=registry) + self.prometheus_guages = Gauge('neuron_guages', 'Guage sumamries for the running server-miner.', ['neuron_guages_name'], registry=registry) + self.prometheus_info = Info('neuron_info', "Info sumamries for the running server-miner.", registry=registry) self.config.to_prometheus() - self.subtensor = subtensor - self.wallet = wallet + self.mutex = Lock() + self.model = server(config = config).to(config.neuron.device) if model == None else model + self.subtensor = bittensor.subtensor(config = config) if subtensor == None else subtensor + self.wallet = bittensor.wallet( config = config ) if wallet == None else wallet + self.metagraph = bittensor.metagraph ( config = config, subtensor = subtensor) if metagraph == None else metagraph + self.timecheck_dicts = {bittensor.proto.RequestType.FORWARD:{}, bittensor.proto.RequestType.BACKWARD:{}} + if axon == None: + axon = bittensor.axon( + config = config, + wallet = wallet, + synapse_checks=self.synapse_check, + synapse_last_hidden = self.forward_hidden_state if self.model.config.neuron.lasthidden else None, + synapse_causal_lm = self.forward_casual_lm if self.model.config.neuron.causallm else None, + synapse_causal_lm_next = self.forward_casual_lm_next if self.model.config.neuron.causallmnext else None, + synapse_seq_2_seq = self.forward_generate if self.model.config.neuron.seq2seq else None , + blacklist = self.blacklist if not self.model.config.neuron.disable_blacklist else None, + priority = self.priority if not self.model.config.neuron.disable_priority else None, + ) self.axon = axon - self.metagraph = metagraph - - def run(self): - serve( - self.config, - self.model, - subtensor = self.subtensor, - wallet = self.wallet, - axon = self.axon, - metagraph = 
self.metagraph, - ) - + self.query_data = {} @classmethod def config(cls): @@ -152,3 +183,428 @@ def check_config( config: 'bittensor.Config' ): config.neuron.full_path = os.path.expanduser(full_path) if not os.path.exists(config.neuron.full_path): os.makedirs(config.neuron.full_path) + + def run( + self, + ): + self.config.to_defaults() + + # Load/Create our bittensor wallet. + self.wallet.reregister(subtensor=self.subtensor) + + + self.metagraph.load().sync().save() + + # Create our optimizer. + optimizer = torch.optim.SGD( + [ {"params": self.model.parameters()} ], + lr = self.config.neuron.learning_rate, + momentum = self.config.neuron.momentum, + ) + + self.prometheus_guages.labels( 'model_size_params' ).set( sum(p.numel() for p in self.model.parameters()) ) + self.prometheus_guages.labels( 'model_size_bytes' ).set( sum(p.element_size() * p.nelement() for p in self.model.parameters()) ) + self.prometheus_info.info ({ + 'type': "core_server", + 'uid': str(self.metagraph.hotkeys.index( self.wallet.hotkey.ss58_address )), + 'network': self.config.subtensor.network, + 'coldkey': str(self.wallet.coldkeypub.ss58_address), + 'hotkey': str(self.wallet.hotkey.ss58_address), + }) + + # Create our axon server and subscribe it to the network. + self.axon.start().serve(subtensor=self.subtensor) + self.axon.attach_backward_callback(self.backward_callback) + + + # Training Data + if self.config.neuron.local_train: + self.dataset = bittensor.dataset(config=self.config) + self.dataset.set_data_size(10, 64) + data = next(self.dataset) + + # load our old model + if not self.config.neuron.restart : + self.model.load(self.config.neuron.full_path) + + if self.config.wandb.api_key != 'default': + # --- Init Wandb. 
+ bittensor.wandb( + config = self.config, + cold_pubkey = self.wallet.coldkeypub.ss58_address, + hot_pubkey = self.wallet.hotkey.ss58_address, + root_dir = self.config.neuron.full_path + ) + + last_set_block = self.subtensor.get_current_block() + blocks_per_epoch = self.subtensor.blocks_per_epoch if self.config.neuron.blocks_per_epoch == -1 else self.config.neuron.blocks_per_epoch + blocks_per_set_weights = self.subtensor.blocks_per_epoch if self.config.neuron.blocks_per_set_weights == -1 else self.config.neuron.blocks_per_set_weights + epoch_starting_successes = self.axon.stats.total_successes + epoch_starting_requests = self.axon.stats.total_requests + # --- Run Forever. + while True: + iteration = 0 + local_data = {} + self.query_data = {} + + nn = self.subtensor.neuron_for_pubkey(self.wallet.hotkey.ss58_address) + uid = self.metagraph.hotkeys.index( self.wallet.hotkey.ss58_address ) + current_block = self.subtensor.get_current_block() + end_block = current_block + self.config.neuron.blocks_per_epoch + + if self.config.neuron.local_train: + # --- Training step. 
+ while end_block >= current_block: + if current_block != self.subtensor.get_current_block() and self.axon.priority_threadpool.is_empty: + with self.mutex: + logger.info(f'local training\titeration: {iteration}\tstart') + loss, _ = self.model( next(self.dataset).to(self.model.device) ) + if iteration > 0 : + losses += loss + else: + losses = loss + iteration += 1 + current_block = self.subtensor.get_current_block() + logger.info(f'local training\titeration: {iteration}\tloss: {loss}') + else: + time.sleep(1) + + if iteration != 0: + (losses/iteration).backward() + + else: + while end_block > current_block: + time.sleep(12) + current_block = self.subtensor.get_current_block() + + # --- Update parameters + if (self.config.neuron.local_train and iteration > 0) or (self.config.neuron.remote_train and self.model.backward_gradients_count > 0): + # Custom learning rate + if self.model.backward_gradients_count > 0: + optimizer.param_groups[0]['lr'] = 0.1/(self.model.backward_gradients_count) + else: + optimizer.param_groups[0]['lr'] = 0.1 + + logger.info('Optmization Started') + with self.mutex: + clip_grad_norm_(self.model.parameters(), 1.0) + optimizer.step() + optimizer.zero_grad() + logger.info('Optimization Successful: Model updated') + + if (self.config.neuron.local_train and iteration > 0): + local_data = {'local/loss': losses.detach().item() / iteration} + + if local_data['local/loss'] < self.model.best_loss: + self.model.best_loss = local_data['local/loss'] + self.model.save(self.config.neuron.full_path) + + # Save it only when it gives a low average loss over a large sample size (config.neuron.num_remote_loss), default to 20. 
+ elif (self.config.neuron.remote_train and len(self.model.remote_losses) >= self.config.neuron.num_remote_loss): + local_data = {'local/remote_loss': sum(self.model.remote_losses) / len(self.model.remote_losses)} + + if local_data['local/remote_loss'] < self.model.best_remote_loss: + self.model.best_remote_loss = local_data['local/remote_loss'] + self.model.save(self.config.neuron.full_path) + + self.model.remote_losses = [] + + self.model.backward_gradients_count = 0 + + data = { + 'stake': nn.stake, + 'rank': nn.rank, + 'trust': nn.trust, + 'consensus': nn.consensus, + 'incentive': nn.incentive, + 'emission': nn.emission, + } + + if self.config.wandb.api_key != 'default': + + df = pandas.concat( [ + bittensor.utils.indexed_values_to_dataframe( prefix = 'w_i_{}'.format(nn.uid), index = self.metagraph.uids, values = self.metagraph.W[:, uid] ), + self.axon.to_dataframe( metagraph = self.metagraph ), + ], axis = 1) + df['uid'] = df.index + wandb_info_axon = {}  # NOTE(review): axon.to_wandb() is deleted by this same patch; calling it would raise AttributeError + wandb.log( { **data, **wandb_info_axon, **local_data }, step = current_block ) + wandb.log( { 'stats': wandb.Table( dataframe = df ) }, step = current_block ) + + # === Prometheus logging.
+ self.prometheus_guages.labels("stake").set( nn.stake ) + self.prometheus_guages.labels("rank").set( nn.rank ) + self.prometheus_guages.labels("trust").set( nn.trust ) + self.prometheus_guages.labels("consensus").set( nn.consensus ) + self.prometheus_guages.labels("incentive").set( nn.incentive ) + self.prometheus_guages.labels("emission").set( nn.emission ) + + print(f"[white not bold]{datetime.now():%Y-%m-%d %H:%M:%S}[/white not bold]{' ' * 4} | " + f"{f'[magenta dim not bold]#{current_block}[/magenta dim not bold]'.center(16 + len('[magenta dim not bold][/magenta dim not bold]'))} | " + f'[green not bold]{current_block - last_set_block}[/green not bold]/' + f'[white not bold]{blocks_per_set_weights}[/white not bold] [dim]blocks/epoch[/dim] | ' + f'[bright_green not bold]{self.axon.stats.total_successes - epoch_starting_successes}[/bright_green not bold]' + f'[white]/{self.axon.stats.total_requests - epoch_starting_requests}[/white] ' + f'[dim]Epoch Success [/dim]|' + f'[dim][green] {self.axon.stats.total_successes}[/green]' + f'[white]/{self.axon.stats.total_requests}[/white]' + f' Total Success [/dim]|' + f'[dim white not bold][yellow] {self.axon.stats.total_codes[bittensor.proto.ReturnCode.Name(2)]}[/yellow]' + f'[white]/{self.axon.stats.total_requests}[/white]' + f' Timeout [/dim white not bold]|' + f'[dim white not bold][red] {self.axon.stats.total_requests - self.axon.stats.total_successes - self.axon.stats.total_codes[bittensor.proto.ReturnCode.Name(2)]}[/red]' + f'[white]/{self.axon.stats.total_requests}[/white]' + f' Error [/dim white not bold]|' + f'[dim white not bold] [green]{nn.stake:.4}[/green] Stake [/dim white not bold]' + f'[dim white not bold]| [yellow]{nn.trust:.3}[/yellow] Trust [/dim white not bold]' + f'[dim white not bold]| [green]{nn.incentive:.3}[/green] Incentive [/dim white not bold]') + + if current_block - last_set_block > blocks_per_set_weights: + self.metagraph.sync() + last_set_block = current_block + epoch_starting_successes = 
self.axon.stats.total_successes + epoch_starting_requests = self.axon.stats.total_requests + + if not self.config.neuron.no_set_weights: + try: + # Set self weights to maintain activity. + # --- query the chain for the most current number of peers on the network + chain_weights = torch.zeros(self.subtensor.n) + chain_weights [ uid ] = 1 + did_set = self.subtensor.set_weights( + uids=torch.arange(0,self.subtensor.n), + weights = chain_weights, + wait_for_inclusion = False, + wallet = self.wallet, + ) + if did_set: + logger.success('Successfully set weights on the chain') + else: + logger.error('Failed to set weights on chain. (Timeout)') + + except Exception as e: + logger.error('Failure setting weights on chain with error: {}', e) + + def synapse_check(self, synapse, hotkey): + """ + Custom synapse function to protect certain synapse functions depending on the stake and weight. + Certain synapses require more compute than others. For instance, TEXT_SEQ_2_SEQ requires a significantly + more commitment by the server than a requeset for TEXT_CAUSAL_LM_NEXT. 
+ + Args: + synapse (:obj:`bittensor.proto.SynapseArgs`, `required`): + The proto message that contains additional args for individual synapse functions + hotkey (:obj:`torch.FloatTensor`, `required`): + The hotkey that sent the request + + """ + ## Uid that sent the request + incoming_uid = self.metagraph.hotkeys.index(hotkey) + if synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_LAST_HIDDEN_STATE: + + if self.metagraph.S[incoming_uid] < self.config.neuron.lasthidden_stake: + return False + + elif synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM: + + if self.metagraph.S[incoming_uid] < self.config.neuron.causallm_stake: + return False + + elif synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM_NEXT: + + if self.metagraph.S[incoming_uid] < self.config.neuron.causallmnext_stake: + return False + + elif synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_SEQ_2_SEQ: + + if (self.metagraph.S[incoming_uid] < self.config.neuron.seq2seq_stake) and (self.metagraph.S[incoming_uid, self.uid]): + return False + else: + return False + + return True + + def backward_callback(self, inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses=[] ): + """ + The default backward callback when no callback is attached: Is used to call specific synapse functions + + Args: + inputs_x (:obj:`torch.FloatTensor`, `required`): + The inputs that will be passed to the synapse functions + grads_dy (:obj:`torch.FloatTensor`, `required`): + The gradients that will be passed to the synapse functions + synapses (:obj: list of bittensor.proto.SynapseArgs, 'Optional') + The proto message that contains additional args for individual synapse functions + + Returns: + response_tensors: (:obj: list of bittensor.proto.Tensor, `required`): + serialized tensor response from the nucleus call or None. + response_codes: (:obj: list of bittensor.proto.ReturnCode, `required`) + return code associated with forward call i.e. 
Success of Timeout. + response_messages: (:obj: list of strings, `required`) + return message associated with synapse call + """ + # --- initialize response variables --- + response_tensors = [] + response_codes = [] + response_messages = [] + + if not self.config.neuron.remote_train: + return response_tensors, response_codes, response_messages + + # --- calling attached synapses --- + with self.mutex and torch.enable_grad() and torch.autograd.set_detect_anomaly(True): + for index, synapse in enumerate(synapses): + try: + if synapse.synapse_type in self.axon.synapse_callbacks and self.axon.synapse_callbacks[synapse.synapse_type] != None: + message, model_output, response_tensor = self.axon.synapse_callbacks[synapse.synapse_type](inputs_x[index], synapse) + grads_dy_norm = grads_dy[index]/(grads_dy[index].sum() + 0.00001) + torch.autograd.backward ( + tensors = [ response_tensor ], + grad_tensors = [ grads_dy_norm ], + retain_graph=True + ) + # Only consider loss from causal LM next. + if synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM_NEXT: + self.model.remote_losses.append(model_output.loss) + self.model.remote_losses = self.model.remote_losses[-self.config.neuron.num_remote_loss:] if len(self.model.remote_losses) > self.config.neuron.num_remote_loss else self.model.remote_losses + self.model.backward_gradients_count += inputs_x[index].size(0) + response_tensors.append(None) + response_codes.append(bittensor.proto.ReturnCode.Success) + response_messages.append('Success') + + else: + response_tensors.append(None) + response_codes.append(bittensor.proto.ReturnCode.NotImplemented) + response_messages.append('Not Implemented') + except Exception as e: + # --- Exception Hit in Synapse --- + response_tensors.append(None) + response_codes.append(bittensor.proto.ReturnCode.UnknownException) + response_messages.append(str(e)) + + return response_tensors, response_codes, response_messages + + + def priority(self, pubkey:str, 
request_type:bittensor.proto.RequestType, inputs_x) -> float: + r"""Calculates the priority on requests based on stake and size of input + Args: + pubkey ( str, `required`): + The public key of the caller. + inputs_x ( :obj:`torch.Tensor`, `required`): + torch inputs to be forward processed. + request_type ( bittensor.proto.RequestType, `required`): + the request type ('FORWARD' or 'BACKWARD'). + """ + try: + uid = self.metagraph.hotkeys.index(pubkey) + priority = self.metagraph.S[uid].item() + + except: + # zero priority for those who are not registered. + priority = 0 + + return priority + + def forward_generate(self, inputs_x:torch.FloatTensor, synapse, model_output = None): + tokens = self.model.token_remap(inputs_x.to(self.model.device)) + output = self.model.pre_model.generate( + input_ids=tokens['input_ids'], + attention_mask=tokens['attention_mask'], + max_length=max(tokens['input_ids'].shape[1] + 1, synapse.num_to_generate), + num_beams=synapse.num_beams, + no_repeat_ngram_size=synapse.no_repeat_ngram_size, + early_stopping = synapse.early_stopping, + do_sample=synapse.do_sample, + top_p=synapse.top_p, + num_return_sequences=synapse.num_return_sequences, + temperature = synapse.temperature, + repetition_penalty = synapse.repetition_penalty, + length_penalty = synapse.length_penalty, + max_time = synapse.max_time, + num_beam_groups = synapse.num_beam_groups, + ) + raw_texts = [self.model.tokenizer.decode(out) for out in output] + tokens = [self.model.std_tokenizer.encode(raw_text, return_tensors="pt")[:,:synapse.num_to_generate].view(-1) for raw_text in raw_texts] + bittensor_output = pad_sequence(tokens, batch_first=True) + return None, model_output, bittensor_output + + def forward_hidden_state(self, inputs_x:torch.FloatTensor, synapse, model_output = None): + with self.mutex: + message, model_output, hidden = self.model.encode_forward(inputs_x.to(self.model.device), model_output=model_output) + return message, model_output, hidden + + def 
forward_casual_lm(self, inputs_x:torch.FloatTensor, synapse, model_output = None): + with self.mutex: + message, model_output, logits = self.model.encode_forward_causallm(inputs_x.to(self.model.device), model_output=model_output) + return message, model_output, logits + + def forward_casual_lm_next(self,inputs_x: torch.FloatTensor, synapse, model_output=None): + with self.mutex: + message, model_output, topk_token_phrases = self.model.encode_forward_causallmnext(inputs_x.to(self.model.device), + topk=synapse.topk, + model_output=model_output) + # topk_token_phrases: [sum_b(sum_k(len(phrase_k) + 1)_b)] contains topk token phrases and probabilities + # Compacted 1-D tensor >= batch_size * (2 * topk + 1) + return message, model_output, topk_token_phrases + + + def blacklist(self, pubkey:str, request_type:bittensor.proto.RequestType) -> bool: + r"""Axon security blacklisting, used to blacklist message from low stake members + Args: + pubkey ( str, `required`): + The public key of the caller. + request_type ( bittensor.proto.RequestType, `required`): + the request type ('FORWARD' or 'BACKWARD'). + """ + # Check for registrations + + def registration_check(): + # If we allow non-registered requests return False = not blacklisted. + is_registered = pubkey in self.metagraph.hotkeys + if not is_registered: + if self.config.neuron.blacklist_allow_non_registered: + return False + + self.prometheus_counters.labels("blacklisted.registration").inc() + + raise Exception('Registration blacklist') + + # Check for stake + def stake_check() -> bool: + # Check stake. 
+ uid = self.metagraph.hotkeys.index(pubkey) + if self.metagraph.S[uid].item() < self.config.neuron.blacklist.stake: + self.prometheus_counters.labels("blacklisted.stake").inc() + + raise Exception('Stake blacklist') + return False + + # Check for time + def time_check(): + current_time = datetime.now() + # Only check if the request are forward requests + timecheck = self.timecheck_dicts[request_type] + if pubkey in timecheck.keys(): + prev_time = timecheck[pubkey] + if current_time - prev_time >= timedelta(seconds=self.config.neuron.blacklist.time): + timecheck[pubkey] = current_time + else: + timecheck[pubkey] = current_time + self.prometheus_counters.labels("blacklisted.time").inc() + + raise Exception('Time blacklist') + else: + timecheck[pubkey] = current_time + + return False + + # Black list or not + try: + registration_check() + time_check() + stake_check() + return False + except Exception as e: + self.prometheus_counters.labels("blacklisted").inc() + return True diff --git a/bittensor/_neuron/text/core_server/nucleus_impl.py b/bittensor/_neuron/text/core_server/nucleus_impl.py index b327be91b7..e3c9bd36ed 100644 --- a/bittensor/_neuron/text/core_server/nucleus_impl.py +++ b/bittensor/_neuron/text/core_server/nucleus_impl.py @@ -371,7 +371,6 @@ def _forward(_model_output=model_output): output_hidden_states=True) self.model_output_check(_model_output) pre_logits = _model_output.logits # [batch_size, sequence_len, self.tokenizer.vocab_len] - probs_std = translate_logits_to_probs_std(pre_logits, tokens['offset_mapping'], tokens['offset_mapping_std'], self.tokenizer, self.std_tokenizer, @@ -380,7 +379,6 @@ def _forward(_model_output=model_output): tokens['input_ids'], token_batch) probs_std = probs_std.to(self.device) logits_std = torch.log(probs_std + 1e-40) - #removing the loss calculation for stablity testing original_loss = self.get_loss_fct(pre_logits, tokens['input_ids']).item() translated_loss = self.get_loss_fct(logits_std, token_batch).item() @@ 
-451,6 +449,7 @@ def _forward(_model_output=model_output): topk_tensor = topk_token_phrases(last_logits, self.tokenizer, topk=topk) # [batch_size, (topk + 1), max_len] original_loss = self.get_loss_fct(_model_output.logits, tokens['input_ids']).item() + message = f'Loss: {original_loss:.2f}' _model_output.loss = original_loss diff --git a/bittensor/_neuron/text/core_server/run.py b/bittensor/_neuron/text/core_server/run.py deleted file mode 100644 index 249b7b7131..0000000000 --- a/bittensor/_neuron/text/core_server/run.py +++ /dev/null @@ -1,480 +0,0 @@ -#!/bin/python3 -# The MIT License (MIT) -# Copyright © 2021 Yuma Rao - -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the “Software”), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, -# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of -# the Software. - -# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO -# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. -""" The Exodus base client. 
- -Example: - $ python miners/text/template_client.py - -""" -import bittensor -import sys -import time -import datetime -from threading import Lock -from datetime import datetime,timedelta -from loguru import logger; logger = logger.opt(colors=True) -from torch.nn.utils.rnn import pad_sequence - -import wandb -import pandas -# Prometheus -from prometheus_client import Counter, Gauge, Histogram, Summary, Info, CollectorRegistry -# Torch -import torch -import torch.nn.functional as F -from torch.nn.utils import clip_grad_norm_ - -def serve( - config, - model, - subtensor = None, - wallet = None, - axon= None, - metagraph = None, - ): - config.to_defaults() - model= model.to(model.device) - - # Create Subtensor connection - subtensor = bittensor.subtensor(config = config) if subtensor == None else subtensor - - # Load/Create our bittensor wallet. - if wallet == None: - wallet = bittensor.wallet( config = config ).create().reregister(subtensor=subtensor) - else: - wallet.reregister(subtensor=subtensor) - - # Load/Sync/Save our metagraph. - if metagraph == None: - metagraph = bittensor.metagraph ( - subtensor = subtensor - ) - - metagraph.load().sync().save() - - # Create our optimizer. - optimizer = torch.optim.SGD( - [ {"params": model.parameters()} ], - lr = config.neuron.learning_rate, - momentum = config.neuron.momentum, - ) - mutex = Lock() - - # --- Setup prometheus summaries. 
- # These will not be posted if the user passes --prometheus.level OFF - registry = CollectorRegistry() - prometheus_counters = Counter('neuron_counters', 'Counter sumamries for the running server-miner.', ['neuron_counters_name'], registry=registry) - prometheus_guages = Gauge('neuron_guages', 'Guage sumamries for the running server-miner.', ['neuron_guages_name'], registry=registry) - prometheus_info = Info('neuron_info', "Info sumamries for the running server-miner.", registry=registry) - prometheus_guages.labels( 'model_size_params' ).set( sum(p.numel() for p in model.parameters()) ) - prometheus_guages.labels( 'model_size_bytes' ).set( sum(p.element_size() * p.nelement() for p in model.parameters()) ) - prometheus_info.info ({ - 'type': "core_server", - 'uid': str(metagraph.hotkeys.index( wallet.hotkey.ss58_address )), - 'network': config.subtensor.network, - 'coldkey': str(wallet.coldkeypub.ss58_address), - 'hotkey': str(wallet.hotkey.ss58_address), - }) - - timecheck_dicts = {bittensor.proto.RequestType.FORWARD:{}, bittensor.proto.RequestType.BACKWARD:{}} - n_topk_peer_weights = subtensor.min_allowed_weights - - def priority(pubkey:str, request_type:bittensor.proto.RequestType, inputs_x) -> float: - r"""Calculates the priority on requests based on stake and size of input - Args: - pubkey ( str, `required`): - The public key of the caller. - inputs_x ( :obj:`torch.Tensor`, `required`): - torch inputs to be forward processed. - request_type ( bittensor.proto.RequestType, `required`): - the request type ('FORWARD' or 'BACKWARD'). - """ - try: - uid = metagraph.hotkeys.index(pubkey) - priority = metagraph.S[uid].item() - - except: - # zero priority for those who are not registered. 
- priority = 0 - - return priority - - def forward_generate( inputs_x:torch.FloatTensor, synapse, model_output = None): - tokens = model.token_remap(inputs_x.to(model.device)) - output = model.pre_model.generate( - input_ids=tokens['input_ids'], - attention_mask=tokens['attention_mask'], - max_length=max(tokens['input_ids'].shape[1] + 1, synapse.num_to_generate), - num_beams=synapse.num_beams, - no_repeat_ngram_size=synapse.no_repeat_ngram_size, - early_stopping = synapse.early_stopping, - do_sample=synapse.do_sample, - top_p=synapse.top_p, - num_return_sequences=synapse.num_return_sequences, - temperature = synapse.temperature, - repetition_penalty = synapse.repetition_penalty, - length_penalty = synapse.length_penalty, - max_time = synapse.max_time, - num_beam_groups = synapse.num_beam_groups, - ) - raw_texts = [model.tokenizer.decode(out) for out in output] - tokens = [model.std_tokenizer.encode(raw_text, return_tensors="pt")[:,:synapse.num_to_generate].view(-1) for raw_text in raw_texts] - bittensor_output = pad_sequence(tokens, batch_first=True) - return None, model_output, bittensor_output - - def forward_hidden_state(inputs_x:torch.FloatTensor, synapse, model_output = None): - with mutex: - message, model_output, hidden = model.encode_forward(inputs_x.to(model.device), model_output=model_output) - return message, model_output, hidden - - def forward_casual_lm(inputs_x:torch.FloatTensor, synapse, model_output = None): - with mutex: - message, model_output, logits = model.encode_forward_causallm(inputs_x.to(model.device), model_output=model_output) - return message, model_output, logits - - def forward_casual_lm_next(inputs_x: torch.FloatTensor, synapse, model_output=None): - with mutex: - message, model_output, topk_token_phrases = model.encode_forward_causallmnext(inputs_x.to(model.device), - topk=synapse.topk, - model_output=model_output) - # topk_token_phrases: [sum_b(sum_k(len(phrase_k) + 1)_b)] contains topk token phrases and probabilities - # Compacted 
1-D tensor >= batch_size * (2 * topk + 1) - return message, model_output, topk_token_phrases - - def optimizer_step(): - optimizer.step() - optimizer.zero_grad() - - def blacklist(pubkey:str, request_type:bittensor.proto.RequestType) -> bool: - r"""Axon security blacklisting, used to blacklist message from low stake members - Args: - pubkey ( str, `required`): - The public key of the caller. - request_type ( bittensor.proto.RequestType, `required`): - the request type ('FORWARD' or 'BACKWARD'). - """ - # Check for registrations - - def registration_check(): - # If we allow non-registered requests return False = not blacklisted. - is_registered = pubkey in metagraph.hotkeys - if not is_registered: - if config.neuron.blacklist_allow_non_registered: - return False - - prometheus_counters.labels("blacklisted.registration").inc() - - raise Exception('Registration blacklist') - - # Check for stake - def stake_check() -> bool: - # Check stake. - uid = metagraph.hotkeys.index(pubkey) - if metagraph.S[uid].item() < config.neuron.blacklist.stake: - prometheus_counters.labels("blacklisted.stake").inc() - - raise Exception('Stake blacklist') - return False - - # Check for time - def time_check(): - current_time = datetime.now() - # Only check if the request are forward requests - timecheck = timecheck_dicts[request_type] - if pubkey in timecheck.keys(): - prev_time = timecheck[pubkey] - if current_time - prev_time >= timedelta(seconds=config.neuron.blacklist.time): - timecheck[pubkey] = current_time - else: - timecheck[pubkey] = current_time - prometheus_counters.labels("blacklisted.time").inc() - - raise Exception('Time blacklist') - else: - timecheck[pubkey] = current_time - - return False - - # Black list or not - try: - registration_check() - time_check() - stake_check() - return False - except Exception as e: - prometheus_counters.labels("blacklisted").inc() - return True - - def synapse_check(synapse, hotkey): - """ - Custom synapse function to protect certain synapse 
functions depending on the stake and weight. - Certain synapses require more compute than others. For instance, TEXT_SEQ_2_SEQ requires a significantly - more commitment by the server than a requeset for TEXT_CAUSAL_LM_NEXT. - - Args: - synapse (:obj:`bittensor.proto.SynapseArgs`, `required`): - The proto message that contains additional args for individual synapse functions - hotkey (:obj:`torch.FloatTensor`, `required`): - The hotkey that sent the request - - """ - ## Uid that sent the request - incoming_uid = metagraph.hotkeys.index(hotkey) - if synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_LAST_HIDDEN_STATE: - - if metagraph.S[incoming_uid] < config.neuron.lasthidden_stake: - return False - - elif synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM: - - if metagraph.S[incoming_uid] < config.neuron.causallm_stake: - return False - - elif synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM_NEXT: - - if metagraph.S[incoming_uid] < config.neuron.causallmnext_stake: - return False - - elif synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_SEQ_2_SEQ: - - if (metagraph.S[incoming_uid] < config.neuron.seq2seq_stake) and (metagraph.S[incoming_uid, uid]): - return False - else: - return False - - return True - - def backward_callback(inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses=[] ): - """ - The default backward callback when no callback is attached: Is used to call specific synapse functions - - Args: - inputs_x (:obj:`torch.FloatTensor`, `required`): - The inputs that will be passed to the synapse functions - grads_dy (:obj:`torch.FloatTensor`, `required`): - The gradients that will be passed to the synapse functions - synapses (:obj: list of bittensor.proto.SynapseArgs, 'Optional') - The proto message that contains additional args for individual synapse functions - - Returns: - response_tensors: (:obj: list of bittensor.proto.Tensor, `required`): - serialized tensor 
response from the nucleus call or None. - response_codes: (:obj: list of bittensor.proto.ReturnCode, `required`) - return code associated with forward call i.e. Success of Timeout. - response_messages: (:obj: list of strings, `required`) - return message associated with synapse call - """ - # --- initialize response variables --- - response_tensors = [] - response_codes = [] - response_messages = [] - - if not config.neuron.remote_train: - return response_tensors, response_codes, response_messages - - # --- calling attached synapses --- - with mutex and torch.enable_grad() and torch.autograd.set_detect_anomaly(True): - for index, synapse in enumerate(synapses): - try: - if synapse.synapse_type in axon.synapse_callbacks and axon.synapse_callbacks[synapse.synapse_type] != None: - message, model_output, response_tensor = axon.synapse_callbacks[synapse.synapse_type](inputs_x[index], synapse) - grads_dy_norm = grads_dy[index]/(grads_dy[index].sum() + 0.00001) - torch.autograd.backward ( - tensors = [ response_tensor ], - grad_tensors = [ grads_dy_norm ], - retain_graph=True - ) - # Only consider loss from causal LM next. 
- if synapse.synapse_type == bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM_NEXT: - model.remote_losses.append(model_output.loss) - model.remote_losses = model.remote_losses[-config.neuron.num_remote_loss:] if len(model.remote_losses) > config.neuron.num_remote_loss else model.remote_losses - model.backward_gradients_count += inputs_x[index].size(0) - response_tensors.append(None) - response_codes.append(bittensor.proto.ReturnCode.Success) - response_messages.append('Success') - - else: - response_tensors.append(None) - response_codes.append(bittensor.proto.ReturnCode.NotImplemented) - response_messages.append('Not Implemented') - except Exception as e: - # --- Exception Hit in Synapse --- - response_tensors.append(None) - response_codes.append(bittensor.proto.ReturnCode.UnknownException) - response_messages.append(str(e)) - - return response_tensors, response_codes, response_messages - - # Create our axon server and subscribe it to the network. - if axon == None: - axon = bittensor.axon( - config = config, - wallet = wallet, - synapse_checks=synapse_check, - synapse_last_hidden = forward_hidden_state if model.config.neuron.lasthidden else None, - synapse_causal_lm = forward_casual_lm if model.config.neuron.causallm else None, - synapse_causal_lm_next = forward_casual_lm_next if model.config.neuron.causallmnext else None, - synapse_seq_2_seq = forward_generate if model.config.neuron.seq2seq else None , - blacklist = blacklist if not model.config.neuron.disable_blacklist else None, - priority = priority if not model.config.neuron.disable_priority else None, - ).start().serve(subtensor=subtensor) - - axon.optimizer_step = optimizer_step - axon.attach_backward_callback(backward_callback) - # Training Data - if config.neuron.local_train: - dataset = bittensor.dataset(config=config) - dataset.set_data_size(10, 64) - data = next(dataset) - - # load our old model - if not config.neuron.restart : - model.load(config.neuron.full_path) - - if config.wandb.api_key != 
'default': - # --- Init Wandb. - bittensor.wandb( - config = config, - cold_pubkey = wallet.coldkeypub.ss58_address, - hot_pubkey = wallet.hotkey.ss58_address, - root_dir = config.neuron.full_path - ) - - last_set_block = subtensor.get_current_block() - blocks_per_epoch = subtensor.blocks_per_epoch if config.neuron.blocks_per_epoch == -1 else config.neuron.blocks_per_epoch - blocks_per_set_weights = subtensor.blocks_per_epoch if config.neuron.blocks_per_set_weights == -1 else config.neuron.blocks_per_set_weights - - # --- Run Forever. - while True: - iteration = 0 - local_data = {} - nn = subtensor.neuron_for_pubkey(wallet.hotkey.ss58_address) - uid = metagraph.hotkeys.index( wallet.hotkey.ss58_address ) - current_block = subtensor.get_current_block() - end_block = current_block + config.neuron.blocks_per_epoch - if config.neuron.local_train: - # --- Training step. - while end_block >= current_block: - if current_block != subtensor.get_current_block() and axon.priority_threadpool.is_empty: - with mutex: - logger.info(f'local training\titeration: {iteration}\tstart') - loss, _ = model( next(dataset).to(model.device) ) - if iteration > 0 : - losses += loss - else: - losses = loss - iteration += 1 - current_block = subtensor.get_current_block() - logger.info(f'local training\titeration: {iteration}\tloss: {loss}') - else: - time.sleep(1) - - if iteration != 0: - (losses/iteration).backward() - - else: - while end_block >= current_block: - time.sleep(12) - current_block = subtensor.get_current_block() - - # --- Update parameters - if (config.neuron.local_train and iteration > 0) or (config.neuron.remote_train and model.backward_gradients_count > 0): - # Custom learning rate - if model.backward_gradients_count > 0: - optimizer.param_groups[0]['lr'] = 0.1/(model.backward_gradients_count) - else: - optimizer.param_groups[0]['lr'] = 0.1 - - logger.info('Optmization Started') - with mutex: - clip_grad_norm_(model.parameters(), 1.0) - optimizer.step() - optimizer.zero_grad() 
- logger.info('Optimization Successful: Model updated') - - if (config.neuron.local_train and iteration > 0): - local_data = {'local/loss': losses.detach().item() / iteration} - - if local_data['local/loss'] < model.best_loss: - model.best_loss = local_data['local/loss'] - model.save(config.neuron.full_path) - - # Save it only when it gives a low average loss over a large sample size (config.neuron.num_remote_loss), default to 20. - elif (config.neuron.remote_train and len(model.remote_losses) >= config.neuron.num_remote_loss): - local_data = {'local/remote_loss': sum(model.remote_losses) / len(model.remote_losses)} - - if local_data['local/remote_loss'] < model.best_remote_loss: - model.best_remote_loss = local_data['local/remote_loss'] - model.save(config.neuron.full_path) - - model.remote_losses = [] - - model.backward_gradients_count = 0 - - wandb_data = { - 'stake': nn.stake, - 'rank': nn.rank, - 'trust': nn.trust, - 'consensus': nn.consensus, - 'incentive': nn.incentive, - 'emission': nn.emission, - } - - if config.wandb.api_key != 'default': - - df = pandas.concat( [ - bittensor.utils.indexed_values_to_dataframe( prefix = 'w_i_{}'.format(nn.uid), index = metagraph.uids, values = metagraph.W[:, uid] ), - axon.to_dataframe( metagraph = metagraph ), - ], axis = 1) - df['uid'] = df.index - wandb_info_axon = axon.to_wandb() - wandb.log( { **wandb_data, **wandb_info_axon, **local_data }, step = current_block ) - wandb.log( { 'stats': wandb.Table( dataframe = df ) }, step = current_block ) - - # === Prometheus logging. 
- prometheus_guages.labels("stake").set( nn.stake ) - prometheus_guages.labels("rank").set( nn.rank ) - prometheus_guages.labels("trust").set( nn.trust ) - prometheus_guages.labels("consensus").set( nn.consensus ) - prometheus_guages.labels("incentive").set( nn.incentive ) - prometheus_guages.labels("emission").set( nn.emission ) - - if current_block - last_set_block > blocks_per_set_weights: - bittensor.__console__.print('[green]Current Status:[/green]', {**wandb_data, **local_data}) - metagraph.sync() - last_set_block = current_block - if not config.neuron.no_set_weights: - try: - bittensor.__console__.print('[green]Current Status:[/green]', {**wandb_data, **local_data}) - # Set self weights to maintain activity. - # --- query the chain for the most current number of peers on the network - chain_weights = torch.zeros(subtensor.n) - chain_weights [ uid ] = 1 - did_set = subtensor.set_weights( - uids=torch.arange(0,subtensor.n), - weights = chain_weights, - wait_for_inclusion = False, - wallet = wallet, - ) - if did_set: - logger.success('Successfully set weights on the chain') - else: - logger.error('Failed to set weights on chain. 
(Timeout)') - - except Exception as e: - logger.error('Failure setting weights on chain with error: {}', e) diff --git a/tests/unit_tests/bittensor_tests/test_axon.py b/tests/unit_tests/bittensor_tests/test_axon.py index 98ff0ba737..c1686e52d0 100644 --- a/tests/unit_tests/bittensor_tests/test_axon.py +++ b/tests/unit_tests/bittensor_tests/test_axon.py @@ -66,19 +66,6 @@ def test_sign_v1(): def test_sign_v2(): sign_v2(sender_wallet, wallet) -def test_forward_wandb(): - inputs_raw = torch.rand(3, 3, bittensor.__network_dim__) - serializer = bittensor.serializer( serializer_type = bittensor.proto.Serializer.MSGPACK ) - inputs_serialized = serializer.serialize(inputs_raw, modality = bittensor.proto.Modality.TENSOR, from_type = bittensor.proto.TensorType.TORCH) - request = bittensor.proto.TensorMessage( - version = bittensor.__version_as_int__, - tensors=[inputs_serialized] - ) - response, code, synapses = axon._forward( request ) - #axon.update_stats_for_request( request, response, call_time, code ) - print( axon.to_wandb() ) - - def test_forward_not_implemented(): inputs_raw = torch.rand(3, 3) serializer = bittensor.serializer( serializer_type = bittensor.proto.Serializer.MSGPACK ) @@ -705,9 +692,10 @@ def test_backward_grads_shape_error(): def test_backward_response_success_hidden(): - def forward( inputs_x:torch.FloatTensor, synapse, model_output = None): - return None, dict(), torch.zeros( [1, 1, bittensor.__network_dim__], requires_grad=True) - axon.attach_synapse_callback( forward, synapse_type = bittensor.proto.Synapse.SynapseType.TEXT_LAST_HIDDEN_STATE) + def backward( inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses): + return [], [1], ['success'] + + axon.attach_backward_callback(backward) inputs_raw = torch.ones(1, 1) grads_raw = torch.zeros(1, 1, bittensor.__network_dim__) synapses = [bittensor.synapse.TextLastHiddenState()] @@ -724,10 +712,10 @@ def forward( inputs_x:torch.FloatTensor, synapse, model_output = None): assert code == 
bittensor.proto.ReturnCode.Success def test_backward_response_success_causal_lm(): - def forward( inputs_x:torch.FloatTensor, synapse, model_output = None): - return None, dict(), torch.zeros( [1, 1, bittensor.__vocab_size__], requires_grad=True) + def backward( inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses): + return [], [1], ['success'] - axon.attach_synapse_callback( forward, synapse_type = bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM) + axon.attach_backward_callback( backward) inputs_raw = torch.ones(1, 1) grads_raw = torch.zeros(1, 1, bittensor.__vocab_size__) synapses = [bittensor.synapse.TextCausalLM()] @@ -744,10 +732,10 @@ def forward( inputs_x:torch.FloatTensor, synapse, model_output = None): assert code == bittensor.proto.ReturnCode.Success def test_backward_response_success_causal_lm_next(): - def forward(inputs_x: torch.FloatTensor, synapse, model_output=None): # [batch_size, (topk + 1), max_len] - return None, dict(), torch.zeros([1, (synapses[0].topk + 1), 1 + 1], requires_grad=True) + def backward( inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses): + return [], [1], ['success'] - axon.attach_synapse_callback(forward, synapse_type=bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM_NEXT) + axon.attach_backward_callback( backward) synapses = [bittensor.synapse.TextCausalLMNext()] inputs_raw = torch.ones(1, 1) @@ -967,15 +955,15 @@ def test_grpc_forward_works(): run_test_grpc_forward_works(receiver_version) def run_test_grpc_backward_works(receiver_version): - def forward( inputs_x:torch.FloatTensor, synapse , model_output = None): - return None, dict(), torch.zeros( [3, 3, bittensor.__network_dim__], requires_grad=True) + def backward( inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses): + return [], [1], ['success'] axon = bittensor.axon ( port = 7086, ip = '127.0.0.1', wallet = wallet, ) - axon.attach_synapse_callback( forward, synapse_type = 
bittensor.proto.Synapse.SynapseType.TEXT_LAST_HIDDEN_STATE) + axon.attach_backward_callback( backward) axon.start() channel = grpc.insecure_channel( @@ -1277,5 +1265,5 @@ def test_external_ip_port_set_full_address_internal(self): # test_forward_joint_success() # test_forward_joint_missing_synapse() # test_forward_priority_timeout() - test_forward_priority_2nd_request_timeout() + test_backward_response_success_hidden() # test_forward_joint_faulty_synapse() \ No newline at end of file diff --git a/tests/unit_tests/bittensor_tests/test_neuron.py b/tests/unit_tests/bittensor_tests/test_neuron.py index 626c609528..d48a0ba732 100644 --- a/tests/unit_tests/bittensor_tests/test_neuron.py +++ b/tests/unit_tests/bittensor_tests/test_neuron.py @@ -273,7 +273,7 @@ class TestBlacklist(unittest.TestCase): @staticmethod def construct_config(): - defaults = bittensor.Config() + defaults = bittensor.neurons.core_server.neuron.config() bittensor.subtensor.add_defaults( defaults ) bittensor.dendrite.add_defaults( defaults ) bittensor.axon.add_defaults( defaults ) @@ -284,7 +284,7 @@ def construct_config(): bittensor.prometheus.add_defaults( defaults ) defaults.wandb.api_key = 'test' - defaults.neuron = bittensor.neurons.core_server.neuron.config() + bittensor.neurons.core_server.neuron.check_config(defaults) defaults.neuron.learning_rate = 0.0001 defaults.neuron.momentum = 0.9 defaults.prometheus.level = "OFF" @@ -296,7 +296,7 @@ def exit_early(self, *args, **kwargs): def test_stake_blacklist(self): import sys - sys.setrecursionlimit(100) + sys.setrecursionlimit(200) mock_hotkey = "0x0000000000000000000000000000000000000000" mock_hotkey_1 = "0x0000000000000000000000000000000000000001" @@ -325,15 +325,12 @@ def test_stake_blacklist(self): ) mock_config = self.construct_config() - - mock_config.neuron.blacklist = bittensor.Config() mock_config.neuron.blacklist.stake = 1000 # blacklist if stake is less than 1000 mock_model_config = bittensor.neurons.core_server.server.config() 
mock_model_config.neuron = MagicMock( disable_blacklist = False ) - mock_model = MagicMock( spec=bittensor.neurons.core_server.server, config=mock_model_config, @@ -342,7 +339,7 @@ def test_stake_blacklist(self): with patch('bittensor.axon.__new__', side_effect=self.exit_early) as mock_new_axon: with patch('bittensor.neurons.core_server.neuron.check_config', return_value=True): with pytest.raises(MockException): - bittensor.neurons.core_server.serve( + bittensor.neurons.core_server.neuron( config=mock_config, model=MagicMock( spec=bittensor.neurons.core_server.server, @@ -354,7 +351,7 @@ def test_stake_blacklist(self): wallet=mock_wallet, axon=None, metagraph=mock_metagraph - ) + ).run() # args, kwargs _, kwargs = mock_new_axon.call_args @@ -368,4 +365,4 @@ def test_stake_blacklist(self): if __name__ == '__main__': - pass + TestBlacklist().test_stake_blacklist() diff --git a/tests/unit_tests/bittensor_tests/test_wandb.py b/tests/unit_tests/bittensor_tests/test_wandb.py index 3c6d384970..599565e460 100644 --- a/tests/unit_tests/bittensor_tests/test_wandb.py +++ b/tests/unit_tests/bittensor_tests/test_wandb.py @@ -17,10 +17,6 @@ import bittensor -def test_axon(): - axon = bittensor.axon() - axon.to_wandb() - def test_dendrite(): dendrite = bittensor.dendrite() dendrite.to_wandb()