
2.7.0 db migration job fails with CeleryKubernetesExecutor and Sentry #33651

@LipuFei

Description

Apache Airflow version

2.7.0

What happened

There are actually 2 issues.

Issue 1: Circular import

I installed Airflow on our Kubernetes cluster with the CeleryKubernetesExecutor and Sentry enabled. The database migration job fails due to a circular import. Here is the log:

....................
ERROR! Maximum number of retries (20) reached.

Last check result:
$ airflow db check
ERROR:airflow.logging_config:Unable to load the config, contains a configuration error.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/logging/config.py", line 400, in resolve
    found = getattr(found, frag)
            ^^^^^^^^^^^^^^^^^^^^
AttributeError: module 'airflow.providers.amazon.aws.log' has no attribute 'cloudwatch_task_handler'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_kubernetes_executor.py", line 29, in <module>
    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 39, in <module>
    from airflow import AirflowException
ImportError: cannot import name 'AirflowException' from partially initialized module 'airflow' (most likely due to a circular import) (/home/airflow/.local/lib/python3.11/site-packages/airflow/__init__.py)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/logging/config.py", line 573, in configure
    handler = self.configure_handler(handlers[name])
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/logging/config.py", line 735, in configure_handler
    klass = self.resolve(cname)
            ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/logging/config.py", line 402, in resolve
    self.importer(used)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py", line 26, in <module>
    from airflow.models import TaskInstance
  File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/__init__.py", line 76, in __getattr__
    val = import_string(f"{path}.{name}")
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/module_loading.py", line 37, in import_string
    module = import_module(module_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 102, in <module>
    from airflow.sentry import Sentry
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sentry.py", line 199, in <module>
    Sentry = ConfiguredSentry()
             ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sentry.py", line 90, in __init__
    executor_class, _ = ExecutorLoader.import_default_executor_cls()
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/executor_loader.py", line 169, in import_default_executor_cls
    executor, source = cls.import_executor_cls(executor_name, validate=validate)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/executor_loader.py", line 143, in import_executor_cls
    return _import_and_validate(cls.executors[executor_name]), ConnectorSource.CORE
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/executor_loader.py", line 137, in _import_and_validate
    executor = import_string(path)
               ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/module_loading.py", line 37, in import_string
    module = import_module(module_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_kubernetes_executor.py", line 33, in <module>
    raise AirflowOptionalProviderFeatureException(e)
airflow.exceptions.AirflowOptionalProviderFeatureException: cannot import name 'AirflowException' from partially initialized module 'airflow' (most likely due to a circular import) (/home/airflow/.local/lib/python3.11/site-packages/airflow/__init__.py)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__init__.py", line 68, in <module>
    settings.initialize()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/settings.py", line 524, in initialize
    LOGGING_CLASS_PATH = configure_logging()
                         ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/logging_config.py", line 74, in configure_logging
    raise e
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/logging_config.py", line 69, in configure_logging
    dictConfig(logging_config)
  File "/usr/local/lib/python3.11/logging/config.py", line 823, in dictConfig
    dictConfigClass(config).configure()
  File "/usr/local/lib/python3.11/logging/config.py", line 580, in configure
    raise ValueError('Unable to configure handler '
ValueError: Unable to configure handler 'task'

This can be fixed by changing the import in kubernetes_executor.py to from airflow.exceptions import AirflowException.
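For illustration, here is a minimal, self-contained reproduction of this "partially initialized module" failure mode (the package and class names are hypothetical, not Airflow's actual modules): a package whose __init__.py pulls in a submodule before its own names are defined, just as airflow/__init__.py triggers logging configuration before AirflowException can be imported from the top-level airflow package.

```python
import os
import sys
import tempfile
import textwrap

# Build a tiny package on disk whose __init__.py imports a submodule
# *before* defining MyError -- the submodule then sees a partially
# initialized package, like airflow/__init__.py during settings.initialize().
tmp = tempfile.mkdtemp()
os.makedirs(os.path.join(tmp, "pkg"))
with open(os.path.join(tmp, "pkg", "__init__.py"), "w") as f:
    f.write(textwrap.dedent("""\
        from pkg import sub  # runs sub.py while pkg is still half-built
        class MyError(Exception):
            pass
    """))
with open(os.path.join(tmp, "pkg", "sub.py"), "w") as f:
    # Importing the name from the half-built package fails; the fix (as in
    # this issue) is to import from the module that actually defines it.
    f.write("from pkg import MyError\n")

sys.path.insert(0, tmp)
try:
    import pkg
    err = ""
except ImportError as e:
    err = str(e)

print(err)
```

Running this prints the same kind of message as the traceback above: "cannot import name 'MyError' from partially initialized module 'pkg' (most likely due to a circular import)".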

Issue 2: Sentry imports the executor with validate=True, which fails

I created a custom Docker image with the import fix (from airflow.exceptions import AirflowException). Then I got another error:

ERROR:airflow.logging_config:Unable to load the config, contains a configuration error.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/logging/config.py", line 400, in resolve
    found = getattr(found, frag)
            ^^^^^^^^^^^^^^^^^^^^
AttributeError: module 'airflow.providers.amazon.aws.log' has no attribute 'cloudwatch_task_handler'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/logging/config.py", line 402, in resolve
    self.importer(used)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py", line 26, in <module>
    from airflow.models import TaskInstance
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/__init__.py", line 76, in __getattr__
    val = import_string(f"{path}.{name}")
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/module_loading.py", line 37, in import_string
    module = import_module(module_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 102, in <module>
    from airflow.sentry import Sentry
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sentry.py", line 199, in <module>
    Sentry = ConfiguredSentry()
             ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sentry.py", line 90, in __init__
    executor_class, _ = ExecutorLoader.import_default_executor_cls()
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/executor_loader.py", line 169, in import_default_executor_cls
    executor, source = cls.import_executor_cls(executor_name, validate=validate)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/executor_loader.py", line 143, in import_executor_cls
    return _import_and_validate(cls.executors[executor_name]), ConnectorSource.CORE
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/executor_loader.py", line 139, in _import_and_validate
    cls.validate_database_executor_compatibility(executor)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/executor_loader.py", line 192, in validate_database_executor_compatibility
    from airflow.settings import engine
ImportError: cannot import name 'engine' from 'airflow.settings' (/home/airflow/.local/lib/python3.11/site-packages/airflow/settings.py)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/logging/config.py", line 573, in configure
    handler = self.configure_handler(handlers[name])
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/logging/config.py", line 735, in configure_handler
    klass = self.resolve(cname)
            ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/logging/config.py", line 407, in resolve
    raise v from e
ValueError: Cannot resolve 'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler': cannot import name 'engine' from 'airflow.settings' (/home/airflow/.local/lib/python3.11/site-packages/airflow/settings.py)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__init__.py", line 68, in <module>
    settings.initialize()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/settings.py", line 523, in initialize
    LOGGING_CLASS_PATH = configure_logging()
                         ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/logging_config.py", line 74, in configure_logging
    raise e
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/logging_config.py", line 69, in configure_logging
    dictConfig(logging_config)
  File "/usr/local/lib/python3.11/logging/config.py", line 823, in dictConfig
    dictConfigClass(config).configure()
  File "/usr/local/lib/python3.11/logging/config.py", line 580, in configure
    raise ValueError('Unable to configure handler '
ValueError: Unable to configure handler 'task'

This time, the error message does not say why the import fails.

I think this is because Airflow is still in the configuration phase, so the SQL engine (engine: Engine in airflow/settings.py) has not been initialised yet.

After some digging, I found commit fcbbf47, and it looks like SQL validation should be avoided during the configuration phase. So I think this can be fixed by adding validate=False to the executor_class, _ = ExecutorLoader.import_default_executor_cls() call at https://github.com/apache/airflow/blob/c8a37b6677a6bafa2d6a46874e856de0d6ce04fd/airflow/sentry.py#L90C13-L90C77
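The idea can be sketched with a hypothetical stand-in class (not Airflow's real ExecutorLoader; the names and bodies here are assumptions for illustration): during the configuration phase the database compatibility check cannot run, because settings.engine does not exist yet, so the Sentry initialization should request the executor class with validate=False.

```python
# Hypothetical stand-in sketching the proposed fix -- NOT Airflow's
# actual implementation.
class ExecutorLoaderSketch:
    @classmethod
    def import_default_executor_cls(cls, validate: bool = True):
        if validate:
            # In real Airflow this checks the executor against the metadata
            # DB, which needs settings.engine -- unavailable while logging
            # is still being configured.
            cls.validate_database_executor_compatibility()
        return "CeleryKubernetesExecutor", "core"

    @classmethod
    def validate_database_executor_compatibility(cls):
        raise ImportError("cannot import name 'engine' from 'airflow.settings'")


# Sentry setup runs during the configuration phase, so it should skip
# validation -- this succeeds:
executor_class, _ = ExecutorLoaderSketch.import_default_executor_cls(validate=False)
print(executor_class)

# With the default validate=True it fails, reproducing the second error:
try:
    ExecutorLoaderSketch.import_default_executor_cls()
    raised = False
except ImportError as e:
    raised = True
    print(e)
```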

I have tested this and it works. I will create a PR with the fixes mentioned here.

What you think should happen instead

The database migration job should succeed without errors.

How to reproduce

Install with the Airflow Helm chart 1.10.0 and the Airflow 2.7.0 python-3.11 Docker image, using CeleryKubernetesExecutor, Sentry, and CloudWatch Logs for logging.

The database migration job always fails.

Operating System

Airflow 2.7.0 Python 3.11 docker image

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Installed with Airflow Helm chart 1.10.0 and Airflow docker image 2.7.0 python-3.11. Using CeleryKubernetesExecutor, Sentry, and CloudWatch Logs for logging.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
