Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ To make the config of Airflow compatible with Celery, some properties have been
```
celeryd_concurrency -> worker_concurrency
celery_result_backend -> result_backend
celery_ssl_active -> ssl_active
celery_ssl_cert -> ssl_cert
celery_ssl_key -> ssl_key
```
Resulting in the same config parameters as Celery 4, with more transparency.

Expand Down
53 changes: 52 additions & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,29 @@ class AirflowConfigParser(ConfigParser):
('core', 'sql_alchemy_conn'),
('core', 'fernet_key'),
('celery', 'broker_url'),
('celery', 'result_backend')
('celery', 'result_backend'),
# Todo: remove this in Airflow 1.11
('celery', 'celery_result_backend'),
}

# A two-level mapping of (section -> new_name -> old_name). When reading
# new_name, the old_name will be checked to see if it exists. If it does a
# DeprecationWarning will be issued and the old name will be used instead
deprecated_options = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are multi-hop upgrades handled?
Say Airflow 1.8 -> 1.11

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afernandez Traditionally, we have not supported them very well. We have a few pre apache releases that required pretty much a full reinstall. Right now, I think we should mostly offer to support one hop upgrades.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - right now one hop is the most we should aim for. One of my long-term tasks is to write down what the back-compat policy we should adopt is. (I plan on basing it on Django's when I get around to it)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking changes should happen only in major versions. Thus not from 1.10 -> 1.11, but rather from 1.10 -> 2.0

'celery': {
# Remove these keys in Airflow 1.11
'worker_concurrency': 'celeryd_concurrency',
'broker_url': 'celery_broker_url',
'ssl_active': 'celery_ssl_active',
'ssl_cert': 'celery_ssl_cert',
'ssl_key': 'celery_ssl_key',
}
}
deprecation_format_string = (
'The {old} option in [{section}] has been renamed to {new} - the old '
'setting has been used, but please update your config.'
)

def __init__(self, default_config=None, *args, **kwargs):
super(AirflowConfigParser, self).__init__(*args, **kwargs)

Expand Down Expand Up @@ -181,22 +201,42 @@ def get(self, section, key, **kwargs):
section = str(section).lower()
key = str(key).lower()

deprecated_name = self.deprecated_options.get(section, {}).get(key, None)

# first check environment variables
option = self._get_env_var_option(section, key)
if option is not None:
return option
if deprecated_name:
option = self._get_env_var_option(section, deprecated_name)
if option is not None:
self._warn_deprecate(section, key, deprecated_name)
return option

# ...then the config file
if super(AirflowConfigParser, self).has_option(section, key):
# Use the parent's methods to get the actual config here to be able to
# separate the config from default config.
return expand_env_var(
super(AirflowConfigParser, self).get(section, key, **kwargs))
if deprecated_name:
if super(AirflowConfigParser, self).has_option(section, deprecated_name):
self._warn_deprecate(section, key, deprecated_name)
return expand_env_var(super(AirflowConfigParser, self).get(
section,
deprecated_name,
**kwargs
))

# ...then commands
option = self._get_cmd_option(section, key)
if option:
return option
if deprecated_name:
option = self._get_cmd_option(section, deprecated_name)
if option:
self._warn_deprecate(section, key, deprecated_name)
return option

# ...then the default config
if self.defaults.has_option(section, key):
Expand Down Expand Up @@ -352,6 +392,17 @@ def load_test_config(self):
# then read any "custom" test settings
self.read(TEST_CONFIG_FILE)

def _warn_deprecate(self, section, key, deprecated_name):
warnings.warn(
self.deprecation_format_string.format(
old=deprecated_name,
new=key,
section=section,
),
DeprecationWarning,
stacklevel=3,
)


def mkdir_p(path):
try:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def write_version(filename=os.path.join(*['airflow',
devel_ci = [package for package in devel_all if package not in
['snakebite>=2.7.8', 'snakebite[kerberos]>=2.7.8']]
else:
devel_ci = devel_all
devel_ci = devel_all + ['unittest2']


def do_setup():
Expand Down
40 changes: 39 additions & 1 deletion tests/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@
from __future__ import print_function
from __future__ import unicode_literals

import unittest
import os
from collections import OrderedDict

import six

from airflow import configuration
from airflow.configuration import conf, AirflowConfigParser, parameterized_config

if six.PY2:
# Need `assertWarns` back-ported from unittest2
import unittest2 as unittest
else:
import unittest


class ConfTest(unittest.TestCase):

Expand Down Expand Up @@ -154,3 +160,35 @@ def test_broker_transport_options(self):
self.assertTrue(isinstance(section_dict['_test_only_float'], float))

self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types))

def test_deprecated_options(self):
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
conf.deprecated_options['celery'] = {
'worker_concurrency': 'celeryd_concurrency',
}

# Remove it so we are sure we use the right setting
conf.remove_option('celery', 'worker_concurrency')

with self.assertWarns(DeprecationWarning):
os.environ['AIRFLOW__CELERY__CELERYD_CONCURRENCY'] = '99'
self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99)
os.environ.pop('AIRFLOW__CELERY__CELERYD_CONCURRENCY')

with self.assertWarns(DeprecationWarning):
conf.set('celery', 'celeryd_concurrency', '99')
self.assertEquals(conf.getint('celery', 'worker_concurrency'), 99)
conf.remove_option('celery', 'celeryd_concurrency')

def test_deprecated_options_cmd(self):
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
conf.deprecated_options['celery'] = {'result_backend': 'celery_result_backend'}
conf.as_command_stdout.add(('celery', 'celery_result_backend'))

conf.remove_option('celery', 'result_backend')
conf.set('celery', 'celery_result_backend_cmd', '/bin/echo 99')

with self.assertWarns(DeprecationWarning):
self.assertEquals(conf.getint('celery', 'result_backend'), 99)