diff --git a/LowCostDP3T.py b/LowCostDP3T.py index 9cd47d2..b97e6c6 100644 --- a/LowCostDP3T.py +++ b/LowCostDP3T.py @@ -44,236 +44,243 @@ # Local key management and storage ################################## class KeyStore: - ''' This class handles local key management (SKs and EphIDs). - The set of previous SKs is kept in a local array (up to max length). - The set of EphIDs for the current day is kept in another array. - ''' - - def __init__(self): - self.SKt = [] # Current set of SKts - self.ephIDs = [] # Daily set of ephIDs - # Initial key is created from true random value - self.SKt.insert(0, secrets.token_bytes(32)) - self.rotate_ephIDs() - - @staticmethod - def get_SKt1(SK_t0): - ''' Updates a given SK_t to SK_t+1. - - This method creates the next key in the chain of SK_t's. - This method is called either for the local rotation or when we - recover the different SK_ts from an infected person. - - Arguments: - SK(list): current SK_t (b"rand" * 32). - - Returns: - b[]: The next SK (SK_t+1). - ''' - SK_t1 = hashlib.sha256(SK_t0).digest() - return SK_t1 - - def rotate_SK(self): - ''' Create a new SK_t+1 based on SK_t. - - This method updates the current key and moves on to the next day. - This method is called at midnight UTC. - ''' - SK_t1 = KeyStore.get_SKt1(self.SKt[0]) - self.SKt.insert(0, SK_t1) - # truncate list to max days - while len(self.SKt) > RETENTION_PERIOD: - self.SKt.pop() - - @staticmethod - def create_ephIDs(SK): - ''' Create the set of beacons for the day based on a new SK_t. - - This method created the set of ephemeral IDs given an SK for - day / broadcast_key. - - Arguments: - SK(b[]): given SK (b"rand" * 32) - - Returns: - [b[]]: Set of ephemeral IDs (ephIDs) for a single day. - ''' - # Set up PRF with the SK and broadcast_key - prf = hmac.new(SK, BROADCAST_KEY.encode(), hashlib.sha256).digest() - # Start with a fresh counter each day and initialize AES in CTR mode - prg = AES.new(prf, AES.MODE_CTR, counter = Counter.new(128, initial_value=0)) - ephIDs = [] - - # Create the number of desired ephIDs by encrypting 0 bytes - prg_data = prg.encrypt(b"\0" * 16 * NUM_EPOCHS_PER_DAY) - for i in range(NUM_EPOCHS_PER_DAY): - # split the prg data into individual ephIDs - ephIDs.append(prg_data[i*16:i*16+16]) - return ephIDs - - def rotate_ephIDs(self): - ''' Generate the daily set of EphIDs. - - This method creates the set of ephIDs for the app to broadcast. - We shuffle this set before returning it back to the app. - This method updates the local set of ephIDs, epoch indexes into set. - Executed once per day, at 0:00 UTC - ''' - ephIDs = self.create_ephIDs(self.SKt[0]) - # TODO: random.shuffle is not cryptographically secure! - # The real app will use a cryptographic secure shuffle - random.shuffle(ephIDs) - self.ephIDs = ephIDs - - def get_epoch(self, now): - ''' Return the current epoch. - now: time mapped to epoch - ''' - offset = now.hour*60 + now.minute - delta = 24*60 // NUM_EPOCHS_PER_DAY - return offset//delta - - def get_current_ephID(self, now = None): - ''' Returns the current ephID - ''' - if now is None: - now = datetime.now(timezone.utc) - return self.ephIDs[self.get_epoch(now)] + ''' This class handles local key management (SKs and EphIDs). + The set of previous SKs is kept in a local array (up to max length). + The set of EphIDs for the current day is kept in another array. + ''' + + def __init__(self): + self.SKt = [] # Current set of SKts + self.ephIDs = [] # Daily set of ephIDs + # Initial key is created from true random value + self.SKt.insert(0, secrets.token_bytes(32)) + self.rotate_ephIDs() + + @staticmethod + def get_SKt1(SK_t0): + ''' Updates a given SK_t to SK_t+1. + + This method creates the next key in the chain of SK_t's. + This method is called either for the local rotation or when we + recover the different SK_ts from an infected person. + + Arguments: + SK(list): current SK_t (b"rand" * 32). + + Returns: + b[]: The next SK (SK_t+1). + ''' + SK_t1 = hashlib.sha256(SK_t0).digest() + return SK_t1 + + def rotate_SK(self): + ''' Create a new SK_t+1 based on SK_t. + + This method updates the current key and moves on to the next day. + This method is called at midnight UTC. + ''' + SK_t1 = KeyStore.get_SKt1(self.SKt[0]) + self.SKt.insert(0, SK_t1) + # truncate list to max days + while len(self.SKt) > RETENTION_PERIOD: + self.SKt.pop() + + @staticmethod + def create_ephIDs(SK): + ''' Create the set of beacons for the day based on a new SK_t. + + This method created the set of ephemeral IDs given an SK for + day / broadcast_key. + + Arguments: + SK(b[]): given SK (b"rand" * 32) + + Returns: + [b[]]: Set of ephemeral IDs (ephIDs) for a single day. + ''' + # Set up PRF with the SK and broadcast_key + prf = hmac.new(SK, BROADCAST_KEY.encode(), hashlib.sha256).digest() + # Start with a fresh counter each day and initialize AES in CTR mode + prg = AES.new(prf, AES.MODE_CTR, + counter=Counter.new(128, initial_value=0)) + ephIDs = [] + + # Create the number of desired ephIDs by encrypting 0 bytes + prg_data = prg.encrypt(b"\0" * 16 * NUM_EPOCHS_PER_DAY) + for i in range(NUM_EPOCHS_PER_DAY): + # split the prg data into individual ephIDs + ephIDs.append(prg_data[i*16:i*16+16]) + return ephIDs + + def rotate_ephIDs(self): + ''' Generate the daily set of EphIDs. + + This method creates the set of ephIDs for the app to broadcast. + We shuffle this set before returning it back to the app. + This method updates the local set of ephIDs, epoch indexes into set. + Executed once per day, at 0:00 UTC + ''' + ephIDs = self.create_ephIDs(self.SKt[0]) + # TODO: random.shuffle is not cryptographically secure! + # The real app will use a cryptographic secure shuffle + random.shuffle(ephIDs) + self.ephIDs = ephIDs + + def get_epoch(self, now): + ''' Return the current epoch. + now: time mapped to epoch + ''' + offset = now.hour*60 + now.minute + delta = 24*60 // NUM_EPOCHS_PER_DAY + return offset//delta + + def get_current_ephID(self, now=None): + ''' Returns the current ephID + ''' + if now is None: + now = datetime.now(timezone.utc) + return self.ephIDs[self.get_epoch(now)] # Handle and manage contacts ############################ class ContactManager: - ''' Keep track of contacts and manage measurements - ''' - - def __init__(self): - self.observations = {} # Observations of the current epoch - self.contacts = [{}] # Array of daily contact sets - - # Remote beacon management - ########################## - def receive_scans(self, beacons = None, now = None): - ''' Receive a set of new BLE beacons and process them. - - Add the current received information to the observations for the - current epoch. - - Arguments: - beacons([]): list of received beacons. - now(datetime): current time, override for mock testing. - ''' - if beacons is None: - beacons = [] - - if now is None: - now = datetime.now(timezone.utc) - - timestamp = (now.hour*60 + now.minute)*60 + now.second - for beacon in beacons: - self.add_observation(beacon, timestamp) - - - # Contact management and proximity logger - ######################################### - def add_observation(self, beacon, timestamp): - ''' Adds a new contact observation to the current epoch. - - Arguments: - beacon(b[]): observed beacon. - timestamp(int): offset to beginning of UTC 0:00 in seconds. - ''' - if beacon in self.observations: - self.observations[beacon].append(timestamp) - else: - self.observations[beacon] = [timestamp] - - def rotate_contacts(self): - ''' Move to the next day for contacts. - - Create a new empty set of contacts, update and truncate history. - ''' - self.contacts.insert(0, {}) - # truncate history - while len(self.contacts) > RETENTION_PERIOD: - self.contacts.pop() - - def process_epoch(self): - ''' Process observations/epoch, add to contact set if >threshold. - - Iterate through all observations and identify which observations - are above a threshold, i.e., turn the set of observations into a - set of minimal contacts. This process aggregates multiple - observations into a contact of a given duration. As a side effect, - this process drops timing information (i.e., when the contact - happened and only stores how long the contact lasted). - ''' - for beacon in self.observations: - # TODO: as of now, we subtract the last observation from the first - # and use this as overall timestamp. The real app will use a - # elaborate way to identify a contact, e.g., by averaging and - # adding some statistical modeling across all timestamps. - if len(self.observations[beacon]) >= 2: - duration = self.observations[beacon][-1] - self.observations[beacon][0] - if duration > CONTACT_THRESHOLD: - self.contacts[0][beacon] = duration - self.observations = {} - - - # Update infected and local risk scoring - ######################################## - def check_infected(self, inf_SK0, date, now = None): - ''' Checks if our database was exposed to an infected SK starting on date. - - NOTE: this implementation uses the date of the SK_t to reduce the - number of comparisons. The backend needs to store tuples. - - Check if we recorded a contact with a given SK0 across in our - database of contact records. This implementation assumes we are - given a date of infection and checks on a per-day basis. - - Arguments - infSK0(b[]): SK_t of infected - date(str): date of SK_t (i.e., the t in the form 2020-04-23). - now(datetime): current date for mock testing. - ''' - if now is None: - now = datetime.now(timezone.utc) - infect_date = datetime.strptime(date, "%Y-%m-%d") - days_infected = (now-infect_date).days - inf_SK = inf_SK0 - for day in range(days_infected, -1, -1): - # Create infected EphIDs and rotate infected SK - infected_ephIDs = KeyStore.create_ephIDs(inf_SK) - inf_SK = KeyStore.get_SKt1(inf_SK) - - # Do we have observations that day? - if len(self.contacts)<=day or len(self.contacts[day]) == 0: - continue - - # Go through all infected EphIDs and check if we have a hit - for inf_ephID in infected_ephIDs: - # Hash check of infected beacon in set of daily contacts - if inf_ephID in self.contacts[day]: - duration = self.contacts[day][inf_ephID] - print("At risk, observed {} on day -{} for {}".format(inf_ephID.hex(), day, duration)) + ''' Keep track of contacts and manage measurements + ''' + + def __init__(self): + self.observations = {} # Observations of the current epoch + self.contacts = [{}] # Array of daily contact sets + + # Remote beacon management + ########################## + def receive_scans(self, beacons=None, now=None): + ''' Receive a set of new BLE beacons and process them. + + Add the current received information to the observations for the + current epoch. + + Arguments: + beacons([]): list of received beacons. + now(datetime): current time, override for mock testing. + ''' + if beacons is None: + beacons = [] + + if now is None: + now = datetime.now(timezone.utc) + + timestamp = (now.hour*60 + now.minute)*60 + now.second + for beacon in beacons: + self.add_observation(beacon, timestamp) + + # Contact management and proximity logger + ######################################### + + def add_observation(self, beacon, timestamp): + ''' Adds a new contact observation to the current epoch. + + Arguments: + beacon(b[]): observed beacon. + timestamp(int): offset to beginning of UTC 0:00 in seconds. + ''' + if beacon in self.observations: + self.observations[beacon].append(timestamp) + else: + self.observations[beacon] = [timestamp] + + def rotate_contacts(self): + ''' Move to the next day for contacts. + + Create a new empty set of contacts, update and truncate history. + ''' + self.contacts.insert(0, {}) + # truncate history + while len(self.contacts) > RETENTION_PERIOD: + self.contacts.pop() + + def process_epoch(self): + ''' Process observations/epoch, add to contact set if >threshold. + + Iterate through all observations and identify which observations + are above a threshold, i.e., turn the set of observations into a + set of minimal contacts. This process aggregates multiple + observations into a contact of a given duration. As a side effect, + this process drops timing information (i.e., when the contact + happened and only stores how long the contact lasted). + ''' + for beacon in self.observations: + # TODO: as of now, we subtract the last observation from the first + # and use this as overall timestamp. The real app will use a + # elaborate way to identify a contact, e.g., by averaging and + # adding some statistical modeling across all timestamps. + if len(self.observations[beacon]) >= 2: + duration = self.observations[beacon][-1] - \ + self.observations[beacon][0] + if duration > CONTACT_THRESHOLD: + self.contacts[0][beacon] = duration + self.observations = {} + + # Update infected and local risk scoring + ######################################## + + def check_infected(self, inf_SK0, dateInfectious, datePublished): + ''' Checks if our database was exposed to an infected SK starting on date. + + NOTE: this implementation uses the date of the SK_t to reduce the + number of comparisons. The backend needs to store tuples. + + Check if we recorded a contact with a given SK0 across in our + database of contact records. This implementation assumes we are + given a date of infection and a date of publication and checks + on a per-day basis. + + Arguments + infSK0(b[]): SK_t of infected + dateInfectious(str): date of SK_t (i.e., the t in the + form 2020-04-23). + datePublish(datetime): date of publication of SK_t (same + format). + ''' + infectious_date = datetime.strptime(dateInfectious, "%Y-%m-%d") + published_date = datetime.strptime(datePublished, "%Y-%m-%d") + days_infected = (published_date-infectious_date).days + inf_SK = inf_SK0 + # We do not compute the SK key of the day of the publication + for day in range(days_infected, 0, -1): + # Create infected EphIDs and rotate infected SK + infected_ephIDs = KeyStore.create_ephIDs(inf_SK) + inf_SK = KeyStore.get_SKt1(inf_SK) + + # Do we have observations that day? + if len(self.contacts) <= day or len(self.contacts[day]) == 0: + continue + + # Go through all infected EphIDs and check if we have a hit + for inf_ephID in infected_ephIDs: + # Hash check of infected beacon in set of daily contacts + if inf_ephID in self.contacts[day]: + duration = self.contacts[day][inf_ephID] + print( + "At risk, observed {} on day ({})-{} day(s) for {} seconds".format(inf_ephID.hex(), datePublished, day, duration)) # Mock Application that ties contact manager and keystore together ################################################################## class MockApp: - def __init__(self): - ''' Initialize the simple mock app, create an SK/ephIDs. - ''' - self.keystore = KeyStore() - self.ctmgr = ContactManager() - - def next_day(self): - # Rotate keys daily - # ASSERT(This function is executed at 0:00 UTC) - self.keystore.rotate_SK() - self.keystore.rotate_ephIDs() - self.ctmgr.rotate_contacts() - - def next_epoch(self): - self.ctmgr.process_epoch() + def __init__(self): + ''' Initialize the simple mock app, create an SK/ephIDs. + ''' + self.keystore = KeyStore() + self.ctmgr = ContactManager() + + def next_day(self): + # Rotate keys daily + # ASSERT(This function is executed at 0:00 UTC) + self.keystore.rotate_SK() + self.keystore.rotate_ephIDs() + self.ctmgr.rotate_contacts() + + def next_epoch(self): + self.ctmgr.process_epoch() diff --git a/example_run.py b/example_run.py index fb86095..258452e 100644 --- a/example_run.py +++ b/example_run.py @@ -24,76 +24,95 @@ if __name__ == "__main__": - # Mock time starts midnight on April 01. - epotime = datetime.timestamp(datetime.strptime("2020-04-01", "%Y-%m-%d")) - - # We have three people: Alice, Bob, and Isidor - alice = LowCostDP3T.MockApp() - bob = LowCostDP3T.MockApp() - isidor = LowCostDP3T.MockApp() - - # Run tests for the specified number of days - for day in range(2): - print("Day: Alice, Bob, and Isidor do not have contact.") - epotime += 24*60*60 - alice.next_day() - bob.next_day() - isidor.next_day() - - for day in range(3): - print("Day: Alice and Bob work in the same office, Isidor elsewhere.") - for hour in range(8, 17): - time = epotime + hour*60*60 - # We break each hour into epochs - for epoch in range(60//LowCostDP3T.EPOCH_LENGTH): - now = datetime.utcfromtimestamp(epotime) - alice_ephID = alice.keystore.get_current_ephID(now) - bob_ephID = bob.keystore.get_current_ephID(now) - # Record two beacons in the same epoch, resulting in a contact - alice.ctmgr.receive_scans([bob_ephID], now = now) - bob.ctmgr.receive_scans([alice_ephID], now = now) - now = now + timedelta(seconds=LowCostDP3T.CONTACT_THRESHOLD+1) - alice.ctmgr.receive_scans([bob_ephID], now = now) - bob.ctmgr.receive_scans([alice_ephID], now = now) - # Process the received beacons - alice.next_epoch() - bob.next_epoch() - # Tik Tok - epotime += 24*60*60 - alice.next_day() - bob.next_day() - isidor.next_day() - - print("Day: Bob and Isidor meet for dinner.") - for hour in range(17, 20): - for epoch in range(60//LowCostDP3T.EPOCH_LENGTH): - now = datetime.utcfromtimestamp(epotime) - bob_ephID = bob.keystore.get_current_ephID(now) - isidor_ephID = isidor.keystore.get_current_ephID(now) - # Record two beacons in the same epoch, resulting in a contact - bob.ctmgr.receive_scans([isidor_ephID], now = now) - isidor.ctmgr.receive_scans([bob_ephID], now = now) - now = now + timedelta(seconds=LowCostDP3T.CONTACT_THRESHOLD+1) - bob.ctmgr.receive_scans([isidor_ephID], now = now) - isidor.ctmgr.receive_scans([bob_ephID], now = now) - # Process the received beacons - alice.next_epoch() - bob.next_epoch() - isidor.next_epoch() - - print("Isidor is tested positive.") - infectious_date = datetime.utcfromtimestamp(epotime) - infections_SK = isidor.keystore.SKt[0] - - # Tik Tok - epotime += 24*60*60 - alice.next_day() - bob.next_day() - isidor.next_day() - - # Check infectiousness - print("Check exposure of Alice and Bob.") - print("Alice: (not positive)") - alice.ctmgr.check_infected(infections_SK, infectious_date.strftime("%Y-%m-%d"), datetime.utcfromtimestamp(epotime)) - print("Bob: (at risk)") - bob.ctmgr.check_infected(infections_SK, infectious_date.strftime("%Y-%m-%d"), datetime.utcfromtimestamp(epotime)) + # Mock time starts midnight on April 01. + epotime = datetime.timestamp(datetime.strptime("2020-04-01", "%Y-%m-%d")) + + # We have three people: Alice, Bob, and Isidor + alice = LowCostDP3T.MockApp() + bob = LowCostDP3T.MockApp() + isidor = LowCostDP3T.MockApp() + + # Run tests for the specified number of days + for day in range(2): + print("Day: Alice, Bob, and Isidor do not have contact.") + epotime += 24*60*60 + alice.next_day() + bob.next_day() + isidor.next_day() + + for day in range(3): + print("Day: Alice and Bob work in the same office, Isidor elsewhere.") + for hour in range(8, 17): + time = epotime + hour*60*60 + # We break each hour into epochs + for epoch in range(60//LowCostDP3T.EPOCH_LENGTH): + now = datetime.utcfromtimestamp(epotime) + alice_ephID = alice.keystore.get_current_ephID(now) + bob_ephID = bob.keystore.get_current_ephID(now) + # Record two beacons in the same epoch, resulting in a contact + alice.ctmgr.receive_scans([bob_ephID], now=now) + bob.ctmgr.receive_scans([alice_ephID], now=now) + now = now + timedelta(seconds=LowCostDP3T.CONTACT_THRESHOLD+1) + alice.ctmgr.receive_scans([bob_ephID], now=now) + bob.ctmgr.receive_scans([alice_ephID], now=now) + # Process the received beacons + alice.next_epoch() + bob.next_epoch() + # Tik Tok + epotime += 24*60*60 + alice.next_day() + bob.next_day() + isidor.next_day() + + print("Isidor is now infectious.") + infectious_date = datetime.utcfromtimestamp(epotime) + infections_SK = isidor.keystore.SKt[0] + + print("Day: Bob and Isidor meet for dinner.") + for hour in range(17, 20): + for epoch in range(60//LowCostDP3T.EPOCH_LENGTH): + now = datetime.utcfromtimestamp(epotime) + bob_ephID = bob.keystore.get_current_ephID(now) + isidor_ephID = isidor.keystore.get_current_ephID(now) + # Record two beacons in the same epoch, resulting in a contact + bob.ctmgr.receive_scans([isidor_ephID], now=now) + isidor.ctmgr.receive_scans([bob_ephID], now=now) + now = now + timedelta(seconds=LowCostDP3T.CONTACT_THRESHOLD+1) + bob.ctmgr.receive_scans([isidor_ephID], now=now) + isidor.ctmgr.receive_scans([bob_ephID], now=now) + # Process the received beacons + alice.next_epoch() + bob.next_epoch() + isidor.next_epoch() + + # Tik Tok + epotime += 24*60*60 + alice.next_day() + bob.next_day() + isidor.next_day() + + print("Isidor is tested positive and publish infectious_SK (from yesterday).") + published_date = datetime.utcfromtimestamp(epotime) + + # Tik Tok + epotime += 2*60*60 + + # Alice receives a replay and should not be positive because of that + now = datetime.utcfromtimestamp(epotime) + bob_ephID = bob.keystore.get_current_ephID(now) + isidor_ephID = isidor.keystore.get_current_ephID(now) + # The point is that anyone can compute isidor_ephID at this point since infections_SK has been publicly published at B + alice.ctmgr.receive_scans([isidor_ephID], now=now) + now = now + timedelta(seconds=LowCostDP3T.CONTACT_THRESHOLD+1) + alice.ctmgr.receive_scans([isidor_ephID], now=now) + # Process the received beacons + alice.next_epoch() + + # Check infectiousness + print("Check exposure of Alice and Bob.") + print("Alice: (not positive)") + alice.ctmgr.check_infected(infections_SK, infectious_date.strftime( + "%Y-%m-%d"), published_date.strftime("%Y-%m-%d")) + print("Bob: (at risk)") + bob.ctmgr.check_infected(infections_SK, infectious_date.strftime( + "%Y-%m-%d"), published_date.strftime("%Y-%m-%d"))