From a9bfd162edf46a937c961e45a92f95af3abf21a7 Mon Sep 17 00:00:00 2001 From: yndu13 Date: Tue, 27 Jan 2026 13:45:09 +0800 Subject: [PATCH] fix: increase OAuth token refresh lead time to prevent exchange failure --- alibabacloud_credentials/provider/oauth.py | 8 +- tests/provider/test_oauth.py | 335 +++++++++++++++++++++ 2 files changed, 341 insertions(+), 2 deletions(-) diff --git a/alibabacloud_credentials/provider/oauth.py b/alibabacloud_credentials/provider/oauth.py index 81eb03e..b5276c8 100644 --- a/alibabacloud_credentials/provider/oauth.py +++ b/alibabacloud_credentials/provider/oauth.py @@ -159,8 +159,10 @@ async def _try_refresh_oauth_token_async(self) -> None: self._access_token_expire = new_access_token_expire def _refresh_credentials(self) -> RefreshResult[Credentials]: + # OAuth token 必须提前足够时间刷新,确保有效期 >= 15分钟用于后续 exchange 操作 + # 设置为20分钟(1200秒)提前量,留有5分钟余量 if self._access_token is None or self._access_token_expire <= 0 or self._access_token_expire - int( - time.mktime(time.localtime())) <= 180: + time.mktime(time.localtime())) <= 1200: self._try_refresh_oauth_token() r = urlparse(self._sign_in_url) @@ -220,8 +222,10 @@ def _refresh_credentials(self) -> RefreshResult[Credentials]: stale_time=_get_stale_time(expiration)) async def _refresh_credentials_async(self) -> RefreshResult[Credentials]: + # OAuth token 必须提前足够时间刷新,确保有效期 >= 15分钟用于后续 exchange 操作 + # 设置为20分钟(1200秒)提前量,留有5分钟余量 if self._access_token is None or self._access_token_expire <= 0 or self._access_token_expire - int( - time.mktime(time.localtime())) <= 180: + time.mktime(time.localtime())) <= 1200: await self._try_refresh_oauth_token_async() r = urlparse(self._sign_in_url) diff --git a/tests/provider/test_oauth.py b/tests/provider/test_oauth.py index aa8907e..d25999d 100644 --- a/tests/provider/test_oauth.py +++ b/tests/provider/test_oauth.py @@ -1033,3 +1033,338 @@ async def run_test(): # 验证凭据仍然成功获取 self.assertIsNotNone(credentials) self.assertEqual(credentials.get_access_key_id(), "test_access_key_id") + + @patch('Tea.core.TeaCore.do_action') + def test_oauth_token_refresh_timing_sufficient_time(self, mock_do_action): + """测试当 OAuth token 剩余时间 > 1200秒时,不触发刷新""" + # 模拟成功的凭据交换响应 + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.body = json.dumps({ + "AccessKeyId": "test_access_key_id", + "AccessKeySecret": "test_access_key_secret", + "SecurityToken": "test_security_token", + "Expiration": "2030-12-31T23:59:59Z" + }).encode('utf-8') + mock_do_action.return_value = mock_response + + current_time = int(time.time()) + # access_token 还有 25 分钟(1500秒)过期,大于 1200秒阈值 + provider = OAuthCredentialsProvider( + client_id="123", + sign_in_url="https://oauth.aliyun.com", + access_token="valid_access_token", + access_token_expire=current_time + 1500, # 25分钟后过期 + refresh_token="test_refresh_token" + ) + + # 获取凭据 + credentials = provider.get_credentials() + + # 验证凭据成功获取 + self.assertIsNotNone(credentials) + self.assertEqual(credentials.get_access_key_id(), "test_access_key_id") + + # 验证只调用了一次(只调用了 /v1/exchange,没有调用 /v1/token 刷新) + self.assertEqual(mock_do_action.call_count, 1) + call_args = mock_do_action.call_args_list[0] + tea_request = call_args[0][0] + self.assertEqual(tea_request.pathname, '/v1/exchange') + + # 验证 token 没有被刷新 + self.assertEqual(provider._access_token, "valid_access_token") + + @patch('Tea.core.TeaCore.do_action') + def test_oauth_token_refresh_timing_insufficient_time(self, mock_do_action): + """测试当 OAuth token 剩余时间 <= 1200秒时,触发刷新""" + # 模拟两次响应:1. token 刷新,2. 凭据交换 + refresh_response = MagicMock() + refresh_response.status_code = 200 + refresh_response.body = json.dumps({ + "access_token": "new_access_token", + "refresh_token": "new_refresh_token", + "expires_in": 3600 + }).encode('utf-8') + + exchange_response = MagicMock() + exchange_response.status_code = 200 + exchange_response.body = json.dumps({ + "AccessKeyId": "test_access_key_id", + "AccessKeySecret": "test_access_key_secret", + "SecurityToken": "test_security_token", + "Expiration": "2030-12-31T23:59:59Z" + }).encode('utf-8') + + mock_do_action.side_effect = [refresh_response, exchange_response] + + current_time = int(time.time()) + # access_token 还有 15 分钟(900秒)过期,小于 1200秒阈值 + provider = OAuthCredentialsProvider( + client_id="123", + sign_in_url="https://oauth.aliyun.com", + access_token="old_access_token", + access_token_expire=current_time + 900, # 15分钟后过期 + refresh_token="old_refresh_token" + ) + + # 获取凭据 + credentials = provider.get_credentials() + + # 验证凭据成功获取 + self.assertIsNotNone(credentials) + self.assertEqual(credentials.get_access_key_id(), "test_access_key_id") + + # 验证调用了两次:1. /v1/token(刷新),2. /v1/exchange(交换) + self.assertEqual(mock_do_action.call_count, 2) + + # 验证第一次调用是 token 刷新 + first_call_args = mock_do_action.call_args_list[0] + first_tea_request = first_call_args[0][0] + self.assertEqual(first_tea_request.pathname, '/v1/token') + + # 验证第二次调用是凭据交换 + second_call_args = mock_do_action.call_args_list[1] + second_tea_request = second_call_args[0][0] + self.assertEqual(second_tea_request.pathname, '/v1/exchange') + + # 验证 token 被刷新 + self.assertEqual(provider._access_token, "new_access_token") + self.assertEqual(provider._refresh_token, "new_refresh_token") + + @patch('Tea.core.TeaCore.do_action') + def test_oauth_token_refresh_timing_exactly_threshold(self, mock_do_action): + """测试当 OAuth token 剩余时间正好等于 1200秒时,触发刷新""" + # 模拟两次响应:1. token 刷新,2. 凭据交换 + refresh_response = MagicMock() + refresh_response.status_code = 200 + refresh_response.body = json.dumps({ + "access_token": "new_access_token", + "refresh_token": "new_refresh_token", + "expires_in": 3600 + }).encode('utf-8') + + exchange_response = MagicMock() + exchange_response.status_code = 200 + exchange_response.body = json.dumps({ + "AccessKeyId": "test_access_key_id", + "AccessKeySecret": "test_access_key_secret", + "SecurityToken": "test_security_token", + "Expiration": "2030-12-31T23:59:59Z" + }).encode('utf-8') + + mock_do_action.side_effect = [refresh_response, exchange_response] + + current_time = int(time.time()) + # access_token 还有正好 1200秒(20分钟)过期 + provider = OAuthCredentialsProvider( + client_id="123", + sign_in_url="https://oauth.aliyun.com", + access_token="old_access_token", + access_token_expire=current_time + 1200, # 正好20分钟后过期 + refresh_token="old_refresh_token" + ) + + # 获取凭据 + credentials = provider.get_credentials() + + # 验证凭据成功获取 + self.assertIsNotNone(credentials) + + # 验证调用了两次(触发了刷新) + self.assertEqual(mock_do_action.call_count, 2) + + # 验证 token 被刷新 + self.assertEqual(provider._access_token, "new_access_token") + + @patch('Tea.core.TeaCore.async_do_action') + def test_oauth_token_refresh_timing_async_sufficient_time(self, mock_async_do_action): + """测试异步场景:当 OAuth token 剩余时间 > 1200秒时,不触发刷新""" + # 模拟成功的凭据交换响应 + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.body = json.dumps({ + "AccessKeyId": "test_access_key_id", + "AccessKeySecret": "test_access_key_secret", + "SecurityToken": "test_security_token", + "Expiration": "2030-12-31T23:59:59Z" + }).encode('utf-8') + mock_async_do_action.return_value = mock_response + + current_time = int(time.time()) + # access_token 还有 25 分钟(1500秒)过期,大于 1200秒阈值 + provider = OAuthCredentialsProvider( + client_id="123", + sign_in_url="https://oauth.aliyun.com", + access_token="valid_access_token", + access_token_expire=current_time + 1500, # 25分钟后过期 + refresh_token="test_refresh_token" + ) + + async def run_test(): + return await provider.get_credentials_async() + + # 获取凭据 + credentials = asyncio.run(run_test()) + + # 验证凭据成功获取 + self.assertIsNotNone(credentials) + self.assertEqual(credentials.get_access_key_id(), "test_access_key_id") + + # 验证只调用了一次(只调用了 /v1/exchange,没有调用 /v1/token 刷新) + self.assertEqual(mock_async_do_action.call_count, 1) + call_args = mock_async_do_action.call_args_list[0] + tea_request = call_args[0][0] + self.assertEqual(tea_request.pathname, '/v1/exchange') + + # 验证 token 没有被刷新 + self.assertEqual(provider._access_token, "valid_access_token") + + @patch('Tea.core.TeaCore.async_do_action') + def test_oauth_token_refresh_timing_async_insufficient_time(self, mock_async_do_action): + """测试异步场景:当 OAuth token 剩余时间 <= 1200秒时,触发刷新""" + # 模拟两次响应:1. token 刷新,2. 凭据交换 + refresh_response = MagicMock() + refresh_response.status_code = 200 + refresh_response.body = json.dumps({ + "access_token": "new_access_token", + "refresh_token": "new_refresh_token", + "expires_in": 3600 + }).encode('utf-8') + + exchange_response = MagicMock() + exchange_response.status_code = 200 + exchange_response.body = json.dumps({ + "AccessKeyId": "test_access_key_id", + "AccessKeySecret": "test_access_key_secret", + "SecurityToken": "test_security_token", + "Expiration": "2030-12-31T23:59:59Z" + }).encode('utf-8') + + mock_async_do_action.side_effect = [refresh_response, exchange_response] + + current_time = int(time.time()) + # access_token 还有 10 分钟(600秒)过期,小于 1200秒阈值 + provider = OAuthCredentialsProvider( + client_id="123", + sign_in_url="https://oauth.aliyun.com", + access_token="old_access_token", + access_token_expire=current_time + 600, # 10分钟后过期 + refresh_token="old_refresh_token" + ) + + async def run_test(): + return await provider.get_credentials_async() + + # 获取凭据 + credentials = asyncio.run(run_test()) + + # 验证凭据成功获取 + self.assertIsNotNone(credentials) + self.assertEqual(credentials.get_access_key_id(), "test_access_key_id") + + # 验证调用了两次:1. /v1/token(刷新),2. /v1/exchange(交换) + self.assertEqual(mock_async_do_action.call_count, 2) + + # 验证第一次调用是 token 刷新 + first_call_args = mock_async_do_action.call_args_list[0] + first_tea_request = first_call_args[0][0] + self.assertEqual(first_tea_request.pathname, '/v1/token') + + # 验证第二次调用是凭据交换 + second_call_args = mock_async_do_action.call_args_list[1] + second_tea_request = second_call_args[0][0] + self.assertEqual(second_tea_request.pathname, '/v1/exchange') + + # 验证 token 被刷新 + self.assertEqual(provider._access_token, "new_access_token") + self.assertEqual(provider._refresh_token, "new_refresh_token") + + @patch('Tea.core.TeaCore.do_action') + def test_oauth_token_refresh_timing_edge_case_zero_expire(self, mock_do_action): + """测试边界情况:access_token_expire 为 0 时触发刷新""" + # 模拟两次响应:1. token 刷新,2. 凭据交换 + refresh_response = MagicMock() + refresh_response.status_code = 200 + refresh_response.body = json.dumps({ + "access_token": "new_access_token", + "refresh_token": "new_refresh_token", + "expires_in": 3600 + }).encode('utf-8') + + exchange_response = MagicMock() + exchange_response.status_code = 200 + exchange_response.body = json.dumps({ + "AccessKeyId": "test_access_key_id", + "AccessKeySecret": "test_access_key_secret", + "SecurityToken": "test_security_token", + "Expiration": "2030-12-31T23:59:59Z" + }).encode('utf-8') + + mock_do_action.side_effect = [refresh_response, exchange_response] + + # access_token_expire 为 0 + provider = OAuthCredentialsProvider( + client_id="123", + sign_in_url="https://oauth.aliyun.com", + access_token="old_access_token", + access_token_expire=0, # 无效的过期时间 + refresh_token="old_refresh_token" + ) + + # 获取凭据 + credentials = provider.get_credentials() + + # 验证凭据成功获取 + self.assertIsNotNone(credentials) + + # 验证调用了两次(触发了刷新) + self.assertEqual(mock_do_action.call_count, 2) + + # 验证 token 被刷新 + self.assertEqual(provider._access_token, "new_access_token") + + @patch('Tea.core.TeaCore.do_action') + def test_oauth_token_refresh_timing_edge_case_none_token(self, mock_do_action): + """测试边界情况:access_token 为 None 时触发刷新""" + # 模拟两次响应:1. token 刷新,2. 凭据交换 + refresh_response = MagicMock() + refresh_response.status_code = 200 + refresh_response.body = json.dumps({ + "access_token": "new_access_token", + "refresh_token": "new_refresh_token", + "expires_in": 3600 + }).encode('utf-8') + + exchange_response = MagicMock() + exchange_response.status_code = 200 + exchange_response.body = json.dumps({ + "AccessKeyId": "test_access_key_id", + "AccessKeySecret": "test_access_key_secret", + "SecurityToken": "test_security_token", + "Expiration": "2030-12-31T23:59:59Z" + }).encode('utf-8') + + mock_do_action.side_effect = [refresh_response, exchange_response] + + current_time = int(time.time()) + # access_token 为 None + provider = OAuthCredentialsProvider( + client_id="123", + sign_in_url="https://oauth.aliyun.com", + access_token=None, # 无 access_token + access_token_expire=current_time + 3600, + refresh_token="old_refresh_token" + ) + + # 获取凭据 + credentials = provider.get_credentials() + + # 验证凭据成功获取 + self.assertIsNotNone(credentials) + + # 验证调用了两次(触发了刷新) + self.assertEqual(mock_do_action.call_count, 2) + + # 验证 token 被刷新 + self.assertEqual(provider._access_token, "new_access_token") +