Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/testPython.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ 3.7, 3.8, 3.9 ]
python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11", "3.12" ]
fail-fast: false
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
14 changes: 13 additions & 1 deletion alibabacloud_credentials/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def __init__(
timeout: int = 1000,
connect_timeout: int = 1000,
proxy: str = '',
credentials_uri: str = ''
credentials_uri: str = '',
enable_imds_v2: bool = False,
metadata_token_duration: int = 21600
):
# accesskey id
self.access_key_id = access_key_id
Expand Down Expand Up @@ -59,6 +61,8 @@ def __init__(
self.private_key_file = private_key_file
# role name
self.role_name = role_name
self.enable_imds_v2 = enable_imds_v2
self.metadata_token_duration = metadata_token_duration
# credential type
self.type = type
self.host = host
Expand Down Expand Up @@ -101,6 +105,10 @@ def to_map(self):
result['privateKeyFile'] = self.private_key_file
if self.role_name is not None:
result['roleName'] = self.role_name
if self.enable_imds_v2 is not None:
result['enableIMDSv2'] = self.enable_imds_v2
if self.metadata_token_duration is not None:
result['metadataTokenDuration'] = self.metadata_token_duration
if self.type is not None:
result['type'] = self.type
if self.host is not None:
Expand Down Expand Up @@ -145,6 +153,10 @@ def from_map(self, m: dict = None):
self.private_key_file = m.get('privateKeyFile')
if m.get('roleName') is not None:
self.role_name = m.get('roleName')
if m.get('enableIMDSv2') is not None:
self.enable_imds_v2 = m.get('enableIMDSv2')
if m.get('metadataTokenDuration') is not None:
self.metadata_token_duration = m.get('metadataTokenDuration')
if m.get('type') is not None:
self.type = m.get('type')
if m.get('host') is not None:
Expand Down
61 changes: 59 additions & 2 deletions alibabacloud_credentials/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def __init__(self, config=None):
self.role_session_name = config.role_session_name
self.public_key_id = config.public_key_id
self.role_name = config.role_name
self.enable_imds_v2 = config.enable_imds_v2
self.metadata_token_duration = config.metadata_token_duration
self.oidc_provider_arn = config.oidc_provider_arn
self.oidc_token_file_path = config.oidc_token_file_path
self.private_key_file = config.private_key_file
Expand Down Expand Up @@ -102,14 +104,24 @@ def clear_credentials_provider(self):

class EcsRamRoleCredentialProvider(AlibabaCloudCredentialsProvider):
"""EcsRamRoleCredentialProvider"""
default_metadata_token_duration = 21600

def __init__(self, role_name=None, config=None):
self._verify_empty_args(role_name, config=config)
super().__init__(config)
self.__url_in_ecs_metadata = "/latest/meta-data/ram/security-credentials/"
self.__url_in_ecs_metadata_token = "/latest/api/token"
self.__ecs_metadata_fetch_error_msg = "Failed to get RAM session credentials from ECS metadata service."
self.__ecs_metadata_token_fetch_error_msg = "Failed to get token from ECS Metadata Service."
self.__metadata_service_host = "100.100.100.200"
self._set_arg('role_name', role_name)
self.__metadata_token = None
self.__stale_time = 0
self.enable_imds_v2 = au.environment_ecs_meta_data_imds_v2_enable and au.environment_ecs_meta_data_imds_v2_enable.lower() == 'true'
self.metadata_token_duration = self.default_metadata_token_duration
if isinstance(config, Config):
self.enable_imds_v2 = config.enable_imds_v2
self.metadata_token_duration = config.metadata_token_duration

def _get_role_name(self, url=None):
url = url if url else f'http://{self.__metadata_service_host}{self.__url_in_ecs_metadata}'
Expand All @@ -129,9 +141,46 @@ async def _get_role_name_async(self, url=None):
raise CredentialException(self.__ecs_metadata_fetch_error_msg + " HttpCode=" + str(response.status_code))
self.role_name = response.body.decode('utf-8')

def _create_credential(self, url=None):
def _need_to_refresh_token(self):
return int(time.mktime(time.localtime())) >= self.__stale_time

def _get_metadata_token(self, url=None):
if self._need_to_refresh_token():
tmp_time = int(time.mktime(time.localtime())) + self.metadata_token_duration
tea_request = TeaRequest()
tea_request.method = 'PUT'
tea_request.headers['host'] = url if url else self.__metadata_service_host
tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str(self.metadata_token_duration)
if not url:
tea_request.pathname = self.__url_in_ecs_metadata_token
response = TeaCore.do_action(tea_request)
if response.status_code != 200:
raise CredentialException(
self.__ecs_metadata_token_fetch_error_msg + " HttpCode=" + str(response.status_code))
self.__stale_time = tmp_time
self.__metadata_token = response.body.decode('utf-8')

async def _get_metadata_token_async(self, url=None):
if self._need_to_refresh_token():
tmp_time = int(time.mktime(time.localtime())) + self.metadata_token_duration
tea_request = TeaRequest()
tea_request.method = 'PUT'
tea_request.headers['host'] = url if url else self.__metadata_service_host
tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str(self.metadata_token_duration)
if not url:
tea_request.pathname = self.__url_in_ecs_metadata_token
response = await TeaCore.async_do_action(tea_request)
if response.status_code != 200:
raise CredentialException(
self.__ecs_metadata_token_fetch_error_msg + " HttpCode=" + str(response.status_code))
self.__stale_time = tmp_time
self.__metadata_token = response.body.decode('utf-8')

def _create_credential(self, url=None, metadata_token=None):
tea_request = TeaRequest()
tea_request.headers['host'] = url if url else self.__metadata_service_host
if metadata_token:
tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token
if not url:
tea_request.pathname = self.__url_in_ecs_metadata + self.role_name
# request
Expand Down Expand Up @@ -160,11 +209,16 @@ def _create_credential(self, url=None):
def get_credentials(self):
if self.role_name == "":
self._get_role_name()
if self.enable_imds_v2:
self._get_metadata_token()
return self._create_credential(metadata_token=self.__metadata_token)
return self._create_credential()

async def _create_credential_async(self, url=None):
async def _create_credential_async(self, url=None, metadata_token=None):
tea_request = TeaRequest()
tea_request.headers['host'] = url if url else self.__metadata_service_host
if metadata_token:
tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token
if not url:
tea_request.pathname = self.__url_in_ecs_metadata + self.role_name

Expand Down Expand Up @@ -194,6 +248,9 @@ async def _create_credential_async(self, url=None):
async def get_credentials_async(self):
if self.role_name == "":
await self._get_role_name_async()
if self.enable_imds_v2:
await self._get_metadata_token_async()
return await self._create_credential_async(metadata_token=self.__metadata_token)
return await self._create_credential_async()


Expand Down
1 change: 1 addition & 0 deletions alibabacloud_credentials/utils/auth_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
environment_access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')
environment_access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
environment_ECSMeta_data = os.environ.get('ALIBABA_CLOUD_ECS_METADATA')
environment_ecs_meta_data_imds_v2_enable = os.environ.get('ALIBABA_CLOUD_ECS_IMDSV2_ENABLE')
environment_credentials_file = os.environ.get('ALIBABA_CLOUD_CREDENTIALS_FILE')
environment_oidc_token_file = os.environ.get('ALIBABA_CLOUD_OIDC_TOKEN_FILE')
environment_role_arn = os.environ.get('ALIBABA_CLOUD_ROLE_ARN')
Expand Down
2 changes: 1 addition & 1 deletion tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ def test_OIDCRoleArn(self):
default_client = Client()
default_client.get_access_key_id()
except CredentialException as e:
self.assertRegex(e.message, 'AuthenticationFail.OIDCToken.Invalid')
self.assertRegex(e.message, 'AuthenticationFail.NoPermission')
45 changes: 44 additions & 1 deletion tests/test_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ def do_GET(self):
b' {"Expiration": "3999-08-07T20:20:20Z", "AccessKeyId": "AccessKeyId"}, "SessionAccessKey":'
b' {"Expiration": "3999-08-07T20:20:20Z", "SessionAccessKeyId": "SessionAccessKeyId"}}')

