Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/psf/black
rev: 21.5b1
rev: 22.3.0
hooks:
- id: black
language_version: python3
Expand Down
38 changes: 38 additions & 0 deletions docs/source/user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,44 @@ $ kbatch job show list-files-jfprp

With `kbatch job logs` you can get the logs for a job. Make sure to pass the container id.

## Submit a cronjob

If you'd like your job to run on a repeating schedule, you can leverage CronJobs. The command line interface for `kbatch cronjob` is the same as `kbatch job`, with the added requirement that you specify a schedule when you `submit` a cronjob:

```{code-block} console
$ kbatch cronjob submit \
--name=list-files \
--image=alpine \
--command='["ls", "-lh"]' \
--schedule='0 22 * * 1-5'
```

This job will now run at 22:00 on every day-of-week from Monday through Friday **indefinitely**.

You can list scheduled cronjobs:

```
$ kbatch cronjob list -o table
┏━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ cronjob name ┃ started ┃ schedule ┃
┡━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ mycj-cron-kb7d6 │ 2022-05-31T05:27:25+00:00 │ */5 * * * * │
│ list-files-cron-56whl │ 2022-06-01T23:35:20+00:00 │ 0 22 * * 1-5 │
└───────────────────────┴───────────────────────────┴──────────────┘
```


The only way to remove a cronjob is to explicitly delete it:

```
kbatch cronjob delete mycj-cron-kb7d6
```

### Schedule syntax

