
Using the LatestOnlyOperator results in tasks being reenqueued many times over #7969

@dimberman

Description


Apache Airflow version: 1.8.0

Ticket was created 23/Nov/16 01:12

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

  • Cloud provider or hardware configuration:
  • OS (e.g. from /etc/os-release):
  • Kernel (e.g. uname -a):
  • Install tools:
  • Others:

What happened:

We ported a number of our cronjobs over to run on Airflow. To achieve the desired behavior, we use the LatestOnlyOperator that was merged to master in #1752.

When we moved over our cronjobs, we migrated many of them at a time (using Ansible). These DAGs had start dates going back a few days.

The first thing I noticed is that it seemed to take a long time to process the backfilled DAGs. They were being processed correctly, in the sense that the 'latest_only' task was completing successfully and the downstream BashOperator was marked as skipped. The DAG runs also appeared to complete successfully in the tree view. However, when I filtered the DAG runs on state 'running', those runs were still listed.

In the logs for one of the "stuck" DAG runs, it appeared that the 'latest_only' task was processed multiple times:

[2016-11-22 12:26:27,701] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:50,335] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:53,288] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:58,400] {models.py:1196} INFO -

Starting attempt 1 of 1

[2016-11-22 12:28:59,334] {models.py:1219} INFO - Executing <Task(LatestOnlyOperator): latest_only> on 2016-11-20 04:00:00
[2016-11-22 12:29:00,671] {airflow_next.py:27} INFO - Checking latest only with left_window: 2016-11-21 04:00:00 right_window: 2016-11-22 04:00:00 now: 2016-11-22 12:29:00.670321
[2016-11-22 12:29:00,671] {airflow_next.py:29} INFO - Not latest execution, skipping downstream.
[2016-11-22 12:29:00,672] {airflow_next.py:34} INFO - Skipping task: my_dag
[2016-11-22 12:29:01,397] {airflow_next.py:41} INFO - Done.
[2016-11-22 12:31:13,055] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:32:31,907] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:35:36,323] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:00,140] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:38:50,014] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:07,609] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:12,232] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:41:45,857] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:05,354] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:09,635] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:44:30,851] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:46:58,977] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:47:02,836] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:48:27,571] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:54,034] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:57,951] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:51:21,442] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:44,461] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:48,392] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:54:28,745] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:50,740] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:54,382] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:57:59,881] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:04,245] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:05,666] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 13:02:18,434] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py

We use the CeleryExecutor backed by Redis. When I inspected the Redis key that holds the queue, I verified that duplicate tasks were in there; there were thousands of tasks in the queue. Here are the tasks that were at the head of the task list (the DAG names have been changed for readability); a sketch of how the queue can be inspected follows the listing:

[
u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']",
u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']"
]
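
For reference, a minimal sketch of how the queue can be peeked at with redis-py; the queue key name ('default', Airflow's default_queue setting) and the connection details are assumptions about the broker configuration, and the raw entries are Celery message bodies from which the command strings above were extracted:

# Minimal sketch of inspecting the Celery queue in Redis (redis-py).
# The key name and connection parameters are assumptions; adjust them to
# match the broker configuration.
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
queue_key = 'default'  # assumption: Airflow's default_queue / Celery queue name

print(r.llen(queue_key))               # how many task messages are queued
for raw in r.lrange(queue_key, 0, 9):  # peek at the first ten messages
    print(raw)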

Grepping the scheduler's logs, here is one instance of the scheduler enqueuing a duplicate task:

$ grep -A2 "DAG_01" /var/log/airflow/airflow-scheduler.log | grep -A2 "09:23"
[2016-11-22 13:24:26,660] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:26,672] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:26,678] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
[2016-11-22 13:24:26,726] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:26,769] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:26,769] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones

[2016-11-22 13:24:31,830] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:31,832] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:31,832] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones

[2016-11-22 13:24:37,238] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:37,240] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:37,252] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.

[2016-11-22 13:24:45,736] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:45,744] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:45,756] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones

[2016-11-22 13:24:56,613] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:56,624] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:56,638] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones

[2016-11-22 13:24:56,680] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:56,823] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:56,824] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones

Eventually, we ended up creating new DAG definitions with future start dates and manually clearing the Redis queue.
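
One way to clear the queue manually is to delete the Redis list key; a rough sketch, with the same queue-key assumption as above (note that this discards every pending message on that queue):

# Rough sketch of flushing the stale Celery queue in Redis; the key name is
# an assumption, and deleting it drops all pending task messages.
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
r.delete('default')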

Additional Context:

Our scheduler is daemonized by upstart and runs with -n 5, i.e. it exits after five scheduler runs and upstart respawns it.
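
For illustration, a minimal upstart job along these lines might look like the following; the file path, user, and runlevels are assumptions, not our actual configuration:

# /etc/init/airflow-scheduler.conf -- hypothetical upstart job
description "airflow scheduler"

start on runlevel [2345]
stop on runlevel [016]

# The scheduler exits after five runs (-n 5), so upstart has to respawn it.
respawn

setuid airflow
exec airflow scheduler -n 5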

Here is the template we use for our cron DAGs (note it's a jinja2 template):

# {{ansible_managed}}

from dateutil import parser

from airflow.operators import LatestOnlyOperator
from airflow.operators import BashOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': parser.parse('{{item.start_date}}'),
    'retries': 0,
}

dag = DAG(
    dag_id='{{item.name}}',
    default_args=args,
    schedule_interval='{{item.schedule}}',
    max_active_runs=1,
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

script = BashOperator(
    task_id='{{item.name}}',
    bash_command='{{item.command}}',
    default_args=args,
    dag=dag,
)

script.set_upstream(latest_only)

One thing to note: we are on Airflow 1.7.1.3, so we brought the operator in through a plugin:

import datetime
import logging

from airflow.models import BaseOperator, TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State
from airflow import settings


class LatestOnlyOperator(BaseOperator):
    """
    Allows a workflow to skip tasks that are not running during the most
    recent schedule interval.
    If the task is run outside of the latest schedule interval, all
    directly downstream tasks will be skipped.
    """

    ui_color = '#e9ffdb'  # nyanza

    def execute(self, context):
        now = datetime.datetime.now()
        left_window = context['dag'].following_schedule(
            context['execution_date'])
        right_window = context['dag'].following_schedule(left_window)
        logging.info(
            'Checking latest only with left_window: %s right_window: %s '
            'now: %s', left_window, right_window, now)
        if not left_window < now <= right_window:
            logging.info('Not latest execution, skipping downstream.')
            session = settings.Session()
            for task in context['task'].downstream_list:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                logging.info('Skipping task: %s', ti.task_id)
                ti.state = State.SKIPPED
                ti.start_date = now
                ti.end_date = now
                session.merge(ti)
            session.commit()
            session.close()
            logging.info('Done.')
        else:
            logging.info('Latest, allowing execution to proceed.')


class AirflowNextPlugin(AirflowPlugin):
    name = "airflow_next"
    operators = [LatestOnlyOperator]
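
To make the skip decision in the log excerpt above concrete, here is the window comparison re-run standalone with the values that were logged for the 2016-11-20 04:00:00 run (assuming a daily schedule interval; this only reproduces the comparison, not the operator):

# Standalone re-run of the window check using the timestamps from the
# 12:29:00 log line; the daily interval is an assumption.
from datetime import datetime

left_window = datetime(2016, 11, 21, 4, 0)    # following_schedule(execution_date)
right_window = datetime(2016, 11, 22, 4, 0)   # following_schedule(left_window)
now = datetime(2016, 11, 22, 12, 29)

# now is past right_window, so this run is not the latest and the
# downstream task gets skipped.
print(not left_window < now <= right_window)  # True -> skip downstream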

What you expected to happen:

How to reproduce it:

Anything else we need to know:

Moved here from https://issues.apache.org/jira/browse/AIRFLOW-648
