diff --git a/.circleci/config.yml b/.circleci/config.yml index 1ff5c6551a..4902222058 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -74,8 +74,6 @@ jobs: steps: - checkout - - run: git submodule sync - - run: git submodule update --init # For wallet submoudle - restore_cache: name: Restore cached venv diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 4a663b180b..0000000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "bittensor-wallet"] - path = bittensor/_wallet - url = https://github.com/opentensor/bittensor-wallet diff --git a/bittensor/__init__.py b/bittensor/__init__.py index eb96477cfb..c146716fe5 100644 --- a/bittensor/__init__.py +++ b/bittensor/__init__.py @@ -184,12 +184,8 @@ def debug(on: bool = True): from substrateinterface import Keypair as Keypair from .config import * -from ._wallet import ( - wallet as wallet, - Keyfile as Keyfile, - WalletConfig as WalletConfig, - WalletConfigDefault as WalletConfigDefault, -) +from .keyfile import * +from .wallet import * from .utils import * from .utils.balance import Balance as Balance diff --git a/bittensor/_wallet b/bittensor/_wallet deleted file mode 160000 index 286eb4d1d3..0000000000 --- a/bittensor/_wallet +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 286eb4d1d39c90fe9cb720ad0c7570ee6cc26543 diff --git a/bittensor/errors.py b/bittensor/errors.py index da9b9fb2b6..9053cf6824 100644 --- a/bittensor/errors.py +++ b/bittensor/errors.py @@ -72,6 +72,6 @@ class NotDelegateError(StakeError): class KeyFileError(Exception): - """Error thrown when the Keyfile is corrupt, non-writable, non-readable or the password used to decrypt is invalid.""" + """Error thrown when the keyfile is corrupt, non-writable, non-readable or the password used to decrypt is invalid.""" pass diff --git a/bittensor/keyfile.py b/bittensor/keyfile.py new file mode 100644 index 0000000000..f8a02c832d --- /dev/null +++ b/bittensor/keyfile.py @@ -0,0 +1,652 @@ +# The MIT License (MIT) +# Copyright © 2021 Yuma Rao + +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. + +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import os +import base64 +import json +import stat +import getpass +import bittensor +from typing import Optional +from pathlib import Path + +from ansible_vault import Vault +from ansible.parsing.vault import AnsibleVaultError +from cryptography.exceptions import InvalidSignature, InvalidKey +from cryptography.fernet import Fernet, InvalidToken +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from password_strength import PasswordPolicy +from substrateinterface.utils.ss58 import ss58_encode +from termcolor import colored + + +def serialized_keypair_to_keyfile_data(keypair: "bittensor.Keypair") -> bytes: + """Serializes keypair object into keyfile data. + Args: + keypair (bittensor.Keypair): The keypair object to be serialized. + Returns: + data (bytes): Serialized keypair data. + """ + json_data = { + "accountId": "0x" + keypair.public_key.hex() if keypair.public_key else None, + "publicKey": "0x" + keypair.public_key.hex() if keypair.public_key else None, + "secretPhrase": keypair.mnemonic if keypair.mnemonic else None, + "secretSeed": "0x" + + ( + keypair.seed_hex + if isinstance(keypair.seed_hex, str) + else keypair.seed_hex.hex() + ) + if keypair.seed_hex + else None, + "ss58Address": keypair.ss58_address if keypair.ss58_address else None, + } + data = json.dumps(json_data).encode() + return data + + +def deserialize_keypair_from_keyfile_data(keyfile_data: bytes) -> "bittensor.Keypair": + """Deserializes Keypair object from passed keyfile data. + Args: + keyfile_data (bytes): The keyfile data as bytes to be loaded. + Returns: + keypair (bittensor.Keypair): The Keypair loaded from bytes. + Raises: + KeyFileError: Raised if the passed bytes cannot construct a keypair object. + """ + keyfile_data = keyfile_data.decode() + try: + keyfile_dict = dict(json.loads(keyfile_data)) + except: + string_value = str(keyfile_data) + if string_value[:2] == "0x": + string_value = ss58_encode(string_value) + keyfile_dict = { + "accountId": None, + "publicKey": None, + "secretPhrase": None, + "secretSeed": None, + "ss58Address": string_value, + } + else: + raise bittensor.KeyFileError( + "Keypair could not be created from keyfile data: {}".format( + string_value + ) + ) + + if "secretSeed" in keyfile_dict and keyfile_dict["secretSeed"] is not None: + return bittensor.Keypair.create_from_seed(keyfile_dict["secretSeed"]) + + if "secretPhrase" in keyfile_dict and keyfile_dict["secretPhrase"] is not None: + return bittensor.Keypair.create_from_mnemonic( + mnemonic=keyfile_dict["secretPhrase"] + ) + + if "ss58Address" in keyfile_dict and keyfile_dict["ss58Address"] is not None: + return bittensor.Keypair(ss58_address=keyfile_dict["ss58Address"]) + + else: + raise bittensor.KeyFileError( + "Keypair could not be created from keyfile data: {}".format(keyfile_dict) + ) + + +def validate_password(password: str) -> bool: + """Validates the password against a password policy. + Args: + password (str): The password to verify. + Returns: + valid (bool): True if the password meets validity requirements. + """ + policy = PasswordPolicy.from_names(strength=0.20, entropybits=10, length=6) + if not password: + return False + tested_pass = policy.password(password) + result = tested_pass.test() + if len(result) > 0: + print( + colored( + "Password not strong enough. Try increasing the length of the password or the password complexity" + ) + ) + return False + password_verification = getpass.getpass("Retype your password: ") + if password != password_verification: + print("Passwords do not match") + return False + return True + + +def ask_password_to_encrypt() -> str: + """Prompts the user to enter a password for key encryption. + Returns: + password (str): The valid password entered by the user. + """ + valid = False + while not valid: + password = getpass.getpass("Specify password for key encryption: ") + valid = validate_password(password) + return password + + +def keyfile_data_is_encrypted_ansible(keyfile_data: bytes) -> bool: + """Returns true if the keyfile data is ansible encrypted. + Args: + keyfile_data (bytes): The bytes to validate. + Returns: + is_ansible (bool): True if the data is ansible encrypted. + """ + return keyfile_data[:14] == b"$ANSIBLE_VAULT" + + +def keyfile_data_is_encrypted_legacy(keyfile_data: bytes) -> bool: + """Returns true if the keyfile data is legacy encrypted. + Args: + keyfile_data (bytes): The bytes to validate. + Returns: + is_legacy (bool): True if the data is legacy encrypted. + """ + return keyfile_data[:6] == b"gAAAAA" + + +def keyfile_data_is_encrypted(keyfile_data: bytes) -> bool: + """Returns true if the keyfile data is encrypted. + Args: + keyfile_data (bytes): The bytes to validate. + Returns: + is_encrypted (bool): True if the data is encrypted. + """ + return keyfile_data_is_encrypted_ansible( + keyfile_data + ) or keyfile_data_is_encrypted_legacy(keyfile_data) + + +def encrypt_keyfile_data(keyfile_data: bytes, password: str = None) -> bytes: + """Encrypts the passed keyfile data using ansible vault. + Args: + keyfile_data (bytes): The bytes to encrypt. + password (str, optional): The password used to encrypt the data. If None, asks for user input. + Returns: + encrypted_data (bytes): The encrypted data. + """ + password = ask_password_to_encrypt() if password is None else password + console = bittensor.__console__ + with console.status(":locked_with_key: Encrypting key..."): + vault = Vault(password) + return vault.vault.encrypt(keyfile_data) + + +def get_coldkey_password_from_environment(coldkey_name: str) -> Optional[str]: + """Retrieves the cold key password from the environment variables. + Args: + coldkey_name (str): The name of the cold key. + Returns: + password (str): The password retrieved from the environment variables, or None if not found. + """ + for env_var in os.environ: + if env_var.upper().startswith("BT_COLD_PW_") and env_var.upper().endswith( + coldkey_name.upper() + ): + return os.getenv(env_var) + + return None + + +def decrypt_keyfile_data( + keyfile_data: bytes, password: str = None, coldkey_name: Optional[str] = None +) -> bytes: + """Decrypts the passed keyfile data using ansible vault. + Args: + keyfile_data (bytes): The bytes to decrypt. + password (str, optional): The password used to decrypt the data. If None, asks for user input. + coldkey_name (str, optional): The name of the cold key. If provided, retrieves the password from environment variables. + Returns: + decrypted_data (bytes): The decrypted data. + Raises: + KeyFileError: Raised if the file is corrupted or if the password is incorrect. + """ + if coldkey_name is not None and password is None: + password = get_coldkey_password_from_environment(coldkey_name) + + try: + password = ( + getpass.getpass("Enter password to unlock key: ") + if password is None + else password + ) + console = bittensor.__console__ + with console.status(":key: Decrypting key..."): + # Ansible decrypt. + if keyfile_data_is_encrypted_ansible(keyfile_data): + vault = Vault(password) + try: + decrypted_keyfile_data = vault.load(keyfile_data) + except AnsibleVaultError: + raise bittensor.KeyFileError("Invalid password") + # Legacy decrypt. + elif keyfile_data_is_encrypted_legacy(keyfile_data): + __SALT = ( + b"Iguesscyborgslikemyselfhaveatendencytobeparanoidaboutourorigins" + ) + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + salt=__SALT, + length=32, + iterations=10000000, + backend=default_backend(), + ) + key = base64.urlsafe_b64encode(kdf.derive(password.encode())) + cipher_suite = Fernet(key) + decrypted_keyfile_data = cipher_suite.decrypt(keyfile_data) + # Unknown. + else: + raise bittensor.KeyFileError( + "keyfile data: {} is corrupt".format(keyfile_data) + ) + + except (InvalidSignature, InvalidKey, InvalidToken): + raise bittensor.KeyFileError("Invalid password") + + if not isinstance(decrypted_keyfile_data, bytes): + decrypted_keyfile_data = json.dumps(decrypted_keyfile_data).encode() + return decrypted_keyfile_data + + +class keyfile: + """Defines an interface for a substrate interface keypair stored on device.""" + + def __init__(self, path: str): + self.path = os.path.expanduser(path) + self.name = Path(self.path).parent.stem + + def __str__(self): + if not self.exists_on_device(): + return "keyfile (empty, {})>".format(self.path) + if self.is_encrypted(): + return "keyfile (encrypted, {})>".format(self.path) + else: + return "keyfile (decrypted, {})>".format(self.path) + + def __repr__(self): + return self.__str__() + + @property + def keypair(self) -> "bittensor.Keypair": + """Returns the keypair from path, decrypts data if the file is encrypted. + Returns: + keypair (bittensor.Keypair): The keypair stored under the path. + Raises: + KeyFileError: Raised if the file does not exist, is not readable, writable, corrupted, or if the password is incorrect. + """ + return self.get_keypair() + + @property + def data(self) -> bytes: + """Returns the keyfile data under path. + Returns: + keyfile_data (bytes): The keyfile data stored under the path. + Raises: + KeyFileError: Raised if the file does not exist, is not readable, or writable. + """ + return self._read_keyfile_data_from_file() + + @property + def keyfile_data(self) -> bytes: + """Returns the keyfile data under path. + Returns: + keyfile_data (bytes): The keyfile data stored under the path. + Raises: + KeyFileError: Raised if the file does not exist, is not readable, or writable. + """ + return self._read_keyfile_data_from_file() + + def set_keypair( + self, + keypair: "bittensor.Keypair", + encrypt: bool = True, + overwrite: bool = False, + password: str = None, + ): + """Writes the keypair to the file and optionally encrypts data. + Args: + keypair (bittensor.Keypair): The keypair to store under the path. + encrypt (bool, optional): If True, encrypts the file under the path. Default is True. + overwrite (bool, optional): If True, forces overwrite of the current file. Default is False. + password (str, optional): The password used to encrypt the file. If None, asks for user input. + Raises: + KeyFileError: Raised if the file does not exist, is not readable, writable, or if the password is incorrect. + """ + self.make_dirs() + keyfile_data = serialized_keypair_to_keyfile_data(keypair) + if encrypt: + keyfile_data = encrypt_keyfile_data(keyfile_data, password) + self._write_keyfile_data_to_file(keyfile_data, overwrite=overwrite) + + def get_keypair(self, password: str = None) -> "bittensor.Keypair": + """Returns the keypair from the path, decrypts data if the file is encrypted. + Args: + password (str, optional): The password used to decrypt the file. If None, asks for user input. + Returns: + keypair (bittensor.Keypair): The keypair stored under the path. + Raises: + KeyFileError: Raised if the file does not exist, is not readable, writable, corrupted, or if the password is incorrect. + """ + keyfile_data = self._read_keyfile_data_from_file() + if keyfile_data_is_encrypted(keyfile_data): + keyfile_data = decrypt_keyfile_data( + keyfile_data, password, coldkey_name=self.name + ) + return deserialize_keypair_from_keyfile_data(keyfile_data) + + def make_dirs(self): + """Creates directories for the path if they do not exist.""" + directory = os.path.dirname(self.path) + if not os.path.exists(directory): + os.makedirs(directory) + + def exists_on_device(self) -> bool: + """Returns True if the file exists on the device. + Returns: + on_device (bool): True if the file is on the device. + """ + if not os.path.isfile(self.path): + return False + return True + + def is_readable(self) -> bool: + """Returns True if the file under path is readable. + Returns: + readable (bool): True if the file is readable. + """ + if not self.exists_on_device(): + return False + if not os.access(self.path, os.R_OK): + return False + return True + + def is_writable(self) -> bool: + """Returns True if the file under path is writable. + Returns: + writable (bool): True if the file is writable. + """ + if os.access(self.path, os.W_OK): + return True + return False + + def is_encrypted(self) -> bool: + """Returns True if the file under path is encrypted. + Returns: + encrypted (bool): True if the file is encrypted. + """ + if not self.exists_on_device(): + return False + if not self.is_readable(): + return False + return keyfile_data_is_encrypted(self._read_keyfile_data_from_file()) + + def _may_overwrite(self) -> bool: + """Asks the user if it's okay to overwrite the file. + Returns: + may_overwrite (bool): True if the user allows overwriting the file. + """ + choice = input("File {} already exists. Overwrite? (y/N) ".format(self.path)) + return choice == "y" + + def encrypt(self, password: str = None): + """Encrypts the file under the path. + Args: + password (str, optional): The password for encryption. If None, asks for user input. + Raises: + KeyFileError: Raised if the file does not exist, is not readable, or writable. + """ + if not self.exists_on_device(): + raise bittensor.KeyFileError( + "Keyfile at: {} does not exist".format(self.path) + ) + if not self.is_readable(): + raise bittensor.KeyFileError( + "Keyfile at: {} is not readable".format(self.path) + ) + if not self.is_writable(): + raise bittensor.KeyFileError( + "Keyfile at: {} is not writable".format(self.path) + ) + keyfile_data = self._read_keyfile_data_from_file() + if not keyfile_data_is_encrypted(keyfile_data): + as_keypair = deserialize_keypair_from_keyfile_data(keyfile_data) + keyfile_data = serialized_keypair_to_keyfile_data(as_keypair) + keyfile_data = encrypt_keyfile_data(keyfile_data, password) + self._write_keyfile_data_to_file(keyfile_data, overwrite=True) + + def decrypt(self, password: str = None): + """Decrypts the file under the path. + Args: + password (str, optional): The password for decryption. If None, asks for user input. + Raises: + KeyFileError: Raised if the file does not exist, is not readable, writable, corrupted, or if the password is incorrect. + """ + if not self.exists_on_device(): + raise bittensor.KeyFileError( + "Keyfile at: {} does not exist".format(self.path) + ) + if not self.is_readable(): + raise bittensor.KeyFileError( + "Keyfile at: {} is not readable".format(self.path) + ) + if not self.is_writable(): + raise bittensor.KeyFileError( + "Keyfile at: {} is not writable".format(self.path) + ) + keyfile_data = self._read_keyfile_data_from_file() + if keyfile_data_is_encrypted(keyfile_data): + keyfile_data = decrypt_keyfile_data( + keyfile_data, password, coldkey_name=self.name + ) + as_keypair = deserialize_keypair_from_keyfile_data(keyfile_data) + keyfile_data = serialized_keypair_to_keyfile_data(as_keypair) + self._write_keyfile_data_to_file(keyfile_data, overwrite=True) + + def _read_keyfile_data_from_file(self) -> bytes: + """Reads the keyfile data from the file. + Returns: + keyfile_data (bytes): The keyfile data stored under the path. + Raises: + KeyFileError: Raised if the file does not exist or is not readable. + """ + if not self.exists_on_device(): + raise bittensor.KeyFileError( + "Keyfile at: {} does not exist".format(self.path) + ) + if not self.is_readable(): + raise bittensor.KeyFileError( + "Keyfile at: {} is not readable".format(self.path) + ) + with open(self.path, "rb") as file: + data = file.read() + return data + + def _write_keyfile_data_to_file(self, keyfile_data: bytes, overwrite: bool = False): + """Writes the keyfile data to the file. + Args: + keyfile_data (bytes): The byte data to store under the path. + overwrite (bool, optional): If True, overwrites the data without asking for permission from the user. Default is False. + Raises: + KeyFileError: Raised if the file is not writable or the user responds No to the overwrite prompt. + """ + # Check overwrite. + if self.exists_on_device() and not overwrite: + if not self._may_overwrite(): + raise bittensor.KeyFileError( + "Keyfile at: {} is not writable".format(self.path) + ) + with open(self.path, "wb") as keyfile: + keyfile.write(keyfile_data) + # Set file permissions. + os.chmod(self.path, stat.S_IRUSR | stat.S_IWUSR) + + +class Mockkeyfile: + """ + The Mockkeyfile is a mock object representing a keyfile that does not exist on the device. + It is designed for use in testing scenarios and simulations where actual filesystem operations are not required. + The keypair stored in the Mockkeyfile is treated as non-encrypted and the data is stored as a serialized string. + """ + + def __init__(self, path: str): + """ + Initializes a Mockkeyfile object. + + Args: + path (str): The path of the mock keyfile. + """ + self.path = path + self._mock_keypair = None + self._mock_data = None + + def __str__(self): + """ + Returns a string representation of the Mockkeyfile. The representation will indicate if the + keyfile is empty, encrypted, or decrypted. + + Returns: + str: The string representation of the Mockkeyfile. + """ + return f"Mockkeyfile({self.path})" + + def __repr__(self): + """ + Returns a string representation of the Mockkeyfile, same as __str__(). + + Returns: + str: The string representation of the Mockkeyfile. + """ + return self.__str__() + + @property + def keypair(self): + """ + Returns the mock keypair stored in the keyfile. + + Returns: + bittensor.Keypair: The mock keypair. + """ + return self._mock_keypair + + @property + def data(self): + """ + Returns the serialized keypair data stored in the keyfile. + + Returns: + bytes: The serialized keypair data. + """ + return self._mock_data + + def set_keypair(self, keypair, encrypt=True, overwrite=False, password=None): + """ + Sets the mock keypair in the keyfile. The `encrypt` and `overwrite` parameters are ignored. + + Args: + keypair (bittensor.Keypair): The mock keypair to be set. + encrypt (bool, optional): Ignored in this context. Defaults to True. + overwrite (bool, optional): Ignored in this context. Defaults to False. + password (str, optional): Ignored in this context. Defaults to None. + """ + self._mock_keypair = keypair + self._mock_data = None # You may need to serialize the keypair here + + def get_keypair(self, password=None): + """ + Returns the mock keypair stored in the keyfile. The `password` parameter is ignored. + + Args: + password (str, optional): Ignored in this context. Defaults to None. + + Returns: + bittensor.Keypair: The mock keypair stored in the keyfile. + """ + return self._mock_keypair + + def make_dirs(self): + """ + Creates the directories for the mock keyfile. Does nothing in this class, + since no actual filesystem operations are needed. + """ + pass + + def exists_on_device(self): + """ + Returns True indicating that the mock keyfile exists on the device (although + it is not created on the actual file system). + + Returns: + bool: Always returns True for Mockkeyfile. + """ + return True + + def is_readable(self): + """ + Returns True indicating that the mock keyfile is readable (although it is not + read from the actual file system). + + Returns: + bool: Always returns True for Mockkeyfile. + """ + return True + + def is_writable(self): + """ + Returns True indicating that the mock keyfile is writable (although it is not + written to the actual file system). + + Returns: + bool: Always returns True for Mockkeyfile. + """ + return True + + def is_encrypted(self): + """ + Returns False indicating that the mock keyfile is not encrypted. + + Returns: + bool: Always returns False for Mockkeyfile. + """ + return False + + def encrypt(self, password=None): + """ + Raises a ValueError since encryption is not supported for the mock keyfile. + + Args: + password (str, optional): Ignored in this context. Defaults to None. + + Raises: + ValueError: Always raises this exception for Mockkeyfile. + """ + raise ValueError("Cannot encrypt a Mockkeyfile") + + def decrypt(self, password=None): + """ + Returns without doing anything since the mock keyfile is not encrypted. + + Args: + password (str, optional): Ignored in this context. Defaults to None. + """ + pass diff --git a/bittensor/mock/keyfile_mock.py b/bittensor/mock/keyfile_mock.py index 263a4b26c6..e13126cc17 100644 --- a/bittensor/mock/keyfile_mock.py +++ b/bittensor/mock/keyfile_mock.py @@ -18,10 +18,10 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. -from .._wallet import serialized_keypair_to_keyfile_data, Keyfile, Keypair +from bittensor import serialized_keypair_to_keyfile_data, keyfile, Keypair -class MockKeyfile(Keyfile): +class MockKeyfile(keyfile): """Defines an interface to a mocked keyfile object (nothing is created on device) keypair is treated as non encrypted and the data is just the string version.""" def __init__(self, path: str): diff --git a/bittensor/mock/subtensor_mock.py b/bittensor/mock/subtensor_mock.py index 274712ab86..3999e72bf5 100644 --- a/bittensor/mock/subtensor_mock.py +++ b/bittensor/mock/subtensor_mock.py @@ -24,7 +24,7 @@ from collections.abc import Mapping from hashlib import sha256 -from .._wallet import wallet +from ..wallet import wallet from ..chain_data import ( NeuronInfo, diff --git a/bittensor/mock/wallet_mock.py b/bittensor/mock/wallet_mock.py index a3c7ef4ae6..35179f8c94 100644 --- a/bittensor/mock/wallet_mock.py +++ b/bittensor/mock/wallet_mock.py @@ -19,15 +19,14 @@ # DEALINGS IN THE SOFTWARE. import os -from .._wallet import wallet, Keyfile, Keypair, __ss58_format__ - +import bittensor from typing import Optional from Crypto.Hash import keccak from .keyfile_mock import MockKeyfile -class MockWallet(wallet): +class MockWallet(bittensor.wallet): """ Mocked Version of the bittensor wallet class, meant to be used for testing """ @@ -45,7 +44,7 @@ def __init__(self, **kwargs): self._mocked_hotkey_keyfile = None @property - def hotkey_file(self) -> "Keyfile": + def hotkey_file(self) -> "bittensor.keyfile": if self._is_mock: if self._mocked_hotkey_keyfile == None: self._mocked_hotkey_keyfile = MockKeyfile(path="MockedHotkey") @@ -53,10 +52,10 @@ def hotkey_file(self) -> "Keyfile": else: wallet_path = os.path.expanduser(os.path.join(self.path, self.name)) hotkey_path = os.path.join(wallet_path, "hotkeys", self.hotkey_str) - return Keyfile(path=hotkey_path) + return bittensor.keyfile(path=hotkey_path) @property - def coldkey_file(self) -> "Keyfile": + def coldkey_file(self) -> "bittensor.keyfile": if self._is_mock: if self._mocked_coldkey_keyfile == None: self._mocked_coldkey_keyfile = MockKeyfile(path="MockedColdkey") @@ -64,10 +63,10 @@ def coldkey_file(self) -> "Keyfile": else: wallet_path = os.path.expanduser(os.path.join(self.path, self.name)) coldkey_path = os.path.join(wallet_path, "coldkey") - return Keyfile(path=coldkey_path) + return bittensor.keyfile(path=coldkey_path) @property - def coldkeypub_file(self) -> "Keyfile": + def coldkeypub_file(self) -> "bittensor.keyfile": if self._is_mock: if self._mocked_coldkey_keyfile == None: self._mocked_coldkey_keyfile = MockKeyfile(path="MockedColdkeyPub") @@ -75,16 +74,22 @@ def coldkeypub_file(self) -> "Keyfile": else: wallet_path = os.path.expanduser(os.path.join(self.path, self.name)) coldkeypub_path = os.path.join(wallet_path, "coldkeypub.txt") - return Keyfile(path=coldkeypub_path) + return bittensor.keyfile(path=coldkeypub_path) -def get_mock_wallet(coldkey: "Keypair" = None, hotkey: "Keypair" = None): +def get_mock_wallet( + coldkey: "bittensor.Keypair" = None, hotkey: "bittensor.Keypair" = None +): wallet = MockWallet(name="mock_wallet", hotkey="mock", path="/tmp/mock_wallet") if not coldkey: - coldkey = Keypair.create_from_mnemonic(Keypair.generate_mnemonic()) + coldkey = bittensor.Keypair.create_from_mnemonic( + bittensor.Keypair.generate_mnemonic() + ) if not hotkey: - hotkey = Keypair.create_from_mnemonic(Keypair.generate_mnemonic()) + hotkey = bittensor.Keypair.create_from_mnemonic( + bittensor.Keypair.generate_mnemonic() + ) wallet.set_coldkey(coldkey, encrypt=False, overwrite=True) wallet.set_coldkeypub(coldkey, encrypt=False, overwrite=True) @@ -93,7 +98,7 @@ def get_mock_wallet(coldkey: "Keypair" = None, hotkey: "Keypair" = None): return wallet -def get_mock_keypair(uid: int, test_name: Optional[str] = None) -> Keypair: +def get_mock_keypair(uid: int, test_name: Optional[str] = None) -> bittensor.Keypair: """ Returns a mock keypair from a uid and optional test_name. If test_name is not provided, the uid is the only seed. @@ -108,9 +113,9 @@ def get_mock_keypair(uid: int, test_name: Optional[str] = None) -> Keypair: ) uid = uid + hashed_test_name_as_int - return Keypair.create_from_seed( + return bittensor.Keypair.create_from_seed( seed_hex=int.to_bytes(uid, 32, "big", signed=False), - ss58_format=__ss58_format__, + ss58_format=bittensor.__ss58_format__, ) diff --git a/bittensor/utils/wallet_utils.py b/bittensor/utils/wallet_utils.py index f1aaf79c01..8768d3a5ba 100644 --- a/bittensor/utils/wallet_utils.py +++ b/bittensor/utils/wallet_utils.py @@ -23,13 +23,6 @@ from .. import __ss58_format__ from substrateinterface import Keypair as Keypair -from .._wallet import ( - validate_password as validate_password, - serialized_keypair_to_keyfile_data as serialized_keypair_to_keyfile_data, - ask_password_to_encrypt as ask_password_to_encrypt, - decrypt_keyfile_data as decrypt_keyfile_data, -) - def get_ss58_format(ss58_address: str) -> int: """Returns the ss58 format of the given ss58 address.""" diff --git a/bittensor/wallet.py b/bittensor/wallet.py new file mode 100644 index 0000000000..b073ea99ec --- /dev/null +++ b/bittensor/wallet.py @@ -0,0 +1,816 @@ +""" Implementation of the wallet class, which manages balances with staking and transfer. Also manages hotkey and coldkey. +""" +# The MIT License (MIT) +# Copyright © 2021 Yuma Rao + +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. + +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import os +import copy +import argparse +import bittensor +from termcolor import colored +from substrateinterface import Keypair +from typing import Optional, Union, List, Tuple, Dict, overload +from bittensor.utils import is_valid_bittensor_address_or_public_key + + +def display_mnemonic_msg(keypair: Keypair, key_type: str): + """ + Display the mnemonic and a warning message to keep the mnemonic safe. + + Args: + keypair (Keypair): Keypair object. + key_type (str): Type of the key (coldkey or hotkey). + """ + mnemonic = keypair.mnemonic + mnemonic_green = colored(mnemonic, "green") + print( + colored( + "\nIMPORTANT: Store this mnemonic in a secure (preferable offline place), as anyone " + "who has possession of this mnemonic can use it to regenerate the key and access your tokens. \n", + "red", + ) + ) + print("The mnemonic to the new {} is:\n\n{}\n".format(key_type, mnemonic_green)) + print( + "You can use the mnemonic to recreate the key in case it gets lost. The command to use to regenerate the key using this mnemonic is:" + ) + print("btcli regen_{} --mnemonic {}".format(key_type, mnemonic)) + print("") + + +class wallet: + """ + Bittensor wallet maintenance class. Each wallet contains a coldkey and a hotkey. + The coldkey is the user's primary key for holding stake in their wallet + and is the only way that users can access Tao. Coldkeys can hold tokens and should be encrypted on your device. + The coldkey must be used to stake and unstake funds from a running node. The hotkey, on the other hand, is only used + for subscribing and setting weights from running code. Hotkeys are linked to coldkeys through the metagraph. + """ + + @classmethod + def config(cls) -> "bittensor.config": + """ + Get config from the argument parser. + + Returns: + bittensor.config: Config object. + """ + parser = argparse.ArgumentParser() + cls.add_args(parser) + return bittensor.config(parser, args=[]) + + @classmethod + def help(cls): + """ + Print help to stdout. + """ + parser = argparse.ArgumentParser() + cls.add_args(parser) + print(cls.__new__.__doc__) + parser.print_help() + + @classmethod + def add_args(cls, parser: argparse.ArgumentParser, prefix: str = None): + """ + Accept specific arguments from parser. + + Args: + parser (argparse.ArgumentParser): Argument parser object. + prefix (str): Argument prefix. + """ + prefix_str = "" if prefix == None else prefix + "." + try: + default_name = os.getenv("BT_WALLET_NAME") or "default" + default_hotkey = os.getenv("BT_WALLET_NAME") or "default" + default_path = os.getenv("BT_WALLET_PATH") or "~/.bittensor/wallets/" + parser.add_argument( + "--" + prefix_str + "wallet.name", + required=False, + default=default_name, + help="The name of the wallet to unlock for running bittensor " + "(name mock is reserved for mocking this wallet)", + ) + parser.add_argument( + "--" + prefix_str + "wallet.hotkey", + required=False, + default=default_hotkey, + help="The name of the wallet's hotkey.", + ) + parser.add_argument( + "--" + prefix_str + "wallet.path", + required=False, + default=default_path, + help="The path to your bittensor wallets", + ) + except argparse.ArgumentError as e: + pass + + def __init__( + self, + name: str = None, + hotkey: str = None, + path: str = None, + config: "bittensor.config" = None, + ): + r""" + Initialize the bittensor wallet object containing a hot and coldkey. + + Args: + name (str, optional): The name of the wallet to unlock for running bittensor. Defaults to 'default'. + hotkey (str, optional): The name of hotkey used to running the miner. Defaults to 'default'. + path (str, optional): The path to your bittensor wallets. Defaults to '~/.bittensor/wallets/'. + config (bittensor.config, optional): bittensor.wallet.config(). Defaults to None. + """ + # Fill config from passed args using command line defaults. + if config is None: + config = wallet.config() + self.config = copy.deepcopy(config) + self.config.wallet.name = name or self.config.wallet.get( + "name", bittensor.defaults.wallet.name + ) + self.config.wallet.hotkey = hotkey or self.config.wallet.get( + "hotkey", bittensor.defaults.wallet.hotkey + ) + self.config.wallet.path = path or self.config.wallet.get( + "path", bittensor.defaults.wallet.path + ) + + self.name = self.config.wallet.name + self.path = self.config.wallet.path + self.hotkey_str = self.config.wallet.hotkey + + self._hotkey = None + self._coldkey = None + self._coldkeypub = None + + def __str__(self): + """ + Returns the string representation of the Wallet object. + + Returns: + str: The string representation. + """ + return "wallet({}, {}, {})".format(self.name, self.hotkey_str, self.path) + + def __repr__(self): + """ + Returns the string representation of the Wallet object. + + Returns: + str: The string representation. + """ + return self.__str__() + + def create_if_non_existent( + self, coldkey_use_password: bool = True, hotkey_use_password: bool = False + ) -> "wallet": + """ + Checks for existing coldkeypub and hotkeys and creates them if non-existent. + + Args: + coldkey_use_password (bool, optional): Whether to use a password for coldkey. Defaults to True. + hotkey_use_password (bool, optional): Whether to use a password for hotkey. Defaults to False. + + Returns: + wallet: The Wallet object. + """ + return self.create(coldkey_use_password, hotkey_use_password) + + def create( + self, coldkey_use_password: bool = True, hotkey_use_password: bool = False + ) -> "wallet": + """ + Checks for existing coldkeypub and hotkeys and creates them if non-existent. + + Args: + coldkey_use_password (bool, optional): Whether to use a password for coldkey. Defaults to True. + hotkey_use_password (bool, optional): Whether to use a password for hotkey. Defaults to False. + + Returns: + wallet: The Wallet object. + """ + # ---- Setup Wallet. ---- + if ( + not self.coldkey_file.exists_on_device() + and not self.coldkeypub_file.exists_on_device() + ): + self.create_new_coldkey(n_words=12, use_password=coldkey_use_password) + if not self.hotkey_file.exists_on_device(): + self.create_new_hotkey(n_words=12, use_password=hotkey_use_password) + return self + + def recreate( + self, coldkey_use_password: bool = True, hotkey_use_password: bool = False + ) -> "wallet": + """ + Checks for existing coldkeypub and hotkeys and creates them if non-existent. + + Args: + coldkey_use_password (bool, optional): Whether to use a password for coldkey. Defaults to True. + hotkey_use_password (bool, optional): Whether to use a password for hotkey. Defaults to False. + + Returns: + wallet: The Wallet object. + """ + # ---- Setup Wallet. ---- + self.create_new_coldkey(n_words=12, use_password=coldkey_use_password) + self.create_new_hotkey(n_words=12, use_password=hotkey_use_password) + return self + + @property + def hotkey_file(self) -> "bittensor.keyfile": + """ + Property that returns the hotkey file. + + Returns: + bittensor.keyfile: The hotkey file. + """ + wallet_path = os.path.expanduser(os.path.join(self.path, self.name)) + hotkey_path = os.path.join(wallet_path, "hotkeys", self.hotkey_str) + return bittensor.keyfile(path=hotkey_path) + + @property + def coldkey_file(self) -> "bittensor.keyfile": + """ + Property that returns the coldkey file. + + Returns: + bittensor.keyfile: The coldkey file. + """ + wallet_path = os.path.expanduser(os.path.join(self.path, self.name)) + coldkey_path = os.path.join(wallet_path, "coldkey") + return bittensor.keyfile(path=coldkey_path) + + @property + def coldkeypub_file(self) -> "bittensor.keyfile": + """ + Property that returns the coldkeypub file. + + Returns: + bittensor.keyfile: The coldkeypub file. + """ + wallet_path = os.path.expanduser(os.path.join(self.path, self.name)) + coldkeypub_path = os.path.join(wallet_path, "coldkeypub.txt") + return bittensor.keyfile(path=coldkeypub_path) + + def set_hotkey( + self, + keypair: "bittensor.Keypair", + encrypt: bool = False, + overwrite: bool = False, + ) -> "bittensor.keyfile": + """ + Sets the hotkey for the wallet. + + Args: + keypair (bittensor.Keypair): The hotkey keypair. + encrypt (bool, optional): Whether to encrypt the hotkey. Defaults to False. + overwrite (bool, optional): Whether to overwrite an existing hotkey. Defaults to False. + + Returns: + bittensor.keyfile: The hotkey file. + """ + self._hotkey = keypair + self.hotkey_file.set_keypair(keypair, encrypt=encrypt, overwrite=overwrite) + + def set_coldkeypub( + self, + keypair: "bittensor.Keypair", + encrypt: bool = False, + overwrite: bool = False, + ) -> "bittensor.keyfile": + """ + Sets the coldkeypub for the wallet. + + Args: + keypair (bittensor.Keypair): The coldkeypub keypair. + encrypt (bool, optional): Whether to encrypt the coldkeypub. Defaults to False. + overwrite (bool, optional): Whether to overwrite an existing coldkeypub. Defaults to False. + + Returns: + bittensor.keyfile: The coldkeypub file. + """ + self._coldkeypub = bittensor.Keypair(ss58_address=keypair.ss58_address) + self.coldkeypub_file.set_keypair( + self._coldkeypub, encrypt=encrypt, overwrite=overwrite + ) + + def set_coldkey( + self, + keypair: "bittensor.Keypair", + encrypt: bool = True, + overwrite: bool = False, + ) -> "bittensor.keyfile": + """ + Sets the coldkey for the wallet. + + Args: + keypair (bittensor.Keypair): The coldkey keypair. + encrypt (bool, optional): Whether to encrypt the coldkey. Defaults to True. + overwrite (bool, optional): Whether to overwrite an existing coldkey. Defaults to False. + + Returns: + bittensor.keyfile: The coldkey file. + """ + self._coldkey = keypair + self.coldkey_file.set_keypair( + self._coldkey, encrypt=encrypt, overwrite=overwrite + ) + + def get_coldkey(self, password: str = None) -> "bittensor.Keypair": + """ + Gets the coldkey from the wallet. + + Args: + password (str, optional): The password to decrypt the coldkey. Defaults to None. + + Returns: + bittensor.Keypair: The coldkey keypair. + """ + return self.coldkey_file.get_keypair(password=password) + + def get_hotkey(self, password: str = None) -> "bittensor.Keypair": + """ + Gets the hotkey from the wallet. + + Args: + password (str, optional): The password to decrypt the hotkey. Defaults to None. + + Returns: + bittensor.Keypair: The hotkey keypair. + """ + return self.hotkey_file.get_keypair(password=password) + + def get_coldkeypub(self, password: str = None) -> "bittensor.Keypair": + """ + Gets the coldkeypub from the wallet. + + Args: + password (str, optional): The password to decrypt the coldkeypub. Defaults to None. + + Returns: + bittensor.Keypair: The coldkeypub keypair. + """ + return self.coldkeypub_file.get_keypair(password=password) + + @property + def hotkey(self) -> "bittensor.Keypair": + r"""Loads the hotkey from wallet.path/wallet.name/hotkeys/wallet.hotkey or raises an error. + Returns: + hotkey (Keypair): + hotkey loaded from config arguments. + Raises: + KeyFileError: Raised if the file is corrupt of non-existent. + CryptoKeyError: Raised if the user enters an incorrec password for an encrypted keyfile. + """ + if self._hotkey == None: + self._hotkey = self.hotkey_file.keypair + return self._hotkey + + @property + def coldkey(self) -> "bittensor.Keypair": + r"""Loads the hotkey from wallet.path/wallet.name/coldkey or raises an error. + Returns: + coldkey (Keypair): + colkey loaded from config arguments. + Raises: + KeyFileError: Raised if the file is corrupt of non-existent. + CryptoKeyError: Raised if the user enters an incorrec password for an encrypted keyfile. + """ + if self._coldkey == None: + self._coldkey = self.coldkey_file.keypair + return self._coldkey + + @property + def coldkeypub(self) -> "bittensor.Keypair": + r"""Loads the coldkeypub from wallet.path/wallet.name/coldkeypub.txt or raises an error. + Returns: + coldkeypub (Keypair): + colkeypub loaded from config arguments. + Raises: + KeyFileError: Raised if the file is corrupt of non-existent. + CryptoKeyError: Raised if the user enters an incorrect password for an encrypted keyfile. + """ + if self._coldkeypub == None: + self._coldkeypub = self.coldkeypub_file.keypair + return self._coldkeypub + + def create_coldkey_from_uri( + self, + uri: str, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + """Creates coldkey from suri string, optionally encrypts it with the user's inputed password. + Args: + uri: (str, required): + URI string to use i.e. /Alice or /Bob + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the coldkey under the same path //coldkey + Returns: + wallet (bittensor.wallet): + this object with newly created coldkey. + """ + keypair = Keypair.create_from_uri(uri) + if not suppress: + display_mnemonic_msg(keypair, "coldkey") + self.set_coldkey(keypair, encrypt=use_password, overwrite=overwrite) + self.set_coldkeypub(keypair, overwrite=overwrite) + return self + + def create_hotkey_from_uri( + self, + uri: str, + use_password: bool = False, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + """Creates hotkey from suri string, optionally encrypts it with the user's inputed password. + Args: + uri: (str, required): + URI string to use i.e. /Alice or /Bob + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the hotkey under the same path //hotkeys/ + Returns: + wallet (bittensor.wallet): + this object with newly created hotkey. + """ + keypair = Keypair.create_from_uri(uri) + if not suppress: + display_mnemonic_msg(keypair, "hotkey") + self.set_hotkey(keypair, encrypt=use_password, overwrite=overwrite) + return self + + def new_coldkey( + self, + n_words: int = 12, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + """Creates a new coldkey, optionally encrypts it with the user's inputed password and saves to disk. + Args: + n_words: (int, optional): + Number of mnemonic words to use. + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the coldkey under the same path //coldkey + Returns: + wallet (bittensor.wallet): + this object with newly created coldkey. + """ + self.create_new_coldkey(n_words, use_password, overwrite, suppress) + + def create_new_coldkey( + self, + n_words: int = 12, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + """Creates a new coldkey, optionally encrypts it with the user's inputed password and saves to disk. + Args: + n_words: (int, optional): + Number of mnemonic words to use. + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the coldkey under the same path //coldkey + Returns: + wallet (bittensor.wallet): + this object with newly created coldkey. + """ + mnemonic = Keypair.generate_mnemonic(n_words) + keypair = Keypair.create_from_mnemonic(mnemonic) + if not suppress: + display_mnemonic_msg(keypair, "coldkey") + self.set_coldkey(keypair, encrypt=use_password, overwrite=overwrite) + self.set_coldkeypub(keypair, overwrite=overwrite) + return self + + def new_hotkey( + self, + n_words: int = 12, + use_password: bool = False, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + """Creates a new hotkey, optionally encrypts it with the user's inputed password and saves to disk. + Args: + n_words: (int, optional): + Number of mnemonic words to use. + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the hotkey under the same path //hotkeys/ + Returns: + wallet (bittensor.wallet): + this object with newly created hotkey. + """ + self.create_new_hotkey(n_words, use_password, overwrite, suppress) + + def create_new_hotkey( + self, + n_words: int = 12, + use_password: bool = False, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + """Creates a new hotkey, optionally encrypts it with the user's inputed password and saves to disk. + Args: + n_words: (int, optional): + Number of mnemonic words to use. + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the hotkey under the same path //hotkeys/ + Returns: + wallet (bittensor.wallet): + this object with newly created hotkey. + """ + mnemonic = Keypair.generate_mnemonic(n_words) + keypair = Keypair.create_from_mnemonic(mnemonic) + if not suppress: + display_mnemonic_msg(keypair, "hotkey") + self.set_hotkey(keypair, encrypt=use_password, overwrite=overwrite) + return self + + def regenerate_coldkeypub( + self, + ss58_address: Optional[str] = None, + public_key: Optional[Union[str, bytes]] = None, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + """Regenerates the coldkeypub from passed ss58_address or public_key and saves the file + Requires either ss58_address or public_key to be passed. + Args: + ss58_address: (str, optional): + Address as ss58 string. + public_key: (str | bytes, optional): + Public key as hex string or bytes. + overwrite (bool, optional) (default: False): + Will this operation overwrite the coldkeypub (if exists) under the same path //coldkeypub + Returns: + wallet (bittensor.wallet): + newly re-generated Wallet with coldkeypub. + + """ + if ss58_address is None and public_key is None: + raise ValueError("Either ss58_address or public_key must be passed") + + if not is_valid_bittensor_address_or_public_key( + ss58_address if ss58_address is not None else public_key + ): + raise ValueError( + f"Invalid {'ss58_address' if ss58_address is not None else 'public_key'}" + ) + + if ss58_address is not None: + ss58_format = bittensor.utils.get_ss58_format(ss58_address) + keypair = Keypair( + ss58_address=ss58_address, + public_key=public_key, + ss58_format=ss58_format, + ) + else: + keypair = Keypair( + ss58_address=ss58_address, + public_key=public_key, + ss58_format=bittensor.__ss58_format__, + ) + + # No need to encrypt the public key + self.set_coldkeypub(keypair, overwrite=overwrite) + + return self + + # Short name for regenerate_coldkeypub + regen_coldkeypub = regenerate_coldkeypub + + @overload + def regenerate_coldkey( + self, + mnemonic: Optional[Union[list, str]] = None, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + ... + + @overload + def regenerate_coldkey( + self, + seed: Optional[str] = None, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + ... + + @overload + def regenerate_coldkey( + self, + json: Optional[Tuple[Union[str, Dict], str]] = None, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + ... + + def regenerate_coldkey( + self, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + **kwargs, + ) -> "wallet": + """Regenerates the coldkey from passed mnemonic, seed, or json encrypts it with the user's password and saves the file + Args: + mnemonic: (Union[list, str], optional): + Key mnemonic as list of words or string space separated words. + seed: (str, optional): + Seed as hex string. + json: (Tuple[Union[str, Dict], str], optional): + Restore from encrypted JSON backup as (json_data: Union[str, Dict], passphrase: str) + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the coldkey under the same path //coldkey + Returns: + wallet (bittensor.wallet): + this object with newly created coldkey. + + Note: uses priority order: mnemonic > seed > json + """ + if len(kwargs) == 0: + raise ValueError("Must pass either mnemonic, seed, or json") + + # Get from kwargs + mnemonic = kwargs.get("mnemonic", None) + seed = kwargs.get("seed", None) + json = kwargs.get("json", None) + + if mnemonic is None and seed is None and json is None: + raise ValueError("Must pass either mnemonic, seed, or json") + if mnemonic is not None: + if isinstance(mnemonic, str): + mnemonic = mnemonic.split() + if len(mnemonic) not in [12, 15, 18, 21, 24]: + raise ValueError( + "Mnemonic has invalid size. This should be 12,15,18,21 or 24 words" + ) + keypair = Keypair.create_from_mnemonic( + " ".join(mnemonic), ss58_format=bittensor.__ss58_format__ + ) + if not suppress: + display_mnemonic_msg(keypair, "coldkey") + elif seed is not None: + keypair = Keypair.create_from_seed( + seed, ss58_format=bittensor.__ss58_format__ + ) + else: + # json is not None + if ( + not isinstance(json, tuple) + or len(json) != 2 + or not isinstance(json[0], (str, dict)) + or not isinstance(json[1], str) + ): + raise ValueError( + "json must be a tuple of (json_data: str | Dict, passphrase: str)" + ) + + json_data, passphrase = json + keypair = Keypair.create_from_encrypted_json( + json_data, passphrase, ss58_format=bittensor.__ss58_format__ + ) + + self.set_coldkey(keypair, encrypt=use_password, overwrite=overwrite) + self.set_coldkeypub(keypair, overwrite=overwrite) + return self + + # Short name for regenerate_coldkey + regen_coldkey = regenerate_coldkey + + @overload + def regenerate_hotkey( + self, + mnemonic: Optional[Union[list, str]] = None, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + ... + + @overload + def regenerate_hotkey( + self, + seed: Optional[str] = None, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + ... + + @overload + def regenerate_hotkey( + self, + json: Optional[Tuple[Union[str, Dict], str]] = None, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + ) -> "wallet": + ... + + def regenerate_hotkey( + self, + use_password: bool = True, + overwrite: bool = False, + suppress: bool = False, + **kwargs, + ) -> "wallet": + """Regenerates the hotkey from passed mnemonic, encrypts it with the user's password and save the file + Args: + mnemonic: (Union[list, str], optional): + Key mnemonic as list of words or string space separated words. + seed: (str, optional): + Seed as hex string. + json: (Tuple[Union[str, Dict], str], optional): + Restore from encrypted JSON backup as (json_data: Union[str, Dict], passphrase: str) + use_password (bool, optional): + Is the created key password protected. + overwrite (bool, optional): + Will this operation overwrite the hotkey under the same path //hotkeys/ + Returns: + wallet (bittensor.wallet): + this object with newly created hotkey. + """ + if len(kwargs) == 0: + raise ValueError("Must pass either mnemonic, seed, or json") + + # Get from kwargs + mnemonic = kwargs.get("mnemonic", None) + seed = kwargs.get("seed", None) + json = kwargs.get("json", None) + + if mnemonic is None and seed is None and json is None: + raise ValueError("Must pass either mnemonic, seed, or json") + if mnemonic is not None: + if isinstance(mnemonic, str): + mnemonic = mnemonic.split() + if len(mnemonic) not in [12, 15, 18, 21, 24]: + raise ValueError( + "Mnemonic has invalid size. This should be 12,15,18,21 or 24 words" + ) + keypair = Keypair.create_from_mnemonic( + " ".join(mnemonic), ss58_format=bittensor.__ss58_format__ + ) + if not suppress: + display_mnemonic_msg(keypair, "hotkey") + elif seed is not None: + keypair = Keypair.create_from_seed( + seed, ss58_format=bittensor.__ss58_format__ + ) + else: + # json is not None + if ( + not isinstance(json, tuple) + or len(json) != 2 + or not isinstance(json[0], (str, dict)) + or not isinstance(json[1], str) + ): + raise ValueError( + "json must be a tuple of (json_data: str | Dict, passphrase: str)" + ) + + json_data, passphrase = json + keypair = Keypair.create_from_encrypted_json( + json_data, passphrase, ss58_format=bittensor.__ss58_format__ + ) + + self.set_hotkey(keypair, encrypt=use_password, overwrite=overwrite) + return self + + # Short name for regenerate_hotkey + regen_hotkey = regenerate_hotkey diff --git a/scripts/install.sh b/scripts/install.sh index 3f4a91edd1..7fce9f9186 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -103,8 +103,6 @@ linux_install_bittensor() { ohai "Cloning bittensor@master into ~/.bittensor/bittensor" mkdir -p ~/.bittensor/bittensor git clone https://github.com/opentensor/bittensor.git ~/.bittensor/bittensor/ 2> /dev/null || (cd ~/.bittensor/bittensor/ ; git fetch origin master ; git checkout master ; git pull --ff-only ; git reset --hard ; git clean -xdf) - git submodule sync - git submodule update --init ohai "Installing bittensor" $python -m pip install -e ~/.bittensor/bittensor/ exit_on_error $? @@ -167,8 +165,6 @@ mac_update_pip() { mac_install_bittensor() { ohai "Cloning bittensor@text_prompting into ~/.bittensor/bittensor" git clone https://github.com/opentensor/bittensor.git ~/.bittensor/bittensor/ 2> /dev/null || (cd ~/.bittensor/bittensor/ ; git fetch origin master ; git checkout master ; git pull --ff-only ; git reset --hard; git clean -xdf) - git submodule sync - git submodule update --init ohai "Installing bittensor" $python -m pip install -e ~/.bittensor/bittensor/ exit_on_error $? diff --git a/tests/unit_tests/test_keyfile.py b/tests/unit_tests/test_keyfile.py index 4ac87b8afe..49373bfa3c 100644 --- a/tests/unit_tests/test_keyfile.py +++ b/tests/unit_tests/test_keyfile.py @@ -325,94 +325,94 @@ def tearDown(self) -> None: shutil.rmtree(self.root_path) def create_keyfile(self): - Keyfile = bittensor.Keyfile(path=os.path.join(self.root_path, "Keyfile")) + keyfile = bittensor.keyfile(path=os.path.join(self.root_path, "keyfile")) mnemonic = bittensor.Keypair.generate_mnemonic(12) alice = bittensor.Keypair.create_from_mnemonic(mnemonic) - Keyfile.set_keypair( + keyfile.set_keypair( alice, encrypt=True, overwrite=True, password="thisisafakepassword" ) bob = bittensor.Keypair.create_from_uri("/Bob") - Keyfile.set_keypair( + keyfile.set_keypair( bob, encrypt=True, overwrite=True, password="thisisafakepassword" ) - return Keyfile + return keyfile def test_create(self): - Keyfile = bittensor.Keyfile(path=os.path.join(self.root_path, "Keyfile")) + keyfile = bittensor.keyfile(path=os.path.join(self.root_path, "keyfile")) mnemonic = bittensor.Keypair.generate_mnemonic(12) alice = bittensor.Keypair.create_from_mnemonic(mnemonic) - Keyfile.set_keypair( + keyfile.set_keypair( alice, encrypt=True, overwrite=True, password="thisisafakepassword" ) - assert Keyfile.is_readable() - assert Keyfile.is_writable() - assert Keyfile.is_encrypted() - Keyfile.decrypt(password="thisisafakepassword") - assert not Keyfile.is_encrypted() - Keyfile.encrypt(password="thisisafakepassword") - assert Keyfile.is_encrypted() - str(Keyfile) - Keyfile.decrypt(password="thisisafakepassword") - assert not Keyfile.is_encrypted() - str(Keyfile) + assert keyfile.is_readable() + assert keyfile.is_writable() + assert keyfile.is_encrypted() + keyfile.decrypt(password="thisisafakepassword") + assert not keyfile.is_encrypted() + keyfile.encrypt(password="thisisafakepassword") + assert keyfile.is_encrypted() + str(keyfile) + keyfile.decrypt(password="thisisafakepassword") + assert not keyfile.is_encrypted() + str(keyfile) assert ( - Keyfile.get_keypair(password="thisisafakepassword").ss58_address + keyfile.get_keypair(password="thisisafakepassword").ss58_address == alice.ss58_address ) assert ( - Keyfile.get_keypair(password="thisisafakepassword").private_key + keyfile.get_keypair(password="thisisafakepassword").private_key == alice.private_key ) assert ( - Keyfile.get_keypair(password="thisisafakepassword").public_key + keyfile.get_keypair(password="thisisafakepassword").public_key == alice.public_key ) bob = bittensor.Keypair.create_from_uri("/Bob") - Keyfile.set_keypair( + keyfile.set_keypair( bob, encrypt=True, overwrite=True, password="thisisafakepassword" ) assert ( - Keyfile.get_keypair(password="thisisafakepassword").ss58_address + keyfile.get_keypair(password="thisisafakepassword").ss58_address == bob.ss58_address ) assert ( - Keyfile.get_keypair(password="thisisafakepassword").public_key + keyfile.get_keypair(password="thisisafakepassword").public_key == bob.public_key ) - repr(Keyfile) + repr(keyfile) def test_legacy_coldkey(self): legacy_filename = os.path.join(self.root_path, "coldlegacy_keyfile") - Keyfile = bittensor.Keyfile(path=legacy_filename) - Keyfile.make_dirs() + keyfile = bittensor.keyfile(path=legacy_filename) + keyfile.make_dirs() keyfile_data = ( b"0x32939b6abc4d81f02dff04d2b8d1d01cc8e71c5e4c7492e4fa6a238cdca3512f" ) with open(legacy_filename, "wb") as keyfile_obj: keyfile_obj.write(keyfile_data) - assert Keyfile.keyfile_data == keyfile_data - Keyfile.encrypt(password="this is the fake password") - Keyfile.decrypt(password="this is the fake password") + assert keyfile.keyfile_data == keyfile_data + keyfile.encrypt(password="this is the fake password") + keyfile.decrypt(password="this is the fake password") keypair_bytes = b'{"accountId": "0x32939b6abc4d81f02dff04d2b8d1d01cc8e71c5e4c7492e4fa6a238cdca3512f", "publicKey": "0x32939b6abc4d81f02dff04d2b8d1d01cc8e71c5e4c7492e4fa6a238cdca3512f", "secretPhrase": null, "secretSeed": null, "ss58Address": "5DD26kC2kxajmwfbbZmVmxhrY9VeeyR1Gpzy9i8wxLUg6zxm"}' - assert Keyfile.keyfile_data == keypair_bytes + assert keyfile.keyfile_data == keypair_bytes assert ( - Keyfile.get_keypair().ss58_address + keyfile.get_keypair().ss58_address == "5DD26kC2kxajmwfbbZmVmxhrY9VeeyR1Gpzy9i8wxLUg6zxm" ) assert ( - "0x" + Keyfile.get_keypair().public_key.hex() + "0x" + keyfile.get_keypair().public_key.hex() == "0x32939b6abc4d81f02dff04d2b8d1d01cc8e71c5e4c7492e4fa6a238cdca3512f" ) def test_validate_password(self): - from bittensor._wallet._keyfile import validate_password + from bittensor.keyfile import validate_password assert validate_password(None) == False assert validate_password("passw0rd") == False @@ -430,7 +430,7 @@ def test_decrypt_keyfile_data_legacy(self): from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC - from bittensor._wallet._keyfile import decrypt_keyfile_data + from bittensor.keyfile import decrypt_keyfile_data __SALT = b"Iguesscyborgslikemyselfhaveatendencytobeparanoidaboutourorigins" @@ -455,7 +455,7 @@ def __generate_key(password): assert decrypted_data == data def test_user_interface(self): - from bittensor._wallet._keyfile import ask_password_to_encrypt + from bittensor.keyfile import ask_password_to_encrypt with mock.patch( "getpass.getpass", @@ -464,15 +464,15 @@ def test_user_interface(self): assert ask_password_to_encrypt() == "asdury3294y" def test_overwriting(self): - Keyfile = bittensor.Keyfile(path=os.path.join(self.root_path, "Keyfile")) + keyfile = bittensor.keyfile(path=os.path.join(self.root_path, "keyfile")) alice = bittensor.Keypair.create_from_uri("/Alice") - Keyfile.set_keypair( + keyfile.set_keypair( alice, encrypt=True, overwrite=True, password="thisisafakepassword" ) bob = bittensor.Keypair.create_from_uri("/Bob") with pytest.raises(bittensor.KeyFileError) as pytest_wrapped_e: with mock.patch("builtins.input", return_value="n"): - Keyfile.set_keypair( + keyfile.set_keypair( bob, encrypt=True, overwrite=False, password="thisisafakepassword" ) diff --git a/tests/unit_tests/test_wallet.py b/tests/unit_tests/test_wallet.py index 714fb632eb..de008e956c 100644 --- a/tests/unit_tests/test_wallet.py +++ b/tests/unit_tests/test_wallet.py @@ -44,7 +44,7 @@ def test_regen_coldkeypub_from_ss58_addr(self): ss58_address = "5DD26kC2kxajmwfbbZmVmxhrY9VeeyR1Gpzy9i8wxLUg6zxm" with patch.object(self.mock_wallet, "set_coldkeypub") as mock_set_coldkeypub: self.mock_wallet.regenerate_coldkeypub( - ss58_address=ss58_address, overwrite=True + ss58_address=ss58_address, overwrite=True, suppress=True ) mock_set_coldkeypub.assert_called_once() @@ -56,7 +56,7 @@ def test_regen_coldkeypub_from_ss58_addr(self): ) with pytest.raises(ValueError): self.mock_wallet.regenerate_coldkeypub( - ss58_address=ss58_address_bad, overwrite=True + ss58_address=ss58_address_bad, overwrite=True, suppress=True ) def test_regen_coldkeypub_from_hex_pubkey_str(self): @@ -69,7 +69,7 @@ def test_regen_coldkeypub_from_hex_pubkey_str(self): ) with patch.object(self.mock_wallet, "set_coldkeypub") as mock_set_coldkeypub: self.mock_wallet.regenerate_coldkeypub( - public_key=pubkey_str, overwrite=True + public_key=pubkey_str, overwrite=True, suppress=True ) mock_set_coldkeypub.assert_called_once() @@ -79,7 +79,7 @@ def test_regen_coldkeypub_from_hex_pubkey_str(self): pubkey_str_bad = "0x32939b6abc4d81f02dff04d2b8d1d01cc8e71c5e4c7492e4fa6a238cdca3512" # 1 character short with pytest.raises(ValueError): self.mock_wallet.regenerate_coldkeypub( - ss58_address=pubkey_str_bad, overwrite=True + ss58_address=pubkey_str_bad, overwrite=True, suppress=True ) def test_regen_coldkeypub_from_hex_pubkey_bytes(self): @@ -92,7 +92,7 @@ def test_regen_coldkeypub_from_hex_pubkey_bytes(self): pubkey_bytes = bytes.fromhex(pubkey_str[2:]) # Remove 0x from beginning with patch.object(self.mock_wallet, "set_coldkeypub") as mock_set_coldkeypub: self.mock_wallet.regenerate_coldkeypub( - public_key=pubkey_bytes, overwrite=True + public_key=pubkey_bytes, overwrite=True, suppress=True ) mock_set_coldkeypub.assert_called_once() @@ -106,7 +106,7 @@ def test_regen_coldkeypub_no_pubkey(self): with pytest.raises(ValueError): # Must provide either public_key or ss58_address self.mock_wallet.regenerate_coldkeypub( - ss58_address=None, public_key=None, overwrite=True + ss58_address=None, public_key=None, overwrite=True, suppress=True ) def test_regen_coldkey_from_hex_seed_str(self):