diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index 22d7832..0bb1579 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -1,6 +1,7 @@ import json import logging import os +from functools import partial from typing import Dict, List, Optional, Tuple, Union import jupyterhub.services.auth @@ -11,7 +12,13 @@ import yaml from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request, status from fastapi.responses import Response, StreamingResponse -from kubernetes.client.models import V1ConfigMap, V1CronJob, V1Job, V1JobTemplateSpec +from kubernetes.client.models import ( + V1ConfigMap, + V1CronJob, + V1Job, + V1JobTemplateSpec, + V1Secret, +) from pydantic import BaseModel from pydantic_settings import BaseSettings, SettingsConfigDict @@ -389,9 +396,11 @@ def _create_job( else: config_map = None + env_secret = V1Secret() + patch.patch( job_to_patch, - config_map, + config_map=config_map, annotations={}, labels={}, username=user.name, @@ -399,6 +408,7 @@ def _create_job( extra_env=settings.kbatch_job_extra_env, api_token=user.api_token, ) + env_secret = patch.extract_env_secret(job_to_patch) # What needs to happen when? We have a few requirements # 1. The code ConfigMap must exist before adding it as a volume (we need a name, @@ -407,29 +417,65 @@ def _create_job( # # So I think we're at 3 requests: # - # 1. Submit configmap + # 1. Submit configmap, secret # - .. # 2. Submit Job - # 3. Patch ConfigMap to add Job as the owner + # 3. 
Patch ConfigMap, Secret to add Job as the owner
 
     if settings.kbatch_create_user_namespace:
         logger.info("Ensuring namespace %s", user.namespace)
         created = ensure_namespace(api, user.namespace)
         if created:
             logger.info("Created namespace %s", user.namespace)
 
+    logger.info("Submitting Secret")
+    env_secret = api.create_namespaced_secret(namespace=user.namespace, body=env_secret)
+    patch.add_env_secret_name(job_to_patch, env_secret)
+
     if config_map:
-        logger.info("Submitting ConfigMap")
-        config_map = api.create_namespaced_config_map(
-            namespace=user.namespace, body=config_map
+        try:
+            logger.info("Submitting ConfigMap")
+            config_map = api.create_namespaced_config_map(
+                namespace=user.namespace, body=config_map
+            )
+            patch.add_submitted_configmap_name(job_to_patch, config_map)
+        except Exception:
+            # owner reference not created yet
+            # have to delete unused secret manually
+            api.delete_namespaced_secret(
+                namespace=user.namespace, name=env_secret.metadata.name
+            )
+            raise
+
+    try:
+        logger.info("Submitting job")
+        if issubclass(model, V1Job):
+            resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job)
+        elif issubclass(model, V1CronJob):
+            job.spec.job_template = job_to_patch
+            resp = batch_api.create_namespaced_cron_job(
+                namespace=user.namespace, body=job
+            )
+    except Exception:
+        # owner reference not created yet
+        # have to delete unused secret and config_map manually
+        api.delete_namespaced_secret(
+            namespace=user.namespace, name=env_secret.metadata.name
         )
-        patch.add_submitted_configmap_name(job_to_patch, config_map)
+        if config_map:
+            api.delete_namespaced_config_map(
+                namespace=user.namespace, name=config_map.metadata.name
+            )
+        raise
 
-    logger.info("Submitting job")
-    if issubclass(model, V1Job):
-        resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job)
-    elif issubclass(model, V1CronJob):
-        job.spec.job_template = job_to_patch
-        resp = batch_api.create_namespaced_cron_job(namespace=user.namespace, body=job)
+    logger.info(
"patching secret %s with owner %s", + env_secret.metadata.name, + resp.metadata.name, + ) + patch.patch_owner(resp, env_secret) + api.patch_namespaced_secret( + name=env_secret.metadata.name, namespace=user.namespace, body=env_secret + ) if config_map: logger.info( @@ -437,7 +483,7 @@ def _create_job( config_map.metadata.name, resp.metadata.name, ) - patch.patch_configmap_owner(resp, config_map) + patch.patch_owner(resp, config_map) api.patch_namespaced_config_map( name=config_map.metadata.name, namespace=user.namespace, body=config_map ) @@ -477,8 +523,9 @@ def _perform_action( _, batch_api = get_k8s_api() f = getattr(batch_api, f"{action}_namespaced_{model}") + if action != "list": + f = partial(f, job_name) + if action == "delete": + f = partial(f, propagation_policy="Foreground") - if action == "list": - return f(namespace).to_dict() - else: - return f(job_name, namespace).to_dict() + return f(namespace).to_dict() diff --git a/kbatch-proxy/kbatch_proxy/patch.py b/kbatch-proxy/kbatch_proxy/patch.py index 20a5a10..af9db33 100644 --- a/kbatch-proxy/kbatch_proxy/patch.py +++ b/kbatch-proxy/kbatch_proxy/patch.py @@ -2,6 +2,7 @@ Patch a V1Job. 
""" +import base64 import hashlib import re import string @@ -14,10 +15,14 @@ V1Container, V1CronJob, V1EnvVar, + V1EnvVarSource, V1Job, V1JobTemplateSpec, V1KeyToPath, + V1ObjectMeta, V1OwnerReference, + V1Secret, + V1SecretKeySelector, V1Volume, V1VolumeMount, ) @@ -170,9 +175,49 @@ def add_job_ttl_seconds_after_finished( job.spec.ttl_seconds_after_finished = ttl_seconds_after_finished +def extract_env_secret(job: Union[V1Job, V1JobTemplateSpec]): + """Extract all V1EnvVars into a Secret""" + meta = V1ObjectMeta( + name=job.metadata.name, + generate_name=job.metadata.generate_name, + labels=job.metadata.labels, + ) + secret = V1Secret(metadata=meta, type="Opaque", data={}) + for container in job.spec.template.spec.containers: + for i, env in enumerate(container.env or []): + if env.value is not None: + secret.data[env.name] = base64.b64encode( + env.value.encode("utf8") + ).decode("ascii") + container.env[i] = V1EnvVar( + name=env.name, + value_from=V1EnvVarSource( + secret_key_ref=V1SecretKeySelector( + key=env.name, + name=secret.metadata.generate_name, + ) + ), + ) + return secret + + +def add_env_secret_name(job: Union[V1Job, V1JobTemplateSpec], secret: V1Secret): + """Apply the secret name to env secrets once they are known""" + generate_name = secret.metadata.generate_name + name = secret.metadata.name + for container in job.spec.template.spec.containers: + for i, env in enumerate(container.env or []): + if ( + env.value_from + and env.value_from.secret_key_ref + and env.value_from.secret_key_ref.name == generate_name + ): + env.value_from.secret_key_ref.name = name + + def patch( job: Union[V1Job, V1JobTemplateSpec], - config_map: Optional[V1ConfigMap], + config_map: Optional[V1ConfigMap] = None, *, username: str, annotations: Optional[Dict[str, str]] = None, @@ -186,7 +231,7 @@ def patch( * Adds `annotations` to the job * Adds `labels` to the job - * Sets the namespace of the job (and all containers) and ConfigMap to `namespacee` + * Sets the namespace of 
the job (and all containers) and ConfigMap to `namespace` * Adds the ConfigMap as a volume for the Job's container """ annotations = annotations or {} @@ -212,12 +257,12 @@ def add_submitted_configmap_name( job.spec.template.spec.volumes[-2].config_map.name = config_map.metadata.name -def patch_configmap_owner(job: Union[V1Job, V1CronJob], config_map: V1ConfigMap): +def patch_owner(job: Union[V1Job, V1CronJob], obj: V1ConfigMap | V1Secret): if job.metadata.name is None: raise ValueError("job must have a name before it can be set as an owner") assert job.metadata.name is not None - config_map.metadata.owner_references = [ + obj.metadata.owner_references = [ V1OwnerReference( api_version="batch/v1", kind=job.kind, diff --git a/kbatch-proxy/tests/test_proxy.py b/kbatch-proxy/tests/test_proxy.py index 85ff8fb..b0a024c 100644 --- a/kbatch-proxy/tests/test_proxy.py +++ b/kbatch-proxy/tests/test_proxy.py @@ -1,3 +1,4 @@ +import base64 import pathlib import kbatch_proxy.main @@ -205,6 +206,21 @@ def test_extra_env(job, job_env): assert job.spec.template.spec.containers[0].env == expected +def test_secret_env(job): + kbatch_proxy.patch.add_extra_env(job, {"key": "value"}, api_token="super-secret") + secret = kbatch_proxy.patch.extract_env_secret(job) + secret_data = secret.data + assert "key" in secret_data + assert base64.b64decode(secret_data["key"]).decode("ascii") == "value" + assert "JUPYTERHUB_API_TOKEN" in secret_data + assert ( + base64.b64decode(secret_data["JUPYTERHUB_API_TOKEN"]).decode("ascii") + == "super-secret" + ) + assert "MYENV" in secret_data + assert base64.b64decode(secret_data["MYENV"]).decode("ascii") == "MYVALUE" + + def test_set_job_ttl_seconds_after_finished(job): kbatch_proxy.patch.patch(job, None, username="foo", ttl_seconds_after_finished=10) assert job.spec.ttl_seconds_after_finished == 10