Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from .microsoft_app_credentials import *
from .microsoft_government_app_credentials import *
from .certificate_app_credentials import *
from .certificate_government_app_credentials import *
from .certificate_service_client_credential_factory import *
from .claims_identity import *
from .jwt_token_validation import *
from .credential_provider import *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def __init__(
oauth_scope=oauth_scope,
)

self.scopes = [self.oauth_scope]
self.app = None
self.certificate_thumbprint = certificate_thumbprint
self.certificate_private_key = certificate_private_key
Expand All @@ -56,17 +55,18 @@ def get_access_token(self, force_refresh: bool = False) -> str:
:return: The access token for the given certificate.
"""

scope = self.oauth_scope
if not scope.endswith("/.default"):
scope += "/.default"
scopes = [scope]

# Firstly, looks up a token from cache
# Since we are looking for token for the current app, NOT for an end user,
# notice we give account parameter as None.
auth_token = self.__get_msal_app().acquire_token_silent(
self.scopes, account=None
)
auth_token = self.__get_msal_app().acquire_token_silent(scopes, account=None)
if not auth_token:
# No suitable token exists in cache. Let's get a new one from AAD.
auth_token = self.__get_msal_app().acquire_token_for_client(
scopes=self.scopes
)
auth_token = self.__get_msal_app().acquire_token_for_client(scopes=scopes)
return auth_token["access_token"]

def __get_msal_app(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from .certificate_app_credentials import CertificateAppCredentials
from .government_constants import GovernmentConstants


class CertificateGovernmentAppCredentials(CertificateAppCredentials):
"""
GovernmentAppCredentials implementation using a certificate.
"""

def __init__(
self,
app_id: str,
certificate_thumbprint: str,
certificate_private_key: str,
channel_auth_tenant: str = None,
oauth_scope: str = None,
certificate_public: str = None,
):
"""
AppCredentials implementation using a certificate.

:param app_id:
:param certificate_thumbprint:
:param certificate_private_key:
:param channel_auth_tenant:
:param oauth_scope:
:param certificate_public: public_certificate (optional) is public key certificate which will be sent
through ‘x5c’ JWT header only for subject name and issuer authentication to support cert auto rolls.
"""

# super will set proper scope and endpoint.
super().__init__(
app_id=app_id,
channel_auth_tenant=channel_auth_tenant,
oauth_scope=oauth_scope,
certificate_thumbprint=certificate_thumbprint,
certificate_private_key=certificate_private_key,
certificate_public=certificate_public,
)

def _get_default_channelauth_tenant(self) -> str:
return GovernmentConstants.DEFAULT_CHANNEL_AUTH_TENANT

def _get_to_channel_from_bot_loginurl_prefix(self) -> str:
return GovernmentConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX

def _get_to_channel_from_bot_oauthscope(self) -> str:
return GovernmentConstants.TO_CHANNEL_FROM_BOT_OAUTH_SCOPE
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from logging import Logger

from msrest.authentication import Authentication

from .authentication_constants import AuthenticationConstants
from .government_constants import GovernmentConstants
from .certificate_app_credentials import CertificateAppCredentials
from .certificate_government_app_credentials import CertificateGovernmentAppCredentials
from .microsoft_app_credentials import MicrosoftAppCredentials
from .service_client_credentials_factory import ServiceClientCredentialsFactory


class CertificateServiceClientCredentialsFactory(ServiceClientCredentialsFactory):
def __init__(
self,
certificate_thumbprint: str,
certificate_private_key: str,
app_id: str,
tenant_id: str = None,
certificate_public: str = None,
*,
logger: Logger = None
) -> None:
"""
CertificateServiceClientCredentialsFactory implementation using a certificate.

