diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index f9279cce54c76..2630a60ce4ef6 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -125,3 +125,6 @@ hide_sensitive_variable_fields = True
 elasticsearch_host =
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
 elasticsearch_end_of_log_mark = end_of_log
+
+[kubernetes]
+dags_volume_claim = default
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index de1f9f4235d11..f9e350d3031b1 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -599,8 +599,14 @@ def sync(self):
             last_resource_version, session=self._session)
 
         if not self.task_queue.empty():
-            key, command, kube_executor_config = self.task_queue.get()
-            self.kube_scheduler.run_next((key, command, kube_executor_config))
+            task = self.task_queue.get()
+
+            try:
+                self.kube_scheduler.run_next(task)
+            except ApiException:
+                self.log.exception('ApiException when attempting ' +
+                                   'to run task, re-queueing.')
+                self.task_queue.put(task)
 
     def _change_state(self, key, state, pod_id):
         if state != State.RUNNING:
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index c203e18d5cf8e..905beeec40db3 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -18,10 +18,13 @@
 import re
 import string
 import random
+from urllib3 import HTTPResponse
 from datetime import datetime
 
 try:
+    from kubernetes.client.rest import ApiException
     from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
     from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 except ImportError:
     AirflowKubernetesScheduler = None
@@ -81,6 +84,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     Tests that if dags_volume_subpath/logs_volume_subpath configuration
     options are passed to worker pod config
     """
+
     def setUp(self):
         if AirflowKubernetesScheduler is None:
             self.skipTest("kubernetes python package is not installed")
@@ -152,5 +156,61 @@ def test_worker_environment_when_dags_folder_specified(self):
         self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
 
 
+class TestKubernetesExecutor(unittest.TestCase):
+    """
+    Tests if an ApiException from the Kube Client will cause the task to
+    be rescheduled.
+    """
+    @unittest.skipIf(AirflowKubernetesScheduler is None,
+                     'kubernetes python package is not installed')
+    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+
+        # When a quota is exceeded this is the ApiException we get
+        r = HTTPResponse()
+        r.body = {
+            "kind": "Status",
+            "apiVersion": "v1",
+            "metadata": {},
+            "status": "Failure",
+            "message": "pods \"podname\" is forbidden: " +
+                       "exceeded quota: compute-resources, " +
+                       "requested: limits.memory=4Gi, " +
+                       "used: limits.memory=6508Mi, " +
+                       "limited: limits.memory=10Gi",
+            "reason": "Forbidden",
+            "details": {"name": "podname", "kind": "pods"},
+            "code": 403}
+        r.status = 403
+        r.reason = "Forbidden"
+
+        # A mock kube_client that throws errors when making a pod
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.create_namespaced_pod = mock.MagicMock(
+            side_effect=ApiException(http_resp=r))
+        mock_get_kube_client.return_value = mock_kube_client
+
+        kubernetesExecutor = KubernetesExecutor()
+        kubernetesExecutor.start()
+
+        # Execute a task while the Api Throws errors
+        kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow()),
+                                         command='command', executor_config={})
+        kubernetesExecutor.sync()
+        kubernetesExecutor.sync()
+
+        mock_kube_client.create_namespaced_pod.assert_called()
+        self.assertFalse(kubernetesExecutor.task_queue.empty())
+
+        # Disable the ApiException
+        mock_kube_client.create_namespaced_pod.side_effect = None
+
+        # Execute the task without errors should empty the queue
+        kubernetesExecutor.sync()
+        mock_kube_client.create_namespaced_pod.assert_called()
+        self.assertTrue(kubernetesExecutor.task_queue.empty())
+
+
 if __name__ == '__main__':
     unittest.main()