diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e7b4433..4e87e5a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/psf/black - rev: 21.5b1 + rev: 22.3.0 hooks: - id: black language_version: python3 diff --git a/docs/source/user-guide.md b/docs/source/user-guide.md index ff5a45a..a8ecf1a 100644 --- a/docs/source/user-guide.md +++ b/docs/source/user-guide.md @@ -51,6 +51,44 @@ $ kbatch job show list-files-jfprp With `kbatch job logs` you can get the logs for a job. Make sure to pass the container id. +## Submit a cronjob + +If you'd like your job to run on a repeating schedule, you can leverage CronJobs. The command line interface for `kbatch cronjob` is the same as `kbatch job` with the added requirement that you specify a schedule when you `submit` a cronjob: + +```{code-block} console +$ kbatch cronjob submit \ + --name=list-files \ + --image=alpine \ + --command='["ls", "-lh"]' \ + --schedule='0 22 * * 1-5' +``` + +This job will now run at 22:00 on every day-of-week from Monday through Friday **indefinitely**. + +You can list scheduled cronjobs: + +``` +$ kbatch cronjob list -o table +┏━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓ +┃ cronjob name ┃ started ┃ schedule ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩ +│ mycj-cron-kb7d6 │ 2022-05-31T05:27:25+00:00 │ */5 * * * * │ +│ list-files-cron-56whl │ 2022-06-01T23:35:20+00:00 │ 0 22 * * 1-5 │ +└───────────────────────┴───────────────────────────┴──────────────┘ +``` + + +The only way to remove a cronjob is to explicitly delete it: + +``` +kbatch cronjob delete mycj-cron-kb7d6 +``` + +### Schedule syntax + +For those familiar with Linux cron jobs, the schedule syntax is the same. For those unfamiliar, have a read through the [Kubernetes CronJob - cron schedule syntax](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax). 
The website [crontab.guru](https://crontab.guru/#0_22_*_*_1-5) is a nifty tool that tries to translate the schedule syntax into "plain" English. + + ## Submitting code files Your job likely relies on some local code files to function. Perhaps this is a notebook, shell script, or utility library that wouldn't be present in your container image. You can use the `-c` or `--code` option to provide a single file or list of files that will be made available before your job starts running. diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index 69276cd..949bb45 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -1,13 +1,13 @@ import os import logging -from typing import List, Optional, Tuple, Dict +from typing import List, Optional, Tuple, Dict, Union import yaml from pydantic import BaseModel, BaseSettings import jupyterhub.services.auth from fastapi import Depends, FastAPI, HTTPException, Request, status, APIRouter import kubernetes.client -import kubernetes.client.models +from kubernetes.client.models import V1CronJob, V1Job, V1ConfigMap import kubernetes.config import kubernetes.watch import rich.traceback @@ -88,9 +88,7 @@ class UserOut(BaseModel): job_template = yaml.safe_load(f) # parse with Kubernetes to normalize keys with job_data - job_template = utils.parse( - job_template, model=kubernetes.client.models.V1Job - ).to_dict() + job_template = utils.parse(job_template, model=V1Job).to_dict() utils.remove_nulls(job_template) else: @@ -155,115 +153,51 @@ def get_k8s_api() -> Tuple[kubernetes.client.CoreV1Api, kubernetes.client.BatchV # ---------------------------------------------------------------------------- # app +# cronjobs # +@router.get("/cronjobs/{job_name}") +async def read_cronjob(job_name: str, user: User = Depends(get_current_user)): + return _perform_action(job_name, user.namespace, "read", V1CronJob) + +@router.get("/cronjobs/") +async def read_cronjobs(user: User = 
Depends(get_current_user)): + return _perform_action(None, user.namespace, "list", V1CronJob) + + +@router.delete("/cronjobs/{job_name}") +async def delete_cronjob(job_name: str, user: User = Depends(get_current_user)): + return _perform_action(job_name, user.namespace, "delete", V1CronJob) + + +@router.post("/cronjobs/") +async def create_cronjob(request: Request, user: User = Depends(get_current_user)): + data = await request.json() + return _create_job(data, V1CronJob, user) + + +# jobs # @router.get("/jobs/{job_name}") async def read_job(job_name: str, user: User = Depends(get_current_user)): - _, batch_api = get_k8s_api() - result = batch_api.read_namespaced_job(job_name, user.namespace) - return result.to_dict() + return _perform_action(job_name, user.namespace, "read", V1Job) @router.get("/jobs/") async def read_jobs(user: User = Depends(get_current_user)): - api, batch_api = get_k8s_api() - result = batch_api.list_namespaced_job(user.namespace) - return result.to_dict() + return _perform_action(None, user.namespace, "list", V1Job) @router.delete("/jobs/{job_name}") async def delete_job(job_name: str, user: User = Depends(get_current_user)): - _, batch_api = get_k8s_api() - result = batch_api.delete_namespaced_job(job_name, user.namespace) - return result.to_dict() + return _perform_action(job_name, user.namespace, "delete", V1Job) @router.post("/jobs/") async def create_job(request: Request, user: User = Depends(get_current_user)): - api, batch_api = get_k8s_api() - data = await request.json() - job_data = data["job"] - - if job_template: - job_data = utils.merge_json_objects(job_data, job_template) - - job: kubernetes.client.models.V1Job = utils.parse( - job_data, model=kubernetes.client.models.V1Job - ) - - code_data = data.get("code", None) - if code_data: - # The contents were base64encoded prior to being JSON serialized - # we have to decode it *after* submitting things to the API server... - # This is not great. 
- # code_data["binary_data"]["code"] = base64.b64decode(code_data["binary_data"]["code"]) - config_map: Optional[kubernetes.client.models.V1ConfigMap] = utils.parse( - code_data, model=kubernetes.client.models.V1ConfigMap - ) - else: - config_map = None - - patch.patch( - job, - config_map, - annotations={}, - labels={}, - username=user.name, - ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished, - extra_env=settings.kbatch_job_extra_env, - api_token=user.api_token, - ) - - # What needs to happen when? We have a few requirements - # 1. The code ConfigMap must exist before adding it as a volume (we need a name, - # and k8s requires that) - # 2. MAYBE: The Job must exist before adding it as an owner for the ConfigMap - # - # So I think we're at 3 requests: - # - # 1. Submit configmap - # - .. - # 2. Submit Job - # 3. Patch ConfigMap to add Job as the owner - if settings.kbatch_create_user_namespace: - logger.info("Ensuring namespace %s", user.namespace) - created = ensure_namespace(api, user.namespace) - if created: - logger.info("Created namespace %s", user.namespace) - - if config_map: - logger.info("Submitting ConfigMap") - config_map = api.create_namespaced_config_map( - namespace=user.namespace, body=config_map - ) - patch.add_submitted_configmap_name(job, config_map) - - logger.info("Submitting job") - try: - resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job) - except kubernetes.client.exceptions.ApiException as e: - content_type = e.headers.get("Content-Type") - if content_type: - headers = {"Content-Type": content_type} - else: - headers = {} - raise HTTPException(status_code=e.status, detail=e.body, headers=headers) - - if config_map: - logger.info( - "patching configmap %s with owner %s", - config_map.metadata.name, - resp.metadata.name, - ) - patch.patch_configmap_owner(resp, config_map) - api.patch_namespaced_config_map( - name=config_map.metadata.name, namespace=user.namespace, body=config_map - ) - - # TODO: 
set Job as the owner of the code. - return resp.to_dict() + return _create_job(data, V1Job, user) +# pods # @router.get("/pods/{pod_name}") async def read_pod(pod_name: str, user: User = Depends(get_current_user)): core_api, _ = get_k8s_api() @@ -358,3 +292,153 @@ def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str): raise else: return True + + +def _create_job( + data: dict, + model: Union[V1CronJob, V1Job], + user: User = Depends(get_current_user), +): + """ + Create a Kubernetes batch Job or CronJob. + + This is handled in three steps: + 1. Submit ConfigMap + 2. Submit Job/CronJob + 3. Patch ConfigMap to add Job/CronJob as the owner + + Parameters + ---------- + data : data specific to the Job or CronJob. + model : kubernetes batch models, "V1Job" "V1CronJob". + user : a `User` object which holds specific configuration settings. + """ + api, batch_api = get_k8s_api() + + job_data = data["job"] + + # does it handle cronjob job specs appropriately? + if job_template: + job_data = utils.merge_json_objects(job_data, job_template) + + # can be either job or cronjob + job = utils.parse(job_data, model=model) + job_to_patch = job + + if issubclass(model, V1CronJob): + j = job_data.get("spec", {}).get("job_template", {}) + job_to_patch = utils.parse(j, V1Job) + + code_data = data.get("code", None) + if code_data: + # The contents were base64encoded prior to being JSON serialized + # we have to decode it *after* submitting things to the API server... + # This is not great. + # code_data["binary_data"]["code"] = base64.b64decode(code_data["binary_data"]["code"]) + config_map: Optional[V1ConfigMap] = utils.parse(code_data, model=V1ConfigMap) + else: + config_map = None + + patch.patch( + job_to_patch, + config_map, + annotations={}, + labels={}, + username=user.name, + ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished, + extra_env=settings.kbatch_job_extra_env, + api_token=user.api_token, + ) + + # What needs to happen when? 
We have a few requirements + # 1. The code ConfigMap must exist before adding it as a volume (we need a name, + # and k8s requires that) + # 2. MAYBE: The Job must exist before adding it as an owner for the ConfigMap + # + # So I think we're at 3 requests: + # + # 1. Submit configmap + # - .. + # 2. Submit Job + # 3. Patch ConfigMap to add Job as the owner + if settings.kbatch_create_user_namespace: + logger.info("Ensuring namespace %s", user.namespace) + created = ensure_namespace(api, user.namespace) + if created: + logger.info("Created namespace %s", user.namespace) + + if config_map: + logger.info("Submitting ConfigMap") + config_map = api.create_namespaced_config_map( + namespace=user.namespace, body=config_map + ) + patch.add_submitted_configmap_name(job, config_map) + + logger.info("Submitting job") + try: + if issubclass(model, V1Job): + resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job) + elif issubclass(model, V1CronJob): + resp = batch_api.create_namespaced_cron_job( + namespace=user.namespace, body=job + ) + + except kubernetes.client.exceptions.ApiException as e: + content_type = e.headers.get("Content-Type") + if content_type: + headers = {"Content-Type": content_type} + else: + headers = {} + raise HTTPException(status_code=e.status, detail=e.body, headers=headers) + + if config_map: + logger.info( + "patching configmap %s with owner %s", + config_map.metadata.name, + resp.metadata.name, + ) + patch.patch_configmap_owner(resp, config_map) + api.patch_namespaced_config_map( + name=config_map.metadata.name, namespace=user.namespace, body=config_map + ) + + # TODO: set Job as the owner of the code. + return resp.to_dict() + + +def _perform_action( + job_name: Union[str, None], + namespace: str, + action: str, + model: Union[V1Job, V1CronJob], +) -> str: + """ + Perform an action on `job_name`. + + Parameters + ---------- + job_name : name of the Kubernetes Job or CronJob. + namespace : Kubernetes namespace to check. 
+ action : action to perform on `job_name`. + Must match one item in `job_actions` list. + model : kubernetes batch models, "V1Job" "V1CronJob" + """ + job_actions = ["list", "read", "delete"] + if action not in job_actions: + raise ValueError( + f"Unknown `action` specified: {action}. " + + "Please select from one of the following: {job_actions}." + ) + + if issubclass(model, V1Job): + model = "job" + elif issubclass(model, V1CronJob): + model = "cron_job" + + _, batch_api = get_k8s_api() + f = getattr(batch_api, f"{action}_namespaced_{model}") + + if action == "list": + return f(namespace).to_dict() + else: + return f(job_name, namespace).to_dict() diff --git a/kbatch-proxy/kbatch_proxy/patch.py b/kbatch-proxy/kbatch_proxy/patch.py index a7c890e..988e594 100644 --- a/kbatch-proxy/kbatch_proxy/patch.py +++ b/kbatch-proxy/kbatch_proxy/patch.py @@ -3,11 +3,12 @@ """ import re import string -from typing import Dict, Optional +from typing import Dict, Optional, Union import escapism from kubernetes.client.models import ( V1Job, + V1JobTemplateSpec, V1ConfigMap, V1Container, V1VolumeMount, @@ -22,7 +23,9 @@ SAFE_CHARS = set(string.ascii_lowercase + string.digits) -def add_annotations(job: V1Job, annotations, username: str) -> None: +def add_annotations( + job: Union[V1Job, V1JobTemplateSpec], annotations, username: str +) -> None: annotations = dict(annotations) annotations["kbatch.jupyter.org/username"] = username @@ -30,7 +33,7 @@ def add_annotations(job: V1Job, annotations, username: str) -> None: job.spec.template.metadata.annotations.update(annotations) # update or replace? 
-def add_labels(job: V1Job, labels, username: str) -> None: +def add_labels(job: Union[V1Job, V1JobTemplateSpec], labels, username: str) -> None: labels = dict(labels) labels["kbatch.jupyter.org/username"] = escapism.escape( username, safe=SAFE_CHARS, escape_char="-" @@ -40,7 +43,7 @@ def add_labels(job: V1Job, labels, username: str) -> None: job.spec.template.metadata.labels.update(labels) # update or replace? -def add_namespace(job: V1Job, namespace: str) -> None: +def add_namespace(job: Union[V1Job, V1JobTemplateSpec], namespace: str) -> None: job.metadata.namespace = namespace job.spec.template.metadata.namespace = namespace @@ -49,11 +52,11 @@ def add_namespace_configmap(config_map: V1ConfigMap, namespace: str) -> None: config_map.metadata.namespace = namespace -def add_code_configmap(job: V1Job) -> None: +def add_code_configmap(job: Union[V1Job, V1JobTemplateSpec]) -> None: pass -def add_unzip_init_container(job: V1Job) -> None: +def add_unzip_init_container(job: Union[V1Job, V1JobTemplateSpec]) -> None: """ Adds an init container to unzip the code. 
""" @@ -110,7 +113,9 @@ def add_unzip_init_container(job: V1Job) -> None: def add_extra_env( - job: V1Job, extra_env: Dict[str, str], api_token: Optional[str] = None + job: Union[V1Job, V1JobTemplateSpec], + extra_env: Dict[str, str], + api_token: Optional[str] = None, ) -> None: container = job.spec.template.spec.containers[0] env_vars = [V1EnvVar(name=name, value=value) for name, value in extra_env.items()] @@ -146,13 +151,13 @@ def namespace_for_username(username: str) -> str: def add_job_ttl_seconds_after_finished( - job: V1Job, ttl_seconds_after_finished: Optional[int] + job: Union[V1Job, V1JobTemplateSpec], ttl_seconds_after_finished: Optional[int] ) -> None: job.spec.ttl_seconds_after_finished = ttl_seconds_after_finished def patch( - job: V1Job, + job: Union[V1Job, V1JobTemplateSpec], config_map: Optional[V1ConfigMap], *, username: str, @@ -185,13 +190,17 @@ def patch( add_unzip_init_container(job) -def add_submitted_configmap_name(job: V1Job, config_map: V1ConfigMap): +def add_submitted_configmap_name( + job: Union[V1Job, V1JobTemplateSpec], config_map: V1ConfigMap +): # config_map should be the response from the Kubernetes API server with the # submitted name job.spec.template.spec.volumes[-2].config_map.name = config_map.metadata.name -def patch_configmap_owner(job: V1Job, config_map: V1ConfigMap): +def patch_configmap_owner( + job: Union[V1Job, V1JobTemplateSpec], config_map: V1ConfigMap +): if job.metadata.name is None: raise ValueError("job must have a name before it can be set as an owner") assert job.metadata.name is not None diff --git a/kbatch-proxy/tests/test_proxy.py b/kbatch-proxy/tests/test_proxy.py index f4aad8d..56251d6 100644 --- a/kbatch-proxy/tests/test_proxy.py +++ b/kbatch-proxy/tests/test_proxy.py @@ -12,66 +12,97 @@ HERE = pathlib.Path(__file__).parent -@pytest.fixture +def k8s_job_spec() -> kubernetes.client.V1JobSpec: + return kubernetes.client.V1JobSpec( + template=kubernetes.client.V1PodTemplateSpec( + 
spec=kubernetes.client.V1PodSpec( + containers=[ + kubernetes.client.V1Container( + args=["ls", "-lh"], + command=None, + image="alpine", + name="job", + env=[kubernetes.client.V1EnvVar(name="MYENV", value="MYVALUE")], + resources=kubernetes.client.V1ResourceRequirements(), + ) + ], + restart_policy="Never", + tolerations=None, + ), + metadata=kubernetes.client.V1ObjectMeta( + name="test-name-pod", + labels={"pod": "label"}, + annotations={"pod": "annotations"}, + ), + ), + backoff_limit=4, + ttl_seconds_after_finished=300, + ) + + def k8s_job() -> kubernetes.client.V1Job: - job = kubernetes.client.models.V1Job( + metadata = kubernetes.client.V1ObjectMeta( + name="name", + generate_name="name-", + annotations={"foo": "bar"}, + labels={"baz": "qux"}, + ) + return kubernetes.client.models.V1Job( api_version="batch/v1", kind="Job", - metadata=kubernetes.client.models.V1ObjectMeta( - name="name", - generate_name="name-", - annotations={"foo": "bar"}, - labels={"baz": "qux"}, - ), - spec=kubernetes.client.V1JobSpec( - template=kubernetes.client.V1PodTemplateSpec( - spec=kubernetes.client.V1PodSpec( - containers=[ - kubernetes.client.V1Container( - args=["ls", "-lh"], - command=None, - image="alpine", - name="job", - env=[ - kubernetes.client.V1EnvVar( - name="MYENV", value="MYVALUE" - ) - ], - resources=kubernetes.client.V1ResourceRequirements(), - ) - ], - restart_policy="Never", - tolerations=None, - ), - metadata=kubernetes.client.V1ObjectMeta( - name="test-name-pod", - labels={"pod": "label"}, - annotations={"pod": "annotations"}, - ), + metadata=metadata, + spec=k8s_job_spec(), + ) + + +def k8s_cronjob() -> kubernetes.client.V1CronJob: + metadata = kubernetes.client.V1ObjectMeta( + name="name-cron", + generate_name="name-cron-", + annotations={"foo": "bar"}, + labels={"baz": "qux"}, + ) + return kubernetes.client.V1CronJob( + api_version="batch/v1", + kind="CronJob", + metadata=metadata, + spec=kubernetes.client.V1CronJobSpec( + schedule="*/5 * * * *", + 
job_template=kubernetes.client.V1JobTemplateSpec( + spec=k8s_job_spec(), + metadata=metadata, ), - backoff_limit=4, - ttl_seconds_after_finished=300, ), ) - return job -def test_parse_job(k8s_job: kubernetes.client.V1Job): - result = kbatch_proxy.utils.parse(k8s_job.to_dict(), kubernetes.client.V1Job) - assert result == k8s_job +@pytest.fixture(scope="function", params=[k8s_job, k8s_cronjob]) +def job(request): + j = request.param() + if isinstance(j, kubernetes.client.V1CronJob): + j = j.spec.job_template + + yield j + + +def test_parse_job(job): + if isinstance(job, kubernetes.client.V1Job): + result = kbatch_proxy.utils.parse(job.to_dict(), kubernetes.client.V1Job) + else: + result = kbatch_proxy.utils.parse( + job.to_dict(), kubernetes.client.V1JobTemplateSpec + ) container = result.spec.template.spec.containers[0] assert isinstance(container, kubernetes.client.V1Container) assert container.args == ["ls", "-lh"] -def test_patch_job(k8s_job: kubernetes.client.V1Job): - kbatch_proxy.patch.patch( - k8s_job, None, annotations={}, labels={}, username="myuser" - ) +def test_patch_job(job): + kbatch_proxy.patch.patch(job, None, annotations={}, labels={}, username="myuser") - assert k8s_job.metadata.namespace == "myuser" - assert k8s_job.spec.template.metadata.namespace == "myuser" + assert job.metadata.namespace == "myuser" + assert job.spec.template.metadata.namespace == "myuser" @pytest.mark.parametrize( @@ -101,36 +132,32 @@ def test_namespace_configmap(): @pytest.mark.parametrize("has_init_containers", [True, False]) @pytest.mark.parametrize("has_volumes", [True, False]) -def test_add_unzip_init_container( - k8s_job: kubernetes.client.V1Job, has_init_containers: bool, has_volumes: bool -): +def test_add_unzip_init_container(job, has_init_containers: bool, has_volumes: bool): if has_init_containers: - k8s_job.spec.template.spec.init_containers = [ + job.spec.template.spec.init_containers = [ kubernetes.client.V1Container(name="present-container") ] if has_volumes: 
- k8s_job.spec.template.spec.volumes = [ + job.spec.template.spec.volumes = [ kubernetes.client.V1Volume(name="present-volume", empty_dir={}) ] - k8s_job.spec.template.spec.containers[0].volume_mounts = [ + job.spec.template.spec.containers[0].volume_mounts = [ kubernetes.client.V1VolumeMount( name="present-volume", mount_path="/present-volume" ) ] - kbatch_proxy.patch.add_unzip_init_container(k8s_job) + kbatch_proxy.patch.add_unzip_init_container(job) n_init_containers = int(has_init_containers) + 1 - assert len(k8s_job.spec.template.spec.init_containers) == n_init_containers + assert len(job.spec.template.spec.init_containers) == n_init_containers n_volumes = int(has_volumes) + 2 - assert len(k8s_job.spec.template.spec.volumes) == n_volumes + assert len(job.spec.template.spec.volumes) == n_volumes n_volume_mounts = int(has_volumes) + 1 - assert ( - len(k8s_job.spec.template.spec.containers[0].volume_mounts) == n_volume_mounts - ) + assert len(job.spec.template.spec.containers[0].volume_mounts) == n_volume_mounts # now patch with the actual name config_map = kubernetes.client.V1ConfigMap( @@ -138,19 +165,23 @@ def test_add_unzip_init_container( name="actual-name", namespace="my-namespace" ) ) - kbatch_proxy.patch.add_submitted_configmap_name(k8s_job, config_map) - assert k8s_job.spec.template.spec.volumes[-2].config_map.name == "actual-name" + kbatch_proxy.patch.add_submitted_configmap_name(job, config_map) + assert job.spec.template.spec.volumes[-2].config_map.name == "actual-name" @pytest.mark.parametrize( "job_env", [None, [], [kubernetes.client.V1EnvVar(name="SAS_TOKEN", value="TOKEN")]] ) -def test_extra_env(job_env, k8s_job: kubernetes.client.V1Job): +def test_extra_env(job, job_env): has_env = bool(job_env) - k8s_job.spec.template.spec.containers[0].env = job_env + + # make copy to avoid mutation + job.spec.template.spec.containers[0].env = ( + job_env.copy() if job_env is not None else None + ) extra_env = {"MY_ENV": "VALUE"} - 
kbatch_proxy.patch.add_extra_env(k8s_job, extra_env, api_token="super-secret") + kbatch_proxy.patch.add_extra_env(job, extra_env, api_token="super-secret") if has_env: expected = [ @@ -172,23 +203,21 @@ def test_extra_env(job_env, k8s_job: kubernetes.client.V1Job): ), ] - assert k8s_job.spec.template.spec.containers[0].env == expected + assert job.spec.template.spec.containers[0].env == expected -def test_set_job_ttl_seconds_after_finished(k8s_job: kubernetes.client.V1Job): - kbatch_proxy.patch.patch( - k8s_job, None, username="foo", ttl_seconds_after_finished=10 - ) - assert k8s_job.spec.ttl_seconds_after_finished == 10 +def test_set_job_ttl_seconds_after_finished(job): + kbatch_proxy.patch.patch(job, None, username="foo", ttl_seconds_after_finished=10) + assert job.spec.ttl_seconds_after_finished == 10 -def test_add_node_affinity(k8s_job: kubernetes.client.V1Job): +def test_add_node_affinity(job): job_template = yaml.safe_load((HERE / "job_template.yaml").read_text()) job_template = kbatch_proxy.utils.parse( job_template, kubernetes.client.V1Job ).to_dict() - job_data = k8s_job.to_dict() + job_data = job.to_dict() result = kbatch_proxy.utils.merge_json_objects(job_data, job_template) result = kbatch_proxy.utils.parse(result, kubernetes.client.V1Job) diff --git a/kbatch/kbatch/__init__.py b/kbatch/kbatch/__init__.py index d25c522..0129d4b 100644 --- a/kbatch/kbatch/__init__.py +++ b/kbatch/kbatch/__init__.py @@ -10,8 +10,8 @@ list_pods, logs_streaming, ) -from ._types import Job -from ._backend import make_job +from ._types import Job, CronJob +from ._backend import make_job, make_cronjob __version__ = "0.3.2" @@ -22,6 +22,7 @@ "list_jobs", "submit_job", "make_job", + "make_cronjob", "configure", "show_job", "logs", diff --git a/kbatch/kbatch/_backend.py b/kbatch/kbatch/_backend.py index c63837e..049291e 100644 --- a/kbatch/kbatch/_backend.py +++ b/kbatch/kbatch/_backend.py @@ -13,6 +13,9 @@ from kubernetes.client.models import ( V1Job, V1JobSpec, + 
V1JobTemplateSpec, + V1CronJob, + V1CronJobSpec, V1PodSpec, V1PodTemplateSpec, V1ObjectMeta, @@ -28,7 +31,7 @@ V1NodeSelectorRequirement, ) -from ._types import Job +from ._types import CronJob, Job SAFE_CHARS = set(string.ascii_lowercase + string.digits) @@ -44,13 +47,12 @@ # ) -def make_job( - job: Job, +def _make_job_spec( + job: Union[Job, CronJob], profile: Optional[dict] = None, -) -> V1Job: - """ - Make a Kubernetes pod specification for a user-submitted job. - """ + labels: Optional[dict] = None, + annotations: Optional[dict] = None, +): profile = profile or {} name = job.name # TODO: deduplicate somehow... image = job.image or profile.get("image", None) @@ -61,19 +63,8 @@ def make_job( command = job.command args = job.args - - # annotations = k8s_config.annotations - # labels = k8s_config.labels env = job.env - # annotations = annotations or {} - annotations: Dict[str, str] = {} - # TODO: set in proxy - - # labels = labels or {} - # labels = dict(labels) - labels: Dict[str, str] = {} - # file_volume_mount = V1VolumeMount(mount_path="/code", name="file-volume") # file_volume = V1Volume(name="file-volume", empty_dir={}) @@ -168,9 +159,40 @@ def make_job( metadata=pod_metadata, ) + return V1JobSpec(template=template, backoff_limit=0, ttl_seconds_after_finished=300) + + +def _make_job_name(name: str, schedule: str = None): generate_name = name if not name.endswith("-"): generate_name = name + "-" + if schedule: + generate_name += "cron-" + return generate_name + + +def make_cronjob( + cronjob: CronJob, + profile: Optional[dict] = None, +) -> V1CronJob: + """ + Make a Kubernetes pod specification for a user-submitted cronjob. 
+ """ + name = cronjob.name + schedule = cronjob.schedule + + generate_name = _make_job_name(name, schedule=schedule) + + # annotations = k8s_config.annotations + # labels = k8s_config.labels + annotations: Dict[str, str] = {} + # TODO: set in proxy + + # labels = labels or {} + # labels = dict(labels) + labels: Dict[str, str] = {} + + job_spec = _make_job_spec(cronjob, profile, labels, annotations) job_metadata = V1ObjectMeta( generate_name=generate_name, @@ -178,15 +200,57 @@ def make_job( labels=labels, ) - job = V1Job( + job_template = V1JobTemplateSpec( + metadata=job_metadata, + spec=job_spec, + ) + + return V1CronJob( api_version="batch/v1", - kind="Job", + kind="CronJob", metadata=job_metadata, - spec=V1JobSpec( - template=template, backoff_limit=0, ttl_seconds_after_finished=300 + spec=V1CronJobSpec( + schedule=cronjob.schedule, + job_template=job_template, + starting_deadline_seconds=300, ), ) - return job + + +def make_job( + job: Job, + profile: Optional[dict] = None, +) -> V1Job: + """ + Make a Kubernetes pod specification for a user-submitted job. 
+ """ + name = job.name + + generate_name = _make_job_name(name) + + # annotations = k8s_config.annotations + # labels = k8s_config.labels + annotations: Dict[str, str] = {} + # TODO: set in proxy + + # labels = labels or {} + # labels = dict(labels) + labels: Dict[str, str] = {} + + job_spec = _make_job_spec(job, profile, labels, annotations) + + job_metadata = V1ObjectMeta( + generate_name=generate_name, + annotations=annotations, + labels=labels, + ) + + return V1Job( + api_version="batch/v1", + kind="Job", + metadata=job_metadata, + spec=job_spec, + ) def make_configmap(code: Union[str, pathlib.Path], generate_name) -> V1ConfigMap: diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index 168bf0d..4031965 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -9,8 +9,9 @@ import rich.table import httpx import urllib.parse +import yaml +from kubernetes.client.models import V1CronJob, V1Job -from ._types import Job from ._backend import make_configmap @@ -76,7 +77,14 @@ def configure(kbatch_url=None, token=None) -> Path: return configpath -def _do_job(job_name, kbatch_url, token, method: str): +def _request_action( + kbatch_url: str, + token: Optional[str], + method: str, + model: Union[V1Job, V1CronJob], + resource_name: str = None, + json_data: Optional[dict] = None, +): client = httpx.Client(follow_redirects=True) config = load_config() @@ -87,37 +95,79 @@ def _do_job(job_name, kbatch_url, token, method: str): "Authorization": f"token {token}", } + http_methods = ["GET", "DELETE", "POST"] + if method not in http_methods: + raise ValueError( + f"Unknown method specified: {method}. " + + "Please select from one of the following: {http_methods}." 
+ ) + + endpoint = "jobs/" if issubclass(model, V1Job) else "cronjobs/" + + if resource_name: + endpoint += resource_name + r = client.request( - method, urllib.parse.urljoin(kbatch_url, f"jobs/{job_name}"), headers=headers + method, + urllib.parse.urljoin(kbatch_url, endpoint), + headers=headers, + json=json_data, ) - r.raise_for_status() + try: + r.raise_for_status() + except Exception: + logger.exception(r.json()) + raise return r.json() -def show_job(job_name, kbatch_url, token): - return _do_job(job_name, kbatch_url, token, method="GET") +def show_job(resource_name, kbatch_url, token, model: Union[V1Job, V1CronJob] = V1Job): + return _request_action(kbatch_url, token, "GET", model, resource_name) -def delete_job(job_name, kbatch_url, token): - return _do_job(job_name, kbatch_url, token, method="DELETE") +def delete_job( + resource_name, kbatch_url, token, model: Union[V1Job, V1CronJob] = V1Job +): + return _request_action(kbatch_url, token, "DELETE", model, resource_name) -def list_jobs(kbatch_url, token): - client = httpx.Client(follow_redirects=True) - config = load_config() +def list_jobs(kbatch_url, token, model: Union[V1Job, V1CronJob] = V1Job): + return _request_action(kbatch_url, token, "GET", model) - token = token or os.environ.get("JUPYTERHUB_API_TOKEN") or config["token"] - kbatch_url = handle_url(kbatch_url, config) - headers = { - "Authorization": f"token {token}", - } +def submit_job( + job, + kbatch_url, + token=None, + model: Union[V1Job, V1CronJob] = V1Job, + code=None, + profile=None, +): + from ._backend import make_job, make_cronjob - r = client.get(urllib.parse.urljoin(kbatch_url, "jobs/"), headers=headers) - r.raise_for_status() + profile = profile or {} - return r.json() + if issubclass(model, V1Job): + data = make_job(job, profile=profile).to_dict() + elif issubclass(model, V1CronJob): + data = make_cronjob(job, profile=profile).to_dict() + else: + raise ValueError( + f"Unknown resource specified: {model}. 
" + + "Please select from one of the following: `V1Job` or `V1CronJob`." + ) + + data = {"job": data} + + if code: + cm = make_configmap(code, generate_name=job.name).to_dict() + cm["binary_data"]["code"] = base64.b64encode(cm["binary_data"]["code"]).decode( + "ascii" + ) + data["code"] = cm + + return _request_action(kbatch_url, token, "POST", model, json_data=data) def list_pods(kbatch_url: str, token: Optional[str], job_name: Optional[str] = None): @@ -185,50 +235,6 @@ def _logs( yield r.text -def submit_job( - job: Job, - *, - code: Optional[Union[str, Path]] = None, - kbatch_url: Optional[str] = None, - token: Optional[str] = None, - profile: Optional[dict] = None, -): - from ._backend import make_job - - config = load_config() - - client = httpx.Client(follow_redirects=True) - token = token or os.environ.get("JUPYTERHUB_API_TOKEN") or config["token"] - kbatch_url = handle_url(kbatch_url, config) - - headers = { - "Authorization": f"token {token}", - } - # data = job.to_kubernetes().to_dict() - profile = profile or {} - data = make_job(job, profile=profile).to_dict() - data = {"job": data} - if code: - cm = make_configmap(code, generate_name=job.name).to_dict() - cm["binary_data"]["code"] = base64.b64encode(cm["binary_data"]["code"]).decode( - "ascii" - ) - data["code"] = cm - - r = client.post( - urllib.parse.urljoin(kbatch_url, "jobs/"), - json=data, - headers=headers, - ) - try: - r.raise_for_status() - except Exception: - logger.exception(r.json()) - raise - - return r.json() - - def status(row): if row["status"]["succeeded"]: return "[green]done[/green]" @@ -283,6 +289,23 @@ def format_jobs(data): return table +def format_cronjobs(data): + table = rich.table.Table(title="CronJobs") + + table.add_column("cronjob name", style="bold", no_wrap=True) + table.add_column("started") + table.add_column("schedule") + + for row in data["items"]: + table.add_row( + row["metadata"]["name"], + row["metadata"]["creation_timestamp"], + row["spec"]["schedule"], + ) + + 
return table + + def format_pods(data): table = rich.table.Table(title="Pods") @@ -319,3 +342,49 @@ def load_profile(profile_name: str, kbatch_url: str) -> dict: profiles = show_profiles(kbatch_url) profile = profiles[profile_name] return profile + + +def _prep_job_data( + file, + code, + name, + description, + image, + command, + args, + profile, + kbatch_url, + env, +): + if command: + command = json.loads(command) + if args: + args = json.loads(args) + + data = {} + + if file: + data = yaml.safe_load(Path(file).read_text()) + + data_profile = data.pop("profile", None) + profile = profile or data_profile or {} + + if name is not None: + data["name"] = name + if description is not None: + data["description"] = description + if image is not None: + data["image"] = image + if args is not None: + data["args"] = args + if command is not None: + data["command"] = command + if env: + env = json.loads(env) + data["env"] = env + + code = code or data.pop("code", None) + if profile: + profile = load_profile(profile, kbatch_url) + + return data diff --git a/kbatch/kbatch/_types.py b/kbatch/kbatch/_types.py index 9f9f3c9..a291e67 100644 --- a/kbatch/kbatch/_types.py +++ b/kbatch/kbatch/_types.py @@ -20,6 +20,26 @@ class Job: code: Optional[str] = None def to_kubernetes(self): - from ._backend import make_job + return _to_kubernetes() - return make_job(self) + +@dataclass() +class CronJob: + name: str + schedule: str + image: Optional[str] = None + command: Optional[List[str]] = None + args: Optional[List[str]] = None + upload: Optional[str] = None + description: Optional[str] = None + env: Dict[str, str] = field(default_factory=dict) + code: Optional[str] = None + + def to_kubernetes(self): + return _to_kubernetes() + + +def _to_kubernetes(self): + from ._backend import make_job + + return make_job(self) diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index c6f838f..9c85b93 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -1,14 +1,12 @@ -import json 
import logging -from pathlib import Path import click import rich import rich.logging -import yaml +from kubernetes.client.models import V1Job, V1CronJob from . import _core -from ._types import Job +from ._types import CronJob, Job FORMAT = "%(message)s" @@ -35,29 +33,167 @@ def configure(kbatch_url, token): rich.print(f"[green]Wrote config to[/green] [bold]{str(p)}[/bold]") +@cli.command() +@click.option("--kbatch-url", help="URL to the kbatch server.") +def profiles(kbatch_url): + """ + Show the profiles used by the server. + + kbatch administrators can serve profiles at the "/profiles" endpoint with + some configuration values for various profiles. + """ + p = _core.show_profiles(kbatch_url) + rich.print_json(data=p) + + +# CRONJOB +@cli.group() +def cronjob(): + """Manage kbatch cronjobs.""" + pass + + +@cronjob.command(name="show") +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="JupyterHub API token.") +@click.argument("cronjob_name") +def show_cronjob(cronjob_name, kbatch_url, token): + """Show the details for a cronjob.""" + result = _core.show_job(cronjob_name, kbatch_url, token, V1CronJob) + rich.print_json(data=result) + + +@cronjob.command(name="delete") +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="JupyterHub API token.") +@click.argument("cronjob_name") +def delete_cronjob(cronjob_name, kbatch_url, token): + """Delete a cronjob, cancelling running jobs and pods.""" + result = _core.delete_job(cronjob_name, kbatch_url, token, V1CronJob) + rich.print_json(data=result) + + +@cronjob.command(name="list") +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="JupyterHub API token.") +@click.option( + "-o", + "--output", + help="output format", + type=click.Choice(["json", "table"]), + default="json", +) +def list_cronjobs(kbatch_url, token, output): + """List all the cronjobs.""" + results = _core.list_jobs(kbatch_url, token, 
V1CronJob) + + if output == "json": + rich.print_json(data=results) + elif output == "table": + rich.print(_core.format_cronjobs(results)) + + +@cronjob.command(name="submit") +@click.option("-n", "--name", help="CronJob name.", required=True) +@click.option("--image", help="Container image to use to execute job.") +@click.option("--command", help="Command to execute.") +@click.option("--args", help="Arguments to pass to the command.") +@click.option( + "--schedule", help="The schedule this cronjob should run on.", required=True +) +@click.option("-e", "--env", help="JSON mapping of environment variables for the job.") +@click.option("-d", "--description", help="A description of the cronjob, optional.") +@click.option( + "-c", + "--code", + help="Local file or directory of source code to make available to the cronjob.", +) +@click.option("-p", "--profile", help="Profile name to use. See 'kbatch profiles'.") +@click.option("-f", "--file", help="Configuration file.") +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="JupyterHub API token.") +@click.option( + "-o", + "--output", + default="json", + help="Output format.", + type=click.Choice(["json", "name"]), +) +def submit_cronjob( + file, + code, + name, + description, + image, + command, + args, + schedule, + profile, + kbatch_url, + token, + env, + output, +): + """ + Submit a CronJob to run on Kubernetes. 
+ """ + + data = _core._prep_job_data( + file, + code, + name, + description, + image, + command, + args, + profile, + kbatch_url, + env, + ) + + if schedule is not None: + data["schedule"] = schedule + + cronjob = CronJob(**data) + + result = _core.submit_job( + job=cronjob, + kbatch_url=kbatch_url, + token=token, + model=V1CronJob, + code=code, + profile=profile, + ) + if output == "json": + rich.print_json(data=result) + elif output == "name": + print(result["metadata"]["name"]) + + +# JOB @cli.group() def job(): """Manage kbatch jobs.""" pass -@job.command() +@job.command(name="show") @click.option("--kbatch-url", help="URL to the kbatch server.") @click.option("--token", help="File to execute.") @click.argument("job_name") -def show(job_name, kbatch_url, token): +def show_job(job_name, kbatch_url, token): """Show the details for a job.""" - result = _core.show_job(job_name, kbatch_url, token) + result = _core.show_job(job_name, kbatch_url, token, V1Job) rich.print_json(data=result) -@job.command() +@job.command(name="delete") @click.option("--kbatch-url", help="URL to the kbatch server.") @click.option("--token", help="File to execute.") @click.argument("job_name") -def delete(job_name, kbatch_url, token): +def delete_job(job_name, kbatch_url, token): """Delete a job, cancelling running pods.""" - result = _core.delete_job(job_name, kbatch_url, token) + result = _core.delete_job(job_name, kbatch_url, token, V1Job) rich.print_json(data=result) @@ -73,16 +209,16 @@ def delete(job_name, kbatch_url, token): ) def list_jobs(kbatch_url, token, output): """List all the jobs.""" - result = _core.list_jobs(kbatch_url, token) + results = _core.list_jobs(kbatch_url, token, V1Job) if output == "json": - rich.print_json(data=result) + rich.print_json(data=results) elif output == "table": - rich.print(_core.format_jobs(result)) + rich.print(_core.format_jobs(results)) -@job.command() -@click.option("-n", "--name", help="Job name.") +@job.command(name="submit") 
+@click.option("-n", "--name", help="Job name.", required=True) @click.option("--image", help="Container image to use to execute job.") @click.option("--command", help="Command to execute.") @click.option("--args", help="Arguments to pass to the command.") @@ -104,7 +240,7 @@ def list_jobs(kbatch_url, token, output): help="Output format.", type=click.Choice(["json", "name"]), ) -def submit( +def submit_job( file, code, name, @@ -121,44 +257,28 @@ def submit( """ Submit a job to run on Kubernetes. """ - if command: - command = json.loads(command) - if args: - args = json.loads(args) - - data = {} - - if file: - data = yaml.safe_load(Path(file).read_text()) - - data_profile = data.pop("profile", None) - profile = profile or data_profile or {} - - if name is not None: - data["name"] = name - if description is not None: - data["description"] = description - if image is not None: - data["image"] = image - if args is not None: - data["args"] = args - if command is not None: - data["command"] = command - if env: - env = json.loads(env) - data["env"] = env - - code = code or data.pop("code", None) - if profile: - profile = _core.load_profile(profile, kbatch_url) + + data = _core._prep_job_data( + file, + code, + name, + description, + image, + command, + args, + profile, + kbatch_url, + env, + ) job = Job(**data) result = _core.submit_job( job, - code=code, kbatch_url=kbatch_url, token=token, + model=V1Job, + code=code, profile=profile, ) if output == "json": @@ -167,19 +287,7 @@ def submit( print(result["metadata"]["name"]) -@cli.command() -@click.option("--kbatch-url", help="URL to the kbatch server.") -def profiles(kbatch_url): - """ - Show the profiles used by the server. - - kbatch administrators can serve profiles at the "/profiles" endpoint with - some configuration values for various profiles. 
- """ - p = _core.show_profiles(kbatch_url) - rich.print_json(data=p) - - +# POD @cli.group() def pod(): """Manage job pods.""" diff --git a/kbatch/tests/test_core.py b/kbatch/tests/test_core.py index 11b3bd5..1880f21 100644 --- a/kbatch/tests/test_core.py +++ b/kbatch/tests/test_core.py @@ -36,6 +36,21 @@ def test_env(env): kubernetes.client.V1EnvVar(name="KEY2", value="VALUE2"), ] + cronjobin = kbatch.CronJob( + name="name", + command=["ls -lh"], + image="alpine", + env=env, + schedule="*/10 * * * *", + ) + + cronjob = kbatch.make_cronjob(cronjobin) + container = cronjob.spec.job_template.spec.template.spec.containers[0] + assert container.env == [ + kubernetes.client.V1EnvVar(name="KEY1", value="VALUE1"), + kubernetes.client.V1EnvVar(name="KEY2", value="VALUE2"), + ] + def test_command_args(): model_job = kbatch.Job( @@ -50,6 +65,19 @@ def test_command_args(): assert job_container.args == ["-c", "python"] assert job_container.command == ["/bin/sh"] + model_cronjob = kbatch.CronJob( + name="name", + command=["/bin/sh"], + args=["-c", "python"], + image="alpine", + schedule="*/10 * * * *", + ) + cronjob = kbatch.make_cronjob(model_cronjob) + + job_container = cronjob.spec.job_template.spec.template.spec.containers[0] + assert job_container.args == ["-c", "python"] + assert job_container.command == ["/bin/sh"] + @pytest.mark.parametrize("as_dir", [True, False]) def test_make_configmap(tmp_path: pathlib.Path, as_dir: bool): @@ -208,3 +236,42 @@ def test_submit_job(respx_mock: respx.MockRouter): job, code=__file__, kbatch_url="http://kbatch.com/", token="abc" ) assert result + + +def test_submit_cronjob(respx_mock: respx.MockRouter): + respx_mock.post("http://kbatch.com/cronjobs/").mock( + return_value=httpx.Response(200, json={"mock": "response"}) + ) + + cronjob = kbatch.CronJob( + name="name", + command=["/bin/sh"], + args=["-c", "python"], + image="alpine", + schedule="*/10 * * * *", + ) + + result = kbatch.submit_job( + cronjob, + 
kbatch_url="http://kbatch.com/", + token="abc", + model=kubernetes.client.V1CronJob, + ) + assert result + + cronjob = kbatch.CronJob( + name="name", + command=["/bin/sh"], + args=["-c", "python"], + image="alpine", + schedule="*/10 * * * *", + ) + + result = kbatch.submit_job( + cronjob, + code=__file__, + kbatch_url="http://kbatch.com/", + token="abc", + model=kubernetes.client.V1CronJob, + ) + assert result