
Conversation

@aoen (Contributor) commented May 19, 2016

See #1435 for the previous discussion of this PR

THIS IS A DRAFT WIP PULL REQUEST
This is a draft of a solution to #1383, among several other things.
There are tons of things broken/missing here; I'm putting this out there to get some initial eyes on the high-level changes.

Goals

  • Simplify, consolidate, and make consistent the logic of whether or not a task should be run
  • Provide a view that gives insight into why a task instance is not currently running (for the majority of cases, no more digging through the scheduler logs to find out why a task instance isn't running)
    e.g. (this will not be the final product):
    [screenshot of the proposed failing-dependencies view]

Things to review in this PR

  • The high-level idea of refactoring all of the dependency checking for regular task execution, backfills, etc. into one single function
  • Any of the functional changes (not the code itself) listed in "Some big changes made in this PR"
  • The design of the BaseTIDep class (a rough sketch of the idea follows this list)
  • Any changes in design that would require a complete rewrite of the current changes (i.e. where it would be non-trivial to change the code in the PR's diff to conform with the new design)
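
To make the BaseTIDep idea concrete, here is a minimal sketch of the kind of interface I have in mind (the names TIDepStatus, get_dep_statuses, and the example NotAlreadyQueuedDep below are illustrative assumptions, not the final code): each dependency checks one condition for a task instance and reports a human-readable reason when it fails, so the UI/CLI can surface it.

from collections import namedtuple

# One result per dependency check: the dep's name, whether it passed, and a
# human-readable reason that can be surfaced in the UI/CLI.
TIDepStatus = namedtuple("TIDepStatus", ["dep_name", "passed", "reason"])

class BaseTIDep(object):
    NAME = "Base Task Instance Dependency"

    def _get_dep_statuses(self, ti, dep_context):
        # Subclasses implement the actual condition and yield TIDepStatus objects.
        raise NotImplementedError

    def get_dep_statuses(self, ti, dep_context):
        # The dep_context (e.g. backfill, force-run from the UI) can choose to
        # ignore this dependency entirely.
        if dep_context.ignore_all_deps:
            yield TIDepStatus(self.NAME, True, "Ignored because ignore_all_deps is set")
            return
        for status in self._get_dep_statuses(ti, dep_context):
            yield status

    def is_met(self, ti, dep_context):
        return all(s.passed for s in self.get_dep_statuses(ti, dep_context))

class NotAlreadyQueuedDep(BaseTIDep):
    NAME = "Task Instance Not Already Queued"

    def _get_dep_statuses(self, ti, dep_context):
        if ti.state == "queued":
            yield TIDepStatus(self.NAME, False, "The task instance is already queued.")
        else:
            yield TIDepStatus(self.NAME, True, "")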

Things NOT to review in this PR

  • The view layer for surfacing the failing dependencies to users
  • Bugs/debugging statements that should be removed/lack of tests/lack of comments/etc.
  • Failing CI/merge conflicts: due to the somewhat large scope of this change I will iterate on a fixed SHA of master and then either rebase all at once or break the PR up into smaller parts and release them individually
  • Performance/DB hits (Max had some valid concerns and I will test for this and optimize/cache as appropriate)

Some big changes made in this PR

  • Running a backfill in the command line and running a task in the UI will now display detailed error messages based on which dependencies were not met for a task, instead of appearing to succeed but actually failing silently
  • Pool behavior in backfills is now consistent with how regular tasks are pooled (pools are always respected in backfills).
    This will break one use case:
    Using pools to restrict some resource on the Airflow executors themselves (rather than an external resource like a DB), e.g. a task uses 60% of the CPU on a worker, so we restrict that task's pool size to 1 to prevent two instances of the task from running on the same host. When backfilling a task of this type, the backfill will now wait for slots to open up in the pool before running the task, even though that isn't necessary if the backfill runs on a different host outside of the pool. I think breaking this use case is OK since it is a hack that works around the lack of a proper resource isolation solution (e.g. Mesos should be used in this case instead).
  • To make things less confusing for users, there is now an "ignore all dependencies" option for running tasks, "ignore dependencies" has been renamed to "ignore task dependencies", and "force" has been renamed to "ignore task instance state". The new "ignore all dependencies" flag will ignore the following (a rough sketch of how these options could fit together follows this list):
    • the task instance's pool being full
    • the execution date for the task instance being in the future
    • the task instance being in the retry waiting period
    • the task instance's task ending prior to the task instance's execution date
    • the task instance already being queued
    • the task instance having already completed
    • the task instance being in the shutdown state
    • WILL NOT OVERRIDE: the task instance already running
  • SLA miss emails will now include all tasks that did not finish for a particular DAG run, even if the tasks didn't run because depends_on_past was not met for a task
  • Failed tasks will no longer be considered "runnable", they must be force-run or cleared first from the UI to be run, just like successful tasks
  • New CLI command task_failing_deps
  • Queuing a task into a pool that doesn't exist will now be stopped by the scheduler instead of a worker
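
To illustrate how these options might fit together (a sketch only; the attribute names here are assumptions, not the final code), each run path (web UI, CLI, backfill) could pass a small context object into the dependency checks, with "ignore all dependencies" implying the more specific flags but deliberately never overriding the already-running check:

class DepContext(object):
    # Bundles the "ignore" options that a run path passes into the dependency
    # checks. Names are illustrative.
    def __init__(self,
                 ignore_all_deps=False,    # the new "ignore all dependencies" option
                 ignore_task_deps=False,   # formerly "ignore dependencies"
                 ignore_ti_state=False,    # formerly "force"
                 ignore_in_retry_period=False,
                 ignore_depends_on_past=False):
        self.ignore_all_deps = ignore_all_deps
        # "ignore all dependencies" implies the more specific flags...
        self.ignore_task_deps = ignore_task_deps or ignore_all_deps
        self.ignore_ti_state = ignore_ti_state or ignore_all_deps
        self.ignore_in_retry_period = ignore_in_retry_period or ignore_all_deps
        self.ignore_depends_on_past = ignore_depends_on_past or ignore_all_deps
        # ...but nothing overrides the "task instance is already running" check,
        # so there is intentionally no flag for it here.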

Future Work (will not be released in the first version)

  • Show one more dagrun in the tree view (even when that dagrun's execution date hasn't occurred yet) so that users can see why task instances in the next dagrun are blocked. I wanted to include this in the first release but it's non-trivial to do (one way to solve this is to generate DAG runs before their execution date occurs). You can still view a non-existent task instance by passing the right query parameters to the task instance page, or by querying via the new command-line command.
  • Break down failed dependency explanations better (e.g. if the trigger rule requiring all upstream tasks to succeed fails, indicate which specific upstream tasks failed)
  • Make failing dependencies for task instances more visible to users (e.g. asynchronously display failing dependencies when mousing over a task instance in the tree view).
  • Parallelize task instance dependency checking.
  • Indicate that a task isn't running because it was manually cleared: if a task is manually cleared the scheduler won't automatically rerun it, but there is no way of knowing this for sure without some kind of flag that marks the task state as cleared.
  • Can potentially add tips to fix a failing dependency in the new view that shows failed task instance dependencies (e.g. for "depends_on_past" when the last task state is failing we can give the tip to get this task to succeed, or even provide a button to clear it right from that view)
  • Additional task instance dependencies, e.g. there are no schedulers running (a task can't get scheduled until a scheduler is running), the airflow scheduler queue is backed up, or no executor is available to run the task instance
  • Forcing tasks in the UI should give users feedback on why their task didn't get forced (e.g. task was already running)
  • Keep history of changes to blocked dependencies that are accessible via the UI (this is useful to know why a task's execution was delayed)

Tests for dependency contexts are missing; I will add them once I get an initial LGTM

@mistercrunch @jlowin @plypaul

jlowin and others added 8 commits May 9, 2016 16:12
Dag hash function tried (and failed) to hash the list of tasks, then fell back on repr-ing the list, which took forever. Instead, hash tuple(task_dict.keys()). In addition this replaces two slow list comprehensions with much faster hash lookups (using the new task_dict).
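
A rough sketch of the hashing change described above (the class layout and helper names are assumptions based on the commit message, not the actual diff):

class DAG(object):
    # Only the relevant pieces are shown; task_dict maps task_id -> task.
    def __init__(self, dag_id, task_dict=None):
        self.dag_id = dag_id
        self.task_dict = task_dict or {}

    def __hash__(self):
        # Hashing the task ids (the task_dict keys) is cheap and stable,
        # unlike trying to hash or repr the task objects themselves.
        return hash((self.dag_id, tuple(sorted(self.task_dict.keys()))))

    def has_task(self, task_id):
        # A dict membership test replaces a slow scan over the task list.
        return task_id in self.task_dict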
When the Scheduler is run with `--num-runs`, there can be multiple
Schedulers and Executors all trying to run tasks. For queued tasks, the
Scheduler was previously only trying to run tasks that it itself had
queued, but that doesn't work if the Scheduler is restarting. This PR
reverts that behavior and adds two types of "best effort" executions:
before running a TI, executors check whether it is already running, and
before ending, executors call sync() one last time.
The scheduler can encounter a queued task twice before the
task actually starts to run -- this locks the task and avoids
that condition.
@landscape-bot

Code Health: Repository health increased by 0.08% when pulling f272717 on aoen:blockedTIExplainer into 5e40d98 on apache:master.

@bolkedebruin (Contributor) commented:

@aoen Firstly, thanks for this great work. A couple of things:

Paused DAG checking now occurs at the task instance level in models.py instead of in jobs.py to consolidate the "should this task run" logic into one place

I think this should happen at the DagRun level not at the task instance level. (Dag->DagRun Task->TaskInstance)

To make things less confusing for users, force running a task will now override/ignore:
task instance's pool being full

This is an operational issue. At a minimum I would like to see a "Pool is full. Please confirm"

task instance is already queued

What will happen with the queued instance?

Failed tasks will no longer be considered "runnable", they must be force-run or cleared first from the UI to be run, just like successful tasks

Nice!

Future work

Parallelize task instance dependency checking.

I have some ideas about this. I think we can get below 1-2s loops on a single core and reduce the number of queries significantly by having task instances update counters in the dagrun. The issue lies in the are_dependencies_met function, which does aggregate queries for every task(!).

airflow/jobs.py (Outdated)

  State.RUNNING, State.QUEUED, State.SUCCESS, State.FAILED):
  continue
- elif ti.is_runnable(flag_upstream_failed=True):
+ elif ti.are_dependencies_met(flag_upstream_failed=True):

Contributor: Please pass in the session if it is not necessary to create a new one; it reduces connection overhead.

@aoen (Contributor, Author): Will do in final version (still a WIP).

aoen force-pushed the blockedTIExplainer branch from f272717 to 1f1f1cd on June 2, 2016 00:55
@aoen (Contributor, Author) commented Jun 2, 2016

Sorry, missed your comments. Thanks much for the review!

I think this should happen at the DagRun level not at the task instance level. (Dag->DagRun Task->TaskInstance)

I don't think it's that bad to check in both places, i.e. I think it is logical to not schedule any new tasks once a DAG is paused, even for existing dagruns. My medium-term vision is something like:
(note that is_paused is checked in DAGDependenciesMet, not DAGRunDependenciesMet)

def TaskInstanceDependenciesMet():  # used for scheduling new task instances
    # task-instance specific logic (e.g. pool slots, retry period, trigger rules)
    return DAGRunDependenciesMet()

def DAGRunDependenciesMet():  # used for scheduling new dag runs
    # logic that checks e.g. if the start date for the dagrun has been met
    return DAGDependenciesMet()

def DAGDependenciesMet():
    # dependencies are met only if the DAG is not paused
    return not dag.is_paused()

This is an operational issue. At a minimum I would like to see a "Pool is full. Please confirm"

The way I have it currently is that the UIs (web UI, CLI backfill, etc.) will throw an error if you don't specify "force". The UIs won't prompt you if you are running with "force", which I think is acceptable since a user must actively click/add "force". I believe backfilling a task ignores the pool by default at the moment anyway, so my change just makes starting a task in the web UI consistent with backfilling. That being said, if you feel strongly about this let me know; I think the simpler solution for now would be to just not allow overriding pools.

Kind of a tangent but I think the primary use case of pools is to simulate a throttling layer to external services. I can see the usefulness of this in practice and understand that users are depending on this functionality in the wild, but I think the throttling should really be performed in the respective services (separation of concerns, control over what "quota" means, ability to throttle from sources other than Airflow, etc), not in Airflow. I think pools should eventually be removed as they add a lot of complexity both to users and in the code. I'm curious what you and others think about this.

What will happen with the queued instance?

The executors check if another executor already picked up the instance (i.e. if the TI's state == RUNNING), so double-queueing shouldn't be a problem. The DB state will be consistent too although the previous queueing attempt would be lost since we don't have a concept of a TI state history at the moment. Queuing a task should also be an idempotent operation.
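
As a rough illustration of that "best effort" check (assumed names and state values; not the PR's actual code), an executor would re-read the TI's state from the DB immediately before executing and skip the TI if another executor already picked it up:

def execute_if_not_already_running(ti, execute_fn):
    # Refresh the task instance's state from the DB right before running.
    ti.refresh_from_db()
    if ti.state == "running":
        # Another executor already picked this task instance up; skip it.
        return False
    execute_fn(ti)
    return True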

I have some ideas about this. I think we can go below 1-2s loops on a single core and reducing the amount of queries significantly by having taskinstances update counters in the dagrun. The issue lies in the are_dependencies_met function that does aggregate queries for every task(!).

Agreed, I liked the idea of are_dependencies_met doing aggregate queries using in-process caching and prefetching a whole bunch of data to reduce requests to the DB. For example, we could query the task instances for all of the upstream/downstream task instances in the same request. The nice thing is we would have separation of concerns, i.e. accessing the queried data would be done the same way everywhere and we wouldn't need to pass around the DB query results between functions. I haven't thought about this too deeply, though; the complexity might not be worth it.
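
For example, a sketch of the prefetching idea (SQLAlchemy-style, with simplified assumptions; not actual PR code): fetch the states of all upstream task instances for a TI in a single query and cache the result in-process, instead of issuing an aggregate query per task.

from airflow.models import TaskInstance

def prefetch_upstream_states(session, ti, upstream_task_ids):
    # One query covers every upstream task instance at this TI's execution date;
    # the resulting dict can be cached and reused by all dependency checks.
    rows = (
        session.query(TaskInstance.task_id, TaskInstance.state)
        .filter(
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.task_id.in_(upstream_task_ids),
            TaskInstance.execution_date == ti.execution_date,
        )
        .all()
    )
    return dict(rows)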

aoen force-pushed the blockedTIExplainer branch 20 times, most recently from 2a2c256 to 2bf3d4e on June 10, 2016 04:41
Instead of parsing the DAG definition files in the same process as the
scheduler, this change parses the files in a child process. This helps
to isolate the scheduler from bad user code.
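
A minimal sketch of that isolation idea (not the commit's actual code; the file handling and DAG discovery here are simplified assumptions): parse a DAG definition file in a child process so that a crash or hang in user code cannot take down the scheduler.

from multiprocessing import Process, Queue

def _parse_dag_file(filepath, result_queue):
    # Any exception raised by user DAG code stays inside this child process.
    try:
        namespace = {}
        with open(filepath) as f:
            exec(compile(f.read(), filepath, "exec"), namespace)
        dag_ids = [name for name, obj in namespace.items()
                   if type(obj).__name__ == "DAG"]
        result_queue.put(("ok", dag_ids))
    except Exception as exc:
        result_queue.put(("error", str(exc)))

def parse_in_child(filepath, timeout=30):
    result_queue = Queue()
    child = Process(target=_parse_dag_file, args=(filepath, result_queue))
    child.start()
    child.join(timeout)
    if child.is_alive():
        child.terminate()  # a hung DAG file no longer hangs the scheduler
        return ("error", "parsing timed out")
    return result_queue.get() if not result_queue.empty() else ("error", "no result")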
@shenghuy commented Jun 14, 2016

Great work, do we have a timeline for merging this PR? #1579 is trying to resolve a concurrency bug which you believe is already covered by this PR.

@aoen (Contributor, Author) commented Jun 14, 2016

Should be merged fairly shortly after #1559.

There were 3 big commits (including this one) that touched the core and stepped on each other's toes.

aoen added 2 commits June 17, 2016 10:45

Some iterating (same message repeated across several commits)

[airflow] 2nd attempt at releasing airflow 1.7.1

Accidentally worked in airflow dir instead of airflow-dev, merged in changes

iterating (same message repeated across several commits)

CHECKPOINT BEFORE REBASE
aoen force-pushed the blockedTIExplainer branch from 2bf3d4e to c93979e on June 20, 2016 21:46
@aoen (Contributor, Author) commented Jun 24, 2016

The final PR (minus rebasing against master) is here: aoen#1, closing this PR

aoen closed this Jun 24, 2016