Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 194 additions & 2 deletions keepercli-package/src/keepercli/commands/record_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import json
import logging

from keepersdk.vault import record_type_management
from keepersdk.vault import record_type_management, record_types

from . import base
from . import base, record_type_utils
from ..params import KeeperParams
from .. import api
from ..helpers import report_utils

logger = api.get_logger()

Expand Down Expand Up @@ -45,6 +46,7 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
context.vault, title, fields, description, categories
)
logger.info(f"Custom record type '{title}' created successfully with fields: {[f['$ref'] for f in fields]} and recordTypeId: {result.recordTypeId}")
return


class RecordTypeEditCommand(base.ArgparseCommand):
Expand Down Expand Up @@ -92,6 +94,7 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
context.vault, record_type_id, title, fields, description, categories
)
logger.info(f"Custom record type (ID: {record_type_id}) updated successfully with fields: {[f['$ref'] for f in fields]} and recordTypeId: {result.recordTypeId}")
return


class RecordTypeDeleteCommand(base.ArgparseCommand):
Expand All @@ -118,6 +121,195 @@ def execute(self, context: KeeperParams, **kwargs) -> None:

result = record_type_management.delete_custom_record_types(context.vault, record_type_id)
logger.info(f"Custom record type deleted successfully with record type id: {result.recordTypeId}")
return


class RecordTypeInfoCommand(base.ArgparseCommand):

def __init__(self):
self.parser = argparse.ArgumentParser(
prog='record-type-info',
description='Get record type info'
)
RecordTypeInfoCommand.add_arguments_to_parser(self.parser)
super().__init__(self.parser)

def add_arguments_to_parser(parser: argparse.ArgumentParser):
parser.add_argument(
'-lr',
'--list-record-type',
type=str,
dest='record_name',
action='store',
default=None,
const = '*',
nargs='?',
help='list record type by name or use * to list all'
)
parser.add_argument(
'-lf',
'--list-field',
type=str,
dest='field_name',
action='store',
default=None,
help='list field type by name or use * to list all'
)
parser.add_argument(
'-e',
'--example',
dest='example',
action='store_true',
help='Use --example to generate example JSON'
)

def execute(self, context: KeeperParams, **kwargs) -> None:
if not context.vault:
raise ValueError("Vault is not initialized.")

vault = context.vault
example = kwargs.get('example', False)
field_name = kwargs.get('field_name')
record_type_name = kwargs.get('record_name')

if field_name is not None:
headers = ('Field Type ID', 'Lookup', 'Multiple', 'Description')
show_all_fields = field_name.strip() == '' or field_name.strip() == '*'
if show_all_fields:
rows = []
for ft in record_types.FieldTypes.values():
rows.append(record_type_utils.get_field_definitions(ft))
return report_utils.dump_report_data(rows, headers, column_width='auto', fmt='simple')
else:
# Fetch a specific field type
ft = record_types.FieldTypes.get(field_name)
if not ft:
raise ValueError(f"Field type '{field_name}' is not a valid RecordField.")
row = record_type_utils.get_field_definitions(ft)
return report_utils.dump_report_data([row], headers, column_width='auto', fmt='simple')

if record_type_name and record_type_name != '*' and record_type_name != '' and example:
record_type_example = record_type_utils.get_record_type_example(vault, record_type_name)
logger.info(record_type_example)
return

# Record Types
if record_type_name and record_type_name != '*' and record_type_name != '':
#Fetch a specific record type
record_type = vault.vault_data.get_record_type_by_name(record_type_name)
if not record_type:
raise ValueError(f"Record type '{record_type_name}' not found.")

rows = []
fields = record_type.fields
scope = record_type_utils.get_record_type_scope(record_type.scope)
rows.append([
record_type.id,
record_type.name,
scope,
fields[0].label if hasattr(fields[0], 'label') else str(fields[0])
])
for field in fields[1:]:
rows.append(['', '', '', field.label if hasattr(field, 'label') else str(field)])

