diff --git a/.github/workflows/testPython.yml b/.github/workflows/testPython.yml index 518175a..9f9f2e0 100644 --- a/.github/workflows/testPython.yml +++ b/.github/workflows/testPython.yml @@ -11,7 +11,8 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ 3.7, 3.8, 3.9 ] + python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11", "3.12" ] + fail-fast: false steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/alibabacloud_credentials/models.py b/alibabacloud_credentials/models.py index 5152284..28e75ca 100644 --- a/alibabacloud_credentials/models.py +++ b/alibabacloud_credentials/models.py @@ -29,7 +29,9 @@ def __init__( timeout: int = 1000, connect_timeout: int = 1000, proxy: str = '', - credentials_uri: str = '' + credentials_uri: str = '', + enable_imds_v2: bool = False, + metadata_token_duration: int = 21600 ): # accesskey id self.access_key_id = access_key_id @@ -59,6 +61,8 @@ def __init__( self.private_key_file = private_key_file # role name self.role_name = role_name + self.enable_imds_v2 = enable_imds_v2 + self.metadata_token_duration = metadata_token_duration # credential type self.type = type self.host = host @@ -101,6 +105,10 @@ def to_map(self): result['privateKeyFile'] = self.private_key_file if self.role_name is not None: result['roleName'] = self.role_name + if self.enable_imds_v2 is not None: + result['enableIMDSv2'] = self.enable_imds_v2 + if self.metadata_token_duration is not None: + result['metadataTokenDuration'] = self.metadata_token_duration if self.type is not None: result['type'] = self.type if self.host is not None: @@ -145,6 +153,10 @@ def from_map(self, m: dict = None): self.private_key_file = m.get('privateKeyFile') if m.get('roleName') is not None: self.role_name = m.get('roleName') + if m.get('enableIMDSv2') is not None: + self.enable_imds_v2 = m.get('enableIMDSv2') + if m.get('metadataTokenDuration') is not None: + self.metadata_token_duration = m.get('metadataTokenDuration') if m.get('type') is not None: self.type = m.get('type') if m.get('host') is not None: diff --git a/alibabacloud_credentials/providers.py b/alibabacloud_credentials/providers.py index 90d6025..37ea502 100644 --- a/alibabacloud_credentials/providers.py +++ b/alibabacloud_credentials/providers.py @@ -31,6 +31,8 @@ def __init__(self, config=None): self.role_session_name = config.role_session_name self.public_key_id = config.public_key_id self.role_name = config.role_name + self.enable_imds_v2 = config.enable_imds_v2 + self.metadata_token_duration = config.metadata_token_duration self.oidc_provider_arn = config.oidc_provider_arn self.oidc_token_file_path = config.oidc_token_file_path self.private_key_file = config.private_key_file @@ -102,14 +104,24 @@ def clear_credentials_provider(self): class EcsRamRoleCredentialProvider(AlibabaCloudCredentialsProvider): """EcsRamRoleCredentialProvider""" + default_metadata_token_duration = 21600 def __init__(self, role_name=None, config=None): self._verify_empty_args(role_name, config=config) super().__init__(config) self.__url_in_ecs_metadata = "/latest/meta-data/ram/security-credentials/" + self.__url_in_ecs_metadata_token = "/latest/api/token" self.__ecs_metadata_fetch_error_msg = "Failed to get RAM session credentials from ECS metadata service." + self.__ecs_metadata_token_fetch_error_msg = "Failed to get token from ECS Metadata Service." self.__metadata_service_host = "100.100.100.200" self._set_arg('role_name', role_name) + self.__metadata_token = None + self.__stale_time = 0 + self.enable_imds_v2 = au.environment_ecs_meta_data_imds_v2_enable and au.environment_ecs_meta_data_imds_v2_enable.lower() == 'true' + self.metadata_token_duration = self.default_metadata_token_duration + if isinstance(config, Config): + self.enable_imds_v2 = config.enable_imds_v2 + self.metadata_token_duration = config.metadata_token_duration def _get_role_name(self, url=None): url = url if url else f'http://{self.__metadata_service_host}{self.__url_in_ecs_metadata}' @@ -129,9 +141,46 @@ async def _get_role_name_async(self, url=None): raise CredentialException(self.__ecs_metadata_fetch_error_msg + " HttpCode=" + str(response.status_code)) self.role_name = response.body.decode('utf-8') - def _create_credential(self, url=None): + def _need_to_refresh_token(self): + return int(time.mktime(time.localtime())) >= self.__stale_time + + def _get_metadata_token(self, url=None): + if self._need_to_refresh_token(): + tmp_time = int(time.mktime(time.localtime())) + self.metadata_token_duration + tea_request = TeaRequest() + tea_request.method = 'PUT' + tea_request.headers['host'] = url if url else self.__metadata_service_host + tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str(self.metadata_token_duration) + if not url: + tea_request.pathname = self.__url_in_ecs_metadata_token + response = TeaCore.do_action(tea_request) + if response.status_code != 200: + raise CredentialException( + self.__ecs_metadata_token_fetch_error_msg + " HttpCode=" + str(response.status_code)) + self.__stale_time = tmp_time + self.__metadata_token = response.body.decode('utf-8') + + async def _get_metadata_token_async(self, url=None): + if self._need_to_refresh_token(): + tmp_time = int(time.mktime(time.localtime())) + self.metadata_token_duration + tea_request = TeaRequest() + tea_request.method = 'PUT' + tea_request.headers['host'] = url if url else self.__metadata_service_host + tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str(self.metadata_token_duration) + if not url: + tea_request.pathname = self.__url_in_ecs_metadata_token + response = await TeaCore.async_do_action(tea_request) + if response.status_code != 200: + raise CredentialException( + self.__ecs_metadata_token_fetch_error_msg + " HttpCode=" + str(response.status_code)) + self.__stale_time = tmp_time + self.__metadata_token = response.body.decode('utf-8') + + def _create_credential(self, url=None, metadata_token=None): tea_request = TeaRequest() tea_request.headers['host'] = url if url else self.__metadata_service_host + if metadata_token: + tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata + self.role_name # request @@ -160,11 +209,16 @@ def _create_credential(self, url=None): def get_credentials(self): if self.role_name == "": self._get_role_name() + if self.enable_imds_v2: + self._get_metadata_token() + return self._create_credential(metadata_token=self.__metadata_token) return self._create_credential() - async def _create_credential_async(self, url=None): + async def _create_credential_async(self, url=None, metadata_token=None): tea_request = TeaRequest() tea_request.headers['host'] = url if url else self.__metadata_service_host + if metadata_token: + tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata + self.role_name @@ -194,6 +248,9 @@ async def _create_credential_async(self, url=None): async def get_credentials_async(self): if self.role_name == "": await self._get_role_name_async() + if self.enable_imds_v2: + await self._get_metadata_token_async() + return await self._create_credential_async(metadata_token=self.__metadata_token) return await self._create_credential_async() diff --git a/alibabacloud_credentials/utils/auth_util.py b/alibabacloud_credentials/utils/auth_util.py index ad65b72..ae68f9a 100644 --- a/alibabacloud_credentials/utils/auth_util.py +++ b/alibabacloud_credentials/utils/auth_util.py @@ -4,6 +4,7 @@ environment_access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID') environment_access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET') environment_ECSMeta_data = os.environ.get('ALIBABA_CLOUD_ECS_METADATA') +environment_ecs_meta_data_imds_v2_enable = os.environ.get('ALIBABA_CLOUD_ECS_IMDSV2_ENABLE') environment_credentials_file = os.environ.get('ALIBABA_CLOUD_CREDENTIALS_FILE') environment_oidc_token_file = os.environ.get('ALIBABA_CLOUD_OIDC_TOKEN_FILE') environment_role_arn = os.environ.get('ALIBABA_CLOUD_ROLE_ARN') diff --git a/tests/test_integration.py b/tests/test_integration.py index 4e5db15..f20ec8e 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -34,4 +34,4 @@ def test_OIDCRoleArn(self): default_client = Client() default_client.get_access_key_id() except CredentialException as e: - self.assertRegex(e.message, 'AuthenticationFail.OIDCToken.Invalid') + self.assertRegex(e.message, 'AuthenticationFail.NoPermission') diff --git a/tests/test_providers.py b/tests/test_providers.py index 474a8c3..77e84e6 100644 --- a/tests/test_providers.py +++ b/tests/test_providers.py @@ -23,6 +23,12 @@ def do_GET(self): b' {"Expiration": "3999-08-07T20:20:20Z", "AccessKeyId": "AccessKeyId"}, "SessionAccessKey":' b' {"Expiration": "3999-08-07T20:20:20Z", "SessionAccessKeyId": "SessionAccessKeyId"}}') + def do_PUT(self): + self.send_response(200) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(b'token') + def run_server(): server = HTTPServer(('localhost', 8888), Request) @@ -45,6 +51,26 @@ def test_EcsRamRoleCredentialProvider(self): self.assertIsNotNone(prov) self.assertEqual("roleName", prov.role_name) + auth_util.environment_ecs_meta_data_imds_v2_enable = 'False' + prov = providers.EcsRamRoleCredentialProvider("roleName") + self.assertIsNotNone(prov) + self.assertEqual("roleName", prov.role_name) + self.assertFalse(prov.enable_imds_v2) + + auth_util.environment_ecs_meta_data_imds_v2_enable = '1' + prov = providers.EcsRamRoleCredentialProvider("roleName") + self.assertIsNotNone(prov) + self.assertEqual("roleName", prov.role_name) + self.assertFalse(prov.enable_imds_v2) + + auth_util.environment_ecs_meta_data_imds_v2_enable = 'True' + prov = providers.EcsRamRoleCredentialProvider("roleName") + self.assertIsNotNone(prov) + self.assertEqual("roleName", prov.role_name) + self.assertTrue(prov.enable_imds_v2) + + auth_util.environment_ecs_meta_data_imds_v2_enable = None + cfg = models.Config() cfg.role_name = "roleNameConfig" cfg.timeout = 1100 @@ -59,6 +85,23 @@ def test_EcsRamRoleCredentialProvider(self): prov._get_role_name(url='http://127.0.0.1:8888') self.assertIsNotNone(prov.role_name) + cfg.enable_imds_v2 = True + cfg.metadata_token_duration = 180 + prov = providers.EcsRamRoleCredentialProvider(config=cfg) + self.assertIsNotNone(prov) + self.assertTrue(prov.enable_imds_v2) + self.assertEqual(180, prov.metadata_token_duration) + self.assertEqual("roleNameConfig", prov.role_name) + self.assertEqual(2300, prov.timeout) + prov._get_metadata_token(url='127.0.0.1:8888') + cred = prov._create_credential(url='127.0.0.1:8888') + self.assertEqual("token", getattr(prov, '_EcsRamRoleCredentialProvider__metadata_token')) + self.assertNotEqual(0, getattr(prov, '_EcsRamRoleCredentialProvider__stale_time')) + self.assertEqual('ak', cred.access_key_id) + + prov._get_role_name(url='http://127.0.0.1:8888') + self.assertIsNotNone(prov.role_name) + def test_EcsRamRoleCredentialProvider_async(self): async def main(): prov = providers.EcsRamRoleCredentialProvider("roleName") @@ -126,7 +169,7 @@ def test_DefaultCredentialsProvider(self): try: prov.get_credentials() except Exception as e: - self.assertRegex(e.message, 'AuthenticationFail.OIDCToken.Invalid') + self.assertRegex(e.message, 'AuthenticationFail.NoPermission') auth_util.environment_role_arn = environment_role_arn auth_util.environment_oidc_provider_arn = environment_oidc_provider_arn auth_util.environment_oidc_token_file = environment_oidc_token_file