For those familiar with Linux cron jobs, the schedule syntax is the same. For those unfamiliar, have a read through the [Kubernetes CronJob - cron schedule syntax](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax). The website [crontab.guru](https://crontab.guru/#0_22_*_*_1-5) is a nifty tool that tries to translate the schedule syntax into "plain" English.


## Submitting code files

Your job likely relies on some local code files to function. Perhaps this is a notebook, shell script, or utility library that wouldn't be present in your container image. You can use the `-c` or `--code` option to provide a single file or list of files that will be made available before your job starts running.
Expand Down
276 changes: 180 additions & 96 deletions kbatch-proxy/kbatch_proxy/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os
import logging
from typing import List, Optional, Tuple, Dict
from typing import List, Optional, Tuple, Dict, Union

import yaml
from pydantic import BaseModel, BaseSettings
import jupyterhub.services.auth
from fastapi import Depends, FastAPI, HTTPException, Request, status, APIRouter
import kubernetes.client
import kubernetes.client.models
from kubernetes.client.models import V1CronJob, V1Job, V1ConfigMap
import kubernetes.config
import kubernetes.watch
import rich.traceback
Expand Down Expand Up @@ -88,9 +88,7 @@ class UserOut(BaseModel):
job_template = yaml.safe_load(f)

# parse with Kubernetes to normalize keys with job_data
job_template = utils.parse(
job_template, model=kubernetes.client.models.V1Job
).to_dict()
job_template = utils.parse(job_template, model=V1Job).to_dict()
utils.remove_nulls(job_template)

else:
Expand Down Expand Up @@ -155,115 +153,51 @@ def get_k8s_api() -> Tuple[kubernetes.client.CoreV1Api, kubernetes.client.BatchV
# ----------------------------------------------------------------------------
# app

# cronjobs #
@router.get("/cronjobs/{job_name}")
async def read_cronjob(job_name: str, user: User = Depends(get_current_user)):
    """Return the named CronJob from the authenticated user's namespace as a dict."""
    return _perform_action(job_name, user.namespace, "read", V1CronJob)


@router.get("/cronjobs/")
async def read_cronjobs(user: User = Depends(get_current_user)):
    """List all CronJobs in the authenticated user's namespace."""
    return _perform_action(None, user.namespace, "list", V1CronJob)


@router.delete("/cronjobs/{job_name}")
async def delete_cronjob(job_name: str, user: User = Depends(get_current_user)):
    """Delete the named CronJob from the authenticated user's namespace."""
    return _perform_action(job_name, user.namespace, "delete", V1CronJob)


@router.post("/cronjobs/")
async def create_cronjob(request: Request, user: User = Depends(get_current_user)):
    """Create a CronJob (and optional code ConfigMap) from the JSON request body."""
    data = await request.json()
    return _create_job(data, V1CronJob, user)


# jobs #
@router.get("/jobs/{job_name}")
async def read_job(job_name: str, user: User = Depends(get_current_user)):
_, batch_api = get_k8s_api()
result = batch_api.read_namespaced_job(job_name, user.namespace)
return result.to_dict()
return _perform_action(job_name, user.namespace, "read", V1Job)


@router.get("/jobs/")
async def read_jobs(user: User = Depends(get_current_user)):
api, batch_api = get_k8s_api()
result = batch_api.list_namespaced_job(user.namespace)
return result.to_dict()
return _perform_action(None, user.namespace, "list", V1Job)


@router.delete("/jobs/{job_name}")
async def delete_job(job_name: str, user: User = Depends(get_current_user)):
_, batch_api = get_k8s_api()
result = batch_api.delete_namespaced_job(job_name, user.namespace)
return result.to_dict()
return _perform_action(job_name, user.namespace, "delete", V1Job)


@router.post("/jobs/")
async def create_job(request: Request, user: User = Depends(get_current_user)):
api, batch_api = get_k8s_api()

data = await request.json()
job_data = data["job"]

if job_template:
job_data = utils.merge_json_objects(job_data, job_template)

job: kubernetes.client.models.V1Job = utils.parse(
job_data, model=kubernetes.client.models.V1Job
)

code_data = data.get("code", None)
if code_data:
# The contents were base64encoded prior to being JSON serialized
# we have to decode it *after* submitting things to the API server...
# This is not great.
# code_data["binary_data"]["code"] = base64.b64decode(code_data["binary_data"]["code"])
config_map: Optional[kubernetes.client.models.V1ConfigMap] = utils.parse(
code_data, model=kubernetes.client.models.V1ConfigMap
)
else:
config_map = None

patch.patch(
job,
config_map,
annotations={},
labels={},
username=user.name,
ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished,
extra_env=settings.kbatch_job_extra_env,
api_token=user.api_token,
)

# What needs to happen when? We have a few requirements
# 1. The code ConfigMap must exist before adding it as a volume (we need a name,
# and k8s requires that)
# 2. MAYBE: The Job must exist before adding it as an owner for the ConfigMap
#
# So I think we're at 3 requests:
#
# 1. Submit configmap
# - ..
# 2. Submit Job
# 3. Patch ConfigMap to add Job as the owner
if settings.kbatch_create_user_namespace:
logger.info("Ensuring namespace %s", user.namespace)
created = ensure_namespace(api, user.namespace)
if created:
logger.info("Created namespace %s", user.namespace)

if config_map:
logger.info("Submitting ConfigMap")
config_map = api.create_namespaced_config_map(
namespace=user.namespace, body=config_map
)
patch.add_submitted_configmap_name(job, config_map)

logger.info("Submitting job")
try:
resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job)
except kubernetes.client.exceptions.ApiException as e:
content_type = e.headers.get("Content-Type")
if content_type:
headers = {"Content-Type": content_type}
else:
headers = {}
raise HTTPException(status_code=e.status, detail=e.body, headers=headers)

if config_map:
logger.info(
"patching configmap %s with owner %s",
config_map.metadata.name,
resp.metadata.name,
)
patch.patch_configmap_owner(resp, config_map)
api.patch_namespaced_config_map(
name=config_map.metadata.name, namespace=user.namespace, body=config_map
)

# TODO: set Job as the owner of the code.
return resp.to_dict()
return _create_job(data, V1Job, user)


# pods #
@router.get("/pods/{pod_name}")
async def read_pod(pod_name: str, user: User = Depends(get_current_user)):
core_api, _ = get_k8s_api()
Expand Down Expand Up @@ -358,3 +292,153 @@ def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str):
raise
else:
return True


