Skip to content
360 changes: 360 additions & 0 deletions src/common_upgrades/change_pv_in_dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
import re
from dataclasses import dataclass
from typing import Optional

from src.file_access import FileAccess
from src.local_logger import LocalLogger


@dataclass
class Field:
value: str # Value of the field
full_line: str # The full line defining the field in the db file
comment: list[str] # any following comments


class Record:
"""
Class to contain the information in a single db record.
"""

def __init__(self, lines: list[str], start: int, end: int, _logger: LocalLogger) -> None:
self.type, self.name, self.startline = _get_name(lines[0])
self.fields: dict[str, Field] = _get_fields(lines)
self.info: dict[str, Field] = _get_fields(lines, True)
self.start = start
self.end = end
self.start_comment = _get_comment(lines[1:])
self._logger = _logger

def get_fields(self) -> list[str]:
"""Returns the lines of the db associated with all fields

Returns:
list[str]: the lines of the db associated with all fields
"""
field_lines = []
for field in self.fields.values():
field_lines = field_lines + [field.full_line] + field.comment
return field_lines

def get_info(self) -> list[str]:
"""Returns the lines of the db associated with all info

Returns:
list[str]: the lines of the db associated with all info
"""
info_lines = []
for field in self.info.values():
info_lines = info_lines + [field.full_line] + field.comment
return info_lines

def add_field(self, name: str, val: str, comment: str = "") -> None:
"""Creates a new field with the name, value, and optionally comment.

Fails if an field with the same name already exists.

Args:
name (str): The new field
val (str): The value of the new field
comment (str, optional): A comment to follow the info. Defaults to "".
"""
self._logger.info(f"adding {name} field to {self.name} record with value of {val}.")
if name in self.fields.keys():
self._logger.info(f"{name} already present.")
else:
if comment:
comment = " #" + comment
self.fields[name] = Field(val, f' field({name}, "{val}"){comment}\n', [])

def add_info(self, name: str, val: str, comment: str = "") -> None:
"""Creates a new info with the name, value, and optionally comment.

Fails if an info with the same name already exists.

Args:
name (str): The new info
val (str): The value of the new info
comment (str, optional): A comment to follow the info. Defaults to "".
"""
self._logger.info(f"adding {name} info to {self.name} record with value of {val}.")
if name in self.info.keys():
self._logger.info(f"{name} already present.")
return
else:
if comment:
comment = " #" + comment
self.info[name] = Field(val, f' info({name}, "{val}"){comment}\n', [])

def delete_field(self, name: str) -> None:
"""Deletes a field, May cause loss of following multiline comments

Args:
name (str): The field to remove
"""
self._logger.info(f"changing {name} field of {self.name} record.")
if name in self.fields.keys():
self.fields.pop(name)

def delete_info(self, name: str) -> None:
"""Deletes an info, May cause loss of following multiline comments

Args:
name (str): The info to remove
"""
self._logger.info(f"changing {name} info of {self.name} record.")
if name in self.info.keys():
self.info.pop(name)

def change_field(self, name: str, val: str, comment: str = "") -> None:
"""Update the value and comment of a field.

Args:
name (str): the field to be updated.
val (str): the new value of the field.
comment (str, optional): A comment to follow the field. Defaults to "".
"""
self._logger.info(f"changing {name} field of {self.name} record to {val}.")
if name in self.fields.keys():
line = self.fields[name].full_line
old_val = self.fields[name].value
old_multi_line_comment = self.fields[name].comment
if comment:
comment = " #" + comment
new_line = f"{line.replace(old_val, val).rstrip()}{comment}\n"
self.fields[name] = Field(val, new_line, old_multi_line_comment)
else:
self._logger.error(f"{name} not present.")

def change_info(self, name: str, val: str, comment: str = "") -> None:
"""Update the value and comment of an info field.

Args:
name (str): the info to be updated.
val (str): the new value of the info.
comment (str, optional): A comment to follow the info. Defaults to "".
Trailing multi-line comments are preserved.
"""
self._logger.info(f"changing {name} info of {self.name} record to {val}.")
if name in self.info.keys():
line = self.info[name].full_line
old_val = self.info[name].value
old_multi_line_comment = self.info[name].comment
if comment:
comment = " #" + comment
new_line = f"{line.replace(old_val, val).rstrip()}{comment}\n"
self.info[name] = Field(val, new_line, old_multi_line_comment)
else:
self._logger.error(f"{name} not present.")

