Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,16 @@ gcp_service_account_keys =
# It will raise an exception if called from a process not running in a kubernetes environment.
in_cluster = True

# Affinity configuration as a single line formatted JSON object.
# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.):
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core
affinity =

# A list of toleration objects as a single line formatted JSON array
# See:
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core
tolerations =

[kubernetes_node_selectors]
# The Key-value pairs to be given to worker pods.
# The worker pods will be scheduled to the nodes of the specified key-value pairs.
Expand Down
32 changes: 30 additions & 2 deletions airflow/contrib/example_dags/example_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,31 @@
schedule_interval=None
)

affinity = {
'podAntiAffinity': {
'requiredDuringSchedulingIgnoredDuringExecution': [
{
'topologyKey': 'kubernetes.io/hostname',
'labelSelector': {
'matchExpressions': [
{
'key': 'app',
'operator': 'In',
'values': ['airflow']
}
]
}
}
]
}
}

tolerations = [{
'key': 'dedicated',
'operator': 'Equal',
'value': 'airflow'
}]


def print_stuff():
print("stuff!")
Expand Down Expand Up @@ -59,11 +84,14 @@ def use_zip_binary():
executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
)

# Limit resources on this operator/task
# Limit resources on this operator/task with node affinity & tolerations
three_task = PythonOperator(
task_id="three_task", python_callable=print_stuff, dag=dag,
executor_config={
"KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}}
"KubernetesExecutor": {"request_memory": "128Mi",
"limit_memory": "128Mi",
"tolerations": tolerations,
"affinity": affinity}}
)

start_task.set_downstream([one_task, two_task, three_task])
23 changes: 20 additions & 3 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import base64
import json
import multiprocessing
from queue import Queue
from dateutil import parser
Expand All @@ -40,7 +41,7 @@ class KubernetesExecutorConfig:
def __init__(self, image=None, image_pull_policy=None, request_memory=None,
request_cpu=None, limit_memory=None, limit_cpu=None,
gcp_service_account_key=None, node_selectors=None, affinity=None,
annotations=None, volumes=None, volume_mounts=None):
annotations=None, volumes=None, volume_mounts=None, tolerations=None):
self.image = image
self.image_pull_policy = image_pull_policy
self.request_memory = request_memory
Expand All @@ -53,16 +54,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None,
self.annotations = annotations
self.volumes = volumes
self.volume_mounts = volume_mounts
self.tolerations = tolerations

def __repr__(self):
return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
"limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
"node_selectors={}, affinity={}, annotations={}, volumes={}, " \
"volume_mounts={})" \
"volume_mounts={}, tolerations={})" \
.format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
self.request_memory, self.request_cpu, self.limit_memory,
self.limit_cpu, self.gcp_service_account_key, self.node_selectors,
self.affinity, self.annotations, self.volumes, self.volume_mounts)
self.affinity, self.annotations, self.volumes, self.volume_mounts,
self.tolerations)

@staticmethod
def from_dict(obj):
Expand All @@ -88,6 +91,7 @@ def from_dict(obj):
annotations=namespaced.get('annotations', {}),
volumes=namespaced.get('volumes', []),
volume_mounts=namespaced.get('volume_mounts', []),
tolerations=namespaced.get('tolerations', None),
)

def as_dict(self):
Expand All @@ -104,6 +108,7 @@ def as_dict(self):
'annotations': self.annotations,
'volumes': self.volumes,
'volume_mounts': self.volume_mounts,
'tolerations': self.tolerations,
}


Expand Down Expand Up @@ -217,6 +222,18 @@ def __init__(self):
# configmap
self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')

affinity_json = conf.get(self.kubernetes_section, 'affinity')
if affinity_json:
self.kube_affinity = json.loads(affinity_json)
else:
self.kube_affinity = None

tolerations_json = conf.get(self.kubernetes_section, 'tolerations')
if tolerations_json:
self.kube_tolerations = json.loads(tolerations_json)
else:
self.kube_tolerations = None

self._validate()

