diff --git a/juju/constraints.py b/juju/constraints.py index 3315feadc..44ee5fbe1 100644 --- a/juju/constraints.py +++ b/juju/constraints.py @@ -19,6 +19,9 @@ # import re +from typing import Dict, List, Optional, TypedDict, Union +from typing_extensions import Required, NotRequired + # Matches on a string specifying memory size MEM = re.compile('^[1-9][0-9]*[MGTP]$') @@ -52,7 +55,8 @@ "spaces", "virt_type", "zones", - "allocate_public_ip"] + "allocate_public_ip", +] LIST_KEYS = {'tags', 'spaces', 'zones'} @@ -60,7 +64,28 @@ SNAKE2 = re.compile('([a-z0-9])([A-Z])') -def parse(constraints): +ParsedValue = Union[int, bool, str] + + +class ConstraintsDict(TypedDict, total=False): + allocate_public_ip: ParsedValue + arch: ParsedValue + container: ParsedValue + cores: ParsedValue + cpu_cores: ParsedValue + cpu_power: ParsedValue + instance_role: ParsedValue + instance_type: ParsedValue + mem: ParsedValue + root_disk: ParsedValue + root_dist_source: ParsedValue + spaces: List[ParsedValue] + tags: List[ParsedValue] + virt_type: ParsedValue + zones: List[ParsedValue] + + +def parse(constraints: Union[str, ConstraintsDict]) -> Optional[ConstraintsDict]: """ Constraints must be expressed as a string containing only spaces and key value pairs joined by an '='. @@ -69,23 +94,26 @@ def parse(constraints): if not constraints: return None - if type(constraints) is dict: + if isinstance(constraints, dict): # Fowards compatibilty: already parsed return constraints - normalized_constraints = {} + normalized_constraints: ConstraintsDict = {} for s in constraints.split(" "): if "=" not in s: - raise Exception("malformed constraint %s" % s) + raise ValueError("malformed constraint %s" % s) k, v = s.split("=") - normalized_constraints[normalize_key(k)] = normalize_list_value(v) if\ - k in LIST_KEYS else normalize_value(v) + normalized_constraints[normalize_key(k)] = ( + normalize_list_value(v) + if k in LIST_KEYS + else normalize_value(v) + ) return normalized_constraints -def normalize_key(orig_key): +def normalize_key(orig_key: str) -> str: key = orig_key.strip() key = key.replace("-", "_") # Our _client lib wants "_" in place of "-" @@ -95,11 +123,11 @@ def normalize_key(orig_key): key = SNAKE2.sub(r'\1_\2', key).lower() if key not in SUPPORTED_KEYS: - raise Exception("unknown constraint in %s" % orig_key) + raise ValueError("unknown constraint in %s" % orig_key) return key -def normalize_value(value): +def normalize_value(value: str) -> Union[int, bool, str]: value = value.strip() if MEM.match(value): @@ -117,22 +145,43 @@ def normalize_value(value): return value -def normalize_list_value(value): +def normalize_list_value(value: str) -> List[ParsedValue]: values = value.strip().split(',') return [normalize_value(value) for value in values] STORAGE = re.compile( - '(?:(?:^|(?<=,))(?:|(?P[a-zA-Z]+[-?a-zA-Z0-9]*)|(?P-?[0-9]+)|(?:(?P-?[0-9]+(?:\\.[0-9]+)?)(?P[MGTPEZY])(?:i?B)?))(?:$|,))') - - -def parse_storage_constraint(constraint): - storage = {'count': 1} + # original regex: + # '(?:(?:^|(?<=,))(?:|(?P[a-zA-Z]+[-?a-zA-Z0-9]*)|(?P-?[0-9]+)|(?:(?P-?[0-9]+(?:\\.[0-9]+)?)(?P[MGTPEZY])(?:i?B)?))(?:$|,))' + # with formatting and explanation -- note that this regex is used with re.finditer: + '(?:' + '(?:^|(?<=,))' # start of string or previous match ends with ',' + '(?:' # match one of the following: + '|(?P[a-zA-Z]+[-?a-zA-Z0-9]*)' # * pool: a sequence starting with a letter, ending with a letter or number, + # ------- and including letters, numbers and hyphens (no more than one in a row) + '|(?P-?[0-9]+)' # * count: an optional minus sign followed by one or more digits + '|(?:' # * size (number) and size_exp (units): + '(?P-?[0-9]+(?:\\.[0-9]+)?)' # -- * an optional minus sign followed by one or more digits, optionally with decimal point and more digits + '(?P[MGTPEZY])(?:i?B)?)' # -- * one of MGTPEZY, optionally followed by iB or B, for example 1M or 2.0MB or -3.3MiB + ')' + '(?:$|,)' # end of string or ',' + ')' +) + + +class StorageConstraintDict(TypedDict): + count: Required[int] # >= 1 + pool: NotRequired[str] + size: NotRequired[int] + + +def parse_storage_constraint(constraint: str) -> StorageConstraintDict: + storage: StorageConstraintDict = {'count': 1} for m in STORAGE.finditer(constraint): pool = m.group('pool') if pool: if 'pool' in storage: - raise Exception("pool already specified") + raise ValueError("pool already specified") storage['pool'] = pool count = m.group('count') if count: @@ -145,15 +194,35 @@ def parse_storage_constraint(constraint): DEVICE = re.compile( - '^(?P[0-9]+)?(?:^|,)(?P[^,]+)(?:$|,(?!$))(?P(?:[^=]+=[^;]+)+)*$') + # original regex: + # '^(?P[0-9]+)?(?:^|,)(?P[^,]+)(?:$|,(?!$))(?P(?:[^=]+=[^;]+)+)*$' + # with formatting and explanation -- note this regex is used with re.match: + '^' # start of string + '(?P[0-9]+)?' # count is 1+ digits, and is optional + '(?:^|,)' # match start of string or a comma + # -- so type can be at the start or comma separated from count + '(?P[^,]+)' # type is 1+ anything not a comma (including digits), and is required + '(?:$|,(?!$))' # match end of string | or a non-trailing comma + # -- so type can be at the end or followed by attrs + '(?P(?:[^=]+=[^;]+)+)*' # attrs is any number of semicolon separated key=value items + # -- value can have spare '=' inside, possible not intended + # -- attrs will be matched with ATTR.finditer afterwards in parse_device_constraint + '$' # end of string +) ATTR = re.compile(';?(?P[^=]+)=(?P[^;]+)') -def parse_device_constraint(constraint): +class DeviceConstraintDict(TypedDict): + count: Required[int] + type: Required[str] + attributes: NotRequired[Dict[str, str]] + + +def parse_device_constraint(constraint: str) -> DeviceConstraintDict: m = DEVICE.match(constraint) if m is None: - raise Exception("device constraint does not match") - device = {} + raise ValueError("device constraint does not match") + device: DeviceConstraintDict = {} count = m.group('count') if count: count = int(count)