Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 57 additions & 42 deletions tests/test_bidstream_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ def encode_keys(keys):
return key_json


def key_bidstream_response_json(keys, identity_scope=IdentityScope.UID2):
def key_bidstream_response_with_lifetime_json(keys, identity_scope, max_bidstream_lifetime_seconds):
if identity_scope is None:
identity_scope = IdentityScope.UID2
if max_bidstream_lifetime_seconds is None:
max_bidstream_lifetime_seconds = dt.timedelta(days=3).total_seconds()
encoded_keys = encode_keys(keys)
json_obj = {
"body": {
"max_bidstream_lifetime_seconds": dt.timedelta(days=3).total_seconds(),
"max_bidstream_lifetime_seconds": max_bidstream_lifetime_seconds,
"identity_scope": identity_scope.name,
"allow_clock_skew_seconds": 1800, # 30 mins
"keys": encoded_keys,
Expand All @@ -52,14 +56,22 @@ def key_bidstream_response_json(keys, identity_scope=IdentityScope.UID2):
return json.dumps(json_obj)


def key_bidstream_response_json(keys, identity_scope=IdentityScope.UID2, max_bidstream_lifetime_seconds=None):
return key_bidstream_response_with_lifetime_json(keys, identity_scope, max_bidstream_lifetime_seconds)


def key_bidstream_response_json_default_keys(identity_scope=IdentityScope.UID2):
return key_bidstream_response_json([master_key, site_key], identity_scope)
return key_bidstream_response_with_lifetime_json([master_key, site_key], identity_scope, None)


class TestBidStreamClient(unittest.TestCase):
_CONST_BASE_URL = 'base_url'
_CONST_API_KEY = 'api_key'

def refresh(self, refresh_json):
refresh_response = self._client._refresh_json(refresh_json)
self.assertTrue(refresh_response.success)

def assert_success(self, decryption_response, token_version, scope):
self.assertTrue(decryption_response.success)
self.assertEqual(decryption_response.uid, example_uid)
Expand Down Expand Up @@ -88,21 +100,20 @@ def _decrypt_and_assert_success(self, token, token_version, scope):
def setUp(self):
self._client = BidstreamClient(self._CONST_BASE_URL, self._CONST_API_KEY, client_secret)

def test_smoke_test(self): # SmokeTest
def test_smoke_test_for_bidstream(self): # SmokeTestForBidstream
for expected_scope, expected_version in test_cases_all_scopes_all_versions:
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
token = generate_uid_token(expected_scope, expected_version)
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys(
expected_scope))
self.assertTrue(refresh_response.success)
token = generate_uid_token(expected_scope, expected_version,
identity_established_at=now - dt.timedelta(days=120),
generated_at=YESTERDAY,
expires_at=IN_2_DAYS)
self.refresh(key_bidstream_response_json_default_keys(expected_scope))
self._decrypt_and_assert_success(token, expected_version, expected_scope)

def test_phone_uids(self): # PhoneTest
for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions:
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys(
expected_scope))
self.assertTrue(refresh_response.success)
self.refresh(key_bidstream_response_json_default_keys(expected_scope))
token = generate_uid_token(expected_scope, expected_version, phone_uid)
self.assertEqual(IdentityType.Phone, get_identity_type(token))
result = self._client.decrypt_token_into_raw_uid(token, None)
Expand All @@ -112,25 +123,40 @@ def test_phone_uids(self): # PhoneTest
self.assertEqual(result.identity_scope, expected_scope)
self.assertEqual(result.advertising_token_version, expected_version)

