From 77bde41291b1ad956eabc16c5f375333c7050665 Mon Sep 17 00:00:00 2001 From: Kevin Pullin Date: Mon, 26 Nov 2018 13:21:44 -0800 Subject: [PATCH 1/3] Support setting global k8s affinity and toleration configuration in the airflow config file. --- airflow/config_templates/default_airflow.cfg | 10 +++ .../example_kubernetes_executor.py | 32 ++++++- .../contrib/executors/kubernetes_executor.py | 23 ++++- airflow/contrib/kubernetes/pod.py | 4 + .../kubernetes/worker_configuration.py | 8 +- .../operators/kubernetes_pod_operator.py | 6 +- scripts/ci/kubernetes/kube/configmaps.yaml | 4 + .../executors/test_kubernetes_executor.py | 88 ++++++++++++++++++- 8 files changed, 162 insertions(+), 13 deletions(-) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index a9473178c15da..52bfca7c1e872 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -648,6 +648,16 @@ gcp_service_account_keys = # It will raise an exception if called from a process not running in a kubernetes environment. in_cluster = True +# Affinity configuration as a single line formatted JSON object. +# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.): +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core +affinity = + +# A list of toleration objects as a single line formatted JSON array +# See: +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core +tolerations = + [kubernetes_node_selectors] # The Key-value pairs to be given to worker pods. # The worker pods will be scheduled to the nodes of the specified key-value pairs. diff --git a/airflow/contrib/example_dags/example_kubernetes_executor.py b/airflow/contrib/example_dags/example_kubernetes_executor.py index 1d9bb7304318b..d03e255ab3287 100644 --- a/airflow/contrib/example_dags/example_kubernetes_executor.py +++ b/airflow/contrib/example_dags/example_kubernetes_executor.py @@ -32,6 +32,31 @@ schedule_interval=None ) +affinity = { + 'podAntiAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': [ + { + 'topologyKey': 'kubernetes.io/hostname', + 'labelSelector': { + 'matchExpressions': [ + { + 'key': 'app', + 'operator': 'In', + 'values': ['airflow'] + } + ] + } + } + ] + } +} + +tolerations = [{ + 'key': 'dedicated', + 'operator': 'Equal', + 'value': 'airflow' +}] + def print_stuff(): print("stuff!") @@ -59,11 +84,14 @@ def use_zip_binary(): executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}} ) -# Limit resources on this operator/task +# Limit resources on this operator/task with node affinity & tolerations three_task = PythonOperator( task_id="three_task", python_callable=print_stuff, dag=dag, executor_config={ - "KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}} + "KubernetesExecutor": {"request_memory": "128Mi", + "limit_memory": "128Mi", + "tolerations": tolerations, + "affinity": affinity}} ) start_task.set_downstream([one_task, two_task, three_task]) diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 6c1bd222b90de..3b7ece4b6551b 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -16,6 +16,7 @@ # under the License. import base64 +import json import multiprocessing from queue import Queue from dateutil import parser @@ -40,7 +41,7 @@ class KubernetesExecutorConfig: def __init__(self, image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, - annotations=None, volumes=None, volume_mounts=None): + annotations=None, volumes=None, volume_mounts=None, tolerations=None): self.image = image self.image_pull_policy = image_pull_policy self.request_memory = request_memory @@ -53,16 +54,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None, self.annotations = annotations self.volumes = volumes self.volume_mounts = volume_mounts + self.tolerations = tolerations def __repr__(self): return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \ "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \ "node_selectors={}, affinity={}, annotations={}, volumes={}, " \ - "volume_mounts={})" \ + "volume_mounts={}, tolerations={})" \ .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy, self.request_memory, self.request_cpu, self.limit_memory, self.limit_cpu, self.gcp_service_account_key, self.node_selectors, - self.affinity, self.annotations, self.volumes, self.volume_mounts) + self.affinity, self.annotations, self.volumes, self.volume_mounts, + self.tolerations) @staticmethod def from_dict(obj): @@ -88,6 +91,7 @@ def from_dict(obj): annotations=namespaced.get('annotations', {}), volumes=namespaced.get('volumes', []), volume_mounts=namespaced.get('volume_mounts', []), + tolerations=namespaced.get('tolerations', None), ) def as_dict(self): @@ -104,6 +108,7 @@ def as_dict(self): 'annotations': self.annotations, 'volumes': self.volumes, 'volume_mounts': self.volume_mounts, + 'tolerations': self.tolerations, } @@ -201,6 +206,18 @@ def __init__(self): # configmap self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap') + affinity_json = conf.get(self.kubernetes_section, 'affinity') + if affinity_json: + self.kube_affinity = json.loads(affinity_json) + else: + self.kube_affinity = None + + tolerations_json = conf.get(self.kubernetes_section, 'tolerations') + if tolerations_json: + self.kube_tolerations = json.loads(tolerations_json) + else: + self.kube_tolerations = None + self._validate() def _validate(self): diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index bad5caa738e1b..281ebea6bb521 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -60,6 +60,10 @@ class Pod: :type image_pull_secrets: str :param affinity: A dict containing a group of affinity scheduling rules :type affinity: dict + :param hostnetwork: If True enable host networking on the pod + :type hostnetwork: bool + :param tolerations: A list of kubernetes tolerations + :type tolerations: list tolerations """ def __init__( self, diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 01298c5e60f0e..7a091a7d8d621 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -197,10 +197,13 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da limit_cpu=kube_executor_config.limit_cpu ) gcp_sa_key = kube_executor_config.gcp_service_account_key - annotations = kube_executor_config.annotations.copy() + annotations = list(kube_executor_config.annotations) if gcp_sa_key: annotations['iam.cloud.google.com/service-account'] = gcp_sa_key + affinity = kube_executor_config.affinity or self.kube_config.kube_affinity + tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations + return Pod( namespace=namespace, name=pod_id, @@ -225,5 +228,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da annotations=annotations, node_selectors=(kube_executor_config.node_selectors or self.kube_config.kube_node_selectors), - affinity=kube_executor_config.affinity + affinity=affinity, + tolerations=tolerations ) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 99c6da11b3bf7..b165ceedeffd3 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -84,8 +84,10 @@ class KubernetesPodOperator(BaseOperator): /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes. :type xcom_push: bool - :param tolerations: Kubernetes tolerations - :type list of tolerations + :param hostnetwork: If True enable host networking on the pod + :type hostnetwork: bool + :param tolerations: A list of kubernetes tolerations + :type tolerations: list tolerations """ template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml index 93a6364f864e3..3415fb4030831 100644 --- a/scripts/ci/kubernetes/kube/configmaps.yaml +++ b/scripts/ci/kubernetes/kube/configmaps.yaml @@ -191,6 +191,10 @@ data: namespace = default gcp_service_account_keys = + # Example affinity and toleration definitions. + affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}} + tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }] + # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync git_sync_container_repository = gcr.io/google-containers/git-sync-amd64 git_sync_container_tag = v2.0.5 diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 1307e500cfc31..0622ae1ff0b8e 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -14,6 +14,8 @@ # import unittest +import uuid + import mock import re import string @@ -25,6 +27,7 @@ from kubernetes.client.rest import ApiException from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor + from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration except ImportError: AirflowKubernetesScheduler = None @@ -85,13 +88,41 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): options are passed to worker pod config """ + affinity_config = { + 'podAntiAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': [ + { + 'topologyKey': 'kubernetes.io/hostname', + 'labelSelector': { + 'matchExpressions': [ + { + 'key': 'app', + 'operator': 'In', + 'values': ['airflow'] + } + ] + } + } + ] + } + } + + tolerations_config = [ + { + 'key': 'dedicated', + 'operator': 'Equal', + 'value': 'airflow' + }, + { + 'key': 'prod', + 'operator': 'Exists' + } + ] + def setUp(self): if AirflowKubernetesScheduler is None: self.skipTest("kubernetes python package is not installed") - self.pod = mock.patch( - 'airflow.contrib.kubernetes.worker_configuration.Pod' - ) self.resources = mock.patch( 'airflow.contrib.kubernetes.worker_configuration.Resources' ) @@ -99,7 +130,7 @@ def setUp(self): 'airflow.contrib.kubernetes.worker_configuration.Secret' ) - for patcher in [self.pod, self.resources, self.secret]: + for patcher in [self.resources, self.secret]: self.mock_foo = patcher.start() self.addCleanup(patcher.stop) @@ -155,6 +186,55 @@ def test_worker_environment_when_dags_folder_specified(self): self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER']) + def test_make_pod_with_empty_executor_config(self): + self.kube_config.kube_affinity = self.affinity_config + self.kube_config.kube_tolerations = self.tolerations_config + + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(annotations=[], + volumes=[], + volume_mounts=[] + ) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'", + kube_executor_config) + + self.assertTrue(pod.affinity['podAntiAffinity'] is not None) + self.assertEqual('app', + pod.affinity['podAntiAffinity'] + ['requiredDuringSchedulingIgnoredDuringExecution'][0] + ['labelSelector'] + ['matchExpressions'][0] + ['key']) + + self.assertEqual(2, len(pod.tolerations)) + self.assertEqual('prod', pod.tolerations[1]['key']) + + def test_make_pod_with_executor_config(self): + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config, + tolerations=self.tolerations_config, + annotations=[], + volumes=[], + volume_mounts=[] + ) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'", + kube_executor_config) + + self.assertTrue(pod.affinity['podAntiAffinity'] is not None) + self.assertEqual('app', + pod.affinity['podAntiAffinity'] + ['requiredDuringSchedulingIgnoredDuringExecution'][0] + ['labelSelector'] + ['matchExpressions'][0] + ['key']) + + self.assertEqual(2, len(pod.tolerations)) + self.assertEqual('prod', pod.tolerations[1]['key']) + class TestKubernetesExecutor(unittest.TestCase): """ From 07a262b9101305673100c5e8bb79899b6be9eb10 Mon Sep 17 00:00:00 2001 From: Kevin Pullin Date: Sun, 2 Dec 2018 19:56:09 -0800 Subject: [PATCH 2/3] Copy annotations as dict, not list --- airflow/contrib/kubernetes/worker_configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 7a091a7d8d621..2ebd097a3ae02 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -197,7 +197,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da limit_cpu=kube_executor_config.limit_cpu ) gcp_sa_key = kube_executor_config.gcp_service_account_key - annotations = list(kube_executor_config.annotations) + annotations = dict(kube_executor_config.annotations) if gcp_sa_key: annotations['iam.cloud.google.com/service-account'] = gcp_sa_key From 19af762ffcbf8674f96f52da616a70ceff525dc9 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sun, 30 Dec 2018 09:54:05 -0800 Subject: [PATCH 3/3] Update airflow/contrib/kubernetes/pod.py Co-Authored-By: kppullin --- airflow/contrib/kubernetes/pod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index 281ebea6bb521..6d2977592598a 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -63,7 +63,7 @@ class Pod: :param hostnetwork: If True enable host networking on the pod :type hostnetwork: bool :param tolerations: A list of kubernetes tolerations - :type tolerations: list tolerations + :type tolerations: list """ def __init__( self,