headers = ('id', 'name', 'scope', 'fields')
return report_utils.dump_report_data(rows, headers, column_width='auto', fmt='simple')
else:
#Show all record types
record_types_list = record_type_utils.get_record_types(vault)
if not record_types_list:
raise ValueError("No record types found.")

rows = []
for rtid, name, scope in record_types_list:
rows.append([rtid, name, scope])

headers = ('Record Type ID', 'Record Type Name', 'Record Type Scope')
return report_utils.dump_report_data(rows, headers, column_width='auto', fmt='simple')


class LoadRecordTypesCommand(base.ArgparseCommand):

def __init__(self):
parser = argparse.ArgumentParser(
prog='load-record-types',
description='Loads custom record types from a JSON file.'
)
parser.add_argument(
'--file',
dest='file',
action='store',
required=True,
help='Path to the JSON file containing the record type definition.'
)
super().__init__(parser)

def execute(self, context: KeeperParams, **kwargs) -> None:
if not context.vault:
raise ValueError("Vault is not initialized.")

filepath = kwargs.get('file')
if not filepath:
raise ValueError("Missing required argument: --file")

count = 0
record_types_list = record_type_utils.validate_record_type_file(filepath)

loaded_record_types = set()
existing_record_types = record_type_utils.get_record_types(context.vault)
if existing_record_types:
for existing_record_type in existing_record_types:
loaded_record_types.add(existing_record_type[1].lower())

for record_type in record_types_list:
record_type_name = record_type.get('record_type_name')
if not record_type_name:
logger.error('Record type name is missing in the record type definition.', record_type)
continue

record_type_name = record_type_name[:30]
if record_type_name.lower() in loaded_record_types:
logger.info(f'Record type "{record_type_name}" already exists. Skipping.')
continue

fields = record_type.get('fields')
if not isinstance(fields, list):
logger.error('Fields must be a list in the record type definition.', record_type)
continue

is_valid = True
add_fields = []
for field in fields:
field_type = field.get('$type')
if field_type not in record_types.RecordFields:
is_valid = False
break
fo = {'$ref': field.get('$type')}
if field.get('required') is True:
fo['required'] = True
add_fields.append(fo)
if not is_valid:
logger.error('Invalid field type in the record type definition.', record_type)
continue

if len(add_fields) == 0:
logger.error('No fields found in the record type definition.', record_type)
continue

record_type_management.create_custom_record_type(
vault=context.vault,
title=record_type_name,
fields=add_fields,
description=record_type.get('description') or '',
categories=record_type.get('categories') or []
)
count += 1

if count != 0:
logger.info(f"Custom record types imported successfully. {count} record types were added.")
else:
logger.info("No custom record types were imported. Record types already exist in the vault or the file is empty.")
return


