diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 09840ba6..10afc207 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [2.7, 3.5, 3.6, 3.7, 3.8, 3.9] + python-version: [2.7, 3.5, 3.6, 3.7, 3.8, 3.9, "3.10", "3.11.0-alpha.5"] steps: - uses: actions/checkout@v2 diff --git a/README.md b/README.md index e193f257..9088b60a 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Quick links: Click on the following thumbnail to visit a large map with clickable links to proper samples. -[![Map effect won't work inside github's markdown file, so we have to use a thumbnail here to lure audience to a real static website](docs/thumbnail.png)](https://msal-python.readthedocs.io/en/latest/) +[![Map effect won't work inside github's markdown file, so we have to use a thumbnail here to lure audience to a real static website](https://raw.githubusercontent.com/AzureAD/microsoft-authentication-library-for-python/dev/docs/thumbnail.png)](https://msal-python.readthedocs.io/en/latest/) ## Installation diff --git a/msal/application.py b/msal/application.py index e2f20446..1f6d50f2 100644 --- a/msal/application.py +++ b/msal/application.py @@ -21,13 +21,14 @@ import msal.telemetry from .region import _detect_region from .throttled_http_client import ThrottledHttpClient +from .cloudshell import _is_running_in_cloud_shell # The __init__.py will import this. Not the other way around. -__version__ = "1.17.0" # When releasing, also check and bump our dependencies's versions if needed +__version__ = "1.18.0b1" # When releasing, also check and bump our dependencies's versions if needed logger = logging.getLogger(__name__) - +_AUTHORITY_TYPE_CLOUDSHELL = "CLOUDSHELL" def extract_certs(public_cert_content): # Parses raw public certificate file contents and returns a list of strings @@ -636,6 +637,7 @@ def initiate_auth_code_flow( domain_hint=None, # type: Optional[str] claims_challenge=None, max_age=None, + response_mode=None, # type: Optional[str] ): """Initiate an auth code flow. @@ -677,6 +679,20 @@ def initiate_auth_code_flow( New in version 1.15. + :param str response_mode: + OPTIONAL. Specifies the method with which response parameters should be returned. + The default value is equivalent to ``query``, which is still secure enough in MSAL Python + (because MSAL Python does not transfer tokens via query parameter in the first place). + For even better security, we recommend using the value ``form_post``. + In "form_post" mode, response parameters + will be encoded as HTML form values that are transmitted via the HTTP POST method and + encoded in the body using the application/x-www-form-urlencoded format. + Valid values can be either "form_post" for HTTP POST to callback URI or + "query" (the default) for HTTP GET with parameters encoded in query string. + More information on possible values + `here ` + and `here ` + :return: The auth code flow. It is a dict in this form:: @@ -707,6 +723,7 @@ def initiate_auth_code_flow( claims=_merge_claims_challenge_and_capabilities( self._client_capabilities, claims_challenge), max_age=max_age, + response_mode=response_mode, ) flow["claims_challenge"] = claims_challenge return flow @@ -970,6 +987,10 @@ def get_accounts(self, username=None): return accounts def _find_msal_accounts(self, environment): + interested_authority_types = [ + TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS] + if _is_running_in_cloud_shell(): + interested_authority_types.append(_AUTHORITY_TYPE_CLOUDSHELL) grouped_accounts = { a.get("home_account_id"): # Grouped by home tenant's id { # These are minimal amount of non-tenant-specific account info @@ -985,8 +1006,7 @@ def _find_msal_accounts(self, environment): for a in self.token_cache.find( TokenCache.CredentialType.ACCOUNT, query={"environment": environment}) - if a["authority_type"] in ( - TokenCache.AuthorityType.ADFS, TokenCache.AuthorityType.MSSTS) + if a["authority_type"] in interested_authority_types } return list(grouped_accounts.values()) @@ -1046,6 +1066,21 @@ def _forget_me(self, home_account): TokenCache.CredentialType.ACCOUNT, query=owned_by_home_account): self.token_cache.remove_account(a) + def _acquire_token_by_cloud_shell(self, scopes, data=None): + from .cloudshell import _obtain_token + response = _obtain_token( + self.http_client, scopes, client_id=self.client_id, data=data) + if "error" not in response: + self.token_cache.add(dict( + client_id=self.client_id, + scope=response["scope"].split() if "scope" in response else scopes, + token_endpoint=self.authority.token_endpoint, + response=response.copy(), + data=data or {}, + authority_type=_AUTHORITY_TYPE_CLOUDSHELL, + )) + return response + def acquire_token_silent( self, scopes, # type: List[str] @@ -1179,6 +1214,7 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it( authority, # This can be different than self.authority force_refresh=False, # type: Optional[boolean] claims_challenge=None, + correlation_id=None, **kwargs): access_token_from_cache = None if not (force_refresh or claims_challenge): # Bypass AT when desired or using claims @@ -1217,9 +1253,13 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it( refresh_reason = msal.telemetry.FORCE_REFRESH # TODO: It could also mean claims_challenge assert refresh_reason, "It should have been established at this point" try: + if account and account.get("authority_type") == _AUTHORITY_TYPE_CLOUDSHELL: + return self._acquire_token_by_cloud_shell( + scopes, data=kwargs.get("data")) result = _clean_up(self._acquire_token_silent_by_finding_rt_belongs_to_me_or_my_family( authority, self._decorate_scope(scopes), account, refresh_reason=refresh_reason, claims_challenge=claims_challenge, + correlation_id=correlation_id, **kwargs)) if (result and "error" not in result) or (not access_token_from_cache): return result @@ -1558,6 +1598,9 @@ def acquire_token_interactive( - A dict containing an "error" key, when token refresh failed. """ self._validate_ssh_cert_input_data(kwargs.get("data", {})) + if _is_running_in_cloud_shell() and prompt == "none": + return self._acquire_token_by_cloud_shell( + scopes, data=kwargs.pop("data", {})) claims = _merge_claims_challenge_and_capabilities( self._client_capabilities, claims_challenge) telemetry_context = self._build_telemetry_context( @@ -1659,6 +1702,11 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs): - an error response would contain "error" and usually "error_description". """ # TBD: force_refresh behavior + if self.authority.tenant.lower() in ["common", "organizations"]: + warnings.warn( + "Using /common or /organizations authority " + "in acquire_token_for_client() is unreliable. " + "Please use a specific tenant instead.", DeprecationWarning) self._validate_ssh_cert_input_data(kwargs.get("data", {})) telemetry_context = self._build_telemetry_context( self.ACQUIRE_TOKEN_FOR_CLIENT_ID) diff --git a/msal/cloudshell.py b/msal/cloudshell.py new file mode 100644 index 00000000..f4feaf44 --- /dev/null +++ b/msal/cloudshell.py @@ -0,0 +1,122 @@ +# Copyright (c) Microsoft Corporation. +# All rights reserved. +# +# This code is licensed under the MIT License. + +"""This module wraps Cloud Shell's IMDS-like interface inside an OAuth2-like helper""" +import base64 +import json +import logging +import os +import time +try: # Python 2 + from urlparse import urlparse +except: # Python 3 + from urllib.parse import urlparse +from .oauth2cli.oidc import decode_part + + +logger = logging.getLogger(__name__) + + +def _is_running_in_cloud_shell(): + return os.environ.get("AZUREPS_HOST_ENVIRONMENT", "").startswith("cloud-shell") + + +def _scope_to_resource(scope): # This is an experimental reasonable-effort approach + cloud_shell_supported_audiences = [ + "https://analysis.windows.net/powerbi/api", # Came from https://msazure.visualstudio.com/One/_git/compute-CloudShell?path=/src/images/agent/env/envconfig.PROD.json + "https://pas.windows.net/CheckMyAccess/Linux/.default", # Cloud Shell accepts it as-is + ] + for a in cloud_shell_supported_audiences: + if scope.startswith(a): + return a + u = urlparse(scope) + if u.scheme: + return "{}://{}".format(u.scheme, u.netloc) + return scope # There is no much else we can do here + + +def _obtain_token(http_client, scopes, client_id=None, data=None): + resp = http_client.post( + "http://localhost:50342/oauth2/token", + data=dict( + data or {}, + resource=" ".join(map(_scope_to_resource, scopes))), + headers={"Metadata": "true"}, + ) + if resp.status_code >= 300: + logger.debug("Cloud Shell IMDS error: %s", resp.text) + cs_error = json.loads(resp.text).get("error", {}) + return {k: v for k, v in { + "error": cs_error.get("code"), + "error_description": cs_error.get("message"), + }.items() if v} + imds_payload = json.loads(resp.text) + BEARER = "Bearer" + oauth2_response = { + "access_token": imds_payload["access_token"], + "expires_in": int(imds_payload["expires_in"]), + "token_type": imds_payload.get("token_type", BEARER), + } + expected_token_type = (data or {}).get("token_type", BEARER) + if oauth2_response["token_type"] != expected_token_type: + return { # Generate a normal error (rather than an intrusive exception) + "error": "broker_error", + "error_description": "token_type {} is not supported by this version of Azure Portal".format( + expected_token_type), + } + parts = imds_payload["access_token"].split(".") + + # The following default values are useful in SSH Cert scenario + client_info = { # Default value, in case the real value will be unavailable + "uid": "user", + "utid": "cloudshell", + } + now = time.time() + preferred_username = "currentuser@cloudshell" + oauth2_response["id_token_claims"] = { # First 5 claims are required per OIDC + "iss": "cloudshell", + "sub": "user", + "aud": client_id, + "exp": now + 3600, + "iat": now, + "preferred_username": preferred_username, # Useful as MSAL account's username + } + + if len(parts) == 3: # Probably a JWT. Use it to derive client_info and id token. + try: + # Data defined in https://docs.microsoft.com/en-us/azure/active-directory/develop/access-tokens#payload-claims + jwt_payload = json.loads(decode_part(parts[1])) + client_info = { + # Mimic a real home_account_id, + # so that this pseudo account and a real account would interop. + "uid": jwt_payload.get("oid", "user"), + "utid": jwt_payload.get("tid", "cloudshell"), + } + oauth2_response["id_token_claims"] = { + "iss": jwt_payload["iss"], + "sub": jwt_payload["sub"], # Could use oid instead + "aud": client_id, + "exp": jwt_payload["exp"], + "iat": jwt_payload["iat"], + "preferred_username": jwt_payload.get("preferred_username") # V2 + or jwt_payload.get("unique_name") # V1 + or preferred_username, + } + except ValueError: + logger.debug("Unable to decode jwt payload: %s", parts[1]) + oauth2_response["client_info"] = base64.b64encode( + # Mimic a client_info, so that MSAL would create an account + json.dumps(client_info).encode("utf-8")).decode("utf-8") + oauth2_response["id_token_claims"]["tid"] = client_info["utid"] # TBD + + ## Note: Decided to not surface resource back as scope, + ## because they would cause the downstream OAuth2 code path to + ## cache the token with a different scope and won't hit them later. + #if imds_payload.get("resource"): + # oauth2_response["scope"] = imds_payload["resource"] + if imds_payload.get("refresh_token"): + oauth2_response["refresh_token"] = imds_payload["refresh_token"] + return oauth2_response + diff --git a/msal/token_cache.py b/msal/token_cache.py index 2ed819d7..f7d9f955 100644 --- a/msal/token_cache.py +++ b/msal/token_cache.py @@ -113,6 +113,7 @@ def wipe(dictionary, sensitive_fields): # Masks sensitive info return self.__add(event, now=now) finally: wipe(event.get("response", {}), ( # These claims were useful during __add() + "id_token_claims", # Provided by broker "access_token", "refresh_token", "id_token", "username")) wipe(event, ["username"]) # Needed for federated ROPC logger.debug("event=%s", json.dumps( @@ -150,7 +151,8 @@ def __add(self, event, now=None): id_token = response.get("id_token") id_token_claims = ( decode_id_token(id_token, client_id=event["client_id"]) - if id_token else {}) + if id_token + else response.get("id_token_claims", {})) # Broker would provide id_token_claims client_info, home_account_id = self.__parse_account(response, id_token_claims) target = ' '.join(event.get("scope") or []) # Per schema, we don't sort it @@ -195,9 +197,10 @@ def __add(self, event, now=None): or data.get("username") # Falls back to ROPC username or event.get("username") # Falls back to Federated ROPC username or "", # The schema does not like null - "authority_type": + "authority_type": event.get( + "authority_type", # Honor caller's choice of authority_type self.AuthorityType.ADFS if realm == "adfs" - else self.AuthorityType.MSSTS, + else self.AuthorityType.MSSTS), # "client_info": response.get("client_info"), # Optional } self.modify(self.CredentialType.ACCOUNT, account, account) diff --git a/setup.py b/setup.py index bcec8fe7..f8bdd7d7 100644 --- a/setup.py +++ b/setup.py @@ -63,6 +63,7 @@ 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', ], @@ -75,7 +76,7 @@ 'requests>=2.0.0,<3', 'PyJWT[crypto]>=1.0.0,<3', - 'cryptography>=0.6,<39', + 'cryptography>=0.6,<40', # load_pem_private_key() is available since 0.6 # https://github.com/pyca/cryptography/blob/master/CHANGELOG.rst#06---2014-09-29 # diff --git a/tests/msaltest.py b/tests/msaltest.py new file mode 100644 index 00000000..c1ef1e7c --- /dev/null +++ b/tests/msaltest.py @@ -0,0 +1,178 @@ +import getpass, logging, pprint, sys, msal + + +def _input_boolean(message): + return input( + "{} (N/n/F/f or empty means False, otherwise it is True): ".format(message) + ) not in ('N', 'n', 'F', 'f', '') + +def _input(message, default=None): + return input(message.format(default=default)).strip() or default + +def _select_options( + options, header="Your options:", footer=" Your choice? ", option_renderer=str, + accept_nonempty_string=False, + ): + assert options, "options must not be empty" + if header: + print(header) + for i, o in enumerate(options, start=1): + print(" {}: {}".format(i, option_renderer(o))) + if accept_nonempty_string: + print(" Or you can just type in your input.") + while True: + raw_data = input(footer) + try: + choice = int(raw_data) + if 1 <= choice <= len(options): + return options[choice - 1] + except ValueError: + if raw_data and accept_nonempty_string: + return raw_data + +def _input_scopes(): + return _select_options([ + "https://graph.microsoft.com/.default", + "https://management.azure.com/.default", + "User.Read", + "User.ReadBasic.All", + ], + header="Select a scope (multiple scopes can only be input by manually typing them, delimited by space):", + accept_nonempty_string=True, + ).split() + +def _select_account(app): + accounts = app.get_accounts() + if accounts: + return _select_options( + accounts, + option_renderer=lambda a: a["username"], + header="Account(s) already signed in inside MSAL Python:", + ) + else: + print("No account available inside MSAL Python. Use other methods to acquire token first.") + +def acquire_token_silent(app): + """acquire_token_silent() - with an account already signed into MSAL Python.""" + account = _select_account(app) + if account: + pprint.pprint(app.acquire_token_silent( + _input_scopes(), + account=account, + force_refresh=_input_boolean("Bypass MSAL Python's token cache?"), + )) + +def _acquire_token_interactive(app, scopes, data=None): + prompt = _select_options([ + {"value": None, "description": "Unspecified. Proceed silently with a default account (if any), fallback to prompt."}, + {"value": "none", "description": "none. Proceed silently with a default account (if any), or error out."}, + {"value": "select_account", "description": "select_account. Prompt with an account picker."}, + ], + option_renderer=lambda o: o["description"], + header="Prompt behavior?")["value"] + raw_login_hint = _select_options( + # login_hint is unnecessary when prompt=select_account, + # but we still let tester input login_hint, just for testing purpose. + [None] + [a["username"] for a in app.get_accounts()], + header="login_hint? (If you have multiple signed-in sessions in browser, and you specify a login_hint to match one of them, you will bypass the account picker.)", + accept_nonempty_string=True, + ) + login_hint = raw_login_hint["username"] if isinstance(raw_login_hint, dict) else raw_login_hint + result = app.acquire_token_interactive( + scopes, prompt=prompt, login_hint=login_hint, data=data or {}) + if login_hint and "id_token_claims" in result: + signed_in_user = result.get("id_token_claims", {}).get("preferred_username") + if signed_in_user != login_hint: + logging.warning('Signed-in user "%s" does not match login_hint', signed_in_user) + return result + +def acquire_token_interactive(app): + """acquire_token_interactive() - User will be prompted if app opts to do select_account.""" + pprint.pprint(_acquire_token_interactive(app, _input_scopes())) + +def acquire_token_by_username_password(app): + """acquire_token_by_username_password() - See constraints here: https://docs.microsoft.com/en-us/azure/active-directory/develop/msal-authentication-flows#constraints-for-ropc""" + pprint.pprint(app.acquire_token_by_username_password( + _input("username: "), getpass.getpass("password: "), scopes=_input_scopes())) + +_JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}""" +SSH_CERT_DATA = {"token_type": "ssh-cert", "key_id": "key1", "req_cnf": _JWK1} +SSH_CERT_SCOPE = ["https://pas.windows.net/CheckMyAccess/Linux/.default"] + +def acquire_ssh_cert_silently(app): + """Acquire an SSH Cert silently- This typically only works with Azure CLI""" + account = _select_account(app) + if account: + result = app.acquire_token_silent( + SSH_CERT_SCOPE, + account, + data=SSH_CERT_DATA, + force_refresh=_input_boolean("Bypass MSAL Python's token cache?"), + ) + pprint.pprint(result) + if result and result.get("token_type") != "ssh-cert": + logging.error("Unable to acquire an ssh-cert.") + +def acquire_ssh_cert_interactive(app): + """Acquire an SSH Cert interactively - This typically only works with Azure CLI""" + result = _acquire_token_interactive(app, SSH_CERT_SCOPE, data=SSH_CERT_DATA) + pprint.pprint(result) + if result.get("token_type") != "ssh-cert": + logging.error("Unable to acquire an ssh-cert") + +def remove_account(app): + """remove_account() - Invalidate account and/or token(s) from cache, so that acquire_token_silent() would be reset""" + account = _select_account(app) + if account: + app.remove_account(account) + print('Account "{}" and/or its token(s) are signed out from MSAL Python'.format(account["username"])) + +def exit(_): + """Exit""" + bug_link = "https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/new/choose" + print("Bye. If you found a bug, please report it here: {}".format(bug_link)) + sys.exit() + +def main(): + print("Welcome to the Msal Python Console Test App, committed at 2022-5-2\n") + chosen_app = _select_options([ + {"client_id": "04b07795-8ddb-461a-bbee-02f9e1bf7b46", "name": "Azure CLI (Correctly configured for MSA-PT)"}, + {"client_id": "04f0c124-f2bc-4f59-8241-bf6df9866bbd", "name": "Visual Studio (Correctly configured for MSA-PT)"}, + {"client_id": "95de633a-083e-42f5-b444-a4295d8e9314", "name": "Whiteboard Services (Non MSA-PT app. Accepts AAD & MSA accounts.)"}, + ], + option_renderer=lambda a: a["name"], + header="Impersonate this app (or you can type in the client_id of your own app)", + accept_nonempty_string=True) + app = msal.PublicClientApplication( + chosen_app["client_id"] if isinstance(chosen_app, dict) else chosen_app, + authority=_select_options([ + "https://login.microsoftonline.com/common", + "https://login.microsoftonline.com/organizations", + "https://login.microsoftonline.com/microsoft.onmicrosoft.com", + "https://login.microsoftonline.com/msidlab4.onmicrosoft.com", + "https://login.microsoftonline.com/consumers", + ], + header="Input authority (Note that MSA-PT apps would NOT use the /common authority)", + accept_nonempty_string=True, + ), + ) + if _input_boolean("Enable MSAL Python's DEBUG log?"): + logging.basicConfig(level=logging.DEBUG) + while True: + func = _select_options([ + acquire_token_silent, + acquire_token_interactive, + acquire_token_by_username_password, + acquire_ssh_cert_silently, + acquire_ssh_cert_interactive, + remove_account, + exit, + ], option_renderer=lambda f: f.__doc__, header="MSAL Python APIs:") + try: + func(app) + except KeyboardInterrupt: # Useful for bailing out a stuck interactive flow + print("Aborted") + +if __name__ == "__main__": + main() + diff --git a/tests/test_application.py b/tests/test_application.py index 518042a8..804ccb82 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1,5 +1,6 @@ # Note: Since Aug 2019 we move all e2e tests into test_e2e.py, # so this test_application file contains only unit tests without dependency. +import sys from msal.application import * from msal.application import _str2bytes import msal @@ -602,3 +603,25 @@ def test_get_accounts(self): self.assertIn("local_account_id", account, "Backward compatibility") self.assertIn("realm", account, "Backward compatibility") + +@unittest.skipUnless( + sys.version_info[0] >= 3 and sys.version_info[1] >= 2, + "assertWarns() is only available in Python 3.2+") +class TestClientCredentialGrant(unittest.TestCase): + def _test_certain_authority_should_emit_warnning(self, authority): + app = ConfidentialClientApplication( + "client_id", client_credential="secret", authority=authority) + def mock_post(url, headers=None, *args, **kwargs): + return MinimalResponse( + status_code=200, text=json.dumps({"access_token": "an AT"})) + with self.assertWarns(DeprecationWarning): + app.acquire_token_for_client(["scope"], post=mock_post) + + def test_common_authority_should_emit_warnning(self): + self._test_certain_authority_should_emit_warnning( + authority="https://login.microsoftonline.com/common") + + def test_organizations_authority_should_emit_warnning(self): + self._test_certain_authority_should_emit_warnning( + authority="https://login.microsoftonline.com/organizations") + diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 65691689..9a971f46 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -86,7 +86,7 @@ def assertCacheWorksForUser( self.assertNotEqual(0, len(accounts)) account = accounts[0] if ("scope" not in result_from_wire # This is the usual case - or # Authority server could reject some scopes + or # Authority server could return different set of scopes set(scope) <= set(result_from_wire["scope"].split(" ")) ): # Going to test acquire_token_silent(...) to locate an AT from cache @@ -115,7 +115,7 @@ def assertCacheWorksForUser( # result_from_wire['access_token'] != result_from_cache['access_token'] # but ROPC in B2C tends to return the same AT we obtained seconds ago. # Now looking back, "refresh_token grant would return a brand new AT" - # was just an empirical observation but never a committment in specs, + # was just an empirical observation but never a commitment in specs, # so we adjust our way to assert here. (result_from_cache or {}).get("access_token"), "We should get an AT from acquire_token_silent(...) call") @@ -185,12 +185,14 @@ def _test_acquire_token_interactive( self, client_id=None, authority=None, scope=None, port=None, username_uri="", # But you would want to provide one data=None, # Needed by ssh-cert feature + prompt=None, **ignored): assert client_id and authority and scope self.app = msal.PublicClientApplication( client_id, authority=authority, http_client=MinimalHttpClient()) result = self.app.acquire_token_interactive( scope, + prompt=prompt, timeout=120, port=port, welcome_template= # This is an undocumented feature for testing @@ -237,6 +239,7 @@ def test_ssh_cert_for_user(self): scope=self.SCOPE, data=self.DATA1, username_uri="https://msidlab.com/api/user?usertype=cloud", + prompt="none" if msal.application._is_running_in_cloud_shell() else None, ) # It already tests reading AT from cache, and using RT to refresh # acquire_token_silent() would work because we pass in the same key self.assertIsNotNone(result.get("access_token"), "Encountered {}: {}".format( @@ -254,6 +257,20 @@ def test_ssh_cert_for_user(self): self.assertNotEqual(result["access_token"], refreshed_ssh_cert['access_token']) +@unittest.skipUnless( + msal.application._is_running_in_cloud_shell(), + "Manually run this test case from inside Cloud Shell") +class CloudShellTestCase(E2eTestCase): + app = msal.PublicClientApplication("client_id") + scope_that_requires_no_managed_device = "https://management.core.windows.net/" # Scopes came from https://msazure.visualstudio.com/One/_git/compute-CloudShell?path=/src/images/agent/env/envconfig.PROD.json&version=GBmaster&_a=contents + def test_access_token_should_be_obtained_for_a_supported_scope(self): + result = self.app.acquire_token_interactive( + [self.scope_that_requires_no_managed_device], prompt="none") + self.assertEqual( + "Bearer", result.get("token_type"), "Unexpected result: %s" % result) + self.assertIsNotNone(result.get("access_token")) + + THIS_FOLDER = os.path.dirname(__file__) CONFIG = os.path.join(THIS_FOLDER, "config.json") @unittest.skipUnless(os.path.exists(CONFIG), "Optional %s not found" % CONFIG)