def _create_job(
    data: dict,
    model: Union[V1CronJob, V1Job],
    user: User,
):
    """
    Create a Kubernetes batch Job or CronJob on behalf of `user`.

    This is handled in three steps:
    1. Submit ConfigMap (when code files were attached)
    2. Submit Job/CronJob
    3. Patch ConfigMap to add Job/CronJob as the owner

    Parameters
    ----------
    data : request payload. ``data["job"]`` holds the Job/CronJob manifest;
        ``data["code"]`` (optional) holds a ConfigMap manifest of user code.
    model : the kubernetes model *class* to parse into: ``V1Job`` or ``V1CronJob``.
    user : the authenticated `User`; supplies the namespace, username and API token.

    Raises
    ------
    ValueError : if `model` is neither ``V1Job`` nor ``V1CronJob``.
    HTTPException : if the Kubernetes API server rejects the submission.
    """
    # Fail fast on an unsupported model class. Previously this was only
    # checked at submission time, where an unmatched model left `resp`
    # unbound and raised a confusing NameError.
    if not issubclass(model, (V1Job, V1CronJob)):
        raise ValueError(f"Unsupported model: {model!r}")

    api, batch_api = get_k8s_api()

    job_data = data["job"]

    # NOTE(review): job_template is parsed as a plain V1Job template; confirm
    # that merging it into a CronJob manifest (whose pod spec lives under
    # spec.job_template) behaves as intended for cronjobs.
    if job_template:
        job_data = utils.merge_json_objects(job_data, job_template)

    # `job` is the object actually submitted; it can be a V1Job or V1CronJob.
    job = utils.parse(job_data, model=model)
    job_to_patch = job

    if issubclass(model, V1CronJob):
        # For a CronJob the actual Job spec is nested under spec.job_template.
        # NOTE(review): utils.parse builds a *new* V1Job from this sub-dict;
        # if it is not a live view into `job`, the patches applied below may
        # never reach the submitted CronJob -- verify against utils.parse.
        j = job_data.get("spec", {}).get("job_template", {})
        job_to_patch = utils.parse(j, V1Job)

    code_data = data.get("code", None)
    if code_data:
        # The contents were base64encoded prior to being JSON serialized
        # we have to decode it *after* submitting things to the API server...
        # This is not great.
        # code_data["binary_data"]["code"] = base64.b64decode(code_data["binary_data"]["code"])
        config_map: Optional[V1ConfigMap] = utils.parse(code_data, model=V1ConfigMap)
    else:
        config_map = None

    # Apply kbatch's standard metadata, TTL, env vars, and the user's API token.
    patch.patch(
        job_to_patch,
        config_map,
        annotations={},
        labels={},
        username=user.name,
        ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished,
        extra_env=settings.kbatch_job_extra_env,
        api_token=user.api_token,
    )

    # What needs to happen when? We have a few requirements
    # 1. The code ConfigMap must exist before adding it as a volume (we need a name,
    #    and k8s requires that)
    # 2. MAYBE: The Job must exist before adding it as an owner for the ConfigMap
    #
    # So I think we're at 3 requests:
    #
    # 1. Submit configmap
    #    - ..
    # 2. Submit Job
    # 3. Patch ConfigMap to add Job as the owner
    if settings.kbatch_create_user_namespace:
        logger.info("Ensuring namespace %s", user.namespace)
        created = ensure_namespace(api, user.namespace)
        if created:
            logger.info("Created namespace %s", user.namespace)

    if config_map:
        logger.info("Submitting ConfigMap")
        config_map = api.create_namespaced_config_map(
            namespace=user.namespace, body=config_map
        )
        # NOTE(review): for cronjobs this receives the V1CronJob; confirm
        # add_submitted_configmap_name handles both model kinds.
        patch.add_submitted_configmap_name(job, config_map)

    logger.info("Submitting job")
    try:
        if issubclass(model, V1Job):
            resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job)
        else:
            resp = batch_api.create_namespaced_cron_job(
                namespace=user.namespace, body=job
            )
    except kubernetes.client.exceptions.ApiException as e:
        # Forward the API server's error body (and content type) to the client.
        content_type = e.headers.get("Content-Type")
        if content_type:
            headers = {"Content-Type": content_type}
        else:
            headers = {}
        raise HTTPException(status_code=e.status, detail=e.body, headers=headers)

    if config_map:
        logger.info(
            "patching configmap %s with owner %s",
            config_map.metadata.name,
            resp.metadata.name,
        )
        patch.patch_configmap_owner(resp, config_map)
        api.patch_namespaced_config_map(
            name=config_map.metadata.name, namespace=user.namespace, body=config_map
        )

    # TODO: set Job as the owner of the code.
    return resp.to_dict()


def _perform_action(
job_name: Union[str, None],
namespace: str,
action: str,
model: Union[V1Job, V1CronJob],
) -> str:
"""
Perform an action on `job_name`.

Parameters
----------
job_name : name of the Kubernetes Job or CronJob.
namespace : Kubernetes namespace to check.
action : action to perform on `job_name`.
Must match one item in `job_actions` list.
model : kubernetes batch models, "V1Job" "V1CronJob"
"""
job_actions = ["list", "read", "delete"]
if action not in job_actions:
raise ValueError(
f"Unknown `action` specified: {action}. "
+ "Please select from one of the following: {job_actions}."
)

if issubclass(model, V1Job):
model = "job"
elif issubclass(model, V1CronJob):
model = "cron_job"

_, batch_api = get_k8s_api()
f = getattr(batch_api, f"{action}_namespaced_{model}")

if action == "list":
return f(namespace).to_dict()
else:
return f(job_name, namespace).to_dict()
Loading