diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
index 3dedbdca3f5c3..6abdb7ed8243d 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py
@@ -21,8 +21,8 @@
 from typing import Any
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
+from airflow.providers.apache.kafka.version_compat import BaseOperator
 from airflow.utils.module_loading import import_string
 
 VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py
index 1dfc553f8a1e6..da460c3a87d2b 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py
@@ -22,8 +22,8 @@
 from typing import Any
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
+from airflow.providers.apache.kafka.version_compat import BaseOperator
 from airflow.utils.module_loading import import_string
 
 local_logger = logging.getLogger("airflow")
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
index afe196a29f064..7bf1886ea262c 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/sensors/kafka.py
@@ -19,8 +19,8 @@
 from collections.abc import Callable, Sequence
 from typing import Any
 
-from airflow.models import BaseOperator
 from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
+from airflow.providers.apache.kafka.version_compat import BaseOperator
 
 VALID_COMMIT_CADENCE = {"never", "end_of_batch", "end_of_operator"}
 
@@ -107,7 +107,7 @@ def execute(self, context) -> Any:
 
     def execute_complete(self, context, event=None):
         if self.xcom_push_key:
-            self.xcom_push(context, key=self.xcom_push_key, value=event)
+            context["task_instance"].xcom_push(key=self.xcom_push_key, value=event)
         return event
 
 
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/version_compat.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/version_compat.py
new file mode 100644
index 0000000000000..e7a259afb357c
--- /dev/null
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/version_compat.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY
+# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS
+# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT
+# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE
+#
+from __future__ import annotations
+
+
+def get_base_airflow_version_tuple() -> tuple[int, int, int]:
+    from packaging.version import Version
+
+    from airflow import __version__
+
+    airflow_version = Version(__version__)
+    return airflow_version.major, airflow_version.minor, airflow_version.micro
+
+
+AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import BaseOperator
+else:
+    from airflow.models import BaseOperator
+
+__all__ = [
+    "AIRFLOW_V_3_0_PLUS",
+    "BaseOperator",
+]
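For context (not part of the diff itself): after this change, modules in the provider import BaseOperator from the local version_compat shim instead of airflow.models, so the same code runs on Airflow 2.x and 3.x. A minimal sketch of the intended usage follows; the operator name ExampleKafkaOperator is purely illustrative and does not appear in this change.

    # Illustrative sketch only: shows how a provider module consumes the shim.
    # The shim resolves BaseOperator to airflow.sdk.BaseOperator on Airflow >= 3.0
    # and to airflow.models.BaseOperator on older versions.
    from airflow.providers.apache.kafka.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator


    class ExampleKafkaOperator(BaseOperator):
        def execute(self, context):
            self.log.info("Running on Airflow 3 or newer: %s", AIRFLOW_V_3_0_PLUS)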