Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jose/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# flake8: noqa

__version__ = "1.3.2"
__author__ = 'Michael Davis'
Expand All @@ -9,3 +10,4 @@
from .exceptions import JWSError
from .exceptions import ExpiredSignatureError
from .exceptions import JWTError

1 change: 1 addition & 0 deletions jose/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import hashlib


class ALGORITHMS(object):
NONE = 'none'
HS256 = 'HS256'
Expand Down
207 changes: 205 additions & 2 deletions jose/jwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import base64
import hashlib
import hmac
import os
import struct

import six
import sys
from binascii import unhexlify

import Crypto.Hash.SHA256
import Crypto.Hash.SHA384
Expand All @@ -14,7 +17,11 @@
from Crypto.Signature import PKCS1_v1_5
from Crypto.Util.asn1 import DerSequence

import pyelliptic
from pyasn1.codec.der import decoder, encoder
from pyasn1.type import univ, tag
import ecdsa
from pyasn1.type.univ import Integer, Sequence

from jose.constants import ALGORITHMS
from jose.exceptions import JWKError
Expand Down Expand Up @@ -67,6 +74,13 @@ def construct(key_data, algorithm=None):
return RSAKey(key_data, algorithm)

if algorithm in ALGORITHMS.EC:
# TODO: Add switch between libraries.
# ECKey uses a more forgiving, python based ecdsa library.
# It may be preferred in a low to medium demand environment,
# return ECKey(key_data, algorithm)

# ECKey2 uses a far stricter, openssl wrapper library.
# It may be preferred in a high demand environment.
return ECKey(key_data, algorithm)


Expand Down Expand Up @@ -245,7 +259,190 @@ def verify(self, msg, sig):
raise JWKError(e)


class ECKey(Key):
class ECKey_clib(Key):
"""
Performs signing and verification operations using clib based ECDSA
using ECIES methods. This uses OpenSSL.EVP_sha256 hashing.

This class requires the pyelliptic package to be installed.

This is based off of the implementation in jose 1.3.2

"""
# pyelliptic will handle value hashing internally.
SHA256 = hashlib.sha256
SHA384 = hashlib.sha384
SHA512 = hashlib.sha512
valid_hash_algs = ALGORITHMS.EC

curve_map = {
ALGORITHMS.ES256: 'prime256v1',
ALGORITHMS.ES384: 'secp384r1',
ALGORITHMS.ES512: 'secp521r1',
}

# Curve OIDs are tuples that ASN1 uses to identify the content of the
# data block.
curve_oids = {
(1, 2, 840, 10045, 3, 1, 7): 'prime256v1', # p256v1 EC Private Key
(1, 3, 132, 0, 10): "secp256k1",
(1, 2, 840, 10045, 2, 1): None, # EC Public Key
(1, 3, 132, 0, 34): 'secp384r1',
(1, 3, 132, 0, 35): 'secp521r1'
}

prepared_key = None
curve = None

def __init__(self, key, algorithm):
if algorithm not in self.valid_hash_algs:
raise JWKError('hash_alg: %s is not a valid hash '
'algorithm', algorithm)
self.curve = self.curve_map.get(algorithm)
sha_map = {
'ES256': 'sha256',
'ES384': 'sha384',
'ES512': 'sha512',
}

if isinstance(key, dict):
self.prepared_key = self._process_jwk(key, sha_map[algorithm])
return

if isinstance(key, six.string_types):
if isinstance(key, six.text_type):
key = key.encode('utf-8')

# be a bit smart about what you're doing.
# keys must be in raw form, not ASN1, so convert if needed.

# The private key provided for testing is a base64 ASN1 that has a
# PEM wrapper. This may take a bit of guesswork...
der = self.pem_to_der(key)
# The key dictates the curve, this emulates the ecdsa lib
(self.curve, raw_key, raw_pub) = self.asn_to_raw(der, self.curve)
self.prepared_key = pyelliptic.ECC(
curve=self.curve,
privkey=raw_key,
pubkey=raw_pub,
hasher=sha_map[algorithm])
return
raise JWKError('Unable to parse an ECKey from key: %s' % key)

def repad(self, st):
"""Add base64 padding back to the end of a stripped character
sequence
"""
pad = '====' if isinstance(st, six.text_type) else b'===='
return st + pad[len(st) % 4:]

def pem_to_der(self, pem):
lines = pem.strip().split(b"\n")
return b''.join([line.strip() for line in lines if b'---' not in line])

