diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 00000000..45fd79ce --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,23 @@ +# Scripts + +This directory contains examples, helper scripts and the console for PSIJ. + +## psij-console + +The console takes an exported JobSpec document and either validates or executes it. + + +``` +usage: psij-consol [-h] [-v] [--debug] {validate,run} ... + +positional arguments: + {validate,run} Subcommands + validate validate JobSpec file + run execute JobSpec file + +optional arguments: + -h, --help show this help message and exit + -v, --verbose print detailed information + --debug print debug information +``` + diff --git a/scripts/SERIALZE.md b/scripts/SERIALZE.md new file mode 100644 index 00000000..d60ce737 --- /dev/null +++ b/scripts/SERIALZE.md @@ -0,0 +1,27 @@ +# Job import and export + +This example is in Python and based on the hello world example from the Quick-Start guide. + +Code snippet for exporting a JobSpec as json: +``` +from psij import Export + +e = Export() +... +job = make_job() +e.export(obj=job.spec , dest="jobSpec.json") +``` + +The command line example below shows how to run and submit an exported job 10 times using slurm. +``` +python ./psij-consol.py run --job-executor slurm --number-of-jobs 10 jobSpec.json +``` + +In addition a job can be imported and submitted using the import functionality of PSIJ: +``` +from psij import Import +i = Import() +job = psij.Job() +spec = i.load(src="jobSpec.json") +job.spec = spec +``` diff --git a/scripts/hello-job.py b/scripts/hello-job.py new file mode 100644 index 00000000..0d2ee5ee --- /dev/null +++ b/scripts/hello-job.py @@ -0,0 +1,23 @@ +import psij + +jex = psij.JobExecutor.get_instance('slurm') + +N=2 # number of jobs to run + +def make_job(i): + job = psij.Job() + spec = psij.JobSpec() + spec.executable = 'echo' + spec.arguments = ['I am number ' , i , ">>" , "hello.txt"] + spec.stdout_path = 'hello.' + str(i) + '.stdout' + job.spec = spec + return job + +jobs = [] +for i in range(N): + job = make_job(i) + jobs.append(job) + jex.submit(job) + +for i in range(N): + jobs[i].wait() \ No newline at end of file diff --git a/scripts/import-export.py b/scripts/import-export.py new file mode 100644 index 00000000..ff8488e1 --- /dev/null +++ b/scripts/import-export.py @@ -0,0 +1,37 @@ +import psij +from psij import Export +from psij import Import + + +jex = psij.JobExecutor.get_instance('local') + +N=1 # number of jobs to run + +def make_job(): + job = psij.Job() + spec = psij.JobSpec() + spec.executable = 'echo Hello World' + spec.arguments = ['10'] + job.spec = spec + return job + + + +# Create Job and export +e = Export() +for i in range(N): + job = make_job() + e.export(obj=job.spec , dest="jobSpec." + str(i) + ".json") + +# Import Job and submit +imp = Import() +jobs = [] +for i in range(N): + job = psij.Job() + spec = imp.load(src="jobSpec." + str(i) + ".json") + job.spec = spec + jobs.append(job) + jex.submit(job) + +for i in range(N): + jobs[i].wait() \ No newline at end of file diff --git a/scripts/psijcli.py b/scripts/psijcli.py new file mode 100644 index 00000000..ac68da68 --- /dev/null +++ b/scripts/psijcli.py @@ -0,0 +1,109 @@ +#! /usr/bin/python3 + +import psij +from psij import JobExecutor +from psij import JobSpec +import sys +import os +import argparse +from psij import Import + + + + +parser = argparse.ArgumentParser(prog='psij-consol') +subparser = parser.add_subparsers(dest="command", help='Subcommands') +validate_parser = subparser.add_parser("validate", help='validate JobSpec file') +validate_parser.add_argument("file", help="JobSpec file") +execute_parser = subparser.add_parser("run", help='execute JobSpec file') +execute_parser.add_argument( "executor", + choices = [ "cobalt", + "local", + "batch-test", + "flux", + "lsf", + "rp", + "saga", + "slurm" + ], + ) +execute_parser.add_argument("file", help="JobSpec file") +execute_parser.add_argument("-n", + "--number-of-jobs", + dest = "jobs", + type = int, + default=1, + help="Number of jobs to submit, default 1" + ) + +parser.add_argument("-v", "--verbose", + dest = "verbose", + default=False, + action='store_true', + help="print detailed information") + +parser.add_argument("--debug", + dest = "debug", + action='store_true', + help="print debug information") + + +# parser.print_help() + +args = parser.parse_args() + +i = Import() + +if args.command == 'validate': + if args.verbose: + print("Validating " + args.file) + job_spec = i.load(args.file) + + if job_spec and isinstance(job_spec, JobSpec): + print("File ok") + else: + sys.exit("Not a valid file, could not import " + args.file) +elif args.command == "run": + + if not args.executor: + sys.exit("Missing argument executor") + + if args.verbose: + print("Importing " + args.file) + job_spec = i.load(args.file) + if not (job_spec and isinstance(job_spec, JobSpec)): + sys.exit("Something wrong with JobSpec") + + # Get job executor + if args.verbose: + print("Initializing job executor") + + jex = None + + try: + jex = psij.JobExecutor.get_instance(args.executor) + except ValueError as err: + sys.exit(f"Panic, {err}") + except BaseException as err: + sys.exit(f"Unexpected: {err}, {type(err)}") + + # Submit jobs + number_of_jobs = args.jobs + if args.verbose: + print("Submitting " + str(number_of_jobs) + " job(s)") + + jobs = [] # list of created jobs + for i in range(number_of_jobs): + job = psij.Job() + job.spec = job_spec + jobs.append(job) + jex.submit(job) + + if args.verbose: + print("Waiting for jobs to finish") + for i in range(number_of_jobs): + jobs[i].wait() +else: + # Should never be here + sys.stderr.write("Missig command. Use --help for more information.\n") + parser.print_help(sys.stderr) \ No newline at end of file diff --git a/setup.py b/setup.py index ecbdd5ab..968b426d 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,6 @@ 'psij': ["py.typed"] }, - scripts=[], entry_points={ diff --git a/src/psij/__init__.py b/src/psij/__init__.py index 80fee7ea..38395b94 100644 --- a/src/psij/__init__.py +++ b/src/psij/__init__.py @@ -17,12 +17,13 @@ from .job_status import JobStatus from .launchers.launcher import Launcher from .resource_spec import ResourceSpec, ResourceSpecV1 +from .serialize import Export, Import __all__ = [ 'JobExecutor', 'JobExecutorConfig', 'Job', 'JobStatusCallback', 'JobSpec', 'JobAttributes', 'JobStatus', 'JobState', 'ResourceSpec', 'ResourceSpecV1', 'Launcher', 'SubmitException', - 'InvalidJobException', 'UnreachableStateException' + 'InvalidJobException', 'UnreachableStateException', 'Export', 'Import' ] logger = logging.getLogger(__name__) diff --git a/src/psij/job_spec.py b/src/psij/job_spec.py index 09a72eb9..6d1d7212 100644 --- a/src/psij/job_spec.py +++ b/src/psij/job_spec.py @@ -132,7 +132,10 @@ def to_dict(self) -> Dict[str, Any]: } for k, v in self.attributes.__dict__.items(): if k in ['duration', 'queue_name', 'project_name', 'reservation_id']: - d['attributes'][k] = str(v) + if v: + d['attributes'][k] = str(v) + else: + d['attributes'][k] = v elif k == "_custom_attributes": if v: for ck, cv in v.items(): @@ -147,6 +150,9 @@ def to_dict(self) -> Dict[str, Any]: + " in JobAttributes.custom_attributes for key " + ck + ", skipping\n") + else: + if ck: + d['attributes']['custom_attributes'][ck] = str(cv) else: d['attributes']['custom_attributes'][ck] = cv else: diff --git a/src/psij/serialize.py b/src/psij/serialize.py index b5b68f04..80fe7f5c 100644 --- a/src/psij/serialize.py +++ b/src/psij/serialize.py @@ -92,7 +92,7 @@ def _dict2spec(self, d: Dict[str, Any]) -> object: ja._custom_attributes = attributes['custom_attributes'] spec.attributes = ja - print(spec) + return spec def from_dict(self, hash: Dict[str, Any], target_type: Optional[str] = None) -> object: