Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions alibabacloud_credentials/provider/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ async def _try_refresh_oauth_token_async(self) -> None:
self._access_token_expire = new_access_token_expire

def _refresh_credentials(self) -> RefreshResult[Credentials]:
# OAuth token 必须提前足够时间刷新,确保有效期 >= 15分钟用于后续 exchange 操作
# 设置为20分钟(1200秒)提前量,留有5分钟余量
if self._access_token is None or self._access_token_expire <= 0 or self._access_token_expire - int(
time.mktime(time.localtime())) <= 180:
time.mktime(time.localtime())) <= 1200:
self._try_refresh_oauth_token()

r = urlparse(self._sign_in_url)
Expand Down Expand Up @@ -220,8 +222,10 @@ def _refresh_credentials(self) -> RefreshResult[Credentials]:
stale_time=_get_stale_time(expiration))

async def _refresh_credentials_async(self) -> RefreshResult[Credentials]:
# OAuth token 必须提前足够时间刷新,确保有效期 >= 15分钟用于后续 exchange 操作
# 设置为20分钟(1200秒)提前量,留有5分钟余量
if self._access_token is None or self._access_token_expire <= 0 or self._access_token_expire - int(
time.mktime(time.localtime())) <= 180:
time.mktime(time.localtime())) <= 1200:
await self._try_refresh_oauth_token_async()

r = urlparse(self._sign_in_url)
Expand Down
335 changes: 335 additions & 0 deletions tests/provider/test_oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,3 +1033,338 @@ async def run_test():
# 验证凭据仍然成功获取
self.assertIsNotNone(credentials)
self.assertEqual(credentials.get_access_key_id(), "test_access_key_id")

@patch('Tea.core.TeaCore.do_action')
def test_oauth_token_refresh_timing_sufficient_time(self, mock_do_action):
"""测试当 OAuth token 剩余时间 > 1200秒时,不触发刷新"""
# 模拟成功的凭据交换响应
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.body = json.dumps({
"AccessKeyId": "test_access_key_id",
"AccessKeySecret": "test_access_key_secret",
"SecurityToken": "test_security_token",
"Expiration": "2030-12-31T23:59:59Z"
}).encode('utf-8')
mock_do_action.return_value = mock_response

current_time = int(time.time())
# access_token 还有 25 分钟(1500秒)过期,大于 1200秒阈值
provider = OAuthCredentialsProvider(
client_id="123",
sign_in_url="https://oauth.aliyun.com",
access_token="valid_access_token",
access_token_expire=current_time + 1500, # 25分钟后过期
refresh_token="test_refresh_token"
)

# 获取凭据
credentials = provider.get_credentials()

# 验证凭据成功获取
self.assertIsNotNone(credentials)
self.assertEqual(credentials.get_access_key_id(), "test_access_key_id")

# 验证只调用了一次(只调用了 /v1/exchange,没有调用 /v1/token 刷新)
self.assertEqual(mock_do_action.call_count, 1)
call_args = mock_do_action.call_args_list[0]
tea_request = call_args[0][0]
self.assertEqual(tea_request.pathname, '/v1/exchange')

# 验证 token 没有被刷新
self.assertEqual(provider._access_token, "valid_access_token")

@patch('Tea.core.TeaCore.do_action')
def test_oauth_token_refresh_timing_insufficient_time(self, mock_do_action):
"""测试当 OAuth token 剩余时间 <= 1200秒时,触发刷新"""
# 模拟两次响应:1. token 刷新,2. 凭据交换
refresh_response = MagicMock()
refresh_response.status_code = 200
refresh_response.body = json.dumps({
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"expires_in": 3600
}).encode('utf-8')

exchange_response = MagicMock()
exchange_response.status_code = 200
exchange_response.body = json.dumps({
"AccessKeyId": "test_access_key_id",
"AccessKeySecret": "test_access_key_secret",
"SecurityToken": "test_security_token",
"Expiration": "2030-12-31T23:59:59Z"
}).encode('utf-8')

mock_do_action.side_effect = [refresh_response, exchange_response]

current_time = int(time.time())
# access_token 还有 15 分钟(900秒)过期,小于 1200秒阈值
provider = OAuthCredentialsProvider(
client_id="123",
sign_in_url="https://oauth.aliyun.com",
access_token="old_access_token",
access_token_expire=current_time + 900, # 15分钟后过期
refresh_token="old_refresh_token"
)

# 获取凭据
credentials = provider.get_credentials()

# 验证凭据成功获取
self.assertIsNotNone(credentials)
self.assertEqual(credentials.get_access_key_id(), "test_access_key_id")