def test_token_lifetime_too_long_for_bidstream(self): # TokenLifetimeTooLongForBidstream
expires_in_sec = IN_3_DAYS + dt.timedelta(minutes=1)
def test_token_lifetime_too_long_for_bidstream_but_remaining_lifetime_allowed(self): # TokenLifetimeTooLongForBidstreamButRemainingLifetimeAllowed
generated = YESTERDAY
expires_in_sec = generated + dt.timedelta(days=3) + dt.timedelta(minutes=1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract a variable for max bidstream lifetime from here, line 134, and line 33 so we can see the connection

max_bidstream_lifetime_seconds = dt.timedelta(days=3).total_seconds()
for expected_scope, expected_version in test_cases_all_scopes_all_versions:
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
token = generate_uid_token(expected_scope, expected_version, expires_at=expires_in_sec)
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys(
expected_scope))
self.assertTrue(refresh_response.success)
token = generate_uid_token(expected_scope, expected_version, generated_at=generated,
expires_at=expires_in_sec)
self.refresh(key_bidstream_response_json([master_key, site_key], expected_scope,
max_bidstream_lifetime_seconds))
result = self._client.decrypt_token_into_raw_uid(token, None)
if expected_version == AdvertisingTokenVersion.ADVERTISING_TOKEN_V2:
self.assert_success(result, expected_version, expected_scope)
else:
self.assert_fails(result, expected_version, expected_scope)

def test_token_remaining_lifetime_too_long_for_bidstream(self): # TokenRemainingLifetimeTooLongForBidstream
generated = now
expires_in_sec = generated + dt.timedelta(days=3) + dt.timedelta(minutes=1)
for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions:
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
token = generate_uid_token(expected_scope, expected_version, generated_at=generated,
expires_at=expires_in_sec)
self.refresh(key_bidstream_response_json_default_keys(expected_scope))
result = self._client.decrypt_token_into_raw_uid(token, None)
self.assert_fails(result, expected_version, expected_scope)

def test_token_generated_in_the_future_to_simulate_clock_skew(self): # TokenGeneratedInTheFutureToSimulateClockSkew
# Note V2 does not have a "token generated" field, therefore v2 tokens can't have a future "token generated" date and are excluded from this test.
created_at_future = dt.datetime.now(tz=timezone.utc) + dt.timedelta(minutes=31) #max allowed clock skew is 30m
for expected_scope, expected_version in test_cases_all_scopes_all_versions:
for expected_scope, expected_version in test_cases_all_scopes_v3_v4_versions:
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
token = generate_uid_token(expected_scope, expected_version, created_at=created_at_future)
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys(
expected_scope))
self.assertTrue(refresh_response.success)
token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future)
self.refresh(key_bidstream_response_json_default_keys(expected_scope))
result = self._client.decrypt_token_into_raw_uid(token, None)
self.assert_fails(result, expected_version, expected_scope)
self.assertEqual(result.status, DecryptionStatus.INVALID_TOKEN_LIFETIME)
Expand All @@ -140,17 +166,14 @@ def test_token_generated_in_the_future_within_allowed_clock_skew(self): # Token
for expected_scope, expected_version in test_cases_all_scopes_all_versions:
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
token = generate_uid_token(expected_scope, expected_version, expires_at=created_at_future)
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys(
expected_scope))
self.assertTrue(refresh_response.success)
self.refresh(key_bidstream_response_json_default_keys(expected_scope))
self._decrypt_and_assert_success(token, expected_version, expected_scope)

def test_legacy_response_from_old_operator(self):
test_cases = [AdvertisingTokenVersion.ADVERTISING_TOKEN_V2,
AdvertisingTokenVersion.ADVERTISING_TOKEN_V3,
AdvertisingTokenVersion.ADVERTISING_TOKEN_V4]
refresh_response = self._client._refresh_json(key_set_to_json_for_sharing([master_key, site_key]))
self.assertTrue(refresh_response.success)
self.refresh(key_set_to_json_for_sharing([master_key, site_key]))
for token_version in test_cases:
with self.subTest(token_version=token_version):
token = generate_uid_token(IdentityScope.UID2, token_version)
Expand All @@ -164,7 +187,7 @@ def test_token_generated_in_the_future_legacy_client(self): # TokenGeneratedInT
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
legacy_client.refresh_json(key_bidstream_response_json_default_keys(
expected_scope))
token = generate_uid_token(expected_scope, expected_version, created_at=created_at_future)
token = generate_uid_token(expected_scope, expected_version, generated_at=created_at_future)
result = legacy_client.decrypt(token)
self.assert_success(result, expected_version, expected_scope)

