feat: automatically inject OL info into spark job in DataprocSubmitJobOperator #44477
Conversation
MaksYermak
left a comment
LGTM
Hi, are we planning on also emitting the lineage events from Airflow itself? I think we have other services that emit lineage (for example, BigQuery) where we still also emit this lineage from Airflow. For example, in Composer, we generate the events based on the SQL query of Hive, SparkSQL, Presto and Trino jobs.
Hey @ahidalgob, just to confirm I understand you correctly: are you asking if we plan to emit the lineage from the child job (in this case, Spark) directly from Airflow? As of now, there aren't any such plans that I'm aware of. In my opinion, it's a bit more complex to implement compared to a SQL-based approach, where we can parse the SQL on the Airflow side and occasionally patch it with API calls to BigQuery or similar solutions. Extracting lineage from a Spark jar, which can do virtually anything, is more challenging. For now, I'm focusing on making it easier for users to configure the Spark integration, without changing which entity is responsible for emitting the events.
Hi @kacpermuda, what I meant was exactly what you describe: parsing the SQL query on the Airflow side and generating the inputs/outputs. Right now, as you confirmed, this PR only configures how Spark generates the lineage events but doesn't generate them from the Airflow side, right?
Correct, this feature is only about automatically passing some OpenLineage information from Airflow to Spark, to automate the process of configuring the OpenLineage/Spark integration.
Thanks @kacpermuda, we would like to contribute the logic we used in Composer to generate the events from the SQL queries in other DataprocSubmitJob types. I think this PR and what we want to contribute are not incompatible; does that sound good to you? (also @mobuchowski)
@ahidalgob I don't think that's right, since you can submit a JAR with arbitrary code rather than just SQL. Also, even for SQL jobs, rather than using a parser (which is a best-effort solution), we can use the Spark integration, which actually understands the submitted jobs. The Airflow events here can contribute the proper hierarchy.
The failing Trino test comes from changes made in #44717. Waiting for the fix PR, which is on the way from the author.
To make sure I understand the target state: I think it's fair for each layer of orchestration to emit the metadata that it has access to, for example, depending on the Spark job type/implementation, the low-level information about the Spark execution, or, in the case of Airflow, information about the DAG/task/Airflow deployment. For Airflow itself, to construct such a lineage event, Airflow needs to be aware of the input/output assets (as long as we cannot link lineage events by process identifier alone). SQL parsing can be a way to get this information for SQL-like jobs; for other types of jobs (not necessarily Spark jobs) we can, for example, query the service the operator is integrated with (e.g. for BigQuery jobs we could query the BigQuery API to get that information and emit an event linking the input/output assets with the BigQuery job id and the DAG/task/Airflow deployment id).
Correct, that is the target state in my opinion; Airflow events will still be emitted without any changes. For now we are simply automating the transfer of some additional information to the Spark integration (people have been doing this manually until now with the macros provided by the OpenLineage provider).
This PR introduces a completely new feature to the OpenLineage integration. It will NOT impact users who are not using OpenLineage or who have not explicitly enabled this feature (it is disabled by default).
TL;DR
When explicitly enabled by the user for supported operators, we will automatically inject parent job information into the Spark job properties. For example, when submitting a Spark job using the DataprocSubmitJobOperator, we will include details about the Airflow task that triggered it, so that the OpenLineage Spark integration can include them in the parentRunFacet.
Why?
To enable full pipeline visibility and track dependencies between jobs in OpenLineage, we utilize the parentRunFacet. This facet stores the identifier of the parent job that triggered the current job. This approach works across various integrations; for example, you can pass Airflow's job identifier to a Spark application if it was triggered by an Airflow operator. Currently, this process requires manual configuration by the user, such as leveraging macros:
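For illustration, a sketch of what that manual setup can look like today, assuming the templated macros exposed by the Airflow OpenLineage provider and the standard `spark.openlineage.*` property names from the OpenLineage Spark integration; exact macro and property names should be verified against the installed provider versions, and all project, bucket, cluster and class names below are placeholders:

```python
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Hypothetical Spark job definition submitted to Dataproc.
spark_job = {
    "reference": {"project_id": "my-project"},
    "placement": {"cluster_name": "my-cluster"},
    "spark_job": {
        "jar_file_uris": ["gs://my-bucket/my-spark-job.jar"],
        "main_class": "com.example.MySparkJob",
        "properties": {
            # Parent job information passed manually via OpenLineage provider macros,
            # so the Spark integration can populate the parentRunFacet.
            "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
            "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
            "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
        },
    },
}

submit_spark_job = DataprocSubmitJobOperator(
    task_id="submit_spark_job",
    project_id="my-project",
    region="us-central1",
    job=spark_job,
)
```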
Understanding how various Airflow operators configure Spark allows us to automatically inject parent job information.
Controlling the Behavior
We provide users with a flexible control mechanism to manage this injection, combining per-operator enablement with a global fallback configuration. This design is inspired by the `deferrable` argument in Airflow. Each supported operator will include an argument like `ol_inject_parent_job_info`, which defaults to the global configuration value of `openlineage.spark_inject_parent_job_info`. This approach allows users to enable the injection globally and opt out for specific operators, or keep it disabled globally and opt in only where needed.
This design ensures both flexibility and ease of use, enabling users to fine-tune their workflows while minimizing repetitive configuration. I am aware that adding an OpenLineage-related argument to the operator will affect all users, even those not using OpenLineage, but since it defaults to False and can be ignored, I hope this will not pose any issues.
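A sketch of the intended usage; the argument and config option names below follow this description and may differ slightly in the merged code:

```python
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Global fallback in airflow.cfg (disabled by default):
#
#   [openlineage]
#   spark_inject_parent_job_info = True
#
# Per-operator override, mirroring how `deferrable` can default to a config value.
# With the injection enabled, the manual macro-based properties shown earlier are no longer needed.
submit_spark_job = DataprocSubmitJobOperator(
    task_id="submit_spark_job",
    project_id="my-project",
    region="us-central1",
    job={
        "reference": {"project_id": "my-project"},
        "placement": {"cluster_name": "my-cluster"},
        "spark_job": {
            "jar_file_uris": ["gs://my-bucket/my-spark-job.jar"],
            "main_class": "com.example.MySparkJob",
        },
    },
    ol_inject_parent_job_info=True,  # takes precedence over the global config value
)
```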
How?
The implementation is divided into three parts for better organization and clarity:
1. Operator's code (including the `execute` method): contains minimal logic, to avoid overwhelming users who are not actively working with OpenLineage.
2. Google provider's OpenLineage utils file: handles the logic for accessing the Spark properties specific to a given operator or job.
3. OpenLineage provider's utils: responsible for creating/extracting all necessary information in a format compatible with the OpenLineage Spark integration. The modifications to the Spark properties are also performed here.
For some operators, parts 1 and 2 may both live in the operator's code. In general, the specific operator/provider knows how to get the Spark properties, and the OpenLineage provider knows what to inject and performs the injection itself.
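A hypothetical sketch of how the three parts could cooperate; the helper names and the job-dict layout are illustrative rather than the exact symbols from this PR, and the `spark.openlineage.*` keys are assumed from the OpenLineage Spark integration:

```python
def extract_spark_properties(job: dict) -> dict:
    """Part 2 (Google provider utils): knows where the Spark properties live for this job type."""
    return job.get("spark_job", {}).get("properties", {})


def inject_parent_job_information(properties: dict, namespace: str, job_name: str, run_id: str) -> dict:
    """Part 3 (OpenLineage provider utils): merge parent job info into the Spark properties.

    Properties set explicitly by the user take precedence, so manual configuration is never overwritten.
    """
    parent_info = {
        "spark.openlineage.parentJobNamespace": namespace,
        "spark.openlineage.parentJobName": job_name,
        "spark.openlineage.parentRunId": run_id,
    }
    return {**parent_info, **properties}


# Part 1 (operator's execute method): a single guarded call keeps the operator code minimal,
# e.g. `if self.ol_inject_parent_job_info: ...`.
job = {"spark_job": {"properties": {"spark.executor.memory": "4g"}}}
job["spark_job"]["properties"] = inject_parent_job_information(
    extract_spark_properties(job),
    namespace="default",
    job_name="my_dag.submit_spark_job",
    run_id="01941f29-7c00-7087-8906-db6c57309d4a",
)
```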
Next steps
Expand Operator Coverage:
Increase support for additional operators by extending the parent job information injection to cover more cases.
Automate Transport Configuration:
Implement similar automation for transport configurations, starting with HTTP, to streamline the integration process.