diff --git a/libraries/botframework-connector/botframework/connector/auth/__init__.py b/libraries/botframework-connector/botframework/connector/auth/__init__.py index 8747a03c8..d58dcf5fa 100644 --- a/libraries/botframework-connector/botframework/connector/auth/__init__.py +++ b/libraries/botframework-connector/botframework/connector/auth/__init__.py @@ -17,6 +17,8 @@ from .microsoft_app_credentials import * from .microsoft_government_app_credentials import * from .certificate_app_credentials import * +from .certificate_government_app_credentials import * +from .certificate_service_client_credential_factory import * from .claims_identity import * from .jwt_token_validation import * from .credential_provider import * diff --git a/libraries/botframework-connector/botframework/connector/auth/certificate_app_credentials.py b/libraries/botframework-connector/botframework/connector/auth/certificate_app_credentials.py index b39511d82..89dbe882d 100644 --- a/libraries/botframework-connector/botframework/connector/auth/certificate_app_credentials.py +++ b/libraries/botframework-connector/botframework/connector/auth/certificate_app_credentials.py @@ -44,7 +44,6 @@ def __init__( oauth_scope=oauth_scope, ) - self.scopes = [self.oauth_scope] self.app = None self.certificate_thumbprint = certificate_thumbprint self.certificate_private_key = certificate_private_key @@ -56,17 +55,18 @@ def get_access_token(self, force_refresh: bool = False) -> str: :return: The access token for the given certificate. """ + scope = self.oauth_scope + if not scope.endswith("/.default"): + scope += "/.default" + scopes = [scope] + # Firstly, looks up a token from cache # Since we are looking for token for the current app, NOT for an end user, # notice we give account parameter as None. - auth_token = self.__get_msal_app().acquire_token_silent( - self.scopes, account=None - ) + auth_token = self.__get_msal_app().acquire_token_silent(scopes, account=None) if not auth_token: # No suitable token exists in cache. Let's get a new one from AAD. - auth_token = self.__get_msal_app().acquire_token_for_client( - scopes=self.scopes - ) + auth_token = self.__get_msal_app().acquire_token_for_client(scopes=scopes) return auth_token["access_token"] def __get_msal_app(self): diff --git a/libraries/botframework-connector/botframework/connector/auth/certificate_government_app_credentials.py b/libraries/botframework-connector/botframework/connector/auth/certificate_government_app_credentials.py new file mode 100644 index 000000000..b2883cfa1 --- /dev/null +++ b/libraries/botframework-connector/botframework/connector/auth/certificate_government_app_credentials.py @@ -0,0 +1,51 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .certificate_app_credentials import CertificateAppCredentials +from .government_constants import GovernmentConstants + + +class CertificateGovernmentAppCredentials(CertificateAppCredentials): + """ + GovernmentAppCredentials implementation using a certificate. + """ + + def __init__( + self, + app_id: str, + certificate_thumbprint: str, + certificate_private_key: str, + channel_auth_tenant: str = None, + oauth_scope: str = None, + certificate_public: str = None, + ): + """ + AppCredentials implementation using a certificate. + + :param app_id: + :param certificate_thumbprint: + :param certificate_private_key: + :param channel_auth_tenant: + :param oauth_scope: + :param certificate_public: public_certificate (optional) is public key certificate which will be sent + through ‘x5c’ JWT header only for subject name and issuer authentication to support cert auto rolls. + """ + + # super will set proper scope and endpoint. + super().__init__( + app_id=app_id, + channel_auth_tenant=channel_auth_tenant, + oauth_scope=oauth_scope, + certificate_thumbprint=certificate_thumbprint, + certificate_private_key=certificate_private_key, + certificate_public=certificate_public, + ) + + def _get_default_channelauth_tenant(self) -> str: + return GovernmentConstants.DEFAULT_CHANNEL_AUTH_TENANT + + def _get_to_channel_from_bot_loginurl_prefix(self) -> str: + return GovernmentConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX + + def _get_to_channel_from_bot_oauthscope(self) -> str: + return GovernmentConstants.TO_CHANNEL_FROM_BOT_OAUTH_SCOPE diff --git a/libraries/botframework-connector/botframework/connector/auth/certificate_service_client_credential_factory.py b/libraries/botframework-connector/botframework/connector/auth/certificate_service_client_credential_factory.py new file mode 100644 index 000000000..7a71c28bd --- /dev/null +++ b/libraries/botframework-connector/botframework/connector/auth/certificate_service_client_credential_factory.py @@ -0,0 +1,129 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from logging import Logger + +from msrest.authentication import Authentication + +from .authentication_constants import AuthenticationConstants +from .government_constants import GovernmentConstants +from .certificate_app_credentials import CertificateAppCredentials +from .certificate_government_app_credentials import CertificateGovernmentAppCredentials +from .microsoft_app_credentials import MicrosoftAppCredentials +from .service_client_credentials_factory import ServiceClientCredentialsFactory + + +class CertificateServiceClientCredentialsFactory(ServiceClientCredentialsFactory): + def __init__( + self, + certificate_thumbprint: str, + certificate_private_key: str, + app_id: str, + tenant_id: str = None, + certificate_public: str = None, + *, + logger: Logger = None + ) -> None: + """ + CertificateServiceClientCredentialsFactory implementation using a certificate. + + :param certificate_thumbprint: + :param certificate_private_key: + :param app_id: + :param tenant_id: + :param certificate_public: public_certificate (optional) is public key certificate which will be sent + through ‘x5c’ JWT header only for subject name and issuer authentication to support cert auto rolls. + """ + + self.certificate_thumbprint = certificate_thumbprint + self.certificate_private_key = certificate_private_key + self.app_id = app_id + self.tenant_id = tenant_id + self.certificate_public = certificate_public + self._logger = logger + + async def is_valid_app_id(self, app_id: str) -> bool: + return app_id == self.app_id + + async def is_authentication_disabled(self) -> bool: + return not self.app_id + + async def create_credentials( + self, + app_id: str, + oauth_scope: str, + login_endpoint: str, + validate_authority: bool, + ) -> Authentication: + if await self.is_authentication_disabled(): + return MicrosoftAppCredentials.empty() + + if not await self.is_valid_app_id(app_id): + raise Exception("Invalid app_id") + + normalized_endpoint = login_endpoint.lower() if login_endpoint else "" + + if normalized_endpoint.startswith( + AuthenticationConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX + ): + credentials = CertificateAppCredentials( + app_id, + self.certificate_thumbprint, + self.certificate_private_key, + self.tenant_id, + oauth_scope, + self.certificate_public, + ) + elif normalized_endpoint.startswith( + GovernmentConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX + ): + credentials = CertificateGovernmentAppCredentials( + app_id, + self.certificate_thumbprint, + self.certificate_private_key, + self.tenant_id, + oauth_scope, + self.certificate_public, + ) + else: + credentials = _CertificatePrivateCloudAppCredentials( + app_id, + self.certificate_thumbprint, + self.certificate_private_key, + self.tenant_id, + oauth_scope, + self.certificate_public, + login_endpoint, + validate_authority, + ) + + return credentials + + +class _CertificatePrivateCloudAppCredentials(CertificateAppCredentials): + def __init__( + self, + app_id: str, + certificate_thumbprint: str, + certificate_private_key: str, + channel_auth_tenant: str, + oauth_scope: str, + certificate_public: str, + oauth_endpoint: str, + validate_authority: bool, + ): + super().__init__( + app_id, + certificate_thumbprint, + certificate_private_key, + channel_auth_tenant, + oauth_scope, + certificate_public, + ) + + self.oauth_endpoint = oauth_endpoint + self._validate_authority = validate_authority + + @property + def validate_authority(self): + return self._validate_authority diff --git a/libraries/botframework-connector/tests/test_certificate_service_client_credential_factory.py b/libraries/botframework-connector/tests/test_certificate_service_client_credential_factory.py new file mode 100644 index 000000000..558397c9f --- /dev/null +++ b/libraries/botframework-connector/tests/test_certificate_service_client_credential_factory.py @@ -0,0 +1,73 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import aiounittest +from botframework.connector.auth import ( + AppCredentials, + AuthenticationConstants, + GovernmentConstants, + CertificateServiceClientCredentialsFactory, + CertificateAppCredentials, + CertificateGovernmentAppCredentials, +) + + +class CertificateServiceClientCredentialsFactoryTests(aiounittest.AsyncTestCase): + test_appid = "test_appid" + test_tenant_id = "test_tenant_id" + test_audience = "test_audience" + login_endpoint = AuthenticationConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX + gov_login_endpoint = GovernmentConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX + private_login_endpoint = "https://login.privatecloud.com" + + async def test_can_create_public_credentials(self): + factory = CertificateServiceClientCredentialsFactory( + app_id=CertificateServiceClientCredentialsFactoryTests.test_appid, + certificate_thumbprint="thumbprint", + certificate_private_key="private_key", + ) + + credentials = await factory.create_credentials( + CertificateServiceClientCredentialsFactoryTests.test_appid, + CertificateServiceClientCredentialsFactoryTests.test_audience, + CertificateServiceClientCredentialsFactoryTests.login_endpoint, + True, + ) + + assert isinstance(credentials, CertificateAppCredentials) + + async def test_can_create_gov_credentials(self): + factory = CertificateServiceClientCredentialsFactory( + app_id=CertificateServiceClientCredentialsFactoryTests.test_appid, + certificate_thumbprint="thumbprint", + certificate_private_key="private_key", + ) + + credentials = await factory.create_credentials( + CertificateServiceClientCredentialsFactoryTests.test_appid, + CertificateServiceClientCredentialsFactoryTests.test_audience, + CertificateServiceClientCredentialsFactoryTests.gov_login_endpoint, + True, + ) + + assert isinstance(credentials, CertificateGovernmentAppCredentials) + + async def test_can_create_private_credentials(self): + factory = CertificateServiceClientCredentialsFactory( + app_id=CertificateServiceClientCredentialsFactoryTests.test_appid, + certificate_thumbprint="thumbprint", + certificate_private_key="private_key", + ) + + credentials = await factory.create_credentials( + CertificateServiceClientCredentialsFactoryTests.test_appid, + CertificateServiceClientCredentialsFactoryTests.test_audience, + CertificateServiceClientCredentialsFactoryTests.private_login_endpoint, + True, + ) + + assert isinstance(credentials, CertificateAppCredentials) + assert ( + credentials.oauth_endpoint + == CertificateServiceClientCredentialsFactoryTests.private_login_endpoint + )