diff --git a/.gitignore b/.gitignore
index cd4dd29..fbc13a4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@ runtime/
.vscode/
__pycache__/
.pytest_cache/
-.coverage
\ No newline at end of file
+.coverage
+*.pyc
\ No newline at end of file
diff --git a/README-CN.md b/README-CN.md
index 2ae4b7a..7bcc3d0 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -22,7 +22,7 @@ pip install alibabacloud_credentials
#### 凭证类型
-##### access_key
+##### Access Key
通过[用户信息管理](https://usercenter.console.aliyun.com/#/manage/ak)设置 access_key,它们具有该账户完全的权限,请妥善保管。有时出于安全考虑,您不能把具有完全访问权限的主账户 AccessKey 交于一个项目的开发者使用,您可以[创建RAM子账户](https://ram.console.aliyun.com/users)并为子账户[授权](https://ram.console.aliyun.com/permissions),使用RAM子用户的 AccessKey 来进行API调用。
@@ -42,9 +42,7 @@ access_key_secret = cred.get_access_key_secret()
cred_type = cred.get_type()
```
-
-
-##### sts
+##### STS
通过安全令牌服务(Security Token Service,简称 STS),申请临时安全凭证(Temporary Security Credentials,简称 TSC),创建临时安全凭证。
@@ -66,9 +64,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
-
-
-##### Ram_role_arn
+##### RAM Role ARN
通过指定[RAM角色](https://ram.console.aliyun.com/#/role/list),让凭证自动申请维护 STS Token。你可以通过为 `Policy` 赋值来限制获取到的 STS Token 的权限。
@@ -94,9 +90,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
-
-
-##### ecs_ram_role
+##### ECS RAM Role
通过指定角色名称,让凭证自动申请维护 STS Token
@@ -116,9 +110,27 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
+##### Credentials URI
+通过指定一个 Credentials 地址,从外部服务申请并自动维护 STS Token
-##### Ras_key_pair
+```python
+from alibabacloud_credentials.client import Client
+from alibabacloud_credentials.models import Config
+
+config = Config(
+ type='credentials_uri', # 凭证类型
+ credentials_uri='http://local_or_remote_uri/', # Credentials URI
+)
+cred = Client(config)
+
+access_key_id = cred.get_access_key_id()
+access_key_secret = cred.get_access_key_secret()
+security_token = cred.get_security_token()
+cred_type = cred.get_type()
+```
+
+##### RSA Key Pair
通过指定公钥ID和私钥文件,让凭证自动申请维护 AccessKey。仅支持日本站。
@@ -139,9 +151,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
-
-
-##### bearer
+##### Bearer
如呼叫中心(CCC)需用此凭证,请自行申请维护 Bearer Token。
@@ -225,6 +235,9 @@ private_key_file = /your/pk.pem # Private Key 文件
如果定义了环境变量 `ALIBABA_CLOUD_ECS_METADATA` 且不为空,程序会将该环境变量的值作为角色名称,请求 获取临时安全凭证。
+4. Credentials URI
+
+如果定义了环境变量 `ALIBABA_CLOUD_CREDENTIALS_URI` 且不为空, 程序会将该环境变量的值作为 Credentials URI 地址,在调用时,获取临时安全凭证。
## 问题
diff --git a/README.md b/README.md
index a6fbbc1..1283414 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ Before you begin, you need to sign up for an Alibaba Cloud account and retrieve
### Credential Type
-#### access_key
+#### Access Key
Setup access_key credential through [User Information Management][ak], it have full authority over the account, please keep it safe. Sometimes for security reasons, you cannot hand over a primary account AccessKey with full access to the developer of a project. You may create a sub-account [RAM Sub-account][ram] , grant its [authorization][permissions],and use the AccessKey of RAM Sub-account.
@@ -39,9 +39,7 @@ access_key_secret = cred.get_access_key_secret()
cred_type = cred.get_type()
```
-
-
-#### sts
+#### STS
Create a temporary security credential by applying Temporary Security Credentials (TSC) through the Security Token Service (STS).
@@ -63,9 +61,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
-
-
-#### ram_role_arn
+#### RAM Role ARN
By specifying [RAM Role][RAM Role], the credential will be able to automatically request maintenance of STS Token. If you want to limit the permissions([How to make a policy][policy]) of STS Token, you can assign value for `Policy`.
@@ -91,9 +87,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
-
-
-#### ecs_ram_role
+#### ECS RAM Role
By specifying the role name, the credential will be able to automatically request maintenance of STS Token.
@@ -113,9 +107,27 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
+##### Credentials URI
+By specifying a credentials uri, get credential from the local or remote uri, the credential will be able to automatically request maintenance to keep it update.
-#### rsa_key_pair
+```python
+from alibabacloud_credentials.client import Client
+from alibabacloud_credentials.models import Config
+
+config = Config(
+ type='credentials_uri', # credential type
+ credentials_uri='http://local_or_remote_uri/', # Credentials URI
+)
+cred = Client(config)
+
+access_key_id = cred.get_access_key_id()
+access_key_secret = cred.get_access_key_secret()
+security_token = cred.get_security_token()
+cred_type = cred.get_type()
+```
+
+#### RSA Key Pair
By specifying the public key ID and the private key file, the credential will be able to automatically request maintenance of the AccessKey before sending the request. Only Japan station is supported.
@@ -136,9 +148,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```
-
-
-#### bearer
+#### Bearer
If credential is required by the Cloud Call Centre (CCC), please apply for Bearer Token maintenance by yourself.
@@ -221,6 +231,9 @@ private_key_file = /your/pk.pem # Private Key file
If the environment variable `ALIBABA_CLOUD_ECS_METADATA` is defined and not empty, the program will take the value of the environment variable as the role name and request to get the temporary Security credentials.
+4. Credentials URI
+
+If the environment variable `ALIBABA_CLOUD_CREDENTIALS_URI` is defined and not empty, the program will take the value of the environment variable as credentials uri to get the temporary Security credentials.
## Issues
diff --git a/alibabacloud_credentials/client.py b/alibabacloud_credentials/client.py
index 79e6397..3dbc89b 100644
--- a/alibabacloud_credentials/client.py
+++ b/alibabacloud_credentials/client.py
@@ -21,9 +21,9 @@ class Client:
def __init__(self, config=None):
if config is None:
- config = Config()
provider = providers.DefaultCredentialsProvider()
self.cloud_credential = provider.get_credentials()
+ return
self.cloud_credential = self.get_credential(config)
@staticmethod
@@ -42,6 +42,8 @@ def get_credential(config):
0,
providers.EcsRamRoleCredentialProvider(config=config)
)
+ elif config.type == ac.CREDENTIALS_URI:
+ return credentials.CredentialsURICredential(config.credentials_uri)
elif config.type == ac.RAM_ROLE_ARN:
return credentials.RamRoleArnCredential(
config.access_key_id,
diff --git a/alibabacloud_credentials/credentials.py b/alibabacloud_credentials/credentials.py
index 4523c04..af0ff8a 100644
--- a/alibabacloud_credentials/credentials.py
+++ b/alibabacloud_credentials/credentials.py
@@ -1,7 +1,13 @@
+import calendar
+import json
import time
+from urllib.parse import urlparse, parse_qs
-from alibabacloud_credentials.utils import auth_constant as ac
+from Tea.core import TeaCore
+from Tea.request import TeaRequest
+from alibabacloud_credentials.utils import auth_constant as ac
+from alibabacloud_credentials.exceptions import CredentialException
class Credential:
def get_access_key_id(self):
@@ -179,6 +185,119 @@ async def get_security_token_async(self):
await self._refresh_credential_async()
return self.security_token
+class CredentialsURICredential():
+ """CredentialsURICredential"""
+
+ def __init__(self, credentials_uri):
+ self.access_key_id = None
+ self.access_key_secret = None
+ self.security_token = None
+ self.expiration = None
+ self.credentials_uri = credentials_uri
+ self.credential_type = ac.CREDENTIALS_URI
+
+ def _need_refresh(self):
+ if self.expiration is None:
+ return True
+
+ return int(time.mktime(time.localtime())) >= (self.expiration - 180)
+
+ def _ensure_credential(self):
+ if self._need_refresh():
+ self._get_new_credential()
+
+ async def _ensure_credential_async(self):
+ if self._need_refresh():
+ await self._get_new_credential_async()
+
+ def _get_new_credential(self):
+ r = urlparse(self.credentials_uri)
+ tea_request = TeaRequest()
+ tea_request.headers['host'] = r.hostname
+ tea_request.port = r.port
+ tea_request.method = 'GET'
+ tea_request.pathname = r.path
+ for key, values in parse_qs(r.query).items():
+ for value in values:
+ tea_request.query[key] = value
+ response = TeaCore.do_action(tea_request)
+ if response.status_code != 200:
+ raise CredentialException("Get credentials from " + self.credentials_uri + " failed, HttpCode=" + str(response.status_code))
+ body = response.body.decode('utf-8')
+
+ dic = json.loads(body)
+ content_code = dic.get('Code')
+ content_access_key_id = dic.get('AccessKeyId')
+ content_access_key_secret = dic.get('AccessKeySecret')
+ content_security_token = dic.get('SecurityToken')
+ content_expiration = dic.get('Expiration')
+
+ if content_code != "Success":
+ raise CredentialException("Get credentials from " + self.credentials_uri + " failed, Code is " + content_code)
+
+ # 先转换为时间数组
+ time_array = time.strptime(content_expiration, "%Y-%m-%dT%H:%M:%SZ")
+ # 转换为时间戳
+ time_stamp = calendar.timegm(time_array)
+ self.access_key_id = content_access_key_id
+ self.access_key_secret = content_access_key_secret
+ self.security_token = content_security_token
+ self.expiration = time_stamp
+
+ async def _get_new_credential_async(self):
+ r = urlparse(self.credentials_uri)
+ tea_request = TeaRequest()
+ tea_request.headers['host'] = r.netloc
+ tea_request.method = 'GET'
+ tea_request.pathname = r.path
+ tea_request.query = parse_qs(r.query)
+ response = await TeaCore.async_do_action(tea_request)
+ if response.status_code != 200:
+ raise CredentialException("Get credentials from " + self.credentials_uri + " failed, HttpCode=" + str(response.status_code))
+ body = response.body.decode('utf-8')
+
+ dic = json.loads(body)
+ content_code = dic.get('Code')
+ content_access_key_id = dic.get('AccessKeyId')
+ content_access_key_secret = dic.get('AccessKeySecret')
+ content_security_token = dic.get('SecurityToken')
+ content_expiration = dic.get('Expiration')
+
+ if content_code != "Success":
+ raise CredentialException("Get credentials from " + self.credentials_uri + " failed, Code is " + content_code)
+
+ # 先转换为时间数组
+ time_array = time.strptime(content_expiration, "%Y-%m-%dT%H:%M:%SZ")
+ # 转换为时间戳
+ time_stamp = calendar.timegm(time_array)
+ self.access_key_id = content_access_key_id
+ self.access_key_secret = content_access_key_secret
+ self.security_token = content_security_token
+ self.expiration = time_stamp
+
+ def get_access_key_id(self):
+ self._ensure_credential()
+ return self.access_key_id
+
+ def get_access_key_secret(self):
+ self._ensure_credential()
+ return self.access_key_secret
+
+ def get_security_token(self):
+ self._ensure_credential()
+ return self.security_token
+
+ async def get_access_key_id_async(self):
+ await self._ensure_credential_async()
+ return self.access_key_id
+
+ async def get_access_key_secret_async(self):
+ await self._ensure_credential_async()
+ return self.access_key_secret
+
+ async def get_security_token_async(self):
+ await self._ensure_credential_async()
+ return self.security_token
class RsaKeyPairCredential(Credential, _AutomaticallyRefreshCredentials):
def __init__(self, access_key_id, access_key_secret, expiration, provider):
diff --git a/alibabacloud_credentials/models.py b/alibabacloud_credentials/models.py
index 5e740b0..8ceeec1 100644
--- a/alibabacloud_credentials/models.py
+++ b/alibabacloud_credentials/models.py
@@ -26,6 +26,7 @@ def __init__(
timeout: int = 1000,
connect_timeout: int = 1000,
proxy: str = '',
+ credentials_uri: str = ''
):
# accesskey id
self.access_key_id = access_key_id
@@ -57,6 +58,8 @@ def __init__(
self.timeout = timeout
self.connect_timeout = connect_timeout
self.proxy = proxy
+ # credentials uri
+ self.credentials_uri = credentials_uri
def validate(self):
pass
@@ -97,6 +100,8 @@ def to_map(self):
result['connectTimeout'] = self.connect_timeout
if self.proxy is not None:
result['proxy'] = self.proxy
+ if self.credentials_uri is not None:
+ result['credentialsUri'] = self.credentials_uri
return result
def from_map(self, m: dict = None):
@@ -135,4 +140,6 @@ def from_map(self, m: dict = None):
self.connect_timeout = m.get('connectTimeout')
if m.get('proxy') is not None:
self.proxy = m.get('proxy')
+ if m.get('credentialsUri') is not None:
+ self.credentials_uri = m.get('credentials_uri')
return self
diff --git a/alibabacloud_credentials/providers.py b/alibabacloud_credentials/providers.py
index 37266f4..0b6f3cb 100644
--- a/alibabacloud_credentials/providers.py
+++ b/alibabacloud_credentials/providers.py
@@ -67,6 +67,7 @@ def __init__(self):
role_name = au.environment_ECSMeta_data
if role_name is not None:
self.user_configuration_providers.append(EcsRamRoleCredentialProvider(role_name))
+ self.user_configuration_providers.append(CredentialsUriProvider())
def get_credentials(self):
for provider in self.user_configuration_providers:
@@ -449,3 +450,10 @@ def get_credentials(self):
if len(access_key_secret) == 0:
raise CredentialException("Environment variable accessKeySecret cannot be empty")
return credentials.AccessKeyCredential(access_key_id, access_key_secret)
+
+class CredentialsUriProvider(AlibabaCloudCredentialsProvider):
+ def get_credentials(self):
+ credentials_uri = os.environ.get('ALIBABA_CLOUD_CREDENTIALS_URI')
+ if credentials_uri is None:
+ return None
+ return credentials.CredentialsURICredential(credentials_uri)
diff --git a/alibabacloud_credentials/utils/auth_constant.py b/alibabacloud_credentials/utils/auth_constant.py
index 4dec9c1..c8a6291 100644
--- a/alibabacloud_credentials/utils/auth_constant.py
+++ b/alibabacloud_credentials/utils/auth_constant.py
@@ -21,5 +21,6 @@
STS = "sts"
ECS_RAM_ROLE = "ecs_ram_role"
RAM_ROLE_ARN = "ram_role_arn"
+CREDENTIALS_URI = "credentials_uri"
RSA_KEY_PAIR = "rsa_key_pair"
BEARER = "bearer"
diff --git a/tests/test_client.py b/tests/test_client.py
index 479de6d..897837f 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -7,7 +7,7 @@
from alibabacloud_credentials import credentials
class TestClient(unittest.TestCase):
- def test_ak_client(self):
+ def test_client_ak(self):
conf = Config()
conf.type = auth_constant.ACCESS_KEY
conf.access_key_id = '123456'
@@ -20,26 +20,36 @@ def test_ak_client(self):
try:
cred = Client()
cred.get_access_key_id()
- except Exception as e:
- self.assertEqual('not found credentials', str(e))
+ except Exception as ex:
+ self.assertEqual('not found credentials', str(ex))
+ def test_client_sts(self):
conf = Config(type='sts')
cred = Client(conf)
self.assertIsInstance(cred.cloud_credential, credentials.StsCredential)
+ def test_client_bearer(self):
conf = Config(type='bearer')
cred = Client(conf)
self.assertIsInstance(cred.cloud_credential, credentials.BearerTokenCredential)
+ def test_client_ecs_ram_role(self):
conf = Config(type='ecs_ram_role')
self.assertIsInstance(Client.get_credential(conf), credentials.EcsRamRoleCredential)
+ def test_client_credentials_uri(self):
+ conf = Config(type='credentials_uri')
+ self.assertIsInstance(Client.get_credential(conf), credentials.CredentialsURICredential)
+
+ def test_client_ram_role_arn(self):
conf = Config(type='ram_role_arn')
self.assertIsInstance(Client.get_credential(conf), credentials.RamRoleArnCredential)
+ def test_client_rsa_key_pair(self):
conf = Config(type='rsa_key_pair')
self.assertIsInstance(Client.get_credential(conf), credentials.RsaKeyPairCredential)
+ def test_async_call(self):
conf = Config(
access_key_id='ak1',
access_key_secret='sk1',
diff --git a/tests/test_credentials.py b/tests/test_credentials.py
index 9f57523..3437606 100644
--- a/tests/test_credentials.py
+++ b/tests/test_credentials.py
@@ -18,7 +18,7 @@ class TestRsaKeyPairProvider:
def get_credentials(self):
return credentials.RsaKeyPairCredential("accessKeyId", "accessKeySecret", 100000000000,
None)
-
+
def test_EcsRamRoleCredential(self):
provider = providers.EcsRamRoleCredentialProvider("roleName")
access_key_id = 'access_key_id'
@@ -132,6 +132,16 @@ def test_RsaKeyPairCredential(self):
self.assertEqual('accessKeySecret', cred.access_key_secret)
self.assertEqual(100000000000, cred.expiration)
+ def test_CredentialsURICredential(self):
+ credentials_uri = 'http://localhost:6666/test'
+ cred = credentials.CredentialsURICredential(
+ credentials_uri
+ )
+ self.assertEqual('access_key_id', cred.access_key_id)
+ self.assertEqual('access_key_secret', cred.access_key_secret)
+ self.assertEqual('security_token', cred.security_token)
+ self.assertEqual('credentials_uri', cred.credential_type)
+
def test_StsCredential(self):
access_key_id, access_key_secret, security_token =\
'access_key_id', 'access_key_secret', 'security_token'