Conversation

@kacpermuda
Contributor

Adding OpenLineage support for the following operators:

  • S3CopyObjectOperator
  • S3CreateObjectOperator
  • S3DeleteObjectsOperator


@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Nov 22, 2023
@kacpermuda kacpermuda force-pushed the feat/s3-simple-operators-lineage branch from 0c9d685 to 87fb99a Compare November 22, 2023 13:53
@vincbeck
Contributor

Would you have an example DAG that uses these new functions? Since the code you introduce is never executed in the execute method, I am wondering what that would look like in a DAG.

@kacpermuda
Contributor Author

kacpermuda commented Nov 22, 2023

Sure. The example DAG would just be a simple execution of these operators, since their core function is to emit proper lineage events to the OpenLineage backend rather than to perform any action within a service (S3 in this case). The simplest possible DAG would be:

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3CreateObjectOperator,
    S3DeleteObjectsOperator,
)
from airflow.utils.dates import days_ago

BUCKET = "testbucket"

with DAG(
    dag_id="s3_test",
    schedule_interval=None,
    start_date=days_ago(2),
) as dag:
    create_file = S3CreateObjectOperator(
        task_id="create_file",
        dag=dag,
        s3_bucket=BUCKET,
        s3_key="a.csv",
        data="test1",
    )
    copy_file = S3CopyObjectOperator(
        task_id="copy_file",
        dag=dag,
        source_bucket_name=BUCKET,
        source_bucket_key="a.csv",
        dest_bucket_key=f"s3://{BUCKET}/c.csv",
    )

    delete_file = S3DeleteObjectsOperator(
        task_id="delete_file",
        dag=dag,
        bucket=BUCKET,
        keys=["a.csv", "b.csv"],
    )
    create_file >> copy_file >> delete_file

The methods I implemented are not explicitly called anywhere in execute or by the user. They are part of the OpenLineage and Airflow integration (similar examples: #35778 or #35660), so they are called just before execute (the _on_start method) and just after execute (the _on_complete method). If you have an OpenLineage backend configured, they extract the lineage event that will be sent to the backend. Here is an AIP for OL integration.
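The calling convention can be illustrated with a plain-Python sketch. Everything below is hypothetical and simplified: the class, the facet dicts, and the `run_with_lineage` driver are stand-ins for Airflow's OpenLineage listener, which is what actually invokes `get_openlineage_facets_on_start` / `get_openlineage_facets_on_complete` around `execute`:

```python
# Simplified, illustrative sketch of how the OpenLineage integration wraps
# execute(). The real mechanism is an Airflow listener; the class names and
# the shape of the "facets" dicts here are hypothetical.

class S3CopyLikeOperator:
    """Stand-in for an operator that exposes the OL lineage methods."""

    def __init__(self, source_key: str, dest_key: str):
        self.source_key = source_key
        self.dest_key = dest_key

    def execute(self):
        # The actual S3 work would happen here.
        return f"copied {self.source_key} -> {self.dest_key}"

    def get_openlineage_facets_on_start(self):
        # Lineage known before execution starts.
        return {"inputs": [self.source_key], "outputs": [self.dest_key]}

    def get_openlineage_facets_on_complete(self, result):
        # Lineage that may depend on the execution result.
        return {"inputs": [self.source_key], "outputs": [self.dest_key]}


def run_with_lineage(operator, emit):
    """Mimic the listener: emit lineage just before and just after execute()."""
    if hasattr(operator, "get_openlineage_facets_on_start"):
        emit(operator.get_openlineage_facets_on_start())
    result = operator.execute()
    if hasattr(operator, "get_openlineage_facets_on_complete"):
        emit(operator.get_openlineage_facets_on_complete(result))
    return result


events = []
op = S3CopyLikeOperator("s3://testbucket/a.csv", "s3://testbucket/c.csv")
run_with_lineage(op, events.append)
```

The point of the pattern is that the operator only *declares* its lineage; an OpenLineage backend, if configured, is what consumes the emitted events.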

Please let me know if you need any additional information.

EDIT:
The easiest way to test it is to run this DAG in breeze with OL integration and see those events in Marquez Web UI.

@vincbeck
Contributor

Got it! Thanks for the details :) That makes more sense now. Code looks good to me

@o-nikolas o-nikolas merged commit 9e159fc into apache:main Nov 22, 2023
@kacpermuda kacpermuda deleted the feat/s3-simple-operators-lineage branch November 23, 2023 09:32