Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2096,11 +2096,23 @@ celery_broker_transport_options:
ETA you're planning to use.
visibility_timeout is only supported for Redis and SQS celery brokers.
See:
http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
version_added: ~
type: string
example: "21600"
default: ~
sentinel_kwargs:
description: |
The sentinel_kwargs parameter allows passing additional options to the Sentinel client.
In a typical scenario where Redis Sentinel is used as the broker and Redis servers are
password-protected, the password needs to be passed through this parameter. Although its
type is string, it is required to pass a string that conforms to the dictionary format.
See:
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
version_added: 2.7.0
type: string
example: '{"password": "password_for_redis_server"}'
default: ~
dask:
description: |
This section only applies if you are using the DaskExecutor in
Expand Down
11 changes: 10 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -1073,10 +1073,19 @@ worker_precheck = False
# ETA you're planning to use.
# visibility_timeout is only supported for Redis and SQS celery brokers.
# See:
# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
# Example: visibility_timeout = 21600
# visibility_timeout =

# The sentinel_kwargs parameter allows passing additional options to the Sentinel client.
# In a typical scenario where Redis Sentinel is used as the broker and Redis servers are
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add them description in config.yml -> this is the source of truth for this defailt config but also it is used to generate documentation automatically, so the comment you added here, should be added in config.yml and then pre-commit should automatically add it above and you should not need to add it manually here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't add it manually. When I make modifications in config.yml and do a git commit, the pre-commit hook prevents my merge request and automatically generates this content in my git stage. Only after that, I can successfully commit without any pre-commit issues.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pre-commit prevents you doing it and modifes it and ask you to commit the changes it has done when it add.
You need to:

  1. add changes to ,yml
  2. commit and let pre-commit add it to the config
  3. add the generated config changes it to the staging
  4. commit again

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ... stupid me. yes you re right! I missed that it is example generated from the other part. Sorry for the confusion.

# password-protected, the password needs to be passed through this parameter. Although its
# type is string, it is required to pass a string that conforms to the dictionary format.
# See:
# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
# Example: sentinel_kwargs = {{"password": "password_for_redis_server"}}
# sentinel_kwargs =

[dask]

# This section only applies if you are using the DaskExecutor in
Expand Down
16 changes: 13 additions & 3 deletions airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


def _broker_supports_visibility_timeout(url):
return url.startswith("redis://") or url.startswith("sqs://")
return url.startswith("redis://") or url.startswith("sqs://") or url.startswith("sentinel://")


log = logging.getLogger(__name__)
Expand All @@ -38,6 +38,16 @@ def _broker_supports_visibility_timeout(url):
if _broker_supports_visibility_timeout(broker_url):
broker_transport_options["visibility_timeout"] = 21600

broker_transport_options_for_celery: dict = dict.copy(broker_transport_options)
if "sentinel_kwargs" in broker_transport_options:
try:
sentinel_kwargs = conf.getjson("celery_broker_transport_options", "sentinel_kwargs")
if not isinstance(sentinel_kwargs, dict):
raise ValueError
broker_transport_options_for_celery["sentinel_kwargs"] = sentinel_kwargs
except Exception:
raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.")

if conf.has_option("celery", "RESULT_BACKEND"):
result_backend = conf.get_mandatory_value("celery", "RESULT_BACKEND")
else:
Expand All @@ -53,7 +63,7 @@ def _broker_supports_visibility_timeout(url):
"task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
"task_track_started": conf.getboolean("celery", "task_track_started"),
"broker_url": broker_url,
"broker_transport_options": broker_transport_options,
"broker_transport_options": broker_transport_options_for_celery,
"result_backend": result_backend,
"worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"),
"worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control"),
Expand All @@ -74,7 +84,7 @@ def _broker_supports_visibility_timeout(url):
"ca_certs": conf.get("celery", "SSL_CACERT"),
"cert_reqs": ssl.CERT_REQUIRED,
}
elif broker_url and "redis://" in broker_url:
elif broker_url and ("redis://" in broker_url or "sentinel://" in broker_url):
broker_use_ssl = {
"ssl_keyfile": conf.get("celery", "SSL_KEY"),
"ssl_certfile": conf.get("celery", "SSL_CERT"),
Expand Down
3 changes: 2 additions & 1 deletion docs/apache-airflow/core-concepts/executor/celery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Celery Executor
===============

``CeleryExecutor`` is one of the ways you can scale out the number of workers. For this
to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, ...) and
to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, **Redis Sentinel** ...) and
change your ``airflow.cfg`` to point the executor parameter to
``CeleryExecutor`` and provide the related Celery settings.

Expand Down Expand Up @@ -83,6 +83,7 @@ Some caveats:

- Make sure to use a database backed result backend
- Make sure to set a visibility timeout in ``[celery_broker_transport_options]`` that exceeds the ETA of your longest running task
- Make sure to specify the password for Redis Server in the ``[celery_broker_transport_options]`` section if you are using Redis Sentinel as your broker and the Redis servers are password-protected
- Make sure to set umask in ``[worker_umask]`` to set permissions for newly created files by workers.
- Tasks can consume resources. Make sure your worker has enough resources to run ``worker_concurrency`` tasks
- Queue names are limited to 256 characters, but each broker backend might have its own restrictions
Expand Down