def change_name(self, name: str) -> None:
"""Change the name of the record

Args:
name (str): The new name e.g. $(P)CS:DASHBOARD:BANNER:LEFT:VALUE
"""
self._logger.info(f"changing name of {self.name} record.")
self.startline = self.startline.replace(self.name, name)
self.name = name

def change_type(self, type: str) -> None:
"""Change the type of the record

Args:
type (str): the new type i.e. mbbi
"""
self._logger.info(f"changing type of {self.name} record.")
self.startline = self.startline.replace(self.type, type)
self.type = type

def update_record(self, db_file: list[str]) -> list[str]:
"""Method to update the record in the db file based on the record object

Args:
db_file (list[str]): The read in lines of the db file.

Returns:
list[str]: the lines of the db file with the record replaced with the updated version.
"""
self._logger.info(f"Updating {self.name} record.")
before_record = db_file[: self.start]
after_record = db_file[1 + self.end :]

new_db_lines = [self.startline]
new_db_lines = new_db_lines + self.start_comment
new_db_lines = new_db_lines + self.get_fields()
new_db_lines = new_db_lines + self.get_info()
new_db_lines = new_db_lines + ["}\n"]
new_db_lines = before_record + new_db_lines + after_record
return new_db_lines

def delete_record(self, db_file: list[str]) -> list[str]:
"""Method to remove the record object from the db

Args:
db_file (list[str]): The read in lines of the db file.

Returns:
list[str]: The lines of the db file without the record.
"""
self._logger.info(f"Removing {self.name} record.")
before_record = db_file[: self.start]
after_record = db_file[1 + self.end :]
return before_record + after_record


class ChangePvInDashboard:
def __init__(self, file_access: FileAccess, logger: LocalLogger) -> None:
"""Initialise.

Args:
file_access: Object to allow for file access.
logger: Logger to use.
"""
self._file_access = file_access
self._logger = logger

def read_file(self) -> list[str]:
"""Reads the dashboard.db into memory

Returns:
list[str]: list containing each line in dashboard.db
"""
return self._file_access.read_dashboard_file()

def write_file(self, db_lines: list[str]) -> None:
"""writes db lines back into the file

Args:
db_lines: list[str] - the lines to write to the fule
"""
return self._file_access.write_dashboard_file(db_lines)

def get_record(self, record_name: str, db_file: list[str]) -> Optional[Record]:
"""Given a record name generate a record object.

Args:
record_name (str): The name of the record to find
(e.g. $(P)CS:DASHBOARD:BANNER:LEFT:VALUE)
db_file (list[str]): The loaded in lines to check through

Returns:
Optional[Record]: A record object containing the information of the record,
any comments inside it. or None if the record is not present or doesn't properly close.

"""
self._logger.info(f"Getting {record_name} record.")
name = re.escape(record_name)
for i in range(0, len(db_file)):
if re.match(rf"record\(.+, [\"\']{name}[\"\']\) {{", db_file[i]):
end = _get_end_of_record(db_file=db_file, line_number=i)
if end is None:
self._logger.error("Record not properly terminated.")
return None
else:
return Record(db_file[i:end], i, end, self._logger)
self._logger.error("Record does not exist.")
return None


def _get_end_of_record(db_file: list[str], line_number: int) -> int | None:
Comment thread Fixed
"""Given the first line of a record, return its end

Args:
db_file (list[str]): the list of strings loaded in from db
line_number (int): The start of the record

Returns:
int | None: the line number of the closing } of the record or none
if the record does not terminate properly.
"""
for i in range(line_number, len(db_file)):
if re.match(r"^([^#]|(\$\(.*=.*#.*\)))*}.*$", db_file[i]):
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Fixed
Comment thread Dismissed
Comment thread Dismissed
Comment thread Dismissed
Comment thread Dismissed
return i
return None