def do_PUT(self):
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'token')


def run_server():
server = HTTPServer(('localhost', 8888), Request)
Expand All @@ -45,6 +51,26 @@ def test_EcsRamRoleCredentialProvider(self):
self.assertIsNotNone(prov)
self.assertEqual("roleName", prov.role_name)

auth_util.environment_ecs_meta_data_imds_v2_enable = 'False'
prov = providers.EcsRamRoleCredentialProvider("roleName")
self.assertIsNotNone(prov)
self.assertEqual("roleName", prov.role_name)
self.assertFalse(prov.enable_imds_v2)

auth_util.environment_ecs_meta_data_imds_v2_enable = '1'
prov = providers.EcsRamRoleCredentialProvider("roleName")
self.assertIsNotNone(prov)
self.assertEqual("roleName", prov.role_name)
self.assertFalse(prov.enable_imds_v2)

auth_util.environment_ecs_meta_data_imds_v2_enable = 'True'
prov = providers.EcsRamRoleCredentialProvider("roleName")
self.assertIsNotNone(prov)
self.assertEqual("roleName", prov.role_name)
self.assertTrue(prov.enable_imds_v2)

auth_util.environment_ecs_meta_data_imds_v2_enable = None

cfg = models.Config()
cfg.role_name = "roleNameConfig"
cfg.timeout = 1100
Expand All @@ -59,6 +85,23 @@ def test_EcsRamRoleCredentialProvider(self):
prov._get_role_name(url='http://127.0.0.1:8888')
self.assertIsNotNone(prov.role_name)

cfg.enable_imds_v2 = True
cfg.metadata_token_duration = 180
prov = providers.EcsRamRoleCredentialProvider(config=cfg)
self.assertIsNotNone(prov)
self.assertTrue(prov.enable_imds_v2)
self.assertEqual(180, prov.metadata_token_duration)
self.assertEqual("roleNameConfig", prov.role_name)
self.assertEqual(2300, prov.timeout)
prov._get_metadata_token(url='127.0.0.1:8888')
cred = prov._create_credential(url='127.0.0.1:8888')
self.assertEqual("token", getattr(prov, '_EcsRamRoleCredentialProvider__metadata_token'))
self.assertNotEqual(0, getattr(prov, '_EcsRamRoleCredentialProvider__stale_time'))
self.assertEqual('ak', cred.access_key_id)

prov._get_role_name(url='http://127.0.0.1:8888')
self.assertIsNotNone(prov.role_name)

def test_EcsRamRoleCredentialProvider_async(self):
async def main():
prov = providers.EcsRamRoleCredentialProvider("roleName")
Expand Down Expand Up @@ -126,7 +169,7 @@ def test_DefaultCredentialsProvider(self):
try:
prov.get_credentials()
except Exception as e:
self.assertRegex(e.message, 'AuthenticationFail.OIDCToken.Invalid')
self.assertRegex(e.message, 'AuthenticationFail.NoPermission')
auth_util.environment_role_arn = environment_role_arn
auth_util.environment_oidc_provider_arn = environment_oidc_provider_arn
auth_util.environment_oidc_token_file = environment_oidc_token_file
Expand Down