# 验证调用了两次:1. /v1/token(刷新),2. /v1/exchange(交换)
self.assertEqual(mock_do_action.call_count, 2)

# 验证第一次调用是 token 刷新
first_call_args = mock_do_action.call_args_list[0]
first_tea_request = first_call_args[0][0]
self.assertEqual(first_tea_request.pathname, '/v1/token')

# 验证第二次调用是凭据交换
second_call_args = mock_do_action.call_args_list[1]
second_tea_request = second_call_args[0][0]
self.assertEqual(second_tea_request.pathname, '/v1/exchange')

# 验证 token 被刷新
self.assertEqual(provider._access_token, "new_access_token")
self.assertEqual(provider._refresh_token, "new_refresh_token")

@patch('Tea.core.TeaCore.do_action')
def test_oauth_token_refresh_timing_exactly_threshold(self, mock_do_action):
"""测试当 OAuth token 剩余时间正好等于 1200秒时,触发刷新"""
# 模拟两次响应:1. token 刷新,2. 凭据交换
refresh_response = MagicMock()
refresh_response.status_code = 200
refresh_response.body = json.dumps({
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"expires_in": 3600
}).encode('utf-8')

exchange_response = MagicMock()
exchange_response.status_code = 200
exchange_response.body = json.dumps({
"AccessKeyId": "test_access_key_id",
"AccessKeySecret": "test_access_key_secret",
"SecurityToken": "test_security_token",
"Expiration": "2030-12-31T23:59:59Z"
}).encode('utf-8')

mock_do_action.side_effect = [refresh_response, exchange_response]

current_time = int(time.time())
# access_token 还有正好 1200秒(20分钟)过期
provider = OAuthCredentialsProvider(
client_id="123",
sign_in_url="https://oauth.aliyun.com",
access_token="old_access_token",
access_token_expire=current_time + 1200, # 正好20分钟后过期
refresh_token="old_refresh_token"
)

# 获取凭据
credentials = provider.get_credentials()

# 验证凭据成功获取
self.assertIsNotNone(credentials)

# 验证调用了两次(触发了刷新)
self.assertEqual(mock_do_action.call_count, 2)

# 验证 token 被刷新
self.assertEqual(provider._access_token, "new_access_token")

@patch('Tea.core.TeaCore.async_do_action')
def test_oauth_token_refresh_timing_async_sufficient_time(self, mock_async_do_action):
"""测试异步场景:当 OAuth token 剩余时间 > 1200秒时,不触发刷新"""
# 模拟成功的凭据交换响应
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.body = json.dumps({
"AccessKeyId": "test_access_key_id",
"AccessKeySecret": "test_access_key_secret",
"SecurityToken": "test_security_token",
"Expiration": "2030-12-31T23:59:59Z"
}).encode('utf-8')
mock_async_do_action.return_value = mock_response

current_time = int(time.time())
# access_token 还有 25 分钟(1500秒)过期,大于 1200秒阈值
provider = OAuthCredentialsProvider(
client_id="123",
sign_in_url="https://oauth.aliyun.com",
access_token="valid_access_token",
access_token_expire=current_time + 1500, # 25分钟后过期
refresh_token="test_refresh_token"
)

async def run_test():
return await provider.get_credentials_async()

# 获取凭据
credentials = asyncio.run(run_test())

# 验证凭据成功获取
self.assertIsNotNone(credentials)
self.assertEqual(credentials.get_access_key_id(), "test_access_key_id")

# 验证只调用了一次(只调用了 /v1/exchange,没有调用 /v1/token 刷新)
self.assertEqual(mock_async_do_action.call_count, 1)
call_args = mock_async_do_action.call_args_list[0]
tea_request = call_args[0][0]
self.assertEqual(tea_request.pathname, '/v1/exchange')

# 验证 token 没有被刷新
self.assertEqual(provider._access_token, "valid_access_token")

@patch('Tea.core.TeaCore.async_do_action')
def test_oauth_token_refresh_timing_async_insufficient_time(self, mock_async_do_action):
"""测试异步场景:当 OAuth token 剩余时间 <= 1200秒时,触发刷新"""
# 模拟两次响应:1. token 刷新,2. 凭据交换
refresh_response = MagicMock()
refresh_response.status_code = 200
refresh_response.body = json.dumps({
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"expires_in": 3600
}).encode('utf-8')