record_implicit_fields = {
Expand Down
148 changes: 148 additions & 0 deletions keepercli-package/src/keepercli/commands/record_type_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import json

from keepersdk.vault import vault_online, storage_types, record_types, vault_types
from keepersdk.proto import record_pb2

def get_record_type_example(vault: vault_online.VaultOnline, record_type_name: str) -> str:
STR_VALUE = 'text'

result = ''
rte = {}
record_type = vault.vault_data.get_record_type_by_name(record_type_name)
if record_type:
record_type_fields = record_type.fields
rte = {
'type': record_type_name,
'title': STR_VALUE,
'notes': STR_VALUE,
'fields': [],
'custom': []
}

fields = record_type.fields or []
fields = [x.label for x in fields]
for fname in fields:
ft = get_field_type(fname)

required = next((x.required for x in record_type_fields if x.label == fname), None)
label = next((x.label for x in record_type_fields if x.label == fname), None)

val = {
'type': fname,
'value': [ft.get('value') or ''],
'required': required,
'label': label
}

if fname not in ('fileRef', 'addressRef', 'cardRef'):
if fname == 'phone' and ft and 'sample' in ft and 'region' in ft['sample']:
ft['sample']['region'] = 'US'

rte['fields'].append(val)
else:
raise ValueError(f'No record type found with name {record_type_name}. Use "record-type-info" to list all record types')

result = json.dumps(rte, indent=2) if rte else ''
return result


def get_record_types(vault:vault_online.VaultOnline) -> list[vault_types.RecordType]:
records = [] # (recordTypeId, name, scope)
record_types = vault.vault_data.get_record_types()

if record_types:
for record_type in record_types:
name = record_type.name
scope = get_record_type_scope(record_type.scope)
records.append((record_type.id, name, scope))

return records


def get_field_type(id):
ftypes = [
{**vars(record_types.RecordFields[rkey]), **vars(record_types.FieldTypes[fkey])}
for rkey in record_types.RecordFields
for fkey in record_types.FieldTypes
if record_types.RecordFields[rkey].type == record_types.FieldTypes[fkey].name
]
result = next((ft for ft in ftypes if id.lower() == ft.get('name').lower()), {})
if result:
# Determine value based on whether the id matches a FieldType or RecordField
field_type_obj = next((ft for ft in record_types.FieldTypes.values() if ft.name.lower() == id.lower()), None)

if field_type_obj:
value = getattr(field_type_obj, 'value', None)
else:
value = result.get('type', None)

result = {
'id': result.get('$id') or result.get('name') or '',
'type': result.get('type') or result.get('name') or '',
'value': value,
}
return result


def isEnterpriseRecordType(record_type_id: int) -> bool:
num_rts_per_scope = 1_000_000
enterprise_scope = record_pb2.RT_ENTERPRISE
min_id = num_rts_per_scope * enterprise_scope
max_id = min_id + num_rts_per_scope
is_enterprise_rt = min_id < record_type_id <= max_id
real_type_id = record_type_id % num_rts_per_scope

return is_enterprise_rt, real_type_id


def get_field_definitions(field: record_types.FieldType):
recordfield_names = {rf.name for rf in record_types.RecordFields.values()}
lookup = field.name if field.name in recordfield_names else ""
multiple = (
record_types.RecordFields[field.name].multiple.name
if lookup else "Optional"
)
row = [
field.name,
lookup,
multiple,
field.description
]
return row


scope_map = {
storage_types.RecordTypeScope.Standard: 'Standard',
storage_types.RecordTypeScope.User: 'User',
storage_types.RecordTypeScope.Enterprise: 'Enterprise'
}


def get_record_type_scope(scope: storage_types.RecordTypeScope) -> str:
return scope_map.get(scope, str(scope))


def validate_record_type_file(file_path: str) -> list:
if not file_path:
raise ValueError('File path is required.')

if not file_path.endswith('.json'):
raise ValueError('Record type file must be a JSON file.')

try:
with open(file_path, 'r') as f:
json_obj = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f'Invalid JSON in record type file: {e}')
except FileNotFoundError:
raise ValueError(f'Record type file not found: {file_path}')

if not isinstance(json_obj, dict):
raise ValueError('Invalid custom record types file')

record_types_list = json_obj.get('record_types')

if not isinstance(record_types_list, list):
raise ValueError('Invalid custom record types list')

return record_types_list
2 changes: 2 additions & 0 deletions keepercli-package/src/keepercli/register_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def register_commands(commands: base.CliCommands, scopes: Optional[base.CommandS
commands.register_command('record-type-add', record_type.RecordTypeAddCommand(), base.CommandScope.Vault)
commands.register_command('record-type-edit', record_type.RecordTypeEditCommand(), base.CommandScope.Vault)
commands.register_command('record-type-delete', record_type.RecordTypeDeleteCommand(), base.CommandScope.Vault)
commands.register_command('record-type-info', record_type.RecordTypeInfoCommand(), base.CommandScope.Vault, 'rti')
commands.register_command('load-record-types', record_type.LoadRecordTypesCommand(), base.CommandScope.Vault)


if not scopes or bool(scopes & base.CommandScope.Enterprise):
Expand Down
Loading