Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 66 additions & 19 deletions kbatch-proxy/kbatch_proxy/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
from functools import partial
from typing import Dict, List, Optional, Tuple, Union

import jupyterhub.services.auth
Expand All @@ -11,7 +12,13 @@
import yaml
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request, status
from fastapi.responses import Response, StreamingResponse
from kubernetes.client.models import V1ConfigMap, V1CronJob, V1Job, V1JobTemplateSpec
from kubernetes.client.models import (
V1ConfigMap,
V1CronJob,
V1Job,
V1JobTemplateSpec,
V1Secret,
)
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict

Expand Down Expand Up @@ -389,16 +396,19 @@ def _create_job(
else:
config_map = None

env_secret = V1Secret()

patch.patch(
job_to_patch,
config_map,
config_map=config_map,
annotations={},
labels={},
username=user.name,
ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished,
extra_env=settings.kbatch_job_extra_env,
api_token=user.api_token,
)
env_secret = patch.extract_env_secret(job_to_patch)

# What needs to happen when? We have a few requirements
# 1. The code ConfigMap must exist before adding it as a volume (we need a name,
Expand All @@ -407,37 +417,73 @@ def _create_job(
#
# So I think we're at 3 requests:
#
# 1. Submit configmap
# 1. Submit configmap, secret
# - ..
# 2. Submit Job
# 3. Patch ConfigMap to add Job as the owner
# 3. Patch ConfigMap, Secret to add Job as the owner
if settings.kbatch_create_user_namespace:
logger.info("Ensuring namespace %s", user.namespace)
created = ensure_namespace(api, user.namespace)
if created:
logger.info("Created namespace %s", user.namespace)

logger.info("Submitting Secret")
env_secret = api.create_namespaced_secret(namespace=user.namespace, body=env_secret)
patch.add_env_secret_name(job_to_patch, env_secret)

if config_map:
logger.info("Submitting ConfigMap")
config_map = api.create_namespaced_config_map(
namespace=user.namespace, body=config_map
try:
logger.info("Submitting ConfigMap")
config_map = api.create_namespaced_config_map(
namespace=user.namespace, body=config_map
)
patch.add_submitted_configmap_name(job_to_patch, config_map)
except Exception:
# owner reference not created yet
# have to delete unused secret manually
api.delete_namespaced_secret(
namespace=user.namespace, name=env_secret.metadata.name
)
raise

try:
logger.info("Submitting job")
if issubclass(model, V1Job):
resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job)
elif issubclass(model, V1CronJob):
job.spec.job_template = job_to_patch
resp = batch_api.create_namespaced_cron_job(
namespace=user.namespace, body=job
)
except Exception:
# owner reference not created yet
# have to delete unused secret and config_map manually
api.delete_namespaced_secret(
namespace=user.namespace, name=env_secret.metadata.name
)
patch.add_submitted_configmap_name(job_to_patch, config_map)
if config_map:
api.delete_namespaced_config(
namespace=user.namespace, name=config_map.metadata.name
)
raise

logger.info("Submitting job")
if issubclass(model, V1Job):
resp = batch_api.create_namespaced_job(namespace=user.namespace, body=job)
elif issubclass(model, V1CronJob):
job.spec.job_template = job_to_patch
resp = batch_api.create_namespaced_cron_job(namespace=user.namespace, body=job)
logger.info(
"patching secret %s with owner %s",
env_secret.metadata.name,
resp.metadata.name,
)
patch.patch_owner(resp, env_secret)
api.patch_namespaced_secret(
name=env_secret.metadata.name, namespace=user.namespace, body=env_secret
)

if config_map:
logger.info(
"patching configmap %s with owner %s",
config_map.metadata.name,
resp.metadata.name,
)
patch.patch_configmap_owner(resp, config_map)
patch.patch_owner(resp, config_map)
api.patch_namespaced_config_map(
name=config_map.metadata.name, namespace=user.namespace, body=config_map
)
Expand Down Expand Up @@ -477,8 +523,9 @@ def _perform_action(

_, batch_api = get_k8s_api()
f = getattr(batch_api, f"{action}_namespaced_{model}")
if action != "list":
f = partial(f, job_name)
if action == "delete":
f = partial(f, propagation_policy="Foreground")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found in testing that foreground propagation was required because if the Job was deleted too promptly, the ConfigMap and Secret would not get deleted. This may be a quirk of k3s, though.


if action == "list":
return f(namespace).to_dict()
else:
return f(job_name, namespace).to_dict()
return f(namespace).to_dict()
53 changes: 49 additions & 4 deletions kbatch-proxy/kbatch_proxy/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Patch a V1Job.
"""

import base64
import hashlib
import re
import string
Expand All @@ -14,10 +15,14 @@
V1Container,
V1CronJob,
V1EnvVar,
V1EnvVarSource,
V1Job,
V1JobTemplateSpec,
V1KeyToPath,
V1ObjectMeta,
V1OwnerReference,
V1Secret,
V1SecretKeySelector,
V1Volume,
V1VolumeMount,
)
Expand Down Expand Up @@ -170,9 +175,49 @@ def add_job_ttl_seconds_after_finished(
job.spec.ttl_seconds_after_finished = ttl_seconds_after_finished


def extract_env_secret(job: Union[V1Job, V1JobTemplateSpec]):
    """Move literal environment-variable values out of *job* into a Secret.

    Every container env entry carrying a plain ``value`` is rewritten in
    place to a ``secretKeyRef`` pointing at the returned (not yet submitted)
    Secret; entries that already use ``value_from`` are left untouched.
    The Secret mirrors the job's name / generate_name / labels.
    """
    metadata = V1ObjectMeta(
        name=job.metadata.name,
        generate_name=job.metadata.generate_name,
        labels=job.metadata.labels,
    )
    secret = V1Secret(metadata=metadata, type="Opaque", data={})

    for container in job.spec.template.spec.containers:
        env_vars = container.env or []
        for idx, var in enumerate(env_vars):
            if var.value is None:
                continue
            secret.data[var.name] = base64.b64encode(
                var.value.encode("utf8")
            ).decode("ascii")
            # The server-assigned name is unknown until submission; reference
            # generate_name for now — add_env_secret_name fixes it up later.
            selector = V1SecretKeySelector(
                key=var.name,
                name=secret.metadata.generate_name,
            )
            env_vars[idx] = V1EnvVar(
                name=var.name,
                value_from=V1EnvVarSource(secret_key_ref=selector),
            )
    return secret


def add_env_secret_name(job: Union[V1Job, V1JobTemplateSpec], secret: V1Secret):
    """Rewrite env ``secretKeyRef``s that still point at the secret's
    generate_name so they use the concrete name assigned on submission."""
    placeholder = secret.metadata.generate_name
    actual_name = secret.metadata.name
    for container in job.spec.template.spec.containers:
        for var in container.env or []:
            ref = var.value_from and var.value_from.secret_key_ref
            if ref and ref.name == placeholder:
                ref.name = actual_name


def patch(
job: Union[V1Job, V1JobTemplateSpec],
config_map: Optional[V1ConfigMap],
config_map: Optional[V1ConfigMap] = None,
*,
username: str,
annotations: Optional[Dict[str, str]] = None,
Expand All @@ -186,7 +231,7 @@ def patch(

* Adds `annotations` to the job
* Adds `labels` to the job
* Sets the namespace of the job (and all containers) and ConfigMap to `namespacee`
* Sets the namespace of the job (and all containers) and ConfigMap to `namespace`
* Adds the ConfigMap as a volume for the Job's container
"""
annotations = annotations or {}
Expand All @@ -212,12 +257,12 @@ def add_submitted_configmap_name(
job.spec.template.spec.volumes[-2].config_map.name = config_map.metadata.name


def patch_configmap_owner(job: Union[V1Job, V1CronJob], config_map: V1ConfigMap):
def patch_owner(job: Union[V1Job, V1CronJob], obj: V1ConfigMap | V1Secret):
if job.metadata.name is None:
raise ValueError("job must have a name before it can be set as an owner")
assert job.metadata.name is not None

config_map.metadata.owner_references = [
obj.metadata.owner_references = [
V1OwnerReference(
api_version="batch/v1",
kind=job.kind,
Expand Down
16 changes: 16 additions & 0 deletions kbatch-proxy/tests/test_proxy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import pathlib

import kbatch_proxy.main
Expand Down Expand Up @@ -205,6 +206,21 @@ def test_extra_env(job, job_env):
assert job.spec.template.spec.containers[0].env == expected


def test_secret_env(job):
    # Extra env vars plus the hub API token should all be captured, base64
    # encoded, in the Secret extracted from the patched job.
    kbatch_proxy.patch.add_extra_env(job, {"key": "value"}, api_token="super-secret")
    secret = kbatch_proxy.patch.extract_env_secret(job)

    expected = {
        "key": "value",
        "JUPYTERHUB_API_TOKEN": "super-secret",
        "MYENV": "MYVALUE",
    }
    for name, value in expected.items():
        assert name in secret.data
        assert base64.b64decode(secret.data[name]).decode("ascii") == value


def test_set_job_ttl_seconds_after_finished(job):
kbatch_proxy.patch.patch(job, None, username="foo", ttl_seconds_after_finished=10)
assert job.spec.ttl_seconds_after_finished == 10
Expand Down