diff --git a/Makefile b/Makefile index ffbb910..00d99d0 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,44 @@ -init: - pip install -r requirements.txt - pip install -r requirements.test.txt +ROOT_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST)))) +VENV_CMD := python3 -m venv +VENV_DIR := $(ROOT_DIR)/.venv +VENV_BIN := $(VENV_DIR)/bin +ISORT_BIN := $(VENV_BIN)/isort +PYBLACK_BIN := $(VENV_BIN)/black +PYTEST := $(VENV_BIN)/pytest + +DRY ?= true +ifeq ($(DRY),false) + ISORT := $(ISORT_BIN) + PYBLACK := $(PYBLACK_BIN) +else + ISORT := $(ISORT_BIN) --diff --check + PYBLACK := $(PYBLACK_BIN) --diff --check +endif + +lint: venv + $(ISORT) code_crypt/ + $(PYBLACK) code_crypt/ + +test: venv + $(PYTEST) install: python setup.py install -lint: - pycodestyle code_crypt/ - pyflakes code_crypt/ +############################################################################### +# Development Environment Setup +############################################################################### +venv: $(VENV_DIR) + +$(VENV_BIN)/activate: + $(VENV_CMD) $(VENV_DIR) + +$(VENV_DIR): $(VENV_BIN)/activate requirements.txt requirements.test.txt + $(VENV_BIN)/python3 -m pip install -U pip && \ + $(VENV_BIN)/pip install -U setuptools wheel && \ + $(VENV_BIN)/pip install -r requirements.test.txt && \ + $(VENV_BIN)/pip install -U -r requirements.txt && \ + touch $(VENV_DIR) -test: - nosetests +clean: + find code_crypt -type f -name '*.pyc' -exec rm {} \; diff --git a/README.md b/README.md index 81df7f7..51033f5 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,12 @@ -[![CircleCI](https://circleci.com/gh/Nextdoor/code-crypt.svg)](https://circleci.com/gh/Nextdoor/code-crypt) - # Code Crypt -Code Crypt provides a simple Python library and command line interface to +Code Crypt provides a simple Python library and command line interface to manage your application secrets within a project repository. Master keys are -managed externally by the AWS Key Management Service (KMS), which perform +managed externally by the AWS Key Management Service (KMS), which perform envelope encryption on a RSA private key used for decrypts on individual -secrets within a particular environment context. Encrypted secrets are kept +secrets within a particular environment context. Encrypted secrets are kept as binary files within the project folder using hybrid RSA-AES cryptopgraphy. -It supports Python 2.6 or newer and all Python 3 versions. - ## Features - Self-serve for project contributors @@ -18,97 +14,99 @@ It supports Python 2.6 or newer and all Python 3 versions. - CRUD operations on a per-secret basis - Environment contexts (development, staging, production) -## Installation - -0) Clone the project locally. -1) `virtualenv .venv && source .venv/bin/activate` -2) `make init` -3) `make install` -4) Test: `code-crypt --version` - ## Initialize Project -We have a project `my_project` that we'd like to initialize with 3 different -environment contexts (`development`, `staging` and `production`) with their own +We have a project `my_project` that we'd like to initialize with 3 different +environment contexts (`development`, `staging` and `production`) with their own KMS master keys. - $ APP_ROOT=/Users/bob/my_project code-crypt --env development --init --kms-key-id aaaaaaaa-bbbb-cccc-dddd-123456111111 - $ APP_ROOT=/Users/bob/my_project code-crypt --env staging --init --kms-key-id eeeeeeee-ffff-gggg-hhhh-123456222222 - $ APP_ROOT=/Users/bob/my_project code-crypt --env production --init --kms-key-id iiiiiiii-jjjj-kkkk-llll-123456333333 - -This will initialize the project folder with a data directory of the following +``` +$ APP_ROOT=/Users/bob/my_project code-crypt --env development --init --kms-key-id aaaaaaaa-bbbb-cccc-dddd-123456111111 +$ APP_ROOT=/Users/bob/my_project code-crypt --env staging --init --kms-key-id eeeeeeee-ffff-gggg-hhhh-123456222222 +$ APP_ROOT=/Users/bob/my_project code-crypt --env production --init --kms-key-id iiiiiiii-jjjj-kkkk-llll-123456333333 +``` + +This will initialize the project folder with a data directory of the following structure: - $ pwd - /Users/bob/my_project - $ tree - . - └── code_crypt - └── data - ├── keys - │   ├── development - │   │   ├── encrypted_private_key.pem - │   │   └── public_key.asc - │   ├── production - │   │   ├── encrypted_private_key.pem - │   │   └── public_key.asc - │   └── staging - │   ├── encrypted_private_key.pem - │   └── public_key.asc - └── secrets - ├── development - ├── production - └── staging - -(Note: `--env` defaults to `development` and won't be explicitly used in this +``` +$ pwd +/Users/bob/my_project +$ tree +. +└── code_crypt + └── data + ├── keys + │   ├── development + │   │   ├── encrypted_private_key.pem + │   │   └── public_key.asc + │   ├── production + │   │   ├── encrypted_private_key.pem + │   │   └── public_key.asc + │   └── staging + │   ├── encrypted_private_key.pem + │   └── public_key.asc + └── secrets + ├── development + ├── production + └── staging +``` + +(Note: `--env` defaults to `development` and won't be explicitly used in this guide going forward.) ## Encrypt Secrets Single secrets can be encrypted with `--encrypt` option. - $ APP_ROOT=/Users/bob/my_project code-crypt --encrypt SOME_SECRET='a1b2c3' - -In this case an encrypted binary file would be created at +``` +$ APP_ROOT=/Users/bob/my_project code-crypt --encrypt SOME_SECRET='a1b2c3' +``` + +In this case an encrypted binary file would be created at `code_crypt/data/secrets/development/SOME_SECRET.bin`. - + ## Decrypt Secrets (CLI) -Single secrets can be decrypted with `--decrypt` option which returns a +Single secrets can be decrypted with `--decrypt` option which returns a plaintext value. - $ APP_ROOT=/Users/bob/my_project code-crypt --decrypt SOME_SECRET - a1b2c3 - -Multiple secrets can be decrypted with the `--decrypt-all` option which returns -a JSON string -of key-value pairs. +``` +$ APP_ROOT=/Users/bob/my_project code-crypt --decrypt SOME_SECRETa1b2c3 +``` + +Multiple secrets can be decrypted with the `--decrypt-all` option which returns +a JSON string of key-value pairs. - $ APP_ROOT=/Users/bob/my_project code-crypt --decrypt-all - { - "SOME_SECRET": "a1b2c3" - } +``` +$ APP_ROOT=/Users/bob/my_project code-crypt --decrypt-all +{ + "SOME_SECRET": "a1b2c3" +} +``` ## Decrypt Secrets (Application) -Prerequisite: Grant your application run-time authentication to its environment's respective +Prerequisite: Grant your application run-time authentication to its environment's respective KMS master key. Create a Code Crypt object and run the `decrypt()` function. - from code_crypt import core as code_crypt +```python +from code_crypt import core as code_crypt - CC = code_crypt.CodeCrypt(app_root=MY_APP_ROOT, env=MY_ENV) - CC_SECRETS = CC.decrypt() - -The resulting `CC_SECRETS` object is a dict of decrypted secret key-value pairs. +CC = code_crypt.CodeCrypt(app_root=MY_APP_ROOT, env=MY_ENV) +CC_SECRETS = CC.decrypt() +``` +The resulting `CC_SECRETS` object is a dict of decrypted secret key-value pairs. # Developer Setup If you are interested in working on the codebase, setting up your development environment is quick and easy. - $ virtualenv .venv - $ source .venv/bin/activate - $ pip install -r requirements.txt +```bash +$ make venv +$ source .venv/bin/activate +``` diff --git a/circle.yml b/circle.yml deleted file mode 100644 index 59f3406..0000000 --- a/circle.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: 2 -jobs: - build: - docker: - - image: circleci/python:2.7 - steps: - - checkout - - run: - name: install python3 - command: | - sudo apt-get install -y python3 - - run: - name: install dependencies - command: | - virtualenv .venv - virtualenv -p python3 .venv3 - source .venv/bin/activate && make init - source .venv3/bin/activate && make init - - run: - name: run tests - command: | - source .venv/bin/activate && make test - source .venv3/bin/activate && make test - - run: - name: lint - command: | - source .venv/bin/activate && make lint - source .venv3/bin/activate && make lint diff --git a/code_crypt/__init__.py b/code_crypt/__init__.py index 9e149ae..ee436cc 100644 --- a/code_crypt/__init__.py +++ b/code_crypt/__init__.py @@ -12,4 +12,4 @@ # # Copyright 2017 Nextdoor.com, Inc -__author__ = 'Nehal Patel (nehal@nextdoor.com)' +__author__ = "Nehal Patel (nehal@nextdoor.com)" diff --git a/code_crypt/cli.py b/code_crypt/cli.py index 870eb51..bc11ab9 100755 --- a/code_crypt/cli.py +++ b/code_crypt/cli.py @@ -6,72 +6,64 @@ import sys from code_crypt import core as code_crypt -from code_crypt import defaults -from code_crypt import errors +from code_crypt import defaults, errors from code_crypt.metadata import __version__ def get_config(argv): - parser = argparse.ArgumentParser( - prog=argv[0], - description='Code Crypt') + parser = argparse.ArgumentParser(prog=argv[0], description="Code Crypt") - parser.add_argument( - "--kms-key-id", - help="the KMS key id of the master key.") + parser.add_argument("--kms-key-id", help="the KMS key id of the master key.") parser.add_argument( "--env", - help="the environment context for secrets from %s " - "Defaults to '%s'" % (str(defaults.ENV_TAGS), defaults.DEFAULT_ENV), - default=defaults.DEFAULT_ENV) + help=f"the environment context for secrets from {str(defaults.ENV_TAGS)} Defaults to '{defaults.DEFAULT_ENV}'", + default=defaults.DEFAULT_ENV, + ) action_group = parser.add_mutually_exclusive_group() action_group.add_argument( "--init", - help="initializes the repo with an asymmetric RSA key pair " - "('%s' is the default env)" % defaults.DEFAULT_ENV, - action="store_true") - action_group.add_argument( - "--decrypt", - help="decrypt and print a single secret") + help=f"initializes the repo with an asymmetric RSA key pair ('{defaults.DEFAULT_ENV}' is the default env)", + action="store_true", + ) + action_group.add_argument("--decrypt", help="decrypt and print a single secret") action_group.add_argument( - "--decrypt-all", - help="decrypt and print all secrets", - action="store_true") + "--decrypt-all", help="decrypt and print all secrets", action="store_true" + ) action_group.add_argument( - "--import-secrets", - help="imports and encrypts secrets from a json file") + "--import-secrets", help="imports and encrypts secrets from a json file" + ) action_group.add_argument( "--encrypt", help="encrypts a single secret (usage: " - "--encrypt SOME_SECRET_NAME=some_secret_value)") + "--encrypt SOME_SECRET_NAME=some_secret_value)", + ) action_group.add_argument( "--blob-encrypt", help="encrypts a single secret and returns an encrypted blob binary " - "(usage: --blob-encrypt some_secret_value)") + "(usage: --blob-encrypt some_secret_value)", + ) action_group.add_argument( "--blob-decrypt", help="decrypts an encrypted blob binary and returns a plaintext " - "secret (usage: --blob-decrypt some_code_crypt_secret_blob)") + "secret (usage: --blob-decrypt some_code_crypt_secret_blob)", + ) action_group.add_argument( - "-v", - "--version", - help="prints version", - action="store_true") + "-v", "--version", help="prints version", action="store_true" + ) log_group = parser.add_mutually_exclusive_group() log_group.add_argument( - "--verbose", - help="increase output verbosity", - action="store_true") + "--verbose", help="increase output verbosity", action="store_true" + ) log_group.add_argument( - "--debug", - help="enable full debug mode", - action="store_true") + "--debug", help="enable full debug mode", action="store_true" + ) log_group.add_argument( "--quiet", help="enable quiet mode, no logs will be outputted", - action="store_true") + action="store_true", + ) args = parser.parse_args(args=argv[1:]) return args @@ -90,17 +82,17 @@ def main(): logging.basicConfig() if config.version: - print("Code Crypt - " + str(__version__)) - return + print("Code Crypt - " + str(__version__)) + return if config.env not in defaults.ENV_TAGS: - print("Error: env must be set to one of the following: %s" % ( - str(defaults.ENV_TAGS))) + print( + f"Error: env must be set to one of the following: {str(defaults.ENV_TAGS)}" + ) exit(1) env = config.env - code_crypt_obj = code_crypt.CodeCrypt( - kms_key_id=config.kms_key_id, env=env) + code_crypt_obj = code_crypt.CodeCrypt(kms_key_id=config.kms_key_id, env=env) try: if config.init: @@ -118,19 +110,20 @@ def main(): if config.import_secrets: secrets_file = config.import_secrets try: - with open(secrets_file, 'r') as f: + with open(secrets_file, "r") as f: secrets_json = f.read() except IOError as e: - raise errors.InputError("secrets file '%s' does not exist" % ( - secrets_file)) + raise errors.InputError(f"secrets file '{secrets_file}' does not exist") code_crypt_obj.import_secrets(secrets_json) return if config.encrypt: - if '=' not in config.encrypt: - raise errors.InputError("provide a key value pair (usage: " - "--encrypt SOME_SECRET_NAME=some_" - "secret_value)") - keyval = config.encrypt.split('=', 1) + if "=" not in config.encrypt: + raise errors.InputError( + "provide a key value pair (usage: " + "--encrypt SOME_SECRET_NAME=some_" + "secret_value)" + ) + keyval = config.encrypt.split("=", 1) code_crypt_obj.encrypt(keyval[0], keyval[1]) return if config.blob_encrypt: @@ -148,5 +141,5 @@ def main(): exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/code_crypt/cli_test.py b/code_crypt/cli_test.py new file mode 100644 index 0000000..864cd95 --- /dev/null +++ b/code_crypt/cli_test.py @@ -0,0 +1,23 @@ +import unittest + +from code_crypt import cli + +KMS_KEY_ID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + + +class CliTest(unittest.TestCase): + """This is a simple sanity check that argparser is setup properly""" + + def test_get_config_init(self): + + argv = ["cli.py", "--kms-key-id", KMS_KEY_ID, "--init"] + config = cli.get_config(argv) + self.assertTrue(config.init) + self.assertEqual(config.kms_key_id, KMS_KEY_ID) + + def test_get_config_encrypt(self): + encrypt_param = "TEST=value" + + argv = ["cli.py", "--encrypt", "TEST=value"] + config = cli.get_config(argv) + self.assertEqual(config.encrypt, encrypt_param) diff --git a/code_crypt/core.py b/code_crypt/core.py index 271c8eb..d08eb5a 100755 --- a/code_crypt/core.py +++ b/code_crypt/core.py @@ -1,39 +1,39 @@ """Module to encrypt and decrypt app secrets asymmetrically via KMS.""" -import boto3 import json import logging import os +from base64 import b64decode, b64encode -from base64 import b64encode, b64decode +import boto3 from botocore.client import Config - from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa -from code_crypt import defaults -from code_crypt import errors +from code_crypt import defaults, errors log = logging.getLogger(__name__) class Decryptor: - '''Helper class which creates a decryptor object with default - padding.''' - def __init__(self, private_key_obj, padding=defaults.RSA_PADDING): - self.private_key_obj = private_key_obj - self.padding = padding + """Helper class which creates a decryptor object with default + padding.""" + + def __init__(self, private_key_obj, padding=defaults.RSA_PADDING): + self.private_key_obj = private_key_obj + self.padding = padding - def decrypt(self, data): - plaintext = self.private_key_obj.decrypt(data, self.padding) - return plaintext + def decrypt(self, data): + plaintext = self.private_key_obj.decrypt(data, self.padding) + return plaintext class Encryptor: - '''Helper class which creates a encryptor object with default - padding.''' + """Helper class which creates a encryptor object with default + padding.""" + def __init__(self, public_key_obj, padding=defaults.RSA_PADDING): self.public_key_obj = public_key_obj self.padding = padding @@ -44,17 +44,19 @@ def encrypt(self, data): class CodeCrypt: - '''CodeCrypt object which handles the setup of RSA and AES cryptgraphic - facilities, secret and key storange and communication with KMS.''' + """CodeCrypt object which handles the setup of RSA and AES cryptgraphic + facilities, secret and key storange and communication with KMS.""" + def __init__( - self, - kms_key_id=None, - aws_region=defaults.AWS_REGION, - app_root=defaults.APP_ROOT, - env=None, - ciphertext_ext=defaults.CIPHERTEXT_EXT, - encrypted_private_key_file=None, - rsa_key_size=defaults.RSA_KEY_SIZE): + self, + kms_key_id=None, + aws_region=defaults.AWS_REGION, + app_root=defaults.APP_ROOT, + env=None, + ciphertext_ext=defaults.CIPHERTEXT_EXT, + encrypted_private_key_file=None, + rsa_key_size=defaults.RSA_KEY_SIZE, + ): self.kms_key_id = kms_key_id self.ciphertext_ext = ciphertext_ext self.rsa_key_size = rsa_key_size @@ -70,35 +72,36 @@ def __init__( self._init_dirs(data_dir) self.encrypted_private_key_file = os.path.join( - self.environment_keys_dir, 'encrypted_private_key.pem') + self.environment_keys_dir, "encrypted_private_key.pem" + ) if encrypted_private_key_file: self.encrypted_private_key_file = encrypted_private_key_file - self.public_key_file = os.path.join( - self.environment_keys_dir, 'public_key.asc') + self.public_key_file = os.path.join(self.environment_keys_dir, "public_key.asc") - self.encryption_context = {'app_environment': self.app_environment} + self.encryption_context = {"app_environment": self.app_environment} self._init_kms_client(aws_region) self.encryptor = None self.decryptor = None def _init_kms_client(self, aws_region): - '''Initializes a kms boto3 client.''' + """Initializes a kms boto3 client.""" config = Config( region_name=aws_region, connect_timeout=5, read_timeout=10, - retries=dict(max_attempts=4)) + retries=dict(max_attempts=4), + ) - self.kms = boto3.client('kms', config=config) + self.kms = boto3.client("kms", config=config) def _init_dirs(self, data_dir): - '''Sets and create the Code Crypt data directory which contains secrets - and keys based on environment.''' - self.environment_keys_dir = os.path.join( - data_dir, 'keys', self.app_environment) + """Sets and create the Code Crypt data directory which contains secrets + and keys based on environment.""" + self.environment_keys_dir = os.path.join(data_dir, "keys", self.app_environment) self.environment_secrets_dir = os.path.join( - data_dir, 'secrets', self.app_environment) + data_dir, "secrets", self.app_environment + ) if not os.path.exists(self.environment_secrets_dir): os.makedirs(self.environment_secrets_dir) @@ -114,106 +117,99 @@ def _init_env(self, env): self.app_environment = defaults.DEFAULT_ENV - def _get_plaintext_private_key( - self, ciphertext_blob, encryption_context={}): - '''Return a plaintext RSA private key by performing a KMS decrypt on - the stored encrypted private key''' + def _get_plaintext_private_key(self, ciphertext_blob, encryption_context={}): + """Return a plaintext RSA private key by performing a KMS decrypt on + the stored encrypted private key""" try: response = self.kms.decrypt( CiphertextBlob=b64decode(ciphertext_blob), - EncryptionContext=encryption_context) + EncryptionContext=encryption_context, + ) except Exception as e: raise errors.KmsError( - "Could not decrypt private key using KMS " - "key '%s' (Reason: %s)" % (self.kms_key_id, str(e))) + f"Could not decrypt private key using KMS key '{self.kms_key_id}' (Reason: {str(e)})" + ) - return response[u'Plaintext'] + return response["Plaintext"] def _set_encryptor(self, public_key): - '''Creates a OAEP decryptor based on a RSA private key.''' + """Creates a OAEP decryptor based on a RSA private key.""" if self.encryptor is not None: return if not public_key: try: - with open(self.public_key_file, 'rb') as f: + with open(self.public_key_file, "rb") as f: public_key = f.read() except IOError as e: raise errors.InputError( - "public key '%s' does not exist." % self.public_key_file) + f"public key '{self.public_key_file}' does not exist." + ) try: public_key_obj = serialization.load_pem_public_key( - public_key, - backend=default_backend()) + public_key, backend=default_backend() + ) self.encryptor = Encryptor(public_key_obj) except Exception as e: - raise errors.EncryptorError( - "public key is malformed. (Reason: %s)" % (str(e))) - - def _set_decryptor( - self, - plaintext_private_key=None, - encrypted_private_key=None): - '''Creates a OAEP decryptor based on a RSA private key.''' + raise errors.EncryptorError(f"public key is malformed. (Reason: {str(e)})") + + def _set_decryptor(self, plaintext_private_key=None, encrypted_private_key=None): + """Creates a OAEP decryptor based on a RSA private key.""" if self.decryptor is not None: return if not plaintext_private_key: if not encrypted_private_key: try: - with open(self.encrypted_private_key_file, 'rb') as f: + with open(self.encrypted_private_key_file, "rb") as f: encrypted_private_key = f.read() except IOError as e: raise errors.InputError( - "private key '%s' does not exist." % ( - self.encrypted_private_key_file)) + f"private key '{self.encrypted_private_key_file}' does not exist." + ) plaintext_private_key = self._get_plaintext_private_key( - encrypted_private_key, - encryption_context=self.encryption_context) + encrypted_private_key, encryption_context=self.encryption_context + ) try: private_key_obj = serialization.load_pem_private_key( - plaintext_private_key, - password=None, - backend=default_backend()) + plaintext_private_key, password=None, backend=default_backend() + ) self.decryptor = Decryptor(private_key_obj) except Exception as e: - raise errors.DecryptorError( - "private key is malformed. (Reason: %s)" % (str(e))) + raise errors.DecryptorError(f"private key is malformed. (Reason: {str(e)})") def _validate_secret(self, secret_name, secret, blob_mode): if not blob_mode: # secret must have a value if len(secret_name) == 0: raise errors.InputError( - "secret name '%s' must be greather than length of 0" % ( - secret_name)) + f"secret name '{secret_name}' must be greather than length of 0" + ) # secret name can only contain ascii characters if not self._is_ascii(secret_name): raise errors.InputError( - "secret name '%s' must contain only ASCII chararacters" % ( - secret_name)) + f"secret name '{secret_name}' must contain only ASCII chararacters" + ) # warn when secret value contains non-ascii chars if not self._is_ascii(secret): - log.warn("Secret '%s' contains non-ASCII characters!" % ( - secret_name)) + log.warning("Secret '%s' contains non-ASCII characters!", secret_name) # warn when secret value contains escape characters (e.g. "\r") if "\\" in secret: - log.warn("Secret '%s' contains escape sequence characters!" % ( - secret_name)) + log.warning("Secret '%s' contains escape sequence characters!", secret_name) def _encrypt(self, secret_name, secret, blob_mode=False): - '''Encrypt a single secret name and value pair into the data - directory by environment.''' + """Encrypt a single secret name and value pair into the data + directory by environment.""" self._validate_secret(secret_name, secret, blob_mode) - secret = secret.encode('utf-8') + secret = secret.encode("utf-8") ciphertext_bin = self._encrypt_with_aes_session_key(secret) ciphertext_bin_b64 = b64encode(ciphertext_bin) @@ -225,13 +221,13 @@ def _encrypt(self, secret_name, secret, blob_mode=False): filename = secret_name + self.ciphertext_ext secret_filepath = os.path.join(self.environment_secrets_dir, filename) - with open(secret_filepath, 'wb') as f: + with open(secret_filepath, "wb") as f: f.write(ciphertext_bin_b64) def _encrypt_with_aes_session_key(self, secret): - '''Creates a AES-CBC 128 bit session key and to encrypt secrets with + """Creates a AES-CBC 128 bit session key and to encrypt secrets with and packages that session key (encrypted with the RSA public key) - along with the ciphertext as a binary.''' + along with the ciphertext as a binary.""" session_key = Fernet.generate_key() encrypted_session_key = self.encryptor.encrypt(session_key) @@ -243,14 +239,14 @@ def _encrypt_with_aes_session_key(self, secret): return ciphertext_bin def _encrypt_all_secrets(self, secrets_json): - '''Helper function to encrypt all secrets from a JSON string''' + """Helper function to encrypt all secrets from a JSON string""" try: secrets = json.loads(secrets_json) except ValueError: - raise errors.InputError('input is not valid JSON') + raise errors.InputError("input is not valid JSON") for secret_name, secret in secrets.items(): - log.info(secret_name) + log.info("%s", secret_name) self._encrypt(secret_name, secret) def _convert_blob_to_bin(self, secret_blob): @@ -262,18 +258,19 @@ def _convert_blob_to_bin(self, secret_blob): if self.offset > len(ciphertext_bin): raise errors.InputError( "RSA ciphertext length is larger than the " - "secret ciphertext binary length") + "secret ciphertext binary length" + ) return ciphertext_bin def _decrypt_secret_blob(self, secret_blob): - '''Decrypts a binary blob which contains an RSA encrypted AES session - key and AES encrypted data.''' + """Decrypts a binary blob which contains an RSA encrypted AES session + key and AES encrypted data.""" # Break out base64 encoded binary to encrypted session key and data ciphertext_bin = self._convert_blob_to_bin(secret_blob) - encrypted_session_key = ciphertext_bin[:self.offset] - ciphertext = ciphertext_bin[self.offset:] + encrypted_session_key = ciphertext_bin[: self.offset] + ciphertext = ciphertext_bin[self.offset :] secret = None @@ -282,7 +279,7 @@ def _decrypt_secret_blob(self, secret_blob): session_key = self.decryptor.decrypt(encrypted_session_key) fernet_cipher = Fernet(session_key) - secret = fernet_cipher.decrypt(ciphertext).decode('utf-8') + secret = fernet_cipher.decrypt(ciphertext).decode("utf-8") # A number of ciphertext issues could cause AES-RSA decryption to fail except Exception: raise errors.InputError("ciphertext binary is corrupt") @@ -290,34 +287,36 @@ def _decrypt_secret_blob(self, secret_blob): return secret def _decrypt_aes_wrapped_file(self, secret_file): - '''Decrypts a binary file which a base64 encoded secrets blob.''' + """Decrypts a binary file which a base64 encoded secrets blob.""" try: - with open(secret_file, 'rb') as f: + with open(secret_file, "rb") as f: secret_blob = f.read() secret = self._decrypt_secret_blob(secret_blob) except Exception as e: log.error( - "Could not decrypt AES wrapped secret '%s' (Reason: %s)" % ( - os.path.basename(secret_file), str(e))) + "Could not decrypt AES wrapped secret '%s' (Reason: %s)", + os.path.basename(secret_file), + str(e), + ) secret = None return secret def _decrypt_secret(self, secret_name): - '''Decrypt a single secret by name and return the value as a string.''' + """Decrypt a single secret by name and return the value as a string.""" secret = None if secret_name in self.secrets_dict: secret_path = self.secrets_dict[secret_name] - if os.path.basename(secret_path).endswith('.bin'): + if os.path.basename(secret_path).endswith(".bin"): secret = self._decrypt_aes_wrapped_file(secret_path) return secret def _decrypt_all_secrets(self): - '''Decrypt all secrets for the current environment and return a - dict object''' + """Decrypt all secrets for the current environment and return a + dict object""" secrets = {} for secret_name, secret_path in self.secrets_dict.items(): @@ -328,91 +327,91 @@ def _decrypt_all_secrets(self): return secrets def _get_secrets_dict(self): - '''Returns a dict of all secrets available for the current environment. + """Returns a dict of all secrets available for the current environment. key: secret_name value: absolute path of the secret file - ''' + """ secrets_dict = {} for file in os.listdir(self.environment_secrets_dir): if file.endswith(defaults.CIPHERTEXT_EXT): secret_name = self._chomp_ext(file) secrets_dict[secret_name] = os.path.join( - self.environment_secrets_dir, file) + self.environment_secrets_dir, file + ) return secrets_dict def _chomp_ext(self, string): - '''Remove the extension of a filename.''' + """Remove the extension of a filename.""" return os.path.splitext(string)[0] def _is_ascii(self, string): try: - string.encode('ascii') + string.encode("ascii") except UnicodeEncodeError: return False return True def generate_key_pair(self): - '''RSA key generation + """RSA key generation Sets up a project or application with asymmetric keys used for all Code Crypt operations. If only one of two of the key pair already exists an an exception is raised. If both already exist, no action is taken. KMS encrypt is needed to encrypt the RSA private key and save it. - ''' + """ public_key_file_exists = os.path.exists(self.public_key_file) - private_key_file_exists = os.path.exists( - self.encrypted_private_key_file) + private_key_file_exists = os.path.exists(self.encrypted_private_key_file) if public_key_file_exists and private_key_file_exists: - log.info('Public key and private key already exist.') + log.info("Public key and private key already exist.") return elif public_key_file_exists and not private_key_file_exists: - raise errors.CodeCryptError( - 'public key exists but private key is missing.') + raise errors.CodeCryptError("public key exists but private key is missing.") elif private_key_file_exists and not public_key_file_exists: - raise errors.CodeCryptError( - 'private key exists but public key is missing.') + raise errors.CodeCryptError("private key exists but public key is missing.") - log.info('Generating key pair...') + log.info("Generating key pair...") private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=self.rsa_key_size, - backend=default_backend()) + public_exponent=65537, key_size=self.rsa_key_size, backend=default_backend() + ) private_key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption()) + encryption_algorithm=serialization.NoEncryption(), + ) public_key = private_key.public_key() public_key_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo) + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) try: response = self.kms.encrypt( KeyId=self.kms_key_id, Plaintext=private_key_pem, - EncryptionContext=self.encryption_context) + EncryptionContext=self.encryption_context, + ) except Exception as e: raise errors.KmsError( - "Could not encrypt private key using KMS " - "key '%s' (Reason: %s)" % (self.kms_key_id, str(e))) + f"Could not encrypt private key using KMS key '{self.kms_key_id}' (Reason: {str(e)})" + ) - ciphertext_blob = response['CiphertextBlob'] + ciphertext_blob = response["CiphertextBlob"] - with open(self.encrypted_private_key_file, 'wb') as f: + with open(self.encrypted_private_key_file, "wb") as f: f.write(b64encode(ciphertext_blob)) - with open(self.public_key_file, 'wb') as f: + with open(self.public_key_file, "wb") as f: f.write(public_key_pem) def encrypt(self, secret_name, secret, public_key=None): - '''Single secret encrypt + """Single secret encrypt Takes in a single key-value pair secret and encrypts it with AES. The AES session key is then encrypted asymmetrically with RSA so that @@ -425,15 +424,15 @@ def encrypt(self, secret_name, secret, public_key=None): secret_name: secret name secret: secret value public_key: public RSA to encrypt secrets - ''' + """ self._set_encryptor(public_key) self.secrets_dict = self._get_secrets_dict() - log.info('Encrypting...') + log.info("Encrypting...") self._encrypt(secret_name, secret) def blob_encrypt(self, secret, public_key=None): - '''Single secret encrypt that returns an encrypted blob + """Single secret encrypt that returns an encrypted blob Takes in a single value and encrypts with it RSA-AES hybrid and returns an encrypted base64 encrypted binary blob. @@ -441,12 +440,12 @@ def blob_encrypt(self, secret, public_key=None): Args: secret: secret value public_key: public RSA to encrypt secrets - ''' + """ self._set_encryptor(public_key) return self._encrypt(None, secret, blob_mode=True) def import_secrets(self, secrets_json, public_key=None): - '''Secrets JSON encryption + """Secrets JSON encryption Takes in a JSON object in the form of key-value pairs, initializes an encryptor based on a public key and writes it to into the data @@ -455,19 +454,17 @@ def import_secrets(self, secrets_json, public_key=None): Args: secrets_json: JSON string with secrets to be encrypted public_key: public RSA to encrypt AES session keys with - ''' + """ self._set_encryptor(public_key) self._get_secrets_dict() - log.info('Importing...') + log.info("Importing...") self._encrypt_all_secrets(secrets_json) def decrypt( - self, - secret_name=None, - plaintext_private_key=None, - encrypted_private_key=None): - '''Decrypt secrets to JSON + self, secret_name=None, plaintext_private_key=None, encrypted_private_key=None + ): + """Decrypt secrets to JSON Takes in a single secret to decrypt, or none to decrypt all of them to produce a dict result of key-value pairs. @@ -486,28 +483,28 @@ def decrypt( Returns: decrypted secrets in dict form - ''' + """ if plaintext_private_key and encrypted_private_key: raise errors.CodeCryptError( - 'both plaintext and encrypted private keys cannot be provided') + "both plaintext and encrypted private keys cannot be provided" + ) self._set_decryptor( plaintext_private_key=plaintext_private_key, - encrypted_private_key=encrypted_private_key) + encrypted_private_key=encrypted_private_key, + ) self.secrets_dict = self._get_secrets_dict() - log.info('Decrypting...') + log.info("Decrypting...") if secret_name: return self._decrypt_secret(secret_name) return self._decrypt_all_secrets() def blob_decrypt( - self, - secret_blob, - plaintext_private_key=None, - encrypted_private_key=None): - '''Decrypts a base64 encoded encrypted binary to a plaintext secret + self, secret_blob, plaintext_private_key=None, encrypted_private_key=None + ): + """Decrypts a base64 encoded encrypted binary to a plaintext secret Either a plaintext RSA private key can be provided, or a KMS encrypted RSA private key. In the latter case, a KMS decrypt operation will need @@ -520,16 +517,18 @@ def blob_decrypt( Returns: decrypted secret string - ''' + """ if plaintext_private_key and encrypted_private_key: raise errors.CodeCryptError( - 'both plaintext and encrypted private keys cannot be provided') + "both plaintext and encrypted private keys cannot be provided" + ) # We perform this here to validate the secret blob early self._convert_blob_to_bin(secret_blob) self._set_decryptor( plaintext_private_key=plaintext_private_key, - encrypted_private_key=encrypted_private_key) + encrypted_private_key=encrypted_private_key, + ) return self._decrypt_secret_blob(secret_blob) diff --git a/code_crypt/test/test_core.py b/code_crypt/core_test.py similarity index 62% rename from code_crypt/test/test_core.py rename to code_crypt/core_test.py index d389809..a6b336d 100644 --- a/code_crypt/test/test_core.py +++ b/code_crypt/core_test.py @@ -1,24 +1,24 @@ # coding: utf8 import base64 -import mock import os import shutil import tempfile import unittest +import mock from botocore.client import Config from code_crypt import core as code_crypt from code_crypt import errors APP_ROOT = tempfile.mkdtemp() -DATA_DIR = u'code_crypt/data' -ENV = u'test' -EXT = u'.bin' -KMS_KEY_ID = u'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee' +DATA_DIR = "code_crypt/data" +ENV = "test" +EXT = ".bin" +KMS_KEY_ID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" -TEST_VALID_JSON = u"""{ +TEST_VALID_JSON = """{ "SECRET_NAME_A": "AAA", "SECRET_NAME_B": "BBB" }""" @@ -61,7 +61,7 @@ RnzPabvLpiSymUdlOMiTOBG1IEioY48rYt/JRzwywcZJ24dm3FE5/cM= -----END RSA PRIVATE KEY-----""" -TEST_ENCRYPTED_PRIVATE_KEY = u"""AQICAHhiG23RsuSTqwlDgwSBWuBR8vtuEXp93gSa1U3HT2 +TEST_ENCRYPTED_PRIVATE_KEY = """AQICAHhiG23RsuSTqwlDgwSBWuBR8vtuEXp93gSa1U3HT2 B6gwFXBr/FJsoxfaluFn9dEQaXAAAG9zCCBvMGCSqGSIb3DQEHBqCCBuQwggbgAgEAMIIG2QYJKoZI hvcNAQcBMB4GCWCGSAFlAwQBLjARBAwRxVHAc1aReGMBLLUCARCAggaqcBmtFSHgQYH8xRJCnkcNyc 5HAtYG0jz+yA/DUKyWNaaPzqdx2+sFYVttHEjv/P5i01DwYuzn5AT7dSi73cTvXn0znShV+GsGC7WQ @@ -96,7 +96,7 @@ n91rMuBMWam1FHtcPdLeXFw1NX68NtUfiB48keI1tBgo=""" # Sample RSA Private Key -LARGE_SECRET = u"""-----BEGIN RSA PRIVATE KEY----- +LARGE_SECRET = """-----BEGIN RSA PRIVATE KEY----- MIICXAIBAAKBgQCqGKukO1De7zhZj6+H0qtjTkVxwTCpvKe4eCZ0FPqri0cb2JZfXJ/DgYSF6vUp wmJG8wVQZKjeGcjDOL5UlsuusFncCzWBQ7RKNUSesmQRMSGkVb1/3j+skZ6UtW+5u09lHNsj6tQ5 1s1SPrCBkedbNf0Tp0GbMJDyR4e9T04ZZwIDAQABAoGAFijko56+qGyN8M0RVyaRAXz++xTqHBLh @@ -110,7 +110,7 @@ 37sJ5QsW+sJyoNde3xH8vdXhzU7eT82D6X/scw9RZz+/6rCJ4p0= -----END RSA PRIVATE KEY-----""" -TEST_BASE64_TEXT = u"""QmFzZTY0IGlzIGEgZ3JvdXAgb2Ygc2ltaWxhciBiaW5hcnktdG8tdG +TEST_BASE64_TEXT = """QmFzZTY0IGlzIGEgZ3JvdXAgb2Ygc2ltaWxhciBiaW5hcnktdG8tdG V4dCBlbmNvZGluZyBzY2hlbWVzIHRoYXQgcmVwcmVzZW50IGJpbmFyeSBkYXRhIGluIGFuIEFTQ0l JIHN0cmluZyBmb3JtYXQgYnkgdHJhbnNsYXRpbmcgaXQgaW50byBhIHJhZGl4LTY0IHJlcHJlc2Vu dGF0aW9uLiBUaGUgdGVybSBCYXNlNjQgb3JpZ2luYXRlcyBmcm9tIGEgc3BlY2lmaWMgTUlNRSBjb @@ -119,7 +119,6 @@ class TestGenerateKeyPair(unittest.TestCase): - def _is_base64(self, s): try: base64.b64decode(s) @@ -130,82 +129,77 @@ def _is_base64(self, s): def setUp(self): self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) self.expected_public_key_file = os.path.join( - APP_ROOT, DATA_DIR, 'keys', ENV, 'public_key.asc') + APP_ROOT, DATA_DIR, "keys", ENV, "public_key.asc" + ) self.expected_private_key_file = os.path.join( - APP_ROOT, DATA_DIR, 'keys', ENV, 'encrypted_private_key.pem') + APP_ROOT, DATA_DIR, "keys", ENV, "encrypted_private_key.pem" + ) def test_generate_key_pair_with_missing_private_key(self): - with open(self.expected_public_key_file, 'wb') as f: - f.write(b'') + with open(self.expected_public_key_file, "wb") as f: + f.write(b"") self.assertRaises( - errors.CodeCryptError, - lambda: self.cc_obj.generate_key_pair()) + errors.CodeCryptError, lambda: self.cc_obj.generate_key_pair() + ) def test_generate_key_pair_with_missing_public_key_only(self): - with open(self.expected_private_key_file, 'wb') as f: - f.write(b'') + with open(self.expected_private_key_file, "wb") as f: + f.write(b"") self.assertRaises( - errors.CodeCryptError, - lambda: self.cc_obj.generate_key_pair()) + errors.CodeCryptError, lambda: self.cc_obj.generate_key_pair() + ) @mock.patch("boto3.client") - def test_kms_config( - self, mock_client): + def test_kms_config(self, mock_client): mock_client.return_value = mock.MagicMock() self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) self.cc_obj.kms.encrypt.return_value = { - 'CiphertextBlob': TEST_PLAINTEXT_PRIVATE_KEY} + "CiphertextBlob": TEST_PLAINTEXT_PRIVATE_KEY + } self.cc_obj.generate_key_pair() - self.assertEqual('kms', mock_client.call_args[0][0]) + self.assertEqual("kms", mock_client.call_args[0][0]) - assert isinstance(mock_client.call_args[1]['config'], Config) - self.assertEqual( - 'us-east-1', mock_client.call_args[1]['config'].region_name) + assert isinstance(mock_client.call_args[1]["config"], Config) + self.assertEqual("us-east-1", mock_client.call_args[1]["config"].region_name) @mock.patch("boto3.client") - def test_generate_key_pair_with_no_existing_keys( - self, mock_client): + def test_generate_key_pair_with_no_existing_keys(self, mock_client): self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) self.cc_obj.kms.encrypt.return_value = { - 'CiphertextBlob': TEST_PLAINTEXT_PRIVATE_KEY} + "CiphertextBlob": TEST_PLAINTEXT_PRIVATE_KEY + } self.cc_obj.generate_key_pair() try: - with open(self.expected_public_key_file, 'r') as f: + with open(self.expected_public_key_file, "r") as f: public_key = f.read() - with open(self.expected_private_key_file, 'r') as f: + with open(self.expected_private_key_file, "r") as f: private_key = f.read() except IOError: - print('Cannot write out generate keys.') + print("Cannot write out generate keys.") exit(2) - self.assertTrue(u'BEGIN PUBLIC' in public_key.split("\n")[0]) + self.assertTrue("BEGIN PUBLIC" in public_key.split("\n")[0]) self.assertTrue(self._is_base64(private_key)) def tearDown(self): @@ -213,194 +207,171 @@ def tearDown(self): class TestImportSecrets(unittest.TestCase): - def setUp(self): self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) def test_import_secrets_with_invalid_secrets_json(self): self.assertRaises( errors.InputError, - lambda: self.cc_obj.import_secrets( - '{foo=bar}', TEST_PUBLIC_KEY)) + lambda: self.cc_obj.import_secrets("{foo=bar}", TEST_PUBLIC_KEY), + ) def test_import_secrets_with_valid_data(self): self.cc_obj.import_secrets(TEST_VALID_JSON, TEST_PUBLIC_KEY) decrypted_result = self.cc_obj.decrypt( - 'SECRET_NAME_A', plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) - self.assertEqual(decrypted_result, 'AAA') + "SECRET_NAME_A", plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) + self.assertEqual(decrypted_result, "AAA") decrypted_result = self.cc_obj.decrypt( - 'SECRET_NAME_B', plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) - self.assertEqual(decrypted_result, 'BBB') + "SECRET_NAME_B", plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) + self.assertEqual(decrypted_result, "BBB") def tearDown(self): shutil.rmtree(APP_ROOT) class TestDecrypt(unittest.TestCase): - def setUp(self): self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) def test_decrypt_with_missing_private_key_file(self): - self.assertRaises( - errors.InputError, lambda: self.cc_obj.decrypt('FOO')) + self.assertRaises(errors.InputError, lambda: self.cc_obj.decrypt("FOO")) def test_decrypt_with_malformed_private_key(self): self.assertRaises( - errors.DecryptorError, - lambda: self.cc_obj.decrypt('FOO', 'bar')) + errors.DecryptorError, lambda: self.cc_obj.decrypt("FOO", "bar") + ) def test_decrypt_with_missing_secret(self): decrypted_result = self.cc_obj.decrypt( - 'FOO', plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) + "FOO", plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) self.assertIsNone(decrypted_result) def test_decrypt_secret_with_plaintext_private_key(self): - self.cc_obj.encrypt('SECRET_NAME_CCC', 'CCC', TEST_PUBLIC_KEY) + self.cc_obj.encrypt("SECRET_NAME_CCC", "CCC", TEST_PUBLIC_KEY) decrypted_result = self.cc_obj.decrypt( - 'SECRET_NAME_CCC', - plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) - self.assertEqual(decrypted_result, 'CCC') + "SECRET_NAME_CCC", plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) + self.assertEqual(decrypted_result, "CCC") def test_decrypt_secret_with_invalid_kms_context(self): self.assertRaises( errors.KmsError, lambda: self.cc_obj.decrypt( - encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY)) + encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY + ), + ) def test_decrypt_secret_with_missing_kms_key(self): - self.assertRaises( - errors.InputError, - lambda: self.cc_obj.decrypt('FOO')) + self.assertRaises(errors.InputError, lambda: self.cc_obj.decrypt("FOO")) def test_decrypt_all_with_no_secrets(self): decrypted_result = self.cc_obj.decrypt( - plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) - self.assertEquals(decrypted_result, {}) + plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) + self.assertEqual(decrypted_result, {}) @mock.patch("boto3.client") def test_decrypt_secret_with_kms(self, mock_client): - secret = 'DDD' - self.cc_obj.encrypt('SECRET_NAME_DDD', secret, TEST_PUBLIC_KEY) + secret = "DDD" + self.cc_obj.encrypt("SECRET_NAME_DDD", secret, TEST_PUBLIC_KEY) self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) - self.cc_obj.kms.decrypt.return_value = { - 'Plaintext': TEST_PLAINTEXT_PRIVATE_KEY} + self.cc_obj.kms.decrypt.return_value = {"Plaintext": TEST_PLAINTEXT_PRIVATE_KEY} decrypted_result = self.cc_obj.decrypt( - 'SECRET_NAME_DDD', - encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY) + "SECRET_NAME_DDD", encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY + ) self.assertEqual(decrypted_result, secret) @mock.patch("boto3.client") def test_decrypt_large_secret_with_kms(self, mock_client): - self.cc_obj.encrypt('SECRET_NAME_DDD', LARGE_SECRET, TEST_PUBLIC_KEY) + self.cc_obj.encrypt("SECRET_NAME_DDD", LARGE_SECRET, TEST_PUBLIC_KEY) self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) - self.cc_obj.kms.decrypt.return_value = { - 'Plaintext': TEST_PLAINTEXT_PRIVATE_KEY} + self.cc_obj.kms.decrypt.return_value = {"Plaintext": TEST_PLAINTEXT_PRIVATE_KEY} decrypted_result = self.cc_obj.decrypt( - 'SECRET_NAME_DDD', - encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY) + "SECRET_NAME_DDD", encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY + ) self.assertEqual(decrypted_result, LARGE_SECRET) @mock.patch("boto3.client") def test_decrypt_explicit_all_with_kms(self, mock_client): - secret_one = 'one' - secret_two = 'two' - self.cc_obj.encrypt( - 'SECRET_NAME_1', secret_one, public_key=TEST_PUBLIC_KEY) - self.cc_obj.encrypt( - 'SECRET_NAME_2', secret_two, public_key=TEST_PUBLIC_KEY) + secret_one = "one" + secret_two = "two" + self.cc_obj.encrypt("SECRET_NAME_1", secret_one, public_key=TEST_PUBLIC_KEY) + self.cc_obj.encrypt("SECRET_NAME_2", secret_two, public_key=TEST_PUBLIC_KEY) self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) - self.cc_obj.kms.decrypt.return_value = { - 'Plaintext': TEST_PLAINTEXT_PRIVATE_KEY} + self.cc_obj.kms.decrypt.return_value = {"Plaintext": TEST_PLAINTEXT_PRIVATE_KEY} decrypted_result = self.cc_obj.decrypt( - encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY) + encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY + ) - self.assertEqual(decrypted_result['SECRET_NAME_1'], secret_one) - self.assertEqual(decrypted_result['SECRET_NAME_2'], secret_two) + self.assertEqual(decrypted_result["SECRET_NAME_1"], secret_one) + self.assertEqual(decrypted_result["SECRET_NAME_2"], secret_two) @mock.patch("boto3.client") def test_decrypt_all_with_kms(self, mock_client): - secret_one = 'one' - secret_two = 'two' - self.cc_obj.encrypt( - 'SECRET_NAME_1', secret_one, public_key=TEST_PUBLIC_KEY) - self.cc_obj.encrypt( - 'SECRET_NAME_2', secret_two, public_key=TEST_PUBLIC_KEY) + secret_one = "one" + secret_two = "two" + self.cc_obj.encrypt("SECRET_NAME_1", secret_one, public_key=TEST_PUBLIC_KEY) + self.cc_obj.encrypt("SECRET_NAME_2", secret_two, public_key=TEST_PUBLIC_KEY) self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) - self.cc_obj.kms.decrypt.return_value = { - 'Plaintext': TEST_PLAINTEXT_PRIVATE_KEY} + self.cc_obj.kms.decrypt.return_value = {"Plaintext": TEST_PLAINTEXT_PRIVATE_KEY} decrypted_result = self.cc_obj.decrypt( - plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) + plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) - self.assertEqual(decrypted_result['SECRET_NAME_1'], secret_one) - self.assertEqual(decrypted_result['SECRET_NAME_2'], secret_two) + self.assertEqual(decrypted_result["SECRET_NAME_1"], secret_one) + self.assertEqual(decrypted_result["SECRET_NAME_2"], secret_two) def tearDown(self): shutil.rmtree(APP_ROOT) class TestBlobEncryptDecrypt(unittest.TestCase): - def setUp(self): self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) @mock.patch("boto3.client") def test_blob_decrypt_secret_with_kms(self, mock_client): - secret = 'DDD' + secret = "DDD" secret_blob = self.cc_obj.blob_encrypt(secret, TEST_PUBLIC_KEY) self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) - self.cc_obj.kms.decrypt.return_value = { - 'Plaintext': TEST_PLAINTEXT_PRIVATE_KEY} + self.cc_obj.kms.decrypt.return_value = {"Plaintext": TEST_PLAINTEXT_PRIVATE_KEY} decrypted_result = self.cc_obj.blob_decrypt( - secret_blob, - encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY) + secret_blob, encrypted_private_key=TEST_ENCRYPTED_PRIVATE_KEY + ) self.assertEqual(decrypted_result, secret) @@ -409,95 +380,91 @@ def tearDown(self): class TestBlobDecrypt(unittest.TestCase): - def setUp(self): self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) def test_blob_decrypt_with_invalid_short_secret(self): - self.assertRaises( - errors.InputError, - lambda: self.cc_obj.blob_decrypt('FOO')) + self.assertRaises(errors.InputError, lambda: self.cc_obj.blob_decrypt("FOO")) def test_blob_decrypt_with_invalid_base64_value(self): self.assertRaises( - errors.InputError, - lambda: self.cc_obj.blob_decrypt('FOOFOOFOOFOOFOOFOOFOO')) + errors.InputError, lambda: self.cc_obj.blob_decrypt("FOOFOOFOOFOOFOOFOOFOO") + ) def test_blob_decrypt_with_invalid_base64_decoded_value(self): self.assertRaises( - errors.InputError, - lambda: self.cc_obj.blob_decrypt(TEST_BASE64_TEXT)) + errors.InputError, lambda: self.cc_obj.blob_decrypt(TEST_BASE64_TEXT) + ) def tearDown(self): shutil.rmtree(APP_ROOT) class TestEncrypt(unittest.TestCase): - def setUp(self): self.cc_obj = code_crypt.CodeCrypt( - kms_key_id=KMS_KEY_ID, - app_root=APP_ROOT, - env=ENV, - ciphertext_ext=EXT) + kms_key_id=KMS_KEY_ID, app_root=APP_ROOT, env=ENV, ciphertext_ext=EXT + ) def test_encrypt_with_missing_public_key(self): - self.assertRaises( - errors.InputError, - lambda: self.cc_obj.encrypt('FOO', 'BAR')) + self.assertRaises(errors.InputError, lambda: self.cc_obj.encrypt("FOO", "BAR")) def test_encrypt_with_malformed_public_key(self): self.assertRaises( errors.EncryptorError, - lambda: self.cc_obj.encrypt('FOO', 'BAR', 'SOME_BAD_KEY')) + lambda: self.cc_obj.encrypt("FOO", "BAR", "SOME_BAD_KEY"), + ) def test_encrypt_with_zero_length_secret_name(self): - secret_name = '' - secret = 'AAA' + secret_name = "" + secret = "AAA" self.assertRaises( errors.InputError, lambda: self.cc_obj.encrypt( - secret_name, secret, public_key=TEST_PUBLIC_KEY)) + secret_name, secret, public_key=TEST_PUBLIC_KEY + ), + ) def test_encrypt_with_non_ascii_secret_name(self): - secret_name = u'试' - secret = 'AAA' + secret_name = "试" + secret = "AAA" self.assertRaises( errors.InputError, lambda: self.cc_obj.encrypt( - secret_name, secret, public_key=TEST_PUBLIC_KEY)) + secret_name, secret, public_key=TEST_PUBLIC_KEY + ), + ) def test_encrypt_on_new_secret(self): - secret_name = 'NEW_SECRET' - secret = 'AAA' + secret_name = "NEW_SECRET" + secret = "AAA" self.cc_obj.encrypt(secret_name, secret, public_key=TEST_PUBLIC_KEY) decrypted_result = self.cc_obj.decrypt( - secret_name, plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) + secret_name, plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) - self.assertEquals(decrypted_result, secret) + self.assertEqual(decrypted_result, secret) def test_encrypt_on_existing_secret(self): - secret_name = 'EXISTING_SECRET' - secret = 'AAA' - new_secret = 'BBB' + secret_name = "EXISTING_SECRET" + secret = "AAA" + new_secret = "BBB" self.cc_obj.encrypt(secret_name, secret, public_key=TEST_PUBLIC_KEY) - self.cc_obj.encrypt( - secret_name, new_secret, public_key=TEST_PUBLIC_KEY) + self.cc_obj.encrypt(secret_name, new_secret, public_key=TEST_PUBLIC_KEY) decrypted_result = self.cc_obj.decrypt( - secret_name, plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY) + secret_name, plaintext_private_key=TEST_PLAINTEXT_PRIVATE_KEY + ) - self.assertEquals(decrypted_result, new_secret) + self.assertEqual(decrypted_result, new_secret) def tearDown(self): shutil.rmtree(APP_ROOT) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/code_crypt/defaults.py b/code_crypt/defaults.py index f8b44d8..518f67f 100644 --- a/code_crypt/defaults.py +++ b/code_crypt/defaults.py @@ -1,18 +1,17 @@ import os + from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding - -AWS_REGION = u'us-east-1' -APP_ROOT = os.getenv(u'APP_ROOT', os.getcwd()) -DATA_DIR = u'code_crypt/data' -DEFAULT_ENV = u'development' -ENV_TAGS = [DEFAULT_ENV, u'staging', u'production'] -ENV_MAP = {u'dev': u'development', u'stag': u'staging', u'prod': u'production'} +AWS_REGION = "us-east-1" +APP_ROOT = os.getenv("APP_ROOT", os.getcwd()) +DATA_DIR = "code_crypt/data" +DEFAULT_ENV = "development" +ENV_TAGS = [DEFAULT_ENV, "staging", "production"] +ENV_MAP = {"dev": "development", "stag": "staging", "prod": "production"} RSA_KEY_SIZE = 2048 -CIPHERTEXT_EXT = u'.bin' +CIPHERTEXT_EXT = ".bin" RSA_PADDING = padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA1()), - algorithm=hashes.SHA1(), - label=None) + mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None +) diff --git a/code_crypt/errors.py b/code_crypt/errors.py index a726263..483bea1 100644 --- a/code_crypt/errors.py +++ b/code_crypt/errors.py @@ -1,28 +1,32 @@ - class CodeCryptError(Exception): - '''Base Exception for Code Crypt''' + """Base Exception for Code Crypt""" + def __init__(self, message=""): - if message is not "": - self.message = "%s: %s" % (self.__class__.__name__, message) + if message != "": + self.message = f"{self.__class__.__name__}: {message}" else: self.message = self.__class__.__name__ class EncryptorError(CodeCryptError): - '''Error creating RSA encryptor''' + """Error creating RSA encryptor""" + pass class DecryptorError(CodeCryptError): - '''Error creating RSA decryptor''' + """Error creating RSA decryptor""" + pass class InputError(CodeCryptError): - '''Error with inputs into the CodeCrypt object''' + """Error with inputs into the CodeCrypt object""" + pass class KmsError(CodeCryptError): - '''Error with using an AWS KMS operation''' + """Error with using an AWS KMS operation""" + pass diff --git a/code_crypt/metadata.py b/code_crypt/metadata.py index 219e284..882d3ae 100644 --- a/code_crypt/metadata.py +++ b/code_crypt/metadata.py @@ -13,5 +13,5 @@ # Copyright 2017 Nextdoor.com, Inc -__version__ = '0.1.3' -__desc__ = 'Code Crypt' +__version__ = "0.1.4" +__desc__ = "Code Crypt" diff --git a/code_crypt/test/test_cli.py b/code_crypt/test/test_cli.py deleted file mode 100644 index 75662fe..0000000 --- a/code_crypt/test/test_cli.py +++ /dev/null @@ -1,30 +0,0 @@ -import unittest - -from code_crypt import cli - -KMS_KEY_ID = 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee' - - -class CliTest(unittest.TestCase): - '''This is a simple sanity check that argparser is setup properly''' - - def test_get_config_init(self): - - argv = [ - 'cli.py', - '--kms-key-id', KMS_KEY_ID, - '--init' - ] - config = cli.get_config(argv) - self.assertTrue(config.init) - self.assertEquals(config.kms_key_id, KMS_KEY_ID) - - def test_get_config_encrypt(self): - encrypt_param = 'TEST=value' - - argv = [ - 'cli.py', - '--encrypt', 'TEST=value' - ] - config = cli.get_config(argv) - self.assertEquals(config.encrypt, encrypt_param) diff --git a/requirements.test.txt b/requirements.test.txt index 5108473..336eaa7 100644 --- a/requirements.test.txt +++ b/requirements.test.txt @@ -1,4 +1,8 @@ +# linting tools +black +isort + +# testing tools +pytest mock -nose>=1.3.7 -pycodestyle -pyflakes +botocore diff --git a/requirements.txt b/requirements.txt index 3c82119..d7e18dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -boto3>=1.4.0 -botocore>=1.6.0 -cryptography>=1.6 +boto3>=1.21.25,<2.0.0 +cryptography>=36.0.2 # ugh... they use major versions mainly now diff --git a/setup.py b/setup.py index 9d068d4..92b42f3 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Copyright 2017 Nextdoor.com, Inc. +# Copyright 2017-2022 Nextdoor.com, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,29 +14,26 @@ import os -from setuptools import setup, find_packages +from setuptools import find_packages, setup from code_crypt.metadata import __desc__, __version__ -PACKAGE = 'code_crypt' +PACKAGE = "code_crypt" DIR = os.path.dirname(os.path.realpath(__file__)) setup( name=PACKAGE, version=__version__, description=__desc__, - long_description=open('%s/README.md' % DIR, 'rb').read().decode('utf8'), - author='Nextdoor Engineering', - author_email='nehal@nextdoor.com', - url='https://github.com/Nextdoor/code-crypt', - download_url='https://github.com/Nextdoor/code-crypt/tarball/0.1.3', - license='Apache License, Version 2.0', + long_description=open(f"{DIR}/README.md", "rb").read().decode("utf8"), + author="Nextdoor Engineering", + author_email="nehal@nextdoor.com", + url="https://github.com/Nextdoor/code-crypt", + download_url=f"https://github.com/Nextdoor/code-crypt/tarball/{__version__}", + license="Apache License, Version 2.0", packages=find_packages(), - test_suite='nose.collector', - tests_require=open('%s/requirements.test.txt' % DIR).readlines(), + tests_require=open(f"{DIR}/requirements.test.txt").readlines(), setup_requires=[], - install_requires=open('%s/requirements.txt' % DIR).readlines(), - entry_points={ - 'console_scripts': [ - 'code-crypt = code_crypt.cli:main']} + install_requires=open(f"{DIR}/requirements.txt").readlines(), + entry_points={"console_scripts": ["code-crypt = code_crypt.cli:main"]}, )