Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions common/op_edit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from common.op_params import opParams
import time
import ast


class opEdit: # use by running `python /data/openpilot/op_edit.py`
def __init__(self):
self.op_params = opParams()
self.params = None
self.sleep_time = 1.0
self.run_loop()

def run_loop(self):
print('Welcome to the opParams command line editor!')
print('Here are your parameters:\n')
while True:
self.params = self.op_params.get()

values_list = [self.params[i] if len(str(self.params[i])) < 20 else '{} ... {}'.format(str(self.params[i])[:30], str(self.params[i])[-15:]) for i in self.params]
live = [' (live!)' if i in self.op_params.default_params and self.op_params.default_params[i]['live'] else '' for i in self.params]

to_print = ['{}. {}: {} {}'.format(idx + 1, i, values_list[idx], live[idx]) for idx, i in enumerate(self.params)]
to_print.append('\n{}. Add new parameter!'.format(len(self.params) + 1))
to_print.append('{}. Delete parameter!'.format(len(self.params) + 2))
print('\n'.join(to_print))
print('\nChoose a parameter to explore (by integer index): ')
choice = input('>> ').strip()
parsed, choice = self.parse_choice(choice)
if parsed == 'continue':
continue
elif parsed == 'add':
self.add_parameter()
elif parsed == 'change':
self.change_parameter(choice)
elif parsed == 'delete':
self.delete_parameter()
elif parsed == 'error':
return

def parse_choice(self, choice):
if choice.isdigit():
choice = int(choice)
choice -= 1
elif choice == '':
print('Exiting opEdit!')
return 'error', choice
else:
print('\nNot an integer!\n', flush=True)
time.sleep(self.sleep_time)
return 'retry', choice
if choice not in range(0, len(self.params) + 2): # three for add/delete parameter
print('Not in range!\n', flush=True)
time.sleep(self.sleep_time)
return 'continue', choice

if choice == len(self.params): # add new parameter
return 'add', choice

if choice == len(self.params) + 1: # delete parameter
return 'delete', choice

return 'change', choice

def change_parameter(self, choice):
while True:
chosen_key = list(self.params)[choice]
extra_info = False
live = False
if chosen_key in self.op_params.default_params:
extra_info = True
allowed_types = self.op_params.default_params[chosen_key]['allowed_types']
description = self.op_params.default_params[chosen_key]['description']
live = self.op_params.default_params[chosen_key]['live']

old_value = self.params[chosen_key]
print('Chosen parameter: {}'.format(chosen_key))
print('Current value: {} (type: {})'.format(old_value, str(type(old_value)).split("'")[1]))
if extra_info:
print('\n- Description: {}'.format(description))
print('- Allowed types: {}'.format(', '.join([str(i).split("'")[1] for i in allowed_types])))
if live:
print('- This parameter supports live tuning! Updates should take affect within 5 seconds.\n')
print('It\'s recommended to use the new opTune module! It\'s been streamlined to make live tuning easier and quicker.')
print('Just exit out of this and type:')
print('python op_tune.py')
print('In the directory /data/openpilot\n')
else:
print()
print('Enter your new value:')
new_value = input('>> ').strip()
if new_value == '':
return

status, new_value = self.parse_input(new_value)

if not status:
continue

if extra_info and not any([isinstance(new_value, typ) for typ in allowed_types]):
self.message('The type of data you entered ({}) is not allowed with this parameter!\n'.format(str(type(new_value)).split("'")[1]))
continue

print('\nOld value: {} (type: {})'.format(old_value, str(type(old_value)).split("'")[1]))
print('New value: {} (type: {})'.format(new_value, str(type(new_value)).split("'")[1]))
print('Do you want to save this?')
choice = input('[Y/n]: ').lower().strip()
if choice == 'y':
self.op_params.put(chosen_key, new_value)
print('\nSaved!\n', flush=True)
else:
print('\nNot saved!\n', flush=True)
time.sleep(self.sleep_time)
return

def parse_input(self, dat):
try:
dat = ast.literal_eval(dat)
except:
try:
dat = ast.literal_eval('"{}"'.format(dat))
except ValueError:
self.message('Cannot parse input, please try again!')
return False, dat
return True, dat

def delete_parameter(self):
while True:
print('Enter the name of the parameter to delete:')
key = input('>> ').lower()
status, key = self.parse_input(key)
if key == '':
return
if not status:
continue
if not isinstance(key, str):
self.message('Input must be a string!')
continue
if key not in self.params:
self.message("Parameter doesn't exist!")
continue

