Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 33 additions & 40 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."


def _load_body_to_dict(body: str) -> dict:
def _load_body_to_dict(body):
try:
body_dict = yaml.safe_load(body)
except yaml.YAMLError as e:
Expand Down Expand Up @@ -287,22 +287,37 @@ def create_custom_object(
:param namespace: kubernetes namespace
"""
api: client.CustomObjectsApi = self.custom_object_client
namespace = namespace or self.get_namespace() or self.DEFAULT_NAMESPACE

if isinstance(body, str):
body_dict = _load_body_to_dict(body)
else:
body_dict = body

response = api.create_namespaced_custom_object(
group=group,
version=version,
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
body=body_dict,
)
# Attribute "name" is not mandatory if "generateName" is used instead
if "name" in body_dict["metadata"]:
try:
api.delete_namespaced_custom_object(
group=group,
version=version,
namespace=namespace,
plural=plural,
name=body_dict["metadata"]["name"],
)

self.log.warning("Deleted SparkApplication with the same name")
except client.rest.ApiException:
self.log.info("SparkApplication %s not found", body_dict["metadata"]["name"])

self.log.debug("Response: %s", response)
return response
try:
response = api.create_namespaced_custom_object(
group=group, version=version, namespace=namespace, plural=plural, body=body_dict
)

self.log.debug("Response: %s", response)
return response
except client.rest.ApiException as e:
raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")

def get_custom_object(
self, group: str, version: str, plural: str, name: str, namespace: str | None = None
Expand All @@ -317,36 +332,14 @@ def get_custom_object(
:param namespace: kubernetes namespace
"""
api = client.CustomObjectsApi(self.api_client)
response = api.get_namespaced_custom_object(
group=group,
version=version,
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
name=name,
)
return response

def delete_custom_object(
self, group: str, version: str, plural: str, name: str, namespace: str | None = None, **kwargs
):
"""
Delete custom resource definition object from Kubernetes.

:param group: api group
:param version: api version
:param plural: api plural
:param name: crd object name
:param namespace: kubernetes namespace
"""
api = client.CustomObjectsApi(self.api_client)
return api.delete_namespaced_custom_object(
group=group,
version=version,
namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
name=name,
**kwargs,
)
namespace = namespace or self.get_namespace() or self.DEFAULT_NAMESPACE
try:
response = api.get_namespaced_custom_object(
group=group, version=version, namespace=namespace, plural=plural, name=name
)
return response
except client.rest.ApiException as e:
raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")

def get_namespace(self) -> str | None:
"""Returns the namespace that defined in the connection."""
Expand Down
65 changes: 8 additions & 57 deletions airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

from typing import TYPE_CHECKING, Sequence

from kubernetes.watch import Watch

from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, _load_body_to_dict
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -57,71 +55,24 @@ def __init__(
kubernetes_conn_id: str = "kubernetes_default",
api_group: str = "sparkoperator.k8s.io",
api_version: str = "v1beta2",
in_cluster: bool | None = None,
cluster_context: str | None = None,
config_file: str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.application_file = application_file
self.namespace = namespace
self.kubernetes_conn_id = kubernetes_conn_id
self.api_group = api_group
self.api_version = api_version
self.plural = "sparkapplications"
self.application_file = application_file
self.in_cluster = in_cluster
self.cluster_context = cluster_context
self.config_file = config_file

self.hook = KubernetesHook(
conn_id=self.kubernetes_conn_id,
in_cluster=self.in_cluster,
config_file=self.config_file,
cluster_context=self.cluster_context,
)

def execute(self, context: Context):
body = _load_body_to_dict(self.application_file)
name = body["metadata"]["name"]
namespace = self.namespace or self.hook.get_namespace()
namespace_event_stream = Watch().stream(
self.hook.core_v1_client.list_namespaced_pod,
namespace=namespace,
_preload_content=False,
watch=True,
label_selector=f"sparkoperator.k8s.io/app-name={name},spark-role=driver",
field_selector="status.phase=Running",
)

self.hook.create_custom_object(
group=self.api_group,
version=self.api_version,
plural=self.plural,
body=body,
namespace=namespace,
)
for event in namespace_event_stream:
if event["type"] == "ADDED":
pod_log_stream = Watch().stream(
self.hook.core_v1_client.read_namespaced_pod_log,
name=f"{name}-driver",
namespace=namespace,
_preload_content=False,
timestamps=True,
)
for line in pod_log_stream:
self.log.info(line)
else:
break

def on_kill(self) -> None:
body = _load_body_to_dict(self.application_file)
name = body["metadata"]["name"]
namespace = self.namespace or self.hook.get_namespace()
self.hook.delete_custom_object(
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
self.log.info("Creating sparkApplication")
response = hook.create_custom_object(
group=self.api_group,
version=self.api_version,
plural=self.plural,
namespace=namespace,
name=name,
body=self.application_file,
namespace=self.namespace,
)
return response
63 changes: 53 additions & 10 deletions tests/providers/apache/flink/operators/test_flink_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,11 @@ def setup_method(self):
args = {"owner": "airflow", "start_date": timezone.datetime(2020, 2, 1)}
self.dag = DAG("test_dag_id", default_args=args)

@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.delete_namespaced_custom_object")
@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.create_namespaced_custom_object")
def test_create_application_from_yaml(self, mock_create_namespaced_crd, mock_kubernetes_hook):
def test_create_application_from_yaml(
self, mock_create_namespaced_crd, mock_delete_namespaced_crd, mock_kubernetes_hook
):
op = FlinkKubernetesOperator(
application_file=TEST_VALID_APPLICATION_YAML,
dag=self.dag,
Expand All @@ -207,7 +210,13 @@ def test_create_application_from_yaml(self, mock_create_namespaced_crd, mock_kub
)
op.execute(None)
mock_kubernetes_hook.assert_called_once_with()

mock_delete_namespaced_crd.assert_called_once_with(
group="flink.apache.org",
namespace="default",
plural="flinkdeployments",
version="v1beta1",
name=TEST_APPLICATION_DICT["metadata"]["name"],
)
mock_create_namespaced_crd.assert_called_with(
body=TEST_APPLICATION_DICT,
group="flink.apache.org",
Expand All @@ -216,8 +225,11 @@ def test_create_application_from_yaml(self, mock_create_namespaced_crd, mock_kub
version="v1beta1",
)

@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.delete_namespaced_custom_object")
@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.create_namespaced_custom_object")
def test_create_application_from_json(self, mock_create_namespaced_crd, mock_kubernetes_hook):
def test_create_application_from_json(
self, mock_create_namespaced_crd, mock_delete_namespaced_crd, mock_kubernetes_hook
):
op = FlinkKubernetesOperator(
application_file=TEST_VALID_APPLICATION_JSON,
dag=self.dag,
Expand All @@ -226,7 +238,13 @@ def test_create_application_from_json(self, mock_create_namespaced_crd, mock_kub
)
op.execute(None)
mock_kubernetes_hook.assert_called_once_with()

mock_delete_namespaced_crd.assert_called_once_with(
group="flink.apache.org",
namespace="default",
plural="flinkdeployments",
version="v1beta1",
name=TEST_APPLICATION_DICT["metadata"]["name"],
)
mock_create_namespaced_crd.assert_called_with(
body=TEST_APPLICATION_DICT,
group="flink.apache.org",
Expand All @@ -235,9 +253,10 @@ def test_create_application_from_json(self, mock_create_namespaced_crd, mock_kub
version="v1beta1",
)

@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.delete_namespaced_custom_object")
@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.create_namespaced_custom_object")
def test_create_application_from_json_with_api_group_and_version(
self, mock_create_namespaced_crd, mock_kubernetes_hook
self, mock_create_namespaced_crd, mock_delete_namespaced_crd, mock_kubernetes_hook
):
api_group = "flink.apache.org"
api_version = "v1beta1"
Expand All @@ -251,7 +270,13 @@ def test_create_application_from_json_with_api_group_and_version(
)
op.execute(None)
mock_kubernetes_hook.assert_called_once_with()

mock_delete_namespaced_crd.assert_called_once_with(
group=api_group,
namespace="default",
plural="flinkdeployments",
version=api_version,
name=TEST_APPLICATION_DICT["metadata"]["name"],
)
mock_create_namespaced_crd.assert_called_with(
body=TEST_APPLICATION_DICT,
group=api_group,
Expand All @@ -260,8 +285,11 @@ def test_create_application_from_json_with_api_group_and_version(
version=api_version,
)

@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.delete_namespaced_custom_object")
@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.create_namespaced_custom_object")
def test_namespace_from_operator(self, mock_create_namespaced_crd, mock_kubernetes_hook):
def test_namespace_from_operator(
self, mock_create_namespaced_crd, mock_delete_namespaced_crd, mock_kubernetes_hook
):
op = FlinkKubernetesOperator(
application_file=TEST_VALID_APPLICATION_JSON,
dag=self.dag,
Expand All @@ -271,7 +299,13 @@ def test_namespace_from_operator(self, mock_create_namespaced_crd, mock_kubernet
)
op.execute(None)
mock_kubernetes_hook.assert_called_once_with()

mock_delete_namespaced_crd.assert_called_once_with(
group="flink.apache.org",
namespace="operator_namespace",
plural="flinkdeployments",
version="v1beta1",
name=TEST_APPLICATION_DICT["metadata"]["name"],
)
mock_create_namespaced_crd.assert_called_with(
body=TEST_APPLICATION_DICT,
group="flink.apache.org",
Expand All @@ -280,8 +314,11 @@ def test_namespace_from_operator(self, mock_create_namespaced_crd, mock_kubernet
version="v1beta1",
)

@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.delete_namespaced_custom_object")
@patch("kubernetes.client.api.custom_objects_api.CustomObjectsApi.create_namespaced_custom_object")
def test_namespace_from_connection(self, mock_create_namespaced_crd, mock_kubernetes_hook):
def test_namespace_from_connection(
self, mock_create_namespaced_crd, mock_delete_namespaced_crd, mock_kubernetes_hook
):
op = FlinkKubernetesOperator(
application_file=TEST_VALID_APPLICATION_JSON,
dag=self.dag,
Expand All @@ -291,7 +328,13 @@ def test_namespace_from_connection(self, mock_create_namespaced_crd, mock_kubern
op.execute(None)

mock_kubernetes_hook.assert_called_once_with()

mock_delete_namespaced_crd.assert_called_once_with(
group="flink.apache.org",
namespace="mock_namespace",
plural="flinkdeployments",
version="v1beta1",
name=TEST_APPLICATION_DICT["metadata"]["name"],
)
mock_create_namespaced_crd.assert_called_with(
body=TEST_APPLICATION_DICT,
group="flink.apache.org",
Expand Down
25 changes: 0 additions & 25 deletions tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,31 +373,6 @@ def test_missing_default_connection_is_ok(self, remove_default_conn):
with pytest.raises(AirflowNotFoundException, match="The conn_id `some_conn` isn't defined"):
hook.conn_extras

@patch("kubernetes.config.kube_config.KubeConfigLoader")
@patch("kubernetes.config.kube_config.KubeConfigMerger")
@patch(f"{HOOK_MODULE}.client.CustomObjectsApi")
def test_delete_custom_object(
self, mock_custom_object_api, mock_kube_config_merger, mock_kube_config_loader
):
hook = KubernetesHook()
hook.delete_custom_object(
group="group",
version="version",
plural="plural",
name="name",
namespace="namespace",
_preload_content="_preload_content",
)

mock_custom_object_api.return_value.delete_namespaced_custom_object.assert_called_once_with(
group="group",
version="version",
plural="plural",
name="name",
namespace="namespace",
_preload_content="_preload_content",
)


class TestKubernetesHookIncorrectConfiguration:
@pytest.mark.parametrize(
Expand Down
Loading