From 825dcea61a431ad5d29ca0faa5e12b9bcf478027 Mon Sep 17 00:00:00 2001 From: asloob qureshi Date: Mon, 15 Apr 2024 12:12:07 -0700 Subject: [PATCH 1/6] Handle lifetime check for v2 tokens differently --- tests/test_bidstream_client.py | 9 +++++---- tests/test_encryption.py | 2 +- tests/test_sharing_client.py | 7 ++++--- tests/test_utils.py | 6 +++--- uid2_client/encryption.py | 28 +++++++++++++++------------- uid2_client/uid2_token_generator.py | 27 +++++++++++++++------------ 6 files changed, 43 insertions(+), 36 deletions(-) diff --git a/tests/test_bidstream_client.py b/tests/test_bidstream_client.py index 56cebbf..644c83b 100644 --- a/tests/test_bidstream_client.py +++ b/tests/test_bidstream_client.py @@ -124,10 +124,11 @@ def test_token_lifetime_too_long_for_bidstream(self): # TokenLifetimeTooLongFor self.assert_fails(result, expected_version, expected_scope) def test_token_generated_in_the_future_to_simulate_clock_skew(self): # TokenGeneratedInTheFutureToSimulateClockSkew + # Note V2 does not have a "token generated" field, therefore v2 tokens can't have a future "token generated" date and are excluded from this test. created_at_future = dt.datetime.now(tz=timezone.utc) + dt.timedelta(minutes=31) #max allowed clock skew is 30m - for expected_scope, expected_version in test_cases_all_scopes_all_versions: + for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): - token = generate_uid_token(expected_scope, expected_version, created_at=created_at_future) + token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future) refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( expected_scope)) self.assertTrue(refresh_response.success) @@ -164,7 +165,7 @@ def test_token_generated_in_the_future_legacy_client(self): # TokenGeneratedInT with self.subTest(expected_scope=expected_scope, expected_version=expected_version): legacy_client.refresh_json(key_bidstream_response_json_default_keys( expected_scope)) - token = generate_uid_token(expected_scope, expected_version, created_at=created_at_future) + token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future) result = legacy_client.decrypt(token) self.assert_success(result, expected_version, expected_scope) @@ -246,7 +247,7 @@ def test_token_expiry_custom_decryption_time(self): #TokenExpiryAndCustomNow expires_at = now - dt.timedelta(days=60) created_at = expires_at - dt.timedelta(minutes=1) token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4, - created_at=created_at, expires_at=expires_at) + identity_established_at=created_at, expires_at=expires_at) result = self._client._decrypt_token_into_raw_uid(token, None, expires_at + dt.timedelta(seconds=1)) self.assertFalse(result.success) self.assertEqual(result.status, DecryptionStatus.EXPIRED_TOKEN) diff --git a/tests/test_encryption.py b/tests/test_encryption.py index c5849a7..c671376 100644 --- a/tests/test_encryption.py +++ b/tests/test_encryption.py @@ -418,7 +418,7 @@ def test_smoke_token_v3(self): token_expiry = now + dt.timedelta(days=30) if keys.get_token_expiry_seconds() is None \ else now + dt.timedelta(seconds=int(keys.get_token_expiry_seconds())) result = UID2TokenGenerator.generate_uid2_token_v3(uid2, _master_key, _site_id, _site_key, - Params(expiry=token_expiry, token_generated_at=now)) + Params(expiry=token_expiry, token_generated=now)) final = decrypt(result, keys, now=now) self.assertEqual(uid2, final.uid) diff --git a/tests/test_sharing_client.py b/tests/test_sharing_client.py index 602c934..2c64be6 100644 --- a/tests/test_sharing_client.py +++ b/tests/test_sharing_client.py @@ -98,10 +98,11 @@ def test_token_lifetime_too_long_for_sharing(self): # TokenLifetimeTooLongForSh self._test_bidstream_client.assert_fails(result, expected_version, expected_scope) def test_token_generated_in_the_future_to_simulate_clock_skew(self): # TokenGeneratedInTheFutureToSimulateClockSkew + # Note V2 does not have a "token generated" field, therefore v2 tokens can't have a future "token generated" date and are excluded from this test. created_at_future = dt.datetime.now(tz=timezone.utc) + dt.timedelta(minutes=31) #max allowed clock skew is 30m - for expected_scope, expected_version in test_cases_all_scopes_all_versions: + for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): - token = generate_uid_token(expected_scope, expected_version, created_at=created_at_future) + token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future) refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( expected_scope)) self.assertTrue(refresh_response.success) @@ -151,7 +152,7 @@ def test_token_generated_in_the_future_legacy_client(self): # TokenGeneratedInT with self.subTest(expected_scope=expected_scope, expected_version=expected_version): legacy_client.refresh_json(key_sharing_response_json_default_keys( expected_scope)) - token = generate_uid_token(expected_scope, expected_version, created_at=created_at_future) + token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future) result = legacy_client.decrypt(token) self._test_bidstream_client.assert_success(result, expected_version, expected_scope) diff --git a/tests/test_utils.py b/tests/test_utils.py index 2a43cb3..528953a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -114,6 +114,6 @@ def create_default_key_collection(key_set): 99999, 2) -def generate_uid_token(identity_scope, version, raw_uid=example_uid, created_at=None, expires_at=None): - return UID2TokenGenerator.generate_uid_token(raw_uid, master_key, site_id, site_key, - identity_scope, version, created_at, expires_at) +def generate_uid_token(identity_scope, version, raw_uid=example_uid, identity_established_at=None, generated_at=None, expires_at=None): + return UID2TokenGenerator.generate_uid_token(raw_uid, master_key, site_id, site_key, identity_scope, version, + identity_established_at, generated_at, expires_at) diff --git a/uid2_client/encryption.py b/uid2_client/encryption.py index fcf7999..36e5485 100644 --- a/uid2_client/encryption.py +++ b/uid2_client/encryption.py @@ -111,7 +111,9 @@ def _decrypt_token(token, keys, domain_name, client_type, now): return DecryptedToken.make_error(DecryptionStatus.VERSION_NOT_SUPPORTED) -def _token_has_valid_lifetime(keys, client_type, established, expires, now): +def _token_has_valid_lifetime(keys, client_type, generated_or_now, expires, now): + # generated_or_now allows "now" for token v2, since v2 does not contain a "token generated" field. + # v2 therefore checks against remaining lifetime rather than total lifetime if client_type is ClientType.BIDSTREAM: max_life_time_seconds = keys.get_max_bidstream_lifetime_seconds() elif client_type is ClientType.SHARING: @@ -119,10 +121,10 @@ def _token_has_valid_lifetime(keys, client_type, established, expires, now): else: return True # Skip check for legacy clients - if (expires - established).total_seconds() > max_life_time_seconds: + if (expires - generated_or_now).total_seconds() > max_life_time_seconds: return False - elif established > now: - return (established - now).total_seconds() <= keys.get_allow_clock_skew_seconds() + elif generated_or_now > now: + return (generated_or_now - now).total_seconds() <= keys.get_allow_clock_skew_seconds() else: return True @@ -165,7 +167,7 @@ def _decrypt_token_v2(token_bytes, keys, domain_name, client_type, now): established_ms = int.from_bytes(identity[idx:idx + 8], 'big') established = dt.datetime.fromtimestamp(established_ms / 1000.0, tz=timezone.utc) - if not _token_has_valid_lifetime(keys, client_type, established, expires, now): + if not _token_has_valid_lifetime(keys, client_type, now, expires, now): return DecryptedToken(DecryptionStatus.INVALID_TOKEN_LIFETIME, id_str, established, site_id, site_key.site_id, keys.get_identity_scope(), None, AdvertisingTokenVersion.ADVERTISING_TOKEN_V2, False, expires) @@ -199,7 +201,7 @@ def _decrypt_token_v3(token_bytes, keys, domain_name, client_type, now, token_ve return DecryptedToken(DecryptionStatus.EXPIRED_TOKEN, None, None, None, None, keys.get_identity_scope(), identity_type, token_version, None, expires) - # created 8:16 + generated_ms = int.from_bytes(master_payload[8:16], 'big') # Token Generated # operator site id 16:20 # operator type 20 # operator version 21:25 @@ -219,6 +221,10 @@ def _decrypt_token_v3(token_bytes, keys, domain_name, client_type, now, token_ve # privacy bits 16:20 privacy_bits = bitarray() privacy_bits.frombytes(site_payload[16:20]) + established_ms = int.from_bytes(site_payload[20:28], 'big') + id_bytes = site_payload[36:] + id_str = base64.b64encode(id_bytes).decode('ascii') + is_client_side_generated = False if privacy_bits[1]: is_client_side_generated = True @@ -227,17 +233,13 @@ def _decrypt_token_v3(token_bytes, keys, domain_name, client_type, now, token_ve return DecryptedToken(DecryptionStatus.DOMAIN_NAME_CHECK_FAILED, None, None, site_id, site_key.site_id, keys.get_identity_scope(), identity_type, token_version, is_client_side_generated, expires) - established_ms = int.from_bytes(site_payload[20:28], 'big') established = dt.datetime.fromtimestamp(established_ms / 1000.0, tz=timezone.utc) - # refreshed_ms 28:36 + generated = dt.datetime.fromtimestamp(generated_ms / 1000.0, tz=timezone.utc) - if not _token_has_valid_lifetime(keys, client_type, established, expires, now): + if not _token_has_valid_lifetime(keys, client_type, generated, expires, now): return DecryptedToken(DecryptionStatus.INVALID_TOKEN_LIFETIME, None, established, site_id, site_key.site_id, keys.get_identity_scope(), identity_type, token_version, is_client_side_generated, expires) - id_bytes = site_payload[36:] - id_str = base64.b64encode(id_bytes).decode('ascii') - return DecryptedToken(DecryptionStatus.SUCCESS, id_str, established, site_id, site_key.site_id, keys.get_identity_scope(), identity_type, token_version, is_client_side_generated, expires) @@ -286,7 +288,7 @@ def encrypt(uid2, identity_scope, keys, keyset_id=None, **kwargs): if identity_scope is None: identity_scope = keys.get_identity_scope() try: - params = Params(expiry=token_expiry, identity_scope=identity_scope, token_generated_at=now) + params = Params(expiry=token_expiry, identity_scope=identity_scope, token_generated=now) return EncryptionDataResponse.make_success(UID2TokenGenerator.generate_uid2_token_v4(uid2, master_key, site_id, key, params)) except Exception: return EncryptionDataResponse.make_error(EncryptionStatus.ENCRYPTION_FAILURE) diff --git a/uid2_client/uid2_token_generator.py b/uid2_client/uid2_token_generator.py index c68ee53..a873bea 100644 --- a/uid2_client/uid2_token_generator.py +++ b/uid2_client/uid2_token_generator.py @@ -51,10 +51,12 @@ def _encrypt_data_v1(data, key, iv): class Params: def __init__(self, expiry=dt.datetime.now(tz=timezone.utc) + dt.timedelta(hours=1), - identity_scope=IdentityScope.UID2.value, token_generated_at=dt.datetime.now(tz=timezone.utc)): - self.identity_scope = identity_scope + identity_scope=IdentityScope.UID2.value, token_generated=dt.datetime.now(tz=timezone.utc), + identity_established=dt.datetime.now(tz=timezone.utc)): self.token_expiry = expiry - self.token_generated_at = token_generated_at + self.identity_scope = identity_scope + self.token_generated = token_generated + self.identity_established = identity_established if not isinstance(expiry, dt.datetime): self.token_expiry = dt.datetime.now(tz=timezone.utc) + expiry @@ -66,15 +68,14 @@ def default_params(): class UID2TokenGenerator: @staticmethod - def generate_uid2_token_v2(id_str, master_key, site_id, site_key, params = default_params(), version=2): + def generate_uid2_token_v2(id_str, master_key, site_id, site_key, params=default_params(), version=2): id = bytes(id_str, 'utf-8') identity = int.to_bytes(site_id, 4, 'big') identity += int.to_bytes(len(id), 4, 'big') identity += id # old privacy_bits identity += int.to_bytes(0, 4, 'big') - created = params.token_generated_at - identity += int.to_bytes(int(created.timestamp()) * 1000, 8, 'big') + identity += int.to_bytes(int(params.identity_established.timestamp()) * 1000, 8, 'big') identity_iv = bytes([10, 11, 12, 13, 14, 15, 16, 1, 2, 3, 4, 5, 6, 7, 8, 9]) expiry = params.token_expiry master_payload = int.to_bytes(int(expiry.timestamp()) * 1000, 8, 'big') @@ -98,11 +99,13 @@ def generate_uid2_token_v4(id_str, master_key, site_id, site_key, params=default @staticmethod def generate_uid_token(id_str, master_key, site_id, site_key, identity_scope, token_version, - created_at=None, expires_at=None): + identity_established_at=None, generated_at=None, expires_at=None): params = default_params() params.identity_scope = identity_scope - if created_at is not None: - params.token_generated_at = created_at + if identity_established_at is not None: + params.identity_established = identity_established_at + if generated_at is not None: + params.token_generated = generated_at if expires_at is not None: params.token_expiry = expires_at if token_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: @@ -124,13 +127,13 @@ def generate_uid2_token_with_debug_info(id_str, master_key, site_id, site_key, p # User Identity Data site_payload += int.to_bytes(0, length=4, byteorder='big') # privacy bits - generated_at_timestamp = int(params.token_generated_at.timestamp()) * 1000 - site_payload += int.to_bytes(generated_at_timestamp, length=8, byteorder='big') # established + site_payload += int.to_bytes(int(params.identity_established.timestamp()) * 1000, length=8, byteorder='big') # established + generated_at_timestamp = int(params.token_generated.timestamp()) * 1000 site_payload += int.to_bytes(generated_at_timestamp, length=8, byteorder='big') # last refreshed/generated site_payload += base64.b64decode(id_str) master_payload = int.to_bytes(int(params.token_expiry.timestamp()) * 1000, length=8, byteorder='big') # expiry - master_payload += int.to_bytes(generated_at_timestamp, length=8, byteorder='big') # created + master_payload += int.to_bytes(generated_at_timestamp, length=8, byteorder='big') # generated # Operator Identity Data master_payload += int.to_bytes(0, length=4, byteorder='big') # site id From 22c17e832482cf9613d536d43cafb861862df784 Mon Sep 17 00:00:00 2001 From: asloob qureshi Date: Thu, 25 Apr 2024 11:20:39 -0700 Subject: [PATCH 2/6] refactor and rename arguments --- tests/test_bidstream_client.py | 2 +- uid2_client/uid2_token_generator.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_bidstream_client.py b/tests/test_bidstream_client.py index 644c83b..fdd70de 100644 --- a/tests/test_bidstream_client.py +++ b/tests/test_bidstream_client.py @@ -247,7 +247,7 @@ def test_token_expiry_custom_decryption_time(self): #TokenExpiryAndCustomNow expires_at = now - dt.timedelta(days=60) created_at = expires_at - dt.timedelta(minutes=1) token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4, - identity_established_at=created_at, expires_at=expires_at) + generated_at=created_at, expires_at=expires_at) result = self._client._decrypt_token_into_raw_uid(token, None, expires_at + dt.timedelta(seconds=1)) self.assertFalse(result.success) self.assertEqual(result.status, DecryptionStatus.EXPIRED_TOKEN) diff --git a/uid2_client/uid2_token_generator.py b/uid2_client/uid2_token_generator.py index a873bea..f0a23b1 100644 --- a/uid2_client/uid2_token_generator.py +++ b/uid2_client/uid2_token_generator.py @@ -99,15 +99,15 @@ def generate_uid2_token_v4(id_str, master_key, site_id, site_key, params=default @staticmethod def generate_uid_token(id_str, master_key, site_id, site_key, identity_scope, token_version, - identity_established_at=None, generated_at=None, expires_at=None): + identity_established=None, token_generated=None, token_expiry=None): params = default_params() params.identity_scope = identity_scope - if identity_established_at is not None: - params.identity_established = identity_established_at - if generated_at is not None: - params.token_generated = generated_at - if expires_at is not None: - params.token_expiry = expires_at + if identity_established is not None: + params.identity_established = identity_established + if token_generated is not None: + params.token_generated = token_generated + if token_expiry is not None: + params.token_expiry = token_expiry if token_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: return UID2TokenGenerator.generate_uid2_token_v2(id_str, master_key, site_id, site_key, params) elif token_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V3: From aec489dc55328bf5495efaf39a26b08721002611 Mon Sep 17 00:00:00 2001 From: asloob qureshi Date: Thu, 25 Apr 2024 13:59:37 -0700 Subject: [PATCH 3/6] Add unit tests --- tests/test_bidstream_client.py | 31 ++++++++++++++++++++++++++----- tests/test_sharing_client.py | 27 +++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/tests/test_bidstream_client.py b/tests/test_bidstream_client.py index fdd70de..94897d7 100644 --- a/tests/test_bidstream_client.py +++ b/tests/test_bidstream_client.py @@ -88,10 +88,13 @@ def _decrypt_and_assert_success(self, token, token_version, scope): def setUp(self): self._client = BidstreamClient(self._CONST_BASE_URL, self._CONST_API_KEY, client_secret) - def test_smoke_test(self): # SmokeTest + def test_smoke_test_for_bidstream(self): # SmokeTestForBidstream for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): - token = generate_uid_token(expected_scope, expected_version) + token = generate_uid_token(expected_scope, expected_version, + identity_established_at=now - dt.timedelta(days=120), + generated_at=YESTERDAY, + expires_at=IN_2_DAYS) refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( expected_scope)) self.assertTrue(refresh_response.success) @@ -112,11 +115,29 @@ def test_phone_uids(self): # PhoneTest self.assertEqual(result.identity_scope, expected_scope) self.assertEqual(result.advertising_token_version, expected_version) - def test_token_lifetime_too_long_for_bidstream(self): # TokenLifetimeTooLongForBidstream - expires_in_sec = IN_3_DAYS + dt.timedelta(minutes=1) + def test_token_lifetime_too_long_for_bidstream_but_remaining_lifetime_allowed(self): # TokenLifetimeTooLongForBidstreamButRemainingLifetimeAllowed + generated = YESTERDAY + expires_in_sec = generated + dt.timedelta(days=3) + dt.timedelta(minutes=1) for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): - token = generate_uid_token(expected_scope, expected_version, expires_at=expires_in_sec) + token = generate_uid_token(expected_scope, expected_version, generated_at=generated, + expires_at=expires_in_sec) + refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( + expected_scope)) + self.assertTrue(refresh_response.success) + result = self._client.decrypt_token_into_raw_uid(token, None) + if expected_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: + self.assert_success(result, expected_version, expected_scope) + else: + self.assert_fails(result, expected_version, expected_scope) + + def test_token_remaining_lifetime_too_long_for_bidstream(self): # TokenRemainingLifetimeTooLongForBidstream + generated = now + expires_in_sec = generated + dt.timedelta(days=3) + dt.timedelta(minutes=1) + for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions: + with self.subTest(expected_scope=expected_scope, expected_version=expected_version): + token = generate_uid_token(expected_scope, expected_version, generated_at=generated, + expires_at=expires_in_sec) refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( expected_scope)) self.assertTrue(refresh_response.success) diff --git a/tests/test_sharing_client.py b/tests/test_sharing_client.py index 2c64be6..f55aec3 100644 --- a/tests/test_sharing_client.py +++ b/tests/test_sharing_client.py @@ -77,17 +77,35 @@ def decrypt_and_assert_success(self, token, token_version, scope): decrypted = self._client.decrypt_token_into_raw_uid(token) self._test_bidstream_client.assert_success(decrypted, token_version, scope) - def test_smoke_test(self): # SmokeTest + def test_smoke_test_for_sharing(self): # SmokeTestForSharing for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): - token = generate_uid_token(expected_scope, expected_version) + token = generate_uid_token(expected_scope, expected_version, + identity_established_at=now - dt.timedelta(days=120), + generated_at=YESTERDAY, expires_at=now + dt.timedelta(days=29)) refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( expected_scope)) self.assertTrue(refresh_response.success) self.decrypt_and_assert_success(token, expected_version, expected_scope) + def test_token_lifetime_too_long_for_sharing_but_remaining_lifetime_allowed(self): # TokenLifetimeTooLongForSharingButRemainingLifetimeAllowed + generated = YESTERDAY + expires_in_sec = generated + dt.timedelta(days=31) + for expected_scope, expected_version in test_cases_all_scopes_all_versions: + with self.subTest(expected_scope=expected_scope, expected_version=expected_version): + token = generate_uid_token(expected_scope, expected_version, generated_at=generated, + expires_at=expires_in_sec) + refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( + expected_scope)) + self.assertTrue(refresh_response.success) + result = self._client.decrypt_token_into_raw_uid(token) + if expected_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: + self._test_bidstream_client.assert_success(result, expected_version, expected_scope) + else: + self._test_bidstream_client.assert_fails(result, expected_version, expected_scope) + def test_token_lifetime_too_long_for_sharing(self): # TokenLifetimeTooLongForSharing - expires_in_sec = dt.datetime.now(tz=timezone.utc) + dt.timedelta(days=31) + expires_in_sec = now + dt.timedelta(days=30) + dt.timedelta(minutes=1) for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, expires_at=expires_in_sec) @@ -277,7 +295,8 @@ def test_expiry_in_token_matches_expiry_in_response(self): # ExpiryInTokenMatch self.assertTrue(result.status) self.assertEqual(example_uid, result.uid) - future_decryption_result = self._client._decrypt_token_into_raw_uid(encryption_data_response.encrypted_data, now + dt.timedelta(seconds=3)) + future_decryption_result = self._client._decrypt_token_into_raw_uid(encryption_data_response.encrypted_data, + now + dt.timedelta(seconds=3)) self.assertFalse(future_decryption_result.success) self.assertEqual(DecryptionStatus.EXPIRED_TOKEN, future_decryption_result.status) self.assertEqual(now + dt.timedelta(seconds=2), future_decryption_result.expiry) From c3b930f6f9d1f5164ea48a7d1916df0ea54c4995 Mon Sep 17 00:00:00 2001 From: asloob qureshi Date: Thu, 25 Apr 2024 14:14:58 -0700 Subject: [PATCH 4/6] refactor --- tests/test_bidstream_client.py | 49 ++++++++--------------- tests/test_sharing_client.py | 71 ++++++++++++++-------------------- 2 files changed, 45 insertions(+), 75 deletions(-) diff --git a/tests/test_bidstream_client.py b/tests/test_bidstream_client.py index 94897d7..230b266 100644 --- a/tests/test_bidstream_client.py +++ b/tests/test_bidstream_client.py @@ -60,6 +60,10 @@ class TestBidStreamClient(unittest.TestCase): _CONST_BASE_URL = 'base_url' _CONST_API_KEY = 'api_key' + def refresh(self, refresh_json): + refresh_response = self._client._refresh_json(refresh_json) + self.assertTrue(refresh_response.success) + def assert_success(self, decryption_response, token_version, scope): self.assertTrue(decryption_response.success) self.assertEqual(decryption_response.uid, example_uid) @@ -95,17 +99,13 @@ def test_smoke_test_for_bidstream(self): # SmokeTestForBidstream identity_established_at=now - dt.timedelta(days=120), generated_at=YESTERDAY, expires_at=IN_2_DAYS) - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys(expected_scope)) self._decrypt_and_assert_success(token, expected_version, expected_scope) def test_phone_uids(self): # PhoneTest for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys(expected_scope)) token = generate_uid_token(expected_scope, expected_version, phone_uid) self.assertEqual(IdentityType.Phone, get_identity_type(token)) result = self._client.decrypt_token_into_raw_uid(token, None) @@ -122,9 +122,7 @@ def test_token_lifetime_too_long_for_bidstream_but_remaining_lifetime_allowed(se with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, generated_at=generated, expires_at=expires_in_sec) - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys(expected_scope)) result = self._client.decrypt_token_into_raw_uid(token, None) if expected_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: self.assert_success(result, expected_version, expected_scope) @@ -138,9 +136,7 @@ def test_token_remaining_lifetime_too_long_for_bidstream(self): # TokenRemainin with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, generated_at=generated, expires_at=expires_in_sec) - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys(expected_scope)) result = self._client.decrypt_token_into_raw_uid(token, None) self.assert_fails(result, expected_version, expected_scope) @@ -150,9 +146,7 @@ def test_token_generated_in_the_future_to_simulate_clock_skew(self): # TokenGen for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future) - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys(expected_scope)) result = self._client.decrypt_token_into_raw_uid(token, None) self.assert_fails(result, expected_version, expected_scope) self.assertEqual(result.status, DecryptionStatus.INVALID_TOKEN_LIFETIME) @@ -162,17 +156,14 @@ def test_token_generated_in_the_future_within_allowed_clock_skew(self): # Token for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, expires_at=created_at_future) - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys(expected_scope)) self._decrypt_and_assert_success(token, expected_version, expected_scope) def test_legacy_response_from_old_operator(self): test_cases = [AdvertisingTokenVersion.ADVERTISING_TOKEN_V2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V3, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4] - refresh_response = self._client._refresh_json(key_set_to_json_for_sharing([master_key, site_key])) - self.assertTrue(refresh_response.success) + self.refresh(key_set_to_json_for_sharing([master_key, site_key])) for token_version in test_cases: with self.subTest(token_version=token_version): token = generate_uid_token(IdentityScope.UID2, token_version) @@ -212,9 +203,7 @@ def test_identity_scope_and_types(self): # IdentityScopeAndType_TestCases for uid, identity_scope, identity_type in test_cases: with self.subTest(identity_scope=identity_scope, identity_type=identity_type): token = generate_uid_token(identity_scope, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4) - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys( - identity_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys(identity_scope)) self._decrypt_and_assert_success(token, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4, identity_scope) def test_empty_keys(self): # EmptyKeyContainer @@ -229,9 +218,7 @@ def test_master_key_expired(self): #ExpiredKeyContainer site_key_expired = EncryptionKey(site_key_id, site_id, created=now, activates=now - dt.timedelta(hours=2), expires=now - dt.timedelta(hours=1), secret=site_secret, keyset_id=99999) - refresh_response = self._client._refresh_json(key_bidstream_response_json( - [master_key_expired, site_key_expired])) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json([master_key_expired, site_key_expired])) result = self._client.decrypt_token_into_raw_uid(example_uid, None) self.assertFalse(result.success) @@ -241,9 +228,7 @@ def test_not_authorized_for_master_key(self): #NotAuthorizedForMasterKey another_master_key = EncryptionKey(master_key_id + site_key_id + 1, -1, created=now, activates=now, expires=now + dt.timedelta(hours=1), secret=master_secret) another_site_key = EncryptionKey(master_key_id + site_key_id + 2, site_id, created=now, activates=now, expires=now + dt.timedelta(hours=1), secret=site_secret) - refresh_response = self._client._refresh_json(key_bidstream_response_json( - [another_master_key, another_site_key])) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json([another_master_key, another_site_key])) token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4) result = self._client.decrypt_token_into_raw_uid(token, None) @@ -251,8 +236,7 @@ def test_not_authorized_for_master_key(self): #NotAuthorizedForMasterKey self.assertEqual(result.status, DecryptionStatus.NOT_AUTHORIZED_FOR_MASTER_KEY) def test_invalid_payload(self): #InvalidPayload - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys()) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys()) token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4) payload = Uid2Base64UrlCoder.decode(token) bad_token = base64.urlsafe_b64encode(payload[:0]) @@ -262,8 +246,7 @@ def test_invalid_payload(self): #InvalidPayload self.assertEqual(result.status, DecryptionStatus.INVALID_PAYLOAD) def test_token_expiry_custom_decryption_time(self): #TokenExpiryAndCustomNow - refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys()) - self.assertTrue(refresh_response.success) + self.refresh(key_bidstream_response_json_default_keys()) expires_at = now - dt.timedelta(days=60) created_at = expires_at - dt.timedelta(minutes=1) diff --git a/tests/test_sharing_client.py b/tests/test_sharing_client.py index f55aec3..30530bc 100644 --- a/tests/test_sharing_client.py +++ b/tests/test_sharing_client.py @@ -73,6 +73,10 @@ class TestSharingClient(unittest.TestCase): def setUp(self): self._client = SharingClient(self._CONST_BASE_URL, self._CONST_API_KEY, client_secret) + def refresh(self, refresh_json): + refresh_response = self._client._refresh_json(refresh_json) + self.assertTrue(refresh_response.success) + def decrypt_and_assert_success(self, token, token_version, scope): decrypted = self._client.decrypt_token_into_raw_uid(token) self._test_bidstream_client.assert_success(decrypted, token_version, scope) @@ -83,9 +87,8 @@ def test_smoke_test_for_sharing(self): # SmokeTestForSharing token = generate_uid_token(expected_scope, expected_version, identity_established_at=now - dt.timedelta(days=120), generated_at=YESTERDAY, expires_at=now + dt.timedelta(days=29)) - refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_sharing_response_json_default_keys(expected_scope)) + self.decrypt_and_assert_success(token, expected_version, expected_scope) def test_token_lifetime_too_long_for_sharing_but_remaining_lifetime_allowed(self): # TokenLifetimeTooLongForSharingButRemainingLifetimeAllowed @@ -95,9 +98,8 @@ def test_token_lifetime_too_long_for_sharing_but_remaining_lifetime_allowed(self with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, generated_at=generated, expires_at=expires_in_sec) - refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_sharing_response_json_default_keys(expected_scope)) + result = self._client.decrypt_token_into_raw_uid(token) if expected_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: self._test_bidstream_client.assert_success(result, expected_version, expected_scope) @@ -109,9 +111,8 @@ def test_token_lifetime_too_long_for_sharing(self): # TokenLifetimeTooLongForSh for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, expires_at=expires_in_sec) - refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_sharing_response_json_default_keys(expected_scope)) + result = self._client.decrypt_token_into_raw_uid(token) self._test_bidstream_client.assert_fails(result, expected_version, expected_scope) @@ -121,9 +122,8 @@ def test_token_generated_in_the_future_to_simulate_clock_skew(self): # TokenGen for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future) - refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( - expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(key_sharing_response_json_default_keys(expected_scope)) + result = self._client.decrypt_token_into_raw_uid(token) self._test_bidstream_client.assert_fails(result, expected_version, expected_scope) @@ -132,17 +132,17 @@ def test_token_generated_in_the_future_within_allowed_clock_skew(self): # Token for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, expires_at=created_at_future) - refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( + self.refresh(key_sharing_response_json_default_keys( expected_scope)) - self.assertTrue(refresh_response.success) + self.decrypt_and_assert_success(token, expected_version, expected_scope) def test_phone_uids(self): # PhoneTest for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): - refresh_response = self._client._refresh_json(key_sharing_response_json_default_keys( + self.refresh(key_sharing_response_json_default_keys( expected_scope)) - self.assertTrue(refresh_response.success) + token = generate_uid_token(expected_scope, expected_version, phone_uid) self.assertEqual(IdentityType.Phone, get_identity_type(token)) result = self._client.decrypt_token_into_raw_uid(token) @@ -155,8 +155,8 @@ def test_legacy_response_from_old_operator(self): test_cases = [AdvertisingTokenVersion.ADVERTISING_TOKEN_V2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V3, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4] - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key, site_key])) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing([master_key, site_key])) + for token_version in test_cases: with self.subTest(token_version=token_version): token = generate_uid_token(IdentityScope.UID2, token_version) @@ -198,13 +198,11 @@ def sharing_encrypt(self, identity_scope=IdentityScope.UID2): def test_client_produces_token_with_correct_prefix(self): #ClientProducesTokenWithCorrectPrefix for expected_scope in [IdentityScope.UID2, IdentityScope.EUID]: with self.subTest(expected_scope=expected_scope): - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing(identity_scope=expected_scope)) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing(identity_scope=expected_scope)) self.sharing_encrypt(expected_scope) def sharing_setup_and_encrypt(self): - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key, site_key])) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing([master_key, site_key])) return self.sharing_encrypt() def test_can_encrypt_decrypt_for_sharing(self): #CanEncryptAndDecryptForSharing @@ -235,8 +233,7 @@ def test_uid2_client_produces_uid2_token(self): # Uid2ClientProducesUid2Token self.assertEqual("A", token[0]) def test_raw_uid_produces_correct_identity_type_in_token(self): #RawUidProducesCorrectIdentityTypeInToken - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing()) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing()) raw_uids = [[IdentityType.Email, "Q4bGug8t1xjsutKLCNjnb5fTlXSvIQukmahYDJeLBtk="], # v2 +12345678901 [IdentityType.Phone, "BEOGxroPLdcY7LrSiwjY52+X05V0ryELpJmoWAyXiwbZ"], # v3 +12345678901 [IdentityType.Email, "oKg0ZY9ieD/CGMEjAA0kcq+8aUbLMBG0MgCT3kWUnJs="], # v2 test@example.com @@ -258,9 +255,7 @@ def test_raw_uid_produces_correct_identity_type_in_token(self): #RawUidProduces self.assertEqual(identity_type, actual_identity_type) def test_multiple_keys_per_keyset(self): # MultipleKeysPerKeyset - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key, master_key2, site_key, site_key2])) - self.assertTrue(refresh_response.success) - + self.refresh(keyset_to_json_for_sharing([master_key, master_key2, site_key, site_key2])) sharing_token = self._client.encrypt_raw_uid_into_token(example_uid) result = self._client.decrypt_token_into_raw_uid(sharing_token.encrypted_data) @@ -268,25 +263,21 @@ def test_multiple_keys_per_keyset(self): # MultipleKeysPerKeyset self.assertEqual(example_uid, result.uid) def test_cannot_encrypt_if_no_key_from_default_keyset(self): #CannotEncryptIfNoKeyFromTheDefaultKeyset - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key])) - self.assertTrue(refresh_response.success) - + self.refresh(keyset_to_json_for_sharing([master_key])) result = self._client.encrypt_raw_uid_into_token(example_uid) self.assertEqual(result.status, EncryptionStatus.NOT_AUTHORIZED_FOR_KEY) def test_cannot_encrypt_if_theres_no_default_keyset_header(self): #CannotEncryptIfTheresNoDefaultKeysetHeader - refresh_response = self._client._refresh_json(key_sharing_response_json([master_key, site_key], identity_scope=IdentityScope.UID2)) - self.assertTrue(refresh_response.success) + self.refresh(key_sharing_response_json([master_key, site_key], identity_scope=IdentityScope.UID2)) self._client.encrypt_raw_uid_into_token(example_uid) result = self._client.encrypt_raw_uid_into_token(example_uid) self.assertEqual(result.status, EncryptionStatus.NOT_AUTHORIZED_FOR_KEY) def test_expiry_in_token_matches_expiry_in_response(self): # ExpiryInTokenMatchesExpiryInResponse - refresh_response = self._client._refresh_json( + self.refresh( key_sharing_response_json([master_key, site_key], identity_scope=IdentityScope.UID2, default_keyset_id=99999, token_expiry_seconds=2)) - self.assertTrue(refresh_response.success) encryption_data_response = self._client.encrypt_raw_uid_into_token(example_uid) self.assertEqual(encryption_data_response.status, EncryptionStatus.SUCCESS) @@ -303,32 +294,28 @@ def test_expiry_in_token_matches_expiry_in_response(self): # ExpiryInTokenMatch def test_encrypt_key_expired(self): #EncryptKeyExpired expired_key = EncryptionKey(site_key_id, site_id, created=now, activates=now, expires=YESTERDAY, secret=site_secret) - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key, expired_key])) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing([master_key, expired_key])) result = self._client.encrypt_raw_uid_into_token(example_uid) self.assertEqual(result.status, EncryptionStatus.NOT_AUTHORIZED_FOR_KEY) def test_encrypt_key_inactive(self): #EncryptKeyInactive inactive_key = EncryptionKey(site_key_id, site_id, now, TOMORROW, IN_2_DAYS, site_secret) - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key, inactive_key])) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing([master_key, inactive_key])) result = self._client.encrypt_raw_uid_into_token(example_uid) self.assertEqual(result.status, EncryptionStatus.NOT_AUTHORIZED_FOR_KEY) def test_encrypt_site_key_expired(self): #EncryptSiteKeyExpired expired_key = EncryptionKey(site_key_id, site_id, created=now, activates=now, expires=YESTERDAY, secret=site_secret) - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key, expired_key])) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing([master_key, expired_key])) result = self._client.encrypt_raw_uid_into_token(example_uid) self.assertEqual(result.status, EncryptionStatus.NOT_AUTHORIZED_FOR_KEY) def test_encrypt_site_key_inactive(self): #EncryptSiteKeyInactive inactive_key = EncryptionKey(site_key_id, site_id, now, TOMORROW, IN_2_DAYS, site_secret) - refresh_response = self._client._refresh_json(keyset_to_json_for_sharing([master_key, inactive_key])) - self.assertTrue(refresh_response.success) + self.refresh(keyset_to_json_for_sharing([master_key, inactive_key])) result = self._client.encrypt_raw_uid_into_token(example_uid) self.assertEqual(result.status, EncryptionStatus.NOT_AUTHORIZED_FOR_KEY) From 5efe9ad4f501950dc5488f3f3067c0c733d49326 Mon Sep 17 00:00:00 2001 From: asloob qureshi Date: Fri, 26 Apr 2024 12:15:51 -0700 Subject: [PATCH 5/6] Fix issues due to resolving default arguments --- tests/test_encryption.py | 13 +++++---- uid2_client/auto_refresh.py | 2 +- uid2_client/uid2_token_generator.py | 43 ++++++++++++++--------------- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/tests/test_encryption.py b/tests/test_encryption.py index c671376..f79e651 100644 --- a/tests/test_encryption.py +++ b/tests/test_encryption.py @@ -70,6 +70,8 @@ def validate_advertising_token(self, advertising_token_string, identity_scope, i self.assertEqual(-1, advertising_token_string.find("/")) def generate_uid2_token_v4(self, uid, master_key, site_id, site_key, params = Params(), identity_type = IdentityType.Email, identity_scope = IdentityScope.UID2): + if not isinstance(params.token_expiry, dt.datetime): + params.token_expiry = dt.datetime.now(tz=timezone.utc) + params.token_expiry advertising_token = UID2TokenGenerator.generate_uid2_token_v4(uid, master_key, site_id, site_key, params) self.validate_advertising_token(advertising_token, identity_scope, identity_type) return advertising_token @@ -155,7 +157,7 @@ def test_decrypt_token_v4_no_site_key(self): result = decrypt(token, keys) def test_decrypt_token_v4_invalid_version(self): - params = Params(dt.timedelta(hours=1)) + params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(hours=1)) token = UID2TokenGenerator.generate_uid2_token_with_debug_info(_example_id, _master_key, _site_id, _site_key, params, 1) keys = EncryptionKeysCollection([_master_key, _site_key]) @@ -174,7 +176,8 @@ def test_decrypt_token_v4_expired(self): result = decrypt(token, keys) def _generate_v2_token(self, expires_in_seconds): - return UID2TokenGenerator.generate_uid2_token_v2(_example_id, _master_key, _site_id, _site_key, Params(expires_in_seconds)) + return UID2TokenGenerator.generate_uid2_token_v2(_example_id, _master_key, _site_id, _site_key, + Params(dt.datetime.now(tz=timezone.utc) + expires_in_seconds)) def _generate_v4_token(self, expires_in_seconds): return self.generate_uid2_token_v4(_example_id, _master_key, _site_id, _site_key, Params(expires_in_seconds)) @@ -297,7 +300,7 @@ def test_decrypt_token_v3_no_site_key(self): result = decrypt(token, keys) def test_decrypt_token_v3_invalid_version(self): - params = Params(dt.timedelta(hours=1)) + params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(hours=1)) token = UID2TokenGenerator.generate_uid2_token_with_debug_info(_example_id, _master_key, _site_id, _site_key, params, 1) keys = EncryptionKeysCollection([_master_key, _site_key]) @@ -307,7 +310,7 @@ def test_decrypt_token_v3_invalid_version(self): def test_decrypt_token_v3_expired(self): - params = Params(dt.timedelta(seconds=-1)) + params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(seconds=-1)) token = UID2TokenGenerator.generate_uid2_token_v3(_example_id, _master_key, _site_id, _site_key, params) keys = EncryptionKeysCollection([_master_key, _site_key]) @@ -331,7 +334,7 @@ def test_decrypt_token_v3_custom_now(self): def test_decrypt_token_v3_invalid_payload(self): - params = Params(dt.timedelta(seconds=-1)) + params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(seconds=-1)) token = UID2TokenGenerator.generate_uid2_token_v3(_example_id, _master_key, _site_id, _site_key, params) keys = EncryptionKeysCollection([_master_key, _site_key]) diff --git a/uid2_client/auto_refresh.py b/uid2_client/auto_refresh.py index c546957..a60df51 100644 --- a/uid2_client/auto_refresh.py +++ b/uid2_client/auto_refresh.py @@ -119,7 +119,7 @@ def cancel(self): def _try_refresh_keys(self): """Invoke UID2 client to refresh latest keys from the service.""" try: - keys = self._client.refresh() + keys = self._client.refresh_keys() self._result = self._make_success_result(keys) return True except: diff --git a/uid2_client/uid2_token_generator.py b/uid2_client/uid2_token_generator.py index f0a23b1..6b6161b 100644 --- a/uid2_client/uid2_token_generator.py +++ b/uid2_client/uid2_token_generator.py @@ -50,25 +50,29 @@ def _encrypt_data_v1(data, key, iv): class Params: - def __init__(self, expiry=dt.datetime.now(tz=timezone.utc) + dt.timedelta(hours=1), - identity_scope=IdentityScope.UID2.value, token_generated=dt.datetime.now(tz=timezone.utc), - identity_established=dt.datetime.now(tz=timezone.utc)): + def __init__(self, expiry=None, identity_scope=IdentityScope.UID2.value, token_generated=None, + identity_established=None): + now = dt.datetime.now(tz=timezone.utc) + if identity_established is None: + identity_established = now + if token_generated is None: + token_generated = now + if expiry is None: + expiry = now + dt.timedelta(hours=1) + + self.identity_established = identity_established + self.token_generated = token_generated self.token_expiry = expiry self.identity_scope = identity_scope - self.token_generated = token_generated - self.identity_established = identity_established - if not isinstance(expiry, dt.datetime): - self.token_expiry = dt.datetime.now(tz=timezone.utc) + expiry - - -def default_params(): - return Params() class UID2TokenGenerator: @staticmethod - def generate_uid2_token_v2(id_str, master_key, site_id, site_key, params=default_params(), version=2): + def generate_uid2_token_v2(id_str, master_key, site_id, site_key, params=None, version=2): + if params is None: + params = Params() + id = bytes(id_str, 'utf-8') identity = int.to_bytes(site_id, 4, 'big') identity += int.to_bytes(len(id), 4, 'big') @@ -88,26 +92,19 @@ def generate_uid2_token_v2(id_str, master_key, site_id, site_key, params=default return base64.b64encode(token).decode('ascii') @staticmethod - def generate_uid2_token_v3(id_str, master_key, site_id, site_key, params=default_params()): + def generate_uid2_token_v3(id_str, master_key, site_id, site_key, params=None): return UID2TokenGenerator.generate_uid2_token_with_debug_info(id_str, master_key, site_id, site_key, params, AdvertisingTokenVersion.ADVERTISING_TOKEN_V3.value) @staticmethod - def generate_uid2_token_v4(id_str, master_key, site_id, site_key, params=default_params()): + def generate_uid2_token_v4(id_str, master_key, site_id, site_key, params=None): return UID2TokenGenerator.generate_uid2_token_with_debug_info(id_str, master_key, site_id, site_key, params, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4.value) @staticmethod def generate_uid_token(id_str, master_key, site_id, site_key, identity_scope, token_version, identity_established=None, token_generated=None, token_expiry=None): - params = default_params() - params.identity_scope = identity_scope - if identity_established is not None: - params.identity_established = identity_established - if token_generated is not None: - params.token_generated = token_generated - if token_expiry is not None: - params.token_expiry = token_expiry + params = Params(token_expiry, identity_scope, token_generated, identity_established) if token_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: return UID2TokenGenerator.generate_uid2_token_v2(id_str, master_key, site_id, site_key, params) elif token_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V3: @@ -119,6 +116,8 @@ def generate_uid_token(id_str, master_key, site_id, site_key, identity_scope, to @staticmethod def generate_uid2_token_with_debug_info(id_str, master_key, site_id, site_key, params, version): + if params is None: + params = Params() # Publisher Data site_payload = int.to_bytes(site_id, length=4, byteorder='big') From ac62ec80e25548afd472b2a228dbc336bc6c6750 Mon Sep 17 00:00:00 2001 From: asloob qureshi Date: Mon, 29 Apr 2024 09:45:25 -0700 Subject: [PATCH 6/6] update tests --- tests/test_bidstream_client.py | 18 ++++++++++++++---- tests/test_sharing_client.py | 10 +++++++--- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/tests/test_bidstream_client.py b/tests/test_bidstream_client.py index 230b266..110c984 100644 --- a/tests/test_bidstream_client.py +++ b/tests/test_bidstream_client.py @@ -26,11 +26,15 @@ def encode_keys(keys): return key_json -def key_bidstream_response_json(keys, identity_scope=IdentityScope.UID2): +def key_bidstream_response_with_lifetime_json(keys, identity_scope, max_bidstream_lifetime_seconds): + if identity_scope is None: + identity_scope = IdentityScope.UID2 + if max_bidstream_lifetime_seconds is None: + max_bidstream_lifetime_seconds = dt.timedelta(days=3).total_seconds() encoded_keys = encode_keys(keys) json_obj = { "body": { - "max_bidstream_lifetime_seconds": dt.timedelta(days=3).total_seconds(), + "max_bidstream_lifetime_seconds": max_bidstream_lifetime_seconds, "identity_scope": identity_scope.name, "allow_clock_skew_seconds": 1800, # 30 mins "keys": encoded_keys, @@ -52,8 +56,12 @@ def key_bidstream_response_json(keys, identity_scope=IdentityScope.UID2): return json.dumps(json_obj) +def key_bidstream_response_json(keys, identity_scope=IdentityScope.UID2, max_bidstream_lifetime_seconds=None): + return key_bidstream_response_with_lifetime_json(keys, identity_scope, max_bidstream_lifetime_seconds) + + def key_bidstream_response_json_default_keys(identity_scope=IdentityScope.UID2): - return key_bidstream_response_json([master_key, site_key], identity_scope) + return key_bidstream_response_with_lifetime_json([master_key, site_key], identity_scope, None) class TestBidStreamClient(unittest.TestCase): @@ -118,11 +126,13 @@ def test_phone_uids(self): # PhoneTest def test_token_lifetime_too_long_for_bidstream_but_remaining_lifetime_allowed(self): # TokenLifetimeTooLongForBidstreamButRemainingLifetimeAllowed generated = YESTERDAY expires_in_sec = generated + dt.timedelta(days=3) + dt.timedelta(minutes=1) + max_bidstream_lifetime_seconds = dt.timedelta(days=3).total_seconds() for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, generated_at=generated, expires_at=expires_in_sec) - self.refresh(key_bidstream_response_json_default_keys(expected_scope)) + self.refresh(key_bidstream_response_json([master_key, site_key], expected_scope, + max_bidstream_lifetime_seconds)) result = self._client.decrypt_token_into_raw_uid(token, None) if expected_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: self.assert_success(result, expected_version, expected_scope) diff --git a/tests/test_sharing_client.py b/tests/test_sharing_client.py index 30530bc..0c5f48f 100644 --- a/tests/test_sharing_client.py +++ b/tests/test_sharing_client.py @@ -35,7 +35,9 @@ def encode_keys(keys): return key_json -def key_sharing_response_json(keys, identity_scope=IdentityScope.UID2, caller_site_id=site_id, default_keyset_id=None, token_expiry_seconds=2592000): +def key_sharing_response_json(keys, identity_scope=IdentityScope.UID2, caller_site_id=site_id, default_keyset_id=None, token_expiry_seconds=2592000, max_sharing_lifetime_seconds=None): + if max_sharing_lifetime_seconds is None: + max_sharing_lifetime_seconds = dt.timedelta(days=30).total_seconds() encoded_keys = encode_keys(keys) json_obj = { "body": { @@ -44,7 +46,7 @@ def key_sharing_response_json(keys, identity_scope=IdentityScope.UID2, caller_si "token_expiry_seconds": token_expiry_seconds, "identity_scope": identity_scope.name, "allow_clock_skew_seconds": 1800, # 30 mins - "max_sharing_lifetime_seconds": dt.timedelta(days=30).total_seconds(), + "max_sharing_lifetime_seconds": max_sharing_lifetime_seconds, "keys": encoded_keys, "unexpected_header_field": 12345 # ensure new fields can be handled by old SDK versions } @@ -94,11 +96,13 @@ def test_smoke_test_for_sharing(self): # SmokeTestForSharing def test_token_lifetime_too_long_for_sharing_but_remaining_lifetime_allowed(self): # TokenLifetimeTooLongForSharingButRemainingLifetimeAllowed generated = YESTERDAY expires_in_sec = generated + dt.timedelta(days=31) + max_sharing_lifetime_seconds = dt.timedelta(days=30).total_seconds() for expected_scope, expected_version in test_cases_all_scopes_all_versions: with self.subTest(expected_scope=expected_scope, expected_version=expected_version): token = generate_uid_token(expected_scope, expected_version, generated_at=generated, expires_at=expires_in_sec) - self.refresh(key_sharing_response_json_default_keys(expected_scope)) + self.refresh(key_sharing_response_json([master_key, site_key], expected_scope, + max_sharing_lifetime_seconds=max_sharing_lifetime_seconds)) result = self._client.decrypt_token_into_raw_uid(token) if expected_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2: