
Conversation

@vincbeck
Contributor

Resolves #42513 and #42514.

The purpose of this PR is to send events to assets when watchers are associated with them. Example:

trigger = MyQueueTrigger(queue="<my_queue>")
asset = Asset("test_asset", watchers=[trigger])

with DAG(
    dag_id="example",
    schedule=[asset],
):
    task = EmptyOperator(task_id="task")

As part of AIP-82, it is now possible to associate watchers with an asset. These watchers are triggers. By associating a trigger with an asset, the goal is to update the asset whenever the trigger fires. This PR handles that part: when a trigger fires, it updates its associated assets.
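
For illustration, a watcher is just an ordinary trigger. A minimal sketch of what the hypothetical MyQueueTrigger from the example above might look like (the class name comes from the example; the queue-polling helper is a placeholder, not a real integration):

from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator

from airflow.triggers.base import BaseTrigger, TriggerEvent


class MyQueueTrigger(BaseTrigger):
    """Hypothetical watcher: fires an event whenever a message shows up on a queue."""

    def __init__(self, queue: str, poke_interval: float = 30.0) -> None:
        super().__init__()
        self.queue = queue
        self.poke_interval = poke_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Classpath and kwargs the triggerer uses to re-instantiate the trigger.
        return (
            f"{type(self).__module__}.{type(self).__name__}",
            {"queue": self.queue, "poke_interval": self.poke_interval},
        )

    async def _poll_queue(self) -> str | None:
        # Placeholder: a real implementation would poll SQS, Kafka, etc.
        return None

    async def run(self) -> AsyncIterator[TriggerEvent]:
        while True:
            if message := await self._poll_queue():
                # With this PR, an event yielded by a watcher trigger also
                # produces an asset event on the watched asset, which in turn
                # schedules the DAGs using that asset.
                yield TriggerEvent({"queue": self.queue, "message": message})
            await asyncio.sleep(self.poke_interval)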

I also updated the logic behind how triggers are cleaned up by the triggerer: triggers associated with an asset are kept alive as long as the association between the trigger and the asset is defined in a DAG.



@vincbeck vincbeck requested a review from dstandish November 25, 2024 22:14
@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler area:Triggerer labels Nov 25, 2024
@tirkarthi
Contributor

On trigger restart or reassignment to another triggerer process, the coroutine is cancelled and a check for trigger_timeout is done on task_instance. In this case task_instance is None; the sample patch below adds a check to handle it.

Traceback from trying this out locally with a sample DAG and Ctrl+C to stop the triggerer:

[2024-11-26T20:58:28.870+0530] {base_events.py:1744} ERROR - unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<TriggerRunner.run_trigger() done, defined at /home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py:632> exception=AttributeError("'NoneType' object has no attribute 'trigger_timeout'")>
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 639, in run_trigger
    async for event in trigger.run():
  File "/home/karthikeyan/stuff/python/airflow/airflow/triggers/file.py", line 87, in run
    await asyncio.sleep(self.poke_interval)
  File "/usr/lib/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/triggerer_job_runner.py", line 644, in run_trigger
    if timeout := trigger.task_instance.trigger_timeout:
AttributeError: 'NoneType' object has no attribute 'trigger_timeout'

Checking for task_instance on the trigger:

diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 8c226334f7..d7e5dbc6b1 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -641,7 +641,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.triggers[trigger_id]["events"] += 1
                 self.events.append((trigger_id, event))
         except asyncio.CancelledError:
-            if timeout := trigger.task_instance.trigger_timeout:
+            if timeout := (trigger.task_instance and trigger.task_instance.trigger_timeout):
                 timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
                 if timeout < timezone.utcnow():
                     self.log.error("Trigger cancelled due to timeout")
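
The fix relies on `and` short-circuiting: when task_instance is None, the whole expression evaluates to None, which is falsy, so the timeout branch is skipped without dereferencing None. A standalone illustration of the pattern (the classes here are stand-ins, not the actual Airflow models):

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class TaskInstance:
    trigger_timeout: datetime | None


@dataclass
class Trigger:
    task_instance: TaskInstance | None


triggers = [
    Trigger(task_instance=None),  # asset watcher: no task instance at all
    Trigger(task_instance=TaskInstance(trigger_timeout=None)),
    Trigger(task_instance=TaskInstance(trigger_timeout=datetime(2024, 1, 1, tzinfo=timezone.utc))),
]

for trigger in triggers:
    # `and` short-circuits, so trigger_timeout is never read on a None
    # task_instance; the walrus binds the resulting value (None or a datetime).
    if timeout := (trigger.task_instance and trigger.task_instance.trigger_timeout):
        print("would check timeout:", timeout)
    else:
        print("skip timeout check (no task instance or no timeout set)")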

Sample DAG:

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.triggers.file import FileTrigger
from airflow.sdk.definitions.asset import Asset

trigger = FileTrigger(filepath="/tmp/a")
asset = Asset("test_asset_1", watchers=[trigger])

with DAG(
    dag_id="file_trigger_timeout",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule=[asset],
) as dag:
    t1 = EmptyOperator(task_id="t1")


@tirkarthi
Contributor

tirkarthi commented Nov 26, 2024

There are cleanup function implementations that expect task_instance to always be present, like #39442, and they might fail to clean up when used with asset-based use cases. Exceptions from cleanup are suppressed, though, so I'm just wondering whether cleanup is semantically valid for the triggers-with-assets scenario, since it's not mentioned in the AIP.

Edit: Just saw that #42514 handles cleanup, so my message might not be valid. Please ignore if not needed. Thanks.

@vincbeck
Contributor Author

trigger_timeout

Nice! Thanks for the catch! I'll apply that.

@vincbeck
Contributor Author

vincbeck commented Nov 26, 2024

There are cleanup function implementations that expect task_instance to always be present, like #39442, and they might fail to clean up when used with asset-based use cases. Exceptions from cleanup are suppressed, though, so I'm just wondering whether cleanup is semantically valid for the triggers-with-assets scenario, since it's not mentioned in the AIP.

Edit: Just saw that #42514 handles cleanup, so my message might not be valid. Please ignore if not needed. Thanks.

I think it is fine here. Looking at the examples you provided, they cancel an external job (external to Airflow) if the task instance is not in a deferred state. All the logic implemented there is specific to deferrable operators and should not overlap with this feature. At first I thought the triggers themselves were being cleaned up, but these are external jobs, so I don't see an overlap. Triggers would be another story. Thanks for the heads-up!

#42514 is being resolved as part of this PR, therefore the logic handling trigger cleanup is done in this PR (at least from my perspective). So if you think something is missing or off, please call it out :)

@vincbeck vincbeck force-pushed the vincbeck/aip-82-send-update branch from a1550e7 to 9311c51 Compare November 26, 2024 20:18
@vincbeck
Contributor Author

On trigger restart or reassignment to another triggerer process, the coroutine is cancelled and a check for trigger_timeout is done on task_instance. [...]
It should be fixed now

@vincbeck vincbeck force-pushed the vincbeck/aip-82-send-update branch 2 times, most recently from adca98c to cd3a16f Compare November 26, 2024 22:43
@vincbeck vincbeck force-pushed the vincbeck/aip-82-send-update branch 2 times, most recently from 4fc16a2 to 266e687 Compare December 2, 2024 19:59
@vincbeck vincbeck force-pushed the vincbeck/aip-82-send-update branch from 266e687 to 4e06931 Compare December 2, 2024 20:31
Member

@gopidesupavan gopidesupavan left a comment


Overall LGTM :)

One question: do we have to rethink the triggers' default_capacity config? With the new event-driven scheduling it shares the triggers' capacity config, so is it okay to have a single config for regular triggers and event-driven triggers?

@vincbeck
Contributor Author

vincbeck commented Dec 3, 2024

Overall LGTM :)

One question: do we have to rethink the triggers' default_capacity config? With the new event-driven scheduling it shares the triggers' capacity config, so is it okay to have a single config for regular triggers and event-driven triggers?

I think so, and to me it makes sense. The config default_capacity is at the triggerer level and sets the maximum number of triggers one triggerer can run. The triggerer runs triggers from deferrable operators but from event-driven scheduling as well; therefore, to me it makes sense that default_capacity covers both use cases.

The only issue with this approach is that triggers from event-driven scheduling can max out the triggerer (if very many triggers are used to update assets). Event-driven scheduling would become a noisy neighbour for deferrable tasks: no deferrable task could run unless some of the triggers used to update assets (all those defined in DAGs) are removed.

Knowing that the default value of default_capacity is 1000, I don't know whether this limit is hit often, nor whether we should treat them separately. What I suggest for now is one common default_capacity config for all triggers. If some users come along with data demonstrating we should have two default_capacity configs (one for deferrable tasks and one for event-driven scheduling), then we could do it.
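
For reference, this is the triggerer-level setting being discussed; a minimal sketch of how it might be set in airflow.cfg (or via the AIRFLOW__TRIGGERER__DEFAULT_CAPACITY environment variable):

[triggerer]
# Maximum number of triggers a single triggerer process runs concurrently.
# With this PR it is shared by deferrable-operator triggers and asset watchers.
default_capacity = 1000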

@tirkarthi
Contributor

There was a feature request and PR open to assign a specific queue to a trigger and triggerer, so that a specific triggerer instance listens for a specific type of trigger, but it's not active. Once available, that feature could be useful here to run asset-related triggers in a separate triggerer instance, apart from the normal triggers.

#33818.

@tirkarthi
Contributor

I tested this feature locally, and it will be useful at work for certain use cases once we upgrade. I am looking forward to how the "infinite scheduling" part is handled in the future, as noted in the AIP, which will further improve usability for us. Thanks @vincbeck.

@gopidesupavan
Member

One question: do we have to rethink the triggers' default_capacity config? [...]

I think so, and to me it makes sense. [...] What I suggest for now is one common default_capacity config for all triggers. [...]

Sure, that makes sense. I've heard from a couple of people about two or three instances where they run thousands of triggers in their data pipelines, and I am sure these people will leverage this awesome event-driven feature effectively once it's up. I just wanted to bring up that point here. :)

@vincbeck
Contributor Author

vincbeck commented Dec 4, 2024

There was a feature request and PR open to assign a specific queue to a trigger and triggerer, so that a specific triggerer instance listens for a specific type of trigger [...]

#33818.

I agree, that could indeed be useful for this feature. This is definitely something we can do later based on feedback/comments from users.

@vincbeck
Contributor Author

vincbeck commented Dec 4, 2024

I tested this feature locally, and it will be useful at work for certain use cases once we upgrade. [...] Thanks @vincbeck.

Thanks for testing it!!

@vincbeck
Contributor Author

vincbeck commented Dec 4, 2024

Sure, that makes sense. I've heard from a couple of people about two or three instances where they run thousands of triggers in their data pipelines [...] I just wanted to bring up that point here. :)

And thanks for doing it :) All this feedback is very useful when implementing features.

@vincbeck vincbeck merged commit ffcc60f into apache:main Dec 4, 2024
49 checks passed
@vincbeck vincbeck deleted the vincbeck/aip-82-send-update branch December 4, 2024 15:58
