diff --git a/.vscode/cspell.json b/.vscode/cspell.json index 3a8f31c9c874..8b762e5c1b21 100644 --- a/.vscode/cspell.json +++ b/.vscode/cspell.json @@ -831,7 +831,8 @@ { "filename": "sdk/keyvault/**", "words": [ - "eddsa" + "eddsa", + "Thawte" ] }, { diff --git a/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md b/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md index bfb53c243214..964a40b8dae9 100644 --- a/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md +++ b/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md @@ -1,14 +1,12 @@ # Release History -## 4.9.0b2 (Unreleased) +## 4.9.0b2 (2023-10-12) ### Features Added - -### Breaking Changes - -### Bugs Fixed - -### Other Changes +- The `cryptography` library's `RSAPrivateKey` and `RSAPublicKey` interfaces are now implemented by + `KeyVaultRSAPrivateKey` and `KeyVaultRSAPublicKey` classes that can use keys managed by Key Vault +- `CryptographyClient` has `create_rsa_private_key` and `create_rsa_public_key` methods that return a + `KeyVaultRSAPrivateKey` and `KeyVaultRSAPublicKey`, respectively ## 4.9.0b1 (2023-05-16) diff --git a/sdk/keyvault/azure-keyvault-keys/assets.json b/sdk/keyvault/azure-keyvault-keys/assets.json index 6db138d7b706..ba822569c33e 100644 --- a/sdk/keyvault/azure-keyvault-keys/assets.json +++ b/sdk/keyvault/azure-keyvault-keys/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "python", "TagPrefix": "python/keyvault/azure-keyvault-keys", - "Tag": "python/keyvault/azure-keyvault-keys_28b4323b48" + "Tag": "python/keyvault/azure-keyvault-keys_8ef3422a55" } diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/__init__.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/__init__.py index 44006de7365e..9e931898fc8e 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/__init__.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/__init__.py @@ -2,7 +2,16 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ -from ._models import DecryptResult, EncryptResult, SignResult, WrapResult, VerifyResult, UnwrapResult +from ._models import ( + DecryptResult, + EncryptResult, + KeyVaultRSAPrivateKey, + KeyVaultRSAPublicKey, + SignResult, + WrapResult, + VerifyResult, + UnwrapResult, +) from ._enums import EncryptionAlgorithm, KeyWrapAlgorithm, SignatureAlgorithm from ._client import CryptographyClient @@ -12,6 +21,8 @@ "DecryptResult", "EncryptionAlgorithm", "EncryptResult", + "KeyVaultRSAPrivateKey", + "KeyVaultRSAPublicKey", "KeyWrapAlgorithm", "SignatureAlgorithm", "SignResult", diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_client.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_client.py index 03a02539ca57..efa51bd4f438 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_client.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_client.py @@ -10,6 +10,7 @@ from . import DecryptResult, EncryptionAlgorithm, EncryptResult, SignResult, VerifyResult, UnwrapResult, WrapResult from ._key_validity import raise_if_time_invalid +from ._models import KeyVaultRSAPrivateKey, KeyVaultRSAPublicKey from ._providers import get_local_cryptography_provider, NoLocalCryptography from .. import KeyOperation from .._models import JsonWebKey, KeyVaultKey @@ -211,6 +212,30 @@ def _initialize(self, **kwargs) -> None: # try to get the key again next time unless we know we're forbidden to do so self._initialized = self._keys_get_forbidden + @distributed_trace + def create_rsa_private_key(self) -> KeyVaultRSAPrivateKey: # pylint:disable=client-method-missing-kwargs + """Create an `RSAPrivateKey` implementation backed by this `CryptographyClient`, as a `KeyVaultRSAPrivateKey`. + + The `CryptographyClient` will attempt to download the key, if it hasn't been already, as part of this operation. + + :returns: A `KeyVaultRSAPrivateKey`, which implements `cryptography`'s `RSAPrivateKey` interface. + :rtype: :class:`~azure.keyvault.keys.crypto.KeyVaultRSAPrivateKey` + """ + self._initialize() + return KeyVaultRSAPrivateKey(client=self, key_material=cast(JsonWebKey, self._key)) + + @distributed_trace + def create_rsa_public_key(self) -> KeyVaultRSAPublicKey: # pylint:disable=client-method-missing-kwargs + """Create an `RSAPublicKey` implementation backed by this `CryptographyClient`, as a `KeyVaultRSAPublicKey`. + + The `CryptographyClient` will attempt to download the key, if it hasn't been already, as part of this operation. + + :returns: A `KeyVaultRSAPublicKey`, which implements `cryptography`'s `RSAPublicKey` interface. + :rtype: :class:`~azure.keyvault.keys.crypto.KeyVaultRSAPublicKey` + """ + self._initialize() + return KeyVaultRSAPublicKey(client=self, key_material=cast(JsonWebKey, self._key)) + @distributed_trace def encrypt(self, algorithm: "EncryptionAlgorithm", plaintext: bytes, **kwargs) -> EncryptResult: """Encrypt bytes using the client's key. diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_models.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_models.py index a1b9fb01aa90..021b01c139bc 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_models.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/crypto/_models.py @@ -2,11 +2,437 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ -from typing import TYPE_CHECKING +from typing import cast, Optional, NoReturn, Union, TYPE_CHECKING + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric.padding import AsymmetricPadding, OAEP, PKCS1v15, PSS, MGF1 +from cryptography.hazmat.primitives.asymmetric.rsa import ( + rsa_crt_dmp1, + rsa_crt_dmq1, + rsa_crt_iqmp, + rsa_recover_prime_factors, + RSAPrivateKey, + RSAPrivateNumbers, + RSAPublicKey, + RSAPublicNumbers, +) +from cryptography.hazmat.primitives.asymmetric.utils import Prehashed +from cryptography.hazmat.primitives.hashes import Hash, HashAlgorithm, SHA1, SHA256, SHA384, SHA512 +from cryptography.hazmat.primitives.serialization import ( + Encoding, + KeySerializationEncryption, + PrivateFormat, + PublicFormat, +) + +from ._enums import EncryptionAlgorithm, KeyWrapAlgorithm, SignatureAlgorithm +from .._models import JsonWebKey if TYPE_CHECKING: - from . import EncryptionAlgorithm, KeyWrapAlgorithm, SignatureAlgorithm - from typing import Any, Optional + # Import client only during TYPE_CHECKING to avoid circular dependency + from ._client import CryptographyClient + + +SIGN_ALGORITHM_MAP = { + SHA256: SignatureAlgorithm.rs256, + SHA384: SignatureAlgorithm.rs384, + SHA512: SignatureAlgorithm.rs512, +} +OAEP_MAP = {SHA1: EncryptionAlgorithm.rsa_oaep, SHA256: EncryptionAlgorithm.rsa_oaep_256} +PSS_MAP = { + SignatureAlgorithm.rs256: SignatureAlgorithm.ps256, + SignatureAlgorithm.rs384: SignatureAlgorithm.ps384, + SignatureAlgorithm.rs512: SignatureAlgorithm.ps512, +} + + +def get_encryption_algorithm(padding: AsymmetricPadding) -> EncryptionAlgorithm: + """Maps an `AsymmetricPadding` to an encryption algorithm. + + :param padding: The padding to use. + :type padding: :class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding` + + :returns: The corresponding Key Vault encryption algorithm. + :rtype: EncryptionAlgorithm + """ + if isinstance(padding, OAEP): + # Public algorithm property was only added in https://github.com/pyca/cryptography/pull/9582 + # _algorithm property has been available in every version of the OAEP class, so we use it as a backup + try: + algorithm = padding.algorithm # type: ignore[attr-defined] + except AttributeError: + algorithm = padding._algorithm # pylint:disable=protected-access + mapped_algorithm = OAEP_MAP.get(type(algorithm)) + if mapped_algorithm is None: + raise ValueError(f"Unsupported algorithm: {algorithm.name}") + + # Public mgf property was added at the same time as algorithm + try: + mgf = padding.mgf # type: ignore[attr-defined] + except AttributeError: + mgf = padding._mgf # pylint:disable=protected-access + if not isinstance(mgf, MGF1): + raise ValueError(f"Unsupported MGF: {mgf}") + + elif isinstance(padding, PKCS1v15): + mapped_algorithm = EncryptionAlgorithm.rsa1_5 + else: + raise ValueError(f"Unsupported padding: {padding.name}") + + return mapped_algorithm + + +def get_signature_algorithm(padding: AsymmetricPadding, algorithm: HashAlgorithm) -> SignatureAlgorithm: + """Maps an `AsymmetricPadding` and `HashAlgorithm` to a signature algorithm. + + :param padding: The padding to use. + :type padding: :class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding` + :param algorithm: The algorithm to use. + :type algorithm: :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` + + :returns: The corresponding Key Vault signature algorithm. + :rtype: SignatureAlgorithm + """ + mapped_algorithm = SIGN_ALGORITHM_MAP.get(type(algorithm)) + if mapped_algorithm is None: + raise ValueError(f"Unsupported algorithm: {algorithm.name}") + + # If PSS padding is requested, use the PSS equivalent algorithm + if isinstance(padding, PSS): + mapped_algorithm = PSS_MAP.get(mapped_algorithm) + + # Public mgf property was only added in https://github.com/pyca/cryptography/pull/9582 + # _mgf property has been available in every version of the PSS class, so we use it as a backup + try: + mgf = padding.mgf # type: ignore[attr-defined] + except AttributeError: + mgf = padding._mgf # pylint:disable=protected-access + if not isinstance(mgf, MGF1): + raise ValueError(f"Unsupported MGF: {mgf}") + + # The only other padding accepted is PKCS1v15 + elif not isinstance(padding, PKCS1v15): + raise ValueError(f"Unsupported padding: {padding.name}") + + return cast(SignatureAlgorithm, mapped_algorithm) + + +class KeyVaultRSAPublicKey(RSAPublicKey): + """An `RSAPublicKey` implementation based on a key managed by Key Vault. + + Only synchronous clients and operations are supported at this time. + """ + + def __init__(self, client: "CryptographyClient", key_material: JsonWebKey) -> None: + """Creates a `KeyVaultRSAPublicKey` from a `CryptographyClient` and key. + + :param client: The client that will be used to communicate with Key Vault. + :type client: :class:`~azure.keyvault.keys.crypto.CryptographyClient` + :param key_material: They Key Vault key's material, as a `JsonWebKey`. + :type key_material: :class:`~azure.keyvault.keys.JsonWebKey` + """ + self._client: "CryptographyClient" = client + self._key: JsonWebKey = key_material + + def encrypt(self, plaintext: bytes, padding: AsymmetricPadding) -> bytes: + """Encrypts the given plaintext. + + :param bytes plaintext: Plaintext to encrypt. + :param padding: The padding to use. Supported paddings are `OAEP` and `PKCS1v15`. For `OAEP` padding, supported + hash algorithms are `SHA1` and `SHA256`. The only supported mask generation function is `MGF1`. See + https://learn.microsoft.com/azure/key-vault/keys/about-keys-details for details. + :type padding: :class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding` + + :returns: The encrypted ciphertext, as bytes. + :rtype: bytes + """ + mapped_algorithm = get_encryption_algorithm(padding) + result = self._client.encrypt(mapped_algorithm, plaintext) + return result.ciphertext + + @property + def key_size(self) -> int: + """The bit length of the public modulus. + + :returns: The key's size. + :rtype: int + """ + public_key = self.public_numbers().public_key() + return public_key.key_size + + def public_numbers(self) -> RSAPublicNumbers: + """Returns an `RSAPublicNumbers` representing the key's public numbers. + + :returns: The public numbers of the key. + :rtype: RSAPublicNumbers + """ + e = int.from_bytes(self._key.e, "big") # type: ignore[attr-defined] + n = int.from_bytes(self._key.n, "big") # type: ignore[attr-defined] + return RSAPublicNumbers(e, n) + + def public_bytes(self, encoding: Encoding, format: PublicFormat) -> bytes: + """Allows serialization of the key to bytes. + + This function uses the `cryptography` library's implementation. + Encoding (`PEM` or `DER`) and format (`SubjectPublicKeyInfo` or `PKCS1`) are chosen to define the exact + serialization. + + :param encoding: A value from the `Encoding` enum. + :type encoding: :class:`~cryptography.hazmat.primitives.serialization.Encoding` + :param format: A value from the `PublicFormat` enum. + :type format: :class:`~cryptography.hazmat.primitives.serialization.PublicFormat` + + :returns: The serialized key. + :rtype: bytes + """ + public_key = self.public_numbers().public_key() + return public_key.public_bytes(encoding=encoding, format=format) + + def verify( + self, + signature: bytes, + data: bytes, + padding: AsymmetricPadding, + algorithm: Union[Prehashed, HashAlgorithm], + ) -> None: + """Verifies the signature of the data. + + :param bytes signature: The signature to sign, as bytes. + :param bytes data: The message string that was signed., as bytes. + :param padding: The padding to use. Supported paddings are `PKCS1v15` and `PSS`. For `PSS`, the only supported + mask generation function is `MGF1`. See https://learn.microsoft.com/azure/key-vault/keys/about-keys-details + for details. + :type padding: :class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding` + :param algorithm: The algorithm to sign with. Only `HashAlgorithm`s are supported -- specifically, `SHA256`, + `SHA384`, and `SHA512`. + :type algorithm: :class:`~cryptography.hazmat.primitives.asymmetric.utils.Prehashed` or + :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` + + :raises InvalidSignature: If the signature does not validate. + """ + if isinstance(algorithm, Prehashed): + raise ValueError("`Prehashed` algorithms are unsupported. Please provide a `HashAlgorithm` instead.") + mapped_algorithm = get_signature_algorithm(padding, algorithm) + digest = Hash(algorithm) + digest.update(data) + result = self._client.verify(mapped_algorithm, digest.finalize(), signature) + if not result.is_valid: + raise InvalidSignature(f"The provided signature '{signature!r}' is invalid.") + + def recover_data_from_signature( + self, signature: bytes, padding: AsymmetricPadding, algorithm: Optional[HashAlgorithm] + ) -> bytes: + """Recovers the signed data from the signature. Only supported with `cryptography` version 3.3 and above. + + This function uses the `cryptography` library's implementation. + The data typically contains the digest of the original message string. The `padding` and `algorithm` parameters + must match the ones used when the signature was created for the recovery to succeed. + The `algorithm` parameter can also be set to None to recover all the data present in the signature, without + regard to its format or the hash algorithm used for its creation. + + For `PKCS1v15` padding, this method returns the data after removing the padding layer. For standard signatures + the data contains the full `DigestInfo` structure. For non-standard signatures, any data can be returned, + including zero-length data. + + Normally you should use the `verify()` function to validate the signature. But for some non-standard signature + formats you may need to explicitly recover and validate the signed data. The following are some examples: + * Some old Thawte and Verisign timestamp certificates without `DigestInfo`. + * Signed MD5/SHA1 hashes in TLS 1.1 or earlier + (`RFC 4346 `_, section 4.7). + * IKE version 1 signatures without `DigestInfo` + (`RFC 2409 `_, section 5.1). + + :param bytes signature: The signature. + :param padding: An instance of `AsymmetricPadding`. Recovery is only supported with some of the padding types. + :type padding: :class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding` + :param algorithm: An instance of `HashAlgorithm`. Can be None to return all the data present in the signature. + :type algorithm: :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` + + :returns: The signed data. + :rtype: bytes + :raises: + NotImplementedError if the local version of `cryptography` doesn't support this method. + :class:`~cryptography.exceptions.InvalidSignature` if the signature is invalid. + :class:`~cryptography.exceptions.UnsupportedAlgorithm` if the signature data recovery is not supported with + the provided `padding` type. + """ + public_key = self.public_numbers().public_key() + try: + return public_key.recover_data_from_signature(signature=signature, padding=padding, algorithm=algorithm) + except AttributeError as exc: + raise NotImplementedError( + "This method is only available on `cryptography`>=3.3. Update your package version to use this method." + ) from exc + + def __eq__(self, other: object) -> bool: + """Checks equality. + + :param object other: Another object to compare with this instance. Currently, only comparisons with + `KeyVaultRSAPrivateKey` or `JsonWebKey` instances are supported. + + :returns: True if the objects are equal; False otherwise. + :rtype: bool + """ + if isinstance(other, KeyVaultRSAPublicKey): + return all(getattr(self._key, field) == getattr(other._key, field) for field in self._key._FIELDS) + if isinstance(other, JsonWebKey): + return all(getattr(self._key, field) == getattr(other, field) for field in self._key._FIELDS) + return False + + def verifier( # pylint:disable=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + self, signature: bytes, padding: AsymmetricPadding, algorithm: HashAlgorithm + ) -> NoReturn: + """Not implemented. This method was deprecated in `cryptography` 2.0 and removed in 37.0.0.""" + raise NotImplementedError() + + +class KeyVaultRSAPrivateKey(RSAPrivateKey): + """An `RSAPrivateKey` implementation based on a key managed by Key Vault. + + Only synchronous clients and operations are supported at this time. + """ + + def __init__(self, client: "CryptographyClient", key_material: JsonWebKey) -> None: + """Creates a `KeyVaultRSAPrivateKey` from a `CryptographyClient` and key. + + :param client: The client that will be used to communicate with Key Vault. + :type client: :class:`~azure.keyvault.keys.crypto.CryptographyClient` + :param key_material: They Key Vault key's material, as a `JsonWebKey`. + :type key_material: :class:`~azure.keyvault.keys.JsonWebKey` + """ + self._client: "CryptographyClient" = client + self._key: JsonWebKey = key_material + + def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes: + """Decrypts the provided ciphertext. + + :param bytes ciphertext: Encrypted bytes to decrypt. + :param padding: The padding to use. Supported paddings are `OAEP` and `PKCS1v15`. For `OAEP` padding, supported + hash algorithms are `SHA1` and `SHA256`. The only supported mask generation function is `MGF1`. See + https://learn.microsoft.com/azure/key-vault/keys/about-keys-details for details. + :type padding: :class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding` + + :returns: The decrypted plaintext, as bytes. + :rtype: bytes + """ + mapped_algorithm = get_encryption_algorithm(padding) + result = self._client.decrypt(mapped_algorithm, ciphertext) + return result.plaintext + + @property + def key_size(self) -> int: + """The bit length of the public modulus. + + :returns: The key's size. + :rtype: int + """ + # Key size only requires public modulus, which we can always get + # Relying on private numbers instead would cause issues for keys stored in KV (which doesn't return private key) + return self.public_key().key_size + + def public_key(self) -> KeyVaultRSAPublicKey: + """The `RSAPublicKey` associated with this private key, as a `KeyVaultRSAPublicKey`. + + The public key implementation will use the same underlying cryptography client as this private key. + + :returns: The `KeyVaultRSAPublicKey` associated with the key. + :rtype: :class:`~azure.keyvault.keys.crypto.KeyVaultRSAPublicKey` + """ + return KeyVaultRSAPublicKey(self._client, self._key) + + def sign( + self, + data: bytes, + padding: AsymmetricPadding, + algorithm: Union[Prehashed, HashAlgorithm], + ) -> bytes: + """Signs the data. + + :param bytes data: The data to sign, as bytes. + :param padding: The padding to use. Supported paddings are `PKCS1v15` and `PSS`. For `PSS`, the only supported + mask generation function is `MGF1`. See https://learn.microsoft.com/azure/key-vault/keys/about-keys-details + for details. + :type padding: :class:`~cryptography.hazmat.primitives.asymmetric.padding.AsymmetricPadding` + :param algorithm: The algorithm to sign with. Only `HashAlgorithm`s are supported -- specifically, `SHA256`, + `SHA384`, and `SHA512`. + :type algorithm: :class:`~cryptography.hazmat.primitives.asymmetric.utils.Prehashed` or + :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` + + :returns: The signature, as bytes. + :rtype: bytes + """ + if isinstance(algorithm, Prehashed): + raise ValueError("`Prehashed` algorithms are unsupported. Please provide a `HashAlgorithm` instead.") + mapped_algorithm = get_signature_algorithm(padding, algorithm) + digest = Hash(algorithm) + digest.update(data) + result = self._client.sign(mapped_algorithm, digest.finalize()) + return result.signature + + def private_numbers(self) -> RSAPrivateNumbers: + """Returns an `RSAPrivateNumbers` representing the key's private numbers. + + :returns: The private numbers of the key. + :rtype: :class:`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateNumbers` + """ + # Fetch public numbers from JWK + e = int.from_bytes(self._key.e, "big") # type: ignore[attr-defined] + n = int.from_bytes(self._key.n, "big") # type: ignore[attr-defined] + public_numbers = RSAPublicNumbers(e, n) + + # Fetch private numbers from JWK + p = int.from_bytes(self._key.p, "big") if self._key.p else None # type: ignore[attr-defined] + q = int.from_bytes(self._key.q, "big") if self._key.q else None # type: ignore[attr-defined] + d = int.from_bytes(self._key.d, "big") if self._key.d else None # type: ignore[attr-defined] + dmp1 = int.from_bytes(self._key.dp, "big") if self._key.dp else None # type: ignore[attr-defined] + dmq1 = int.from_bytes(self._key.dq, "big") if self._key.dq else None # type: ignore[attr-defined] + iqmp = int.from_bytes(self._key.qi, "big") if self._key.qi else None # type: ignore[attr-defined] + + # Calculate any missing attributes + if d is None: + raise ValueError("An 'RSAPrivateNumbers' couldn't be created with the available key material.") + if p is None or q is None: + p, q = rsa_recover_prime_factors(n, e, d) + if dmp1 is None: + dmp1 = rsa_crt_dmp1(d, p) + if dmq1 is None: + dmq1 = rsa_crt_dmq1(d, q) + if iqmp is None: + iqmp = rsa_crt_iqmp(p, q) + + return RSAPrivateNumbers(p, q, d, dmp1, dmq1, iqmp, public_numbers) + + def private_bytes( + self, encoding: Encoding, format: PrivateFormat, encryption_algorithm: KeySerializationEncryption + ) -> bytes: + """Allows serialization of the key to bytes. + + This function uses the `cryptography` library's implementation. + Encoding (`PEM` or `DER`) and format (`TraditionalOpenSSL`, `OpenSSH`, or `PKCS8`) and encryption algorithm + (such as `BestAvailableEncryption` or `NoEncryption`) are chosen to define the exact serialization. + + :param encoding: A value from the `Encoding` enum. + :type encoding: :class:`~cryptography.hazmat.primitives.serialization.Encoding` + :param format: A value from the `PrivateFormat` enum. + :type format: :class:`~cryptography.hazmat.primitives.serialization.PrivateFormat` + :param encryption_algorithm: An instance of an object conforming to the `KeySerializationEncryption` interface. + :type encryption_algorithm: :class:`~cryptography.hazmat.primitives.serialization.KeySerializationEncryption` + + :returns: The serialized key. + :rtype: bytes + """ + try: + private_numbers = self.private_numbers() + except ValueError as exc: + raise ValueError("Insufficient key material to serialize the private key.") from exc + private_key = private_numbers.private_key() + return private_key.private_bytes(encoding=encoding, format=format, encryption_algorithm=encryption_algorithm) + + def signer( # pylint:disable=docstring-missing-param,docstring-missing-return,docstring-missing-rtype + self, padding: AsymmetricPadding, algorithm: HashAlgorithm + ) -> NoReturn: + """Not implemented. This method was deprecated in `cryptography` 2.0 and removed in 37.0.0.""" + raise NotImplementedError() class DecryptResult: @@ -18,7 +444,7 @@ class DecryptResult: :param bytes plaintext: The decrypted bytes """ - def __init__(self, key_id: "Optional[str]", algorithm: "EncryptionAlgorithm", plaintext: bytes) -> None: + def __init__(self, key_id: Optional[str], algorithm: EncryptionAlgorithm, plaintext: bytes) -> None: self.key_id = key_id self.algorithm = algorithm self.plaintext = plaintext @@ -39,9 +465,7 @@ class EncryptResult: authenticated algorithm """ - def __init__( - self, key_id: "Optional[str]", algorithm: "EncryptionAlgorithm", ciphertext: bytes, **kwargs - ) -> None: + def __init__(self, key_id: Optional[str], algorithm: EncryptionAlgorithm, ciphertext: bytes, **kwargs) -> None: self.key_id = key_id self.algorithm = algorithm self.ciphertext = ciphertext @@ -59,7 +483,7 @@ class SignResult: :param bytes signature: """ - def __init__(self, key_id: "Optional[str]", algorithm: "SignatureAlgorithm", signature: bytes) -> None: + def __init__(self, key_id: Optional[str], algorithm: SignatureAlgorithm, signature: bytes) -> None: self.key_id = key_id self.algorithm = algorithm self.signature = signature @@ -74,7 +498,7 @@ class VerifyResult: :type algorithm: ~azure.keyvault.keys.crypto.SignatureAlgorithm """ - def __init__(self, key_id: "Optional[str]", is_valid: bool, algorithm: "SignatureAlgorithm") -> None: + def __init__(self, key_id: Optional[str], is_valid: bool, algorithm: SignatureAlgorithm) -> None: self.key_id = key_id self.is_valid = is_valid self.algorithm = algorithm @@ -89,7 +513,7 @@ class UnwrapResult: :param bytes key: The unwrapped key """ - def __init__(self, key_id: "Optional[str]", algorithm: "KeyWrapAlgorithm", key: bytes) -> None: + def __init__(self, key_id: Optional[str], algorithm: KeyWrapAlgorithm, key: bytes) -> None: self.key_id = key_id self.algorithm = algorithm self.key = key @@ -104,7 +528,7 @@ class WrapResult: :param bytes encrypted_key: The encrypted key bytes """ - def __init__(self, key_id: "Optional[str]", algorithm: "KeyWrapAlgorithm", encrypted_key: bytes) -> None: + def __init__(self, key_id: Optional[str], algorithm: KeyWrapAlgorithm, encrypted_key: bytes) -> None: self.key_id = key_id self.algorithm = algorithm self.encrypted_key = encrypted_key diff --git a/sdk/keyvault/azure-keyvault-keys/tests/test_crypto_client.py b/sdk/keyvault/azure-keyvault-keys/tests/test_crypto_client.py index b3d617c0905c..e14d98cdb3b2 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/test_crypto_client.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/test_crypto_client.py @@ -7,14 +7,20 @@ import os import time from datetime import datetime, timezone +from unittest import mock from devtools_testutils import recorded_by_proxy, set_bodiless_matcher -try: - from unittest import mock -except ImportError: - import mock - +from cryptography.hazmat.primitives.hashes import SHA1, SHA256 +from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, PKCS1v15 +from cryptography.hazmat.primitives.asymmetric.rsa import ( + rsa_crt_dmp1, + rsa_crt_dmq1, + rsa_crt_iqmp, + RSAPrivateNumbers, + RSAPublicNumbers +) +from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat, PublicFormat import pytest from azure.core.exceptions import AzureError, HttpResponseError from azure.core.pipeline.policies import SansIOHTTPPolicy @@ -45,6 +51,32 @@ no_get = get_decorator(permissions=NO_GET) +def _to_bytes(hex): + if len(hex) % 2: + hex = f"0{hex}" + return codecs.decode(hex, "hex_codec") + + +# RSA key with private components so that the JWK can be used for private operations +TEST_JWK = { + "kty":"RSA", + "key_ops":["decrypt", "verify", "unwrapKey"], + "n":_to_bytes( + "00a0914d00234ac683b21b4c15d5bed887bdc959c2e57af54ae734e8f00720d775d275e455207e3784ceeb60a50a4655dd72a7a94d271e8ee8f7959a669ca6e775bf0e23badae991b4529d978528b4bd90521d32dd2656796ba82b6bbfc7668c8f5eeb5053747fd199319d29a8440d08f4412d527ff9311eda71825920b47b1c46b11ab3e91d7316407e89c7f340f7b85a34042ce51743b27d4718403d34c7b438af6181be05e4d11eb985d38253d7fe9bf53fc2f1b002d22d2d793fa79a504b6ab42d0492804d7071d727a06cf3a8893aa542b1503f832b296371b6707d4dc6e372f8fe67d8ded1c908fde45ce03bc086a71487fa75e43aa0e0679aa0d20efe35" + ), + "e":_to_bytes("10001"), + "p":_to_bytes( + "00d1deac8d68ddd2c1fd52d5999655b2cf1565260de5269e43fd2a85f39280e1708ffff0682166cb6106ee5ea5e9ffd9f98d0becc9ff2cda2febc97259215ad84b9051e563e14a051dce438bc6541a24ac4f014cf9732d36ebfc1e61a00d82cbe412090f7793cfbd4b7605be133dfc3991f7e1bed5786f337de5036fc1e2df4cf3" + ), + "q":_to_bytes( + "00c3dc66b641a9b73cd833bc439cd34fc6574465ab5b7e8a92d32595a224d56d911e74624225b48c15a670282a51c40d1dad4bc2e9a3c8dab0c76f10052dfb053bc6ed42c65288a8e8bace7a8881184323f94d7db17ea6dfba651218f931a93b8f738f3d8fd3f6ba218d35b96861a0f584b0ab88ddcf446b9815f4d287d83a3237" + ), + "d":_to_bytes( + "627c7d24668148fe2252c7fa649ea8a5a9ed44d75c766cda42b29b660e99404f0e862d4561a6c95af6a83d213e0a2244b03cd28576473215073785fb067f015da19084ade9f475e08b040a9a2c7ba00253bb8125508c9df140b75161d266be347a5e0f6900fe1d8bbf78ccc25eeb37e0c9d188d6e1fc15169ba4fe12276193d77790d2326928bd60d0d01d6ead8d6ac4861abadceec95358fd6689c50a1671a4a936d2376440a41445501da4e74bfb98f823bd19c45b94eb01d98fc0d2f284507f018ebd929b8180dbe6381fdd434bffb7800aaabdd973d55f9eaf9bb88a6ea7b28c2a80231e72de1ad244826d665582c2362761019de2e9f10cb8bcc2625649" + ) +} + + class TestCryptoClient(KeyVaultTestCase, KeysTestCase): plaintext = b"5063e6aaa845f150200547944fd199679c98ed6f99da0a0b2dafeaf1f4684496fd532c1c229968cb9dee44957fcef7ccef59ceda0b362e56bcd78fd3faee5781c623c0bb22b35beabde0664fd30e0e824aba3dd1b0afffc4a3d955ede20cf6a854d52cfd" iv = codecs.decode("89b8adbfb07345e3598932a09c517441", "hex_codec") @@ -92,11 +124,6 @@ def _validate_ec_key_bundle(self, key_curve, key_attributes, vault, key_name, kt assert key_attributes.properties.created_on and key_attributes.properties.updated_on,"Missing required date attributes." def _import_test_key(self, client, name, hardware_protected=False): - def _to_bytes(hex): - if len(hex) % 2: - hex = f"0{hex}" - return codecs.decode(hex, "hex_codec") - key = JsonWebKey( kty="RSA-HSM" if hardware_protected else "RSA", key_ops=["encrypt", "decrypt", "sign", "verify", "wrapKey", "unwrapKey"], @@ -194,6 +221,37 @@ def test_encrypt_and_decrypt(self, key_client, is_hsm, **kwargs): assert EncryptionAlgorithm.rsa_oaep == result.algorithm assert self.plaintext == result.plaintext + @pytest.mark.parametrize("api_version,is_hsm", only_vault_latest) + @KeysClientPreparer() + @recorded_by_proxy + def test_encrypt_and_decrypt_with_managed_key(self, key_client, **kwargs): + set_bodiless_matcher() + key_name = self.get_resource_name("keycrypt") + + imported_key = self._import_test_key(key_client, key_name) + crypto_client = self.create_crypto_client(imported_key.id, api_version=key_client.api_version) + + # Create a KeyVaultRSAPublicKey that can perform encryption with `cryptography`'s interface + public_key = crypto_client.create_rsa_public_key() + algorithm = SHA1() + mgf = MGF1(algorithm) + padding = OAEP(mgf, algorithm, None) + ciphertext = public_key.encrypt(self.plaintext, padding) + + # Create a KeyVaultRSAPrivateKey that can perform decryption with `cryptography`'s interface + private_key = crypto_client.create_rsa_private_key() + plaintext = private_key.decrypt(ciphertext=ciphertext, padding=padding) + assert self.plaintext == plaintext + + # Use cryptography library's own implementation to validate ours (as well as our public/private numbers) + crypto_public_key = public_key.public_numbers().public_key() + crypto_ciphertext = crypto_public_key.encrypt(self.plaintext, padding) + # Create a crypto client from private JWK since we can't get the private components from an imported key + crypto_client = CryptographyClient.from_jwk(jwk=TEST_JWK) + crypto_private_key = crypto_client.create_rsa_private_key().private_numbers().private_key() + crypto_plaintext = crypto_private_key.decrypt(ciphertext=crypto_ciphertext, padding=padding) + assert crypto_plaintext == plaintext + @pytest.mark.parametrize("api_version,is_hsm", no_get) @KeysClientPreparer(permissions=NO_GET) @recorded_by_proxy @@ -215,6 +273,40 @@ def test_sign_and_verify(self, key_client, is_hsm, **kwargs): assert result.algorithm == SignatureAlgorithm.rs256 assert verified.is_valid + @pytest.mark.parametrize("api_version,is_hsm", only_vault_latest) + @KeysClientPreparer(permissions=NO_GET) + @recorded_by_proxy + def test_sign_and_verify_with_managed_key(self, key_client, is_hsm, **kwargs): + key_name = self.get_resource_name("keysign") + + imported_key = self._import_test_key(key_client, key_name, hardware_protected=is_hsm) + crypto_client = self.create_crypto_client(imported_key.id, api_version=key_client.api_version) + + # Create a KeyVaultRSAPrivateKey that can perform signing with `cryptography`'s interface + private_key = crypto_client.create_rsa_private_key() + algorithm = SHA256() + padding = PKCS1v15() + signature = private_key.sign(self.plaintext, padding, algorithm) + + # Create a KeyVaultRSAPublicKey that can perform verifying with `cryptography`'s interface + public_key = crypto_client.create_rsa_public_key() + public_key.verify(signature, self.plaintext, padding, algorithm) + + # Use cryptography library's own implementation to validate ours (as well as our public/private numbers) + # Create a crypto client from private JWK since we can't get the private components from an imported key + crypto_client = CryptographyClient.from_jwk(jwk=TEST_JWK) + private_numbers = crypto_client.create_rsa_private_key().private_numbers() + crypto_private_key = private_numbers.private_key() + crypto_signature = crypto_private_key.sign(self.plaintext, padding, algorithm) + + # PKCS#1 signing produces deterministic signatures, so we can compare the two signatures we generated + # PSS padding is nondeterministic, by comparison + assert signature == crypto_signature + + crypto_public_key = private_numbers.public_numbers.public_key() + crypto_public_key.verify(crypto_signature, self.plaintext, padding, algorithm) + crypto_public_key.verify(signature, self.plaintext, padding, algorithm) + @pytest.mark.parametrize("api_version,is_hsm", no_get) @KeysClientPreparer(permissions=NO_GET) @recorded_by_proxy @@ -756,8 +848,7 @@ def test_local_only_mode_no_service_calls(): def test_local_only_mode_raise(): """A local-only CryptographyClient should raise an exception if an operation can't be performed locally""" - jwk = {"kty":"RSA", "key_ops":["decrypt", "verify", "unwrapKey"], "n":b"10011", "e":b"10001"} - client = CryptographyClient.from_jwk(jwk=jwk) + client = CryptographyClient.from_jwk(jwk=TEST_JWK) # Algorithm not supported locally with pytest.raises(NotImplementedError) as ex: @@ -778,9 +869,8 @@ def test_local_only_mode_raise(): assert f"{KeyOperation.verify}" in str(ex.value) # Algorithm not supported locally, and operation not included in JWK permissions - with pytest.raises(NotImplementedError) as ex: + with pytest.raises(AzureError) as ex: client.sign(SignatureAlgorithm.rs256, b"...") - assert f"{SignatureAlgorithm.rs256}" in str(ex.value) assert f"{KeyOperation.sign}" in str(ex.value) # Algorithm not supported locally @@ -917,6 +1007,94 @@ def test_decrypt_argument_validation(): assert "iv" in str(ex.value) and "required" in str(ex.value) +def test_rsa_public_key_public_numbers(): + """Verify behavior of KeyVaultRSAPublicKey.public_numbers""" + + client = CryptographyClient.from_jwk(jwk=TEST_JWK) + public_key = client.create_rsa_public_key() + public_numbers = public_key.public_numbers() + assert public_numbers.e == int.from_bytes(TEST_JWK["e"], "big") + assert public_numbers.n == int.from_bytes(TEST_JWK["n"], "big") + + +def test_rsa_public_key_equals(): + """Verify behavior of KeyVaultRSAPublicKey.__eq__ against a JWK and KeyVaultRSAPublicKey instance""" + + client = CryptographyClient.from_jwk(jwk=TEST_JWK) + public_key = client.create_rsa_public_key() + assert public_key == JsonWebKey(**TEST_JWK) + key_dupe = client.create_rsa_public_key() + assert public_key == key_dupe + + +def test_rsa_public_key_public_bytes(): + """Verify behavior of KeyVaultRSAPublicKey.public_bytes""" + + client = CryptographyClient.from_jwk(jwk=TEST_JWK) + public_key = client.create_rsa_public_key() + public_bytes = public_key.public_bytes(Encoding.PEM, PublicFormat.PKCS1) + + public_numbers = public_key.public_numbers() + crypto_public_numbers = RSAPublicNumbers(e=public_numbers.e, n=public_numbers.n) + crypto_public_bytes = crypto_public_numbers.public_key().public_bytes(Encoding.PEM, PublicFormat.PKCS1) + assert public_bytes == crypto_public_bytes + + +def test_rsa_public_key_private_key_size(): + """Verify that KeyVaultRSAPublicKey.key_size and KeyVaultRSAPrivateKey.key_size are equal for the same key""" + + client = CryptographyClient.from_jwk(jwk=TEST_JWK) + public_key = client.create_rsa_public_key() + private_key = client.create_rsa_private_key() + assert public_key.key_size == private_key.key_size == 2048 + + +def test_rsa_private_key_public_key(): + """Verify behavior of KeyVaultRSAPrivateKey.public_key against a JWK and KeyVaultRSAPublicKey instance""" + + client = CryptographyClient.from_jwk(jwk=TEST_JWK) + public_key = client.create_rsa_public_key() + private_key = client.create_rsa_private_key() + assert private_key.public_key() == public_key + + +def test_rsa_private_key_private_numbers(): + """Verify behavior of KeyVaultRSAPrivateKey.private_numbers""" + + client = CryptographyClient.from_jwk(jwk=TEST_JWK) + private_key = client.create_rsa_private_key() + private_numbers = private_key.private_numbers() + assert private_numbers.d == int.from_bytes(TEST_JWK["d"], "big") + assert private_numbers.p == int.from_bytes(TEST_JWK["p"], "big") + assert private_numbers.q == int.from_bytes(TEST_JWK["q"], "big") + assert private_numbers.dmp1 == rsa_crt_dmp1(private_numbers.d, private_numbers.p) + assert private_numbers.dmq1 == rsa_crt_dmq1(private_numbers.d, private_numbers.q) + assert private_numbers.iqmp == rsa_crt_iqmp(private_numbers.p, private_numbers.q) + + +def test_rsa_private_key_private_bytes(): + """Verify behavior of KeyVaultRSAPrivateKey.private_bytes""" + + client = CryptographyClient.from_jwk(jwk=TEST_JWK) + private_key = client.create_rsa_private_key() + private_bytes = private_key.private_bytes(Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption()) + + private_numbers = private_key.private_numbers() + crypto_private_numbers = RSAPrivateNumbers( + p=private_numbers.p, + q=private_numbers.q, + d=private_numbers.d, + dmp1=private_numbers.dmp1, + dmq1=private_numbers.dmq1, + iqmp=private_numbers.iqmp, + public_numbers=private_numbers.public_numbers, + ) + crypto_private_bytes = crypto_private_numbers.private_key().private_bytes( + Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption() + ) + assert private_bytes == crypto_private_bytes + + def test_retain_url_port(): """Regression test for https://github.com/Azure/azure-sdk-for-python/issues/24446"""