Conversation

@gwax (Contributor) commented Aug 20, 2016

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:

  • https://issues.apache.org/jira/browse/AIRFLOW-198

Testing Done:

  • Local testing of dag operation with LatestOnlyOperator
  • Unit test added

@codecov-io commented Aug 20, 2016

Current coverage is 64.70% (diff: 0.00%)

Merging #1752 into master will decrease coverage by 0.18%

@@             master      #1752   diff @@
==========================================
  Files           128        129     +1   
  Lines          9630       9656    +26   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
- Hits           6249       6248     -1   
- Misses         3381       3408    +27   
  Partials          0          0          

Powered by Codecov. Last update fe037d6...4bb7843

Review comment (Contributor):

extra '

@r39132 (Contributor), Aug 21, 2016:

Yep, it's causing Landscape.io to show an issue. Our rule is that Landscape must be green for merge. Since both Travis and Landscape take a long time to run, you can also run them in your fork.

@msumit (Contributor) commented Aug 21, 2016

Please include unit tests.

Review comment (Contributor):

Have you tested this? I'm curious about the context['ti'].start_date. Why not use datetime's now()? What is the start_date of ti set to?

Review comment (Contributor, PR author):

TaskInstance.run sets start_date to be datetime.now(): https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1185

The PR also includes tests that use freezegun.
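
For readers unfamiliar with freezegun, the pattern such tests rely on is to freeze wall-clock time so the window comparison is deterministic. A hypothetical illustration of that pattern (not the PR's actual test code; the timestamp and assertion are made up):

import datetime
from freezegun import freeze_time

# Freeze wall-clock time so datetime.now() is deterministic inside the block.
with freeze_time('2016-01-02 12:00:00'):
    assert datetime.datetime.now() == datetime.datetime(2016, 1, 2, 12, 0, 0)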

Review comment (Contributor, PR author):

The reason to use start_date instead of datetime.now() is that start_date gets stored in the metadata DB, so we can see it in the UI.

Review comment (Contributor):

Check out https://gist.github.com/r39132/ad9601e5d056f655936c0832ca772d6d

start_date won't work. The problem is that it's fixed and doesn't change with your tasks, especially if passed via default_args, as shown in https://gist.github.com/r39132/2e6e8353d42852cdb8f7717a3b1998b9

In other words, for most people it's set once, at the time the DAG is first parsed.

Also, the computation of left and right doesn't seem right.

For an hourly DAG, a run with context['execution_date'] of noon will execute at 1 PM. The left_window would be noon, the right_window would be 1 PM, and now would be a few seconds past 1 PM.
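
The check the discussion converges on can be sketched roughly as follows, reconstructed from the "Checking latest only with left_window: ... right_window: ... now: ..." log lines quoted later in this thread. It assumes dag.following_schedule() returns the next scheduled datetime after its argument, and it is not the exact merged code:

import datetime
import logging

def check_latest(context):
    # Window for the "latest" run: (following_schedule(execution_date),
    # following_schedule(that)]. For a daily 04:00 DAG, an execution_date of
    # Nov 20 04:00 gives left_window Nov 21 04:00 and right_window Nov 22 04:00.
    now = datetime.datetime.now()
    left_window = context['dag'].following_schedule(context['execution_date'])
    right_window = context['dag'].following_schedule(left_window)
    logging.info('Checking latest only with left_window: %s right_window: %s '
                 'now: %s', left_window, right_window, now)
    return left_window < now <= right_window  # True only for the latest run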

Review comment (Contributor, PR author):

You're absolutely right; airflow time is still super confusing at times.

@r39132 (Contributor) commented Sep 20, 2016

Can you add this to an example DAG? Also, can you provide a screenshot of it running in the sample DAG, so others get a sense of how it works and how to use it? Finally, please update the docs by adding a section for this new behavior, perhaps under https://airflow.incubator.apache.org/concepts.html#additional-functionality

-s

@gwax force-pushed the latest_only branch 2 times, most recently from 924cfdf to 76f5213 on September 21, 2016 03:02
@gwax (Contributor, PR author) commented Sep 21, 2016

Docs updated. Example provided.
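
The example itself only appears as screenshots later in the thread, so here is a minimal sketch of how the operator gets wired into a DAG. Import paths follow later Airflow releases, and the dag_id, schedule, and start_date are placeholders; the shipped example_dags file may differ in detail:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG(
    dag_id='latest_only',                      # dag_id chosen for illustration
    schedule_interval=timedelta(hours=4),
    start_date=datetime(2016, 9, 20),
)

# Everything downstream of latest_only is skipped for backfilled / non-latest
# DAG runs and only executes for the most recent scheduled run.
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)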

@r39132 (Contributor) commented Sep 22, 2016

@gwax I have not been able to get your example to work. Running it every hour, it gets stuck in the first DAG run.

[screenshot 2016-09-22 06 03 58]

[screenshot 2016-09-22 06 05 07]

@r39132 (Contributor) commented Sep 22, 2016

Hmm. It looks like it is working, but very, very slowly... like making progress on a DAG run every 5 minutes or more.

[screenshot 2016-09-22 06 13 13]

@gwax (Contributor, PR author) commented Sep 22, 2016

I was seeing the slow execution as well, but I've been having enough issues with the new scheduler that I thought it was unrelated to my operator. We use the exact same operator code as a plugin with 1.7.1.3 with no trouble at all.

@r39132 (Contributor) commented Sep 22, 2016

I wrote a simpler example:

[screenshot 2016-09-22 06 47 11]

And when executing, I see something funny!

[screenshot 2016-09-22 06 48 35]

The task instance for the latest_only operator should not be in the SKIPPED state but in the SUCCESS state.

@r39132 (Contributor) commented Sep 22, 2016

After removing the raise AirflowSkipException('Not latest execution, skipping self.') line, the operator no longer skips itself. Please make that change, and also add your example (plus a simpler working example) to the example_dags corpus so folks can see them run.
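
The change being requested is to mark the downstream task instances as SKIPPED rather than raising AirflowSkipException, which skips the operator itself. A rough sketch of that pattern, consistent with the "Skipping task: ..." / "Done." log lines quoted later in the thread; the imports and session handling are assumptions about the Airflow internals of the time, not the exact merged code:

import datetime
import logging

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

def skip_downstream(context):
    # Mark each direct downstream task instance of this DAG run as SKIPPED,
    # while the latest-only task itself finishes in the SUCCESS state.
    now = datetime.datetime.now()
    session = settings.Session()
    downstream_ids = context['task'].downstream_task_ids
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == context['ti'].dag_id,
        TaskInstance.execution_date == context['ti'].execution_date,
        TaskInstance.task_id.in_(downstream_ids),
    ).all()
    for ti in tis:
        logging.info('Skipping task: %s', ti.task_id)
        ti.state = State.SKIPPED
        ti.start_date = now
        ti.end_date = now
        session.merge(ti)
    session.commit()
    session.close()
    logging.info('Done.')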

@gwax (Contributor, PR author) commented Sep 22, 2016

Done.

@r39132 (Contributor) left a review:

Minor changes needed

Review comment (Contributor):

There will be a dag_id name conflict with the other example.

Perhaps call this one latest_only_with_trigger.

@r39132 (Contributor), Sep 26, 2016:

The name here should match the dag name in the examples... so latest_only_with_trigger. You should change the image name too.

@r39132 (Contributor) commented Sep 26, 2016

@gwax after you make the changes above, it should be good to merge.

@gwax force-pushed the latest_only branch 2 times, most recently from 247f4a0 to d7b7f1f on September 27, 2016 17:18
@gwax (Contributor, PR author) commented Sep 27, 2016

@r39132 : Done.

@r39132 (Contributor) commented Sep 27, 2016

@gwax thx. Now the only issue is that all of the tests are failing.

@r39132 (Contributor) commented Sep 27, 2016

Simple fix for this.

Traceback (most recent call last):
  File "/home/travis/build/apache/incubator-airflow/tests/core.py", line 589, in test_import_examples
    self.assertEqual(len(self.dagbag.dags), NUM_EXAMPLE_DAGS)
AssertionError: 18 != 16

Change NUM_EXAMPLE_DAGS = 16 to NUM_EXAMPLE_DAGS = 18 in tests/core.py.

The reason for this check, I believe, is to detect silent failures when importing the examples.
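
For clarity, the fix amounts to bumping the expected example-DAG count in that test. A sketch reconstructed from the traceback above; the real tests/core.py defines this inside a unittest.TestCase class:

NUM_EXAMPLE_DAGS = 18  # was 16; this PR adds two new example DAGs

def test_import_examples(self):
    # Fails if any example DAG silently fails to import into the DagBag.
    self.assertEqual(len(self.dagbag.dags), NUM_EXAMPLE_DAGS)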

@gwax (Contributor, PR author) commented Sep 27, 2016

Sigh... done.

@r39132 (Contributor) commented Sep 27, 2016

+1

@r39132 (Contributor) commented Sep 27, 2016

@gwax the problem with the "slowness of the scheduler" and the other issues that you and I noticed are being tracked in another JIRA. The workaround is to add min_file_process_interval = 0 under the [scheduler] section of your airflow.cfg; my scheduler section looks as shown below:

[scheduler]
min_file_process_interval = 0
job_heartbeat_sec = 5
max_threads = 4

@r39132 (Contributor) commented Sep 28, 2016

Looks good and works for me.

I have 2 sample DAGs of my own.

Here are some screenshots:

This one shows the 2 latest-only DAGs, which have accumulated 641 DAG runs. The tree views that follow show that all but the latest DAG runs were completed successfully.

[screenshot 2016-09-27 20 53 37]

The code for latest_only_op_v1 is shown below:

[screenshot 2016-09-27 16 53 27]

The tree view below shows that it skipped 639 runs; the latest one did execute:

[screenshot 2016-09-27 16 52 02]

The code for simple_latest_only_op_v1 is shown below:

[screenshot 2016-09-27 16 55 49]

The tree view below shows that it skipped 639 runs; the latest one did execute:

[screenshot 2016-09-27 16 51 52]

@poulainv commented Oct 24, 2016

@gwax Thank you for this contribution. It works well and I will use it in production in a few days!
However, I noticed some limitations in my basic usage and would like your hints about them:

• If my schedule_interval is None, it raises an exception because it cannot compare NoneType with datetime. (This happens because I unschedule my DAG in order to trigger it manually :/)

• My second point is about manually triggering a DAG that has a defined schedule_interval. It obviously does not work, because the run is not the "latest". Logs: Checking latest only with left_window: 2016-10-24 23:00:00 right_window: 2016-10-25 00:00:00 now: 2016-10-24 22:45:41.283157

So this operator makes it impossible to manually trigger a DAG. Do you have any workaround?

@sudowork (Contributor) commented:

I tried using this operator in production; however, I found some issues. For context, I'm using the CeleryExecutor.

I noticed in my logs for a single DAG run that it appeared the DAG run was reenqueued many times.

[2016-11-22 12:26:27,701] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:50,335] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:53,288] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:58,400] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-11-22 12:28:59,334] {models.py:1219} INFO - Executing <Task(LatestOnlyOperator): latest_only> on 2016-11-20 04:00:00
[2016-11-22 12:29:00,671] {airflow_next.py:27} INFO - Checking latest only with left_window: 2016-11-21 04:00:00 right_window: 2016-11-22 04:00:00 now: 2016-11-22 12:29:00.670321
[2016-11-22 12:29:00,671] {airflow_next.py:29} INFO - Not latest execution, skipping downstream.
[2016-11-22 12:29:00,672] {airflow_next.py:34} INFO - Skipping task: my_dag
[2016-11-22 12:29:01,397] {airflow_next.py:41} INFO - Done.
[2016-11-22 12:31:13,055] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:32:31,907] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:35:36,323] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:00,140] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:38:50,014] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:07,609] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:12,232] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:41:45,857] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:05,354] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:09,635] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:44:30,851] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:46:58,977] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:47:02,836] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:48:27,571] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:54,034] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:57,951] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:51:21,442] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:44,461] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:48,392] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:54:28,745] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:50,740] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:54,382] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:57:59,881] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:04,245] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:05,666] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 13:02:18,434] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py

What's interesting is that the DAG status was reporting as success; however, when I checked my Celery broker's queue, there appeared to have been thousands of tasks enqueued. Will attempt to do some more investigation.

@r39132 (Contributor) commented Nov 22, 2016

@sudowork Wow! Very interesting! Please open a JIRA for this.
@poulainv This is definitely a regression. One idea is to treat any manually or externally triggered run (i.e. not scheduled) as "Latest, allowing execution to proceed." I'll submit a fix for this!

@gwax FYI! Also, @btallman, since you use trigger_dag, refer to @poulainv's comment above.
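
The fix being proposed amounts to an early-exit guard along these lines. It assumes the current DagRun is available in the task context as context['dag_run'] and exposes an external_trigger flag, as in Airflow models of this era; this is a sketch, not the exact code of the eventual fix:

import logging

# Sketch of an operator execute() method (normally defined on LatestOnlyOperator).
def execute(self, context):
    # Manually or externally triggered runs are treated as the latest,
    # so downstream tasks are allowed to proceed.
    dag_run = context.get('dag_run')
    if dag_run is not None and dag_run.external_trigger:
        logging.info('Externally triggered DAG run: allowing execution to proceed.')
        return
    # ... otherwise fall through to the left_window / right_window check
    # sketched earlier in this thread, skipping downstream tasks if this
    # is not the latest scheduled run.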

@r39132 (Contributor) commented Nov 22, 2016

@poulainv FYI: if an issue like this occurs in the future and you don't get a response from the original PR submitter by commenting on the PR, send an email to dev@airflow.incubator.apache.org and someone else might be able to fix it.

@sudowork (Contributor) commented:

@r39132 Done! https://issues.apache.org/jira/browse/AIRFLOW-648

Let me know if there's more info you'd like me to provide.

@r39132 (Contributor) commented Nov 23, 2016

@sudowork Thx. I won't be able to take that bug on; maybe @gwax can.

I'm fixing https://issues.apache.org/jira/browse/AIRFLOW-649 <-- @poulainv

@poulainv commented Nov 23, 2016

@r39132 thanks for handling this!

@gwax (Contributor, PR author) commented Nov 23, 2016

I'll see if I can find some time over Thanksgiving.

@sudowork (Contributor) commented Dec 21, 2016

@poulainv I ran into this use case today, needing to manually trigger a DAG that has the LatestOnlyOperator in it. My workaround isn't very elegant, but you can trigger the DAG manually using the CLI. Then in the UI, go to the Tree View for the DAG and click on the skipped task. You can then re-run the task manually using the Run button.

@r39132 (Contributor) commented Dec 22, 2016

#1956 will be merged shortly! @sudowork @poulainv @gwax @btallman

@sudowork (Contributor) commented:

Thanks @r39132 ! Just tested and verified it's working for me.

@r39132 (Contributor) commented Dec 23, 2016

Great!

@poulainv commented:

Cool thanks!

alekstorm pushed a commit to alekstorm/incubator-airflow that referenced this pull request Jun 1, 2017
Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:
- https://issues.apache.org/jira/browse/AIRFLOW-198

Testing Done:
- Local testing of dag operation with LatestOnlyOperator
- Unit test added

Closes apache#1752 from gwax/latest_only