:param certificate_thumbprint:
:param certificate_private_key:
:param app_id:
:param tenant_id:
:param certificate_public: public_certificate (optional) is public key certificate which will be sent
through ‘x5c’ JWT header only for subject name and issuer authentication to support cert auto rolls.
"""

self.certificate_thumbprint = certificate_thumbprint
self.certificate_private_key = certificate_private_key
self.app_id = app_id
self.tenant_id = tenant_id
self.certificate_public = certificate_public
self._logger = logger

async def is_valid_app_id(self, app_id: str) -> bool:
return app_id == self.app_id

async def is_authentication_disabled(self) -> bool:
return not self.app_id

async def create_credentials(
self,
app_id: str,
oauth_scope: str,
login_endpoint: str,
validate_authority: bool,
) -> Authentication:
if await self.is_authentication_disabled():
return MicrosoftAppCredentials.empty()

if not await self.is_valid_app_id(app_id):
raise Exception("Invalid app_id")

normalized_endpoint = login_endpoint.lower() if login_endpoint else ""

if normalized_endpoint.startswith(
AuthenticationConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX
):
credentials = CertificateAppCredentials(
app_id,
self.certificate_thumbprint,
self.certificate_private_key,
self.tenant_id,
oauth_scope,
self.certificate_public,
)
elif normalized_endpoint.startswith(
GovernmentConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX
):
credentials = CertificateGovernmentAppCredentials(
app_id,
self.certificate_thumbprint,
self.certificate_private_key,
self.tenant_id,
oauth_scope,
self.certificate_public,
)
else:
credentials = _CertificatePrivateCloudAppCredentials(
app_id,
self.certificate_thumbprint,
self.certificate_private_key,
self.tenant_id,
oauth_scope,
self.certificate_public,
login_endpoint,
validate_authority,
)

return credentials


class _CertificatePrivateCloudAppCredentials(CertificateAppCredentials):
def __init__(
self,
app_id: str,
certificate_thumbprint: str,
certificate_private_key: str,
channel_auth_tenant: str,
oauth_scope: str,
certificate_public: str,
oauth_endpoint: str,
validate_authority: bool,
):
super().__init__(
app_id,
certificate_thumbprint,
certificate_private_key,
channel_auth_tenant,
oauth_scope,
certificate_public,
)

self.oauth_endpoint = oauth_endpoint
self._validate_authority = validate_authority

@property
def validate_authority(self):
return self._validate_authority
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import aiounittest
from botframework.connector.auth import (
AppCredentials,
AuthenticationConstants,
GovernmentConstants,
CertificateServiceClientCredentialsFactory,
CertificateAppCredentials,
CertificateGovernmentAppCredentials,
)


class CertificateServiceClientCredentialsFactoryTests(aiounittest.AsyncTestCase):
test_appid = "test_appid"
test_tenant_id = "test_tenant_id"
test_audience = "test_audience"
login_endpoint = AuthenticationConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX
gov_login_endpoint = GovernmentConstants.TO_CHANNEL_FROM_BOT_LOGIN_URL_PREFIX
private_login_endpoint = "https://login.privatecloud.com"

async def test_can_create_public_credentials(self):
factory = CertificateServiceClientCredentialsFactory(
app_id=CertificateServiceClientCredentialsFactoryTests.test_appid,
certificate_thumbprint="thumbprint",
certificate_private_key="private_key",
)

credentials = await factory.create_credentials(
CertificateServiceClientCredentialsFactoryTests.test_appid,
CertificateServiceClientCredentialsFactoryTests.test_audience,
CertificateServiceClientCredentialsFactoryTests.login_endpoint,
True,
)

assert isinstance(credentials, CertificateAppCredentials)

async def test_can_create_gov_credentials(self):
factory = CertificateServiceClientCredentialsFactory(
app_id=CertificateServiceClientCredentialsFactoryTests.test_appid,
certificate_thumbprint="thumbprint",
certificate_private_key="private_key",
)

credentials = await factory.create_credentials(
CertificateServiceClientCredentialsFactoryTests.test_appid,
CertificateServiceClientCredentialsFactoryTests.test_audience,
CertificateServiceClientCredentialsFactoryTests.gov_login_endpoint,
True,
)

assert isinstance(credentials, CertificateGovernmentAppCredentials)

async def test_can_create_private_credentials(self):
factory = CertificateServiceClientCredentialsFactory(
app_id=CertificateServiceClientCredentialsFactoryTests.test_appid,
certificate_thumbprint="thumbprint",
certificate_private_key="private_key",
)

credentials = await factory.create_credentials(
CertificateServiceClientCredentialsFactoryTests.test_appid,
CertificateServiceClientCredentialsFactoryTests.test_audience,
CertificateServiceClientCredentialsFactoryTests.private_login_endpoint,
True,
)

assert isinstance(credentials, CertificateAppCredentials)
assert (
credentials.oauth_endpoint
== CertificateServiceClientCredentialsFactoryTests.private_login_endpoint
)