From e4625bc5f4fe855c9a78648eb273a138fdeda0ff Mon Sep 17 00:00:00 2001 From: iameskild Date: Mon, 2 May 2022 16:50:06 -0700 Subject: [PATCH 1/9] Initial cronjob implementation --- kbatch/kbatch/_backend.py | 22 ++++++++++++++++++---- kbatch/kbatch/_core.py | 6 +++++- kbatch/kbatch/_types.py | 1 + kbatch/kbatch/cli.py | 4 ++++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/kbatch/kbatch/_backend.py b/kbatch/kbatch/_backend.py index c63837e..b811405 100644 --- a/kbatch/kbatch/_backend.py +++ b/kbatch/kbatch/_backend.py @@ -13,6 +13,8 @@ from kubernetes.client.models import ( V1Job, V1JobSpec, + V1CronJob, + V1CronJobSpec, V1PodSpec, V1PodTemplateSpec, V1ObjectMeta, @@ -47,7 +49,7 @@ def make_job( job: Job, profile: Optional[dict] = None, -) -> V1Job: +) -> Union[V1Job, V1CronJob]: """ Make a Kubernetes pod specification for a user-submitted job. """ @@ -61,6 +63,7 @@ def make_job( command = job.command args = job.args + schedule = job.schedule # annotations = k8s_config.annotations # labels = k8s_config.labels @@ -171,6 +174,8 @@ def make_job( generate_name = name if not name.endswith("-"): generate_name = name + "-" + if schedule: + generate_name += "cron-" job_metadata = V1ObjectMeta( generate_name=generate_name, @@ -178,7 +183,18 @@ def make_job( labels=labels, ) - job = V1Job( + if schedule: + return V1CronJob( + api_version="batch/v1", + kind="CronJob", + metadata=job_metadata, + spec=V1CronJobSpec( + template=template, + starting_deadline_seconds=300, + ), + ) + + return V1Job( api_version="batch/v1", kind="Job", metadata=job_metadata, @@ -186,8 +202,6 @@ def make_job( template=template, backoff_limit=0, ttl_seconds_after_finished=300 ), ) - return job - def make_configmap(code: Union[str, pathlib.Path], generate_name) -> V1ConfigMap: code = pathlib.Path(code) diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index 168bf0d..deb3cad 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -207,7 +207,11 @@ def submit_job( # data = job.to_kubernetes().to_dict() profile = profile or {} data = make_job(job, profile=profile).to_dict() - data = {"job": data} + + if job.schedule: + data = {"cronjob": data} + else: + data = {"job": data} if code: cm = make_configmap(code, generate_name=job.name).to_dict() cm["binary_data"]["code"] = base64.b64encode(cm["binary_data"]["code"]).decode( diff --git a/kbatch/kbatch/_types.py b/kbatch/kbatch/_types.py index 9f9f3c9..925ee96 100644 --- a/kbatch/kbatch/_types.py +++ b/kbatch/kbatch/_types.py @@ -18,6 +18,7 @@ class Job: description: Optional[str] = None env: Dict[str, str] = field(default_factory=dict) code: Optional[str] = None + schedule: Optional[str] = None def to_kubernetes(self): from ._backend import make_job diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index c6f838f..fc88800 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -86,6 +86,7 @@ def list_jobs(kbatch_url, token, output): @click.option("--image", help="Container image to use to execute job.") @click.option("--command", help="Command to execute.") @click.option("--args", help="Arguments to pass to the command.") +@click.option("--schedule", help="The schedule this CronJob should run on.") @click.option("-e", "--env", help="JSON mapping of environment variables for the job.") @click.option("-d", "--description", help="A description of the job, optional.") @click.option( @@ -112,6 +113,7 @@ def submit( image, command, args, + schedule, profile, kbatch_url, token, @@ -144,6 +146,8 @@ def submit( data["args"] = args if command is not None: data["command"] = command + if schedule is not None: + data["schedule"] = schedule if env: env = json.loads(env) data["env"] = env From 71bb9ccfde7deb15c28296a9496b627334bc2a88 Mon Sep 17 00:00:00 2001 From: iameskild Date: Mon, 2 May 2022 16:52:40 -0700 Subject: [PATCH 2/9] handle cronjob on kbatch-proxy side --- kbatch-proxy/kbatch_proxy/main.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index 69276cd..29fd583 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -182,13 +182,14 @@ async def create_job(request: Request, user: User = Depends(get_current_user)): api, batch_api = get_k8s_api() data = await request.json() - job_data = data["job"] + model = kubernetes.client.models.V1Job if data.get("job") else kubernetes.client.models.V1CronJob + job_data = data.get("job") or data.get("cronjob") if job_template: job_data = utils.merge_json_objects(job_data, job_template) - job: kubernetes.client.models.V1Job = utils.parse( - job_data, model=kubernetes.client.models.V1Job + job = utils.parse( + job_data, model=model ) code_data = data.get("code", None) From 3266076304e28c91223ca4c1cc25f2d3c3613e2a Mon Sep 17 00:00:00 2001 From: iameskild Date: Mon, 9 May 2022 14:40:53 -0700 Subject: [PATCH 3/9] Clean up --- docs/source/user-guide.md | 17 +++++++++ kbatch-proxy/kbatch_proxy/main.py | 60 ++++++++++++++++++++++++++----- kbatch/kbatch/_backend.py | 17 ++++++--- kbatch/kbatch/_core.py | 17 +++++++++ kbatch/kbatch/_types.py | 2 +- kbatch/kbatch/cli.py | 8 +++-- 6 files changed, 104 insertions(+), 17 deletions(-) diff --git a/docs/source/user-guide.md b/docs/source/user-guide.md index ff5a45a..68d2c41 100644 --- a/docs/source/user-guide.md +++ b/docs/source/user-guide.md @@ -51,6 +51,23 @@ $ kbatch job show list-files-jfprp With `kbatch job logs` you can get the logs for a job. Make sure to pass the container id. +### Submitting a cron job + +If you'd like your job to run on a repeating schedule, just specify the schedule you would like: + +```{code-block} console +$ kbatch job submit \ + --name=list-files \ + --image=alpine \ + --command='["ls", "-lh"]' + --schedule='0 22 * * 1-5' +``` + +This job will now run at 22:00 on every day-of-week from Monday through Friday **indefinitely**. + +For those familiar with Linux cron jobs, the schedule syntax is the same. For those unfamiliar, have a read through the [Kubernetes CronJob - cron schedule syntax](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax). The website [crontab.guru](https://crontab.guru/#0_22_*_*_1-5) is a nifty tool that tries to translate the schedule syntax into "plain" English. + + ## Submitting code files Your job likely relies on some local code files to function. Perhaps this is a notebook, shell script, or utility library that wouldn't be present in your container image. You can use the `-c` or `--code` option to provide a single file or list of files that will be made available before your job starts running. diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index 29fd583..a57da02 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -158,23 +158,17 @@ def get_k8s_api() -> Tuple[kubernetes.client.CoreV1Api, kubernetes.client.BatchV @router.get("/jobs/{job_name}") async def read_job(job_name: str, user: User = Depends(get_current_user)): - _, batch_api = get_k8s_api() - result = batch_api.read_namespaced_job(job_name, user.namespace) - return result.to_dict() + return action_on_job(job_name, user.namespace, "read") @router.get("/jobs/") async def read_jobs(user: User = Depends(get_current_user)): - api, batch_api = get_k8s_api() - result = batch_api.list_namespaced_job(user.namespace) - return result.to_dict() + return list_jobs(user.namespace) @router.delete("/jobs/{job_name}") async def delete_job(job_name: str, user: User = Depends(get_current_user)): - _, batch_api = get_k8s_api() - result = batch_api.delete_namespaced_job(job_name, user.namespace) - return result.to_dict() + return action_on_job(job_name, user.namespace, "delete") @router.post("/jobs/") @@ -359,3 +353,51 @@ def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str): raise else: return True + + +def list_jobs(namespace: str) -> Tuple[Dict, Dict]: + """ + List Jobs and CronJobs currently running or scheduled in `namespace`. + + Parameters + ---------- + namespace : Kubernetes namespace to check for Jobs and CronJobs. + """ + _, batch_api = get_k8s_api() + jobs = batch_api.list_namespaced_job(namespace).to_dict() + cronjobs = batch_api.list_namespaced_cron_jobs(namespace).to_dict() + + return jobs, cronjobs + + + +def action_on_job(job_name: str, namespace: str, action: str) -> str: + """ + Perform an action on `job_name`. + + Parameters + ---------- + job_name : name of the Kubernetes Job or CronJob. + namespace : Kubernetes namespace to check. + action : action to perform on `job_name`. Must match one item in `job_actions` list. + """ + job_actions = ['read', 'delete'] + if action not in job_actions: + raise ValueError(f"Unknown `action` specified: {action}. Please select from one of the following: {job_actions}.") + + def _job_type(job_name, namespace): + jobs, cronjobs = list_jobs(namespace) + for job in jobs['items']: + if job['metadata']['name'] == job_name: + return 'job' + for job in cronjobs['items']: + if job['metadata']['name'] == job_name: + return 'cron_job' + + raise ValueError(f"The job name specified, {job_name} cannot be found.") + + _, batch_api = get_k8s_api() + job_type = _job_type(job_name, namespace) + f = getattr(batch_api, f"{action}_namespaced_{job_type}") + + return f(job_name, namespace).to_dict() diff --git a/kbatch/kbatch/_backend.py b/kbatch/kbatch/_backend.py index b811405..1e87838 100644 --- a/kbatch/kbatch/_backend.py +++ b/kbatch/kbatch/_backend.py @@ -13,6 +13,7 @@ from kubernetes.client.models import ( V1Job, V1JobSpec, + V1JobTemplateSpec, V1CronJob, V1CronJobSpec, V1PodSpec, @@ -183,13 +184,23 @@ def make_job( labels=labels, ) + job_spec = V1JobSpec( + template=template, backoff_limit=0, ttl_seconds_after_finished=300 + ) + if schedule: + job_template = V1JobTemplateSpec( + metadata=job_metadata, + spec=job_spec, + ) + return V1CronJob( api_version="batch/v1", kind="CronJob", metadata=job_metadata, spec=V1CronJobSpec( - template=template, + schedule=schedule, + job_template=job_template, starting_deadline_seconds=300, ), ) @@ -198,9 +209,7 @@ def make_job( api_version="batch/v1", kind="Job", metadata=job_metadata, - spec=V1JobSpec( - template=template, backoff_limit=0, ttl_seconds_after_finished=300 - ), + spec=job_spec, ) def make_configmap(code: Union[str, pathlib.Path], generate_name) -> V1ConfigMap: diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index deb3cad..aecb9c0 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -287,6 +287,23 @@ def format_jobs(data): return table +def format_cronjobs(data): + table = rich.table.Table(title="CronJobs") + + table.add_column("cronjob name", style="bold", no_wrap=True) + table.add_column("started") + table.add_column("schedule") + + for row in data["items"]: + table.add_row( + row["metadata"]["name"], + row["metadata"]["creation_timestamp"], + row["spec"]["schedule"] + ) + + return table + + def format_pods(data): table = rich.table.Table(title="Pods") diff --git a/kbatch/kbatch/_types.py b/kbatch/kbatch/_types.py index 925ee96..c0ca534 100644 --- a/kbatch/kbatch/_types.py +++ b/kbatch/kbatch/_types.py @@ -14,11 +14,11 @@ class Job: image: Optional[str] = None command: Optional[List[str]] = None args: Optional[List[str]] = None + schedule: Optional[str] = None upload: Optional[str] = None description: Optional[str] = None env: Dict[str, str] = field(default_factory=dict) code: Optional[str] = None - schedule: Optional[str] = None def to_kubernetes(self): from ._backend import make_job diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index fc88800..1bcee32 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -73,12 +73,14 @@ def delete(job_name, kbatch_url, token): ) def list_jobs(kbatch_url, token, output): """List all the jobs.""" - result = _core.list_jobs(kbatch_url, token) + jobs, cronjobs = _core.list_jobs(kbatch_url, token) if output == "json": - rich.print_json(data=result) + rich.print_json(data=jobs) + rich.print_json(data=cronjobs) elif output == "table": - rich.print(_core.format_jobs(result)) + rich.print(_core.format_jobs(jobs)) + rich.print(_core.format_cronjobs(cronjobs)) @job.command() From 78002c57ef8782b5cef77f9253c5299ffc0d9927 Mon Sep 17 00:00:00 2001 From: iameskild Date: Mon, 9 May 2022 15:11:39 -0700 Subject: [PATCH 4/9] Minor clean up --- kbatch-proxy/kbatch_proxy/main.py | 4 ++-- kbatch/kbatch/_core.py | 2 +- kbatch/kbatch/cli.py | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index a57da02..0e45274 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -367,7 +367,7 @@ def list_jobs(namespace: str) -> Tuple[Dict, Dict]: jobs = batch_api.list_namespaced_job(namespace).to_dict() cronjobs = batch_api.list_namespaced_cron_jobs(namespace).to_dict() - return jobs, cronjobs + return {"jobs": jobs, "cronjobs": cronjobs} @@ -394,7 +394,7 @@ def _job_type(job_name, namespace): if job['metadata']['name'] == job_name: return 'cron_job' - raise ValueError(f"The job name specified, {job_name} cannot be found.") + raise ValueError(f"The job name specified, {job_name}, cannot be found.") _, batch_api = get_k8s_api() job_type = _job_type(job_name, namespace) diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index aecb9c0..7af59c1 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -298,7 +298,7 @@ def format_cronjobs(data): table.add_row( row["metadata"]["name"], row["metadata"]["creation_timestamp"], - row["spec"]["schedule"] + row["spec"]["schedule"], ) return table diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index 1bcee32..8f12574 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -73,7 +73,9 @@ def delete(job_name, kbatch_url, token): ) def list_jobs(kbatch_url, token, output): """List all the jobs.""" - jobs, cronjobs = _core.list_jobs(kbatch_url, token) + results = _core.list_jobs(kbatch_url, token) + jobs = results.get("jobs", {}) + cronjobs = results.get("cronjobs", {}) if output == "json": rich.print_json(data=jobs) From 66f7623dba02b301e093f20f25523a7e2d45850c Mon Sep 17 00:00:00 2001 From: iameskild Date: Fri, 27 May 2022 09:45:39 -0700 Subject: [PATCH 5/9] make cronjob first class citizen --- .pre-commit-config.yaml | 2 +- kbatch-proxy/kbatch_proxy/main.py | 224 +++++++++++++---------- kbatch/kbatch/__init__.py | 7 + kbatch/kbatch/_backend.py | 287 ++++++++++++++++++++++++++---- kbatch/kbatch/_core.py | 178 +++++++++++------- kbatch/kbatch/_types.py | 25 ++- kbatch/kbatch/cli.py | 217 ++++++++++++++++------ 7 files changed, 679 insertions(+), 261 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e7b4433..4e87e5a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/psf/black - rev: 21.5b1 + rev: 22.3.0 hooks: - id: black language_version: python3 diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index 0e45274..9571cb1 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -155,110 +155,51 @@ def get_k8s_api() -> Tuple[kubernetes.client.CoreV1Api, kubernetes.client.BatchV # ---------------------------------------------------------------------------- # app - -@router.get("/jobs/{job_name}") -async def read_job(job_name: str, user: User = Depends(get_current_user)): +# cronjobs # +@router.get("/cronjobs/{job_name}") +async def read_cronjob(job_name: str, user: User = Depends(get_current_user)): return action_on_job(job_name, user.namespace, "read") -@router.get("/jobs/") -async def read_jobs(user: User = Depends(get_current_user)): +@router.get("/cronjobs/") +async def read_cronjobs(user: User = Depends(get_current_user)): return list_jobs(user.namespace) -@router.delete("/jobs/{job_name}") -async def delete_job(job_name: str, user: User = Depends(get_current_user)): +@router.delete("/cronjobs/{job_name}") +async def delete_cronjob(job_name: str, user: User = Depends(get_current_user)): return action_on_job(job_name, user.namespace, "delete") -@router.post("/jobs/") -async def create_job(request: Request, user: User = Depends(get_current_user)): - api, batch_api = get_k8s_api() - +@router.post("/cronjobs/") +async def create_cronjob(request: Request, user: User = Depends(get_current_user)): data = await request.json() - model = kubernetes.client.models.V1Job if data.get("job") else kubernetes.client.models.V1CronJob - job_data = data.get("job") or data.get("cronjob") - - if job_template: - job_data = utils.merge_json_objects(job_data, job_template) + model = kubernetes.client.models.V1CronJob + return create(data, model, user) - job = utils.parse( - job_data, model=model - ) - code_data = data.get("code", None) - if code_data: - # The contents were base64encoded prior to being JSON serialized - # we have to decode it *after* submitting things to the API server... - # This is not great. - # code_data["binary_data"]["code"] = base64.b64decode(code_data["binary_data"]["code"]) - config_map: Optional[kubernetes.client.models.V1ConfigMap] = utils.parse( - code_data, model=kubernetes.client.models.V1ConfigMap - ) - else: - config_map = None +# jobs # +@router.get("/jobs/{job_name}") +async def read_job(job_name: str, user: User = Depends(get_current_user)): + return action_on_job(job_name, user.namespace, "read") - patch.patch( - job, - config_map, - annotations={}, - labels={}, - username=user.name, - ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished, - extra_env=settings.kbatch_job_extra_env, - api_token=user.api_token, - ) - # What needs to happen when? We have a few requirements - # 1. The code ConfigMap must exist before adding it as a volume (we need a name, - # and k8s requires that) - # 2. MAYBE: The Job must exist before adding it as an owner for the ConfigMap - # - # So I think we're at 3 requests: - # - # 1. Submit configmap - # - .. - # 2. Submit Job - # 3. Patch ConfigMap to add Job as the owner - if settings.kbatch_create_user_namespace: - logger.info("Ensuring namespace %s", user.namespace) - created = ensure_namespace(api, user.namespace) - if created: - logger.info("Created namespace %s", user.namespace) +@router.get("/jobs/") +async def read_jobs(user: User = Depends(get_current_user)): + return list_jobs(user.namespace) - if config_map: - logger.info("Submitting ConfigMap") - config_map = api.create_namespaced_config_map( - namespace=user.namespace, body=config_map - ) - patch.add_submitted_configmap_name(job, config_map) - logger.info("Submitting job") - try: - resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job) - except kubernetes.client.exceptions.ApiException as e: - content_type = e.headers.get("Content-Type") - if content_type: - headers = {"Content-Type": content_type} - else: - headers = {} - raise HTTPException(status_code=e.status, detail=e.body, headers=headers) +@router.delete("/jobs/{job_name}") +async def delete_job(job_name: str, user: User = Depends(get_current_user)): + return action_on_job(job_name, user.namespace, "delete") - if config_map: - logger.info( - "patching configmap %s with owner %s", - config_map.metadata.name, - resp.metadata.name, - ) - patch.patch_configmap_owner(resp, config_map) - api.patch_namespaced_config_map( - name=config_map.metadata.name, namespace=user.namespace, body=config_map - ) - # TODO: set Job as the owner of the code. - return resp.to_dict() +@router.post("/jobs/") +async def create_job(request: Request, user: User = Depends(get_current_user)): + return create_job() +# pods # @router.get("/pods/{pod_name}") async def read_pod(pod_name: str, user: User = Depends(get_current_user)): core_api, _ = get_k8s_api() @@ -355,6 +296,98 @@ def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str): return True +# def create_job(request: Request, user: User = Depends(get_current_user)): +def create(data: dict, model, user: User = Depends(get_current_user)): + api, batch_api = get_k8s_api() + + # data = await request.json() + # model = ( + # kubernetes.client.models.V1Job + # if data.get("job") + # else kubernetes.client.models.V1CronJob + # ) + + job_data = data.get("job") or data.get("cronjob") + + if job_template: + job_data = utils.merge_json_objects(job_data, job_template) + + job = utils.parse(job_data, model=model) + + code_data = data.get("code", None) + if code_data: + # The contents were base64encoded prior to being JSON serialized + # we have to decode it *after* submitting things to the API server... + # This is not great. + # code_data["binary_data"]["code"] = base64.b64decode(code_data["binary_data"]["code"]) + config_map: Optional[kubernetes.client.models.V1ConfigMap] = utils.parse( + code_data, model=kubernetes.client.models.V1ConfigMap + ) + else: + config_map = None + + # How much of patch.py needs to be updated to accommodate cronjobs? + patch.patch( + job, + config_map, + annotations={}, + labels={}, + username=user.name, + ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished, + extra_env=settings.kbatch_job_extra_env, + api_token=user.api_token, + ) + + # What needs to happen when? We have a few requirements + # 1. The code ConfigMap must exist before adding it as a volume (we need a name, + # and k8s requires that) + # 2. MAYBE: The Job must exist before adding it as an owner for the ConfigMap + # + # So I think we're at 3 requests: + # + # 1. Submit configmap + # - .. + # 2. Submit Job + # 3. Patch ConfigMap to add Job as the owner + if settings.kbatch_create_user_namespace: + logger.info("Ensuring namespace %s", user.namespace) + created = ensure_namespace(api, user.namespace) + if created: + logger.info("Created namespace %s", user.namespace) + + if config_map: + logger.info("Submitting ConfigMap") + config_map = api.create_namespaced_config_map( + namespace=user.namespace, body=config_map + ) + patch.add_submitted_configmap_name(job, config_map) + + logger.info("Submitting job") + try: + resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job) + except kubernetes.client.exceptions.ApiException as e: + content_type = e.headers.get("Content-Type") + if content_type: + headers = {"Content-Type": content_type} + else: + headers = {} + raise HTTPException(status_code=e.status, detail=e.body, headers=headers) + + if config_map: + logger.info( + "patching configmap %s with owner %s", + config_map.metadata.name, + resp.metadata.name, + ) + patch.patch_configmap_owner(resp, config_map) + api.patch_namespaced_config_map( + name=config_map.metadata.name, namespace=user.namespace, body=config_map + ) + + # TODO: set Job as the owner of the code. + return resp.to_dict() + + def list_jobs(namespace: str) -> Tuple[Dict, Dict]: """ List Jobs and CronJobs currently running or scheduled in `namespace`. @@ -370,7 +403,6 @@ def list_jobs(namespace: str) -> Tuple[Dict, Dict]: return {"jobs": jobs, "cronjobs": cronjobs} - def action_on_job(job_name: str, namespace: str, action: str) -> str: """ Perform an action on `job_name`. @@ -379,20 +411,24 @@ def action_on_job(job_name: str, namespace: str, action: str) -> str: ---------- job_name : name of the Kubernetes Job or CronJob. namespace : Kubernetes namespace to check. - action : action to perform on `job_name`. Must match one item in `job_actions` list. + action : action to perform on `job_name`. + Must match one item in `job_actions` list. """ - job_actions = ['read', 'delete'] + job_actions = ["read", "delete"] if action not in job_actions: - raise ValueError(f"Unknown `action` specified: {action}. Please select from one of the following: {job_actions}.") + raise ValueError( + f"Unknown `action` specified: {action}. " + + "Please select from one of the following: {job_actions}." + ) def _job_type(job_name, namespace): jobs, cronjobs = list_jobs(namespace) - for job in jobs['items']: - if job['metadata']['name'] == job_name: - return 'job' - for job in cronjobs['items']: - if job['metadata']['name'] == job_name: - return 'cron_job' + for job in jobs["items"]: + if job["metadata"]["name"] == job_name: + return "job" + for job in cronjobs["items"]: + if job["metadata"]["name"] == job_name: + return "cron_job" raise ValueError(f"The job name specified, {job_name}, cannot be found.") diff --git a/kbatch/kbatch/__init__.py b/kbatch/kbatch/__init__.py index d25c522..be12dd7 100644 --- a/kbatch/kbatch/__init__.py +++ b/kbatch/kbatch/__init__.py @@ -26,3 +26,10 @@ "show_job", "logs", ] + +# canonical resource "kind" names +CRONJOBS = "cronjobs" +JOBS = "jobs" +PODS = "pods" + +RESOURCE_KIND = [CRONJOBS, JOBS, PODS] diff --git a/kbatch/kbatch/_backend.py b/kbatch/kbatch/_backend.py index 1e87838..7e73ae6 100644 --- a/kbatch/kbatch/_backend.py +++ b/kbatch/kbatch/_backend.py @@ -8,7 +8,7 @@ import shutil import tempfile import zipfile -from typing import Optional, List, Dict, Union +from typing import Optional, List, Dict, Union, Type from kubernetes.client.models import ( V1Job, @@ -31,7 +31,7 @@ V1NodeSelectorRequirement, ) -from ._types import Job +from ._types import BaseJob, CronJob, Job SAFE_CHARS = set(string.ascii_lowercase + string.digits) @@ -47,13 +47,12 @@ # ) -def make_job( - job: Job, +def _make_job_spec( + job: Type[BaseJob], profile: Optional[dict] = None, -) -> Union[V1Job, V1CronJob]: - """ - Make a Kubernetes pod specification for a user-submitted job. - """ + labels: Optional[dict] = None, + annotations: Optional[dict] = None, +): profile = profile or {} name = job.name # TODO: deduplicate somehow... image = job.image or profile.get("image", None) @@ -64,20 +63,8 @@ def make_job( command = job.command args = job.args - schedule = job.schedule - - # annotations = k8s_config.annotations - # labels = k8s_config.labels env = job.env - # annotations = annotations or {} - annotations: Dict[str, str] = {} - # TODO: set in proxy - - # labels = labels or {} - # labels = dict(labels) - labels: Dict[str, str] = {} - # file_volume_mount = V1VolumeMount(mount_path="/code", name="file-volume") # file_volume = V1Volume(name="file-volume", empty_dir={}) @@ -172,11 +159,46 @@ def make_job( metadata=pod_metadata, ) + # generate_name = name + # if not name.endswith("-"): + # generate_name = name + "-" + # if schedule: + # generate_name += "cron-" + + return V1JobSpec(template=template, backoff_limit=0, ttl_seconds_after_finished=300) + + +def _make_job_name(name: str, schedule: str = None): generate_name = name if not name.endswith("-"): generate_name = name + "-" if schedule: generate_name += "cron-" + return generate_name + + +def make_cronjob( + cronjob: CronJob, + profile: Optional[dict] = None, +) -> V1CronJob: + """ + Make a Kubernetes pod specification for a user-submitted cronjob. + """ + name = cronjob.name + schedule = cronjob.schedule + + generate_name = _make_job_name(name, schedule=schedule) + + # annotations = k8s_config.annotations + # labels = k8s_config.labels + annotations: Dict[str, str] = {} + # TODO: set in proxy + + # labels = labels or {} + # labels = dict(labels) + labels: Dict[str, str] = {} + + job_spec = _make_job_spec(cronjob, profile, labels, annotations) job_metadata = V1ObjectMeta( generate_name=generate_name, @@ -184,26 +206,50 @@ def make_job( labels=labels, ) - job_spec = V1JobSpec( - template=template, backoff_limit=0, ttl_seconds_after_finished=300 + job_template = V1JobTemplateSpec( + metadata=job_metadata, + spec=job_spec, + ) + + return V1CronJob( + api_version="batch/v1", + kind="CronJob", + metadata=job_metadata, + spec=V1CronJobSpec( + schedule=cronjob.schedule, + job_template=job_template, + starting_deadline_seconds=300, + ), ) - if schedule: - job_template = V1JobTemplateSpec( - metadata=job_metadata, - spec=job_spec, - ) - return V1CronJob( - api_version="batch/v1", - kind="CronJob", - metadata=job_metadata, - spec=V1CronJobSpec( - schedule=schedule, - job_template=job_template, - starting_deadline_seconds=300, - ), - ) +def make_job( + job: Job, + profile: Optional[dict] = None, +) -> V1Job: + """ + Make a Kubernetes pod specification for a user-submitted job. + """ + name = job.name + + generate_name = _make_job_name(name) + + # annotations = k8s_config.annotations + # labels = k8s_config.labels + annotations: Dict[str, str] = {} + # TODO: set in proxy + + # labels = labels or {} + # labels = dict(labels) + labels: Dict[str, str] = {} + + job_spec = _make_job_spec(job, profile, labels, annotations) + + job_metadata = V1ObjectMeta( + generate_name=generate_name, + annotations=annotations, + labels=labels, + ) return V1Job( api_version="batch/v1", @@ -212,6 +258,173 @@ def make_job( spec=job_spec, ) + +# def make_job( +# job: Type[BaseJob], +# profile: Optional[dict] = None, +# ) -> Union[V1Job, V1CronJob]: +# """ +# Make a Kubernetes pod specification for a user-submitted job. +# """ +# profile = profile or {} +# name = job.name # TODO: deduplicate somehow... +# image = job.image or profile.get("image", None) +# if image is None: +# raise TypeError( +# "Must specify 'image', either with `--image` or from the profile." +# ) + +# command = job.command +# args = job.args + + +# # annotations = k8s_config.annotations +# # labels = k8s_config.labels +# env = job.env + +# annotations = annotations or {} +# annotations: Dict[str, str] = {} +# # TODO: set in proxy + +# # labels = labels or {} +# # labels = dict(labels) +# labels: Dict[str, str] = {} + +# file_volume_mount = V1VolumeMount(mount_path="/code", name="file-volume") +# file_volume = V1Volume(name="file-volume", empty_dir={}) + +# env_vars: Optional[List[V1EnvVar]] = None +# if env: +# env_vars = [V1EnvVar(name=k, value=v) for k, v in env.items()] + +# container = V1Container( +# args=args, +# command=command, +# image=image, +# name="job", +# env=env_vars, +# # volume_mounts=[file_volume_mount], +# resources=V1ResourceRequirements(), +# # TODO: this is important. validate it! +# working_dir="/code", +# ) + +# resources = profile.get("resources", {}) +# limits = resources.get("limits", {}) +# requests = resources.get("requests", {}) + +# container.resources.requests = {} +# container.resources.limits = {} + +# if requests: +# container.resources.requests.update(requests) +# if limits: +# container.resources.limits.update(limits) + +# pod_metadata = V1ObjectMeta( +# name=f"{name}-pod", +# # namespace=k8s_config.namespace, +# labels=labels, +# annotations=annotations, +# ) +# tolerations = None +# if profile.get("tolerations", []): +# tolerations = [V1Toleration(**v) for v in profile["tolerations"]] + +# node_affinity_required = profile.get("node_affinity_required", {}) +# if node_affinity_required: +# match_expressions = [] +# match_fields = [] +# for d in node_affinity_required: +# for k, affinities in d.items(): +# for v in affinities: +# if k == "matchExpressions": +# match_expressions.append( +# V1NodeSelectorRequirement( +# key=v.get("key"), +# operator=v.get("operator"), +# values=v.get("values"), +# ) +# ) +# elif k == "matchFields": +# match_fields.append( +# V1NodeSelectorRequirement( +# key=v.get("key"), +# operator=v.get("operator"), +# values=v.get("values"), +# ) +# ) +# else: +# raise ValueError( +# "Key must be 'matchExpressions' or 'matchFields'. Got {k} instead." +# ) + +# node_selector_terms = V1NodeSelectorTerm( +# match_expressions=match_expressions, +# match_fields=match_fields, +# ) +# node_selector = V1NodeSelector(node_selector_terms=[node_selector_terms]) +# node_affinity = V1NodeAffinity( +# required_during_scheduling_ignored_during_execution=node_selector +# ) +# affinity = V1Affinity(node_affinity=node_affinity) +# else: +# affinity = None + +# # TODO: verify restart policy +# template = V1PodTemplateSpec( +# spec=V1PodSpec( +# # init_containers=init_containers, +# containers=[container], +# restart_policy="Never", +# # volumes=[file_volume], +# tolerations=tolerations, +# affinity=affinity, +# ), +# metadata=pod_metadata, +# ) + +# generate_name = name +# if not name.endswith("-"): +# generate_name = name + "-" +# if schedule: +# generate_name += "cron-" + +# job_metadata = V1ObjectMeta( +# generate_name=generate_name, +# annotations=annotations, +# labels=labels, +# ) + +# job_spec = V1JobSpec( +# template=template, backoff_limit=0, ttl_seconds_after_finished=300 +# ) + +# if schedule: +# job_template = V1JobTemplateSpec( +# metadata=job_metadata, +# spec=job_spec, +# ) + +# return V1CronJob( +# api_version="batch/v1", +# kind="CronJob", +# metadata=job_metadata, +# spec=V1CronJobSpec( +# schedule=schedule, +# job_template=job_template, +# starting_deadline_seconds=300, +# ), +# ) + +# return V1Job( +# api_version="batch/v1", +# kind="Job", +# metadata=job_metadata, +# spec=job_spec, +# ) + + def make_configmap(code: Union[str, pathlib.Path], generate_name) -> V1ConfigMap: code = pathlib.Path(code) diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index 7af59c1..2eacb99 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -4,18 +4,25 @@ import logging import json from pathlib import Path -from typing import Optional, Dict, Union +from typing import Optional, Dict import rich.table import httpx import urllib.parse +import yaml -from ._types import Job from ._backend import make_configmap logger = logging.getLogger(__name__) +# canonical resource "kind" names +CRONJOBS = "cronjobs" +JOBS = "jobs" +PODS = "pods" + +RESOURCE_KIND = [CRONJOBS, JOBS, PODS] + def config_path() -> Path: config_home = ( @@ -76,7 +83,14 @@ def configure(kbatch_url=None, token=None) -> Path: return configpath -def _do_job(job_name, kbatch_url, token, method: str): +def _request_action( + kbatch_url: str, + token: str, + method: str, + resource_kind: str, + resource_name: Optional[dict] = None, + json_data: Optional[dict] = None, +): client = httpx.Client(follow_redirects=True) config = load_config() @@ -87,37 +101,67 @@ def _do_job(job_name, kbatch_url, token, method: str): "Authorization": f"token {token}", } + http_methods = ["GET", "DELETE", "POST"] + if method not in http_methods: + raise ValueError( + f"Unknown method specified: {method}. " + + "Please select from one of the following: {http_methods}." + ) + + if resource_kind not in RESOURCE_KIND: + raise ValueError( + f"Unknown resource specified: {resource_kind}. " + + "Please select from one of the following: {RESOURCE_KIND}." + ) + + endpoint = f"{resource_kind}/" + if resource_name: + endpoint += resource_name + r = client.request( - method, urllib.parse.urljoin(kbatch_url, f"jobs/{job_name}"), headers=headers + method, + urllib.parse.urljoin(kbatch_url, endpoint), + headers=headers, + json=json_data, ) - r.raise_for_status() + print(r) + try: + r.raise_for_status() + except Exception: + logger.exception(r.json()) + raise return r.json() -def show_job(job_name, kbatch_url, token): - return _do_job(job_name, kbatch_url, token, method="GET") +def show_job(resource_name, kbatch_url, token, resource_kind="jobs"): + return _request_action(kbatch_url, token, "GET", resource_kind, resource_name) -def delete_job(job_name, kbatch_url, token): - return _do_job(job_name, kbatch_url, token, method="DELETE") +def delete_job(resource_name, kbatch_url, token, resource_kind="jobs"): + return _request_action(kbatch_url, token, "DELETE", resource_kind, resource_name) -def list_jobs(kbatch_url, token): - client = httpx.Client(follow_redirects=True) - config = load_config() +def list_jobs(kbatch_url, token, resource_kind="jobs"): + return _request_action(kbatch_url, token, "GET", resource_kind) - token = token or os.environ.get("JUPYTERHUB_API_TOKEN") or config["token"] - kbatch_url = handle_url(kbatch_url, config) - headers = { - "Authorization": f"token {token}", - } +def submit_job( + job, kbatch_url, token=None, resource_kind="jobs", code=None, profile=None +): + from ._backend import make_job - r = client.get(urllib.parse.urljoin(kbatch_url, "jobs/"), headers=headers) - r.raise_for_status() + profile = profile or {} + data = make_job(job, profile=profile).to_dict() - return r.json() + if code: + cm = make_configmap(code, generate_name=job.name).to_dict() + cm["binary_data"]["code"] = base64.b64encode(cm["binary_data"]["code"]).decode( + "ascii" + ) + data["code"] = cm + print(data) + return _request_action(kbatch_url, token, "POST", resource_kind, json_data=data) def list_pods(kbatch_url: str, token: Optional[str], job_name: Optional[str] = None): @@ -185,54 +229,6 @@ def _logs( yield r.text -def submit_job( - job: Job, - *, - code: Optional[Union[str, Path]] = None, - kbatch_url: Optional[str] = None, - token: Optional[str] = None, - profile: Optional[dict] = None, -): - from ._backend import make_job - - config = load_config() - - client = httpx.Client(follow_redirects=True) - token = token or os.environ.get("JUPYTERHUB_API_TOKEN") or config["token"] - kbatch_url = handle_url(kbatch_url, config) - - headers = { - "Authorization": f"token {token}", - } - # data = job.to_kubernetes().to_dict() - profile = profile or {} - data = make_job(job, profile=profile).to_dict() - - if job.schedule: - data = {"cronjob": data} - else: - data = {"job": data} - if code: - cm = make_configmap(code, generate_name=job.name).to_dict() - cm["binary_data"]["code"] = base64.b64encode(cm["binary_data"]["code"]).decode( - "ascii" - ) - data["code"] = cm - - r = client.post( - urllib.parse.urljoin(kbatch_url, "jobs/"), - json=data, - headers=headers, - ) - try: - r.raise_for_status() - except Exception: - logger.exception(r.json()) - raise - - return r.json() - - def status(row): if row["status"]["succeeded"]: return "[green]done[/green]" @@ -340,3 +336,49 @@ def load_profile(profile_name: str, kbatch_url: str) -> dict: profiles = show_profiles(kbatch_url) profile = profiles[profile_name] return profile + + +def _prep_job_data( + file, + code, + name, + description, + image, + command, + args, + profile, + kbatch_url, + env, +): + if command: + command = json.loads(command) + if args: + args = json.loads(args) + + data = {} + + if file: + data = yaml.safe_load(Path(file).read_text()) + + data_profile = data.pop("profile", None) + profile = profile or data_profile or {} + + if name is not None: + data["name"] = name + if description is not None: + data["description"] = description + if image is not None: + data["image"] = image + if args is not None: + data["args"] = args + if command is not None: + data["command"] = command + if env: + env = json.loads(env) + data["env"] = env + + code = code or data.pop("code", None) + if profile: + profile = load_profile(profile, kbatch_url) + + return data diff --git a/kbatch/kbatch/_types.py b/kbatch/kbatch/_types.py index c0ca534..b8cefd7 100644 --- a/kbatch/kbatch/_types.py +++ b/kbatch/kbatch/_types.py @@ -9,18 +9,39 @@ class User: @dataclass() -class Job: +class _BaseJob: name: str + + +@dataclass() +class _BaseJobDefaults: image: Optional[str] = None command: Optional[List[str]] = None args: Optional[List[str]] = None - schedule: Optional[str] = None upload: Optional[str] = None description: Optional[str] = None env: Dict[str, str] = field(default_factory=dict) code: Optional[str] = None + +@dataclass() +class _CronJob: + schedule: str + + +@dataclass() +class BaseJob(_BaseJobDefaults, _BaseJob): def to_kubernetes(self): from ._backend import make_job return make_job(self) + + +@dataclass() +class Job(BaseJob): + pass + + +@dataclass() +class CronJob(BaseJob, _CronJob): + schedule: str diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index 8f12574..70469ed 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -1,14 +1,12 @@ -import json import logging -from pathlib import Path import click import rich import rich.logging -import yaml from . import _core -from ._types import Job +from ._core import CRONJOBS, JOBS +from ._types import CronJob, Job FORMAT = "%(message)s" @@ -35,6 +33,142 @@ def configure(kbatch_url, token): rich.print(f"[green]Wrote config to[/green] [bold]{str(p)}[/bold]") +@cli.command() +@click.option("--kbatch-url", help="URL to the kbatch server.") +def profiles(kbatch_url): + """ + Show the profiles used by the server. + + kbatch administrators can serve profiles at the "/profiles" endpoint with + some configuration values for various profiles. + """ + p = _core.show_profiles(kbatch_url) + rich.print_json(data=p) + + +# CRONJOB +@cli.group() +def cronjob(): + """Manage kbatch cronjobs.""" + pass + + +@cronjob.command() +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="File to execute.") +@click.argument("cronjob_name") +def show_cronjob(cronjob_name, kbatch_url, token): + """Show the details for a cronjob.""" + result = _core.show_job(cronjob_name, kbatch_url, token, CRONJOBS) + rich.print_json(data=result) + + +@cronjob.command() +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="File to execute.") +@click.argument("cronjob_name") +def delete_cronjob(cronjob_name, kbatch_url, token): + """Delete a cronjob, cancelling running jobs and pods.""" + result = _core.delete_job(cronjob_name, kbatch_url, token, CRONJOBS) + rich.print_json(data=result) + + +@cronjob.command(name="list") +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="File to execute.") +@click.option( + "-o", + "--output", + help="output format", + type=click.Choice(["json", "table"]), + default="json", +) +def list_cronjobs(kbatch_url, token, output): + """List all the cronjobs.""" + results = _core.list_jobs(kbatch_url, token, CRONJOBS) + + if output == "json": + rich.print_json(data=results) + elif output == "table": + rich.print(_core.format_cronjobs(results)) + + +@cronjob.command() +@click.option("-n", "--name", help="CronJob name.") +@click.option("--image", help="Container image to use to execute job.") +@click.option("--command", help="Command to execute.") +@click.option("--args", help="Arguments to pass to the command.") +@click.option("--schedule", help="The schedule this CronJob should run on.") +@click.option("-e", "--env", help="JSON mapping of environment variables for the job.") +@click.option("-d", "--description", help="A description of the CronJob, optional.") +@click.option( + "-c", + "--code", + help="Local file or directory of source code to make available to the cronjob.", +) +@click.option("-p", "--profile", help="Profile name to use. See 'kbatch profiles'.") +@click.option("-f", "--file", help="Configuration file.") +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="JupyterHub API token.") +@click.option( + "-o", + "--output", + default="json", + help="Output format.", + type=click.Choice(["json", "name"]), +) +def submit_cronjob( + file, + code, + name, + description, + image, + command, + args, + schedule, + profile, + kbatch_url, + token, + env, + output, +): + """ + Submit a CronJob to run on Kubernetes. + """ + + data = _core._prep_job_data( + file, + code, + name, + description, + image, + command, + args, + profile, + kbatch_url, + env, + ) + + if schedule is not None: + data["schedule"] = schedule + + cronjob = CronJob(**data) + + result = _core.submit_job( + job=cronjob, + kbatch_url=kbatch_url, + token=token, + resource_type=CRONJOBS, + code=code, + profile=profile, + ) + if output == "json": + rich.print_json(data=result) + elif output == "name": + print(result["metadata"]["name"]) + + +# JOB @cli.group() def job(): """Manage kbatch jobs.""" @@ -47,7 +181,7 @@ def job(): @click.argument("job_name") def show(job_name, kbatch_url, token): """Show the details for a job.""" - result = _core.show_job(job_name, kbatch_url, token) + result = _core.show_job(job_name, kbatch_url, token, JOBS) rich.print_json(data=result) @@ -57,7 +191,7 @@ def show(job_name, kbatch_url, token): @click.argument("job_name") def delete(job_name, kbatch_url, token): """Delete a job, cancelling running pods.""" - result = _core.delete_job(job_name, kbatch_url, token) + result = _core.delete_job(job_name, kbatch_url, token, JOBS) rich.print_json(data=result) @@ -73,16 +207,12 @@ def delete(job_name, kbatch_url, token): ) def list_jobs(kbatch_url, token, output): """List all the jobs.""" - results = _core.list_jobs(kbatch_url, token) - jobs = results.get("jobs", {}) - cronjobs = results.get("cronjobs", {}) + results = _core.list_jobs(kbatch_url, token, JOBS) if output == "json": - rich.print_json(data=jobs) - rich.print_json(data=cronjobs) + rich.print_json(results) elif output == "table": - rich.print(_core.format_jobs(jobs)) - rich.print(_core.format_cronjobs(cronjobs)) + rich.print(results) @job.command() @@ -117,7 +247,6 @@ def submit( image, command, args, - schedule, profile, kbatch_url, token, @@ -127,46 +256,28 @@ def submit( """ Submit a job to run on Kubernetes. """ - if command: - command = json.loads(command) - if args: - args = json.loads(args) - - data = {} - - if file: - data = yaml.safe_load(Path(file).read_text()) - - data_profile = data.pop("profile", None) - profile = profile or data_profile or {} - - if name is not None: - data["name"] = name - if description is not None: - data["description"] = description - if image is not None: - data["image"] = image - if args is not None: - data["args"] = args - if command is not None: - data["command"] = command - if schedule is not None: - data["schedule"] = schedule - if env: - env = json.loads(env) - data["env"] = env - code = code or data.pop("code", None) - if profile: - profile = _core.load_profile(profile, kbatch_url) + data = _core._prep_job_data( + file, + code, + name, + description, + image, + command, + args, + profile, + kbatch_url, + env, + ) job = Job(**data) result = _core.submit_job( job, - code=code, kbatch_url=kbatch_url, token=token, + resource_type=JOBS, + code=code, profile=profile, ) if output == "json": @@ -175,19 +286,7 @@ def submit( print(result["metadata"]["name"]) -@cli.command() -@click.option("--kbatch-url", help="URL to the kbatch server.") -def profiles(kbatch_url): - """ - Show the profiles used by the server. - - kbatch administrators can serve profiles at the "/profiles" endpoint with - some configuration values for various profiles. - """ - p = _core.show_profiles(kbatch_url) - rich.print_json(data=p) - - +# POD @cli.group() def pod(): """Manage job pods.""" From 6002f315bcb99bb356cfc6ecb87a97166872c000 Mon Sep 17 00:00:00 2001 From: iameskild Date: Fri, 27 May 2022 09:57:40 -0700 Subject: [PATCH 6/9] clean up --- kbatch-proxy/kbatch_proxy/main.py | 6 +- kbatch/kbatch/__init__.py | 7 -- kbatch/kbatch/_backend.py | 172 ------------------------------ 3 files changed, 3 insertions(+), 182 deletions(-) diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index 9571cb1..e264440 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -196,7 +196,9 @@ async def delete_job(job_name: str, user: User = Depends(get_current_user)): @router.post("/jobs/") async def create_job(request: Request, user: User = Depends(get_current_user)): - return create_job() + data = await request.json() + model = kubernetes.client.models.V1Job + return create(data, model, user) # pods # @@ -296,7 +298,6 @@ def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str): return True -# def create_job(request: Request, user: User = Depends(get_current_user)): def create(data: dict, model, user: User = Depends(get_current_user)): api, batch_api = get_k8s_api() @@ -326,7 +327,6 @@ def create(data: dict, model, user: User = Depends(get_current_user)): else: config_map = None - # How much of patch.py needs to be updated to accommodate cronjobs? patch.patch( job, config_map, diff --git a/kbatch/kbatch/__init__.py b/kbatch/kbatch/__init__.py index be12dd7..d25c522 100644 --- a/kbatch/kbatch/__init__.py +++ b/kbatch/kbatch/__init__.py @@ -26,10 +26,3 @@ "show_job", "logs", ] - -# canonical resource "kind" names -CRONJOBS = "cronjobs" -JOBS = "jobs" -PODS = "pods" - -RESOURCE_KIND = [CRONJOBS, JOBS, PODS] diff --git a/kbatch/kbatch/_backend.py b/kbatch/kbatch/_backend.py index 7e73ae6..ee30ceb 100644 --- a/kbatch/kbatch/_backend.py +++ b/kbatch/kbatch/_backend.py @@ -159,12 +159,6 @@ def _make_job_spec( metadata=pod_metadata, ) - # generate_name = name - # if not name.endswith("-"): - # generate_name = name + "-" - # if schedule: - # generate_name += "cron-" - return V1JobSpec(template=template, backoff_limit=0, ttl_seconds_after_finished=300) @@ -259,172 +253,6 @@ def make_job( ) -# def make_job( -# job: Type[BaseJob], -# profile: Optional[dict] = None, -# ) -> Union[V1Job, V1CronJob]: -# """ -# Make a Kubernetes pod specification for a user-submitted job. -# """ -# profile = profile or {} -# name = job.name # TODO: deduplicate somehow... -# image = job.image or profile.get("image", None) -# if image is None: -# raise TypeError( -# "Must specify 'image', either with `--image` or from the profile." -# ) - -# command = job.command -# args = job.args - - -# # annotations = k8s_config.annotations -# # labels = k8s_config.labels -# env = job.env - -# annotations = annotations or {} -# annotations: Dict[str, str] = {} -# # TODO: set in proxy - -# # labels = labels or {} -# # labels = dict(labels) -# labels: Dict[str, str] = {} - -# file_volume_mount = V1VolumeMount(mount_path="/code", name="file-volume") -# file_volume = V1Volume(name="file-volume", empty_dir={}) - -# env_vars: Optional[List[V1EnvVar]] = None -# if env: -# env_vars = [V1EnvVar(name=k, value=v) for k, v in env.items()] - -# container = V1Container( -# args=args, -# command=command, -# image=image, -# name="job", -# env=env_vars, -# # volume_mounts=[file_volume_mount], -# resources=V1ResourceRequirements(), -# # TODO: this is important. validate it! -# working_dir="/code", -# ) - -# resources = profile.get("resources", {}) -# limits = resources.get("limits", {}) -# requests = resources.get("requests", {}) - -# container.resources.requests = {} -# container.resources.limits = {} - -# if requests: -# container.resources.requests.update(requests) -# if limits: -# container.resources.limits.update(limits) - -# pod_metadata = V1ObjectMeta( -# name=f"{name}-pod", -# # namespace=k8s_config.namespace, -# labels=labels, -# annotations=annotations, -# ) -# tolerations = None -# if profile.get("tolerations", []): -# tolerations = [V1Toleration(**v) for v in profile["tolerations"]] - -# node_affinity_required = profile.get("node_affinity_required", {}) -# if node_affinity_required: -# match_expressions = [] -# match_fields = [] -# for d in node_affinity_required: -# for k, affinities in d.items(): -# for v in affinities: -# if k == "matchExpressions": -# match_expressions.append( -# V1NodeSelectorRequirement( -# key=v.get("key"), -# operator=v.get("operator"), -# values=v.get("values"), -# ) -# ) -# elif k == "matchFields": -# match_fields.append( -# V1NodeSelectorRequirement( -# key=v.get("key"), -# operator=v.get("operator"), -# values=v.get("values"), -# ) -# ) -# else: -# raise ValueError( -# "Key must be 'matchExpressions' or 'matchFields'. Got {k} instead." -# ) - -# node_selector_terms = V1NodeSelectorTerm( -# match_expressions=match_expressions, -# match_fields=match_fields, -# ) -# node_selector = V1NodeSelector(node_selector_terms=[node_selector_terms]) -# node_affinity = V1NodeAffinity( -# required_during_scheduling_ignored_during_execution=node_selector -# ) -# affinity = V1Affinity(node_affinity=node_affinity) -# else: -# affinity = None - -# # TODO: verify restart policy -# template = V1PodTemplateSpec( -# spec=V1PodSpec( -# # init_containers=init_containers, -# containers=[container], -# restart_policy="Never", -# # volumes=[file_volume], -# tolerations=tolerations, -# affinity=affinity, -# ), -# metadata=pod_metadata, -# ) - -# generate_name = name -# if not name.endswith("-"): -# generate_name = name + "-" -# if schedule: -# generate_name += "cron-" - -# job_metadata = V1ObjectMeta( -# generate_name=generate_name, -# annotations=annotations, -# labels=labels, -# ) - -# job_spec = V1JobSpec( -# template=template, backoff_limit=0, ttl_seconds_after_finished=300 -# ) - -# if schedule: -# job_template = V1JobTemplateSpec( -# metadata=job_metadata, -# spec=job_spec, -# ) - -# return V1CronJob( -# api_version="batch/v1", -# kind="CronJob", -# metadata=job_metadata, -# spec=V1CronJobSpec( -# schedule=schedule, -# job_template=job_template, -# starting_deadline_seconds=300, -# ), -# ) - -# return V1Job( -# api_version="batch/v1", -# kind="Job", -# metadata=job_metadata, -# spec=job_spec, -# ) - - def make_configmap(code: Union[str, pathlib.Path], generate_name) -> V1ConfigMap: code = pathlib.Path(code) From 00cd7b7c769c6fe650e9f7d1e85f540f03e4741a Mon Sep 17 00:00:00 2001 From: iameskild Date: Tue, 31 May 2022 18:44:12 -0700 Subject: [PATCH 7/9] Simplify --- kbatch-proxy/kbatch_proxy/main.py | 123 ++++++++++++++++-------------- kbatch/kbatch/_backend.py | 6 +- kbatch/kbatch/_core.py | 65 +++++++++------- kbatch/kbatch/_types.py | 36 +++++---- kbatch/kbatch/cli.py | 51 +++++++------ 5 files changed, 146 insertions(+), 135 deletions(-) diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index e264440..949bb45 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -1,13 +1,13 @@ import os import logging -from typing import List, Optional, Tuple, Dict +from typing import List, Optional, Tuple, Dict, Union import yaml from pydantic import BaseModel, BaseSettings import jupyterhub.services.auth from fastapi import Depends, FastAPI, HTTPException, Request, status, APIRouter import kubernetes.client -import kubernetes.client.models +from kubernetes.client.models import V1CronJob, V1Job, V1ConfigMap import kubernetes.config import kubernetes.watch import rich.traceback @@ -88,9 +88,7 @@ class UserOut(BaseModel): job_template = yaml.safe_load(f) # parse with Kubernetes to normalize keys with job_data - job_template = utils.parse( - job_template, model=kubernetes.client.models.V1Job - ).to_dict() + job_template = utils.parse(job_template, model=V1Job).to_dict() utils.remove_nulls(job_template) else: @@ -158,47 +156,45 @@ def get_k8s_api() -> Tuple[kubernetes.client.CoreV1Api, kubernetes.client.BatchV # cronjobs # @router.get("/cronjobs/{job_name}") async def read_cronjob(job_name: str, user: User = Depends(get_current_user)): - return action_on_job(job_name, user.namespace, "read") + return _perform_action(job_name, user.namespace, "read", V1CronJob) @router.get("/cronjobs/") async def read_cronjobs(user: User = Depends(get_current_user)): - return list_jobs(user.namespace) + return _perform_action(None, user.namespace, "list", V1CronJob) @router.delete("/cronjobs/{job_name}") async def delete_cronjob(job_name: str, user: User = Depends(get_current_user)): - return action_on_job(job_name, user.namespace, "delete") + return _perform_action(job_name, user.namespace, "delete", V1CronJob) @router.post("/cronjobs/") async def create_cronjob(request: Request, user: User = Depends(get_current_user)): data = await request.json() - model = kubernetes.client.models.V1CronJob - return create(data, model, user) + return _create_job(data, V1CronJob, user) # jobs # @router.get("/jobs/{job_name}") async def read_job(job_name: str, user: User = Depends(get_current_user)): - return action_on_job(job_name, user.namespace, "read") + return _perform_action(job_name, user.namespace, "read", V1Job) @router.get("/jobs/") async def read_jobs(user: User = Depends(get_current_user)): - return list_jobs(user.namespace) + return _perform_action(None, user.namespace, "list", V1Job) @router.delete("/jobs/{job_name}") async def delete_job(job_name: str, user: User = Depends(get_current_user)): - return action_on_job(job_name, user.namespace, "delete") + return _perform_action(job_name, user.namespace, "delete", V1Job) @router.post("/jobs/") async def create_job(request: Request, user: User = Depends(get_current_user)): data = await request.json() - model = kubernetes.client.models.V1Job - return create(data, model, user) + return _create_job(data, V1Job, user) # pods # @@ -298,22 +294,40 @@ def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str): return True -def create(data: dict, model, user: User = Depends(get_current_user)): - api, batch_api = get_k8s_api() +def _create_job( + data: dict, + model: Union[V1CronJob, V1Job], + user: User = Depends(get_current_user), +): + """ + Create a Kubernetes batch Job or CronJob. + + This is handled in three steps: + 1. Submit ConfigMap + 2. Submit Job/CronJob + 3. Patch ConfigMap to add Job/CronJob as the owner - # data = await request.json() - # model = ( - # kubernetes.client.models.V1Job - # if data.get("job") - # else kubernetes.client.models.V1CronJob - # ) + Parameters + ---------- + data : data specific to the Job or CronJob. + model : kubernetes batch models, "V1Job" "V1CronJob". + user : a `User` object which holds specific configuration settings. + """ + api, batch_api = get_k8s_api() - job_data = data.get("job") or data.get("cronjob") + job_data = data["job"] + # does it handle cronjob job specs appropriately? if job_template: job_data = utils.merge_json_objects(job_data, job_template) + # can be either job or cronjob job = utils.parse(job_data, model=model) + job_to_patch = job + + if issubclass(model, V1CronJob): + j = job_data.get("spec", {}).get("job_template", {}) + job_to_patch = utils.parse(j, V1Job) code_data = data.get("code", None) if code_data: @@ -321,14 +335,12 @@ def create(data: dict, model, user: User = Depends(get_current_user)): # we have to decode it *after* submitting things to the API server... # This is not great. # code_data["binary_data"]["code"] = base64.b64decode(code_data["binary_data"]["code"]) - config_map: Optional[kubernetes.client.models.V1ConfigMap] = utils.parse( - code_data, model=kubernetes.client.models.V1ConfigMap - ) + config_map: Optional[V1ConfigMap] = utils.parse(code_data, model=V1ConfigMap) else: config_map = None patch.patch( - job, + job_to_patch, config_map, annotations={}, labels={}, @@ -364,7 +376,13 @@ def create(data: dict, model, user: User = Depends(get_current_user)): logger.info("Submitting job") try: - resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job) + if issubclass(model, V1Job): + resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job) + elif issubclass(model, V1CronJob): + resp = batch_api.create_namespaced_cron_job( + namespace=user.namespace, body=job + ) + except kubernetes.client.exceptions.ApiException as e: content_type = e.headers.get("Content-Type") if content_type: @@ -388,22 +406,12 @@ def create(data: dict, model, user: User = Depends(get_current_user)): return resp.to_dict() -def list_jobs(namespace: str) -> Tuple[Dict, Dict]: - """ - List Jobs and CronJobs currently running or scheduled in `namespace`. - - Parameters - ---------- - namespace : Kubernetes namespace to check for Jobs and CronJobs. - """ - _, batch_api = get_k8s_api() - jobs = batch_api.list_namespaced_job(namespace).to_dict() - cronjobs = batch_api.list_namespaced_cron_jobs(namespace).to_dict() - - return {"jobs": jobs, "cronjobs": cronjobs} - - -def action_on_job(job_name: str, namespace: str, action: str) -> str: +def _perform_action( + job_name: Union[str, None], + namespace: str, + action: str, + model: Union[V1Job, V1CronJob], +) -> str: """ Perform an action on `job_name`. @@ -413,27 +421,24 @@ def action_on_job(job_name: str, namespace: str, action: str) -> str: namespace : Kubernetes namespace to check. action : action to perform on `job_name`. Must match one item in `job_actions` list. + model : kubernetes batch models, "V1Job" "V1CronJob" """ - job_actions = ["read", "delete"] + job_actions = ["list", "read", "delete"] if action not in job_actions: raise ValueError( f"Unknown `action` specified: {action}. " + "Please select from one of the following: {job_actions}." ) - def _job_type(job_name, namespace): - jobs, cronjobs = list_jobs(namespace) - for job in jobs["items"]: - if job["metadata"]["name"] == job_name: - return "job" - for job in cronjobs["items"]: - if job["metadata"]["name"] == job_name: - return "cron_job" - - raise ValueError(f"The job name specified, {job_name}, cannot be found.") + if issubclass(model, V1Job): + model = "job" + elif issubclass(model, V1CronJob): + model = "cron_job" _, batch_api = get_k8s_api() - job_type = _job_type(job_name, namespace) - f = getattr(batch_api, f"{action}_namespaced_{job_type}") + f = getattr(batch_api, f"{action}_namespaced_{model}") - return f(job_name, namespace).to_dict() + if action == "list": + return f(namespace).to_dict() + else: + return f(job_name, namespace).to_dict() diff --git a/kbatch/kbatch/_backend.py b/kbatch/kbatch/_backend.py index ee30ceb..049291e 100644 --- a/kbatch/kbatch/_backend.py +++ b/kbatch/kbatch/_backend.py @@ -8,7 +8,7 @@ import shutil import tempfile import zipfile -from typing import Optional, List, Dict, Union, Type +from typing import Optional, List, Dict, Union from kubernetes.client.models import ( V1Job, @@ -31,7 +31,7 @@ V1NodeSelectorRequirement, ) -from ._types import BaseJob, CronJob, Job +from ._types import CronJob, Job SAFE_CHARS = set(string.ascii_lowercase + string.digits) @@ -48,7 +48,7 @@ def _make_job_spec( - job: Type[BaseJob], + job: Union[Job, CronJob], profile: Optional[dict] = None, labels: Optional[dict] = None, annotations: Optional[dict] = None, diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index 2eacb99..cd0cc09 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -4,25 +4,19 @@ import logging import json from pathlib import Path -from typing import Optional, Dict +from typing import Optional, Dict, Union import rich.table import httpx import urllib.parse import yaml +from kubernetes.client.models import V1CronJob, V1Job from ._backend import make_configmap logger = logging.getLogger(__name__) -# canonical resource "kind" names -CRONJOBS = "cronjobs" -JOBS = "jobs" -PODS = "pods" - -RESOURCE_KIND = [CRONJOBS, JOBS, PODS] - def config_path() -> Path: config_home = ( @@ -85,10 +79,10 @@ def configure(kbatch_url=None, token=None) -> Path: def _request_action( kbatch_url: str, - token: str, + token: Optional[str], method: str, - resource_kind: str, - resource_name: Optional[dict] = None, + model: Union[V1Job, V1CronJob], + resource_name: str = None, json_data: Optional[dict] = None, ): client = httpx.Client(follow_redirects=True) @@ -108,13 +102,9 @@ def _request_action( + "Please select from one of the following: {http_methods}." ) - if resource_kind not in RESOURCE_KIND: - raise ValueError( - f"Unknown resource specified: {resource_kind}. " - + "Please select from one of the following: {RESOURCE_KIND}." - ) + print(model) + endpoint = "jobs/" if issubclass(model, V1Job) else "cronjobs/" - endpoint = f"{resource_kind}/" if resource_name: endpoint += resource_name @@ -124,7 +114,6 @@ def _request_action( headers=headers, json=json_data, ) - print(r) try: r.raise_for_status() except Exception: @@ -134,25 +123,43 @@ def _request_action( return r.json() -def show_job(resource_name, kbatch_url, token, resource_kind="jobs"): - return _request_action(kbatch_url, token, "GET", resource_kind, resource_name) +def show_job(resource_name, kbatch_url, token, model: Union[V1Job, V1CronJob] = V1Job): + return _request_action(kbatch_url, token, "GET", model, resource_name) -def delete_job(resource_name, kbatch_url, token, resource_kind="jobs"): - return _request_action(kbatch_url, token, "DELETE", resource_kind, resource_name) +def delete_job( + resource_name, kbatch_url, token, model: Union[V1Job, V1CronJob] = V1Job +): + return _request_action(kbatch_url, token, "DELETE", model, resource_name) -def list_jobs(kbatch_url, token, resource_kind="jobs"): - return _request_action(kbatch_url, token, "GET", resource_kind) +def list_jobs(kbatch_url, token, model: Union[V1Job, V1CronJob] = V1Job): + return _request_action(kbatch_url, token, "GET", model) def submit_job( - job, kbatch_url, token=None, resource_kind="jobs", code=None, profile=None + job, + kbatch_url, + token=None, + model: Union[V1Job, V1CronJob] = V1Job, + code=None, + profile=None, ): - from ._backend import make_job + from ._backend import make_job, make_cronjob profile = profile or {} - data = make_job(job, profile=profile).to_dict() + + if issubclass(model, V1Job): + data = make_job(job, profile=profile).to_dict() + elif issubclass(model, V1CronJob): + data = make_cronjob(job, profile=profile).to_dict() + else: + raise ValueError( + f"Unknown resource specified: {model}. " + + "Please select from one of the following: `V1Job` or `V1CronJob`." + ) + + data = {"job": data} if code: cm = make_configmap(code, generate_name=job.name).to_dict() @@ -160,8 +167,8 @@ def submit_job( "ascii" ) data["code"] = cm - print(data) - return _request_action(kbatch_url, token, "POST", resource_kind, json_data=data) + + return _request_action(kbatch_url, token, "POST", model, json_data=data) def list_pods(kbatch_url: str, token: Optional[str], job_name: Optional[str] = None): diff --git a/kbatch/kbatch/_types.py b/kbatch/kbatch/_types.py index b8cefd7..a291e67 100644 --- a/kbatch/kbatch/_types.py +++ b/kbatch/kbatch/_types.py @@ -9,12 +9,8 @@ class User: @dataclass() -class _BaseJob: +class Job: name: str - - -@dataclass() -class _BaseJobDefaults: image: Optional[str] = None command: Optional[List[str]] = None args: Optional[List[str]] = None @@ -23,25 +19,27 @@ class _BaseJobDefaults: env: Dict[str, str] = field(default_factory=dict) code: Optional[str] = None + def to_kubernetes(self): + return _to_kubernetes() + @dataclass() -class _CronJob: +class CronJob: + name: str schedule: str + image: Optional[str] = None + command: Optional[List[str]] = None + args: Optional[List[str]] = None + upload: Optional[str] = None + description: Optional[str] = None + env: Dict[str, str] = field(default_factory=dict) + code: Optional[str] = None - -@dataclass() -class BaseJob(_BaseJobDefaults, _BaseJob): def to_kubernetes(self): - from ._backend import make_job + return _to_kubernetes() - return make_job(self) +def _to_kubernetes(self): + from ._backend import make_job -@dataclass() -class Job(BaseJob): - pass - - -@dataclass() -class CronJob(BaseJob, _CronJob): - schedule: str + return make_job(self) diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index 70469ed..6f511eb 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -3,9 +3,9 @@ import click import rich import rich.logging +from kubernetes.client.models import V1Job, V1CronJob from . import _core -from ._core import CRONJOBS, JOBS from ._types import CronJob, Job @@ -53,23 +53,23 @@ def cronjob(): pass -@cronjob.command() +@cronjob.command(name="show") @click.option("--kbatch-url", help="URL to the kbatch server.") @click.option("--token", help="File to execute.") @click.argument("cronjob_name") def show_cronjob(cronjob_name, kbatch_url, token): """Show the details for a cronjob.""" - result = _core.show_job(cronjob_name, kbatch_url, token, CRONJOBS) + result = _core.show_job(cronjob_name, kbatch_url, token, V1CronJob) rich.print_json(data=result) -@cronjob.command() +@cronjob.command(name="delete") @click.option("--kbatch-url", help="URL to the kbatch server.") @click.option("--token", help="File to execute.") @click.argument("cronjob_name") def delete_cronjob(cronjob_name, kbatch_url, token): """Delete a cronjob, cancelling running jobs and pods.""" - result = _core.delete_job(cronjob_name, kbatch_url, token, CRONJOBS) + result = _core.delete_job(cronjob_name, kbatch_url, token, V1CronJob) rich.print_json(data=result) @@ -85,7 +85,7 @@ def delete_cronjob(cronjob_name, kbatch_url, token): ) def list_cronjobs(kbatch_url, token, output): """List all the cronjobs.""" - results = _core.list_jobs(kbatch_url, token, CRONJOBS) + results = _core.list_jobs(kbatch_url, token, V1CronJob) if output == "json": rich.print_json(data=results) @@ -93,14 +93,16 @@ def list_cronjobs(kbatch_url, token, output): rich.print(_core.format_cronjobs(results)) -@cronjob.command() -@click.option("-n", "--name", help="CronJob name.") +@cronjob.command(name="submit") +@click.argument("cronjob_name") @click.option("--image", help="Container image to use to execute job.") @click.option("--command", help="Command to execute.") @click.option("--args", help="Arguments to pass to the command.") -@click.option("--schedule", help="The schedule this CronJob should run on.") +@click.option( + "--schedule", help="The schedule this cronjob should run on.", required=True +) @click.option("-e", "--env", help="JSON mapping of environment variables for the job.") -@click.option("-d", "--description", help="A description of the CronJob, optional.") +@click.option("-d", "--description", help="A description of the cronjob, optional.") @click.option( "-c", "--code", @@ -158,7 +160,7 @@ def submit_cronjob( job=cronjob, kbatch_url=kbatch_url, token=token, - resource_type=CRONJOBS, + model=V1CronJob, code=code, profile=profile, ) @@ -175,23 +177,23 @@ def job(): pass -@job.command() +@job.command(name="show") @click.option("--kbatch-url", help="URL to the kbatch server.") @click.option("--token", help="File to execute.") @click.argument("job_name") -def show(job_name, kbatch_url, token): +def show_job(job_name, kbatch_url, token): """Show the details for a job.""" - result = _core.show_job(job_name, kbatch_url, token, JOBS) + result = _core.show_job(job_name, kbatch_url, token, V1Job) rich.print_json(data=result) -@job.command() +@job.command(name="delete") @click.option("--kbatch-url", help="URL to the kbatch server.") @click.option("--token", help="File to execute.") @click.argument("job_name") -def delete(job_name, kbatch_url, token): +def delete_job(job_name, kbatch_url, token): """Delete a job, cancelling running pods.""" - result = _core.delete_job(job_name, kbatch_url, token, JOBS) + result = _core.delete_job(job_name, kbatch_url, token, V1Job) rich.print_json(data=result) @@ -207,20 +209,19 @@ def delete(job_name, kbatch_url, token): ) def list_jobs(kbatch_url, token, output): """List all the jobs.""" - results = _core.list_jobs(kbatch_url, token, JOBS) + results = _core.list_jobs(kbatch_url, token, V1Job) if output == "json": - rich.print_json(results) + rich.print_json(data=results) elif output == "table": - rich.print(results) + rich.print(_core.format_jobs(results)) -@job.command() -@click.option("-n", "--name", help="Job name.") +@job.command(name="submit") +@click.argument("job_name") @click.option("--image", help="Container image to use to execute job.") @click.option("--command", help="Command to execute.") @click.option("--args", help="Arguments to pass to the command.") -@click.option("--schedule", help="The schedule this CronJob should run on.") @click.option("-e", "--env", help="JSON mapping of environment variables for the job.") @click.option("-d", "--description", help="A description of the job, optional.") @click.option( @@ -239,7 +240,7 @@ def list_jobs(kbatch_url, token, output): help="Output format.", type=click.Choice(["json", "name"]), ) -def submit( +def submit_job( file, code, name, @@ -276,7 +277,7 @@ def submit( job, kbatch_url=kbatch_url, token=token, - resource_type=JOBS, + model=V1Job, code=code, profile=profile, ) From 412d2f6457a0130d63254753d457feea5a9c87d7 Mon Sep 17 00:00:00 2001 From: iameskild Date: Wed, 1 Jun 2022 16:46:34 -0700 Subject: [PATCH 8/9] Add kbatch-proxy tests, update docs --- docs/source/user-guide.md | 27 ++++- kbatch-proxy/kbatch_proxy/patch.py | 31 ++++-- kbatch-proxy/tests/test_proxy.py | 169 +++++++++++++++++------------ kbatch/kbatch/_core.py | 1 - kbatch/kbatch/cli.py | 4 +- 5 files changed, 145 insertions(+), 87 deletions(-) diff --git a/docs/source/user-guide.md b/docs/source/user-guide.md index 68d2c41..a8ecf1a 100644 --- a/docs/source/user-guide.md +++ b/docs/source/user-guide.md @@ -51,12 +51,12 @@ $ kbatch job show list-files-jfprp With `kbatch job logs` you can get the logs for a job. Make sure to pass the container id. -### Submitting a cron job +## Submit a cronjob -If you'd like your job to run on a repeating schedule, just specify the schedule you would like: +If you'd like your job to run on a repeating schedule, you can leverage CronJobs. The command line interface for `kbatch cronjob` is same as `kbatch job` with the added requirement that you specify a schedule when you `submit` a cronjob: ```{code-block} console -$ kbatch job submit \ +$ kbatch cronjob submit \ --name=list-files \ --image=alpine \ --command='["ls", "-lh"]' @@ -65,6 +65,27 @@ $ kbatch job submit \ This job will now run at 22:00 on every day-of-week from Monday through Friday **indefinitely**. +You can list schedule cronjobs: + +``` +$ kbatch cronjob list -o table +┏━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓ +┃ cronjob name ┃ started ┃ schedule ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩ +│ mycj-cron-kb7d6 │ 2022-05-31T05:27:25+00:00 │ */5 * * * * │ +│ list-files-cron-56whl │ 2022-06-01T23:35:20+00:00 │ 0 22 * * 1-5 │ +└───────────────────────┴───────────────────────────┴──────────────┘ +``` + + +The only way to remove a cronjob is to explicitly delete it: + +``` +kbatch cronjob delete mycj-cron-kb7d6 +``` + +### Schedule syntax + For those familiar with Linux cron jobs, the schedule syntax is the same. For those unfamiliar, have a read through the [Kubernetes CronJob - cron schedule syntax](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax). The website [crontab.guru](https://crontab.guru/#0_22_*_*_1-5) is a nifty tool that tries to translate the schedule syntax into "plain" English. diff --git a/kbatch-proxy/kbatch_proxy/patch.py b/kbatch-proxy/kbatch_proxy/patch.py index a7c890e..988e594 100644 --- a/kbatch-proxy/kbatch_proxy/patch.py +++ b/kbatch-proxy/kbatch_proxy/patch.py @@ -3,11 +3,12 @@ """ import re import string -from typing import Dict, Optional +from typing import Dict, Optional, Union import escapism from kubernetes.client.models import ( V1Job, + V1JobTemplateSpec, V1ConfigMap, V1Container, V1VolumeMount, @@ -22,7 +23,9 @@ SAFE_CHARS = set(string.ascii_lowercase + string.digits) -def add_annotations(job: V1Job, annotations, username: str) -> None: +def add_annotations( + job: Union[V1Job, V1JobTemplateSpec], annotations, username: str +) -> None: annotations = dict(annotations) annotations["kbatch.jupyter.org/username"] = username @@ -30,7 +33,7 @@ def add_annotations(job: V1Job, annotations, username: str) -> None: job.spec.template.metadata.annotations.update(annotations) # update or replace? -def add_labels(job: V1Job, labels, username: str) -> None: +def add_labels(job: Union[V1Job, V1JobTemplateSpec], labels, username: str) -> None: labels = dict(labels) labels["kbatch.jupyter.org/username"] = escapism.escape( username, safe=SAFE_CHARS, escape_char="-" @@ -40,7 +43,7 @@ def add_labels(job: V1Job, labels, username: str) -> None: job.spec.template.metadata.labels.update(labels) # update or replace? -def add_namespace(job: V1Job, namespace: str) -> None: +def add_namespace(job: Union[V1Job, V1JobTemplateSpec], namespace: str) -> None: job.metadata.namespace = namespace job.spec.template.metadata.namespace = namespace @@ -49,11 +52,11 @@ def add_namespace_configmap(config_map: V1ConfigMap, namespace: str) -> None: config_map.metadata.namespace = namespace -def add_code_configmap(job: V1Job) -> None: +def add_code_configmap(job: Union[V1Job, V1JobTemplateSpec]) -> None: pass -def add_unzip_init_container(job: V1Job) -> None: +def add_unzip_init_container(job: Union[V1Job, V1JobTemplateSpec]) -> None: """ Adds an init container to unzip the code. """ @@ -110,7 +113,9 @@ def add_unzip_init_container(job: V1Job) -> None: def add_extra_env( - job: V1Job, extra_env: Dict[str, str], api_token: Optional[str] = None + job: Union[V1Job, V1JobTemplateSpec], + extra_env: Dict[str, str], + api_token: Optional[str] = None, ) -> None: container = job.spec.template.spec.containers[0] env_vars = [V1EnvVar(name=name, value=value) for name, value in extra_env.items()] @@ -146,13 +151,13 @@ def namespace_for_username(username: str) -> str: def add_job_ttl_seconds_after_finished( - job: V1Job, ttl_seconds_after_finished: Optional[int] + job: Union[V1Job, V1JobTemplateSpec], ttl_seconds_after_finished: Optional[int] ) -> None: job.spec.ttl_seconds_after_finished = ttl_seconds_after_finished def patch( - job: V1Job, + job: Union[V1Job, V1JobTemplateSpec], config_map: Optional[V1ConfigMap], *, username: str, @@ -185,13 +190,17 @@ def patch( add_unzip_init_container(job) -def add_submitted_configmap_name(job: V1Job, config_map: V1ConfigMap): +def add_submitted_configmap_name( + job: Union[V1Job, V1JobTemplateSpec], config_map: V1ConfigMap +): # config_map should be the response from the Kubernetes API server with the # submitted name job.spec.template.spec.volumes[-2].config_map.name = config_map.metadata.name -def patch_configmap_owner(job: V1Job, config_map: V1ConfigMap): +def patch_configmap_owner( + job: Union[V1Job, V1JobTemplateSpec], config_map: V1ConfigMap +): if job.metadata.name is None: raise ValueError("job must have a name before it can be set as an owner") assert job.metadata.name is not None diff --git a/kbatch-proxy/tests/test_proxy.py b/kbatch-proxy/tests/test_proxy.py index f4aad8d..56251d6 100644 --- a/kbatch-proxy/tests/test_proxy.py +++ b/kbatch-proxy/tests/test_proxy.py @@ -12,66 +12,97 @@ HERE = pathlib.Path(__file__).parent -@pytest.fixture +def k8s_job_spec() -> kubernetes.client.V1JobSpec: + return kubernetes.client.V1JobSpec( + template=kubernetes.client.V1PodTemplateSpec( + spec=kubernetes.client.V1PodSpec( + containers=[ + kubernetes.client.V1Container( + args=["ls", "-lh"], + command=None, + image="alpine", + name="job", + env=[kubernetes.client.V1EnvVar(name="MYENV", value="MYVALUE")], + resources=kubernetes.client.V1ResourceRequirements(), + ) + ], + restart_policy="Never", + tolerations=None, + ), + metadata=kubernetes.client.V1ObjectMeta( + name="test-name-pod", + labels={"pod": "label"}, + annotations={"pod": "annotations"}, + ), + ), + backoff_limit=4, + ttl_seconds_after_finished=300, + ) + + def k8s_job() -> kubernetes.client.V1Job: - job = kubernetes.client.models.V1Job( + metadata = kubernetes.client.V1ObjectMeta( + name="name", + generate_name="name-", + annotations={"foo": "bar"}, + labels={"baz": "qux"}, + ) + return kubernetes.client.models.V1Job( api_version="batch/v1", kind="Job", - metadata=kubernetes.client.models.V1ObjectMeta( - name="name", - generate_name="name-", - annotations={"foo": "bar"}, - labels={"baz": "qux"}, - ), - spec=kubernetes.client.V1JobSpec( - template=kubernetes.client.V1PodTemplateSpec( - spec=kubernetes.client.V1PodSpec( - containers=[ - kubernetes.client.V1Container( - args=["ls", "-lh"], - command=None, - image="alpine", - name="job", - env=[ - kubernetes.client.V1EnvVar( - name="MYENV", value="MYVALUE" - ) - ], - resources=kubernetes.client.V1ResourceRequirements(), - ) - ], - restart_policy="Never", - tolerations=None, - ), - metadata=kubernetes.client.V1ObjectMeta( - name="test-name-pod", - labels={"pod": "label"}, - annotations={"pod": "annotations"}, - ), + metadata=metadata, + spec=k8s_job_spec(), + ) + + +def k8s_cronjob() -> kubernetes.client.V1CronJob: + metadata = kubernetes.client.V1ObjectMeta( + name="name-cron", + generate_name="name-cron-", + annotations={"foo": "bar"}, + labels={"baz": "qux"}, + ) + return kubernetes.client.V1CronJob( + api_version="batch/v1", + kind="CronJob", + metadata=metadata, + spec=kubernetes.client.V1CronJobSpec( + schedule="*/5 * * * *", + job_template=kubernetes.client.V1JobTemplateSpec( + spec=k8s_job_spec(), + metadata=metadata, ), - backoff_limit=4, - ttl_seconds_after_finished=300, ), ) - return job -def test_parse_job(k8s_job: kubernetes.client.V1Job): - result = kbatch_proxy.utils.parse(k8s_job.to_dict(), kubernetes.client.V1Job) - assert result == k8s_job +@pytest.fixture(scope="function", params=[k8s_job, k8s_cronjob]) +def job(request): + j = request.param() + if isinstance(j, kubernetes.client.V1CronJob): + j = j.spec.job_template + + yield j + + +def test_parse_job(job): + if isinstance(job, kubernetes.client.V1Job): + result = kbatch_proxy.utils.parse(job.to_dict(), kubernetes.client.V1Job) + else: + result = kbatch_proxy.utils.parse( + job.to_dict(), kubernetes.client.V1JobTemplateSpec + ) container = result.spec.template.spec.containers[0] assert isinstance(container, kubernetes.client.V1Container) assert container.args == ["ls", "-lh"] -def test_patch_job(k8s_job: kubernetes.client.V1Job): - kbatch_proxy.patch.patch( - k8s_job, None, annotations={}, labels={}, username="myuser" - ) +def test_patch_job(job): + kbatch_proxy.patch.patch(job, None, annotations={}, labels={}, username="myuser") - assert k8s_job.metadata.namespace == "myuser" - assert k8s_job.spec.template.metadata.namespace == "myuser" + assert job.metadata.namespace == "myuser" + assert job.spec.template.metadata.namespace == "myuser" @pytest.mark.parametrize( @@ -101,36 +132,32 @@ def test_namespace_configmap(): @pytest.mark.parametrize("has_init_containers", [True, False]) @pytest.mark.parametrize("has_volumes", [True, False]) -def test_add_unzip_init_container( - k8s_job: kubernetes.client.V1Job, has_init_containers: bool, has_volumes: bool -): +def test_add_unzip_init_container(job, has_init_containers: bool, has_volumes: bool): if has_init_containers: - k8s_job.spec.template.spec.init_containers = [ + job.spec.template.spec.init_containers = [ kubernetes.client.V1Container(name="present-container") ] if has_volumes: - k8s_job.spec.template.spec.volumes = [ + job.spec.template.spec.volumes = [ kubernetes.client.V1Volume(name="present-volume", empty_dir={}) ] - k8s_job.spec.template.spec.containers[0].volume_mounts = [ + job.spec.template.spec.containers[0].volume_mounts = [ kubernetes.client.V1VolumeMount( name="present-volume", mount_path="/present-volume" ) ] - kbatch_proxy.patch.add_unzip_init_container(k8s_job) + kbatch_proxy.patch.add_unzip_init_container(job) n_init_containers = int(has_init_containers) + 1 - assert len(k8s_job.spec.template.spec.init_containers) == n_init_containers + assert len(job.spec.template.spec.init_containers) == n_init_containers n_volumes = int(has_volumes) + 2 - assert len(k8s_job.spec.template.spec.volumes) == n_volumes + assert len(job.spec.template.spec.volumes) == n_volumes n_volume_mounts = int(has_volumes) + 1 - assert ( - len(k8s_job.spec.template.spec.containers[0].volume_mounts) == n_volume_mounts - ) + assert len(job.spec.template.spec.containers[0].volume_mounts) == n_volume_mounts # now patch with the actual name config_map = kubernetes.client.V1ConfigMap( @@ -138,19 +165,23 @@ def test_add_unzip_init_container( name="actual-name", namespace="my-namespace" ) ) - kbatch_proxy.patch.add_submitted_configmap_name(k8s_job, config_map) - assert k8s_job.spec.template.spec.volumes[-2].config_map.name == "actual-name" + kbatch_proxy.patch.add_submitted_configmap_name(job, config_map) + assert job.spec.template.spec.volumes[-2].config_map.name == "actual-name" @pytest.mark.parametrize( "job_env", [None, [], [kubernetes.client.V1EnvVar(name="SAS_TOKEN", value="TOKEN")]] ) -def test_extra_env(job_env, k8s_job: kubernetes.client.V1Job): +def test_extra_env(job, job_env): has_env = bool(job_env) - k8s_job.spec.template.spec.containers[0].env = job_env + + # make copy to avoid mutation + job.spec.template.spec.containers[0].env = ( + job_env.copy() if job_env is not None else None + ) extra_env = {"MY_ENV": "VALUE"} - kbatch_proxy.patch.add_extra_env(k8s_job, extra_env, api_token="super-secret") + kbatch_proxy.patch.add_extra_env(job, extra_env, api_token="super-secret") if has_env: expected = [ @@ -172,23 +203,21 @@ def test_extra_env(job_env, k8s_job: kubernetes.client.V1Job): ), ] - assert k8s_job.spec.template.spec.containers[0].env == expected + assert job.spec.template.spec.containers[0].env == expected -def test_set_job_ttl_seconds_after_finished(k8s_job: kubernetes.client.V1Job): - kbatch_proxy.patch.patch( - k8s_job, None, username="foo", ttl_seconds_after_finished=10 - ) - assert k8s_job.spec.ttl_seconds_after_finished == 10 +def test_set_job_ttl_seconds_after_finished(job): + kbatch_proxy.patch.patch(job, None, username="foo", ttl_seconds_after_finished=10) + assert job.spec.ttl_seconds_after_finished == 10 -def test_add_node_affinity(k8s_job: kubernetes.client.V1Job): +def test_add_node_affinity(job): job_template = yaml.safe_load((HERE / "job_template.yaml").read_text()) job_template = kbatch_proxy.utils.parse( job_template, kubernetes.client.V1Job ).to_dict() - job_data = k8s_job.to_dict() + job_data = job.to_dict() result = kbatch_proxy.utils.merge_json_objects(job_data, job_template) result = kbatch_proxy.utils.parse(result, kubernetes.client.V1Job) diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index cd0cc09..4031965 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -102,7 +102,6 @@ def _request_action( + "Please select from one of the following: {http_methods}." ) - print(model) endpoint = "jobs/" if issubclass(model, V1Job) else "cronjobs/" if resource_name: diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index 6f511eb..9c85b93 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -94,7 +94,7 @@ def list_cronjobs(kbatch_url, token, output): @cronjob.command(name="submit") -@click.argument("cronjob_name") +@click.option("-n", "--name", help="CronJob name.", required=True) @click.option("--image", help="Container image to use to execute job.") @click.option("--command", help="Command to execute.") @click.option("--args", help="Arguments to pass to the command.") @@ -218,7 +218,7 @@ def list_jobs(kbatch_url, token, output): @job.command(name="submit") -@click.argument("job_name") +@click.option("-n", "--name", help="Job name.", required=True) @click.option("--image", help="Container image to use to execute job.") @click.option("--command", help="Command to execute.") @click.option("--args", help="Arguments to pass to the command.") From 6748d25fcbfdda1cd566f26ab2bbfc4ad6154094 Mon Sep 17 00:00:00 2001 From: iameskild Date: Wed, 1 Jun 2022 17:18:49 -0700 Subject: [PATCH 9/9] Add tests for kbatch --- kbatch/kbatch/__init__.py | 5 +-- kbatch/tests/test_core.py | 67 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/kbatch/kbatch/__init__.py b/kbatch/kbatch/__init__.py index d25c522..0129d4b 100644 --- a/kbatch/kbatch/__init__.py +++ b/kbatch/kbatch/__init__.py @@ -10,8 +10,8 @@ list_pods, logs_streaming, ) -from ._types import Job -from ._backend import make_job +from ._types import Job, CronJob +from ._backend import make_job, make_cronjob __version__ = "0.3.2" @@ -22,6 +22,7 @@ "list_jobs", "submit_job", "make_job", + "make_cronjob", "configure", "show_job", "logs", diff --git a/kbatch/tests/test_core.py b/kbatch/tests/test_core.py index 11b3bd5..1880f21 100644 --- a/kbatch/tests/test_core.py +++ b/kbatch/tests/test_core.py @@ -36,6 +36,21 @@ def test_env(env): kubernetes.client.V1EnvVar(name="KEY2", value="VALUE2"), ] + cronjobin = kbatch.CronJob( + name="name", + command=["ls -lh"], + image="alpine", + env=env, + schedule="*/10 * * * *", + ) + + cronjob = kbatch.make_cronjob(cronjobin) + container = cronjob.spec.job_template.spec.template.spec.containers[0] + assert container.env == [ + kubernetes.client.V1EnvVar(name="KEY1", value="VALUE1"), + kubernetes.client.V1EnvVar(name="KEY2", value="VALUE2"), + ] + def test_command_args(): model_job = kbatch.Job( @@ -50,6 +65,19 @@ def test_command_args(): assert job_container.args == ["-c", "python"] assert job_container.command == ["/bin/sh"] + model_cronjob = kbatch.CronJob( + name="name", + command=["/bin/sh"], + args=["-c", "python"], + image="alpine", + schedule="*/10 * * * *", + ) + cronjob = kbatch.make_cronjob(model_cronjob) + + job_container = cronjob.spec.job_template.spec.template.spec.containers[0] + assert job_container.args == ["-c", "python"] + assert job_container.command == ["/bin/sh"] + @pytest.mark.parametrize("as_dir", [True, False]) def test_make_configmap(tmp_path: pathlib.Path, as_dir: bool): @@ -208,3 +236,42 @@ def test_submit_job(respx_mock: respx.MockRouter): job, code=__file__, kbatch_url="http://kbatch.com/", token="abc" ) assert result + + +def test_submit_cronjob(respx_mock: respx.MockRouter): + respx_mock.post("http://kbatch.com/cronjobs/").mock( + return_value=httpx.Response(200, json={"mock": "response"}) + ) + + cronjob = kbatch.CronJob( + name="name", + command=["/bin/sh"], + args=["-c", "python"], + image="alpine", + schedule="*/10 * * * *", + ) + + result = kbatch.submit_job( + cronjob, + kbatch_url="http://kbatch.com/", + token="abc", + model=kubernetes.client.V1CronJob, + ) + assert result + + cronjob = kbatch.CronJob( + name="name", + command=["/bin/sh"], + args=["-c", "python"], + image="alpine", + schedule="*/10 * * * *", + ) + + result = kbatch.submit_job( + cronjob, + code=__file__, + kbatch_url="http://kbatch.com/", + token="abc", + model=kubernetes.client.V1CronJob, + ) + assert result