diff --git a/beacon_api/conf/__init__.py b/beacon_api/conf/__init__.py index 6b02a8cd..fba5bbd7 100644 --- a/beacon_api/conf/__init__.py +++ b/beacon_api/conf/__init__.py @@ -63,13 +63,14 @@ def parse_oauth2_config_file(path: str) -> Any: """Parse configuration file.""" config = ConfigParser() config.read(path) - config_vars: Dict[str, Union[str, bool, None]] = { + config_vars: Dict[str, Union[str, bool, None, List[str]]] = { "server": config.get("oauth2", "server"), "issuers": config.get("oauth2", "issuers"), "userinfo": config.get("oauth2", "userinfo"), "audience": config.get("oauth2", "audience") or None, "verify_aud": bool(strtobool(config.get("oauth2", "verify_aud"))), "bona_fide_value": config.get("oauth2", "bona_fide_value"), + "trusted_jkus": config.get("oauth2", "trusted_jkus", fallback="").split(","), } return convert(config_vars) diff --git a/beacon_api/conf/config.ini b/beacon_api/conf/config.ini index c4a715f2..dd9fd74e 100644 --- a/beacon_api/conf/config.ini +++ b/beacon_api/conf/config.ini @@ -122,3 +122,8 @@ audience= # If your service is not part of any network or AAI, but you still want to use tokens # produced by other AAI parties, set this value to False to skip the audience validation step verify_aud=False + +# Comma separated list of trusted JKUs for checking passports +# Passport with an untrusted JKU will be denied access +# Leave empty to disable JKU checking +trusted_jkus= diff --git a/beacon_api/permissions/ga4gh.py b/beacon_api/permissions/ga4gh.py index 4ec6def6..07917f9f 100644 --- a/beacon_api/permissions/ga4gh.py +++ b/beacon_api/permissions/ga4gh.py @@ -163,6 +163,13 @@ async def get_ga4gh_permissions(token: str) -> Tuple[set, bool]: for encoded_passport in encoded_passports: # Decode passport header, payload = await decode_passport(encoded_passport) + # If trusted_jkus variable is set, only allow passports with a trusted JKU + if not OAUTH2_CONFIG.trusted_jkus == [""]: + # Skip passports with untrusted JKUs + passport_jku = header.get("jku") + if passport_jku not in OAUTH2_CONFIG.trusted_jkus: + LOG.debug("Untrusted JKU.") + continue # Sort passports that carry dataset permissions pass_type = payload.get("ga4gh_visa_v1", {}).get("type") if pass_type == "ControlledAccessGrants": # nosec diff --git a/tests/test.ini b/tests/test.ini index 8aa4ad7a..93373921 100644 --- a/tests/test.ini +++ b/tests/test.ini @@ -116,3 +116,8 @@ audience= # If your service is not part of any network or AAI, but you still want to use tokens # produced by other AAI parties, set this value to False to skip the audience validation step verify_aud=False + +# Comma separated list of trusted JKUs for checking passports +# Passport with an untrusted JKU will be denied access +# Leave empty to disable JKU checking +trusted_jkus=http://test.csc.fi/jwk diff --git a/tests/test_basic.py b/tests/test_basic.py index ce100aa4..1f1d95e1 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -9,6 +9,7 @@ from .test_app import PARAMS, generate_token from testfixtures import TempDirectory from test.support.os_helper import EnvironmentVarGuard +from beacon_api.conf import OAUTH2_CONFIG def mock_token(bona_fide, permissions, auth): @@ -70,6 +71,11 @@ async def load_datafile(self, vcf, datafile, datasetId, n=1000, min_ac=1): return ["datasetId", "variants"] +async def mock_get_ga4gh_controlled(input): + """Mock retrieve dataset permissions.""" + return input + + class TestBasicFunctions(unittest.IsolatedAsyncioTestCase): """Test supporting functions.""" @@ -482,5 +488,50 @@ async def test_get_ga4gh_permissions(self, m_userinfo, m_decode, m_controlled, m self.assertEqual(bona_fide_status, True) +class TestCaseCheckJku(unittest.IsolatedAsyncioTestCase): + """Test case.""" + + @unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_bona_fide") + @unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_controlled", side_effect=mock_get_ga4gh_controlled) + @unittest.mock.patch("beacon_api.permissions.ga4gh.decode_passport") + @unittest.mock.patch("beacon_api.permissions.ga4gh.retrieve_user_data") + async def test_jku_check(self, m_userinfo, m_decode, m_controller, m_bonafide): + """Test trusted and untrusted jku.""" + # Test: trusted jku + m_userinfo.return_value = [""] + header = {"jku": "http://test.csc.fi/jwk"} + payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}} + m_decode.return_value = header, payload + m_bonafide.return_value = False + dataset_permissions, bona_fide_status = await get_ga4gh_permissions({}) + self.assertEqual(dataset_permissions, [("", header)]) + self.assertEqual(bona_fide_status, False) + # Test: untrusted jku + m_userinfo.return_value = [""] + header = {"jku": "untrusted_jku"} + payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}} + m_decode.return_value = header, payload + m_bonafide.return_value = False + dataset_permissions, bona_fide_status = await get_ga4gh_permissions({}) + self.assertEqual(dataset_permissions, []) + self.assertEqual(bona_fide_status, False) + + @unittest.mock.patch("beacon_api.permissions.ga4gh.OAUTH2_CONFIG", new=OAUTH2_CONFIG._replace(trusted_jkus=[""])) + @unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_bona_fide") + @unittest.mock.patch("beacon_api.permissions.ga4gh.get_ga4gh_controlled", side_effect=mock_get_ga4gh_controlled) + @unittest.mock.patch("beacon_api.permissions.ga4gh.decode_passport") + @unittest.mock.patch("beacon_api.permissions.ga4gh.retrieve_user_data") + async def test_jku_check_not_active(self, m_userinfo, m_decode, m_controller, m_bonafide): + """Test if jku check is skipped when trusted_jkus config var is not set.""" + m_userinfo.return_value = [""] + header = {"jku": "untrusted_jku"} + payload = {"ga4gh_visa_v1": {"type": "ControlledAccessGrants"}} + m_decode.return_value = header, payload + m_bonafide.return_value = False + dataset_permissions, bona_fide_status = await get_ga4gh_permissions({}) + self.assertEqual(dataset_permissions, [("", header)]) + self.assertEqual(bona_fide_status, False) + + if __name__ == "__main__": unittest.main()