Skip to content
This repository was archived by the owner on Jun 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/oidcop/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import secrets

__version__ = "2.3.1"
__version__ = "2.3.5"

DEF_SIGN_ALG = {
"id_token": "RS256",
Expand Down
17 changes: 8 additions & 9 deletions src/oidcop/endpoint_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ class EndpointContext(OidcContext):
}

def __init__(
self,
conf: Union[dict, OPConfiguration],
server_get: Callable,
keyjar: Optional[KeyJar] = None,
cwd: Optional[str] = "",
cookie_handler: Optional[Any] = None,
httpc: Optional[Any] = None,
self,
conf: Union[dict, OPConfiguration],
server_get: Callable,
keyjar: Optional[KeyJar] = None,
cwd: Optional[str] = "",
cookie_handler: Optional[Any] = None,
httpc: Optional[Any] = None,
):
OidcContext.__init__(self, conf, keyjar, entity_id=conf.get("issuer", ""))
self.conf = conf
Expand Down Expand Up @@ -161,7 +161,7 @@ def __init__(
self.login_hint2acrs = None
self.par_db = {}
self.provider_info = {}
self.scope2claims = SCOPE2CLAIMS
self.scope2claims = conf.get("scopes_to_claims", SCOPE2CLAIMS)
self.session_manager = None
self.sso_ttl = 14400 # 4h
self.symkey = rndstr(24)
Expand Down Expand Up @@ -215,7 +215,6 @@ def __init__(
"cookie_handler",
"authentication",
"id_token",
"scope2claims",
]:
_func = getattr(self, "do_{}".format(item), None)
if _func:
Expand Down
155 changes: 155 additions & 0 deletions tests/test_05_jwt_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from oidcmsg.oidc import AccessTokenRequest
from oidcmsg.oidc import AuthorizationRequest
from oidcmsg.time_util import utc_time_sans_frac
from oidcop.scopes import SCOPE2CLAIMS

from oidcop import user_info
from oidcop.authn_event import create_authn_event
Expand Down Expand Up @@ -275,3 +276,157 @@ def test_is_expired(self):
assert access_token.is_active()
# 4000 seconds in the future. Passed the lifetime.
assert access_token.is_active(now=utc_time_sans_frac() + 4000) is False


class TestEndpointWebID(object):
@pytest.fixture(autouse=True)
def create_endpoint(self):
_scope2claims = SCOPE2CLAIMS.copy()
_scope2claims.update({"webid": ["webid"]})
conf = {
"issuer": ISSUER,
"httpc_params": {"verify": False, "timeout": 1},
"capabilities": CAPABILITIES,
"keys": {"uri_path": "jwks.json", "key_defs": KEYDEFS},
"token_handler_args": {
"jwks_file": "private/token_jwks.json",
"code": {"lifetime": 600},
"token": {
"class": "oidcop.token.jwt_token.JWTToken",
"kwargs": {
"lifetime": 3600,
"base_claims": {"eduperson_scoped_affiliation": None},
"add_claims_by_scope": True,
"aud": ["https://example.org/appl"],
},
},
"refresh": {
"class": "oidcop.token.jwt_token.JWTToken",
"kwargs": {"lifetime": 3600, "aud": ["https://example.org/appl"], },
},
"id_token": {
"class": "oidcop.token.id_token.IDToken",
"kwargs": {
"base_claims": {
"email": {"essential": True},
"email_verified": {"essential": True},
}
},
},
},
"endpoint": {
"provider_config": {
"path": "{}/.well-known/openid-configuration",
"class": ProviderConfiguration,
"kwargs": {},
},
"registration": {"path": "{}/registration", "class": Registration, "kwargs": {}, },
"authorization": {
"path": "{}/authorization",
"class": Authorization,
"kwargs": {},
},
"token": {"path": "{}/token", "class": Token, "kwargs": {}},
"session": {"path": "{}/end_session", "class": Session},
"introspection": {"path": "{}/introspection", "class": Introspection},
},
"client_authn": verify_client,
"authentication": {
"anon": {
"acr": INTERNETPROTOCOLPASSWORD,
"class": "oidcop.user_authn.user.NoAuthn",
"kwargs": {"user": "diana"},
}
},
"template_dir": "template",
"userinfo": {
"class": user_info.UserInfo,
"kwargs": {"db_file": full_path("users.json")},
},
"authz": {
"class": AuthzHandling,
"kwargs": {
"grant_config": {
"usage_rules": {
"authorization_code": {
"supports_minting": ["access_token", "refresh_token", "id_token", ],
"max_usage": 1,
},
"access_token": {},
"refresh_token": {
"supports_minting": ["access_token", "refresh_token"],
},
},
"expires_in": 43200,
}
},
},
"claims_interface": {"class": "oidcop.session.claims.ClaimsInterface", "kwargs": {}},
"scopes_to_claims": _scope2claims,
}
server = Server(conf, keyjar=KEYJAR)
self.endpoint_context = server.endpoint_context
self.endpoint_context.cdb["client_1"] = {
"client_secret": "hemligt",
"redirect_uris": [("https://example.com/cb", None)],
"client_salt": "salted",
"token_endpoint_auth_method": "client_secret_post",
"response_types": ["code", "token", "code id_token", "id_token"],
"add_claims": {
"always": {},
"by_scope": {},
},
}
self.session_manager = self.endpoint_context.session_manager
self.user_id = "diana"
self.endpoint = server.server_get("endpoint", "session")

def _create_session(self, auth_req, sub_type="public", sector_identifier=""):
if sector_identifier:
authz_req = auth_req.copy()
authz_req["sector_identifier_uri"] = sector_identifier
else:
authz_req = auth_req
client_id = authz_req["client_id"]
ae = create_authn_event(self.user_id)
return self.session_manager.create_session(
ae, authz_req, self.user_id, client_id=client_id, sub_type=sub_type
)

def _mint_token(self, token_class, grant, session_id, based_on=None, **kwargs):
# Constructing an authorization code is now done
return grant.mint_token(
session_id=session_id,
endpoint_context=self.endpoint_context,
token_class=token_class,
token_handler=self.session_manager.token_handler.handler[token_class],
expires_at=utc_time_sans_frac() + 300, # 5 minutes from now
based_on=based_on,
**kwargs
)

def test_parse(self):
_auth_req = AuthorizationRequest(
client_id="client_1",
redirect_uri="https://example.com/cb",
scope=["openid", "webid"],
state="STATE",
response_type="code",
)

session_id = self._create_session(_auth_req)
# apply consent
grant = self.endpoint_context.authz(session_id=session_id, request=_auth_req)
# grant = self.session_manager[session_id]
code = self._mint_token("authorization_code", grant, session_id)
access_token = self._mint_token(
"access_token", grant, session_id, code, resources=[_auth_req["client_id"]]
)

_verifier = JWT(self.endpoint_context.keyjar)
_info = _verifier.unpack(access_token.value)

assert _info["token_class"] == "access_token"
# assert _info["eduperson_scoped_affiliation"] == ["staff@example.org"]
assert set(_info["aud"]) == {"client_1"}
assert "webid" in _info
3 changes: 2 additions & 1 deletion tests/users.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
},
"eduperson_scoped_affiliation": [
"staff@example.org"
]
],
"webid": "http://bblfish.net/#hjs"
},
"babs": {
"name": "Barbara J Jensen",
Expand Down