|
7 | 7 | from oidcmsg.oidc import AccessTokenRequest |
8 | 8 | from oidcmsg.oidc import AuthorizationRequest |
9 | 9 | from oidcmsg.oidc import RefreshAccessTokenRequest |
| 10 | +from oidcmsg.oidc import ResponseMessage |
10 | 11 |
|
11 | 12 | from oidcendpoint import JWT_BEARER |
12 | 13 | from oidcendpoint.client_authn import verify_client |
@@ -205,8 +206,10 @@ def test_init_with_grant_types_supported(self, conf, grant_types_supported): |
205 | 206 | def test_errors_in_grant_types_supported(self, conf, grant_types_supported): |
206 | 207 | token_conf = conf["endpoint"]["token"] |
207 | 208 | token_conf["kwargs"]["grant_types_supported"] = grant_types_supported |
208 | | - with pytest.raises(Exception): |
| 209 | + with pytest.raises(Exception) as exception_info: |
209 | 210 | EndpointContext(conf) |
| 211 | + assert exception_info.typename == "ProcessError" |
| 212 | + assert "Token Endpoint" in str(exception_info.value) |
210 | 213 |
|
211 | 214 | def test_parse(self): |
212 | 215 | session_id = setup_session(self.endpoint.endpoint_context, AUTH_REQ, uid="user") |
@@ -422,3 +425,49 @@ def test_do_refresh_access_token_not_allowed(self): |
422 | 425 | assert "error_description" in _resp |
423 | 426 | assert _resp["error"] == "invalid_request" |
424 | 427 | assert _resp["error_description"] == "Unsupported grant_type: refresh_token" |
| 428 | + |
| 429 | + def test_custom_grant_class(self, conf): |
| 430 | + """ |
| 431 | + Register a custom grant type supported and see if it works as it should. |
| 432 | + """ |
| 433 | + class CustomGrant: |
| 434 | + def __init__(self, endpoint, config=None): |
| 435 | + self.endpoint = endpoint |
| 436 | + |
| 437 | + def post_parse_request(self, request, client_id="", **kwargs): |
| 438 | + request.testvalue = "test" |
| 439 | + return request |
| 440 | + |
| 441 | + def process_request(self, request, **kwargs): |
| 442 | + """ |
| 443 | + All grant types should return a ResponseMessage class or inherit it. |
| 444 | + """ |
| 445 | + return ResponseMessage(test="successful") |
| 446 | + |
| 447 | + token_conf = conf["endpoint"]["token"] |
| 448 | + token_conf["kwargs"]["grant_types_supported"] = { |
| 449 | + "authorization_code": True, |
| 450 | + "test_grant": { |
| 451 | + "class": CustomGrant |
| 452 | + } |
| 453 | + } |
| 454 | + endpoint_context = EndpointContext(conf) |
| 455 | + token_endpoint = endpoint_context.endpoint["token"] |
| 456 | + token_endpoint.client_authn_method = [None] |
| 457 | + endpoint_context.cdb["client_1"] = { |
| 458 | + "client_secret": "hemligt", |
| 459 | + "redirect_uris": [("https://example.com/cb", None)], |
| 460 | + "client_salt": "salted", |
| 461 | + "endpoint_auth_method": "client_secret_post", |
| 462 | + "response_types": ["code", "token", "code id_token", "id_token"], |
| 463 | + } |
| 464 | + endpoint_context.keyjar.import_jwks(CLIENT_KEYJAR.export_jwks(), "client_1") |
| 465 | + |
| 466 | + request = dict(grant_type="test_grant", client_id="client_1") |
| 467 | + |
| 468 | + parsed_request = token_endpoint.parse_request(request) |
| 469 | + assert parsed_request.testvalue == "test" |
| 470 | + |
| 471 | + response = token_endpoint.process_request(parsed_request) |
| 472 | + assert "test" in response |
| 473 | + assert response["test"] == "successful" |
0 commit comments