diff --git a/UPDATING.md b/UPDATING.md index 3a66e735c3ac4..c341b38f0bc22 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -56,6 +56,9 @@ To make the config of Airflow compatible with Celery, some properties have been ``` celeryd_concurrency -> worker_concurrency celery_result_backend -> result_backend +celery_ssl_active -> ssl_active +celery_ssl_cert -> ssl_cert +celery_ssl_key -> ssl_key ``` Resulting in the same config parameters as Celery 4, with more transparency. diff --git a/airflow/configuration.py b/airflow/configuration.py index 2ee453fd7d93f..e2089e5b5b94b 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -121,9 +121,29 @@ class AirflowConfigParser(ConfigParser): ('core', 'sql_alchemy_conn'), ('core', 'fernet_key'), ('celery', 'broker_url'), - ('celery', 'result_backend') + ('celery', 'result_backend'), + # Todo: remove this in Airflow 1.11 + ('celery', 'celery_result_backend'), } + # A two-level mapping of (section -> new_name -> old_name). When reading + # new_name, the old_name will be checked to see if it exists. If it does a + # DeprecationWarning will be issued and the old name will be used instead + deprecated_options = { + 'celery': { + # Remove these keys in Airflow 1.11 + 'worker_concurrency': 'celeryd_concurrency', + 'broker_url': 'celery_broker_url', + 'ssl_active': 'celery_ssl_active', + 'ssl_cert': 'celery_ssl_cert', + 'ssl_key': 'celery_ssl_key', + } + } + deprecation_format_string = ( + 'The {old} option in [{section}] has been renamed to {new} - the old ' + 'setting has been used, but please update your config.' + ) + def __init__(self, default_config=None, *args, **kwargs): super(AirflowConfigParser, self).__init__(*args, **kwargs) @@ -181,10 +201,17 @@ def get(self, section, key, **kwargs): section = str(section).lower() key = str(key).lower() + deprecated_name = self.deprecated_options.get(section, {}).get(key, None) + # first check environment variables option = self._get_env_var_option(section, key) if option is not None: return option + if deprecated_name: + option = self._get_env_var_option(section, deprecated_name) + if option is not None: + self._warn_deprecate(section, key, deprecated_name) + return option # ...then the config file if super(AirflowConfigParser, self).has_option(section, key): @@ -192,11 +219,24 @@ def get(self, section, key, **kwargs): # separate the config from default config. return expand_env_var( super(AirflowConfigParser, self).get(section, key, **kwargs)) + if deprecated_name: + if super(AirflowConfigParser, self).has_option(section, deprecated_name): + self._warn_deprecate(section, key, deprecated_name) + return expand_env_var(super(AirflowConfigParser, self).get( + section, + deprecated_name, + **kwargs + )) # ...then commands option = self._get_cmd_option(section, key) if option: return option + if deprecated_name: + option = self._get_cmd_option(section, deprecated_name) + if option: + self._warn_deprecate(section, key, deprecated_name) + return option # ...then the default config if self.defaults.has_option(section, key): @@ -352,6 +392,17 @@ def load_test_config(self): # then read any "custom" test settings self.read(TEST_CONFIG_FILE) + def _warn_deprecate(self, section, key, deprecated_name): + warnings.warn( + self.deprecation_format_string.format( + old=deprecated_name, + new=key, + section=section, + ), + DeprecationWarning, + stacklevel=3, + ) + def mkdir_p(path): try: diff --git a/setup.py b/setup.py index 2882c885224f6..798318c7d99aa 100644 --- a/setup.py +++ b/setup.py @@ -231,7 +231,7 @@ def write_version(filename=os.path.join(*['airflow', devel_ci = [package for package in devel_all if package not in ['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8']] else: - devel_ci = devel_all + devel_ci = devel_all + ['unittest2'] def do_setup(): diff --git a/tests/configuration.py b/tests/configuration.py index ca1d1d7a37df6..ac6f7b7db7746 100644 --- a/tests/configuration.py +++ b/tests/configuration.py @@ -20,7 +20,7 @@ from __future__ import print_function from __future__ import unicode_literals -import unittest +import os from collections import OrderedDict import six @@ -28,6 +28,12 @@ from airflow import configuration from airflow.configuration import conf, AirflowConfigParser, parameterized_config +if six.PY2: + # Need `assertWarns` back-ported from unittest2 + import unittest2 as unittest +else: + import unittest + class ConfTest(unittest.TestCase): @@ -154,3 +160,35 @@ def test_broker_transport_options(self): self.assertTrue(isinstance(section_dict['_test_only_float'], float)) self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types)) + + def test_deprecated_options(self): + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + conf.deprecated_options['celery'] = { + 'worker_concurrency': 'celeryd_concurrency', + } + + # Remove it so we are sure we use the right setting + conf.remove_option('celery', 'worker_concurrency') + + with self.assertWarns(DeprecationWarning): + os.environ['AIRFLOW__CELERY__CELERYD_CONCURRENCY'] = '99' + self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99) + os.environ.pop('AIRFLOW__CELERY__CELERYD_CONCURRENCY') + + with self.assertWarns(DeprecationWarning): + conf.set('celery', 'celeryd_concurrency', '99') + self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99) + conf.remove_option('celery', 'celeryd_concurrency') + + def test_deprecated_options_cmd(self): + # Guarantee we have a deprecated setting, so we test the deprecation + # lookup even if we remove this explicit fallback + conf.deprecated_options['celery'] = {'result_backend': 'celery_result_backend'} + conf.as_command_stdout.add(('celery', 'celery_result_backend')) + + conf.remove_option('celery', 'result_backend') + conf.set('celery', 'celery_result_backend_cmd', '/bin/echo 99') + + with self.assertWarns(DeprecationWarning): + self.assertEquals(conf.getint('celery', 'result_backend'), 99)