Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from urlparse import urlparse
import logging

from .exceptions import MsalServiceError


logger = logging.getLogger(__name__)

Expand All @@ -28,7 +26,9 @@
"b2clogin.cn",
"b2clogin.us",
"b2clogin.de",
"ciamlogin.com",
]
_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"


class AuthorityBuilder(object):
Expand Down Expand Up @@ -80,7 +80,8 @@ def __init__(
if isinstance(authority_url, AuthorityBuilder):
authority_url = str(authority_url)
authority, self.instance, tenant = canonicalize(authority_url)
self.is_adfs = tenant.lower() == 'adfs'
is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX)
self.is_adfs = tenant.lower() == 'adfs' and not is_ciam
parts = authority.path.split('/')
self._is_b2c = any(
self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
Expand Down Expand Up @@ -109,13 +110,13 @@ def __init__(
% authority_url)
tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
else:
tenant_discovery_endpoint = (
'https://{}:{}{}{}/.well-known/openid-configuration'.format(
self.instance,
443 if authority.port is None else authority.port,
authority.path, # In B2C scenario, it is "/tenant/policy"
"" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
))
tenant_discovery_endpoint = authority._replace(
path="{prefix}{version}/.well-known/openid-configuration".format(
prefix=tenant if is_ciam and len(authority.path) <= 1 # Path-less CIAM
else authority.path, # In B2C, it is "/tenant/policy"
version="" if self.is_adfs else "/v2.0",
)
).geturl() # Keeping original port and query. Query is useful for test.
try:
openid_config = tenant_discovery(
tenant_discovery_endpoint,
Expand Down Expand Up @@ -150,18 +151,28 @@ def user_realm_discovery(self, username, correlation_id=None, response=None):
return {} # This can guide the caller to fall back normal ROPC flow


def canonicalize(authority_url):
def canonicalize(authority_or_auth_endpoint):
# Returns (url_parsed_result, hostname_in_lowercase, tenant)
authority = urlparse(authority_url)
parts = authority.path.split("/")
if authority.scheme != "https" or len(parts) < 2 or not parts[1]:
raise ValueError(
"Your given address (%s) should consist of "
"an https url with a minimum of one segment in a path: e.g. "
"https://login.microsoftonline.com/<tenant> "
"or https://<tenant_name>.b2clogin.com/<tenant_name>.onmicrosoft.com/policy"
% authority_url)
return authority, authority.hostname, parts[1]
authority = urlparse(authority_or_auth_endpoint)
if authority.scheme == "https":
parts = authority.path.split("/")
first_part = parts[1] if len(parts) >= 2 and parts[1] else None
if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): # CIAM
# Use path in CIAM authority. It will be validated by OIDC Discovery soon
tenant = first_part if first_part else "{}.onmicrosoft.com".format(
# Fallback to sub domain name. This variation may not be advertised
authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0])
return authority, authority.hostname, tenant
# AAD
if len(parts) >= 2 and parts[1]:
return authority, authority.hostname, parts[1]
raise ValueError(
"Your given address (%s) should consist of "
"an https url with a minimum of one segment in a path: e.g. "
"https://login.microsoftonline.com/<tenant> "
"or https://<tenant_name>.ciamlogin.com/<tenant> "
"or https://<tenant_name>.b2clogin.com/<tenant_name>.onmicrosoft.com/policy"
% authority_or_auth_endpoint)

def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
resp = http_client.get(
Expand Down
20 changes: 20 additions & 0 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ def test_invalid_host_skipping_validation_can_be_turned_off(self):
pass # Those are expected for this unittest case


@patch("msal.authority.tenant_discovery", return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
})
class TestCiamAuthority(unittest.TestCase):
http_client = MinimalHttpClient()

def test_path_less_authority_should_work(self, oidc_discovery):
Authority('https://contoso.ciamlogin.com', self.http_client)
oidc_discovery.assert_called_once_with(
"https://contoso.ciamlogin.com/contoso.onmicrosoft.com/v2.0/.well-known/openid-configuration",
self.http_client)

def test_authority_with_path_should_be_used_as_is(self, oidc_discovery):
Authority('https://contoso.ciamlogin.com/anything', self.http_client)
oidc_discovery.assert_called_once_with(
"https://contoso.ciamlogin.com/anything/v2.0/.well-known/openid-configuration",
self.http_client)


class TestAuthorityInternalHelperCanonicalize(unittest.TestCase):

def test_canonicalize_tenant_followed_by_extra_paths(self):
Expand Down