def _get_fields(lines: list[str], info: bool = False) -> dict[str, Field]:
"""Takes a list of strings and extracts fields or info

Args:
lines (list[str]): The line to check (usually the lines of a record)
info (bool, optional): Whether to search for fields or info

Returns:
dict[str, Field]: returns a dictionary
where the keys are the name/type of the fields,
and the value is a Field dataclass containing the value,
the full line string, and a list
of any following multi-line comments.
"""

field_dict = {}
if info:
search = "info"
else:
search = "field"

for index, line in enumerate(lines):
if f"{search}(" in line:
# find strings of the form field(type, "val") or info(type, "val")
match = re.match(rf".*{search}\((.*), \"(.*)\"\).*", line)
# If a match is found
if match is not None:
# split out the value, as well as keeping a record of the whole string.
value = str(match.group(2))
full_line = f"{match.group(0)}\n"
# if this is not the last line of the record then also get any trailing comments
if index + 1 < len(lines):
field = Field(
value,
full_line,
_get_comment(lines[index + 1 :]),
)
# otherwise store as is.
else:
field = Field(value, full_line, [])
# Store in a dict access using the field type (which should be a unique key)
field_dict[str(match.group(1))] = field
return field_dict


def _get_name(line: str) -> tuple[str, str, str]:
"""Get the name, type, and full first line of a record.

Args:
line (str): the line to check

Returns:
tuple[str, str, str]: a tuple containing the name, type,
and full string of a record definition, or None if fails regex
"""
match = re.match(r".*record\((.*), \"(.*)\"\).*", line)
if match is None:
return ("", "", "")
return match.group(1), match.group(2), f"{match.group(0)}\n"


def _get_comment(lines: list[str]) -> list[str]:
"""Get a whole line comment

Checks for whole line comments or multi-line comments.

Args:
lines (list[str]): The lines to check (usually the entire record on from a starting point)

Returns:
list[str]: A list of consecutive comments i.e.
['#this comment \n' '#is on\n' ' #multiple lines']
"""
multi_line_comment = []
i = 0
match = re.match(r"(\s*#.*)", lines[i])
while match is not None:
multi_line_comment.append(f"{match.group(0)}\n")
i = i + 1
if i < len(lines):
match = re.match(r"(\s*#.*)", lines[i])
else:
match = None
return multi_line_comment
1 change: 1 addition & 0 deletions src/common_upgrades/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
BLOCK_FILE = "blocks.xml"
DEVICE_SCREEN_FILE = "screens.xml"
GLOBALS_FILENAME = os.path.abspath(os.path.join(CONFIG_ROOT, "globals.txt"))
DASHBOARD_DB_FILENAME = os.path.abspath(os.path.join(CONFIG_ROOT, "dashboard.db"))

MOTION_SET_POINTS_FOLDER = os.path.abspath(os.path.join(CONFIG_ROOT, "motionSetPoints"))

Expand Down
11 changes: 11 additions & 0 deletions src/file_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from src.common_upgrades.utils.constants import (
COMPONENT_FOLDER,
CONFIG_FOLDER,
DASHBOARD_DB_FILENAME,
DEVICE_SCREEN_FILE,
DEVICE_SCREENS_FOLDER,
SYNOPTIC_FOLDER,
Expand Down Expand Up @@ -237,6 +238,16 @@ def get_file_paths(self, directory: str, extension: str = ""):
if extension is None or file.endswith(extension):
yield os.path.join(root, file)

def read_dashboard_file(self):
with open(DASHBOARD_DB_FILENAME) as db_file:
self._logger.info(f"Reading {DASHBOARD_DB_FILENAME} file")
return db_file.readlines()

def write_dashboard_file(self, db_lines: list[str]):
with open(DASHBOARD_DB_FILENAME, "w") as db_file:
self._logger.info(f"Writing {DASHBOARD_DB_FILENAME} file")
db_file.writelines(db_lines)


class CachingFileAccess(object):
"""Context that uses the given file access object but does not actually write to file until the context is left
Expand Down
Loading
Loading