value = self.params.get(key)
print('Parameter name: {}'.format(key))
print('Parameter value: {} (type: {})'.format(value, str(type(value)).split("'")[1]))
print('Do you want to delete this?')

choice = input('[Y/n]: ').lower().strip()
if choice == 'y':
self.op_params.delete(key)
print('\nDeleted!\n')
else:
print('\nNot saved!\n', flush=True)
time.sleep(self.sleep_time)
return

def add_parameter(self):
while True:
print('Type the name of your new parameter:')
key = input('>> ').strip()
if key == '':
return

status, key = self.parse_input(key)

if not status:
continue
if not isinstance(key, str):
self.message('Input must be a string!')
continue

print("Enter the data you'd like to save with this parameter:")
value = input('>> ').strip()
status, value = self.parse_input(value)
if not status:
continue

print('Parameter name: {}'.format(key))
print('Parameter value: {} (type: {})'.format(value, str(type(value)).split("'")[1]))
print('Do you want to save this?')

choice = input('[Y/n]: ').lower().strip()
if choice == 'y':
self.op_params.put(key, value)
print('\nSaved!\n', flush=True)
else:
print('\nNot saved!\n', flush=True)
time.sleep(self.sleep_time)
return

def message(self, msg):
print('--------\n{}\n--------'.format(msg), flush=True)
time.sleep(self.sleep_time)
print()


opEdit()
130 changes: 130 additions & 0 deletions common/op_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import os
import json
import time
import string
import random
from common.travis_checker import travis

def write_params(params, params_file):
if not travis:
with open(params_file, "w") as f:
json.dump(params, f, indent=2, sort_keys=True)
os.chmod(params_file, 0o764)


def read_params(params_file, default_params):
try:
with open(params_file, "r") as f:
params = json.load(f)
return params, True
except Exception as e:
print(e)
params = default_params
return params, False


class opParams:
def __init__(self):
self.default_params = {'camera_offset': {'default': 0.06, 'allowed_types': [float, int], 'description': 'Your camera offset to use in lane_planner.py', 'live': True},
'awareness_factor': {'default': 2.0, 'allowed_types': [float, int], 'description': 'Multiplier for the awareness times', 'live': False},
'lane_hug_direction': {'default': None, 'allowed_types': [type(None), str], 'description': "(NoneType, 'left', 'right'): Direction of your lane hugging, if present. None will disable this modification", 'live': False},
'lane_hug_angle_offset': {'default': 0.0, 'allowed_types': [float, int], 'description': ('This is the angle your wheel reads when driving straight at highway speeds. '
'Used to offset desired angle_steers in latcontrol to help fix lane hugging. '
'Enter absolute value here, direction is determined by parameter \'lane_hug_direction\''), 'live': False},
'following_distance': {'default': None, 'allowed_types': [type(None), float], 'description': 'None has no effect, while setting this to a float will let you change the TR (0.9 to 2.7, if set dynamic follow will be disabled)', 'live': False},
'alca_nudge_required': {'default': True, 'allowed_types': [bool], 'description': ('Whether to wait for applied torque to the wheel (nudge) before making lane changes. '
'If False, lane change will occur IMMEDIATELY after signaling'), 'live': False},
'alca_min_speed': {'default': 25.0, 'allowed_types': [float, int], 'description': 'The minimum speed allowed for an automatic lane change (in MPH)', 'live': False},
'static_steer_ratio': {'default': False, 'allowed_types': [bool], 'description': 'Whether you want openpilot to use the steering ratio in interface.py, or the automatically learned steering ratio. If True, it will use the static value in interface.py', 'live': False},
'use_dynamic_lane_speed': {'default': True, 'allowed_types': [bool], 'description': 'Whether you want openpilot to adjust your speed based on surrounding vehicles', 'live': False},
'min_dynamic_lane_speed': {'default': 20.0, 'allowed_types': [float, int], 'description': 'The minimum speed to allow dynamic lane speed to operate (in MPH)', 'live': False}}

self.params = {}
self.params_file = "/data/op_params.json"
self.kegman_file = "/data/kegman.json"
self.last_read_time = time.time()
self.read_frequency = 5.0 # max frequency to read with self.get(...) (sec)
self.force_update = False # replaces values with default params if True, not just add add missing key/value pairs
self.to_delete = ['dynamic_lane_speed', 'longkiV']
self.run_init() # restores, reads, and updates params

def create_id(self): # creates unique identifier to send with sentry errors. please update uniqueID with op_edit.py to your username!
need_id = False
if "uniqueID" not in self.params:
need_id = True
if "uniqueID" in self.params and self.params["uniqueID"] is None:
need_id = True
if need_id:
random_id = ''.join([random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for i in range(15)])
self.params["uniqueID"] = random_id

