Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/operators/branch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
# under the License.
"""Branching operators"""

import warnings
from typing import Dict, Iterable, Union

from airflow.models import BaseOperator, SkipMixin

warnings.warn("This module is deprecated. Please use `airflow.operators.python.BranchPythonOperator`.")


class BaseBranchOperator(BaseOperator, SkipMixin):
"""
Expand Down
52 changes: 26 additions & 26 deletions airflow/operators/latest_only_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
This module contains an operator to run downstream tasks only for the
latest scheduled DagRun
"""
from typing import Dict, Iterable, Union

import pendulum

from airflow.operators.branch_operator import BaseBranchOperator
from airflow.operators.python import BranchPythonOperator


class LatestOnlyOperator(BaseBranchOperator):
class LatestOnlyOperator(BranchPythonOperator):
"""
Allows a workflow to skip tasks that are not running during the most
recent schedule interval.
Expand All @@ -40,28 +38,30 @@ class LatestOnlyOperator(BaseBranchOperator):

ui_color = '#e9ffdb' # nyanza

def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
# If the DAG Run is externally triggered, then return without
# skipping downstream tasks
if context['dag_run'] and context['dag_run'].external_trigger:
def __init__(self, *args, **kwargs):
def python_callable(dag_run, task, dag, execution_date, **_):
# If the DAG Run is externally triggered, then return without
# skipping downstream tasks
if dag_run and dag_run.external_trigger:
self.log.info(
"Externally triggered DAG_Run: allowing execution to proceed.")
return list(task.get_direct_relative_ids(upstream=False))

now = pendulum.utcnow()
left_window = dag.following_schedule(execution_date)
right_window = dag.following_schedule(left_window)
self.log.info(
"Externally triggered DAG_Run: allowing execution to proceed.")
return context['task'].get_direct_relative_ids(upstream=False)
'Checking latest only with left_window: %s right_window: %s now: %s',
left_window, right_window, now
)

now = pendulum.utcnow()
left_window = context['dag'].following_schedule(
context['execution_date'])
right_window = context['dag'].following_schedule(left_window)
self.log.info(
'Checking latest only with left_window: %s right_window: %s now: %s',
left_window, right_window, now
)
if not left_window < now <= right_window:
self.log.info('Not latest execution, skipping downstream.')
# we return an empty list, thus the parent BranchPythonOperator
# won't exclude any downstream tasks from skipping.
return []
else:
self.log.info('Latest, allowing execution to proceed.')
return list(task.get_direct_relative_ids(upstream=False))

if not left_window < now <= right_window:
self.log.info('Not latest execution, skipping downstream.')
# we return an empty list, thus the parent BaseBranchOperator
# won't exclude any downstream tasks from skipping.
return []
else:
self.log.info('Latest, allowing execution to proceed.')
return context['task'].get_direct_relative_ids(upstream=False)
super().__init__(python_callable=python_callable, *args, **kwargs)
25 changes: 13 additions & 12 deletions docs/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -763,25 +763,26 @@ For example:

start_op >> branch_op >> [continue_op, stop_op]

If you wish to implement your own operators with branching functionality, you
can inherit from :class:`~airflow.operators.branch_operator.BaseBranchOperator`,
which behaves similarly to ``BranchPythonOperator`` but expects you to provide
an implementation of the method ``choose_branch``. As with the callable for
``BranchPythonOperator``, this method should return the ID of a downstream task,
or a list of task IDs, which will be run, and all others will be skipped.
Most of the times you can get away with passing a ``python_callable`` to ``BranchPythonOperator``
to create branching logic. However, if you do wish to implement your own operators with branching
functionality, you can inherit from ``BranchPythonOperator`` too. If jinja templating is needed for
the arguments of ``python_callable``, pass them as ``op_args`` or ``op_kwargs`` to
``BranchPythonOperator``.

.. code:: python

class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
class MyBranchOperator(BranchPythonOperator):
def __init__(self, *args, **kwargs):
"""
Run an extra branch on the first day of the month
"""
if context['execution_date'].day == 1:
return ['daily_task_id', 'monthly_task_id']
else:
return 'daily_task_id'
def python_callable(execution_date, **_):
if execution_date.day == 1:
return ['daily_task_id', 'monthly_task_id']
else:
return 'daily_task_id'

super().__init__(python_callable=python_callable, *args, **kwargs)

SubDAGs
=======
Expand Down
177 changes: 0 additions & 177 deletions tests/operators/test_branch_operator.py

This file was deleted.