def bitstring_to_str(self, bitstring):
"""Convert an ASN1 BitString to a character array."""
if isinstance(bitstring, univ.OctetString):
return bitstring.asOctets()
if isinstance(bitstring, univ.BitString):
# Convert using a 2.7 safe method.
lh = hex(int(
''.join(map(str, bitstring)), base=2))[2:].replace('L', '')
return unhexlify('0' * (len(lh) % 2) + lh)

def asn_to_raw(self, candidate, curve):
"""Extract the ASN1 information and return the curve and key pairs."""
decoded = base64.urlsafe_b64decode(self.repad(candidate))

# if it's already raw... (Most likely a public key)
if len(decoded) == 64:
return curve, None, "\04" + decoded
if decoded[0] == "\04":
return curve, None, decoded

try:
asn_set = decoder.decode(decoded)[0]
except:
raise JWKError("Invalid EC Key")
pri_key = None
pub_key = None
# A private key starts with a Integer(1)
if (isinstance(asn_set[0], univ.Integer) and
asn_set[0] == 1):
# Followed by the OID
curve = self.curve_oids.get(asn_set[2])
if curve:
pri_key = self.bitstring_to_str(asn_set[1])
# And finally the public key
pub_key = self.bitstring_to_str(asn_set[3])
# A public key starts with a sequence
if isinstance(asn_set[0], univ.Sequence):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an elif instead of if would be a bit clearer here

# confirm that the public key curve matches up
# with the OID pair type (Also includes the public key OID,
# which we ignore)
pcurve = self.curve_oids.get(asn_set[0][1])
if pcurve:
curve = pcurve
pub_key = self.bitstring_to_str(asn_set[1])
if not curve:
raise JWKError("Unknown or unsupported EC curve type key "
"specified.")
return curve, pri_key, pub_key

def _process_jwk(self, jwk_dict, algorithm="sha256"):
key_type = jwk_dict.get('kty')
if key_type != 'EC':
raise JWKError("Incorrect key type. "
"Expected 'EC' Received: %s" % key_type)
privkey = None
if 'd' in jwk_dict:
privkey = base64.urlsafe_b64decode(self.repad(jwk_dict.get('d')))
key = pyelliptic.ECC(
curve=self.curve,
raw_privkey=privkey,
pubkey_x=base64.urlsafe_b64decode(self.repad(jwk_dict.get('x'))),
pubkey_y=base64.urlsafe_b64decode(self.repad(jwk_dict.get('y'))),
hasher=algorithm
)
return key

def sign(self, msg):
def zpad(num):
return ("0" * (len(num) % 2)) + num

sig_asn = self.prepared_key.sign(msg)
ss = decoder.decode(sig_asn)
# convert the longs into byte array strings.
rh = zpad(hex(long(ss[0][0]))[2:].strip("L"))
sh = zpad(hex(long(ss[0][1]))[2:].strip("L"))

# On occasion, 512 keys can generate values that are encoded as
# uneven bytes. This will break validation, since the digits will
# offset.
max_key_len = max(len(rh), len(sh))
# Make sure the max length is even
max_key_len += (max_key_len % 2)
# prepad the byte strings to split evenly
r = bytearray.fromhex(("0" * (max_key_len - len(rh))) + rh)
s = bytearray.fromhex(("0" * (max_key_len - len(sh))) + sh)
return r+s

def verify(self, msg, sig):
# Convert byte array strings back into their longs
if len(sig) % 2:
raise JWKError("Invalid signature value used.")
split = len(sig)/2
r = Integer(base64_to_long(base64.urlsafe_b64encode(sig[:split])))
s = Integer(base64_to_long(base64.urlsafe_b64encode(sig[split:])))
ss = Sequence(tagSet=[tag.Tag(0, 32, 16)])
ss.setComponentByPosition(0, r)
ss.setComponentByPosition(1, s)
sig_asn = encoder.encode(ss)
ver = self.prepared_key.verify(sig_asn, msg)
return ver


class ECKey_py(Key):
"""
Performs signing and verification operations using
ECDSA and the specified hash function
Expand Down Expand Up @@ -324,5 +521,11 @@ def sign(self, msg):
def verify(self, msg, sig):
try:
return self.prepared_key.verify(sig, msg, hashfunc=self.hash_alg, sigdecode=ecdsa.util.sigdecode_string)
except:
except Exception:
return False


if os.environ.get('JOSE_USE_PYTHON', False):
ECKey = ECKey_py
else:
ECKey = ECKey_clib
10 changes: 6 additions & 4 deletions jose/jws.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ def sign(payload, key, headers=None, algorithm=ALGORITHMS.HS256):

if algorithm not in ALGORITHMS.SUPPORTED:
raise JWSError('Algorithm %s not supported.' % algorithm)

encoded_header = _encode_header(algorithm, additional_headers=headers)
encoded_payload = _encode_payload(payload)
signed_output = _sign_header_and_claims(encoded_header, encoded_payload, algorithm, key)

return signed_output


Expand Down Expand Up @@ -161,7 +159,11 @@ def _sign_header_and_claims(encoded_header, encoded_claims, algorithm, key_data)
signing_input = b'.'.join([encoded_header, encoded_claims])
try:
key = jwk.construct(key_data, algorithm)
signature = key.sign(signing_input)
# signing_input is a binary stream, which we need to re-encode to
# a base string. Decoding produces a ustring, encoding produces a
# base string. This resolves compatibility for 2.7+ and 3.x
dat = signing_input.decode('utf8').encode('utf8')
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This could use a comment explaining why it's being round-tripped through the decoder/encoder

signature = key.sign(dat)
except Exception as e:
raise JWSError(e)

Expand Down Expand Up @@ -211,7 +213,7 @@ def _sig_matches_keys(keys, signing_input, signature, alg):
try:
if key.verify(signing_input, signature):
return True
except:
except Exception:
pass
return False

Expand Down
4 changes: 2 additions & 2 deletions jose/jwt.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

import binascii
import json

from calendar import timegm
Expand Down Expand Up @@ -382,6 +381,7 @@ def _validate_sub(claims, subject=None):
if claims.get('sub') != subject:
raise JWTClaimsError('Invalid subject')


def _validate_jti(claims):
"""Validates that the 'jti' claim is valid.

Expand Down Expand Up @@ -431,7 +431,7 @@ def _validate_at_hash(claims, access_token, algorithm):
except (TypeError, ValueError):
msg = 'Unable to calculate at_hash to verify against token claims.'
raise JWTClaimsError(msg)

if claims['at_hash'] != expected_hash:
raise JWTClaimsError('at_hash claim does not match access_token.')

Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
pycrypto
six
future
pyasn1==0.1.9
pyelliptic==1.6.0
# Until 1.6.0 lands, the corrected pyelliptic library is available at:
# -e git+https://github.com/jrconlin/pyelliptic.git#egg=pyelliptic
3 changes: 1 addition & 2 deletions tests/algorithms/test_EC.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
class TestECAlgorithm:

def test_EC_key(self):
key = ecdsa.SigningKey.from_pem(private_key)
ECKey(key, ALGORITHMS.ES256)
ECKey(private_key, ALGORITHMS.ES256)

def test_string_secret(self):
key = 'secret'
Expand Down
19 changes: 15 additions & 4 deletions tests/rfc/test_rfc7520.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os

from jose.jwk import RSAKey
from jose.jws import verify
from jose.jwk import RSAKey, ECKey
from jose.jws import verify, sign

import pytest

Expand Down Expand Up @@ -336,7 +337,10 @@
"use": "sig",
"crv": "P-521",
"x": "AHKZLLOsCOzz5cY97ewNUajB957y-C-U88c3v13nmGZx6sYl_oJXu9A5RkTKqjqvjyekWF-7ytDyRXYgCF5cj0Kt",
"y": "AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1"
"y": "AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1",
# The following is part of the private key. It's recorded here for
# testing and diagnostics.
# "d": "CFE43av1ypdfWGD5GgjpHW1fmnatQBh2akdmgLVc0znoq2xytfrNsqKlCsJb0IZkfdPi5umehMosNgn98Xf-sm0"
}

# Figure 1: Elliptic Curve P-521 Public Key
Expand Down Expand Up @@ -1162,10 +1166,17 @@ def test_signature(self):

class TestFourThreeThree:

# The original token doesn't validate because the original 521 key
# is not on curve. I've created a new token that uses a signature that
# is derived from a point that is on curve. NOTE: the original token
# signature is also a few octets short.
token = "eyJhbGciOiJFUzUxMiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhbXBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdSBkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmUgeW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4.AE_R_YZCChjn4791jSQCrdPZCNYqHXCTZH0-JZGYNlaAjP2kqaluUIIUnC9qvbu9Plon7KRTzoNEuT4Va2cmL1eJAQy3mtPBu_u_sDDyYjnAMDxXPn7XrT0lw-kvAD890jl8e2puQens_IEKBpHABlsbEPX6sFY8OcGDqoRuBomu9xQ2"

def test_signature(self):

# # If not using ecdsa lib, try to generate a sig value to see if it
# # passes
# if not os.environ.get('JOSE_USE_PYTHON', False):
# sig = sign(expected_payload, ec_public_key, None, 'ES512')
payload = verify(self.token, ec_public_key, 'ES512')
assert payload == expected_payload

Expand Down
Loading