Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/config_templates/default_test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,6 @@ hide_sensitive_variable_fields = True
elasticsearch_host =
elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
elasticsearch_end_of_log_mark = end_of_log

[kubernetes]
dags_volume_claim = default
10 changes: 8 additions & 2 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,14 @@ def sync(self):
last_resource_version, session=self._session)

if not self.task_queue.empty():
key, command, kube_executor_config = self.task_queue.get()
self.kube_scheduler.run_next((key, command, kube_executor_config))
task = self.task_queue.get()

try:
self.kube_scheduler.run_next(task)
except ApiException:
self.log.exception('ApiException when attempting ' +
'to run task, re-queueing.')
self.task_queue.put(task)

def _change_state(self, key, state, pod_id):
if state != State.RUNNING:
Expand Down
60 changes: 60 additions & 0 deletions tests/contrib/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import re
import string
import random
from urllib3 import HTTPResponse
from datetime import datetime

try:
from kubernetes.client.rest import ApiException
from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
except ImportError:
AirflowKubernetesScheduler = None
Expand Down Expand Up @@ -81,6 +84,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
Tests that if dags_volume_subpath/logs_volume_subpath configuration
options are passed to worker pod config
"""

def setUp(self):
if AirflowKubernetesScheduler is None:
self.skipTest("kubernetes python package is not installed")
Expand Down Expand Up @@ -152,5 +156,61 @@ def test_worker_environment_when_dags_folder_specified(self):
self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])


class TestKubernetesExecutor(unittest.TestCase):
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
@unittest.skipIf(AirflowKubernetesScheduler is None,
'kubernetes python package is not installed')
@mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):

# When a quota is exceeded this is the ApiException we get
r = HTTPResponse()
r.body = {
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": "pods \"podname\" is forbidden: " +
"exceeded quota: compute-resources, " +
"requested: limits.memory=4Gi, " +
"used: limits.memory=6508Mi, " +
"limited: limits.memory=10Gi",
"reason": "Forbidden",
"details": {"name": "podname", "kind": "pods"},
"code": 403},
r.status = 403
r.reason = "Forbidden"

# A mock kube_client that throws errors when making a pod
mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
mock_kube_client.create_namespaced_pod = mock.MagicMock(
side_effect=ApiException(http_resp=r))
mock_get_kube_client.return_value = mock_kube_client

kubernetesExecutor = KubernetesExecutor()
kubernetesExecutor.start()

# Execute a task while the Api Throws errors
kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow()),
command='command', executor_config={})
kubernetesExecutor.sync()
kubernetesExecutor.sync()

mock_kube_client.create_namespaced_pod.assert_called()
self.assertFalse(kubernetesExecutor.task_queue.empty())

# Disable the ApiException
mock_kube_client.create_namespaced_pod.side_effect = None

# Execute the task without errors should empty the queue
kubernetesExecutor.sync()
mock_kube_client.create_namespaced_pod.assert_called()
self.assertTrue(kubernetesExecutor.task_queue.empty())


if __name__ == '__main__':
unittest.main()