def add_default_params(self):
prev_params = dict(self.params)
if not travis:
self.create_id()
for key in self.default_params:
if self.force_update:
self.params[key] = self.default_params[key]['default']
elif key not in self.params:
self.params[key] = self.default_params[key]['default']
return prev_params == self.params

def format_default_params(self):
return {key: self.default_params[key]['default'] for key in self.default_params}

def run_init(self): # does first time initializing of default params, and/or restoring from kegman.json
if travis:
self.params = self.format_default_params()
return
self.params = self.format_default_params() # in case any file is corrupted
to_write = False
no_params = False
if os.path.isfile(self.params_file):
self.params, read_status = read_params(self.params_file, self.format_default_params())
if read_status:
to_write = not self.add_default_params() # if new default data has been added
if self.delete_old(): # or if old params have been deleted
to_write = True
else: # don't overwrite corrupted params, just print to screen
print("ERROR: Can't read op_params.json file")
elif os.path.isfile(self.kegman_file):
to_write = True # write no matter what
try:
with open(self.kegman_file, "r") as f: # restore params from kegman
self.params = json.load(f)
self.add_default_params()
except:
print("ERROR: Can't read kegman.json file")
else:
no_params = True # user's first time running a fork with kegman_conf or op_params
if to_write or no_params:
write_params(self.params, self.params_file)

def delete_old(self):
prev_params = self.params
for i in self.to_delete:
if i in self.params:
del self.params[i]
return prev_params == self.params

def put(self, key, value):
self.params.update({key: value})
write_params(self.params, self.params_file)

def get(self, key=None, default=None): # can specify a default value if key doesn't exist
if key is None:
return self.params
if not travis and key in self.default_params and self.default_params[key]['live']: # if is a live param, we want to get updates while openpilot is running
if time.time() - self.last_read_time >= self.read_frequency: # make sure we aren't reading file too often
self.params, read_status = read_params(self.params_file, self.format_default_params())
if not read_status:
time.sleep(0.01)
self.params, read_status = read_params(self.params_file, self.format_default_params()) # if the file was being written to, retry once
self.last_read_time = time.time()

return self.params[key] if key in self.params else default

def delete(self, key):
if key in self.params:
del self.params[key]
write_params(self.params, self.params_file)
68 changes: 68 additions & 0 deletions common/op_tune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from common.op_params import opParams
import ast
import time


class opTune:
def __init__(self):
self.op_params = opParams()
self.sleep_time = 1.0
self.start()

def start(self):
print('Welcome to the opParams command line live tuner!')
editable = [p for p in self.op_params.get() if p in self.op_params.default_params and self.op_params.default_params[p]['live']]
while True:
print('Choose a parameter to tune:')
print('\n'.join(['{}. {}'.format(idx + 1, p) for idx, p in enumerate(editable)]))
choice = input('>> ')
if not choice:
print('Exiting opTune!')
break
choice = ast.literal_eval(choice) - 1
if choice not in range(len(editable)):
self.message('Error, not in range!')
continue
self.chosen(editable[choice])

def chosen(self, param):
allowed_types = self.op_params.default_params[param]['allowed_types']
print('\nChosen parameter: {}'.format(param))
print('Current value: {}'.format(self.op_params.get(param)))
print('\n- Description: {}'.format(self.op_params.default_params[param]['description']))
print('- Allowed types: {}\n'.format(', '.join([str(i).split("'")[1] for i in allowed_types])))
while True:
value = input('Enter value: ')
if value == '':
self.message('Exiting this parameter...')
break

status, value = self.parse_input(value)
if not status:
self.message('Cannot parse input!')
continue

if not any([isinstance(value, typ) for typ in allowed_types]):
self.message('The type of data you entered ({}) is not allowed with this parameter!\n'.format(str(type(value)).split("'")[1]))
continue
self.op_params.put(param, value)
print('Saved {} with value: {}! (type: {})\n'.format(param, value, str(type(value)).split("'")[1]))

def message(self, msg):
print('--------\n{}\n--------'.format(msg), flush=True)
time.sleep(self.sleep_time)
print()

def parse_input(self, dat):
dat = dat.replace("'", '"')
try:
dat = ast.literal_eval(dat)
except:
try:
dat = ast.literal_eval('"{}"'.format(dat))
except ValueError:
return False, dat
return True, dat


opTune()
2 changes: 2 additions & 0 deletions common/travis_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from common.basedir import BASEDIR
travis = BASEDIR.strip('/').split('/')[0] != 'data'
Loading