diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 56852fb1a200b..37b3b5529ebc7 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -18,7 +18,6 @@ import contextlib import tempfile -from functools import cached_property from typing import TYPE_CHECKING, Any, Generator from asgiref.sync import sync_to_async @@ -151,7 +150,7 @@ def get_connection(cls, conn_id: str) -> Connection: else: raise - @cached_property + @property def conn_extras(self): if self.conn_id: connection = self.get_connection(self.conn_id) @@ -263,16 +262,16 @@ def is_in_cluster(self) -> bool: assert self._is_in_cluster is not None return self._is_in_cluster - @cached_property + @property def api_client(self) -> client.ApiClient: """Cached Kubernetes API client.""" return self.get_conn() - @cached_property + @property def core_v1_client(self) -> client.CoreV1Api: return client.CoreV1Api(api_client=self.api_client) - @cached_property + @property def custom_object_client(self) -> client.CustomObjectsApi: return client.CustomObjectsApi(api_client=self.api_client) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 6e6acbec4e38a..1eb83d0cc7eb1 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -29,6 +29,7 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, Iterable, Sequence +from kubernetes import config from kubernetes.client import CoreV1Api, models as k8s from slugify import slugify from urllib3.exceptions import HTTPError @@ -353,7 +354,7 @@ def __init__( self.volumes = [convert_volume(volume) for volume in volumes] if volumes else [] self.secrets = secrets or [] self.in_cluster = in_cluster - self.cluster_context = cluster_context + self.cluster_context = cluster_context or "" self.reattach_on_restart = reattach_on_restart self.get_logs = get_logs self.container_logs = container_logs @@ -492,11 +493,11 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool labels[label_id] = safe_label return labels - @cached_property + @property def pod_manager(self) -> PodManager: - return PodManager(kube_client=self.client) + return PodManager(kube_client=self.client, cluster_context=self.cluster_context) - @cached_property + @property def hook(self) -> PodOperatorHookProtocol: hook = KubernetesHook( conn_id=self.kubernetes_conn_id, @@ -506,8 +507,9 @@ def hook(self) -> PodOperatorHookProtocol: ) return hook - @cached_property + @property def client(self) -> CoreV1Api: + config.load_kube_config(context=self.cluster_context) return self.hook.core_v1_client def find_pod(self, namespace: str, context: Context, *, exclude_checked: bool = True) -> k8s.V1Pod | None: diff --git a/airflow/providers/cncf/kubernetes/utils/delete_from.py b/airflow/providers/cncf/kubernetes/utils/delete_from.py index 98d1c4d6d9526..60eab746f786a 100644 --- a/airflow/providers/cncf/kubernetes/utils/delete_from.py +++ b/airflow/providers/cncf/kubernetes/utils/delete_from.py @@ -51,7 +51,6 @@ def delete_from_dict(k8s_client, data, body, namespace, verbose=False, **kwargs) except client.rest.ApiException as api_exception: api_exceptions.append(api_exception) else: - try: _delete_from_yaml_single_item( k8s_client=k8s_client, diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index c8ac74382d6db..f26b46cafed63 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -31,7 +31,7 @@ import pendulum import tenacity -from kubernetes import client, watch +from kubernetes import client, config, watch from kubernetes.client.models.v1_container_status import V1ContainerStatus from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException @@ -43,6 +43,7 @@ from urllib3.response import HTTPResponse from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning +from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.pod_generator import PodDefaults from airflow.typing_compat import Literal, Protocol from airflow.utils.log.logging_mixin import LoggingMixin @@ -257,16 +258,14 @@ class PodLoggingStatus: class PodManager(LoggingMixin): """Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodOperator.""" - def __init__( - self, - kube_client: client.CoreV1Api, - ): + def __init__(self, kube_client: client.CoreV1Api, cluster_context: str): """ Creates the launcher. :param kube_client: kubernetes client """ super().__init__() + self.my_cluster_context = cluster_context self._client = kube_client self._watch = watch.Watch() @@ -616,6 +615,8 @@ def read_pod_events(self, pod: V1Pod) -> CoreV1EventList: def read_pod(self, pod: V1Pod) -> V1Pod: """Read POD information.""" try: + config.load_kube_config(context=self.my_cluster_context) + self._client = get_kube_client(in_cluster=False, cluster_context=self.my_cluster_context) return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace) except BaseHTTPError as e: raise AirflowException(f"There was an error reading the kubernetes API: {e}")