diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index f06ab2c0cfbd9..3a5fe0df13023 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2096,11 +2096,23 @@ celery_broker_transport_options:
         ETA you're planning to use.
         visibility_timeout is only supported for Redis and SQS celery brokers.
         See:
-        http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
+        https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
       version_added: ~
       type: string
       example: "21600"
       default: ~
+    sentinel_kwargs:
+      description: |
+        The sentinel_kwargs parameter allows passing additional options to the Sentinel client.
+        In a typical scenario where Redis Sentinel is used as the broker and Redis servers are
+        password-protected, the password needs to be passed through this parameter. Although its
+        type is string, it is required to pass a string that conforms to the dictionary format.
+        See:
+        https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
+      version_added: 2.7.0
+      type: string
+      example: '{"password": "password_for_redis_server"}'
+      default: ~
   dask:
     description: |
       This section only applies if you are using the DaskExecutor in
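The sentinel_kwargs option is stored as a string but has to decode to a dictionary before it can be handed to the Sentinel client. A minimal sketch of that round trip, using plain json outside of Airflow and reusing the example value from the option above:

    import json

    # Example value as it would appear under [celery_broker_transport_options];
    # the Airflow config system keeps it as a plain string.
    raw_sentinel_kwargs = '{"password": "password_for_redis_server"}'

    # The string must decode to a dict, otherwise it cannot be used as Sentinel kwargs.
    sentinel_kwargs = json.loads(raw_sentinel_kwargs)
    assert isinstance(sentinel_kwargs, dict)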
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 14ef880aa1d3f..515e0e3234937 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1073,10 +1073,19 @@ worker_precheck = False
 # ETA you're planning to use.
 # visibility_timeout is only supported for Redis and SQS celery brokers.
 # See:
-# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
+# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
 # Example: visibility_timeout = 21600
 # visibility_timeout =
 
+# The sentinel_kwargs parameter allows passing additional options to the Sentinel client.
+# In a typical scenario where Redis Sentinel is used as the broker and Redis servers are
+# password-protected, the password needs to be passed through this parameter. Although its
+# type is string, it is required to pass a string that conforms to the dictionary format.
+# See:
+# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
+# Example: sentinel_kwargs = {{"password": "password_for_redis_server"}}
+# sentinel_kwargs =
+
 [dask]
 
 # This section only applies if you are using the DaskExecutor in
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index d3d5a4adf11e2..9ec320097e498 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -26,7 +26,7 @@
 
 
 def _broker_supports_visibility_timeout(url):
-    return url.startswith("redis://") or url.startswith("sqs://")
+    return url.startswith("redis://") or url.startswith("sqs://") or url.startswith("sentinel://")
 
 
 log = logging.getLogger(__name__)
@@ -38,6 +38,16 @@ def _broker_supports_visibility_timeout(url):
     if _broker_supports_visibility_timeout(broker_url):
         broker_transport_options["visibility_timeout"] = 21600
 
+broker_transport_options_for_celery: dict = dict.copy(broker_transport_options)
+if "sentinel_kwargs" in broker_transport_options:
+    try:
+        sentinel_kwargs = conf.getjson("celery_broker_transport_options", "sentinel_kwargs")
+        if not isinstance(sentinel_kwargs, dict):
+            raise ValueError
+        broker_transport_options_for_celery["sentinel_kwargs"] = sentinel_kwargs
+    except Exception:
+        raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.")
+
 if conf.has_option("celery", "RESULT_BACKEND"):
     result_backend = conf.get_mandatory_value("celery", "RESULT_BACKEND")
 else:
@@ -53,7 +63,7 @@ def _broker_supports_visibility_timeout(url):
     "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
     "task_track_started": conf.getboolean("celery", "task_track_started"),
     "broker_url": broker_url,
-    "broker_transport_options": broker_transport_options,
+    "broker_transport_options": broker_transport_options_for_celery,
     "result_backend": result_backend,
     "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"),
     "worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control"),
@@ -74,7 +84,7 @@ def _broker_supports_visibility_timeout(url):
                 "ca_certs": conf.get("celery", "SSL_CACERT"),
                 "cert_reqs": ssl.CERT_REQUIRED,
             }
-        elif broker_url and "redis://" in broker_url:
+        elif broker_url and ("redis://" in broker_url or "sentinel://" in broker_url):
             broker_use_ssl = {
                 "ssl_keyfile": conf.get("celery", "SSL_KEY"),
                 "ssl_certfile": conf.get("celery", "SSL_CERT"),
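A standalone sketch of the validation added to default_celery.py above, with json.loads and RuntimeError standing in for Airflow's conf.getjson and AirflowException so it runs outside an Airflow installation:

    import json

    def _load_sentinel_kwargs(raw_value: str) -> dict:
        # Same rule as the added code: the value must parse as JSON and be a dict,
        # otherwise fail with a configuration error.
        try:
            parsed = json.loads(raw_value)
            if not isinstance(parsed, dict):
                raise ValueError
        except Exception:
            raise RuntimeError("sentinel_kwargs should be written in the correct dictionary format.")
        return parsed

    # The parsed kwargs go into a copy of the transport options handed to Celery,
    # leaving the original broker_transport_options dict untouched.
    broker_transport_options = {"visibility_timeout": 21600}
    broker_transport_options_for_celery = dict(broker_transport_options)
    broker_transport_options_for_celery["sentinel_kwargs"] = _load_sentinel_kwargs(
        '{"password": "password_for_redis_server"}'
    )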
diff --git a/docs/apache-airflow/core-concepts/executor/celery.rst b/docs/apache-airflow/core-concepts/executor/celery.rst
index e16cc7e3f873a..ca137fa8e3c3d 100644
--- a/docs/apache-airflow/core-concepts/executor/celery.rst
+++ b/docs/apache-airflow/core-concepts/executor/celery.rst
@@ -22,7 +22,7 @@ Celery Executor
 ===============
 
 ``CeleryExecutor`` is one of the ways you can scale out the number of workers. For this
-to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, ...) and
+to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, **Redis Sentinel** ...) and
 change your ``airflow.cfg`` to point the executor parameter to ``CeleryExecutor`` and provide
 the related Celery settings.
 
@@ -83,6 +83,7 @@ Some caveats:
 - Make sure to use a database backed result backend
 - Make sure to set a visibility timeout in ``[celery_broker_transport_options]`` that exceeds the
   ETA of your longest running task
+- Make sure to specify the password for Redis Server in the ``[celery_broker_transport_options]`` section if you are using Redis Sentinel as your broker and the Redis servers are password-protected
 - Make sure to set umask in ``[worker_umask]`` to set permissions for newly created files by workers.
 - Tasks can consume resources. Make sure your worker has enough resources to run ``worker_concurrency`` tasks
 - Queue names are limited to 256 characters, but each broker backend might have its own restrictions
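With the Redis Sentinel caveat above in mind, a minimal sketch of what the resulting settings amount to on the Celery side, using hypothetical Sentinel hosts and master name; in Airflow these values come from broker_url and [celery_broker_transport_options] in airflow.cfg rather than being set in code:

    from celery import Celery

    # Hypothetical Sentinel endpoints; multiple sentinels are separated by ";".
    app = Celery(
        "sentinel_broker_sketch",
        broker="sentinel://sentinel-1:26379;sentinel://sentinel-2:26379",
    )

    app.conf.broker_transport_options = {
        # Must exceed the ETA of your longest running task.
        "visibility_timeout": 21600,
        # Name of the monitored master group as configured in Sentinel (hypothetical).
        "master_name": "mymaster",
        # Password for the password-protected Redis servers behind Sentinel.
        "sentinel_kwargs": {"password": "password_for_redis_server"},
    }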