diff --git a/msal/authority.py b/msal/authority.py index 13aafa7f..6eb294f1 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -5,8 +5,6 @@ from urlparse import urlparse import logging -from .exceptions import MsalServiceError - logger = logging.getLogger(__name__) @@ -28,7 +26,9 @@ "b2clogin.cn", "b2clogin.us", "b2clogin.de", + "ciamlogin.com", ] +_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com" class AuthorityBuilder(object): @@ -74,7 +74,8 @@ def __init__( if isinstance(authority_url, AuthorityBuilder): authority_url = str(authority_url) authority, self.instance, tenant = canonicalize(authority_url) - self.is_adfs = tenant.lower() == 'adfs' + is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX) + self.is_adfs = tenant.lower() == 'adfs' and not is_ciam parts = authority.path.split('/') self._is_b2c = any( self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS @@ -103,13 +104,13 @@ def __init__( % authority_url) tenant_discovery_endpoint = payload['tenant_discovery_endpoint'] else: - tenant_discovery_endpoint = ( - 'https://{}:{}{}{}/.well-known/openid-configuration'.format( - self.instance, - 443 if authority.port is None else authority.port, - authority.path, # In B2C scenario, it is "/tenant/policy" - "" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint - )) + tenant_discovery_endpoint = authority._replace( + path="{prefix}{version}/.well-known/openid-configuration".format( + prefix=tenant if is_ciam and len(authority.path) <= 1 # Path-less CIAM + else authority.path, # In B2C, it is "/tenant/policy" + version="" if self.is_adfs else "/v2.0", + ) + ).geturl() # Keeping original port and query. Query is useful for test. try: openid_config = tenant_discovery( tenant_discovery_endpoint, @@ -144,18 +145,28 @@ def user_realm_discovery(self, username, correlation_id=None, response=None): return {} # This can guide the caller to fall back normal ROPC flow -def canonicalize(authority_url): +def canonicalize(authority_or_auth_endpoint): # Returns (url_parsed_result, hostname_in_lowercase, tenant) - authority = urlparse(authority_url) - parts = authority.path.split("/") - if authority.scheme != "https" or len(parts) < 2 or not parts[1]: - raise ValueError( - "Your given address (%s) should consist of " - "an https url with a minimum of one segment in a path: e.g. " - "https://login.microsoftonline.com/ " - "or https://.b2clogin.com/.onmicrosoft.com/policy" - % authority_url) - return authority, authority.hostname, parts[1] + authority = urlparse(authority_or_auth_endpoint) + if authority.scheme == "https": + parts = authority.path.split("/") + first_part = parts[1] if len(parts) >= 2 and parts[1] else None + if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): # CIAM + # Use path in CIAM authority. It will be validated by OIDC Discovery soon + tenant = first_part if first_part else "{}.onmicrosoft.com".format( + # Fallback to sub domain name. This variation may not be advertised + authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0]) + return authority, authority.hostname, tenant + # AAD + if len(parts) >= 2 and parts[1]: + return authority, authority.hostname, parts[1] + raise ValueError( + "Your given address (%s) should consist of " + "an https url with a minimum of one segment in a path: e.g. " + "https://login.microsoftonline.com/ " + "or https://.ciamlogin.com/ " + "or https://.b2clogin.com/.onmicrosoft.com/policy" + % authority_or_auth_endpoint) def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs): resp = http_client.get( diff --git a/tests/test_authority.py b/tests/test_authority.py index ca0bc68f..2ced23f8 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -79,6 +79,26 @@ def test_invalid_host_skipping_validation_can_be_turned_off(self): pass # Those are expected for this unittest case +@patch("msal.authority.tenant_discovery", return_value={ + "authorization_endpoint": "https://contoso.com/placeholder", + "token_endpoint": "https://contoso.com/placeholder", + }) +class TestCiamAuthority(unittest.TestCase): + http_client = MinimalHttpClient() + + def test_path_less_authority_should_work(self, oidc_discovery): + Authority('https://contoso.ciamlogin.com', self.http_client) + oidc_discovery.assert_called_once_with( + "https://contoso.ciamlogin.com/contoso.onmicrosoft.com/v2.0/.well-known/openid-configuration", + self.http_client) + + def test_authority_with_path_should_be_used_as_is(self, oidc_discovery): + Authority('https://contoso.ciamlogin.com/anything', self.http_client) + oidc_discovery.assert_called_once_with( + "https://contoso.ciamlogin.com/anything/v2.0/.well-known/openid-configuration", + self.http_client) + + class TestAuthorityInternalHelperCanonicalize(unittest.TestCase): def test_canonicalize_tenant_followed_by_extra_paths(self): diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 48ffe47a..44c1d5f2 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -897,6 +897,57 @@ def test_b2c_allows_using_client_id_as_scope(self): ) +class CiamTestCase(LabBasedTestCase): + # Test cases below show you what scenarios need to be covered for CIAM. + # Detail test behaviors have already been implemented in preexisting helpers. + + @classmethod + def setUpClass(cls): + super(CiamTestCase, cls).setUpClass() + cls.user = cls.get_lab_user( + federationProvider="ciam", signinAudience="azureadmyorg", publicClient="No") + # FYI: Only single- or multi-tenant CIAM app can have other-than-OIDC + # delegated permissions on Microsoft Graph. + cls.app_config = cls.get_lab_app_object(cls.user["client_id"]) + + def test_ciam_acquire_token_interactive(self): + self._test_acquire_token_interactive( + authority=self.app_config["authority"], + client_id=self.app_config["appId"], + scope=self.app_config["scopes"], + username=self.user["username"], + lab_name=self.user["lab_name"], + ) + + def test_ciam_acquire_token_for_client(self): + self._test_acquire_token_by_client_secret( + client_id=self.app_config["appId"], + client_secret=self.get_lab_user_secret( + self.app_config["clientSecret"].split("=")[-1]), + authority=self.app_config["authority"], + scope=["{}/.default".format(self.app_config["appId"])], # App permission + ) + + def test_ciam_acquire_token_by_ropc(self): + # Somehow, this would only work after creating a secret for the test app + # and enabling "Allow public client flows". + # Otherwise it would hit AADSTS7000218. + self._test_username_password( + authority=self.app_config["authority"], + client_id=self.app_config["appId"], + username=self.user["username"], + password=self.get_lab_user_secret(self.user["lab_name"]), + scope=self.app_config["scopes"], + ) + + def test_ciam_device_flow(self): + self._test_device_flow( + authority=self.app_config["authority"], + client_id=self.app_config["appId"], + scope=self.app_config["scopes"], + ) + + class WorldWideRegionalEndpointTestCase(LabBasedTestCase): region = "westus" timeout = 2 # Short timeout makes this test case responsive on non-VM