diff --git a/src/psij/job_spec.py b/src/psij/job_spec.py index c0c61514..5cec2868 100644 --- a/src/psij/job_spec.py +++ b/src/psij/job_spec.py @@ -1,12 +1,14 @@ +import sys from pathlib import Path -from typing import Optional, List, Dict - +from typing import Optional, List, Dict, Any from psij.job_attributes import JobAttributes from psij.resource_spec import ResourceSpec +from psij.utils import path_object_to_full_path as o2p + class JobSpec(object): - """A class to hold information about the characteristics of a :class:`~psij.Job`.""" + """A class to hold information about the characteristics of a:class:`~psij.Job`.""" def __init__(self, name: Optional[str] = None, executable: Optional[str] = None, arguments: Optional[List[str]] = None, directory: Optional[Path] = None, @@ -18,38 +20,38 @@ def __init__(self, name: Optional[str] = None, executable: Optional[str] = None, """ Constructs a `JobSpec` object while allowing its properties to be initialized. - :param name: A name for the job. The name plays no functional role except that - :class:`~psij.JobExecutor` implementations may attempt to use the name to label the + :param name: A name for the job. The name plays no functional role except that + :class:`~psij.JobExecutor` implementations may attempt to use the name to label the job as presented by the underlying implementation. - :param executable: An executable, such as "/bin/date". - :param arguments: The argument list to be passed to the executable. Unlike with execve(), + :param executable: An executable, such as "/bin/date". + :param arguments: The argument list to be passed to the executable. Unlike with execve(), the first element of the list will correspond to `argv[1]` when accessed by the invoked executable. - :param directory: The directory, on the compute side, in which the executable is to be run - :param inherit_environment: If this flag is set to `False`, the job starts with an empty + :param directory: The directory, on the compute side, in which the executable is to be run + :param inherit_environment: If this flag is set to `False`, the job starts with an empty environment. The only environment variables that will be accessible to the job are the ones specified by this property. If this flag is set to `True`, which is the default, the job will also have access to variables inherited from the environment in which the job is run. - :param environment: A mapping of environment variable names to their respective values. - :param stdin_path: Path to a file whose contents will be sent to the job's standard input. - :param stdout_path: A path to a file in which to place the standard output stream of the + :param environment: A mapping of environment variable names to their respective values. + :param stdin_path: Path to a file whose contents will be sent to the job's standard input. + :param stdout_path: A path to a file in which to place the standard output stream of the job. - :param stderr_path: A path to a file in which to place the standard error stream of the job. - :param resources: The resource requirements specify the details of how the job is to be run + :param stderr_path: A path to a file in which to place the standard error stream of the job. + :param resources: The resource requirements specify the details of how the job is to be run on a cluster, such as the number and type of compute nodes used, etc. - :param attributes: Job attributes are details about the job, such as the walltime, that are + :param attributes: Job attributes are details about the job, such as the walltime, that are descriptive of how the job behaves. Attributes are, in principle, non-essential in that the job could run even though no attributes are specified. In practice, specifying a walltime is often necessary to prevent LRMs from prematurely terminating a job. - :param pre_launch: An optional path to a pre-launch script. The pre-launch script is + :param pre_launch: An optional path to a pre-launch script. The pre-launch script is sourced before the launcher is invoked. It, therefore, runs on the service node of the job rather than on all of the compute nodes allocated to the job. - :param post_launch: An optional path to a post-launch script. The post-launch script is + :param post_launch: An optional path to a post-launch script. The post-launch script is sourced after all the ranks of the job executable complete and is sourced on the same node as the pre-launch script. - :param launcher: The name of a launcher to use, such as "mpirun", "srun", "single", etc. - For a list of available launchers, :ref:`launchers` + :param launcher: The name of a launcher to use, such as "mpirun", "srun", "single", etc. + For a list of available launchers,:ref:`launchers` """ self._name = name self.executable = executable @@ -77,3 +79,80 @@ def name(self) -> Optional[str]: return self.executable else: return self._name + + @property + def _init_job_spec_dict(self) -> Dict[str, Any]: + """Returns jobspec structure as dict""" + + # convention: + # - if expected value is a string then the dict is initialized with an empty string + # - if the expected value is an object than the key is initialzied with None + + job_spec: Dict[str, Any] + job_spec = { + 'name': '', + 'executable': '', + 'arguments': [], + 'directory': None, + 'inherit_environment': True, + 'environment': {}, + 'stdin_path': None, + 'stdout_path': None, + 'stderr_path': None, + 'resources': None, + 'attributes': None + } + + return job_spec + + @property + def to_dict(self) -> Dict[str, Any]: + + d = self._init_job_spec_dict + + # Map properties to keys + d['name'] = self.name + d['executable'] = self.executable + d['arguments'] = self.arguments + d['directory'] = o2p(self.directory) + d['inherit_environment'] = self.inherit_environment + d['environment'] = self.environment + d['stdin_path'] = o2p(self.stdin_path) + d['stdout_path'] = o2p(self.stdout_path) + d['stderr_path'] = o2p(self.stderr_path) + d['resources'] = self.resources + + # Handle attributes property + if self.attributes: + d['attributes'] = { + 'duration': '', + 'queue_name': '', + 'project_name': '', + 'reservation_id': '', + 'custom_attributes': {}, + } + for k, v in self.attributes.__dict__.items(): + if k in ['duration', 'queue_name', 'project_name', 'reservation_id']: + d['attributes'][k] = str(v) + elif k == "_custom_attributes": + if v: + for ck, cv in v.items(): + if not type(cv).__name__ in ['str', + 'list', + 'dict', + 'NoneType', + 'bool', + 'int']: + sys.stderr.write("Unsupported type " + + type(cv).__name__ + + " in JobAttributes.custom_attributes for key " + + ck + + ", skipping\n") + else: + d['attributes']['custom_attributes'][ck] = cv + else: + sys.stderr.write("Unsupported attribute " + k + ", skipping attribute\n") + else: + d['attributes'] = None + + return d diff --git a/src/psij/serialize.py b/src/psij/serialize.py new file mode 100644 index 00000000..c562d9a7 --- /dev/null +++ b/src/psij/serialize.py @@ -0,0 +1,114 @@ +from pathlib import Path +from typing import Optional, Dict, Any +from psij.job_spec import JobSpec +from psij.job_attributes import JobAttributes +import sys +import json + + +class Export(object): + + """A class for exporting psij data types.""" + + def __init__(self) -> None: + self.version = '' + self.name = '' + + def envelope(self, type: Optional[str] = None) -> Dict[str, Any]: + + envelope: Dict[str, Any] + + envelope = { + 'version': 0.1, + 'type': type, + 'data': None + } + + return envelope + + def to_dict(self, obj: object) -> Dict[str, Any]: + + new_dict = {} + + if isinstance(obj, JobSpec): + new_dict = obj.to_dict + else: + sys.exit("Can't create dict, type " + type(obj).__name__ + " not supported") + + return new_dict + + def export(self, obj: Optional[object] = None, dest: Optional[str] = None) -> bool: + + if not dest: + sys.exit("Cannot export, missing destinstion file") + if not obj: + sys.exit("Cannot export, missing object") + + source_type = type(obj).__name__ + d = self.to_dict(obj) + + envelope = self.envelope(type=source_type) + envelope['data'] = d + + with open(dest, 'w', encoding='utf-8') as f: + json.dump(envelope, f, ensure_ascii=False, indent=4) + + return True + + +class Import(): + + def _dict2spec(self, d: Dict[str, Any]) -> object: + + # Initial spec object + spec = JobSpec() + + # Map properties to keys + spec._name = d['name'] if 'name' in d else d['_name'] + spec.executable = d['executable'] + spec.arguments = d['arguments'] + + spec.directory = Path(d['directory']) if ('directory' in d) and d['directory'] else None + spec.inherit_environment = d['inherit_environment'] + spec.environment = d['environment'] + spec.stdin_path = Path(d['stdin_path']) if ( + 'stdin_path' in d) and d['stdin_path'] else None + spec.stdout_path = Path(d['stdout_path']) if ( + 'stdout_path' in d) and d['stdout_path'] else None + spec.stderr_path = Path(d['stderr_path']) if ( + 'stderr_path' in d) and d['stderr_path'] else None + spec.resources = d['resources'] + + # Handle attributes property + if d['attributes']: + ja = JobAttributes() + + attributes = d['attributes'] + ja.duration = attributes['duration'] + ja.queue_name = attributes['queue_name'] + ja.reservation_id = attributes['reservation_id'] + ja._custom_attributes = attributes['custom_attributes'] + + spec.attributes = ja + print(spec) + return spec + + def from_dict(self, hash: Dict[str, Any], target_type: Optional[str] = None) -> object: + + if target_type == "JobSpec": + return(self._dict2spec(hash)) + else: + sys.exit("Can't create dict, type " + str(target_type) + " not supported") + + def load(self, src: Optional[str] = None) -> object: + + if not src: + sys.exit("Cannot import, missing source file") + + envelope = None + with open(src, 'r', encoding='utf-8') as f: + envelope = json.load(f) + + obj = self.from_dict(envelope['data'], target_type=envelope['type']) + + return obj diff --git a/src/psij/utils.py b/src/psij/utils.py new file mode 100644 index 00000000..f2e1a4ea --- /dev/null +++ b/src/psij/utils.py @@ -0,0 +1,17 @@ +from pathlib import Path +from typing import Optional +import sys + + +def path_object_to_full_path(obj: Optional[object]) -> Optional[str]: + p = None + if obj: + if isinstance(obj, str): + p = obj + elif isinstance(obj, Path): + p = obj.as_posix() + else: + print(type(obj)) + sys.exit("This type " + type(obj).__name__ + + " for a path is not supported, use pathlib instead") + return p