Push Spark appId to XCOM for LivyOperator with deferrable mode #31201
Conversation
phanikumv left a comment
rebase your PR please
Can we just move L151 context["ti"].xcom_push(key="app_id", value=self.get_hook().get_batch(self._batch_id)["appId"]) to just before L148, i.e. if not self.deferrable:? Or do we want to push only once it reaches a terminal state?
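For context, here is a minimal sketch of the placement question. It is simplified and not the actual provider source: the method names follow the snippet quoted above, the deferral branch is elided, and the line references mirror the comment, so treat everything else as illustrative.

```python
# Simplified sketch of LivyOperator.execute(); names follow the snippet quoted
# in this thread, everything else is illustrative.
def execute(self, context):
    self._batch_id = self.get_hook().post_batch(**self.spark_params)

    # Suggested placement (~L148): pushing here, before branching on
    # self.deferrable, would record the appId irrespective of the batch
    # job's final status.
    # context["ti"].xcom_push(
    #     key="app_id",
    #     value=self.get_hook().get_batch(self._batch_id)["appId"],
    # )

    if not self.deferrable:
        self.poll_for_termination(self._batch_id)  # raises if the batch fails
        # Current placement (~L151): runs only after a successful terminal state.
        context["ti"].xcom_push(
            key="app_id",
            value=self.get_hook().get_batch(self._batch_id)["appId"],
        )
    return self._batch_id
```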
Also, I think we should add a test for this to avoid regression
Yes, currently we push the Spark appId not for all terminal states but only in case of a successful run of the batch job.
If we implement your suggestion @pankajastro, it would be an early push to XCOM irrespective of the batch job's final status. It depends on whether the downstream tasks need this XCOM value even if the task has failed. I would avoid pushing an additional XCOM record into the metadata database if it is not really needed.
@bdsoha do you think, from your experience, that pushing the appId to XCOM would benefit or be needed by downstream tasks even if the task has failed? (Tagging you as you created the original PR and I am hoping you have some experience with the usage & need 🙂)
It was very useful, as I used it to retrieve application logs from YARN following a task's execution.
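As an illustration of that use case, a downstream task could pull the pushed appId and feed it to the YARN CLI. This is only a sketch: the Livy task_id and the surrounding DAG are assumptions, while the app_id key matches the one discussed in this thread.

```python
# Hypothetical downstream task that fetches YARN logs for the batch, using the
# appId the Livy task pushed to XCOM. "submit_livy_batch" is an assumed task_id.
from airflow.operators.bash import BashOperator

fetch_yarn_logs = BashOperator(
    task_id="fetch_yarn_logs",
    bash_command=(
        "yarn logs -applicationId "
        "{{ ti.xcom_pull(task_ids='submit_livy_batch', key='app_id') }}"
    ),
)
```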
@bdsoha thanks. But do we need it for failed tasks too, or just successful ones? Because, at the moment, the place where you added it is reached only after successful completion.
@potiuk what do you think? Is it an overhead if we push to the XCOM backend for failed tasks too? Or would it be fine to push the XCOM early, irrespective of the task status?
But if you have the information in hand, what's wrong with storing it? We should not use the presence of such information as a proxy for success or failure anyway (since we have actual status available).
Yes, I am curious to understand the general consensus. If no downstream tasks are going to pull the XCOM information, would the record get cleaned up in the database, or would it stay there forever? My thought process is that if it's not going to get used (maybe users need it? not sure), then why populate the DB with one more record?
Also, I tried to follow the previous commit and push only in case of success, and apparently we have not received feedback yet that it might be needed for failed cases too, no? If the general consensus across the project is to push only on success, then perhaps we could wait until users of this operator come up with a need to also push it for failed tasks. What do you think?
I think it's fine to wait until needed.
But I thought that's what @bdsoha was saying here:
It was very useful, as I used it to retrieve application logs from YARN following a task's execution.
Re the "wasted record" concern, I am not concerned with that. We're talking about "exceptional" circumstances, when the creation fails, right? Well, that isn't what happens ordinarily, so you're talking about a small number of cases. And if it was successful the record would be created anyway, so it's certainly no worse. In the long run, all Airflow databases need cleanup / purging of old records, and that's why we added a helper command for that.
I think @potiuk's concern is about consistency. Consistency is good, but we can also make steps in a direction without necessarily having to update all operators to fit a certain convention. There is inevitably some variation in behavior between operators, each of which may be tailored to different services and different use cases, so one always has to know what the behavior is in order to use it properly. Example DAGs and docs are there to help with that.
Related note: XCOMs are cleared when tasks are cleared, so it doesn't have an impact there.
So, I'm not saying you need to push the XCOM immediately, just that I don't see a problem with doing so if there's a good use case for it.
Understood.
W.r.t. @bdsoha's comment: they added the XCOM push in their PR https://github.com/apache/airflow/pull/27376/files#diff-b8586f3007fabbc894632e59111527536b03a2f576bda13bbcec5df9d3c1338bR144 after the call to the self.poll_for_termination method, which raises an exception if the batch job does not succeed. So I am guessing they are using the XCOM value in downstream tasks only for success scenarios, but yes, I would definitely like to hear if they are using/needing it for failure scenarios too.
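For readers following along, here is a simplified stand-in for that polling flow. It is not the provider source: the terminal-state set and attribute names are assumptions built on the Livy hook's public API, and the point is only that the method returns normally on success and raises otherwise.

```python
import time

from airflow.exceptions import AirflowException
from airflow.providers.apache.livy.hooks.livy import BatchState

# Terminal states assumed for this sketch.
TERMINAL_STATES = {BatchState.SUCCESS, BatchState.DEAD, BatchState.KILLED, BatchState.ERROR}


def poll_for_termination(self, batch_id):
    """Stand-in for LivyOperator.poll_for_termination: it only returns on success,
    so any xcom_push placed after it in execute() runs for successful batches only."""
    state = self.get_hook().get_batch_state(batch_id)
    while state not in TERMINAL_STATES:
        time.sleep(self._polling_interval)
        state = self.get_hook().get_batch_state(batch_id)
    if state != BatchState.SUCCESS:
        # Failure path: execution stops here, before the xcom_push in execute().
        raise AirflowException(f"Batch {batch_id} did not succeed: {state}")
```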
I say for now we stay consistent with the non-deferrable mode in this operator. If we/someone wants to start pushing it always, it can be done in a separate PR that handles both execution paths.
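For reference, the consistent deferrable-path behaviour could look roughly like the sketch below. This is not the provider source verbatim: the event keys and the use of get_hook() are assumptions, and only the success branch pushes the appId, mirroring the non-deferrable mode.

```python
from airflow.exceptions import AirflowException


class LivyOperatorSketch:  # illustrative stand-in for the real LivyOperator
    def execute_complete(self, context, event):
        """Callback the trigger invokes when the deferred batch reaches a terminal state."""
        if event.get("status") != "success":
            raise AirflowException(f"Livy batch failed: {event}")
        # Push under the same key as the non-deferrable path, so downstream
        # tasks can pull "app_id" regardless of the execution mode.
        context["ti"].xcom_push(
            key="app_id",
            value=self.get_hook().get_batch(event["batch_id"])["appId"],
        )
        return event["batch_id"]
```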
Force-pushed from 8b0d4ec to cd2eb45.
jedcunningham left a comment
Change looks good but we need a test.
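One possible shape for such a test, sketched under assumptions: the constructor arguments, the execute_complete signature, and the event payload are guesses for illustration, not the actual test added to the provider.

```python
from unittest import mock

from airflow.providers.apache.livy.operators.livy import LivyOperator


def test_execute_complete_pushes_app_id_to_xcom():
    # Assumed constructor arguments; "file" points at a dummy job file.
    op = LivyOperator(task_id="livy_task", file="dummy.py", deferrable=True)
    ti = mock.MagicMock()

    with mock.patch.object(op, "get_hook") as get_hook:
        get_hook.return_value.get_batch.return_value = {"appId": "application_123"}
        op.execute_complete(
            context={"ti": ti},
            event={"status": "success", "batch_id": 1},
        )

    # The appId should land in XCOM under the same key as the non-deferrable path.
    ti.xcom_push.assert_called_once_with(key="app_id", value="application_123")
```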
Force-pushed from 133305c to 02ef817.
The static check failure is for I also tried to validate the file https://github.com/apache/airflow/blob/main/airflow/api_connexion/openapi/v1.yaml and it was reported as a valid specification. Any idea what could be going wrong here?
Force-pushed from 3901a65 to efa3ceb.
Force-pushed from efa3ceb to 683d91e.
With the change in PR apache#27376, we started pushing the Spark appId to XCOM when executing the LivyOperator in standard (non-deferrable) mode. This commit now pushes the appId to XCOM in deferrable mode too, to keep the expected outcome consistent for the operator, so that subsequent tasks can fetch and use the appId from XCOM.
Force-pushed from 683d91e to f1f1d95.



With the change in PR #27376, we started pushing the Spark appId to XCOM when executing the LivyOperator in standard (non-deferrable) mode. This commit now pushes the appId to XCOM in deferrable mode too, to keep the expected outcome consistent for the operator, so that subsequent tasks can fetch and use the appId from XCOM.