-
Notifications
You must be signed in to change notification settings - Fork 16.4k
AIRFLOW-124 Implement create_dagrun #1506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
airflow/models.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there only ever by one dag run with a given dag_id/execution_date/run_id? If so, wonder if we want to fail, or log grumpily if we get more than one response back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It refreshes itself so no we cannot have more. Maybe one() would be better.
|
Couple of nits/questions, but LGTM overall. Take my opinion with a grain of salt, since my scheduler knowledge is unfortunately a bit lacking. |
airflow/models.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: order of params is backwards (state should come before start_date), and conf/session are missing
This adds the create_dagrun function to DAG and the staticmethod DagRun.find. create_dagrun will create a dagrun including its tasks. By having taskinstances created at dagrun instantiation time, deadlocks that were tested for will not take place anymore. Tests have been adjusted accordingly. In addition, integrity has been improved by a bugfix to add_task of the BaseOperator to make sure to always assign a Dag if it is present to a task. DagRun.find is a convenience function that returns the DagRuns for a given dag. It makes sure to have a single place how to find dagruns.
|
@bolkedebruin LGTM |
|
LGTM after commented added |
|
|
||
| @staticmethod | ||
| @provide_session | ||
| def find(dag_id, run_id=None, state=None, external_trigger=None, session=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for encapsulating the logic by the way, this is amazing.
|
LGTM except for the unnecessary |
|
@mistercrunch thanks. There is one operational regression (also heads up @aoen, @jlowin, @artwr) due to the eager creation of task instances. In its current form the scheduler will evaluate all State.NONE task instances and thus due to eager creation it will take more time to evaluate them. I'm addressing this issue in a follow up PR (almost done/ready for review). |
Dear Airflow Maintainers,
Please accept this PR that addresses the following issues:
This PR forms the basis for the roadmap (https://drive.google.com/open?id=0B_Y7S4YFVWvYM1o0aDhKMjJhNzg) . It is one of the initial commits from master...bolkedebruin:AIRFLOW_SCHEDULER .
DAG.create_dagrun (please review: @aoen, @jlowin, @mistercrunch, @artwr )
This creates dagrun from a Dag. It also creates the TaskInstances from the tasks known at instantiation time. By having taskinstances created at dagrun instantiation time, deadlocks that were tested for will not take place anymore (@jlowin, correct? different test required?). For now, the visual consequence of having these taskinstances already there is that they will be black in the tree view.
Tests in core.py were adjusted as they were supposedly creating a dagrun with tasks, while they were actually creating dagruns and orphaned TaskInstances (ie. the dag_id was not matching the dag_id from the dagrun). This was discussed with @artwr, who said these were remnants from the past.
By doing this I also fixed an issue in models.py that a dag was not set for a task if called from dag.add_task (@aoen, @jlowin).
DagRun.find is a convenience function that returns the DagRuns for a given dag. It makes sure to have a single place how to find dagruns
DagRun.find does not work for all cases yet (ie multiple execution dates etc). My aim is to limit the ORM required in sub functions that implement somethings just a bit differently creating compatibility issues.