
Conversation

@amoghrajesh (Contributor) commented Jan 22, 2025:

closes: #45752

Why?

When a task completes (moves into the success state), asset-related events for that task should be stored in the metadata DB. Currently this is done in airflow/airflow/models/taskinstance.py.

This does a few things:

  1. It extracts outlets from the task.
  2. For every outlet and its asset events, it creates records in the database across several tables:
    asset
    asset_alias
    asset_alias_asset
    asset_alias_asset_event
    asset_event
  3. Support is not limited to the Asset type; it also works with AssetAlias and Asset refs (name ref and uri ref).

This behaviour needs to be ported to the Task SDK.

Approach

The idea is to implement this logic in the patch TI state endpoint in the execution API server. The reasoning is that we needn't make an additional API call: when "finishing" a task from the task runner, we can send the relevant details (the task outlets and the outlet events) and take care of the rest in the ti_update_state endpoint.

Interface changes

  • We have a new payload, TISuccessStatePayload, to mark a state as success from the task runner in the execution API. The reason is that we do not want to clutter TITerminalStatePayload with additional information, slowing down the API request needlessly.
  • The structure (sketched below) contains:
    task_outlets: translates to ti.task.outlets at execution time, sent from the task runner
    outlet_events: the events for an outlet object. For example, for Assets it translates to context["outlet_events"][Asset object]
    asset_type: the type of object (class name) the execution API has to deal with. This avoids any additional mental gymnastics on the server side to figure out what kind of object we are dealing with, since it arrives serialised.
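
A minimal sketch of what this payload could look like, assuming the Pydantic-style models the execution API uses elsewhere; the field types here are illustrative, not the PR's exact model definition:

    # Illustrative sketch only: field types are assumptions based on the
    # description above, not the PR's exact model definition.
    from datetime import datetime
    from typing import Any, Literal

    from pydantic import BaseModel


    class TISuccessStatePayload(BaseModel):
        """Payload for marking a TI successful, carrying asset event details."""

        state: Literal["success"] = "success"
        end_date: datetime
        # Serialised ti.task.outlets, sent by the task runner.
        task_outlets: list[dict[str, Any]] = []
        # Serialised events per outlet, e.g. context["outlet_events"][asset].
        outlet_events: list[dict[str, Any]] = []
        # Class name of the outlet type ("Asset", "AssetAlias", ...), so the
        # server need not infer it from the serialised objects.
        asset_type: str | None = None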

Server Side

  • In ti_update_state, added an additional branch for the success state where we call register_asset_changes, a function similar to
    ti._register_asset_changes(events=context["outlet_events"], session=session)
    but adjusted for the execution API.
  • This function does a few things: for every task_outlet, it registers the events according to the outlet type (see the sketch after this list):
  1. For Asset, it receives events from the task runner, so it just registers those.
  2. For AssetNameRef and AssetUriRef, it finds the relevant Asset for each ref and registers the matching events.
  3. For AssetAlias, it builds a map of the events to be registered keyed on unique (asset uri, extra) pairs, and generates events for those, handling a few special cases. Docs: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html#how-to-use-datasetalias
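
A self-contained sketch of that dispatch, under stated assumptions: the real register_asset_changes writes ORM rows through a SQLAlchemy session, while this stand-in returns plain dicts instead, and the event shapes follow the debug logs shown in the Testing section below:

    # Hypothetical sketch: returns plain dicts standing in for the ORM rows
    # the real function writes via the session.
    from collections import defaultdict
    from typing import Any


    def register_asset_changes(
        task_outlets: list[dict[str, Any]],
        outlet_events: list[dict[str, Any]],
        asset_type: str,
    ) -> list[dict[str, Any]]:
        rows: list[dict[str, Any]] = []
        if asset_type == "Asset":
            # Events arrive fully formed from the task runner: register as-is.
            for event in outlet_events:
                rows.append({"asset_key": event["key"], "extra": event["extra"]})
        elif asset_type in ("AssetNameRef", "AssetUriRef"):
            # The runner sends every event it has (it cannot resolve refs
            # without DB access); match each ref to its asset and filter.
            for outlet in task_outlets:
                for event in outlet_events:
                    key = event["key"]
                    if key.get("name") == outlet.get("name") or key.get("uri") == outlet.get("uri"):
                        rows.append({"asset_key": key, "extra": event["extra"]})
        elif asset_type == "AssetAlias":
            # Deduplicate alias events on (asset uri, extra) and emit one
            # event per unique pair, remembering the source aliases.
            grouped: dict[tuple[str, str], list[str]] = defaultdict(list)
            for event in outlet_events:
                # repr() makes the extra dict hashable for grouping purposes.
                pair = (event["dest_asset_key"]["uri"], repr(sorted(event["extra"].items())))
                grouped[pair].append(event["source_alias_name"])
            for (uri, extra_repr), aliases in grouped.items():
                rows.append({"asset_uri": uri, "extra": extra_repr, "source_aliases": aliases})
        return rows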

Execution side

Task Runner

The task runner runs the task as usual and, before finishing, checks whether any task outlets are defined. If they are, it populates task_outlets, outlet_events, and asset_type for the outlets present.

For Asset:

  1. Populates task_outlets and outlet_events as events[obj], where events = context["outlet_events"]

For AssetNameRef and AssetUriRef:

  1. Populates task_outlets as needed and sends all possible events, since we cannot access the DB to resolve the model being referenced

For AssetAlias:
We don't care about task_outlets; we only care about the asset_alias_events, so those are populated in outlet_events

Once this is done, a SucceedTask message is sent to the supervisor, as sketched below.
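
A structural sketch of that population logic, with stated assumptions: SucceedTask below is a stand-in dataclass mirroring the message fields visible in the debug logs in the Testing section, and the events mapping plays the role of context["outlet_events"]; the real Task SDK class and attribute names may differ.

    # Structural sketch with stand-ins; not the Task SDK's actual classes.
    from dataclasses import dataclass, field
    from datetime import datetime, timezone
    from typing import Any


    @dataclass
    class SucceedTask:  # stand-in for the real Task SDK message
        end_date: datetime
        task_outlets: list[Any] = field(default_factory=list)
        outlet_events: list[Any] = field(default_factory=list)
        asset_type: str | None = None


    def build_succeed_task(outlets: list[Any], events: dict[Any, Any]) -> SucceedTask:
        """Assemble the message; `events` plays the role of context["outlet_events"]."""
        msg = SucceedTask(end_date=datetime.now(timezone.utc))
        for obj in outlets:
            msg.asset_type = type(obj).__name__
            if msg.asset_type == "Asset":
                msg.task_outlets.append(obj)
                msg.outlet_events.append(events[obj])
            elif msg.asset_type in ("AssetNameRef", "AssetUriRef"):
                # No DB access in the runner to resolve refs, so send the
                # outlet plus every accumulated event; the API server filters.
                msg.task_outlets.append(obj)
                msg.outlet_events = list(events.values())
            else:  # AssetAlias: only the alias events matter
                # events[obj] is assumed to hold the alias's asset_alias_events.
                msg.outlet_events.extend(events[obj])
        return msg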

Supervisor

The supervisor now treats the success state as one of the STATES_SENT_DIRECTLY. Once it receives a SucceedTask message from the task runner, it calls:

            self.client.task_instances.succeed(
                id=self.id,
                when=msg.end_date,
                task_outlets=msg.task_outlets,
                outlet_events=msg.outlet_events,
                asset_type=msg.asset_type,
            )

HTTP client in Task SDK

Introduced a new method called succeed, which calls the ti_patch_state API with a TISuccessStatePayload; a sketch follows.
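
A hedged sketch of that client method, reusing the TISuccessStatePayload sketch from above; the enclosing class name, route, and serialisation details are assumptions modelled on the supervisor call shown earlier, and only the method name and kwargs come from the PR text:

    # Illustrative only: route and class name are assumptions.
    import uuid
    from datetime import datetime

    import httpx


    class TaskInstanceOperations:  # stand-in for the Task SDK client section
        def __init__(self, client: httpx.Client):
            self.client = client  # pointed at the execution API base URL

        def succeed(self, id: uuid.UUID, when: datetime, task_outlets, outlet_events, asset_type):
            """PATCH the TI state to success, carrying the asset event details."""
            body = TISuccessStatePayload(  # from the earlier sketch
                end_date=when,
                task_outlets=task_outlets,
                outlet_events=outlet_events,
                asset_type=asset_type,
            )
            self.client.patch(
                f"task-instances/{id}/state",
                content=body.model_dump_json(),
                headers={"Content-Type": "application/json"},
            )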

Testing

Using the DAG: https://github.com/apache/airflow/blob/main/airflow/example_dags/example_asset_alias.py

DAGs: (screenshot)

Non-aliased assets

  1. Unpause: asset_s3_bucket_consumer and asset_s3_bucket_producer
  2. Run the producer DAG first (screenshot)

Event:

{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:01:24.675957Z\",\"task_outlets\":[{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"}],\"outlet_events\":[{\"key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{},\"asset_alias_events\":[]}],\"asset_type\":\"Asset\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:01:24.676014","logger":"task","event":"Sending request","level":"debug"}

The consumer DAG also gets triggered (screenshot).

Aliased assets

  1. Unpause: asset_alias_example_alias_producer and asset_alias_example_alias_consumer
  2. Trigger the asset_alias_example_alias_producer DAG (screenshot)

Event:

{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T10:05:01.268374Z\",\"task_outlets\":[],\"outlet_events\":[{\"source_alias_name\":\"example-alias\",\"dest_asset_key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{}}],\"asset_type\":\"AssetAlias\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T10:05:01.268428","logger":"task","event":"Sending request","level":"debug"}

This now triggers two DAGs: asset_alias_example_alias_consumer and asset_s3_bucket_consumer, since the aliased asset was updated.

The consumer fails because inlet_events has not been ported yet (screenshots).

DB-level checks

Asset created: (screenshot)

Asset alias: (screenshot)

Asset alias to asset mapping: (screenshot)

Asset events (first triggered by the asset, second by the alias): (screenshot)

Alias to event mapping: (screenshot)



@amoghrajesh (Contributor, Author):

Working on fixing the tests

@amoghrajesh amoghrajesh requested a review from XD-DENG as a code owner January 23, 2025 06:59
@amoghrajesh amoghrajesh requested a review from ashb January 23, 2025 07:05
@amoghrajesh (Contributor, Author) commented Jan 23, 2025:

With the new changes, tested both legacy and Task SDK DAGs (cc @ashb).

Legacy results: (screenshot)

Task SDK results: (screenshot). The first two asset_s3_bucket_producer failures are unrelated, and the asset_alias_example_alias_consumer failure is because inlet_events is not yet working.

@amoghrajesh (Contributor, Author) commented Jan 23, 2025:

Looks like the changes in this PR break some test cases for inlet_events. Looking into it.

@amoghrajesh (Contributor, Author):

Legacy without my changes:

2f107d64f771
[2025-01-23, 09:14:33 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2025-01-23, 09:14:33 UTC] {logging_mixin.py:212} INFO - AssetEvent(id=2, asset_id=1, extra={}, source_task_id='produce_asset_events_through_asset_alias', source_dag_id='asset_alias_example_alias_producer', source_run_id='manual__2025-01-23T09:14:17.503997+00:00', source_map_index=-1, source_aliases=[AssetAliasModel(name='example-alias')])
[2025-01-23, 09:14:33 UTC] {python.py:198} INFO - Done. Returned value was: None
[2025-01-23, 09:14:33 UTC] {taskinstance.py:331} ▶ Post task execution logs

Legacy with my changes:

ae19b24f9ec4
[2025-01-23, 09:18:54 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2025-01-23, 09:18:54 UTC] {logging_mixin.py:212} INFO - AssetEvent(id=2, asset_id=1, extra={}, source_task_id='produce_asset_events_through_asset_alias', source_dag_id='asset_alias_example_alias_producer', source_run_id='manual__2025-01-23T09:18:36.263671+00:00', source_map_index=-1, source_aliases=[AssetAliasModel(name='example-alias')])
[2025-01-23, 09:18:54 UTC] {python.py:198} INFO - Done. Returned value was: None
[2025-01-23, 09:18:54 UTC] {taskinstance.py:332} ▶ Post task execution logs

Which is the same, so it's likely a change needed in the test cases.

@amoghrajesh (Contributor, Author):

OK, I think I figured out the reason for the failure; working on a fix.

@ashb (Member) left a comment:

Looking better. A few tidy ups to do but def on the right track.

@amoghrajesh amoghrajesh requested a review from ashb January 23, 2025 11:50
@amoghrajesh (Contributor, Author):

I will just resolve the conversations with relevant replies, rebase & merge this

@amoghrajesh amoghrajesh merged commit bb77ebf into apache:main Jan 24, 2025
63 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-outlet-events-sdk branch January 24, 2025 06:41
@utkarsharma2 added the type:improvement (Changelog: Improvements) and type:new-feature (Changelog: New Features) labels and removed the type:improvement (Changelog: Improvements) label Jan 27, 2025
@utkarsharma2 utkarsharma2 added this to the Airflow 3.0.0 milestone Jan 27, 2025
gpathak128 pushed a commit to gpathak128/airflow that referenced this pull request Jan 29, 2025
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
niklasr22 pushed a commit to niklasr22/airflow that referenced this pull request Feb 8, 2025
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 17, 2025
Labels: area:task-sdk, type:new-feature (Changelog: New Features)

Linked issue (closed by this PR): Port Registering of Asset Changes to Task SDK