Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ testing/unit/ntp/output/
testing/unit/tls/output/
testing/unit/tls/tmp/
testing/unit/report/output/
testing/unit/risk_profile/output/

*.deb
make/DEBIAN/postinst
Expand Down
5 changes: 4 additions & 1 deletion framework/python/src/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,10 @@ def get_profiles_format(self, response: Response):
return self.get_session().get_profiles_format()

def get_profiles(self):
return self.get_session().get_profiles()
profiles = []
for profile in self.get_session().get_profiles():
profiles.append(json.loads(profile.to_json()))
return profiles

async def update_profile(self, request: Request, response: Response):

Expand Down
158 changes: 110 additions & 48 deletions framework/python/src/common/risk_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,48 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Stores additional information about a device's risk"""
#import datetime
from datetime import datetime
from dateutil.relativedelta import relativedelta
from common import logger
import json

LOGGER = logger.get_logger('risk_profile')
SECONDS_IN_YEAR = 31536000

DATA_COLLECTION_CATEGORY = 'Data Collection'
DATA_TRANSMISSION_CATEGORY = 'Data Transmission'
REMOTE_OPERATION_CATEGORY = 'Remote Operation'
OPERATING_ENVIRONMENT_CATEGORY = 'Operating Environment'


class RiskProfile():
"""Python representation of a risk profile"""

def __init__(self, profile_json=None, profile_format=None):

if profile_json is None or profile_format is None:
return

self.name = profile_json['name']
self.created = datetime.now().strftime('%Y-%m-%d')
self.created = datetime.now()
self.version = profile_json['version']
self.questions = profile_json['questions']
self.status = None
self.categories = None
self.risk = None

self._validate(profile_json, profile_format)
self._update_categories()
self._update_risk()
self._update_risk(profile_format)

# Load a profile without modifying the created date
# but still validate the profile
def load(self, profile_json, profile_format):
self.name = profile_json['name']
self.created = profile_json['created']
self.created = datetime.strptime(
profile_json['created'], '%Y-%m-%d')
self.version = profile_json['version']
self.questions = profile_json['questions']
self.status = None
self.categories = None

self._validate(profile_json, profile_format)
self._update_categories()
self._update_risk()
self._update_risk(profile_format)

return self

def update(self, profile_json, profile_format):

# Construct a new profile from json data
new_profile = RiskProfile(profile_json, profile_format)

Expand All @@ -71,37 +66,97 @@ def update(self, profile_json, profile_format):
self.created = new_profile.created
self.questions = new_profile.questions
self.status = new_profile.status
self.categories = new_profile.categories

def _validate(self, profile_json, profile_format):
if self._valid(profile_json, profile_format):
self.status = 'Expired' if self._expired() else 'Valid'
else:
self.status = 'Draft'

def _update_categories(self):
if self.status == 'Valid':
self.categories = []
self.categories.append(
self._get_category_status(DATA_COLLECTION_CATEGORY))
self.categories.append(
self._get_category_status(DATA_TRANSMISSION_CATEGORY))
self.categories.append(
self._get_category_status(REMOTE_OPERATION_CATEGORY))
self.categories.append(
self._get_category_status(OPERATING_ENVIRONMENT_CATEGORY))

def _update_risk(self):
def _update_risk(self, profile_format):

if self.status == 'Valid':

# Default risk = Limited
risk = 'Limited'
for category in self.categories:
if 'status' in category and category['status'] == 'High':

# Check each question in profile
for question in self.questions:
question_text = question['question']

# Fetch the risk levels from the profile format
format_q = self._get_format_question(
question_text, profile_format)

if format_q is None:
# This occurs when a question found in a current profile
# has been removed from the format (format change)
continue

# We only want to check the select or select-multiple
# questions for now
if format_q['type'] in ['select', 'select-multiple']:
answer = question['answer']

question_risk = 'Limited'

# The answer is a single string (select)
if isinstance(answer, str):

format_option = self._get_format_question_option(
format_q, answer)

# Format options may just be a list of strings with
# no risk attached
if format_option is None:
continue

if 'risk' in format_option and format_option['risk'] == 'High':
question_risk = format_option['risk']

# A list of indexes is the answer (select-multiple)
elif isinstance(answer, list):

