Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ runtime/
__pycache__/
.pytest_cache/
.coverage
*.pyc
*.pyc
venv/
40 changes: 40 additions & 0 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,34 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```

##### OIDC Role ARN

通过指定[OIDC角色],让凭证自动申请维护 STS Token。你可以通过为 `Policy` 赋值来限制获取到的 STS Token 的权限。

```python
from alibabacloud_credentials.client import Client
from alibabacloud_credentials.models import Config

config = Config(
type='oidc_role_arn', # 凭证类型
access_key_id='accessKeyId', # AccessKeyId
access_key_secret='accessKeySecret', # AccessKeySecret
security_token='securityToken', # STS Token
role_arn='roleArn', # 格式: acs:ram::用户ID:role/角色名
oidc_provider_arn='oidcProviderArn', # 格式: acs:ram::用户Id:oidc-provider/OIDC身份提供商名称
oidc_token_file_path='/Users/xxx/xxx',# 格式: path,可不设,但需要通过设置 ALIBABA_CLOUD_OIDC_TOKEN_FILE 来代替
role_session_name='roleSessionName', # 角色会话名称
policy='policy', # 可选, 限制 STS Token 的权限
role_session_expiration=3600 # 可选, 限制 STS Token 的有效时间
)
cred = Client(config)

access_key_id = cred.get_access_key_id()
access_key_secret = cred.get_access_key_secret()
security_token = cred.get_security_token()
cred_type = cred.get_type()
```

##### ECS RAM Role

通过指定角色名称,让凭证自动申请维护 STS Token
Expand Down Expand Up @@ -229,6 +257,18 @@ role_session_name = session_name # 选填
type = rsa_key_pair # 认证方式为 rsa_key_pair
public_key_id = publicKeyId # Public Key ID
private_key_file = /your/pk.pem # Private Key 文件

[client4] # 命名为 `client4` 的配置
enable = false # 不启用
type = oidc_role_arn # 认证方式为 oidc_role_arn
region_id = cn-test # 获取session用的region
policy = test # 选填 指定权限
access_key_id = foo # 选填
access_key_secret = bar # 选填
role_arn = role_arn
oidc_provider_arn = oidc_provider_arn
oidc_token_file_path = /xxx/xxx # 可通过设置环境变量 ALIBABA_CLOUD_OIDC_TOKEN_FILE 来代替
role_session_name = session_name # 选填
```

3.实例 RAM 角色
Expand Down
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,34 @@ security_token = cred.get_security_token()
cred_type = cred.get_type()
```

#### OIDC Role ARN

By specifying [OIDC Role][OIDC Role], the credential will be able to automatically request maintenance of STS Token. If you want to limit the permissions([How to make a policy][policy]) of STS Token, you can assign value for `Policy`.

```python
from alibabacloud_credentials.client import Client
from alibabacloud_credentials.models import Config

config = Config(
type='oidc_role_arn', # credential type
access_key_id='accessKeyId', # AccessKeyId
access_key_secret='accessKeySecret', # AccessKeySecret
security_token='securityToken', # STS Token
role_arn='roleArn', # Format: acs:ram::USER_ID:role/ROLE_NAME
oidc_provider_arn='oidcProviderArn', # Format: acs:ram::USER_Id:oidc-provider/OIDC Providers
oidc_token_file_path='/Users/xxx/xxx',# oidc_token_file_path can be replaced by setting environment variable: ALIBABA_CLOUD_OIDC_TOKEN_FILE
role_session_name='roleSessionName', # Role Session Name
policy='policy', # Not required, limit the permissions of STS Token
role_session_expiration=3600 # Not required, limit the Valid time of STS Token
)
cred = Client(config)

access_key_id = cred.get_access_key_id()
access_key_secret = cred.get_access_key_secret()
security_token = cred.get_security_token()
cred_type = cred.get_type()
```

#### ECS RAM Role

By specifying the role name, the credential will be able to automatically request maintenance of STS Token.
Expand Down Expand Up @@ -225,6 +253,18 @@ role_session_name = session_name # optional
type = rsa_key_pair # Certification type: rsa_key_pair
public_key_id = publicKeyId # Public Key ID
private_key_file = /your/pk.pem # Private Key file

[client4] # configuration that is named as `client4`
enable = false # Disable
type = oidc_role_arn # Certification type: oidc_role_arn
region_id = cn-test
policy = test # optional Specify permissions
access_key_id = foo # optional
access_key_secret = bar # optional
role_arn = role_arn
oidc_provider_arn = oidc_provider_arn
oidc_token_file_path = /xxx/xxx # can be replaced by setting environment variable: ALIBABA_CLOUD_OIDC_TOKEN_FILE
role_session_name = session_name # optional
```

