Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 90 additions & 21 deletions juju/constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#

import re
from typing import Dict, List, Optional, TypedDict, Union
from typing_extensions import Required, NotRequired


# Matches on a string specifying memory size
MEM = re.compile('^[1-9][0-9]*[MGTP]$')
Expand Down Expand Up @@ -52,15 +55,37 @@
"spaces",
"virt_type",
"zones",
"allocate_public_ip"]
"allocate_public_ip",
]

LIST_KEYS = {'tags', 'spaces', 'zones'}

SNAKE1 = re.compile(r'(.)([A-Z][a-z]+)')
SNAKE2 = re.compile('([a-z0-9])([A-Z])')


def parse(constraints):
ParsedValue = Union[int, bool, str]


class ConstraintsDict(TypedDict, total=False):
allocate_public_ip: ParsedValue
arch: ParsedValue
container: ParsedValue
cores: ParsedValue
cpu_cores: ParsedValue
cpu_power: ParsedValue
instance_role: ParsedValue
instance_type: ParsedValue
mem: ParsedValue
root_disk: ParsedValue
root_dist_source: ParsedValue
spaces: List[ParsedValue]
tags: List[ParsedValue]
virt_type: ParsedValue
zones: List[ParsedValue]


def parse(constraints: Union[str, ConstraintsDict]) -> Optional[ConstraintsDict]:
"""
Constraints must be expressed as a string containing only spaces
and key value pairs joined by an '='.
Expand All @@ -69,23 +94,26 @@ def parse(constraints):
if not constraints:
return None

if type(constraints) is dict:
if isinstance(constraints, dict):
# Fowards compatibilty: already parsed
return constraints

normalized_constraints = {}
normalized_constraints: ConstraintsDict = {}
for s in constraints.split(" "):
if "=" not in s:
raise Exception("malformed constraint %s" % s)
raise ValueError("malformed constraint %s" % s)

k, v = s.split("=")
normalized_constraints[normalize_key(k)] = normalize_list_value(v) if\
k in LIST_KEYS else normalize_value(v)
normalized_constraints[normalize_key(k)] = (
normalize_list_value(v)
if k in LIST_KEYS
else normalize_value(v)
)

return normalized_constraints


def normalize_key(orig_key):
def normalize_key(orig_key: str) -> str:
key = orig_key.strip()

key = key.replace("-", "_") # Our _client lib wants "_" in place of "-"
Expand All @@ -95,11 +123,11 @@ def normalize_key(orig_key):
key = SNAKE2.sub(r'\1_\2', key).lower()

if key not in SUPPORTED_KEYS:
raise Exception("unknown constraint in %s" % orig_key)
raise ValueError("unknown constraint in %s" % orig_key)
return key


def normalize_value(value):
def normalize_value(value: str) -> Union[int, bool, str]:
value = value.strip()

if MEM.match(value):
Expand All @@ -117,22 +145,43 @@ def normalize_value(value):
return value


def normalize_list_value(value):
def normalize_list_value(value: str) -> List[ParsedValue]:
values = value.strip().split(',')
return [normalize_value(value) for value in values]


STORAGE = re.compile(
'(?:(?:^|(?<=,))(?:|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)|(?P<count>-?[0-9]+)|(?:(?P<size>-?[0-9]+(?:\\.[0-9]+)?)(?P<size_exp>[MGTPEZY])(?:i?B)?))(?:$|,))')


def parse_storage_constraint(constraint):
storage = {'count': 1}
# original regex:
# '(?:(?:^|(?<=,))(?:|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)|(?P<count>-?[0-9]+)|(?:(?P<size>-?[0-9]+(?:\\.[0-9]+)?)(?P<size_exp>[MGTPEZY])(?:i?B)?))(?:$|,))'
# with formatting and explanation -- note that this regex is used with re.finditer:
'(?:'
'(?:^|(?<=,))' # start of string or previous match ends with ','
'(?:' # match one of the following:
'|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)' # * pool: a sequence starting with a letter, ending with a letter or number,
# ------- and including letters, numbers and hyphens (no more than one in a row)
'|(?P<count>-?[0-9]+)' # * count: an optional minus sign followed by one or more digits
'|(?:' # * size (number) and size_exp (units):
'(?P<size>-?[0-9]+(?:\\.[0-9]+)?)' # -- * an optional minus sign followed by one or more digits, optionally with decimal point and more digits
'(?P<size_exp>[MGTPEZY])(?:i?B)?)' # -- * one of MGTPEZY, optionally followed by iB or B, for example 1M or 2.0MB or -3.3MiB
')'
'(?:$|,)' # end of string or ','
')'
)


class StorageConstraintDict(TypedDict):
count: Required[int] # >= 1
pool: NotRequired[str]
size: NotRequired[int]


def parse_storage_constraint(constraint: str) -> StorageConstraintDict:
storage: StorageConstraintDict = {'count': 1}
for m in STORAGE.finditer(constraint):
pool = m.group('pool')
if pool:
if 'pool' in storage:
raise Exception("pool already specified")
raise ValueError("pool already specified")
storage['pool'] = pool
count = m.group('count')
if count:
Expand All @@ -145,15 +194,35 @@ def parse_storage_constraint(constraint):


DEVICE = re.compile(
'^(?P<count>[0-9]+)?(?:^|,)(?P<type>[^,]+)(?:$|,(?!$))(?P<attrs>(?:[^=]+=[^;]+)+)*$')
# original regex:
# '^(?P<count>[0-9]+)?(?:^|,)(?P<type>[^,]+)(?:$|,(?!$))(?P<attrs>(?:[^=]+=[^;]+)+)*$'
# with formatting and explanation -- note this regex is used with re.match:
'^' # start of string
'(?P<count>[0-9]+)?' # count is 1+ digits, and is optional
'(?:^|,)' # match start of string or a comma
# -- so type can be at the start or comma separated from count
'(?P<type>[^,]+)' # type is 1+ anything not a comma (including digits), and is required
'(?:$|,(?!$))' # match end of string | or a non-trailing comma
# -- so type can be at the end or followed by attrs
'(?P<attrs>(?:[^=]+=[^;]+)+)*' # attrs is any number of semicolon separated key=value items
# -- value can have spare '=' inside, possible not intended
# -- attrs will be matched with ATTR.finditer afterwards in parse_device_constraint
'$' # end of string
)
ATTR = re.compile(';?(?P<key>[^=]+)=(?P<value>[^;]+)')


def parse_device_constraint(constraint):
class DeviceConstraintDict(TypedDict):
count: Required[int]
type: Required[str]
attributes: NotRequired[Dict[str, str]]


def parse_device_constraint(constraint: str) -> DeviceConstraintDict:
m = DEVICE.match(constraint)
if m is None:
raise Exception("device constraint does not match")
device = {}
raise ValueError("device constraint does not match")
device: DeviceConstraintDict = {}
count = m.group('count')
if count:
count = int(count)
Expand Down