format_options = format_q['options']

for index in answer:
option = self._get_option_from_index(format_options, index)

if option is None:
LOGGER.error('Answer had an invalid index for question: ' +
format_q['question'])
continue

if 'risk' in option and option['risk'] == 'High':
question_risk = 'High'

question['risk'] = question_risk

for question in self.questions:
if 'risk' in question and question['risk'] == 'High':
risk = 'High'

self.risk = risk

else:
# Remove risk
risk = None
self.risk = risk

def _get_format_question(self, question, profile_format):
for q in profile_format:
if q['question'] == question:
return q
return None

def _get_option_from_index(self, options, index):
i = 0
for option in options:
if i == index:
return option
i+=1
return None

def _check_answer(self, question):
status = 'Limited'
if question['validation']['required']:
Expand All @@ -114,14 +169,6 @@ def _check_answer(self, question):
status = 'High'
return status

def _get_category_status(self, category):
status = 'Limited'
for question in self.questions:
if 'category' in question and question['category'] == category:
if question['validation']['required']:
status = 'High' if self._check_answer(question) == 'High' else status
return {'name': category, 'status': status}

def _get_profile_question(self, profile_json, question):

for q in profile_json['questions']:
Expand All @@ -130,6 +177,19 @@ def _get_profile_question(self, profile_json, question):

return None

def _get_format_question_option(self, question_obj, answer):

for option in question_obj['options']:

# Ignore just string lists
if isinstance(option, str):
continue

if option['text'] == answer:
return option

return None

def _valid(self, profile_json, profile_format):

# Check name field is present
Expand Down Expand Up @@ -164,21 +224,23 @@ def _valid(self, profile_json, profile_format):
return all_questions_answered and all_questions_present

def _expired(self):
# Check expiry
created_date = datetime.strptime(
self.created, '%Y-%m-%d').timestamp()
today = datetime.now().timestamp()
return created_date < (today - SECONDS_IN_YEAR)
# Calculate the date one year after the creation date
expiry_date = self.created + relativedelta(years=1)

# Normalize the current date and time to midnight
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

# Check if the current date and time is past the expiry date
return today > expiry_date

def to_json(self, pretty=False):
json_dict = {
'name': self.name,
'version': self.version,
'created': self.created,
'created': self.created.strftime('%Y-%m-%d'),
'status': self.status,
'risk': self.risk,
'questions': self.questions
}
if self.categories is not None:
json_dict['categories'] = self.categories
indent = 2 if pretty else None
return json.dumps(json_dict, indent=indent)
72 changes: 51 additions & 21 deletions framework/python/src/common/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ def __init__(self, root_dir):

# Profiles
self._profiles = []
# Profile format that is passed to the frontend
# (excluding internal properties)
self._profile_format_json = None
# Profile format used for internal validation
self._profile_format = None

# System configuration
self._config_file = os.path.join(root_dir, CONFIG_FILE_PATH)
Expand Down Expand Up @@ -359,34 +363,60 @@ def _load_profiles(self):
with open(os.path.join(
self._root_dir, PROFILE_FORMAT_PATH
), encoding='utf-8') as profile_format_file:
self._profile_format_json = json.load(profile_format_file)

format_json = json.load(profile_format_file)

# Save original profile format for internal validation
self._profile_format = format_json

except (IOError, ValueError) as e:
LOGGER.error(
'An error occurred whilst loading the risk assessment format')
LOGGER.debug(e)

profile_format_array = []

# Remove internal properties
for question_obj in format_json:
new_obj = {}
for key in question_obj:
if key == 'options':
options = []
for option in question_obj[key]:
if isinstance(option, str):
options.append(option)
else:
options.append(option['text'])
new_obj['options'] = options
else:
new_obj[key] = question_obj[key]

profile_format_array.append(new_obj)

self._profile_format_json = profile_format_array

# Load existing profiles
LOGGER.debug('Loading risk profiles')

try:
for risk_profile_file in os.listdir(os.path.join(
self._root_dir, PROFILES_DIR
)):
LOGGER.debug(f'Discovered profile {risk_profile_file}')

