Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 41 additions & 39 deletions tests/decorators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,50 +1000,52 @@ def down(a: str):


def test_teardown_trigger_rule_selective_application(dag_maker, session):
with dag_maker(session=session) as dag:

@dag.task
def my_work():
return "abc"

@setup
@dag.task
def my_setup():
return "abc"

@teardown
@dag.task
def my_teardown():
return "abc"

work_task = my_work()
setup_task = my_setup()
teardown_task = my_teardown()
with dag_maker(session=session, serialized=True) as created_dag:
dag = created_dag

@dag.task
def my_work():
return "abc"

@setup
@dag.task
def my_setup():
return "abc"

@teardown
@dag.task
def my_teardown():
return "abc"

work_task = my_work()
setup_task = my_setup()
teardown_task = my_teardown()
assert work_task.operator.trigger_rule == TriggerRule.ALL_SUCCESS
assert setup_task.operator.trigger_rule == TriggerRule.ALL_SUCCESS
assert teardown_task.operator.trigger_rule == TriggerRule.ALL_DONE_SETUP_SUCCESS


def test_teardown_trigger_rule_override_behavior(dag_maker, session):
with dag_maker(session=session) as dag:

@dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
def my_work():
return "abc"

@setup
@dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
def my_setup():
return "abc"

@teardown
@dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
def my_teardown():
return "abc"

work_task = my_work()
setup_task = my_setup()
with pytest.raises(Exception, match="Trigger rule not configurable for teardown tasks."):
my_teardown()
with dag_maker(session=session, serialized=True) as created_dag:
dag = created_dag

@dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
def my_work():
return "abc"

@setup
@dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
def my_setup():
return "abc"

@teardown
@dag.task(trigger_rule=TriggerRule.ONE_SUCCESS)
def my_teardown():
return "abc"

work_task = my_work()
setup_task = my_setup()
with pytest.raises(Exception, match="Trigger rule not configurable for teardown tasks."):
my_teardown()
assert work_task.operator.trigger_rule == TriggerRule.ONE_SUCCESS
assert setup_task.operator.trigger_rule == TriggerRule.ONE_SUCCESS