Expand All @@ -190,9 +213,7 @@ def test_identity_scope_and_types(self): # IdentityScopeAndType_TestCases
for uid, identity_scope, identity_type in test_cases:
with self.subTest(identity_scope=identity_scope, identity_type=identity_type):
token = generate_uid_token(identity_scope, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4)
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys(
identity_scope))
self.assertTrue(refresh_response.success)
self.refresh(key_bidstream_response_json_default_keys(identity_scope))
self._decrypt_and_assert_success(token, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4, identity_scope)

def test_empty_keys(self): # EmptyKeyContainer
Expand All @@ -207,9 +228,7 @@ def test_master_key_expired(self): #ExpiredKeyContainer
site_key_expired = EncryptionKey(site_key_id, site_id, created=now, activates=now - dt.timedelta(hours=2),
expires=now - dt.timedelta(hours=1), secret=site_secret, keyset_id=99999)

refresh_response = self._client._refresh_json(key_bidstream_response_json(
[master_key_expired, site_key_expired]))
self.assertTrue(refresh_response.success)
self.refresh(key_bidstream_response_json([master_key_expired, site_key_expired]))

result = self._client.decrypt_token_into_raw_uid(example_uid, None)
self.assertFalse(result.success)
Expand All @@ -219,18 +238,15 @@ def test_not_authorized_for_master_key(self): #NotAuthorizedForMasterKey

another_master_key = EncryptionKey(master_key_id + site_key_id + 1, -1, created=now, activates=now, expires=now + dt.timedelta(hours=1), secret=master_secret)
another_site_key = EncryptionKey(master_key_id + site_key_id + 2, site_id, created=now, activates=now, expires=now + dt.timedelta(hours=1), secret=site_secret)
refresh_response = self._client._refresh_json(key_bidstream_response_json(
[another_master_key, another_site_key]))
self.assertTrue(refresh_response.success)
self.refresh(key_bidstream_response_json([another_master_key, another_site_key]))
token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4)

result = self._client.decrypt_token_into_raw_uid(token, None)
self.assertFalse(result.success)
self.assertEqual(result.status, DecryptionStatus.NOT_AUTHORIZED_FOR_MASTER_KEY)

def test_invalid_payload(self): #InvalidPayload
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys())
self.assertTrue(refresh_response.success)
self.refresh(key_bidstream_response_json_default_keys())
token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4)
payload = Uid2Base64UrlCoder.decode(token)
bad_token = base64.urlsafe_b64encode(payload[:0])
Expand All @@ -240,13 +256,12 @@ def test_invalid_payload(self): #InvalidPayload
self.assertEqual(result.status, DecryptionStatus.INVALID_PAYLOAD)

def test_token_expiry_custom_decryption_time(self): #TokenExpiryAndCustomNow
refresh_response = self._client._refresh_json(key_bidstream_response_json_default_keys())
self.assertTrue(refresh_response.success)
self.refresh(key_bidstream_response_json_default_keys())

