Skip to content

Commit b9379f7

Browse files
committed
refactor!: make rfc folders private
1 parent 056b2aa commit b9379f7

70 files changed

Lines changed: 2888 additions & 2706 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ filterwarnings = ["error"]
8383
branch = true
8484
source = ["joserfc"]
8585
omit = [
86-
"src/joserfc/rfc7797/*",
86+
"src/joserfc/rfc*/*",
8787
]
8888

8989
[tool.coverage.paths]

src/joserfc/_keys.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
from __future__ import annotations
22
import typing as t
33
import random
4-
from .rfc7517.types import AnyKey, KeyParameters, DictKey
5-
from .rfc7518.oct_key import OctKey
6-
from .rfc7518.rsa_key import RSAKey
7-
from .rfc7518.ec_key import ECKey
8-
from .rfc8037.okp_key import OKPKey
4+
from ._rfc7517.types import AnyKey, KeyParameters, DictKey
5+
from ._rfc7518.oct_key import OctKey
6+
from ._rfc7518.rsa_key import RSAKey
7+
from ._rfc7518.ec_key import ECKey
8+
from ._rfc8037.okp_key import OKPKey
99
from .errors import (
1010
MissingKeyError,
1111
InvalidKeyIdError,

src/joserfc/_rfc7515/__init__.py

Whitespace-only changes.

src/joserfc/_rfc7515/compact.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import typing as t
2+
from .model import JWSAlgModel, CompactSignature
3+
from ..errors import (
4+
DecodeError,
5+
MissingAlgorithmError,
6+
)
7+
from ..util import (
8+
json_b64encode,
9+
json_b64decode,
10+
urlsafe_b64encode,
11+
urlsafe_b64decode,
12+
)
13+
14+
15+
def sign_compact(obj: CompactSignature, alg: JWSAlgModel, key: t.Any) -> bytes:
16+
header_segment = json_b64encode(obj.headers())
17+
payload_segment = urlsafe_b64encode(obj.payload)
18+
signing_input = header_segment + b"." + payload_segment
19+
signature = urlsafe_b64encode(alg.sign(signing_input, key))
20+
return signing_input + b"." + signature
21+
22+
23+
def verify_compact(obj: CompactSignature, alg: JWSAlgModel, key: t.Any) -> bool:
24+
signing_input = obj.segments["header"] + b"." + obj.segments["payload"]
25+
try:
26+
sig = urlsafe_b64decode(obj.segments["signature"])
27+
except (TypeError, ValueError):
28+
return False
29+
return alg.verify(signing_input, sig, key)
30+
31+
32+
def detach_compact_content(value: str) -> str:
33+
# https://www.rfc-editor.org/rfc/rfc7515#appendix-F
34+
parts = value.split(".")
35+
parts[1] = ""
36+
return ".".join(parts)
37+
38+
39+
def decode_header(header_segment: bytes) -> t.Dict[str, t.Any]:
40+
try:
41+
protected: t.Dict[str, t.Any] = json_b64decode(header_segment)
42+
if "alg" not in protected:
43+
raise MissingAlgorithmError()
44+
except (TypeError, ValueError):
45+
raise DecodeError("Invalid header")
46+
return protected

src/joserfc/_rfc7515/json.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
from __future__ import annotations
2+
import typing as t
3+
import copy
4+
from .model import (
5+
HeaderMember,
6+
GeneralJSONSignature,
7+
FlattenedJSONSignature,
8+
)
9+
from .types import (
10+
JSONSignatureDict,
11+
GeneralJSONSerialization,
12+
FlattenedJSONSerialization,
13+
)
14+
from .registry import JWSRegistry
15+
from ..util import (
16+
json_b64encode,
17+
json_b64decode,
18+
urlsafe_b64encode,
19+
urlsafe_b64decode,
20+
)
21+
from ..errors import DecodeError
22+
23+
FindKey = t.Callable[[HeaderMember], t.Any]
24+
25+
26+
def sign_general_json(
27+
members: list[HeaderMember],
28+
payload: bytes,
29+
registry: JWSRegistry,
30+
find_key: FindKey,
31+
) -> GeneralJSONSerialization:
32+
payload_segment = urlsafe_b64encode(payload)
33+
signatures: list[JSONSignatureDict] = [
34+
sign_json_member(payload_segment, member, registry, find_key) for member in members
35+
]
36+
return {
37+
"payload": payload_segment.decode("utf-8"),
38+
"signatures": signatures,
39+
}
40+
41+
42+
def sign_flattened_json(
43+
member: HeaderMember,
44+
payload: bytes,
45+
registry: JWSRegistry,
46+
find_key: FindKey,
47+
) -> FlattenedJSONSerialization:
48+
payload_segment = urlsafe_b64encode(payload)
49+
signature = sign_json_member(payload_segment, member, registry, find_key)
50+
data: FlattenedJSONSerialization = {"payload": payload_segment.decode("utf-8"), **signature}
51+
return data
52+
53+
54+
def sign_json_member(
55+
payload_segment: bytes, member: HeaderMember, registry: JWSRegistry, find_key: FindKey
56+
) -> JSONSignatureDict:
57+
headers = member.headers()
58+
registry.check_header(headers)
59+
alg = registry.get_alg(headers["alg"])
60+
key = find_key(member)
61+
key.check_use("sig")
62+
alg.check_key_type(key)
63+
if member.protected:
64+
protected_segment = json_b64encode(member.protected)
65+
else:
66+
protected_segment = b""
67+
signing_input = b".".join([protected_segment, payload_segment])
68+
signature = urlsafe_b64encode(alg.sign(signing_input, key))
69+
rv: JSONSignatureDict = {"signature": signature.decode("utf-8")}
70+
if member.protected:
71+
rv["protected"] = protected_segment.decode("utf-8")
72+
if member.header:
73+
rv["header"] = member.header
74+
return rv
75+
76+
77+
def extract_general_json(value: GeneralJSONSerialization) -> GeneralJSONSignature:
78+
payload_segment: bytes = value["payload"].encode("utf-8")
79+
try:
80+
payload = urlsafe_b64decode(payload_segment)
81+
except (TypeError, ValueError):
82+
raise DecodeError("Invalid payload")
83+
84+
signatures: list[JSONSignatureDict] = value["signatures"]
85+
members = [__signature_to_member(sig) for sig in signatures]
86+
obj = GeneralJSONSignature(members, payload)
87+
obj.signatures = signatures
88+
obj.segments = {"payload": payload_segment}
89+
return obj
90+
91+
92+
def __signature_to_member(sig: JSONSignatureDict) -> HeaderMember:
93+
member = HeaderMember()
94+
if "protected" in sig:
95+
protected_segment = sig["protected"]
96+
member.protected = json_b64decode(protected_segment)
97+
if "header" in sig:
98+
member.header = sig["header"]
99+
return member
100+
101+
102+
def verify_general_json(obj: GeneralJSONSignature, registry: JWSRegistry, find_key: FindKey) -> bool:
103+
payload_segment = obj.segments["payload"]
104+
for index, signature in enumerate(obj.signatures):
105+
member = obj.members[index]
106+
if not verify_signature(member, signature, payload_segment, registry, find_key):
107+
return False
108+
return True
109+
110+
111+
def verify_flattened_json(obj: FlattenedJSONSignature, registry: JWSRegistry, find_key: FindKey) -> bool:
112+
payload_segment = obj.segments["payload"]
113+
assert obj.signature is not None
114+
return verify_signature(obj.member, obj.signature, payload_segment, registry, find_key)
115+
116+
117+
def verify_signature(
118+
member: HeaderMember,
119+
signature: JSONSignatureDict,
120+
payload_segment: bytes,
121+
registry: JWSRegistry,
122+
find_key: FindKey,
123+
) -> bool:
124+
headers = member.headers()
125+
registry.check_header(headers)
126+
alg = registry.get_alg(headers["alg"])
127+
key = find_key(member)
128+
key.check_use("sig")
129+
alg.check_key_type(key)
130+
if "protected" in signature:
131+
protected_segment = signature["protected"].encode("utf-8")
132+
else:
133+
protected_segment = b""
134+
sig = urlsafe_b64decode(signature["signature"].encode("utf-8"))
135+
signing_input = b".".join([protected_segment, payload_segment])
136+
return alg.verify(signing_input, sig, key)
137+
138+
139+
def detach_json_content(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
140+
# https://www.rfc-editor.org/rfc/rfc7515#appendix-F
141+
rv = copy.deepcopy(value) # don't alter original value
142+
if "payload" in rv:
143+
del rv["payload"]
144+
return rv

src/joserfc/_rfc7515/model.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
from __future__ import annotations
2+
from typing import Any, ClassVar
3+
from abc import ABCMeta, abstractmethod
4+
from .types import SegmentsDict, JSONSignatureDict
5+
from ..errors import InvalidKeyTypeError
6+
from ..registry import Header
7+
8+
9+
class HeaderMember:
10+
"""A header member of the JSON signature. It is combined with protected header,
11+
and unprotected header.
12+
"""
13+
14+
def __init__(self, protected: Header | None = None, header: Header | None = None):
15+
#: protected header
16+
self.protected = protected
17+
#: unprotected header
18+
self.header = header
19+
20+
def headers(self) -> Header:
21+
rv: Header = {}
22+
if self.protected:
23+
rv.update(self.protected)
24+
if self.header:
25+
rv.update(self.header)
26+
return rv
27+
28+
def set_kid(self, kid: str) -> None:
29+
if self.header is None:
30+
self.header = {}
31+
self.header["kid"] = kid
32+
33+
34+
class CompactSignature:
35+
"""JSON Web Signature object for compact mode. This object is used to
36+
represent the JWS instance.
37+
"""
38+
39+
def __init__(self, protected: Header, payload: bytes):
40+
#: protected header
41+
self.protected = protected
42+
#: payload content in bytes
43+
self.payload = payload
44+
self.segments: SegmentsDict = {}
45+
46+
def headers(self) -> Header:
47+
"""Returns protected header values in dict."""
48+
return self.protected
49+
50+
def set_kid(self, kid: str) -> None:
51+
self.protected["kid"] = kid
52+
53+
54+
class FlattenedJSONSignature:
55+
"""JSON Signature object that represents a flattened JSON serialization."""
56+
57+
#: mark it as flattened
58+
flattened: ClassVar[bool] = True
59+
60+
def __init__(self, member: HeaderMember, payload: bytes):
61+
#: the only header member
62+
self.member: HeaderMember = member
63+
#: payload content in bytes
64+
self.payload: bytes = payload
65+
self.signature: JSONSignatureDict | None = None
66+
self.segments: SegmentsDict = {}
67+
68+
@property
69+
def members(self) -> list[HeaderMember]:
70+
"""A list of header members. For flattened JSON serialization, there will
71+
be only one header member."""
72+
return [self.member]
73+
74+
def headers(self) -> Header:
75+
"""Header values in dict."""
76+
return self.member.headers()
77+
78+
79+
class GeneralJSONSignature:
80+
"""JSON Signature object that represents a general JSON serialization."""
81+
82+
#: mark it as not flattened (general)
83+
flattened: ClassVar[bool] = False
84+
85+
def __init__(self, members: list[HeaderMember], payload: bytes):
86+
#: a list of header members
87+
self.members: list[HeaderMember] = members
88+
#: payload content in bytes
89+
self.payload: bytes = payload
90+
self.signatures: list[JSONSignatureDict] = []
91+
self.segments: SegmentsDict = {}
92+
93+
94+
class JWSAlgModel(object, metaclass=ABCMeta):
95+
"""Interface for JWS algorithm. JWA specification (RFC7518) SHOULD
96+
implement the algorithms for JWS with this base implementation.
97+
"""
98+
99+
name: str
100+
description: str
101+
recommended: bool = False
102+
key_type = "oct"
103+
algorithm_type = "JWS"
104+
algorithm_location = "sig"
105+
106+
def check_key_type(self, key: Any) -> None:
107+
if key.key_type != self.key_type:
108+
raise InvalidKeyTypeError(f"Algorithm '{self.name}' requires '{self.key_type}' key")
109+
110+
@abstractmethod
111+
def sign(self, msg: bytes, key: Any) -> bytes:
112+
"""Sign the text msg with a private/sign key.
113+
114+
:param msg: message bytes to be signed
115+
:param key: private key to sign the message
116+
:return: bytes
117+
"""
118+
119+
@abstractmethod
120+
def verify(self, msg: bytes, sig: bytes, key: Any) -> bool:
121+
"""Verify the signature of text msg with a public/verify key.
122+
123+
:param msg: message bytes to be signed
124+
:param sig: result signature to be compared
125+
:param key: public key to verify the signature
126+
:return: boolean
127+
"""

0 commit comments

Comments
 (0)