Skip to content

Conversation

@seelmann
Copy link
Member

Jira

Description

  • Here are some details about my PR, including screenshots of any UI changes:
    • Fix zombie detection and killing by removing the condition that returned zombie task instances only once within 10 seconds. The method is called only once per second anyway because the loop sleeps if it's faster than one second. The executed query uses indexes.

Tests

  • My PR adds the following unit tests:
    • Adapted existing test

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what it does
    • If you implement backwards incompatible changes, please leave a note in the Updating.md so we can assign it to a appropriate release

Code Quality

  • Passes flake8

@codecov-io
Copy link

codecov-io commented Jun 15, 2019

Codecov Report

Merging #5420 into master will decrease coverage by <.01%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #5420      +/-   ##
=========================================
- Coverage    79.1%   79.1%   -0.01%     
=========================================
  Files         483     483              
  Lines       30317   30312       -5     
=========================================
- Hits        23983   23977       -6     
- Misses       6334    6335       +1
Impacted Files Coverage Δ
airflow/utils/dag_processing.py 59.93% <100%> (-0.35%) ⬇️
airflow/models/taskinstance.py 93.02% <0%> (-0.17%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2d18f07...93de2ce. Read the comment docs.

@ashb
Copy link
Member

ashb commented Jun 17, 2019

The jira issue talks about

That means only in every 10th iteration of the DAG file processing loop zombies are detected. And only if the zombie task belong to one of the DAG files of the current iteration they are killed.

However I don't see anything in the change or original code that changes what DAGs we look for zombies in - it should be all dags. So either this change doesn't fix the problem, or it never behaved as described?

In addition to all that, rather than running that every loop I'd feel more comformatable making the default 10s interval a config value, so that for example you could set it to 0 in your install if you wanted.

@seelmann
Copy link
Member Author

seelmann commented Jun 17, 2019

Yes, correct, the _find_zombies() function returns zombies for all the DAGs (or an empty list within the 10 second window). But the caller of this function

zombies = self._find_zombies()
then only starts processors for n DAG files which receive the list of zombies, subsequent processors for other DAG files just get an empty list.

The list of (all or none) zombies is passed down via DagFileProcessor and SchedulerJob.process_file() to DagBag.kill_zombies()

def kill_zombies(self, zombies, session=None):
which then checks each zombie if it belongs to the DAG and kills it.

This is far too complex for such a simple thing like detecting zombie task instances and kill them. Last Friday I debugged 5 hours to find the reason.

I thought about if it's not better to remove the zombie detection from DagFileProcessorManager and all the passing the list around and just implement the query within DagBag.kill_zombies(), which can only search for it's own DAGs and there a 10 seconds delay makes sense. WDYT?

@ashb
Copy link
Member

ashb commented Jun 17, 2019

Oh ouch.

I'd have to double check the code, but finding zombies doesn't seem relevant to dag processing so moving it does sound sensible.

@seelmann
Copy link
Member Author

Ok, I'll give it a try

TI = airflow.models.TaskInstance
limit_dttm = timezone.utcnow() - timedelta(
seconds=self._zombie_threshold_secs)
self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to produce quite a lot of logs.

@seelmann
Copy link
Member Author

Here is the first draft of the alternative approach to move zombie detection down to DagBag: https://github.com/seelmann/incubator-airflow/commit/6ce4f56b907efdf34db780c26b986bab4225d045

Missing: more tests, verify SQL query is fast (uses index), find out if throttling makes sense, ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants