Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 27 additions & 144 deletions bittensor/_axon/axon_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ def __init__(
self.synapse_timeouts = synapse_timeouts
self.prometheus_level = prometheus_level
self.stats = self._init_stats()
self.started = None
self.optimizer_step = None

self.started = None

# -- Priority
Expand Down Expand Up @@ -250,6 +247,8 @@ def finalize_codes_stats_and_logs( message = None):
# === Logging
request.synapses [ index ].return_code = synapse_codes[ index ] # Set synapse wire proto codes.
request.synapses [ index ].message = synapse_messages[ index ] # Set synapse wire proto message
if synapse_is_response [index]:
self.update_stats_for_request(request,synapse_codes[ index ])
bittensor.logging.rpc_log (
axon = True,
forward = True,
Expand Down Expand Up @@ -482,6 +481,7 @@ def finalize_codes_stats_and_logs():
# === Logging
request.synapses [ index ].return_code = synapse_codes[ index ] # Set synapse wire proto codes.
request.synapses [ index ].message = synapse_messages[ index ] # Set synapse wire proto message

bittensor.logging.rpc_log (
axon = True,
forward = False,
Expand Down Expand Up @@ -685,58 +685,7 @@ def default_forward_callback(self, inputs_x:torch.FloatTensor, synapses=[], hotk
return response_tensors, response_codes, response_messages

def default_backward_callback(self, inputs_x:torch.FloatTensor, grads_dy:torch.FloatTensor, synapses=[] ):
"""
The default backward callback when no callback is attached: Is used to call specific synapse functions

Args:
inputs_x (:obj:`torch.FloatTensor`, `required`):
The inputs that will be passed to the synapse functions
grads_dy (:obj:`torch.FloatTensor`, `required`):
The gradients that will be passed to the synapse functions
synapses (:obj: list of bittensor.proto.SynapseArgs, 'Optional')
The proto message that contains additional args for individual synapse functions

Returns:
response_tensors: (:obj: list of bittensor.proto.Tensor, `required`):
serialized tensor response from the nucleus call or None.
response_codes: (:obj: list of bittensor.proto.ReturnCode, `required`)
return code associated with forward call i.e. Success or Timeout.
response_messages: (:obj: list of strings, `required`)
return message associated with synapse call
"""
# --- initialize response variables ---
response_tensors = []
response_codes = []
response_messages = []

# --- calling attached synapses ---
with torch.enable_grad() and torch.autograd.set_detect_anomaly(True):
for index, synapse in enumerate(synapses):
try:
if synapse.synapse_type in self.synapse_callbacks and self.synapse_callbacks[synapse.synapse_type] != None:
message, model_output, response_tensor = self.synapse_callbacks[synapse.synapse_type](inputs_x[index], synapse)
torch.autograd.backward (
tensors = [ response_tensor ],
grad_tensors = [ grads_dy[index] ],
retain_graph=True
)
response_tensors.append(None)
response_codes.append(bittensor.proto.ReturnCode.Success)
response_messages.append('Success')
else:
response_tensors.append(None)
response_codes.append(bittensor.proto.ReturnCode.NotImplemented)
response_messages.append('Not Implemented')
except Exception as e:
# --- Exception Hit in Synapse ---
response_tensors.append(None)
response_codes.append(bittensor.proto.ReturnCode.UnknownException)
response_messages.append(str(e))

if self.optimizer_step != None:
self.optimizer_step()

return response_tensors, response_codes, response_messages
raise Exception('No Backward Function Attached')

def attach_forward_callback(self, forward_callback: Callable[ [str, torch.Tensor, int], torch.Tensor ]):
""" Assigns the forward_callback.
Expand Down Expand Up @@ -832,88 +781,53 @@ def stop(self) -> 'Axon':

return self

def check(self):
r""" Checks axon's forward and backward callbacks
"""
pubkey = self.wallet.hotkey.ss58_address
if self.forward_callback != None:
bittensor.axon.check_forward_callback(self.forward_callback,index,pubkey)

if self.backward_callback != None:
bittensor.axon.check_backward_callback(backward,index,pubkey)
return self

def _init_stats(self):
return SimpleNamespace(
# Queries per second.
qps = stat_utils.EventsPerSecondRollingAverage( 0, 0.01 ),
# Total requests.
total_requests = 0,
# Total bytes received per second.
total_in_bytes = 0,
# Total bytes responded per second.
total_out_bytes = 0,
# Bytes received per second.
avg_in_bytes_per_second = stat_utils.AmountPerSecondRollingAverage( 0, 0.01 ),
# Bytes responded per second.
avg_out_bytes_per_second = stat_utils.AmountPerSecondRollingAverage( 0, 0.01 ),
# Total Codes.
total_codes = {
bittensor.proto.ReturnCode.Name(1):0,
bittensor.proto.ReturnCode.Name(2):0,
},
# Total Successes.
total_successes = 0,
# Requests per pubkey.
requests_per_pubkey = {},
# Success per pubkey.
successes_per_pubkey = {},
# Query time per pubkey.
query_times_per_pubkey = {},
# Queries per second per pubkey.
qps_per_pubkey = {},
# Codes received per pubkey.
codes_per_pubkey = {},
# Bytes received per pubkey.
avg_in_bytes_per_pubkey = {},
# Bytes sent per pubkey.
avg_out_bytes_per_pubkey = {}
codes_per_pubkey = {}
)

#TODO: Replace/update axon and dendrite stats
def update_stats_for_request(self, request, response, time, code):
def update_stats_for_request(self, request, code):
r""" Updates statistics for this request and response.
Args:
request ( bittensor.proto.TensorMessage, `required`):
The request.
response ( bittensor.proto.TensorMessage, `required`):
The response.
time (:type:`float`, `required`):
Length of call in seconds.
code (:obj:`bittensor.proto.ReturnCode`, `required`)
Return code associated with the call i.e. Success or Timeout.
"""
self.stats.qps.event()
self.stats.total_requests += 1
self.stats.total_in_bytes += sys.getsizeof(request)
self.stats.total_out_bytes += sys.getsizeof(response)
self.stats.avg_in_bytes_per_second.event( float(sys.getsizeof(request)) )
self.stats.avg_out_bytes_per_second.event( float(sys.getsizeof(response)) )
pubkey = request.hotkey
if pubkey not in self.stats.requests_per_pubkey:
self.stats.requests_per_pubkey[ pubkey ] = 0
self.stats.successes_per_pubkey[ pubkey ] = 0
self.stats.query_times_per_pubkey[ pubkey ] = stat_utils.AmountPerSecondRollingAverage(0, 0.05)
self.stats.qps_per_pubkey[ pubkey ] = stat_utils.EventsPerSecondRollingAverage(0, 0.05)
self.stats.codes_per_pubkey[ pubkey ] = dict([(k,0) for k in bittensor.proto.ReturnCode.keys()])
self.stats.avg_in_bytes_per_pubkey[ pubkey ] = stat_utils.AmountPerSecondRollingAverage(0, 0.01)
self.stats.avg_out_bytes_per_pubkey[ pubkey ] = stat_utils.AmountPerSecondRollingAverage(0, 0.01)

self.stats.requests_per_pubkey.setdefault(pubkey, 0)
self.stats.successes_per_pubkey.setdefault(pubkey, 0)
self.stats.codes_per_pubkey.setdefault(pubkey, {})
self.stats.total_codes.setdefault(bittensor.proto.ReturnCode.Name( code ), 0)

# Add values.
self.stats.requests_per_pubkey[ pubkey ] += 1
self.stats.successes_per_pubkey[ pubkey ] += 1 if code == 1 else 0
self.stats.query_times_per_pubkey[ pubkey ].event( float(time) )
self.stats.avg_in_bytes_per_pubkey[ pubkey ].event( float(sys.getsizeof(request)) )
self.stats.avg_out_bytes_per_pubkey[ pubkey ].event( float(sys.getsizeof(response)) )
self.stats.qps_per_pubkey[ pubkey ].event()
try:
if bittensor.proto.ReturnCode.Name( code ) in self.stats.codes_per_pubkey[ pubkey ].keys():
self.stats.codes_per_pubkey[ pubkey ][bittensor.proto.ReturnCode.Name( code )] += 1
except:
pass
self.stats.total_successes += 1 if code == 1 else 0

self.stats.codes_per_pubkey[ pubkey ].setdefault(bittensor.proto.ReturnCode.Name( code ), 0)
self.stats.codes_per_pubkey[ pubkey ][bittensor.proto.ReturnCode.Name( code )] += 1
self.stats.total_codes[bittensor.proto.ReturnCode.Name( code )] += 1



def to_dataframe ( self, metagraph ):
r""" Return a stats info as a pandas dataframe indexed by the metagraph or pubkey if not existend.
Expand All @@ -926,49 +840,18 @@ def to_dataframe ( self, metagraph ):
# Reindex the pubkey to uid if metagraph is present.
try:
index = [ metagraph.hotkeys.index(pubkey) for pubkey in self.stats.requests_per_pubkey.keys() if pubkey in metagraph.hotkeys ]
columns = [ 'axon_n_requested', 'axon_n_success', 'axon_query_time','axon_avg_inbytes','axon_avg_outbytes', 'axon_qps' ]
columns = [ 'axon_n_requested', 'axon_n_success' ]
dataframe = pandas.DataFrame(columns = columns, index = index)
for pubkey in self.stats.requests_per_pubkey.keys():
if pubkey in metagraph.hotkeys:
uid = metagraph.hotkeys.index(pubkey)
dataframe.loc[ uid ] = pandas.Series( {
'axon_n_requested': int(self.stats.requests_per_pubkey[pubkey]),
'axon_n_success': int(self.stats.requests_per_pubkey[pubkey]),
'axon_query_time': float(self.stats.query_times_per_pubkey[pubkey].get()),
'axon_avg_inbytes': float(self.stats.avg_in_bytes_per_pubkey[pubkey].get()),
'axon_avg_outbytes': float(self.stats.avg_out_bytes_per_pubkey[pubkey].get()),
'axon_qps': float(self.stats.qps_per_pubkey[pubkey].get())
} )
dataframe['uid'] = dataframe.index
return dataframe

except Exception as e:
bittensor.logging.error(prefix='failed axon.to_dataframe()', sufix=str(e))
return pandas.DataFrame()

def to_wandb( self ):
r""" Return a dictionary of axon stat info for wandb logging
Args:
metagraph: (bittensor.Metagraph):
If not None, indexes the wandb data using int uids rather than string pubkeys.
Return:
wandb_info (:obj:`Dict`)
"""
try:
avg_query_time = 0.0
for pubkey in self.stats.query_times_per_pubkey:
avg_query_time += self.stats.query_times_per_pubkey[pubkey].get() / len( self.stats.query_times_per_pubkey )
# ---- Axon summary for wandb
wandb_data = {
'axon/qps': self.stats.qps.get(),
'axon/avg_query_time': avg_query_time,
'axon/total_requests': self.stats.total_requests,
'axon/total_in_bytes' : self.stats.total_in_bytes,
'axon/total_out_bytes' : self.stats.total_out_bytes,
'axon/avg_in_bytes_per_second' : self.stats.avg_in_bytes_per_second.get(),
'axon/avg_out_bytes_per_second' : self.stats.avg_out_bytes_per_second.get(),
}
return wandb_data
except Exception as e:
bittensor.logging.error(prefix='failed during axon.to_wandb()', sufix=str(e))
return {}
return pandas.DataFrame()
Loading