expires_at = now - dt.timedelta(days=60)
created_at = expires_at - dt.timedelta(minutes=1)
token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4,
created_at=created_at, expires_at=expires_at)
generated_at=created_at, expires_at=expires_at)
result = self._client._decrypt_token_into_raw_uid(token, None, expires_at + dt.timedelta(seconds=1))
self.assertFalse(result.success)
self.assertEqual(result.status, DecryptionStatus.EXPIRED_TOKEN)
Expand Down
15 changes: 9 additions & 6 deletions tests/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def validate_advertising_token(self, advertising_token_string, identity_scope, i
self.assertEqual(-1, advertising_token_string.find("/"))

def generate_uid2_token_v4(self, uid, master_key, site_id, site_key, params = Params(), identity_type = IdentityType.Email, identity_scope = IdentityScope.UID2):
if not isinstance(params.token_expiry, dt.datetime):
params.token_expiry = dt.datetime.now(tz=timezone.utc) + params.token_expiry
advertising_token = UID2TokenGenerator.generate_uid2_token_v4(uid, master_key, site_id, site_key, params)
self.validate_advertising_token(advertising_token, identity_scope, identity_type)
return advertising_token
Expand Down Expand Up @@ -155,7 +157,7 @@ def test_decrypt_token_v4_no_site_key(self):
result = decrypt(token, keys)

def test_decrypt_token_v4_invalid_version(self):
params = Params(dt.timedelta(hours=1))
params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(hours=1))
token = UID2TokenGenerator.generate_uid2_token_with_debug_info(_example_id, _master_key, _site_id, _site_key, params, 1)

keys = EncryptionKeysCollection([_master_key, _site_key])
Expand All @@ -174,7 +176,8 @@ def test_decrypt_token_v4_expired(self):
result = decrypt(token, keys)

def _generate_v2_token(self, expires_in_seconds):
return UID2TokenGenerator.generate_uid2_token_v2(_example_id, _master_key, _site_id, _site_key, Params(expires_in_seconds))
return UID2TokenGenerator.generate_uid2_token_v2(_example_id, _master_key, _site_id, _site_key,
Params(dt.datetime.now(tz=timezone.utc) + expires_in_seconds))

def _generate_v4_token(self, expires_in_seconds):
return self.generate_uid2_token_v4(_example_id, _master_key, _site_id, _site_key, Params(expires_in_seconds))
Expand Down Expand Up @@ -297,7 +300,7 @@ def test_decrypt_token_v3_no_site_key(self):
result = decrypt(token, keys)

def test_decrypt_token_v3_invalid_version(self):
params = Params(dt.timedelta(hours=1))
params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(hours=1))
token = UID2TokenGenerator.generate_uid2_token_with_debug_info(_example_id, _master_key, _site_id, _site_key, params, 1)

keys = EncryptionKeysCollection([_master_key, _site_key])
Expand All @@ -307,7 +310,7 @@ def test_decrypt_token_v3_invalid_version(self):


def test_decrypt_token_v3_expired(self):
params = Params(dt.timedelta(seconds=-1))
params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(seconds=-1))
token = UID2TokenGenerator.generate_uid2_token_v3(_example_id, _master_key, _site_id, _site_key, params)

keys = EncryptionKeysCollection([_master_key, _site_key])
Expand All @@ -331,7 +334,7 @@ def test_decrypt_token_v3_custom_now(self):


def test_decrypt_token_v3_invalid_payload(self):
params = Params(dt.timedelta(seconds=-1))
params = Params(dt.datetime.now(tz=timezone.utc) + dt.timedelta(seconds=-1))
token = UID2TokenGenerator.generate_uid2_token_v3(_example_id, _master_key, _site_id, _site_key, params)

keys = EncryptionKeysCollection([_master_key, _site_key])
Expand Down Expand Up @@ -418,7 +421,7 @@ def test_smoke_token_v3(self):
token_expiry = now + dt.timedelta(days=30) if keys.get_token_expiry_seconds() is None \
else now + dt.timedelta(seconds=int(keys.get_token_expiry_seconds()))
result = UID2TokenGenerator.generate_uid2_token_v3(uid2, _master_key, _site_id, _site_key,
Params(expiry=token_expiry, token_generated_at=now))
Params(expiry=token_expiry, token_generated=now))
final = decrypt(result, keys, now=now)

self.assertEqual(uid2, final.uid)
Expand Down
Loading