
Conversation

@blcksrx
Contributor

@blcksrx blcksrx commented Jun 8, 2023

This PR fixes the problems mentioned in the issue.

  • With the new argument watch: bool = False, the operator behaves the same as the old version, which resolves the compatibility issue (see the usage sketch below)
  • The job status is tracked via the k8s events in case of success or failure
  • Tests cover these situations

closes: #31183
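For illustration, a minimal usage sketch: the DAG, file name, namespace, and connection id below are placeholders, and the watch flag is the new argument proposed by this PR (not part of a released provider at the time of writing).

    # Hypothetical DAG snippet; "spark-pi.yaml", the namespace and the connection id are placeholders.
    from pendulum import datetime

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

    with DAG(dag_id="spark_pi_example", start_date=datetime(2023, 6, 1), schedule=None) as dag:
        # Default watch=False: only submit the SparkApplication, like the old operator,
        # so a SparkKubernetesSensor can still track the job separately.
        submit_only = SparkKubernetesOperator(
            task_id="submit_spark_pi",
            namespace="spark",
            application_file="spark-pi.yaml",
            kubernetes_conn_id="kubernetes_default",
        )

        # watch=True (this PR): the operator also follows the driver logs and waits for
        # the SparkApplication to succeed or fail, based on the k8s events.
        submit_and_watch = SparkKubernetesOperator(
            task_id="submit_and_watch_spark_pi",
            namespace="spark",
            application_file="spark-pi.yaml",
            kubernetes_conn_id="kubernetes_default",
            watch=True,
        )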



@blcksrx blcksrx requested a review from jedcunningham as a code owner June 8, 2023 16:30
@boring-cyborg boring-cyborg bot added provider:cncf-kubernetes Kubernetes (k8s) provider related issues area:providers labels Jun 8, 2023
@blcksrx blcksrx force-pushed the bug/spark_sensor_operator branch 2 times, most recently from 46cadac to 1d1325f Compare June 8, 2023 18:28
@blcksrx
Contributor Author

blcksrx commented Jun 14, 2023

@hanna-liashchuk @Owen-CH-Leung @matheus-rossi @adminidever
Since you were involved with the related issue, please don't hesitate to review this PR

@jherrmannNetfonds

Hi, thanks for the PR.
I am testing this in production right now with Airflow 2.6.1 running on Kubernetes with the KubernetesExecutor. I encountered this error today:

[2023-06-16, 09:44:04 CEST] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
ValueError: invalid literal for int() with base 16: b''
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/airflow/dags/repo/packages/local_copy_of_this_pr/spark_kubernetes.py", line 122, in execute
    for line in pod_log_stream:
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)

This fails the task, but the Spark application is still running and succeeding. Maybe some of these types of errors should be caught instead of letting the pod fail. What do you think?

@blcksrx
Contributor Author

blcksrx commented Jun 16, 2023

@jherrmannNetfonds Good catch, buddy. Since with watch=True this operator is responsible for the execution, logs, and state of the task, I believe in that case the pod/job should be killed; that's why I overrode the on_kill method to kill it. But apparently the on_kill didn't work.

@potiuk
I believe on_kill was not invoked because it is only called when an AirflowTaskTimeout is caught:

try:
    # It's possible we're already timed out, so fast-fail if true
    if timeout_seconds <= 0:
        raise AirflowTaskTimeout()
    # Run task in timeout wrapper
    with timeout(timeout_seconds):
        result = execute_callable(context=context)
except AirflowTaskTimeout:
    task_to_execute.on_kill()
    raise
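
For context, a minimal sketch (not the exact code from this PR) of what such an on_kill override can look like: it deletes the SparkApplication custom resource so the Spark job stops when the Airflow task is killed. The attribute names namespace and application_name are illustrative.

    # Illustrative sketch only; intended as a method on the operator class.
    # `self.namespace` and `self.application_name` are assumed to have been set
    # in execute() when the SparkApplication was submitted.
    from kubernetes import client, config

    def on_kill(self) -> None:
        config.load_incluster_config()  # or config.load_kube_config() outside the cluster
        api = client.CustomObjectsApi()
        # Delete the SparkApplication custom resource; the spark-operator then tears
        # down the driver and executor pods.
        api.delete_namespaced_custom_object(
            group="sparkoperator.k8s.io",
            version="v1beta2",
            namespace=self.namespace,
            plural="sparkapplications",
            name=self.application_name,
        )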

@potiuk
Member

potiuk commented Jun 16, 2023

> @jherrmannNetfonds Good catch, buddy. Since with watch=True this operator is responsible for the execution, logs, and state of the task, I believe in that case the pod/job should be killed; that's why I overrode the on_kill method to kill it. But apparently the on_kill didn't work.
>
> @potiuk I believe on_kill was not invoked because it is only called when an AirflowTaskTimeout is caught:
>
>     try:
>         # It's possible we're already timed out, so fast-fail if true
>         if timeout_seconds <= 0:
>             raise AirflowTaskTimeout()
>         # Run task in timeout wrapper
>         with timeout(timeout_seconds):
>             result = execute_callable(context=context)
>     except AirflowTaskTimeout:
>         task_to_execute.on_kill()
>         raise

We also run on_kill when the task receives a signal:

        def signal_handler(signum, frame):
            pid = os.getpid()

            # If a task forks during execution (from DAG code) for whatever
            # reason, we want to make sure that we react to the signal only in
            # the process that we've spawned ourselves (referred to here as the
            # parent process).
            if pid != parent_pid:
                os._exit(1)
                return
            self.log.error("Received SIGTERM. Terminating subprocesses.")
            self.task.on_kill()
            raise AirflowException("Task received SIGTERM signal")

(the other on_kill was different)

@potiuk
Member

potiuk commented Jun 16, 2023

> @jherrmannNetfonds Good catch, buddy. Since with watch=True this operator is responsible for the execution, logs, and state of the task, I believe in that case the pod/job should be killed; that's why I overrode the on_kill method to kill it. But apparently the on_kill didn't work.
>
> @potiuk I believe on_kill was not invoked because it is only called when an AirflowTaskTimeout is caught:
>
>     try:
>         # It's possible we're already timed out, so fast-fail if true
>         if timeout_seconds <= 0:
>             raise AirflowTaskTimeout()
>         # Run task in timeout wrapper
>         with timeout(timeout_seconds):
>             result = execute_callable(context=context)
>     except AirflowTaskTimeout:
>         task_to_execute.on_kill()
>         raise

We run on_kill in local_task_runner:

        finally:
            self.on_kill()

@potiuk potiuk closed this Jun 16, 2023
@blcksrx
Contributor Author

blcksrx commented Jun 16, 2023

@potiuk Why close the PR?
With a try/except this would be handled.

@jherrmannNetfonds

Please consider reopening the PR, since 2.6.x came with breaking changes for the SparkKubernetesOperator compared to 2.5.x, which would be "fixed" in this PR. With 2.6.x the SparkKubernetesOperator is basically unusable, since it fails too often in a pure k8s environment compared to the sensor approach used before 2.6.0.

@potiuk potiuk reopened this Jun 16, 2023
@potiuk
Member

potiuk commented Jun 16, 2023

Sorry, Sorry, it was PURELY accidental :). I usually write briefly why I am closing stuff (sometimes maybe in a harsh way but I usually tell why ;)

@jherrmannNetfonds

Thanks for reopening.
@blcksrx FYI,
I am testing my local version with line 123 wrapped as follows:

try:
    self.log.info(line)
except HTTPError as e:
    self.log.warn(e)

It is probably cleaner to kill everything (task + sparkapp) on an error, but on the other hand, Spark applications are mostly long-running jobs processing large amounts of data. So the question is what is more intuitive / less confusing / less annoying for the user.
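
As a side note, the exception in the traceback above is urllib3.exceptions.ProtocolError, so the except clause needs to target urllib3's exception hierarchy (urllib3's ProtocolError subclasses its HTTPError). A minimal sketch of a variant that also re-opens the log stream on such transient breaks, assuming a hypothetical stream_application_logs() helper on the operator:

    # Sketch only: re-open the log stream on transient connection errors instead of
    # failing the task. Intended as a method on the operator class;
    # stream_application_logs() is a hypothetical helper that yields log lines.
    from urllib3.exceptions import HTTPError, ProtocolError

    def follow_logs_with_retry(self, max_retries: int = 3) -> None:
        attempts = 0
        while True:
            try:
                for line in self.stream_application_logs():
                    self.log.info(line)
                return  # stream ended normally
            except (ProtocolError, HTTPError) as e:
                attempts += 1
                if attempts > max_retries:
                    raise
                self.log.warning(
                    "Log stream broke (%s), reconnecting (%d/%d)", e, attempts, max_retries
                )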

@blcksrx blcksrx force-pushed the bug/spark_sensor_operator branch from 1d1325f to af7aa7c Compare June 19, 2023 12:40
pyproject.toml Outdated
Member

Why change that?

Contributor Author

Oh sorry, that was a clumsy action 😁

@blcksrx blcksrx force-pushed the bug/spark_sensor_operator branch from af7aa7c to 2c09c0a Compare June 25, 2023 15:31
@blcksrx blcksrx requested a review from potiuk June 26, 2023 14:01
Member

@potiuk potiuk left a comment

I do not know much about the Spark/K8S interface, but the change looks legit.

@jedcunningham - would you want to take a look as well?

@sudohainguyen
Contributor

Hey @blcksrx, I just hit an issue where my SparkApp completed but never reached the SparkApplicationCompleted reason; the Airflow task was stuck in an infinite loop until it got a 410 error.

Have you encountered the same?

@blcksrx
Contributor Author

blcksrx commented Feb 21, 2024

Hi @sudohainguyen, unfortunately not.
The operator has changed a lot since then; please check your version and create an issue if the latest version still has a problem.


Development

Successfully merging this pull request may close these issues.

SparkKubernetesSensor: 'None' has no attribute 'metadata'
