diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 536f0061e4cbd..960fe8ff5f514 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -664,6 +664,16 @@ gcp_service_account_keys = # It will raise an exception if called from a process not running in a kubernetes environment. in_cluster = True +# Affinity configuration as a single line formatted JSON object. +# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.): +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core +affinity = + +# A list of toleration objects as a single line formatted JSON array +# See: +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core +tolerations = + [kubernetes_node_selectors] # The Key-value pairs to be given to worker pods. # The worker pods will be scheduled to the nodes of the specified key-value pairs. diff --git a/airflow/contrib/example_dags/example_kubernetes_executor.py b/airflow/contrib/example_dags/example_kubernetes_executor.py index 1d9bb7304318b..d03e255ab3287 100644 --- a/airflow/contrib/example_dags/example_kubernetes_executor.py +++ b/airflow/contrib/example_dags/example_kubernetes_executor.py @@ -32,6 +32,31 @@ schedule_interval=None ) +affinity = { + 'podAntiAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': [ + { + 'topologyKey': 'kubernetes.io/hostname', + 'labelSelector': { + 'matchExpressions': [ + { + 'key': 'app', + 'operator': 'In', + 'values': ['airflow'] + } + ] + } + } + ] + } +} + +tolerations = [{ + 'key': 'dedicated', + 'operator': 'Equal', + 'value': 'airflow' +}] + def print_stuff(): print("stuff!") @@ -59,11 +84,14 @@ def use_zip_binary(): executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}} ) -# Limit resources on this operator/task +# Limit resources on this operator/task with node affinity & tolerations three_task = PythonOperator( task_id="three_task", python_callable=print_stuff, dag=dag, executor_config={ - "KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}} + "KubernetesExecutor": {"request_memory": "128Mi", + "limit_memory": "128Mi", + "tolerations": tolerations, + "affinity": affinity}} ) start_task.set_downstream([one_task, two_task, three_task]) diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 5cb27d7551db1..fa81cf32036fb 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -16,6 +16,7 @@ # under the License. import base64 +import json import multiprocessing from queue import Queue from dateutil import parser @@ -40,7 +41,7 @@ class KubernetesExecutorConfig: def __init__(self, image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, - annotations=None, volumes=None, volume_mounts=None): + annotations=None, volumes=None, volume_mounts=None, tolerations=None): self.image = image self.image_pull_policy = image_pull_policy self.request_memory = request_memory @@ -53,16 +54,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None, self.annotations = annotations self.volumes = volumes self.volume_mounts = volume_mounts + self.tolerations = tolerations def __repr__(self): return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \ "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \ "node_selectors={}, affinity={}, annotations={}, volumes={}, " \ - "volume_mounts={})" \ + "volume_mounts={}, tolerations={})" \ .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy, self.request_memory, self.request_cpu, self.limit_memory, self.limit_cpu, self.gcp_service_account_key, self.node_selectors, - self.affinity, self.annotations, self.volumes, self.volume_mounts) + self.affinity, self.annotations, self.volumes, self.volume_mounts, + self.tolerations) @staticmethod def from_dict(obj): @@ -88,6 +91,7 @@ def from_dict(obj): annotations=namespaced.get('annotations', {}), volumes=namespaced.get('volumes', []), volume_mounts=namespaced.get('volume_mounts', []), + tolerations=namespaced.get('tolerations', None), ) def as_dict(self): @@ -104,6 +108,7 @@ def as_dict(self): 'annotations': self.annotations, 'volumes': self.volumes, 'volume_mounts': self.volume_mounts, + 'tolerations': self.tolerations, } @@ -217,6 +222,18 @@ def __init__(self): # configmap self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap') + affinity_json = conf.get(self.kubernetes_section, 'affinity') + if affinity_json: + self.kube_affinity = json.loads(affinity_json) + else: + self.kube_affinity = None + + tolerations_json = conf.get(self.kubernetes_section, 'tolerations') + if tolerations_json: + self.kube_tolerations = json.loads(tolerations_json) + else: + self.kube_tolerations = None + self._validate() def _validate(self): diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index bad5caa738e1b..6d2977592598a 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -60,6 +60,10 @@ class Pod: :type image_pull_secrets: str :param affinity: A dict containing a group of affinity scheduling rules :type affinity: dict + :param hostnetwork: If True enable host networking on the pod + :type hostnetwork: bool + :param tolerations: A list of kubernetes tolerations + :type tolerations: list """ def __init__( self, diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 9a6f21340d4b2..7b9a942de63cf 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -222,13 +222,16 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da limit_cpu=kube_executor_config.limit_cpu ) gcp_sa_key = kube_executor_config.gcp_service_account_key - annotations = kube_executor_config.annotations.copy() + annotations = dict(kube_executor_config.annotations) if gcp_sa_key: annotations['iam.cloud.google.com/service-account'] = gcp_sa_key volumes = [value for value in volumes_dict.values()] + kube_executor_config.volumes volume_mounts = [value for value in volume_mounts_dict.values()] + kube_executor_config.volume_mounts + affinity = kube_executor_config.affinity or self.kube_config.kube_affinity + tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations + return Pod( namespace=namespace, name=pod_id, @@ -253,5 +256,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da annotations=annotations, node_selectors=(kube_executor_config.node_selectors or self.kube_config.kube_node_selectors), - affinity=kube_executor_config.affinity + affinity=affinity, + tolerations=tolerations ) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index f6c1f9d45ee16..3a5bef5381780 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -84,8 +84,10 @@ class KubernetesPodOperator(BaseOperator): /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes. :type xcom_push: bool - :param tolerations: Kubernetes tolerations - :type list of tolerations + :param hostnetwork: If True enable host networking on the pod + :type hostnetwork: bool + :param tolerations: A list of kubernetes tolerations + :type tolerations: list tolerations """ template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') diff --git a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml index 0ca6d423e47e3..353fdd94bb036 100644 --- a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml +++ b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml @@ -199,6 +199,10 @@ data: namespace = default gcp_service_account_keys = + # Example affinity and toleration definitions. + affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}} + tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }] + # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync git_sync_container_repository = gcr.io/google-containers/git-sync-amd64 git_sync_container_tag = v2.0.5 diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 4836693002c31..76cd9fb2c884f 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -14,6 +14,8 @@ # import unittest +import uuid + import mock import re import string @@ -25,6 +27,7 @@ from kubernetes.client.rest import ApiException from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor + from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration except ImportError: AirflowKubernetesScheduler = None @@ -85,13 +88,41 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): options are passed to worker pod config """ + affinity_config = { + 'podAntiAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': [ + { + 'topologyKey': 'kubernetes.io/hostname', + 'labelSelector': { + 'matchExpressions': [ + { + 'key': 'app', + 'operator': 'In', + 'values': ['airflow'] + } + ] + } + } + ] + } + } + + tolerations_config = [ + { + 'key': 'dedicated', + 'operator': 'Equal', + 'value': 'airflow' + }, + { + 'key': 'prod', + 'operator': 'Exists' + } + ] + def setUp(self): if AirflowKubernetesScheduler is None: self.skipTest("kubernetes python package is not installed") - self.pod = mock.patch( - 'airflow.contrib.kubernetes.worker_configuration.Pod' - ) self.resources = mock.patch( 'airflow.contrib.kubernetes.worker_configuration.Resources' ) @@ -99,7 +130,7 @@ def setUp(self): 'airflow.contrib.kubernetes.worker_configuration.Secret' ) - for patcher in [self.pod, self.resources, self.secret]: + for patcher in [self.resources, self.secret]: self.mock_foo = patcher.start() self.addCleanup(patcher.stop) @@ -200,6 +231,55 @@ def test_worker_environment_dags_folder_using_git_sync(self): self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER']) + def test_make_pod_with_empty_executor_config(self): + self.kube_config.kube_affinity = self.affinity_config + self.kube_config.kube_tolerations = self.tolerations_config + + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(annotations=[], + volumes=[], + volume_mounts=[] + ) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'", + kube_executor_config) + + self.assertTrue(pod.affinity['podAntiAffinity'] is not None) + self.assertEqual('app', + pod.affinity['podAntiAffinity'] + ['requiredDuringSchedulingIgnoredDuringExecution'][0] + ['labelSelector'] + ['matchExpressions'][0] + ['key']) + + self.assertEqual(2, len(pod.tolerations)) + self.assertEqual('prod', pod.tolerations[1]['key']) + + def test_make_pod_with_executor_config(self): + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config, + tolerations=self.tolerations_config, + annotations=[], + volumes=[], + volume_mounts=[] + ) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'", + kube_executor_config) + + self.assertTrue(pod.affinity['podAntiAffinity'] is not None) + self.assertEqual('app', + pod.affinity['podAntiAffinity'] + ['requiredDuringSchedulingIgnoredDuringExecution'][0] + ['labelSelector'] + ['matchExpressions'][0] + ['key']) + + self.assertEqual(2, len(pod.tolerations)) + self.assertEqual('prod', pod.tolerations[1]['key']) + def test_worker_pvc_dags(self): # Tests persistence volume config created when `dags_volume_claim` is set self.kube_config.dags_volume_claim = 'airflow-dags'