diff --git a/jose/backends/__init__.py b/jose/backends/__init__.py index 82b5c2f6..c0089468 100644 --- a/jose/backends/__init__.py +++ b/jose/backends/__init__.py @@ -2,8 +2,11 @@ try: from jose.backends.pycrypto_backend import RSAKey except ImportError: - from jose.backends.cryptography_backend import CryptographyRSAKey as RSAKey - + try: + from jose.backends.cryptography_backend import CryptographyRSAKey as RSAKey + except ImportError: + from jose.backends.rsa_backend import RSAKey + try: from jose.backends.cryptography_backend import CryptographyECKey as ECKey except ImportError: diff --git a/jose/backends/cryptography_backend.py b/jose/backends/cryptography_backend.py index 1a1bc308..d02384d5 100644 --- a/jose/backends/cryptography_backend.py +++ b/jose/backends/cryptography_backend.py @@ -282,16 +282,30 @@ def public_key(self): return self return self.__class__(self.prepared_key.public_key(), self._algorithm) - def to_pem(self): + def to_pem(self, pem_format='PKCS8'): if self.is_public(): - return self.prepared_key.public_bytes( + if pem_format == 'PKCS8': + fmt = serialization.PublicFormat.SubjectPublicKeyInfo + elif pem_format == 'PKCS1': + fmt = serialization.PublicFormat.PKCS1 + else: + raise ValueError("Invalid format specified: %r" % pem_format) + pem = self.prepared_key.public_bytes( encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo + format=fmt ) + return pem + + if pem_format == 'PKCS8': + fmt = serialization.PrivateFormat.PKCS8 + elif pem_format == 'PKCS1': + fmt = serialization.PrivateFormat.TraditionalOpenSSL + else: + raise ValueError("Invalid format specified: %r" % pem_format) return self.prepared_key.private_bytes( encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, + format=fmt, encryption_algorithm=serialization.NoEncryption() ) diff --git a/jose/backends/pycrypto_backend.py b/jose/backends/pycrypto_backend.py index b5c24c60..14f1e6c6 100644 --- a/jose/backends/pycrypto_backend.py +++ b/jose/backends/pycrypto_backend.py @@ -9,6 +9,7 @@ from Crypto.Util.asn1 import DerSequence from jose.backends.base import Key +from jose.backends.rsa_backend import pem_to_spki from jose.utils import base64_to_long, long_to_base64 from jose.constants import ALGORITHMS from jose.exceptions import JWKError @@ -143,16 +144,23 @@ def public_key(self): return self return self.__class__(self.prepared_key.publickey(), self._algorithm) - def to_pem(self): - pem = self.prepared_key.exportKey('PEM', pkcs=1) + def to_pem(self, pem_format='PKCS8'): + if pem_format == 'PKCS8': + pkcs = 8 + elif pem_format == 'PKCS1': + pkcs = 1 + else: + raise ValueError("Invalid pem format specified: %r" % (pem_format,)) - # pycryptodome fix - begin = b'-----BEGIN RSA PUBLIC KEY-----' - end = b'-----END RSA PUBLIC KEY-----' - if pem.startswith(begin) and pem.strip().endswith(end): - pem = b'-----BEGIN PUBLIC KEY-----' + pem.strip()[len(begin):-len(end)] + b'-----END PUBLIC KEY-----' - if not pem.endswith(b'\n'): - pem = pem + b'\n' + if self.is_public(): + pem = self.prepared_key.exportKey('PEM', pkcs=1) + if pkcs == 8: + pem = pem_to_spki(pem, fmt='PKCS8') + else: + pem = pem_to_spki(pem, fmt='PKCS1') + return pem + else: + pem = self.prepared_key.exportKey('PEM', pkcs=pkcs) return pem def to_dict(self): diff --git a/jose/backends/rsa_backend.py b/jose/backends/rsa_backend.py new file mode 100644 index 00000000..f66cf3a7 --- /dev/null +++ b/jose/backends/rsa_backend.py @@ -0,0 +1,237 @@ +import six +from pyasn1.codec.der import encoder +from pyasn1.type import univ + +import rsa as pyrsa +import rsa.pem as pyrsa_pem +from rsa.asn1 import OpenSSLPubKey, AsnPubKey, PubKeyHeader + +from jose.backends.base import Key +from jose.constants import ALGORITHMS +from jose.exceptions import JWKError +from jose.utils import base64_to_long, long_to_base64 + + +PKCS8_RSA_HEADER = b'0\x82\x04\xbd\x02\x01\x000\r\x06\t*\x86H\x86\xf7\r\x01\x01\x01\x05\x00' +# Functions gcd and rsa_recover_prime_factors were copied from cryptography 1.9 +# to enable pure python rsa module to be in compliance with section 6.3.1 of RFC7518 +# which requires only private exponent (d) for private key. + +def _gcd(a, b): + """Calculate the Greatest Common Divisor of a and b. + + Unless b==0, the result will have the same sign as b (so that when + b is divided by it, the result comes out positive). + """ + while b: + a, b = b, a%b + return a + + +# Controls the number of iterations rsa_recover_prime_factors will perform +# to obtain the prime factors. Each iteration increments by 2 so the actual +# maximum attempts is half this number. +_MAX_RECOVERY_ATTEMPTS = 1000 + + +def _rsa_recover_prime_factors(n, e, d): + """ + Compute factors p and q from the private exponent d. We assume that n has + no more than two factors. This function is adapted from code in PyCrypto. + """ + # See 8.2.2(i) in Handbook of Applied Cryptography. + ktot = d * e - 1 + # The quantity d*e-1 is a multiple of phi(n), even, + # and can be represented as t*2^s. + t = ktot + while t % 2 == 0: + t = t // 2 + # Cycle through all multiplicative inverses in Zn. + # The algorithm is non-deterministic, but there is a 50% chance + # any candidate a leads to successful factoring. + # See "Digitalized Signatures and Public Key Functions as Intractable + # as Factorization", M. Rabin, 1979 + spotted = False + a = 2 + while not spotted and a < _MAX_RECOVERY_ATTEMPTS: + k = t + # Cycle through all values a^{t*2^i}=a^k + while k < ktot: + cand = pow(a, k, n) + # Check if a^k is a non-trivial root of unity (mod n) + if cand != 1 and cand != (n - 1) and pow(cand, 2, n) == 1: + # We have found a number such that (cand-1)(cand+1)=0 (mod n). + # Either of the terms divides n. + p = _gcd(cand + 1, n) + spotted = True + break + k *= 2 + # This value was not any good... let's try another! + a += 2 + if not spotted: + raise ValueError("Unable to compute factors p and q from exponent d.") + # Found ! + q, r = divmod(n, p) + assert r == 0 + p, q = sorted((p, q), reverse=True) + return (p, q) + + +def pem_to_spki(pem, fmt='PKCS8'): + key = RSAKey(pem, ALGORITHMS.RS256) + return key.to_pem(fmt) + + +class RSAKey(Key): + SHA256 = 'SHA-256' + SHA384 = 'SHA-384' + SHA512 = 'SHA-512' + + def __init__(self, key, algorithm): + if algorithm not in ALGORITHMS.RSA: + raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm) + + self.hash_alg = { + ALGORITHMS.RS256: self.SHA256, + ALGORITHMS.RS384: self.SHA384, + ALGORITHMS.RS512: self.SHA512 + }.get(algorithm) + self._algorithm = algorithm + + if isinstance(key, dict): + self._prepared_key = self._process_jwk(key) + return + + if isinstance(key, (pyrsa.PublicKey, pyrsa.PrivateKey)): + self._prepared_key = key + return + + if isinstance(key, six.string_types): + key = key.encode('utf-8') + + if isinstance(key, six.binary_type): + try: + self._prepared_key = pyrsa.PublicKey.load_pkcs1(key) + except ValueError: + try: + self._prepared_key = pyrsa.PublicKey.load_pkcs1_openssl_pem(key) + except ValueError: + try: + self._prepared_key = pyrsa.PrivateKey.load_pkcs1(key) + except ValueError: + try: + # python-rsa does not support PKCS8 yet so we have to manually remove OID + der = pyrsa_pem.load_pem(key, b'PRIVATE KEY') + header, der = der[:22], der[22:] + if header != PKCS8_RSA_HEADER: + raise ValueError("Invalid PKCS8 header") + self._prepared_key = pyrsa.PrivateKey._load_pkcs1_der(der) + except ValueError as e: + raise JWKError(e) + return + raise JWKError('Unable to parse an RSA_JWK from key: %s' % key) + + def _process_jwk(self, jwk_dict): + if not jwk_dict.get('kty') == 'RSA': + raise JWKError("Incorrect key type. Expected: 'RSA', Recieved: %s" % jwk_dict.get('kty')) + + e = base64_to_long(jwk_dict.get('e')) + n = base64_to_long(jwk_dict.get('n')) + + if not 'd' in jwk_dict: + return pyrsa.PublicKey(e=e, n=n) + else: + d = base64_to_long(jwk_dict.get('d')) + extra_params = ['p', 'q', 'dp', 'dq', 'qi'] + + if any(k in jwk_dict for k in extra_params): + # Precomputed private key parameters are available. + if not all(k in jwk_dict for k in extra_params): + # These values must be present when 'p' is according to + # Section 6.3.2 of RFC7518, so if they are not we raise + # an error. + raise JWKError('Precomputed private key parameters are incomplete.') + + p = base64_to_long(jwk_dict['p']) + q = base64_to_long(jwk_dict['q']) + return pyrsa.PrivateKey(e=e, n=n, d=d, p=p, q=q) + else: + p, q = _rsa_recover_prime_factors(n, e, d) + return pyrsa.PrivateKey(n=n, e=e, d=d, p=p, q=q) + + + + def sign(self, msg): + return pyrsa.sign(msg, self._prepared_key, self.hash_alg) + + def verify(self, msg, sig): + try: + pyrsa.verify(msg, sig, self._prepared_key) + return True + except pyrsa.pkcs1.VerificationError: + return False + + def is_public(self): + return isinstance(self._prepared_key, pyrsa.PublicKey) + + def public_key(self): + if isinstance(self._prepared_key, pyrsa.PublicKey): + return self + return self.__class__(pyrsa.PublicKey(n=self._prepared_key.n, e=self._prepared_key.e), self._algorithm) + + def to_pem(self, pem_format='PKCS8'): + + if isinstance(self._prepared_key, pyrsa.PrivateKey): + der = self._prepared_key.save_pkcs1(format='DER') + if pem_format == 'PKCS8': + pem = pyrsa_pem.save_pem(PKCS8_RSA_HEADER + der, pem_marker='PRIVATE KEY') + elif pem_format == 'PKCS1': + pem = pyrsa_pem.save_pem(der, pem_marker='RSA PRIVATE KEY') + else: + raise ValueError("Invalid pem format specified: %r" % (pem_format,)) + else: + if pem_format == 'PKCS8': + asn_key = AsnPubKey() + asn_key.setComponentByName('modulus', self._prepared_key.n) + asn_key.setComponentByName('publicExponent', self._prepared_key.e) + der = encoder.encode(asn_key) + + header = PubKeyHeader() + header['oid'] = univ.ObjectIdentifier('1.2.840.113549.1.1.1') + pub_key = OpenSSLPubKey() + pub_key['header'] = header + pub_key['key'] = univ.BitString.fromOctetString(der) + + der = encoder.encode(pub_key) + pem = pyrsa_pem.save_pem(der, pem_marker='PUBLIC KEY') + elif pem_format == 'PKCS1': + der = self._prepared_key.save_pkcs1(format='DER') + pem = pyrsa_pem.save_pem(der, pem_marker='RSA PUBLIC KEY') + else: + raise ValueError("Invalid pem format specified: %r" % (pem_format,)) + return pem + + def to_dict(self): + if not self.is_public(): + public_key = self.public_key()._prepared_key + else: + public_key = self._prepared_key + + data = { + 'alg': self._algorithm, + 'kty': 'RSA', + 'n': long_to_base64(public_key.n), + 'e': long_to_base64(public_key.e), + } + + if not self.is_public(): + data.update({ + 'd': long_to_base64(self._prepared_key.d), + 'p': long_to_base64(self._prepared_key.p), + 'q': long_to_base64(self._prepared_key.q), + 'dp': long_to_base64(self._prepared_key.exp1), + 'dq': long_to_base64(self._prepared_key.exp2), + 'qi': long_to_base64(self._prepared_key.coef), + }) + + return data diff --git a/requirements.txt b/requirements.txt index fb3ef680..f75de0a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ pycrypto six future +rsa +ecdsa diff --git a/setup.py b/setup.py index 5ca1f1a1..aceeff76 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import os - import jose from setuptools import setup @@ -25,6 +24,7 @@ def get_packages(package): extras_require = { 'cryptography': ['cryptography'], 'pycrypto': ['pycrypto >=2.6.0, <2.7.0'], + 'pycryptodome': ['pycryptodome >=3.3.1, <4.0.0'], } @@ -55,5 +55,5 @@ def get_packages(package): 'Topic :: Utilities', ], extras_require=extras_require, - install_requires=['six <2.0', 'ecdsa <1.0', 'future <1.0', 'pycryptodome >=3.3.1, <4.0.0'] + install_requires=['six <2.0', 'ecdsa <1.0', 'rsa', 'future <1.0'] ) diff --git a/tests/algorithms/test_RSA.py b/tests/algorithms/test_RSA.py index 25582ff5..3cb74078 100644 --- a/tests/algorithms/test_RSA.py +++ b/tests/algorithms/test_RSA.py @@ -3,6 +3,7 @@ from jose.backends.pycrypto_backend import RSAKey from jose.backends.cryptography_backend import CryptographyRSAKey +from jose.backends.rsa_backend import RSAKey as PurePythonRSAKey from jose.constants import ALGORITHMS from jose.exceptions import JOSEError, JWKError @@ -70,7 +71,7 @@ class TestRSAAlgorithm: - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_RSA_key(self, Backend): assert not Backend(private_key, ALGORITHMS.RS256).is_public() @@ -93,25 +94,25 @@ def test_cryptography_RSA_key_instance(self): pem = pubkey.to_pem() assert pem.startswith(b'-----BEGIN PUBLIC KEY-----') - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_string_secret(self, Backend): key = 'secret' with pytest.raises(JOSEError): Backend(key, ALGORITHMS.RS256) - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_object(self, Backend): key = object() with pytest.raises(JOSEError): Backend(key, ALGORITHMS.RS256) - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_bad_cert(self, Backend): key = '-----BEGIN CERTIFICATE-----' with pytest.raises(JOSEError): Backend(key, ALGORITHMS.RS256) - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_invalid_algorithm(self, Backend): with pytest.raises(JWKError): Backend(private_key, ALGORITHMS.ES256) @@ -119,7 +120,7 @@ def test_invalid_algorithm(self, Backend): with pytest.raises(JWKError): Backend({'kty': 'bla'}, ALGORITHMS.RS256) - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_RSA_jwk(self, Backend): key = { "kty": "RSA", @@ -157,13 +158,13 @@ def test_RSA_jwk(self, Backend): # None of the extra parameters are present, but 'key' is still private. assert not Backend(key, ALGORITHMS.RS256).is_public() - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_string_secret(self, Backend): key = 'secret' with pytest.raises(JOSEError): Backend(key, ALGORITHMS.RS256) - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_get_public_key(self, Backend): key = Backend(private_key, ALGORITHMS.RS256) public_key = key.public_key() @@ -172,10 +173,32 @@ def test_get_public_key(self, Backend): assert public_key2.is_public() assert public_key == public_key2 - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_to_pem(self, Backend): key = Backend(private_key, ALGORITHMS.RS256) - assert key.to_pem().strip() == private_key.strip() + assert key.to_pem(pem_format='PKCS1').strip() == private_key.strip() + + pkcs8 = key.to_pem(pem_format='PKCS8').strip() + assert pkcs8 != private_key.strip() + + newkey = Backend(pkcs8, ALGORITHMS.RS256) + assert newkey.to_pem(pem_format='PKCS1').strip() == private_key.strip() + + @pytest.mark.parametrize("BackendFrom", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) + @pytest.mark.parametrize("BackendTo", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) + def test_public_key_to_pem(self, BackendFrom, BackendTo): + key = BackendFrom(private_key, ALGORITHMS.RS256) + pubkey = key.public_key() + + pkcs1_pub = pubkey.to_pem(pem_format='PKCS1').strip() + pkcs8_pub = pubkey.to_pem(pem_format='PKCS8').strip() + assert pkcs1_pub != pkcs8_pub, BackendFrom + + pub1 = BackendTo(pkcs1_pub, ALGORITHMS.RS256) + pub8 = BackendTo(pkcs8_pub, ALGORITHMS.RS256) + + assert pkcs8_pub == pub1.to_pem(pem_format='PKCS8').strip() + assert pkcs1_pub == pub8.to_pem(pem_format='PKCS1').strip() def assert_parameters(self, as_dict, private): assert isinstance(as_dict, dict) @@ -207,7 +230,7 @@ def assert_roundtrip(self, key, Backend): ALGORITHMS.RS256 ).to_dict() == key.to_dict() - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_to_dict(self, Backend): key = Backend(private_key, ALGORITHMS.RS256) self.assert_parameters(key.to_dict(), private=True) @@ -215,8 +238,8 @@ def test_to_dict(self, Backend): self.assert_roundtrip(key, Backend) self.assert_roundtrip(key.public_key(), Backend) - @pytest.mark.parametrize("BackendSign", [RSAKey, CryptographyRSAKey]) - @pytest.mark.parametrize("BackendVerify", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("BackendSign", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) + @pytest.mark.parametrize("BackendVerify", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_signing_parity(self, BackendSign, BackendVerify): key_sign = BackendSign(private_key, ALGORITHMS.RS256) key_verify = BackendVerify(private_key, ALGORITHMS.RS256).public_key() @@ -230,9 +253,15 @@ def test_signing_parity(self, BackendSign, BackendVerify): # invalid signature assert not key_verify.verify(msg, b'n' * 64) - @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey]) + @pytest.mark.parametrize("Backend", [RSAKey, CryptographyRSAKey, PurePythonRSAKey]) def test_pycrypto_unencoded_cleartext(self, Backend): key = Backend(private_key, ALGORITHMS.RS256) - with pytest.raises(JWKError): - key.sign(True) + key = RSAKey(private_key, ALGORITHMS.RS256) + msg = b'test' + signature = key.sign(msg) + public_key = key.public_key() + + assert public_key.verify(msg, signature) == True + assert public_key.verify(msg, 1) == False + diff --git a/tox.ini b/tox.ini index 8046b0f7..df48dd3f 100644 --- a/tox.ini +++ b/tox.ini @@ -4,8 +4,10 @@ skip_missing_interpreters = True [testenv] commands = + pip --version py.test --cov-report term-missing --cov jose deps = + six future pycrypto ecdsa