def _validate(self):
Expand Down
4 changes: 4 additions & 0 deletions airflow/contrib/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class Pod:
:type image_pull_secrets: str
:param affinity: A dict containing a group of affinity scheduling rules
:type affinity: dict
:param hostnetwork: If True enable host networking on the pod
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations
:type tolerations: list
"""
def __init__(
self,
Expand Down
8 changes: 6 additions & 2 deletions airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,16 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
limit_cpu=kube_executor_config.limit_cpu
)
gcp_sa_key = kube_executor_config.gcp_service_account_key
annotations = kube_executor_config.annotations.copy()
annotations = dict(kube_executor_config.annotations)
if gcp_sa_key:
annotations['iam.cloud.google.com/service-account'] = gcp_sa_key

volumes = [value for value in volumes_dict.values()] + kube_executor_config.volumes
volume_mounts = [value for value in volume_mounts_dict.values()] + kube_executor_config.volume_mounts

affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations

return Pod(
namespace=namespace,
name=pod_id,
Expand All @@ -253,5 +256,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
annotations=annotations,
node_selectors=(kube_executor_config.node_selectors or
self.kube_config.kube_node_selectors),
affinity=kube_executor_config.affinity
affinity=affinity,
tolerations=tolerations
)
6 changes: 4 additions & 2 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ class KubernetesPodOperator(BaseOperator):
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type xcom_push: bool
:param tolerations: Kubernetes tolerations
:type list of tolerations
:param hostnetwork: If True enable host networking on the pod
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations
:type tolerations: list tolerations
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')

Expand Down
4 changes: 4 additions & 0 deletions scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ data:
namespace = default
gcp_service_account_keys =

# Example affinity and toleration definitions.
affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]

# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
git_sync_container_tag = v2.0.5
Expand Down
88 changes: 84 additions & 4 deletions tests/contrib/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#

import unittest
import uuid

import mock
import re
import string
Expand All @@ -25,6 +27,7 @@
from kubernetes.client.rest import ApiException
from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
except ImportError:
AirflowKubernetesScheduler = None
Expand Down Expand Up @@ -85,21 +88,49 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
options are passed to worker pod config
"""

affinity_config = {
'podAntiAffinity': {
'requiredDuringSchedulingIgnoredDuringExecution': [
{
'topologyKey': 'kubernetes.io/hostname',
'labelSelector': {
'matchExpressions': [
{
'key': 'app',
'operator': 'In',
'values': ['airflow']
}
]
}
}
]
}
}

tolerations_config = [
{
'key': 'dedicated',
'operator': 'Equal',
'value': 'airflow'
},
{
'key': 'prod',
'operator': 'Exists'
}
]

def setUp(self):
if AirflowKubernetesScheduler is None:
self.skipTest("kubernetes python package is not installed")

self.pod = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Pod'
)
self.resources = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Resources'
)
self.secret = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Secret'
)

for patcher in [self.pod, self.resources, self.secret]:
for patcher in [self.resources, self.secret]:
self.mock_foo = patcher.start()
self.addCleanup(patcher.stop)

Expand Down Expand Up @@ -200,6 +231,55 @@ def test_worker_environment_dags_folder_using_git_sync(self):

self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])

def test_make_pod_with_empty_executor_config(self):
self.kube_config.kube_affinity = self.affinity_config
self.kube_config.kube_tolerations = self.tolerations_config

worker_config = WorkerConfiguration(self.kube_config)
kube_executor_config = KubernetesExecutorConfig(annotations=[],
volumes=[],
volume_mounts=[]
)

pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
"test_task_id", str(datetime.utcnow()), "bash -c 'ls /'",
kube_executor_config)

self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
self.assertEqual('app',
pod.affinity['podAntiAffinity']
['requiredDuringSchedulingIgnoredDuringExecution'][0]
['labelSelector']
['matchExpressions'][0]
['key'])

self.assertEqual(2, len(pod.tolerations))
self.assertEqual('prod', pod.tolerations[1]['key'])

def test_make_pod_with_executor_config(self):
worker_config = WorkerConfiguration(self.kube_config)
kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config,
tolerations=self.tolerations_config,
annotations=[],
volumes=[],
volume_mounts=[]
)

pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
"test_task_id", str(datetime.utcnow()), "bash -c 'ls /'",
kube_executor_config)

self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
self.assertEqual('app',
pod.affinity['podAntiAffinity']
['requiredDuringSchedulingIgnoredDuringExecution'][0]
['labelSelector']
['matchExpressions'][0]
['key'])

self.assertEqual(2, len(pod.tolerations))
self.assertEqual('prod', pod.tolerations[1]['key'])

def test_worker_pvc_dags(self):
# Tests persistence volume config created when `dags_volume_claim` is set
self.kube_config.dags_volume_claim = 'airflow-dags'
Expand Down