exchange_response = MagicMock()
exchange_response.status_code = 200
exchange_response.body = json.dumps({
"AccessKeyId": "test_access_key_id",
"AccessKeySecret": "test_access_key_secret",
"SecurityToken": "test_security_token",
"Expiration": "2030-12-31T23:59:59Z"
}).encode('utf-8')

mock_async_do_action.side_effect = [refresh_response, exchange_response]

current_time = int(time.time())
# access_token 还有 10 分钟(600秒)过期,小于 1200秒阈值
provider = OAuthCredentialsProvider(
client_id="123",
sign_in_url="https://oauth.aliyun.com",
access_token="old_access_token",
access_token_expire=current_time + 600, # 10分钟后过期
refresh_token="old_refresh_token"
)

async def run_test():
return await provider.get_credentials_async()

# 获取凭据
credentials = asyncio.run(run_test())

# 验证凭据成功获取
self.assertIsNotNone(credentials)
self.assertEqual(credentials.get_access_key_id(), "test_access_key_id")

# 验证调用了两次:1. /v1/token(刷新),2. /v1/exchange(交换)
self.assertEqual(mock_async_do_action.call_count, 2)

# 验证第一次调用是 token 刷新
first_call_args = mock_async_do_action.call_args_list[0]
first_tea_request = first_call_args[0][0]
self.assertEqual(first_tea_request.pathname, '/v1/token')

# 验证第二次调用是凭据交换
second_call_args = mock_async_do_action.call_args_list[1]
second_tea_request = second_call_args[0][0]
self.assertEqual(second_tea_request.pathname, '/v1/exchange')

# 验证 token 被刷新
self.assertEqual(provider._access_token, "new_access_token")
self.assertEqual(provider._refresh_token, "new_refresh_token")

@patch('Tea.core.TeaCore.do_action')
def test_oauth_token_refresh_timing_edge_case_zero_expire(self, mock_do_action):
"""测试边界情况:access_token_expire 为 0 时触发刷新"""
# 模拟两次响应:1. token 刷新,2. 凭据交换
refresh_response = MagicMock()
refresh_response.status_code = 200
refresh_response.body = json.dumps({
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"expires_in": 3600
}).encode('utf-8')

exchange_response = MagicMock()
exchange_response.status_code = 200
exchange_response.body = json.dumps({
"AccessKeyId": "test_access_key_id",
"AccessKeySecret": "test_access_key_secret",
"SecurityToken": "test_security_token",
"Expiration": "2030-12-31T23:59:59Z"
}).encode('utf-8')

mock_do_action.side_effect = [refresh_response, exchange_response]

# access_token_expire 为 0
provider = OAuthCredentialsProvider(
client_id="123",
sign_in_url="https://oauth.aliyun.com",
access_token="old_access_token",
access_token_expire=0, # 无效的过期时间
refresh_token="old_refresh_token"
)

# 获取凭据
credentials = provider.get_credentials()

# 验证凭据成功获取
self.assertIsNotNone(credentials)

# 验证调用了两次(触发了刷新)
self.assertEqual(mock_do_action.call_count, 2)

# 验证 token 被刷新
self.assertEqual(provider._access_token, "new_access_token")

@patch('Tea.core.TeaCore.do_action')
def test_oauth_token_refresh_timing_edge_case_none_token(self, mock_do_action):
"""测试边界情况:access_token 为 None 时触发刷新"""
# 模拟两次响应:1. token 刷新,2. 凭据交换
refresh_response = MagicMock()
refresh_response.status_code = 200
refresh_response.body = json.dumps({
"access_token": "new_access_token",
"refresh_token": "new_refresh_token",
"expires_in": 3600
}).encode('utf-8')

exchange_response = MagicMock()
exchange_response.status_code = 200
exchange_response.body = json.dumps({
"AccessKeyId": "test_access_key_id",
"AccessKeySecret": "test_access_key_secret",
"SecurityToken": "test_security_token",
"Expiration": "2030-12-31T23:59:59Z"
}).encode('utf-8')

mock_do_action.side_effect = [refresh_response, exchange_response]

current_time = int(time.time())
# access_token 为 None
provider = OAuthCredentialsProvider(
client_id="123",
sign_in_url="https://oauth.aliyun.com",
access_token=None, # 无 access_token
access_token_expire=current_time + 3600,
refresh_token="old_refresh_token"
)

# 获取凭据
credentials = provider.get_credentials()

# 验证凭据成功获取
self.assertIsNotNone(credentials)

# 验证调用了两次(触发了刷新)
self.assertEqual(mock_do_action.call_count, 2)

# 验证 token 被刷新
self.assertEqual(provider._access_token, "new_access_token")

Loading