-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Added Istio support to KubernetesPodOperator #26739
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
|
|
Thank you for that change, our initial test proved it working |
|
I don't think this should be a new operator! |
Agree. I think it should be merged into KPO (istio=True). It's logic might be separated into classes but it should be one module and operator to use by the users. |
was just gonna say the same thnig |
|
it is actioned as single operator now |
| with _suppress(Exception): | ||
| self.patch_already_checked(remote_pod) | ||
| if pod_phase != PodPhase.SUCCEEDED: | ||
| if (not self.istio_enabled and pod_phase != PodPhase.SUCCEEDED) or (self.istio_enabled and pod_phase != PodPhase.SUCCEEDED and not self.pod_manager.container_is_succeeded(pod, 'base')): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you will need to enable pre-commit locally see https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst
| with _suppress(Exception): | ||
| self.patch_already_checked(remote_pod) | ||
| if pod_phase != PodPhase.SUCCEEDED: | ||
| if (not self.istio_enabled and pod_phase != PodPhase.SUCCEEDED) or (self.istio_enabled and pod_phase != PodPhase.SUCCEEDED and not self.pod_manager.container_is_succeeded(pod, 'base')): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this expression can probably be simplified. i would try to do so. it's too hard to understand as it is.
| def container_is_completed(pod: V1Pod, container_name: str) -> bool: | ||
| """ | ||
| Examines V1Pod ``pod`` to determine whether ``container_name`` is completed. | ||
| If that container is present and completed, returns True. Returns False otherwise. | ||
| """ | ||
| container_statuses = pod.status.container_statuses if pod and pod.status else None | ||
| if not container_statuses: | ||
| return False | ||
| container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) | ||
| if not container_status: | ||
| return False | ||
| return container_status.state.terminated is not None | ||
|
|
||
|
|
||
| def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: | ||
| """ | ||
| Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. | ||
| If that container is present and completed and succeeded, returns True. Returns False otherwise. | ||
| """ | ||
| if not container_is_completed(pod, container_name): | ||
| return False | ||
| container_statuses = pod.status.container_statuses if pod and pod.status else None | ||
| if not container_statuses: | ||
| return False | ||
| container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) | ||
| if not container_status: | ||
| return False | ||
| return container_status.state.terminated.exit_code == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems you are duplicating a lot of logic here. looks like you could pull out a get_container_status function
| if remote_pod.status.phase in PodPhase.terminal_states: | ||
| break | ||
| if self.istio_enabled and remote_pod.status.phase == PodPhase.RUNNING and self.container_is_completed( | ||
| remote_pod, 'base' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'base' is defined in a constant somewhere
|
@joshuayeung are you still working on this PR? |
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions. |
|
any chances to reopen this @eladkal ? I know @joshuayeung personally and have his consent to continue to work on this PR. Thanks |
|
@Owen-CH-Leung I am actually interested in this work too. I have worked on this in my company and we are seeing stable results in development environment. |
|
i reopened it... but you might have to create a new PR anyway, unless joshua grants you permission to his fork |
|
for anyone who wants to continue the work started on this PR, it's much better to cherry pick the commits to your own fork/branch and continue from there. By doing so you are working independently and also save the credit for the original author of the commits that you are building on top of. |
|
Ok I'll cherry pick @joshuayeung 's commits and start a new PR from there |
|
Closing infavor of #31389 |
This PR added IstioKubernetesPodOperator and IstioPodManager for making KubernetesPodOperator works in Kubernetes with Istio enabled.
Istio will inject a sidecar called istio-proxy inside your pod and handle the traffic for you. But this hinders the standard workflow of the Kubernetes executor and Kubernetes Pod Operator. It is because both of them use the Pod Phase to determine the status of the dag task. Airflow will consider that the task is finished when the Pod Phase is successful and failed. Because of the istio-proxy sidecar, the pod will always be running. Just the task container itself is completed. This will lead to the Pod Phase become Not Ready in the success task, and Error in the failed task. In Airflow UI, you will see the task always running and never-ending.
The best way is to implement an Istio Kubernetes Pod Operator that shut down the pod when the base container (the container that runs your task) is completed. It involves creating an Istio pod launcher.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.