From 0997ef9eb8bd07d01ca6a6547127ed3e2cc4d3f6 Mon Sep 17 00:00:00 2001 From: Gabor Torok Date: Thu, 16 Apr 2026 16:11:52 -0700 Subject: [PATCH] Launch job from the task queue --- app/routers/compute/compute.py | 26 ++++++++++++++++++++++---- app/routers/task/facility_adapter.py | 9 ++++++++- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/app/routers/compute/compute.py b/app/routers/compute/compute.py index 71c80b45..0f961c3b 100644 --- a/app/routers/compute/compute.py +++ b/app/routers/compute/compute.py @@ -9,10 +9,12 @@ from ..error_handlers import DEFAULT_RESPONSES from ..iri_meta import iri_meta_dict from ..status.status import router as status_router +from ..task import facility_adapter as task_facility_adapter, models as task_models from . import facility_adapter, models router = iri_router.IriRouter( facility_adapter.FacilityAdapter, + task_facility_adapter.FacilityAdapter, prefix="/compute", tags=["compute"], ) @@ -31,22 +33,38 @@ async def submit_job( job_spec: models.JobSpec, request: Request, user: User = Depends(router.current_user), - _forbid=Depends(forbidExtraQueryParams()), + _forbid=Depends(forbidExtraQueryParams("run_as_task")), + run_as_task: bool|None = False, ): """ Submit a job on a compute resource - **resource**: the name of the compute resource to use - **job_request**: a PSIJ job spec as defined here + - **run_as_task**: return immediately and start job via the task queue. (Returns the task id as the job's id.) This command will attempt to submit a job and return its id. """ # look up the resource (todo: maybe ensure it's available) resource = await status_router.adapter.get_resource(resource_id) - # the handler can use whatever means it wants to submit the job and then fill in its id - # see: https://exaworks.org/psij-python/docs/v/0.9.11/user_guide.html#submitting-jobs - return await router.adapter.submit_job(resource=resource, user=user, job_spec=job_spec) + if run_as_task: + task = await router.task_adapter.put_task( + user=user, + resource=resource, + task=task_models.TaskCommand( + router=router.get_router_name(), + command="submit_job", + args={ + "job_spec": job_spec, + }, + ), + ) + return models.Job(id=task.task_id, status=models.JobStatus(state=models.JobState.NEW), job_spec=job_spec) + else: + # the handler can use whatever means it wants to submit the job and then fill in its id + # see: https://exaworks.org/psij-python/docs/v/0.9.11/user_guide.html#submitting-jobs + return await router.adapter.submit_job(resource=resource, user=user, job_spec=job_spec) @router.put( diff --git a/app/routers/task/facility_adapter.py b/app/routers/task/facility_adapter.py index bec55bca..8fbe6c9d 100644 --- a/app/routers/task/facility_adapter.py +++ b/app/routers/task/facility_adapter.py @@ -3,6 +3,7 @@ from ...types.user import User from . import models as task_models from ..status import models as status_models +from ..compute import models as compute_models, facility_adapter as compute_adapter from ..filesystem import models as filesystem_models, facility_adapter as filesystem_adapter from ..iri_router import AuthenticatedAdapter, IriRouter @@ -46,7 +47,13 @@ def _extractNull(ind): try: r = None logger.info(f"Received task: {task.router}:{task.command} with args: {task.args}") - if task.router == "filesystem": + if task.router == "compute": + c_adapter = IriRouter.create_adapter(task.router, compute_adapter.FacilityAdapter) + if task.command == "submit_job": + data = _extractNull(task.args["job_spec"]) + job_spec = compute_models.JobSpec.model_validate(data) + r = await c_adapter.submit_job(resource=resource, user=user, job_spec=job_spec) + elif task.router == "filesystem": fs_adapter = IriRouter.create_adapter(task.router, filesystem_adapter.FacilityAdapter) if task.command == "chmod": data = _extractNull(task.args["request_model"])