Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def __init__(
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
resources: dict[str, Any] | None = None,
istio_enabled: bool = False,
**kwargs,
) -> None:

Expand Down Expand Up @@ -287,6 +288,7 @@ def __init__(
self.termination_grace_period = termination_grace_period
self.pod_request_obj: k8s.V1Pod | None = None
self.pod: k8s.V1Pod | None = None
self.istio_enabled = istio_enabled

def _render_nested_template_fields(
self,
Expand Down Expand Up @@ -342,7 +344,7 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool

@cached_property
def pod_manager(self) -> PodManager:
return PodManager(kube_client=self.client)
return PodManager(kube_client=self.client, istio_enabled=self.istio_enabled)

def get_hook(self):
warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2)
Expand Down Expand Up @@ -453,7 +455,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
if not self.is_delete_operator_pod:
with _suppress(Exception):
self.patch_already_checked(remote_pod)
if pod_phase != PodPhase.SUCCEEDED:
if (not self.istio_enabled and pod_phase != PodPhase.SUCCEEDED) or (self.istio_enabled and pod_phase != PodPhase.SUCCEEDED and not self.pod_manager.container_is_succeeded(pod, 'base')):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you will need to enable pre-commit locally see https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst

Copy link
Contributor

@dstandish dstandish Oct 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this expression can probably be simplified. i would try to do so. it's too hard to understand as it is.

if self.log_events_on_failure:
with _suppress(Exception):
for event in self.pod_manager.read_pod_events(pod).items:
Expand Down
46 changes: 46 additions & 0 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,36 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool:
return container_status.state.running is not None


def container_is_completed(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is completed.
If that container is present and completed, returns True. Returns False otherwise.
"""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if not container_statuses:
return False
container_status = next(iter([x for x in container_statuses if x.name == container_name]), None)
if not container_status:
return False
return container_status.state.terminated is not None


def container_is_succeeded(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded.
If that container is present and completed and succeeded, returns True. Returns False otherwise.
"""
if not container_is_completed(pod, container_name):
return False
container_statuses = pod.status.container_statuses if pod and pod.status else None
if not container_statuses:
return False
container_status = next(iter([x for x in container_statuses if x.name == container_name]), None)
if not container_status:
return False
return container_status.state.terminated.exit_code == 0
Comment on lines +87 to +114
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems you are duplicating a lot of logic here. looks like you could pull out a get_container_status function



def get_container_termination_message(pod: V1Pod, container_name: str):
try:
container_statuses = pod.status.container_statuses
Expand Down Expand Up @@ -112,6 +142,7 @@ def __init__(
kube_client: client.CoreV1Api = None,
in_cluster: bool = True,
cluster_context: str | None = None,
istio_enabled: bool = False,
):
"""
Creates the launcher.
Expand All @@ -123,6 +154,7 @@ def __init__(
super().__init__()
self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context)
self._watch = watch.Watch()
self.istio_enabled = istio_enabled

def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
"""Runs POD asynchronously"""
Expand Down Expand Up @@ -272,6 +304,10 @@ def await_pod_completion(self, pod: V1Pod) -> V1Pod:
remote_pod = self.read_pod(pod)
if remote_pod.status.phase in PodPhase.terminal_states:
break
if self.istio_enabled and remote_pod.status.phase == PodPhase.RUNNING and self.container_is_completed(
remote_pod, 'base'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'base' is defined in a constant somewhere

):
break
self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase)
time.sleep(2)
return remote_pod
Expand Down Expand Up @@ -306,6 +342,16 @@ def container_is_running(self, pod: V1Pod, container_name: str) -> bool:
remote_pod = self.read_pod(pod)
return container_is_running(pod=remote_pod, container_name=container_name)

def container_is_completed(self, pod: V1Pod, container_name: str) -> bool:
"""Reads pod and checks if container is completed"""
remote_pod = self.read_pod(pod)
return container_is_completed(pod=remote_pod, container_name=container_name)

def container_is_succeeded(self, pod: V1Pod, container_name: str) -> bool:
"""Reads pod and checks if container is succeeded"""
remote_pod = self.read_pod(pod)
return container_is_succeeded(pod=remote_pod, container_name=container_name)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_logs(
self,
Expand Down
124 changes: 123 additions & 1 deletion tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
from urllib3.exceptions import HTTPError as BaseHTTPError

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
PodManager,
PodPhase,
container_is_completed,
container_is_running,
container_is_succeeded,
)


class TestPodManager:
Expand Down Expand Up @@ -388,3 +394,119 @@ def test_container_is_running(remote_pod, result):
an object `e` such that `e.status.container_statuses` is None, and so on. This test
verifies the expected behavior."""
assert container_is_running(remote_pod, 'base') is result


def params_for_test_container_is_completed():
"""The `container_is_completed` method is designed to handle an assortment of bad objects
returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None,
an object `e` such that `e.status.container_statuses` is None, and so on. This function
emits params used in `test_container_is_completed` to verify this behavior.

We create mock classes not derived from MagicMock because with an instance `e` of MagicMock,
tests like `e.hello is not None` are always True.
"""

class RemotePodMock:
pass

class ContainerStatusMock:
def __init__(self, name):
self.name = name

def remote_pod(completed=None, not_completed=None):
e = RemotePodMock()
e.status = RemotePodMock()
e.status.container_statuses = []
for r in not_completed or []:
e.status.container_statuses.append(container(r, False))
for r in completed or []:
e.status.container_statuses.append(container(r, True))
return e

def container(name, completed):
c = ContainerStatusMock(name)
c.state = RemotePodMock()
c.state.terminated = {'a': 'b'} if completed else None
return c

pod_mock_list = []
pod_mock_list.append(pytest.param(None, False, id='None remote_pod'))
p = RemotePodMock()
p.status = None
pod_mock_list.append(pytest.param(p, False, id='None remote_pod.status'))
p = RemotePodMock()
p.status = RemotePodMock()
p.status.container_statuses = []
pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses'))
pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty'))
pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 completed'))
pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not completed'))
pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 completed'))
return pod_mock_list


@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_completed())
def test_container_is_completed(remote_pod, result):
"""The `container_is_completed` function is designed to handle an assortment of bad objects
returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None,
an object `e` such that `e.status.container_statuses` is None, and so on. This test
verifies the expected behavior."""
assert container_is_completed(remote_pod, 'base') is result


def params_for_test_container_is_succeeded():
"""The `container_is_succeeded` method is designed to handle an assortment of bad objects
returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None,
an object `e` such that `e.status.container_statuses` is None, and so on. This function
emits params used in `test_container_is_succeeded` to verify this behavior.

We create mock classes not derived from MagicMock because with an instance `e` of MagicMock,
tests like `e.hello is not None` are always True.
"""

class RemotePodMock:
pass

class ContainerStatusMock:
def __init__(self, name):
self.name = name

def remote_pod(succeeded=None, not_succeeded=None):
e = RemotePodMock()
e.status = RemotePodMock()
e.status.container_statuses = []
for r in not_succeeded or []:
e.status.container_statuses.append(container(r, False))
for r in succeeded or []:
e.status.container_statuses.append(container(r, True))
return e

def container(name, succeeded):
c = ContainerStatusMock(name)
c.state = RemotePodMock()
c.state.terminated = {'exit_code': 0} if succeeded else None
return c

pod_mock_list = []
pod_mock_list.append(pytest.param(None, False, id='None remote_pod'))
p = RemotePodMock()
p.status = None
pod_mock_list.append(pytest.param(p, False, id='None remote_pod.status'))
p = RemotePodMock()
p.status = RemotePodMock()
p.status.container_statuses = []
pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses'))
pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty'))
pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 succeeded'))
pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not succeeded'))
pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 succeeded'))
return pod_mock_list


@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_succeeded())
def test_container_is_succeeded(remote_pod, result):
"""The `container_is_succeeded` function is designed to handle an assortment of bad objects
returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None,
an object `e` such that `e.status.container_statuses` is None, and so on. This test
verifies the expected behavior."""
assert container_is_succeeded(remote_pod, 'base') is result