Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions app/routers/compute/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
from ..error_handlers import DEFAULT_RESPONSES
from ..iri_meta import iri_meta_dict
from ..status.status import router as status_router
from ..task import facility_adapter as task_facility_adapter, models as task_models
from . import facility_adapter, models

router = iri_router.IriRouter(
facility_adapter.FacilityAdapter,
task_facility_adapter.FacilityAdapter,
prefix="/compute",
tags=["compute"],
)
Expand All @@ -31,22 +33,38 @@ async def submit_job(
job_spec: models.JobSpec,
request: Request,
user: User = Depends(router.current_user),
_forbid=Depends(forbidExtraQueryParams()),
_forbid=Depends(forbidExtraQueryParams("run_as_task")),
run_as_task: bool|None = False,
):
"""
Submit a job on a compute resource

- **resource**: the name of the compute resource to use
- **job_request**: a PSIJ job spec as defined <a href="https://exaworks.org/psij-python/docs/v/0.9.11/.generated/tree.html#jobspec">here</a>
- **run_as_task**: return immediately and start job via the task queue. (Returns the task id as the job's id.)

This command will attempt to submit a job and return its id.
"""
# look up the resource (todo: maybe ensure it's available)
resource = await status_router.adapter.get_resource(resource_id)

# the handler can use whatever means it wants to submit the job and then fill in its id
# see: https://exaworks.org/psij-python/docs/v/0.9.11/user_guide.html#submitting-jobs
return await router.adapter.submit_job(resource=resource, user=user, job_spec=job_spec)
if run_as_task:
task = await router.task_adapter.put_task(
user=user,
resource=resource,
task=task_models.TaskCommand(
router=router.get_router_name(),
command="submit_job",
args={
"job_spec": job_spec,
},
),
)
return models.Job(id=task.task_id, status=models.JobStatus(state=models.JobState.NEW), job_spec=job_spec)
else:
# the handler can use whatever means it wants to submit the job and then fill in its id
# see: https://exaworks.org/psij-python/docs/v/0.9.11/user_guide.html#submitting-jobs
return await router.adapter.submit_job(resource=resource, user=user, job_spec=job_spec)


@router.put(
Expand Down
9 changes: 8 additions & 1 deletion app/routers/task/facility_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ...types.user import User
from . import models as task_models
from ..status import models as status_models
from ..compute import models as compute_models, facility_adapter as compute_adapter
from ..filesystem import models as filesystem_models, facility_adapter as filesystem_adapter
from ..iri_router import AuthenticatedAdapter, IriRouter

Expand Down Expand Up @@ -46,7 +47,13 @@ def _extractNull(ind):
try:
r = None
logger.info(f"Received task: {task.router}:{task.command} with args: {task.args}")
if task.router == "filesystem":
if task.router == "compute":
c_adapter = IriRouter.create_adapter(task.router, compute_adapter.FacilityAdapter)
if task.command == "submit_job":
data = _extractNull(task.args["job_spec"])
job_spec = compute_models.JobSpec.model_validate(data)
r = await c_adapter.submit_job(resource=resource, user=user, job_spec=job_spec)
elif task.router == "filesystem":
fs_adapter = IriRouter.create_adapter(task.router, filesystem_adapter.FacilityAdapter)
if task.command == "chmod":
data = _extractNull(task.args["request_model"])
Expand Down
Loading