3. Instance RAM Role
Expand Down
7 changes: 7 additions & 0 deletions alibabacloud_credentials/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ def get_credential(config):
0,
providers.RsaKeyPairCredentialProvider(config=config)
)
elif config.type == ac.OIDC_ROLE_ARN:
return credentials.OIDCRoleArnCredential(
config.access_key_id,
config.access_key_secret,
config.security_token,
0,
providers.OIDCRoleArnCredentialProvider(config=config))
return providers.DefaultCredentialsProvider().get_credentials()

def get_access_key_id(self):
Expand Down
54 changes: 54 additions & 0 deletions alibabacloud_credentials/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,60 @@ async def get_security_token_async(self):
await self._refresh_credential_async()
return self.security_token

class OIDCRoleArnCredential(Credential, _AutomaticallyRefreshCredentials):
"""OIDCRoleArnCredential"""

def __init__(self, access_key_id, access_key_secret, security_token, expiration, provider):
super().__init__(expiration, provider)
self.access_key_id = access_key_id
self.access_key_secret = access_key_secret
self.security_token = security_token
self.credential_type = ac.OIDC_ROLE_ARN

def _refresh_credential(self):
credential = super()._refresh_credential()
if credential:
self.access_key_id = credential.access_key_id
self.access_key_secret = credential.access_key_secret
self.expiration = credential.expiration
self.security_token = credential.security_token

async def _refresh_credential_async(self):
credential = None
if self._with_should_refresh():
credential = await self._get_new_credential_async()

if credential:
self.access_key_id = credential.access_key_id
self.access_key_secret = credential.access_key_secret
self.expiration = credential.expiration
self.security_token = credential.security_token

def get_access_key_id(self):
self._refresh_credential()
return self.access_key_id

def get_access_key_secret(self):
self._refresh_credential()
return self.access_key_secret

def get_security_token(self):
self._refresh_credential()
return self.security_token

async def get_access_key_id_async(self):
await self._refresh_credential_async()
return self.access_key_id

async def get_access_key_secret_async(self):
await self._refresh_credential_async()
return self.access_key_secret

async def get_security_token_async(self):
await self._refresh_credential_async()
return self.security_token


class CredentialsURICredential():
"""CredentialsURICredential"""

Expand Down
53 changes: 34 additions & 19 deletions alibabacloud_credentials/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,29 @@ class Config(TeaModel):
"""
Model for initing credential
"""

def __init__(
self,
access_key_id: str = '',
access_key_secret: str = '',
security_token: str = '',
bearer_token: str = '',
duration_seconds: int = '',
role_arn: str = '',
policy: str = '',
role_session_expiration: int = '',
role_session_name: str = '',
public_key_id: str = '',
private_key_file: str = '',
role_name: str = '',
type: str = '',
host: str = '',
timeout: int = 1000,
connect_timeout: int = 1000,
proxy: str = '',
credentials_uri: str = ''
self,
access_key_id: str = '',
access_key_secret: str = '',
security_token: str = '',
bearer_token: str = '',
duration_seconds: int = '',
role_arn: str = '',
oidc_provider_arn: str = '',
oidc_token_file_path: str = '',
policy: str = '',
role_session_expiration: int = '',
role_session_name: str = '',
public_key_id: str = '',
private_key_file: str = '',
role_name: str = '',
type: str = '',
host: str = '',
timeout: int = 1000,
connect_timeout: int = 1000,
proxy: str = '',
credentials_uri: str = ''
):
# accesskey id
self.access_key_id = access_key_id
Expand All @@ -40,6 +43,10 @@ def __init__(
self.duration_seconds = duration_seconds
# role arn
self.role_arn = role_arn
# oidc provider arn
self.oidc_provider_arn = oidc_provider_arn
# oidc token file path
self.oidc_token_file_path = oidc_token_file_path
# policy
self.policy = policy
# role session expiration
Expand Down Expand Up @@ -78,6 +85,10 @@ def to_map(self):
result['durationSeconds'] = self.duration_seconds
if self.role_arn is not None:
result['roleArn'] = self.role_arn
if self.oidc_provider_arn is not None:
result['oidcProviderArn'] = self.oidc_provider_arn
if self.oidc_token_file_path is not None:
result['oidcTokenFilePath'] = self.oidc_token_file_path
if self.policy is not None:
result['policy'] = self.policy
if self.role_session_expiration is not None:
Expand Down Expand Up @@ -118,6 +129,10 @@ def from_map(self, m: dict = None):
self.duration_seconds = m.get('durationSeconds')
if m.get('roleArn') is not None:
self.role_arn = m.get('roleArn')
if m.get('oidcProviderArn') is not None:
self.oidc_provider_arn = m.get('oidcProviderArn')
if m.get('oidcTokenFilePath') is not None:
self.oidc_token_file_path = m.get('oidcTokenFilePath')
if m.get('policy') is not None:
self.policy = m.get('policy')
if m.get('roleSessionExpiration') is not None:
Expand Down
Loading