From 1330e03f5cbd132099344ce2a117c3ef41d481bb Mon Sep 17 00:00:00 2001 From: nanhe Date: Wed, 12 Oct 2022 21:20:15 +0800 Subject: [PATCH 1/2] support oidc credential --- .gitignore | 3 +- alibabacloud_credentials/client.py | 7 + alibabacloud_credentials/credentials.py | 54 ++++++ alibabacloud_credentials/models.py | 53 ++++-- alibabacloud_credentials/providers.py | 169 +++++++++++++++++- .../utils/auth_constant.py | 4 + alibabacloud_credentials/utils/auth_util.py | 1 + .../utils/parameter_helper.py | 13 ++ tests/test_client.py | 4 + tests/test_credentials.py | 43 +++++ tests/test_providers.py | 36 ++++ tests/test_util.py | 8 + 12 files changed, 371 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index fbc13a4..f313b60 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ runtime/ __pycache__/ .pytest_cache/ .coverage -*.pyc \ No newline at end of file +*.pyc +venv/ \ No newline at end of file diff --git a/alibabacloud_credentials/client.py b/alibabacloud_credentials/client.py index 3dbc89b..587d74c 100644 --- a/alibabacloud_credentials/client.py +++ b/alibabacloud_credentials/client.py @@ -59,6 +59,13 @@ def get_credential(config): 0, providers.RsaKeyPairCredentialProvider(config=config) ) + elif config.type == ac.OIDC_ROLE_ARN: + return credentials.OIDCRoleArnCredential( + config.access_key_id, + config.access_key_secret, + config.security_token, + 0, + providers.OIDCRoleArnCredentialProvider(config=config)) return providers.DefaultCredentialsProvider().get_credentials() def get_access_key_id(self): diff --git a/alibabacloud_credentials/credentials.py b/alibabacloud_credentials/credentials.py index af0ff8a..bac424a 100644 --- a/alibabacloud_credentials/credentials.py +++ b/alibabacloud_credentials/credentials.py @@ -185,6 +185,60 @@ async def get_security_token_async(self): await self._refresh_credential_async() return self.security_token +class OIDCRoleArnCredential(Credential, _AutomaticallyRefreshCredentials): + """OIDCRoleArnCredential""" + + def __init__(self, access_key_id, access_key_secret, security_token, expiration, provider): + super().__init__(expiration, provider) + self.access_key_id = access_key_id + self.access_key_secret = access_key_secret + self.security_token = security_token + self.credential_type = ac.OIDC_ROLE_ARN + + def _refresh_credential(self): + credential = super()._refresh_credential() + if credential: + self.access_key_id = credential.access_key_id + self.access_key_secret = credential.access_key_secret + self.expiration = credential.expiration + self.security_token = credential.security_token + + async def _refresh_credential_async(self): + credential = None + if self._with_should_refresh(): + credential = await self._get_new_credential_async() + + if credential: + self.access_key_id = credential.access_key_id + self.access_key_secret = credential.access_key_secret + self.expiration = credential.expiration + self.security_token = credential.security_token + + def get_access_key_id(self): + self._refresh_credential() + return self.access_key_id + + def get_access_key_secret(self): + self._refresh_credential() + return self.access_key_secret + + def get_security_token(self): + self._refresh_credential() + return self.security_token + + async def get_access_key_id_async(self): + await self._refresh_credential_async() + return self.access_key_id + + async def get_access_key_secret_async(self): + await self._refresh_credential_async() + return self.access_key_secret + + async def get_security_token_async(self): + await self._refresh_credential_async() + return self.security_token + + class CredentialsURICredential(): """CredentialsURICredential""" diff --git a/alibabacloud_credentials/models.py b/alibabacloud_credentials/models.py index 8ceeec1..5152284 100644 --- a/alibabacloud_credentials/models.py +++ b/alibabacloud_credentials/models.py @@ -7,26 +7,29 @@ class Config(TeaModel): """ Model for initing credential """ + def __init__( - self, - access_key_id: str = '', - access_key_secret: str = '', - security_token: str = '', - bearer_token: str = '', - duration_seconds: int = '', - role_arn: str = '', - policy: str = '', - role_session_expiration: int = '', - role_session_name: str = '', - public_key_id: str = '', - private_key_file: str = '', - role_name: str = '', - type: str = '', - host: str = '', - timeout: int = 1000, - connect_timeout: int = 1000, - proxy: str = '', - credentials_uri: str = '' + self, + access_key_id: str = '', + access_key_secret: str = '', + security_token: str = '', + bearer_token: str = '', + duration_seconds: int = '', + role_arn: str = '', + oidc_provider_arn: str = '', + oidc_token_file_path: str = '', + policy: str = '', + role_session_expiration: int = '', + role_session_name: str = '', + public_key_id: str = '', + private_key_file: str = '', + role_name: str = '', + type: str = '', + host: str = '', + timeout: int = 1000, + connect_timeout: int = 1000, + proxy: str = '', + credentials_uri: str = '' ): # accesskey id self.access_key_id = access_key_id @@ -40,6 +43,10 @@ def __init__( self.duration_seconds = duration_seconds # role arn self.role_arn = role_arn + # oidc provider arn + self.oidc_provider_arn = oidc_provider_arn + # oidc token file path + self.oidc_token_file_path = oidc_token_file_path # policy self.policy = policy # role session expiration @@ -78,6 +85,10 @@ def to_map(self): result['durationSeconds'] = self.duration_seconds if self.role_arn is not None: result['roleArn'] = self.role_arn + if self.oidc_provider_arn is not None: + result['oidcProviderArn'] = self.oidc_provider_arn + if self.oidc_token_file_path is not None: + result['oidcTokenFilePath'] = self.oidc_token_file_path if self.policy is not None: result['policy'] = self.policy if self.role_session_expiration is not None: @@ -118,6 +129,10 @@ def from_map(self, m: dict = None): self.duration_seconds = m.get('durationSeconds') if m.get('roleArn') is not None: self.role_arn = m.get('roleArn') + if m.get('oidcProviderArn') is not None: + self.oidc_provider_arn = m.get('oidcProviderArn') + if m.get('oidcTokenFilePath') is not None: + self.oidc_token_file_path = m.get('oidcTokenFilePath') if m.get('policy') is not None: self.policy = m.get('policy') if m.get('roleSessionExpiration') is not None: diff --git a/alibabacloud_credentials/providers.py b/alibabacloud_credentials/providers.py index 0b6f3cb..18b8236 100644 --- a/alibabacloud_credentials/providers.py +++ b/alibabacloud_credentials/providers.py @@ -31,6 +31,8 @@ def __init__(self, config=None): self.role_session_name = config.role_session_name self.public_key_id = config.public_key_id self.role_name = config.role_name + self.oidc_provider_arn = config.oidc_provider_arn + self.oidc_token_file_path = config.oidc_token_file_path self.private_key_file = config.private_key_file self.bearer_token = config.bearer_token self.security_token = config.security_token @@ -215,8 +217,12 @@ def _create_credentials(self, turl=None): 'RoleArn': self.role_arn, 'AccessKeyId': self.access_key_id, 'RegionId': self.region_id, - 'RoleSessionName': self.role_session_name + 'RoleSessionName': self.role_session_name, + 'SignatureMethod': 'HMAC-SHA1', + 'SignatureVersion': '1.0' } + tea_request.query["Timestamp"] = ph.get_iso_8061_date() + tea_request.query["SignatureNonce"] = ph.get_uuid() if self.policy is not None: tea_request.query["Policy"] = self.policy string_to_sign = ph.compose_string_to_sign("GET", tea_request.query) @@ -252,8 +258,12 @@ async def _create_credentials_async(self, turl=None): 'RoleArn': self.role_arn, 'AccessKeyId': self.access_key_id, 'RegionId': self.region_id, - 'RoleSessionName': self.role_session_name + 'RoleSessionName': self.role_session_name, + 'SignatureMethod': 'HMAC-SHA1', + 'SignatureVersion': '1.0' } + tea_request.query["Timestamp"] = ph.get_iso_8061_date() + tea_request.query["SignatureNonce"] = ph.get_uuid() if self.policy is not None: tea_request.query["Policy"] = self.policy string_to_sign = ph.compose_string_to_sign("GET", tea_request.query) @@ -276,6 +286,124 @@ async def _create_credentials_async(self, turl=None): raise CredentialException(response.body.decode('utf-8')) +class OIDCRoleArnCredentialProvider(AlibabaCloudCredentialsProvider): + """OIDCRoleArnCredentialProvider""" + + def __init__(self, access_key_id=None, access_key_secret=None, role_session_name=None, role_arn=None, + oidc_provider_arn=None, + oidc_token_file_path=None, + region_id=None, + policy=None, config=None): + self._verify_empty_args(access_key_id, access_key_secret, config=config) + super().__init__(config) + self._set_arg('role_arn', role_arn) + self._set_arg('oidc_provider_arn', oidc_provider_arn) + if oidc_token_file_path is not None: + self._set_arg('oidc_token_file_path', oidc_token_file_path) + elif config.oidc_token_file_path is not None: + self._set_arg('oidc_token_file_path', oidc_token_file_path) + elif au.environment_oidc_token_file is not None: + self._set_arg('oidc_token_file_path', au.environment_oidc_token_file) + else: + raise CredentialException( + 'The oidc_token_file_path does not exist and env ALIBABA_CLOUD_OIDC_TOKEN_FILE is none.') + self._set_arg('access_key_id', access_key_id) + self._set_arg('access_key_secret', access_key_secret) + self._set_arg('region_id', region_id) + self._set_arg('role_session_name', role_session_name) + self._set_arg('policy', policy) + + def get_credentials(self): + return self._create_credentials() + + def _create_credentials(self, turl=None): + # 获取credential 先实现签名用工具类 + oidc_token = au.get_private_key(self.oidc_token_file_path) + tea_request = TeaRequest() + tea_request.query = { + 'Action': 'AssumeRoleWithOIDC', + 'Format': 'JSON', + 'Version': '2015-04-01', + 'RegionId': self.region_id, + 'DurationSeconds': str(self.duration_seconds), + 'RoleArn': self.role_arn, + 'OIDCProviderArn': self.oidc_provider_arn, + 'OIDCToken': oidc_token, + 'RoleSessionName': self.role_session_name, + 'SignatureMethod': 'HMAC-SHA1', + 'SignatureVersion': '1.0' + } + tea_request.query["Timestamp"] = ph.get_iso_8061_date() + tea_request.query["SignatureNonce"] = ph.get_uuid() + if self.policy is not None: + tea_request.query["Policy"] = self.policy + string_to_sign = ph.compose_string_to_sign("GET", tea_request.query) + if self.access_key_id is not None and self.access_key_secret is not None: + tea_request.query["AccessKeyId"] = self.access_key_id + signature = ph.sign_string(string_to_sign, self.access_key_secret + "&") + tea_request.query["Signature"] = signature + tea_request.protocol = 'https' + tea_request.headers['host'] = turl if turl else 'sts.aliyuncs.com' + # request + response = TeaCore.do_action(tea_request) + if response.status_code == 200: + dic = json.loads(response.body.decode('utf-8')) + if "Credentials" in dic: + cre = dic.get("Credentials") + # 先转换为时间数组 + time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") + # 转换为时间戳 + expiration = calendar.timegm(time_array) + return credentials.OIDCRoleArnCredential(cre.get("AccessKeyId"), cre.get("AccessKeySecret"), + cre.get("SecurityToken"), expiration, self) + raise CredentialException(response.body.decode('utf-8')) + + async def get_credentials_async(self): + return await self._create_credentials_async() + + async def _create_credentials_async(self, turl=None): + # 获取credential 先实现签名用工具类 + oidc_token = au.get_private_key(self.oidc_token_file_path) + tea_request = TeaRequest() + tea_request.query = { + 'Action': 'AssumeRoleWithOIDC', + 'Format': 'JSON', + 'Version': '2015-04-01', + 'RegionId': self.region_id, + 'DurationSeconds': str(self.duration_seconds), + 'RoleArn': self.role_arn, + 'OIDCProviderArn': self.oidc_provider_arn, + 'OIDCToken': oidc_token, + 'RoleSessionName': self.role_session_name, + 'SignatureMethod': 'HMAC-SHA1', + 'SignatureVersion': '1.0' + } + tea_request.query["Timestamp"] = ph.get_iso_8061_date() + tea_request.query["SignatureNonce"] = ph.get_uuid() + if self.policy is not None: + tea_request.query["Policy"] = self.policy + string_to_sign = ph.compose_string_to_sign("GET", tea_request.query) + if self.access_key_id is not None and self.access_key_secret is not None: + tea_request.query["AccessKeyId"] = self.access_key_id + signature = ph.sign_string(string_to_sign, self.access_key_secret + "&") + tea_request.query["Signature"] = signature + tea_request.protocol = 'https' + tea_request.headers['host'] = turl if turl else 'sts.aliyuncs.com' + # request + response = TeaCore.async_do_action(tea_request) + if response.status_code == 200: + dic = json.loads(response.body.decode('utf-8')) + if "Credentials" in dic: + cre = dic.get("Credentials") + # 先转换为时间数组 + time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") + # 转换为时间戳 + expiration = calendar.timegm(time_array) + return credentials.OIDCRoleArnCredential(cre.get("AccessKeyId"), cre.get("AccessKeySecret"), + cre.get("SecurityToken"), expiration, self) + raise CredentialException(response.body.decode('utf-8')) + + class RsaKeyPairCredentialProvider(AlibabaCloudCredentialsProvider): def __init__(self, access_key_id=None, access_key_secret=None, region_id=None, config=None): @@ -297,7 +425,11 @@ async def _create_credential_async(self, turl=None): 'DurationSeconds': str(self.duration_seconds), 'AccessKeyId': self.access_key_id, 'RegionId': self.region_id, + 'SignatureMethod': 'HMAC-SHA1', + 'SignatureVersion': '1.0' } + tea_request.query["Timestamp"] = ph.get_iso_8061_date() + tea_request.query["SignatureNonce"] = ph.get_uuid() str_to_sign = ph.compose_string_to_sign('GET', tea_request.query) signature = ph.sign_string(str_to_sign, self.access_key_id + '&') @@ -312,7 +444,8 @@ async def _create_credential_async(self, turl=None): cre = dic.get("SessionAccessKey") time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") expiration = calendar.timegm(time_array) - return credentials.RsaKeyPairCredential(cre.get("SessionAccessKeyId"), cre.get("SessionAccessKeySecret"), + return credentials.RsaKeyPairCredential(cre.get("SessionAccessKeyId"), + cre.get("SessionAccessKeySecret"), expiration, self) raise CredentialException(response.body.decode('utf-8')) @@ -328,7 +461,11 @@ def _create_credential(self, turl=None): 'DurationSeconds': str(self.duration_seconds), 'AccessKeyId': self.access_key_id, 'RegionId': self.region_id, + 'SignatureMethod': 'HMAC-SHA1', + 'SignatureVersion': '1.0' } + tea_request.query["Timestamp"] = ph.get_iso_8061_date() + tea_request.query["SignatureNonce"] = ph.get_uuid() str_to_sign = ph.compose_string_to_sign('GET', tea_request.query) signature = ph.sign_string(str_to_sign, self.access_key_id + '&') @@ -343,7 +480,8 @@ def _create_credential(self, turl=None): cre = dic.get("SessionAccessKey") time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") expiration = calendar.timegm(time_array) - return credentials.RsaKeyPairCredential(cre.get("SessionAccessKeyId"), cre.get("SessionAccessKeySecret"), + return credentials.RsaKeyPairCredential(cre.get("SessionAccessKeyId"), + cre.get("SessionAccessKeySecret"), expiration, self) raise CredentialException(response.body.decode('utf-8')) @@ -389,6 +527,8 @@ def _create_credential(self, config): raise CredentialException("The configured client type is empty") elif ac.INI_TYPE_ARN == config_type: return self._get_sts_assume_role_session_provider(config).get_credentials() + elif ac.INI_TYPE_OIDC == config_type: + return self._get_sts_oidc_role_session_provider(config).get_credentials() elif ac.INI_TYPE_KEY_PAIR == config_type: return self._get_sts_get_session_access_key_provider(config).get_credentials() elif ac.INI_TYPE_RAM == config_type: @@ -417,6 +557,26 @@ def _get_sts_assume_role_session_provider(config): access_key_id, access_key_secret, role_session_name, role_arn, region_id, policy ) + @staticmethod + def _get_sts_oidc_role_session_provider(config): + access_key_id = config.get(ac.INI_ACCESS_KEY_ID) + access_key_secret = config.get(ac.INI_ACCESS_KEY_IDSECRET) + role_session_name = config.get(ac.INI_ROLE_SESSION_NAME) + role_arn = config.get(ac.INI_ROLE_ARN) + oidc_provider_arn = config.get(ac.INI_OIDC_PROVIDER_ARN) + oidc_token_file_path = config.get(ac.INI_OIDC_TOKEN_FILE_PATH) + region_id = config.get(ac.DEFAULT_REGION) + policy = config.get(ac.INI_POLICY) + + if not role_arn: + raise CredentialException("The configured role_arn is empty") + if not oidc_provider_arn: + raise CredentialException("The configured oidc_provider_arn is empty") + return OIDCRoleArnCredentialProvider( + access_key_id, access_key_secret, role_session_name, role_arn, oidc_provider_arn, oidc_token_file_path, + region_id, policy + ) + @staticmethod def _get_sts_get_session_access_key_provider(config): public_key_id = config.get(ac.INI_PUBLIC_KEY_ID) @@ -451,6 +611,7 @@ def get_credentials(self): raise CredentialException("Environment variable accessKeySecret cannot be empty") return credentials.AccessKeyCredential(access_key_id, access_key_secret) + class CredentialsUriProvider(AlibabaCloudCredentialsProvider): def get_credentials(self): credentials_uri = os.environ.get('ALIBABA_CLOUD_CREDENTIALS_URI') diff --git a/alibabacloud_credentials/utils/auth_constant.py b/alibabacloud_credentials/utils/auth_constant.py index c8a6291..ca346c7 100644 --- a/alibabacloud_credentials/utils/auth_constant.py +++ b/alibabacloud_credentials/utils/auth_constant.py @@ -6,6 +6,7 @@ INI_TYPE = "type" INI_TYPE_RAM = "ecs_ram_role" INI_TYPE_ARN = "ram_role_arn" +INI_TYPE_OIDC = "oidc_role_arn" INI_TYPE_KEY_PAIR = "rsa_key_pair" INI_PUBLIC_KEY_ID = "public_key_id" INI_PRIVATE_KEY_FILE = "private_key_file" @@ -14,6 +15,8 @@ INI_ROLE_SESSION_NAME = "role_session_name" INI_ROLE_ARN = "role_arn" INI_POLICY = "policy" +INI_OIDC_PROVIDER_ARN = "oidc_provider_arn" +INI_OIDC_TOKEN_FILE_PATH = "oidc_token_file_path" TSC_VALID_TIME_SECONDS = 3600 DEFAULT_REGION = "region_id" INI_ENABLE = "enable" @@ -21,6 +24,7 @@ STS = "sts" ECS_RAM_ROLE = "ecs_ram_role" RAM_ROLE_ARN = "ram_role_arn" +OIDC_ROLE_ARN = "oidc_role_arn" CREDENTIALS_URI = "credentials_uri" RSA_KEY_PAIR = "rsa_key_pair" BEARER = "bearer" diff --git a/alibabacloud_credentials/utils/auth_util.py b/alibabacloud_credentials/utils/auth_util.py index 3df2715..e3bc6d5 100644 --- a/alibabacloud_credentials/utils/auth_util.py +++ b/alibabacloud_credentials/utils/auth_util.py @@ -5,6 +5,7 @@ environment_access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET') environment_ECSMeta_data = os.environ.get('ALIBABA_CLOUD_ECS_METADATA') environment_credentials_file = os.environ.get('ALIBABA_CLOUD_CREDENTIALS_FILE') +environment_oidc_token_file = os.environ.get('ALIBABA_CLOUD_OIDC_TOKEN_FILE') private_key = None diff --git a/alibabacloud_credentials/utils/parameter_helper.py b/alibabacloud_credentials/utils/parameter_helper.py index eeb50c6..750f620 100644 --- a/alibabacloud_credentials/utils/parameter_helper.py +++ b/alibabacloud_credentials/utils/parameter_helper.py @@ -3,6 +3,9 @@ import hmac import hashlib import base64 +import socket +import uuid +import datetime TIME_ZONE = "UTC" FORMAT_ISO_8601 = "yyyy-MM-dd'T'HH:mm:ss'Z'" @@ -12,6 +15,16 @@ ALGORITHM_NAME = "HmacSHA1" +def get_uuid(): + name = socket.gethostname() + str(uuid.uuid1()) + namespace = uuid.NAMESPACE_URL + return str(uuid.uuid5(namespace, name)) + + +def get_iso_8061_date(): + return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + + def compose_string_to_sign(method, queries): sorted_key = sorted(list(queries.keys())) canonicalized_query_string = '' diff --git a/tests/test_client.py b/tests/test_client.py index 7a90039..39ab909 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -46,6 +46,10 @@ def test_client_ram_role_arn(self): conf = Config(type='ram_role_arn') self.assertIsInstance(Client.get_credential(conf), credentials.RamRoleArnCredential) + def test_client_oidc_role_arn(self): + conf = Config(type='oidc_role_arn', oidc_token_file_path='oidc_token_file_path') + self.assertIsInstance(Client.get_credential(conf), credentials.OIDCRoleArnCredential) + def test_client_rsa_key_pair(self): conf = Config(type='rsa_key_pair') self.assertIsInstance(Client.get_credential(conf), credentials.RsaKeyPairCredential) diff --git a/tests/test_credentials.py b/tests/test_credentials.py index ba91b52..bd461d6 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -14,6 +14,11 @@ def get_credentials(self): return credentials.RamRoleArnCredential("accessKeyId", "accessKeySecret", "securityToken", 100000000000, None) + class TestOIDCRoleArnProvider: + def get_credentials(self): + return credentials.OIDCRoleArnCredential("accessKeyId", "accessKeySecret", "securityToken", 100000000000, + None) + class TestRsaKeyPairProvider: def get_credentials(self): return credentials.RsaKeyPairCredential("accessKeyId", "accessKeySecret", 100000000000, @@ -108,6 +113,44 @@ def test_RamRoleArnCredential(self): cred._refresh_credential() self.assertIsNotNone(cred) + def test_OIDCRoleArnCredential(self): + access_key_id, access_key_secret, security_token, expiration = \ + 'access_key_id', 'access_key_secret', 'security_token', 640900000000 + provider = self.TestOIDCRoleArnProvider() + cred = credentials.OIDCRoleArnCredential( + access_key_id, access_key_secret, security_token, expiration, provider + ) + + self.assertEqual('access_key_id', cred.access_key_id) + self.assertEqual('access_key_secret', cred.access_key_secret) + self.assertEqual('security_token', cred.security_token) + self.assertEqual(640900000000, cred.expiration) + + access_key_id, access_key_secret, security_token, expiration = \ + 'access_key_id', 'access_key_secret', 'security_token', 6409 + provider = self.TestOIDCRoleArnProvider() + cred = credentials.OIDCRoleArnCredential( + access_key_id, access_key_secret, security_token, expiration, provider + ) + + # refresh token + self.assertTrue(cred._with_should_refresh()) + + self.assertEqual('accessKeyId', cred.get_access_key_id()) + self.assertEqual('accessKeySecret', cred.access_key_secret) + self.assertEqual('securityToken', cred.security_token) + self.assertEqual(100000000000, cred.expiration) + self.assertEqual('oidc_role_arn', cred.credential_type) + self.assertIsInstance(cred.provider, self.TestOIDCRoleArnProvider) + + self.assertFalse(cred._with_should_refresh()) + + g = cred._get_new_credential() + self.assertIsNotNone(g) + + cred._refresh_credential() + self.assertIsNotNone(cred) + def test_RsaKeyPairCredential(self): access_key_id, access_key_secret, expiration = 'access_key_id', 'access_key_secret', 90000000000 provider = providers.RsaKeyPairCredentialProvider(access_key_id, access_key_secret) diff --git a/tests/test_providers.py b/tests/test_providers.py index e34dee8..6db1d38 100644 --- a/tests/test_providers.py +++ b/tests/test_providers.py @@ -143,6 +143,42 @@ def test_RamRoleArnCredentialProvider(self): cred = prov._create_credentials(turl='http://127.0.0.1:8888') self.assertEqual('AccessKeyId', cred.access_key_id) + def test_OIDCRoleArnCredentialProvider(self): + access_key_id, access_key_secret, role_session_name, role_arn, oidc_provider_arn, oidc_token_file_path, region_id, policy = \ + 'access_key_id', 'access_key_secret', 'role_session_name', 'role_arn', 'oidc_provider_arn', 'tests/private_key.txt', 'region_id', 'policy' + prov = providers.OIDCRoleArnCredentialProvider( + access_key_id, access_key_secret, role_session_name, role_arn, oidc_provider_arn, oidc_token_file_path, region_id, policy + ) + self.assertEqual('access_key_id', prov.access_key_id) + self.assertEqual('access_key_secret', prov.access_key_secret) + self.assertEqual('role_session_name', prov.role_session_name) + self.assertEqual('role_arn', prov.role_arn) + self.assertEqual('oidc_provider_arn', prov.oidc_provider_arn) + self.assertEqual('tests/private_key.txt', prov.oidc_token_file_path) + self.assertEqual('region_id', prov.region_id) + self.assertEqual('policy', prov.policy) + + conf = models.Config( + access_key_id=access_key_id, + access_key_secret=access_key_secret, + role_session_name=role_session_name, + role_arn=role_arn, + oidc_provider_arn=oidc_provider_arn, + oidc_token_file_path=oidc_token_file_path + ) + prov = providers.OIDCRoleArnCredentialProvider(config=conf) + self.assertEqual('access_key_id', prov.access_key_id) + self.assertEqual('access_key_secret', prov.access_key_secret) + self.assertEqual('role_session_name', prov.role_session_name) + self.assertEqual('role_arn', prov.role_arn) + self.assertEqual('oidc_provider_arn', prov.oidc_provider_arn) + self.assertEqual('tests/private_key.txt', prov.oidc_token_file_path) + self.assertEqual('cn-hangzhou', prov.region_id) + self.assertIsNone(prov.policy) + + cred = prov._create_credentials(turl='http://127.0.0.1:8888') + self.assertEqual('AccessKeyId', cred.access_key_id) + def test_RamRoleArnCredentialProvider_async(self): async def main(): access_key_id, access_key_secret, role_session_name, role_arn, region_id, policy = \ diff --git a/tests/test_util.py b/tests/test_util.py index 54141d8..1b48eac 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -11,6 +11,12 @@ def test_get_private_key(self): self.assertEqual('test_private_key', key) def test_parameter_helper(self): + def test_get_uuid(test): + test.assertIsNotNone(parameter_helper.get_uuid()) + + def test_get_iso_8061_date(test): + test.assertIsNotNone(parameter_helper.get_iso_8061_date()) + def test_compose_string_to_sign(test): method, queries = 'GET', {} string_to_sign = parameter_helper.compose_string_to_sign(method, queries) @@ -26,6 +32,8 @@ def test_compose_url(test): res = parameter_helper.compose_url(endpoint, queries, protocol) test.assertEqual('https://aliyun.com/?tests=test', res) + test_get_uuid(self) + test_get_iso_8061_date(self) test_compose_string_to_sign(self) test_sign_string(self) test_compose_url(self) From 675de502e3c2f6fccffeab662878fb9aeccc2643 Mon Sep 17 00:00:00 2001 From: nanhe Date: Thu, 13 Oct 2022 12:19:54 +0800 Subject: [PATCH 2/2] update readme --- README-CN.md | 40 ++++++++++++++++++++++++++++++++++++++++ README.md | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/README-CN.md b/README-CN.md index 7bcc3d0..c39854a 100644 --- a/README-CN.md +++ b/README-CN.md @@ -90,6 +90,34 @@ security_token = cred.get_security_token() cred_type = cred.get_type() ``` +##### OIDC Role ARN + +通过指定[OIDC角色],让凭证自动申请维护 STS Token。你可以通过为 `Policy` 赋值来限制获取到的 STS Token 的权限。 + +```python +from alibabacloud_credentials.client import Client +from alibabacloud_credentials.models import Config + +config = Config( + type='oidc_role_arn', # 凭证类型 + access_key_id='accessKeyId', # AccessKeyId + access_key_secret='accessKeySecret', # AccessKeySecret + security_token='securityToken', # STS Token + role_arn='roleArn', # 格式: acs:ram::用户ID:role/角色名 + oidc_provider_arn='oidcProviderArn', # 格式: acs:ram::用户Id:oidc-provider/OIDC身份提供商名称 + oidc_token_file_path='/Users/xxx/xxx',# 格式: path,可不设,但需要通过设置 ALIBABA_CLOUD_OIDC_TOKEN_FILE 来代替 + role_session_name='roleSessionName', # 角色会话名称 + policy='policy', # 可选, 限制 STS Token 的权限 + role_session_expiration=3600 # 可选, 限制 STS Token 的有效时间 +) +cred = Client(config) + +access_key_id = cred.get_access_key_id() +access_key_secret = cred.get_access_key_secret() +security_token = cred.get_security_token() +cred_type = cred.get_type() +``` + ##### ECS RAM Role 通过指定角色名称,让凭证自动申请维护 STS Token @@ -229,6 +257,18 @@ role_session_name = session_name # 选填 type = rsa_key_pair # 认证方式为 rsa_key_pair public_key_id = publicKeyId # Public Key ID private_key_file = /your/pk.pem # Private Key 文件 + +[client4] # 命名为 `client4` 的配置 +enable = false # 不启用 +type = oidc_role_arn # 认证方式为 oidc_role_arn +region_id = cn-test # 获取session用的region +policy = test # 选填 指定权限 +access_key_id = foo # 选填 +access_key_secret = bar # 选填 +role_arn = role_arn +oidc_provider_arn = oidc_provider_arn +oidc_token_file_path = /xxx/xxx # 可通过设置环境变量 ALIBABA_CLOUD_OIDC_TOKEN_FILE 来代替 +role_session_name = session_name # 选填 ``` 3.实例 RAM 角色 diff --git a/README.md b/README.md index 1283414..743a03a 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,34 @@ security_token = cred.get_security_token() cred_type = cred.get_type() ``` +#### OIDC Role ARN + +By specifying [OIDC Role][OIDC Role], the credential will be able to automatically request maintenance of STS Token. If you want to limit the permissions([How to make a policy][policy]) of STS Token, you can assign value for `Policy`. + +```python +from alibabacloud_credentials.client import Client +from alibabacloud_credentials.models import Config + +config = Config( + type='oidc_role_arn', # credential type + access_key_id='accessKeyId', # AccessKeyId + access_key_secret='accessKeySecret', # AccessKeySecret + security_token='securityToken', # STS Token + role_arn='roleArn', # Format: acs:ram::USER_ID:role/ROLE_NAME + oidc_provider_arn='oidcProviderArn', # Format: acs:ram::USER_Id:oidc-provider/OIDC Providers + oidc_token_file_path='/Users/xxx/xxx',# oidc_token_file_path can be replaced by setting environment variable: ALIBABA_CLOUD_OIDC_TOKEN_FILE + role_session_name='roleSessionName', # Role Session Name + policy='policy', # Not required, limit the permissions of STS Token + role_session_expiration=3600 # Not required, limit the Valid time of STS Token +) +cred = Client(config) + +access_key_id = cred.get_access_key_id() +access_key_secret = cred.get_access_key_secret() +security_token = cred.get_security_token() +cred_type = cred.get_type() +``` + #### ECS RAM Role By specifying the role name, the credential will be able to automatically request maintenance of STS Token. @@ -225,6 +253,18 @@ role_session_name = session_name # optional type = rsa_key_pair # Certification type: rsa_key_pair public_key_id = publicKeyId # Public Key ID private_key_file = /your/pk.pem # Private Key file + +[client4] # configuration that is named as `client4` +enable = false # Disable +type = oidc_role_arn # Certification type: oidc_role_arn +region_id = cn-test +policy = test # optional Specify permissions +access_key_id = foo # optional +access_key_secret = bar # optional +role_arn = role_arn +oidc_provider_arn = oidc_provider_arn +oidc_token_file_path = /xxx/xxx # can be replaced by setting environment variable: ALIBABA_CLOUD_OIDC_TOKEN_FILE +role_session_name = session_name # optional ``` 3. Instance RAM Role