From 6d83cbe849a5a5716d059730de941cc86fafa11f Mon Sep 17 00:00:00 2001 From: ORuteMa <30034544+ORuteMa@users.noreply.github.com> Date: Wed, 29 Mar 2023 15:41:55 +0800 Subject: [PATCH 1/6] support requirepass redis sentinel --- airflow/config_templates/default_celery.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index d3d5a4adf11e2..28f42a59abc0c 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -26,7 +26,7 @@ def _broker_supports_visibility_timeout(url): - return url.startswith("redis://") or url.startswith("sqs://") + return url.startswith("redis://") or url.startswith("sqs://") or url.startswith("sentinel://") log = logging.getLogger(__name__) @@ -37,6 +37,12 @@ def _broker_supports_visibility_timeout(url): if "visibility_timeout" not in broker_transport_options: if _broker_supports_visibility_timeout(broker_url): broker_transport_options["visibility_timeout"] = 21600 +if "sentinel_kwargs" in broker_transport_options: + try: + sen_kwargs = conf.getjson("celery_broker_transport_options", "sentinel_kwargs") + broker_transport_options["sentinel_kwargs"] = sen_kwargs + except Exception: + raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.") if conf.has_option("celery", "RESULT_BACKEND"): result_backend = conf.get_mandatory_value("celery", "RESULT_BACKEND") @@ -74,7 +80,7 @@ def _broker_supports_visibility_timeout(url): "ca_certs": conf.get("celery", "SSL_CACERT"), "cert_reqs": ssl.CERT_REQUIRED, } - elif broker_url and "redis://" in broker_url: + elif broker_url and ("redis://" in broker_url or "sentinel://" in broker_url): broker_use_ssl = { "ssl_keyfile": conf.get("celery", "SSL_KEY"), "ssl_certfile": conf.get("celery", "SSL_CERT"), From 53def7e95525b6676b688d7c8ad62777525e9cc2 Mon Sep 17 00:00:00 2001 From: ORuteMa <30034544+orutema@users.noreply.github.com> Date: Tue, 4 Apr 2023 16:10:52 +0800 Subject: [PATCH 2/6] pass static check --- airflow/config_templates/default_celery.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 28f42a59abc0c..fc77af04971ba 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -37,10 +37,12 @@ def _broker_supports_visibility_timeout(url): if "visibility_timeout" not in broker_transport_options: if _broker_supports_visibility_timeout(broker_url): broker_transport_options["visibility_timeout"] = 21600 + +broker_transport_options_for_celery: dict = dict.copy(broker_transport_options) if "sentinel_kwargs" in broker_transport_options: try: - sen_kwargs = conf.getjson("celery_broker_transport_options", "sentinel_kwargs") - broker_transport_options["sentinel_kwargs"] = sen_kwargs + sentinel_kwargs = conf.getjson("celery_broker_transport_options", "sentinel_kwargs") + broker_transport_options_for_celery["sentinel_kwargs"] = sentinel_kwargs except Exception: raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.") @@ -59,7 +61,7 @@ def _broker_supports_visibility_timeout(url): "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"), "task_track_started": conf.getboolean("celery", "task_track_started"), "broker_url": broker_url, - "broker_transport_options": broker_transport_options, + "broker_transport_options": broker_transport_options_for_celery, "result_backend": result_backend, "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"), "worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control"), From de9e3803965a7e5006a6d748bf37cbf2f4926762 Mon Sep 17 00:00:00 2001 From: ORuteMa <30034544+orutema@users.noreply.github.com> Date: Tue, 25 Apr 2023 15:48:04 +0800 Subject: [PATCH 3/6] add docs for sentinel_kwargs --- airflow/config_templates/config.yml | 11 +++++++++++ airflow/config_templates/default_airflow.cfg | 8 ++++++++ airflow/config_templates/default_celery.py | 2 ++ 3 files changed, 21 insertions(+) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f06ab2c0cfbd9..0636da75f08b5 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2101,6 +2101,17 @@ celery_broker_transport_options: type: string example: "21600" default: ~ + sentinel_kwargs: + description: | + The sentinel_kwargs parameter allows passing additional options to the Sentinel client. + In a typical scenario where Redis Sentinel is used as the broker and Redis servers are + password-protected, the password needs to be passed through this parameter. Although its + type is string, it is required to pass a string that conforms to the dictionary format, + which Airflow will perform special conversion on during reading. + version_added: 2.6.1 + type: string + example: '{"password": "password_for_redis_server"}' + default: ~ dask: description: | This section only applies if you are using the DaskExecutor in diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 14ef880aa1d3f..afbed5e2dcdec 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1077,6 +1077,14 @@ worker_precheck = False # Example: visibility_timeout = 21600 # visibility_timeout = +# The sentinel_kwargs parameter allows passing additional options to the Sentinel client. +# In a typical scenario where Redis Sentinel is used as the broker and Redis servers are +# password-protected, the password needs to be passed through this parameter. Although its +# type is string, it is required to pass a string that conforms to the dictionary format, +# which Airflow will perform special conversion on during reading. +# Example: sentinel_kwargs = {{"password": "password_for_redis_server"}} +# sentinel_kwargs = + [dask] # This section only applies if you are using the DaskExecutor in diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index fc77af04971ba..9ec320097e498 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -42,6 +42,8 @@ def _broker_supports_visibility_timeout(url): if "sentinel_kwargs" in broker_transport_options: try: sentinel_kwargs = conf.getjson("celery_broker_transport_options", "sentinel_kwargs") + if not isinstance(sentinel_kwargs, dict): + raise ValueError broker_transport_options_for_celery["sentinel_kwargs"] = sentinel_kwargs except Exception: raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.") From 1e9d87869091fc9899fe6fb06d515b7bbf93cddc Mon Sep 17 00:00:00 2001 From: ORuteMa <30034544+orutema@users.noreply.github.com> Date: Fri, 5 May 2023 11:49:40 +0800 Subject: [PATCH 4/6] add docs for redis-sentinel usage --- airflow/config_templates/config.yml | 5 ++--- airflow/config_templates/default_airflow.cfg | 3 +-- docs/apache-airflow/core-concepts/executor/celery.rst | 3 ++- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 0636da75f08b5..3d990654f3014 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2106,9 +2106,8 @@ celery_broker_transport_options: The sentinel_kwargs parameter allows passing additional options to the Sentinel client. In a typical scenario where Redis Sentinel is used as the broker and Redis servers are password-protected, the password needs to be passed through this parameter. Although its - type is string, it is required to pass a string that conforms to the dictionary format, - which Airflow will perform special conversion on during reading. - version_added: 2.6.1 + type is string, it is required to pass a string that conforms to the dictionary format. + version_added: 2.7.0 type: string example: '{"password": "password_for_redis_server"}' default: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index afbed5e2dcdec..71c78b3d64d52 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1080,8 +1080,7 @@ worker_precheck = False # The sentinel_kwargs parameter allows passing additional options to the Sentinel client. # In a typical scenario where Redis Sentinel is used as the broker and Redis servers are # password-protected, the password needs to be passed through this parameter. Although its -# type is string, it is required to pass a string that conforms to the dictionary format, -# which Airflow will perform special conversion on during reading. +# type is string, it is required to pass a string that conforms to the dictionary format. # Example: sentinel_kwargs = {{"password": "password_for_redis_server"}} # sentinel_kwargs = diff --git a/docs/apache-airflow/core-concepts/executor/celery.rst b/docs/apache-airflow/core-concepts/executor/celery.rst index e16cc7e3f873a..84c65dec90762 100644 --- a/docs/apache-airflow/core-concepts/executor/celery.rst +++ b/docs/apache-airflow/core-concepts/executor/celery.rst @@ -22,7 +22,7 @@ Celery Executor =============== ``CeleryExecutor`` is one of the ways you can scale out the number of workers. For this -to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, ...) and +to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, **Redis Sentinel** ...) and change your ``airflow.cfg`` to point the executor parameter to ``CeleryExecutor`` and provide the related Celery settings. @@ -83,6 +83,7 @@ Some caveats: - Make sure to use a database backed result backend - Make sure to set a visibility timeout in ``[celery_broker_transport_options]`` that exceeds the ETA of your longest running task +- Make sure to specify the password for Redis Server in the [celery_broker_transport_options] section if you are using Redis Sentinel as your broker and the Redis servers are password-protected - Make sure to set umask in ``[worker_umask]`` to set permissions for newly created files by workers. - Tasks can consume resources. Make sure your worker has enough resources to run ``worker_concurrency`` tasks - Queue names are limited to 256 characters, but each broker backend might have its own restrictions From 4a0c358b14784609715386d8254faf042d4df8b7 Mon Sep 17 00:00:00 2001 From: ORuteMa <30034544+orutema@users.noreply.github.com> Date: Fri, 5 May 2023 11:54:54 +0800 Subject: [PATCH 5/6] update celery docs link --- airflow/config_templates/config.yml | 4 +++- airflow/config_templates/default_airflow.cfg | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 3d990654f3014..3a5fe0df13023 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2096,7 +2096,7 @@ celery_broker_transport_options: ETA you're planning to use. visibility_timeout is only supported for Redis and SQS celery brokers. See: - http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options + https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout version_added: ~ type: string example: "21600" @@ -2107,6 +2107,8 @@ celery_broker_transport_options: In a typical scenario where Redis Sentinel is used as the broker and Redis servers are password-protected, the password needs to be passed through this parameter. Although its type is string, it is required to pass a string that conforms to the dictionary format. + See: + https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration version_added: 2.7.0 type: string example: '{"password": "password_for_redis_server"}' diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 71c78b3d64d52..515e0e3234937 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1073,7 +1073,7 @@ worker_precheck = False # ETA you're planning to use. # visibility_timeout is only supported for Redis and SQS celery brokers. # See: -# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options +# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout # Example: visibility_timeout = 21600 # visibility_timeout = @@ -1081,6 +1081,8 @@ worker_precheck = False # In a typical scenario where Redis Sentinel is used as the broker and Redis servers are # password-protected, the password needs to be passed through this parameter. Although its # type is string, it is required to pass a string that conforms to the dictionary format. +# See: +# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration # Example: sentinel_kwargs = {{"password": "password_for_redis_server"}} # sentinel_kwargs = From 22af23babffaef73bdc4a7a730ac2ff754baecc9 Mon Sep 17 00:00:00 2001 From: ORuteMa <30034544+orutema@users.noreply.github.com> Date: Fri, 5 May 2023 14:20:01 +0800 Subject: [PATCH 6/6] emphasize section --- docs/apache-airflow/core-concepts/executor/celery.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/core-concepts/executor/celery.rst b/docs/apache-airflow/core-concepts/executor/celery.rst index 84c65dec90762..ca137fa8e3c3d 100644 --- a/docs/apache-airflow/core-concepts/executor/celery.rst +++ b/docs/apache-airflow/core-concepts/executor/celery.rst @@ -83,7 +83,7 @@ Some caveats: - Make sure to use a database backed result backend - Make sure to set a visibility timeout in ``[celery_broker_transport_options]`` that exceeds the ETA of your longest running task -- Make sure to specify the password for Redis Server in the [celery_broker_transport_options] section if you are using Redis Sentinel as your broker and the Redis servers are password-protected +- Make sure to specify the password for Redis Server in the ``[celery_broker_transport_options]`` section if you are using Redis Sentinel as your broker and the Redis servers are password-protected - Make sure to set umask in ``[worker_umask]`` to set permissions for newly created files by workers. - Tasks can consume resources. Make sure your worker has enough resources to run ``worker_concurrency`` tasks - Queue names are limited to 256 characters, but each broker backend might have its own restrictions