From c6c0b4620d4dcd09ce1059b2acef13dd2d22e935 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 26 Jun 2018 10:23:05 +0100 Subject: [PATCH] [AIRFLOW-1840] Support back-compat on old celery config The new names are in-line with Celery 4, but if anyone upgrades Airflow without following the UPDATING.md instructions (which we probably assume most people won't, not until something stops working) their workers would suddenly just start failing. That's bad. This will issue a warning but carry on working as expected. We can remove the deprecation settings (but leave the code in config) after this release has been made. --- UPDATING.md | 3 +++ airflow/configuration.py | 53 +++++++++++++++++++++++++++++++++++++++- setup.py | 2 +- tests/configuration.py | 40 +++++++++++++++++++++++++++++- 4 files changed, 95 insertions(+), 3 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 3a66e735c3ac4..c341b38f0bc22 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -56,6 +56,9 @@ To make the config of Airflow compatible with Celery, some properties have been ``` celeryd_concurrency -> worker_concurrency celery_result_backend -> result_backend +celery_ssl_active -> ssl_active +celery_ssl_cert -> ssl_cert +celery_ssl_key -> ssl_key ``` Resulting in the same config parameters as Celery 4, with more transparency. diff --git a/airflow/configuration.py b/airflow/configuration.py index 2ee453fd7d93f..e2089e5b5b94b 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -121,9 +121,29 @@ class AirflowConfigParser(ConfigParser): ('core', 'sql_alchemy_conn'), ('core', 'fernet_key'), ('celery', 'broker_url'), - ('celery', 'result_backend') + ('celery', 'result_backend'), + # Todo: remove this in Airflow 1.11 + ('celery', 'celery_result_backend'), } + # A two-level mapping of (section -> new_name -> old_name). When reading + # new_name, the old_name will be checked to see if it exists. If it does a + # DeprecationWarning will be issued and the old name will be used instead + deprecated_options = { + 'celery': { + # Remove these keys in Airflow 1.11 + 'worker_concurrency': 'celeryd_concurrency', + 'broker_url': 'celery_broker_url', + 'ssl_active': 'celery_ssl_active', + 'ssl_cert': 'celery_ssl_cert', + 'ssl_key': 'celery_ssl_key', + } + } + deprecation_format_string = ( + 'The {old} option in [{section}] has been renamed to {new} - the old ' + 'setting has been used, but please update your config.' + ) + def __init__(self, default_config=None, *args, **kwargs): super(AirflowConfigParser, self).__init__(*args, **kwargs) @@ -181,10 +201,17 @@ def get(self, section, key, **kwargs): section = str(section).lower() key = str(key).lower() + deprecated_name = self.deprecated_options.get(section, {}).get(key, None) + # first check environment variables option = self._get_env_var_option(section, key) if option is not None: return option + if deprecated_name: + option = self._get_env_var_option(section, deprecated_name) + if option is not None: + self._warn_deprecate(section, key, deprecated_name) + return option # ...then the config file if super(AirflowConfigParser, self).has_option(section, key): @@ -192,11 +219,24 @@ def get(self, section, key, **kwargs): # separate the config from default config. return expand_env_var( super(AirflowConfigParser, self).get(section, key, **kwargs)) + if deprecated_name: + if super(AirflowConfigParser, self).has_option(section, deprecated_name): + self._warn_deprecate(section, key, deprecated_name) + return expand_env_var(super(AirflowConfigParser, self).get( + section, + deprecated_name, + **kwargs + )) # ...then commands option = self._get_cmd_option(section, key) if option: return option + if deprecated_name: + option = self._get_cmd_option(section, deprecated_name) + if option: + self._warn_deprecate(section, key, deprecated_name) + return option # ...then the default config if self.defaults.has_option(section, key): @@ -352,6 +392,17 @@ def load_test_config(self): # then read any "custom" test settings self.read(TEST_CONFIG_FILE) + def _warn_deprecate(self, section, key, deprecated_name): + warnings.warn( + self.deprecation_format_string.format( + old=deprecated_name, + new=key, + section=section, + ), + DeprecationWarning, + stacklevel=3, + ) + def mkdir_p(path): try: diff --git a/setup.py b/setup.py index 2882c885224f6..798318c7d99aa 100644 --- a/setup.py +++ b/setup.py @@ -231,7 +231,7 @@ def write_version(filename=os.path.join(*['airflow', devel_ci = [package for package in devel_all if package not in ['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8']] else: - devel_ci = devel_all + devel_ci = devel_all + ['unittest2'] def do_setup(): diff --git a/tests/configuration.py b/tests/configuration.py index ca1d1d7a37df6..ac6f7b7db7746 100644 --- a/tests/configuration.py +++ b/tests/configuration.py @@ -20,7 +20,7 @@ from __future__ import print_function from __future__ import unicode_literals -import unittest +import os from collections import OrderedDict import six @@ -28,6 +28,12 @@ from airflow import configuration from airflow.configuration import conf, AirflowConfigParser, parameterized_config +if six.PY2: + # Need `assertWarns` back-ported from unittest2 + import unittest2 as unittest +else: + import unittest + class ConfTest(unittest.TestCase): @@ -154,3 +160,35 @@ def test_broker_transport_options(self): self.assertTrue(isinstance(section_dict['_test_only_float'], float)) self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types)) + + def test_deprecated_options(self): + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + conf.deprecated_options['celery'] = { + 'worker_concurrency': 'celeryd_concurrency', + } + + # Remove it so we are sure we use the right setting + conf.remove_option('celery', 'worker_concurrency') + + with self.assertWarns(DeprecationWarning): + os.environ['AIRFLOW__CELERY__CELERYD_CONCURRENCY'] = '99' + self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99) + os.environ.pop('AIRFLOW__CELERY__CELERYD_CONCURRENCY') + + with self.assertWarns(DeprecationWarning): + conf.set('celery', 'celeryd_concurrency', '99') + self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99) + conf.remove_option('celery', 'celeryd_concurrency') + + def test_deprecated_options_cmd(self): + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + conf.deprecated_options['celery'] = {'result_backend': 'celery_result_backend'} + conf.as_command_stdout.add(('celery', 'celery_result_backend')) + + conf.remove_option('celery', 'result_backend') + conf.set('celery', 'celery_result_backend_cmd', '/bin/echo 99') + + with self.assertWarns(DeprecationWarning): + self.assertEquals(conf.getint('celery', 'result_backend'), 99)