with open(os.path.join(
self._root_dir, PROFILES_DIR, risk_profile_file
), encoding='utf-8') as f:
json_data = json.load(f)
risk_profile = RiskProfile()
risk_profile = risk_profile.load(
profile_json=json_data,
profile_format=self.get_profiles_format())
self._profiles.append(risk_profile)
#try:
for risk_profile_file in os.listdir(os.path.join(
self._root_dir, PROFILES_DIR
)):
LOGGER.debug(f'Discovered profile {risk_profile_file}')

except Exception as e:
LOGGER.error('An error occurred whilst loading risk profiles')
LOGGER.debug(e)
with open(os.path.join(
self._root_dir, PROFILES_DIR, risk_profile_file
), encoding='utf-8') as f:
json_data = json.load(f)
risk_profile = RiskProfile()
risk_profile = risk_profile.load(
profile_json=json_data,
profile_format=self._profile_format)
self._profiles.append(risk_profile)

# except Exception as e:
# LOGGER.error('An error occurred whilst loading risk profiles')
# LOGGER.debug(e)

def get_profiles_format(self):
return self._profile_format_json
Expand All @@ -413,12 +443,12 @@ def update_profile(self, profile_json):

# Create a new risk profile
risk_profile = RiskProfile(profile_json=profile_json,
profile_format=self.get_profiles_format())
profile_format=self._profile_format)
self._profiles.append(risk_profile)

else:

risk_profile.update(profile_json, self.get_profiles_format())
risk_profile.update(profile_json, self._profile_format)
# Check if name has changed
if 'rename' in profile_json:
# Delete the original file
Expand Down
5 changes: 4 additions & 1 deletion framework/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ markdown==3.5.2

# Requirements for the session
cryptography==42.0.7
pytz==2024.1
pytz==2024.1

# Requirements for the risk profile
python-dateutil==2.9.0
1 change: 1 addition & 0 deletions local/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
system.json
devices
root_certs
risk_profiles
2 changes: 1 addition & 1 deletion local/risk_profiles/Primary profile.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"name": "Primary profile", "version": "1.3-alpha", "created": "2024-06-18", "status": "Valid", "questions": [{"question": "What type of device is this?", "answer": "IoT Sensor"}, {"question": "How will this device be used at Google?", "answer": "Hey"}, {"question": "Is this device going to be managed by Google or a third party?", "answer": "Managed by Google"}, {"question": "What is the email of the device owner(s)?", "answer": "boddey@google.com, cmeredith@google.com"}, {"question": "Is this device going to be managed by Google or a third party?", "answer": "Google"}, {"question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", "answer": "Yes"}, {"question": "Are any of the following statements true about your device?", "answer": [0, 1]}, {"question": "Which of the following statements are true about this device?", "answer": [1, 2, 3]}, {"question": "Does the network protocol assure server-to-client identity verification?", "answer": "Yes"}, {"question": "Click the statements that best describe the characteristics of this device.", "answer": [0, 1]}, {"question": "Are any of the following statements true about this device?", "answer": [2, 3, 5]}], "categories": [{"name": "Data Collection", "status": "Limited"}, {"name": "Data Transmission", "status": "Limited"}, {"name": "Remote Operation", "status": "Limited"}, {"name": "Operating Environment", "status": "Limited"}]}
{"name": "Primary profile", "version": "1.3-alpha", "created": "2024-06-27", "status": "Valid", "risk": "High", "questions": [{"question": "What type of device is this?", "answer": "IoT Gateway", "risk": "High"}, {"question": "How will this device be used at Google?", "answer": "Hey"}, {"question": "Is this device going to be managed by Google or a third party?", "answer": "Google", "risk": "Limited"}, {"question": "Will the third-party device administrator be able to grant access to authorized Google personnel upon request?", "answer": "Yes", "risk": "Limited"}, {"question": "Are any of the following statements true about your device?", "answer": [3], "risk": "Limited"}, {"question": "Which of the following statements are true about this device?", "answer": [5], "risk": "Limited"}, {"question": "Does the network protocol assure server-to-client identity verification?", "answer": "Yes", "risk": "Limited"}, {"question": "Click the statements that best describe the characteristics of this device.", "answer": [4], "risk": "High"}, {"question": "Are any of the following statements true about this device?", "answer": [0], "risk": "High"}]}
Loading