Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ runtime/
.vscode/
__pycache__/
.pytest_cache/
.coverage
.coverage
*.pyc
41 changes: 27 additions & 14 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pip install alibabacloud_credentials

#### 凭证类型

##### access_key
##### Access Key

通过[用户信息管理](https://usercenter.console.aliyun.com/#/manage/ak)设置 access_key,它们具有该账户完全的权限,请妥善保管。有时出于安全考虑,您不能把具有完全访问权限的主账户 AccessKey 交于一个项目的开发者使用,您可以[创建RAM子账户](https://ram.console.aliyun.com/users)并为子账户[授权](https://ram.console.aliyun.com/permissions),使用RAM子用户的 AccessKey 来进行API调用。

Expand All @@ -42,9 +42,7 @@ access_key_secret = cred.get_access_key_secret()
cred_type = cred.get_type()
```



##### sts
##### STS

通过安全令牌服务(Security Token Service,简称 STS),申请临时安全凭证(Temporary Security Credentials,简称 TSC),创建临时安全凭证。

Expand All @@ -66,9 +64,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```



##### Ram_role_arn
##### RAM Role ARN

通过指定[RAM角色](https://ram.console.aliyun.com/#/role/list),让凭证自动申请维护 STS Token。你可以通过为 `Policy` 赋值来限制获取到的 STS Token 的权限。

Expand All @@ -94,9 +90,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```



##### ecs_ram_role
##### ECS RAM Role

通过指定角色名称,让凭证自动申请维护 STS Token

Expand All @@ -116,9 +110,27 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```

##### Credentials URI

通过指定一个 Credentials 地址,从外部服务申请并自动维护 STS Token

##### Ras_key_pair
```python
from alibabacloud_credentials.client import Client
from alibabacloud_credentials.models import Config

config = Config(
type='credentials_uri', # 凭证类型
credentials_uri='http://local_or_remote_uri/', # Credentials URI
)
cred = Client(config)

access_key_id = cred.get_access_key_id()
access_key_secret = cred.get_access_key_secret()
security_token = cred.get_security_token()
cred_type = cred.get_type()
```

##### RSA Key Pair

通过指定公钥ID和私钥文件,让凭证自动申请维护 AccessKey。仅支持日本站。

Expand All @@ -139,9 +151,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```



##### bearer
##### Bearer

如呼叫中心(CCC)需用此凭证,请自行申请维护 Bearer Token。

Expand Down Expand Up @@ -225,6 +235,9 @@ private_key_file = /your/pk.pem # Private Key 文件

如果定义了环境变量 `ALIBABA_CLOUD_ECS_METADATA` 且不为空,程序会将该环境变量的值作为角色名称,请求 <http://100.100.100.200/latest/meta-data/ram/security-credentials/> 获取临时安全凭证。

4. Credentials URI

如果定义了环境变量 `ALIBABA_CLOUD_CREDENTIALS_URI` 且不为空, 程序会将该环境变量的值作为 Credentials URI 地址,在调用时,获取临时安全凭证。

## 问题

Expand Down
41 changes: 27 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Before you begin, you need to sign up for an Alibaba Cloud account and retrieve

### Credential Type

#### access_key
#### Access Key

Setup access_key credential through [User Information Management][ak], it have full authority over the account, please keep it safe. Sometimes for security reasons, you cannot hand over a primary account AccessKey with full access to the developer of a project. You may create a sub-account [RAM Sub-account][ram] , grant its [authorization][permissions],and use the AccessKey of RAM Sub-account.

Expand All @@ -39,9 +39,7 @@ access_key_secret = cred.get_access_key_secret()
cred_type = cred.get_type()
```



#### sts
#### STS

Create a temporary security credential by applying Temporary Security Credentials (TSC) through the Security Token Service (STS).

Expand All @@ -63,9 +61,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```



#### ram_role_arn
#### RAM Role ARN

By specifying [RAM Role][RAM Role], the credential will be able to automatically request maintenance of STS Token. If you want to limit the permissions([How to make a policy][policy]) of STS Token, you can assign value for `Policy`.

Expand All @@ -91,9 +87,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```



#### ecs_ram_role
#### ECS RAM Role

By specifying the role name, the credential will be able to automatically request maintenance of STS Token.

Expand All @@ -113,9 +107,27 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```

##### Credentials URI

By specifying a credentials uri, get credential from the local or remote uri, the credential will be able to automatically request maintenance to keep it update.

#### rsa_key_pair
```python
from alibabacloud_credentials.client import Client
from alibabacloud_credentials.models import Config

config = Config(
type='credentials_uri', # credential type
credentials_uri='http://local_or_remote_uri/', # Credentials URI
)
cred = Client(config)

access_key_id = cred.get_access_key_id()
access_key_secret = cred.get_access_key_secret()
security_token = cred.get_security_token()
cred_type = cred.get_type()
```

#### RSA Key Pair

By specifying the public key ID and the private key file, the credential will be able to automatically request maintenance of the AccessKey before sending the request. Only Japan station is supported.

Expand All @@ -136,9 +148,7 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```



#### bearer
#### Bearer

If credential is required by the Cloud Call Centre (CCC), please apply for Bearer Token maintenance by yourself.

Expand Down Expand Up @@ -221,6 +231,9 @@ private_key_file = /your/pk.pem # Private Key file

If the environment variable `ALIBABA_CLOUD_ECS_METADATA` is defined and not empty, the program will take the value of the environment variable as the role name and request <http://100.100.100.200/latest/meta-data/ram/security-credentials/> to get the temporary Security credentials.

4. Credentials URI

If the environment variable `ALIBABA_CLOUD_CREDENTIALS_URI` is defined and not empty, the program will take the value of the environment variable as credentials uri to get the temporary Security credentials.

## Issues

Expand Down
4 changes: 3 additions & 1 deletion alibabacloud_credentials/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class Client:

def __init__(self, config=None):
if config is None:
config = Config()
provider = providers.DefaultCredentialsProvider()
self.cloud_credential = provider.get_credentials()
return
self.cloud_credential = self.get_credential(config)

@staticmethod
Expand All @@ -42,6 +42,8 @@ def get_credential(config):
0,
providers.EcsRamRoleCredentialProvider(config=config)
)
elif config.type == ac.CREDENTIALS_URI:
return credentials.CredentialsURICredential(config.credentials_uri)
elif config.type == ac.RAM_ROLE_ARN:
return credentials.RamRoleArnCredential(
config.access_key_id,
Expand Down
121 changes: 120 additions & 1 deletion alibabacloud_credentials/credentials.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import calendar
import json
import time
from urllib.parse import urlparse, parse_qs

from alibabacloud_credentials.utils import auth_constant as ac
from Tea.core import TeaCore
from Tea.request import TeaRequest

from alibabacloud_credentials.utils import auth_constant as ac
from alibabacloud_credentials.exceptions import CredentialException

class Credential:
def get_access_key_id(self):
Expand Down Expand Up @@ -179,6 +185,119 @@ async def get_security_token_async(self):
await self._refresh_credential_async()
return self.security_token

class CredentialsURICredential():
"""CredentialsURICredential"""

def __init__(self, credentials_uri):
self.access_key_id = None
self.access_key_secret = None
self.security_token = None
self.expiration = None
self.credentials_uri = credentials_uri
self.credential_type = ac.CREDENTIALS_URI

def _need_refresh(self):
if self.expiration is None:
return True

return int(time.mktime(time.localtime())) >= (self.expiration - 180)

def _ensure_credential(self):
if self._need_refresh():
self._get_new_credential()

async def _ensure_credential_async(self):
if self._need_refresh():
await self._get_new_credential_async()

def _get_new_credential(self):
r = urlparse(self.credentials_uri)
tea_request = TeaRequest()
tea_request.headers['host'] = r.hostname
tea_request.port = r.port
tea_request.method = 'GET'
tea_request.pathname = r.path
for key, values in parse_qs(r.query).items():
for value in values:
tea_request.query[key] = value
response = TeaCore.do_action(tea_request)
if response.status_code != 200:
raise CredentialException("Get credentials from " + self.credentials_uri + " failed, HttpCode=" + str(response.status_code))
body = response.body.decode('utf-8')

dic = json.loads(body)
content_code = dic.get('Code')
content_access_key_id = dic.get('AccessKeyId')
content_access_key_secret = dic.get('AccessKeySecret')
content_security_token = dic.get('SecurityToken')
content_expiration = dic.get('Expiration')

if content_code != "Success":
raise CredentialException("Get credentials from " + self.credentials_uri + " failed, Code is " + content_code)

# 先转换为时间数组
time_array = time.strptime(content_expiration, "%Y-%m-%dT%H:%M:%SZ")
# 转换为时间戳
time_stamp = calendar.timegm(time_array)
self.access_key_id = content_access_key_id
self.access_key_secret = content_access_key_secret
self.security_token = content_security_token
self.expiration = time_stamp

async def _get_new_credential_async(self):
r = urlparse(self.credentials_uri)
tea_request = TeaRequest()
tea_request.headers['host'] = r.netloc
tea_request.method = 'GET'
tea_request.pathname = r.path
tea_request.query = parse_qs(r.query)
response = await TeaCore.async_do_action(tea_request)
if response.status_code != 200:
raise CredentialException("Get credentials from " + self.credentials_uri + " failed, HttpCode=" + str(response.status_code))
body = response.body.decode('utf-8')

dic = json.loads(body)
content_code = dic.get('Code')
content_access_key_id = dic.get('AccessKeyId')
content_access_key_secret = dic.get('AccessKeySecret')
content_security_token = dic.get('SecurityToken')
content_expiration = dic.get('Expiration')

if content_code != "Success":
raise CredentialException("Get credentials from " + self.credentials_uri + " failed, Code is " + content_code)

# 先转换为时间数组
time_array = time.strptime(content_expiration, "%Y-%m-%dT%H:%M:%SZ")
# 转换为时间戳
time_stamp = calendar.timegm(time_array)
self.access_key_id = content_access_key_id
self.access_key_secret = content_access_key_secret
self.security_token = content_security_token
self.expiration = time_stamp

def get_access_key_id(self):
self._ensure_credential()
return self.access_key_id

def get_access_key_secret(self):
self._ensure_credential()
return self.access_key_secret

def get_security_token(self):
self._ensure_credential()
return self.security_token

async def get_access_key_id_async(self):
await self._ensure_credential_async()
return self.access_key_id

async def get_access_key_secret_async(self):
await self._ensure_credential_async()
return self.access_key_secret

async def get_security_token_async(self):
await self._ensure_credential_async()
return self.security_token

class RsaKeyPairCredential(Credential, _AutomaticallyRefreshCredentials):
def __init__(self, access_key_id, access_key_secret, expiration, provider):
Expand Down
7 changes: 7 additions & 0 deletions alibabacloud_credentials/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(
timeout: int = 1000,
connect_timeout: int = 1000,
proxy: str = '',
credentials_uri: str = ''
):
# accesskey id
self.access_key_id = access_key_id
Expand Down Expand Up @@ -57,6 +58,8 @@ def __init__(
self.timeout = timeout
self.connect_timeout = connect_timeout
self.proxy = proxy
# credentials uri
self.credentials_uri = credentials_uri

def validate(self):
pass
Expand Down Expand Up @@ -97,6 +100,8 @@ def to_map(self):
result['connectTimeout'] = self.connect_timeout
if self.proxy is not None:
result['proxy'] = self.proxy
if self.credentials_uri is not None:
result['credentialsUri'] = self.credentials_uri
return result

def from_map(self, m: dict = None):
Expand Down Expand Up @@ -135,4 +140,6 @@ def from_map(self, m: dict = None):
self.connect_timeout = m.get('connectTimeout')
if m.get('proxy') is not None:
self.proxy = m.get('proxy')
if m.get('credentialsUri') is not None:
self.credentials_uri = m.get('credentials_uri')
return self
8 changes: 8 additions & 0 deletions alibabacloud_credentials/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self):
role_name = au.environment_ECSMeta_data
if role_name is not None:
self.user_configuration_providers.append(EcsRamRoleCredentialProvider(role_name))
self.user_configuration_providers.append(CredentialsUriProvider())

def get_credentials(self):
for provider in self.user_configuration_providers:
Expand Down Expand Up @@ -449,3 +450,10 @@ def get_credentials(self):
if len(access_key_secret) == 0:
raise CredentialException("Environment variable accessKeySecret cannot be empty")
return credentials.AccessKeyCredential(access_key_id, access_key_secret)

class CredentialsUriProvider(AlibabaCloudCredentialsProvider):
def get_credentials(self):
credentials_uri = os.environ.get('ALIBABA_CLOUD_CREDENTIALS_URI')
if credentials_uri is None:
return None
return credentials.CredentialsURICredential(credentials_uri)
Loading