[SPARK-16759][CORE] Add a configuration property to pass caller contexts of upstream applications into Spark #15563
Conversation
Can't this just be a normal SparkConf property? Why do we need to specialize the spark-submit arguments for this?
The spark.hadoop prefix will be treated as Hadoop configuration and set into Configuration, so it would probably be better to change to another name.
Thanks for this info. I have changed the property name to "spark.upstreamApp.callerContext".
We could simplify with getOrElse here and below.
upstreamCallerContextStr needs to combine some extra characters with upstreamCallerContext.get, so plain getOrElse is not used here.
`upstreamCallerContext.map("_" + _).getOrElse("")`?
Yes, I have updated the PR to use this.
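For readers following along, a minimal sketch of the Option combination being discussed (the literal values below are made up for illustration):
```
// Illustrative only: combine an optional upstream caller context with Spark's own context.
val upstreamCallerContext: Option[String] = Some("oozie_workflow_123")

// map adds the separator only when a value is present; getOrElse("") covers the None case.
val context = "SPARK_APPID_application_001" +
  upstreamCallerContext.map("_" + _).getOrElse("")
// Some(...) => "SPARK_APPID_application_001_oozie_workflow_123"; None => no suffix at all.
```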
It would be better to define "spark.hadoop.callerContext" in internal/config and use that instead.
Yes. Done.
@rxin Thanks for reviewing this. I have updated the PR so that this is no longer a spark-submit argument.
Force-pushed from 2cdbe4f to 45fccf8.
Hadoop verifies caller contexts passed in here via isContextValid(), and it limits the length.
Hi, @tgravescs Could you please review this PR? Thanks.
I'd prefer to see new parameters added as the last param to the class.
I agree that new parameters should normally be added as the last parameters so that no client code is broken. In this case, though, there are only three callers in total and they are all under my control. The new optional parameter will be used much more frequently than the other optional parameters, so I think I should change the parameter order before the class is used more widely. If I put the new parameter last, callers (including the existing three) would have to pass many “None”s as parameters.
ok
It is possible to use named parameters, but I agree that the Spark-private scope plus the frequency of use here is a mitigating factor.
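A small illustration of the trade-off (the constructor below is hypothetical, not the actual CallerContext signature): with the new parameter last, positional callers pad with Nones, while named arguments avoid the padding.
```
// Hypothetical constructor, for illustration only.
class CtxSketch(
    from: String,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    upstreamCallerContext: Option[String] = None)

// Positional style would force padding with Nones:
val positional = new CtxSketch("TASK", None, None, Some("oozie_123"))
// Named arguments avoid that regardless of where the parameter sits:
val named = new CtxSketch("TASK", upstreamCallerContext = Some("oozie_123"))
```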
Can we log a warning here saying that we truncated, and what the string was truncated from and to?
Yes. Done.
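Roughly, the kind of truncation warning being asked for could look like the sketch below (standalone Scala; the 128-character cap and the use of println instead of Spark's logWarning are assumptions for illustration):
```
object CallerContextTruncation {
  // Assumed cap for illustration; HDFS's audit log enforces its own configurable limit.
  private val MaxLength = 128

  def truncateWithWarning(context: String): String = {
    if (context.length > MaxLength) {
      val truncated = context.take(MaxLength)
      // In Spark this would go through logWarning; println keeps the sketch self-contained.
      println(s"Truncated caller context from ${context.length} to $MaxLength characters: " +
        s"'$context' -> '$truncated'")
      truncated
    } else {
      context
    }
  }
}
```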
docs/running-on-yarn.md (outdated)
This doesn't match the actual config above, spark.upstreamApp.callerContext. How about spark.log.callerContext?
If I'm running Spark in standalone mode with master/worker and reading from HDFS, the caller context would still work on the HDFS side, right? So this isn't just a Spark-on-YARN config; it should move to the general configuration section, with a mention that it applies to YARN/HDFS.
Yes. I have changed the config to spark.log.callerContext and moved the documentation to spark/docs/configuration.md.
`upstreamCallerContext.map("_" + _).getOrElse("")`?
Should prepareContext be done once as part of initialization of the context, or does it need to be done for each invocation of setCurrentContext?
prepareContext needs to be done for each invocation of setCurrentContext.
Why? We set the actual context when we create it and there is no way to change it; set is just calling the Hadoop routine. I don't know that it matters too much right now since we always create and set, but it's better to do.
The reason prepareContext is called in each setCallerContext is that currently every client only needs to set the caller context once, and each CallerContext object only calls setCallerContext once. That said, I agree with your point; preparing the context once will make the implementation better and benefit future invocations.
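A rough sketch of the build-once approach being discussed (illustrative only, not the actual Spark CallerContext class; the println stands in for the call into Hadoop's API):
```
// The context string is prepared once at construction time,
// so setCurrentContext merely hands the finished value to Hadoop.
class CallerContextSketch(from: String, upstreamCallerContext: Option[String] = None) {
  private val context: String =
    "SPARK_" + from + upstreamCallerContext.map("_" + _).getOrElse("")

  def setCurrentContext(): Unit = {
    // The real code would invoke Hadoop's CallerContext API when it is available;
    // this placeholder just prints the value it would set.
    println(s"Would set Hadoop caller context to: $context")
  }
}

// Example: new CallerContextSketch("APPMASTER", Some("oozie_123")).setCurrentContext()
```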
private
Yes. Done.
We can inline upstreamCallerContextStr and remove the variable?
Yes. I have updated the PR to inline the variables.
…ntexts of upstream applications into Spark
Jenkins, test this please
Test build #68340 has finished for PR 15563 at commit
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
val context = "SPARK_" +
make this private
Done.
Please look at the Jenkins failures.
Jenkins, test this please
1 similar comment
Jenkins, test this please
@mridulm @tgravescs It seems Jenkins isn't working?
jenkins, ok to test
Test build #68437 has finished for PR 15563 at commit
+1. @mridulm, do you have any further comments?
Looks good to me.
Merging into master.
Thanks a lot for the review, @tgravescs @mridulm
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
from: String,
this is off?
What do you mean? Could you please elaborate? Thanks.
.booleanConf
  .createWithDefault(false)
private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
shouldn't this option be called spark.yarn.log.callerContext?
This is not just for YARN: if Spark apps run in standalone mode with master and workers and read/write from/to HDFS, the caller context would still work on the HDFS side. (PS. We also can't use the spark.hadoop prefix, since that will be treated as Hadoop configuration and set into Configuration.)
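For reference, a rough sketch of why the spark.hadoop. prefix is special (this mirrors the behavior described above; the helper is illustrative, not Spark's actual code):
```
import org.apache.hadoop.conf.Configuration

// Every "spark.hadoop.*" property is copied into the Hadoop Configuration with the prefix
// stripped, which is why reusing that prefix for this feature would also inject the value
// into Hadoop's own configuration.
def copySparkHadoopProps(sparkProps: Map[String, String], hadoopConf: Configuration): Unit = {
  sparkProps.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
    }
  }
}
```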
Thanks for the comment. Would it be a problem if we use spark.hadoop.log.callerContext? I know it gets passed into Configuration, but why would that be a problem? Is it overriding some common configuration in Hadoop?
It shouldn't hurt anything, as I don't think there is a Hadoop config by that name, but if there were, it would conflict. In general I think it is a bad idea for us to do this on purpose; there is no reason to also put it into the Hadoop configuration. I saw this as something that could apply to things other than YARN/HDFS if they supplied the API. For instance, if AWS S3 or any other filesystem provided a similar API, you could use the same config. But I don't know whether any does now.
What is your concern with the current name?
My concern is that this is a very Hadoop-specific thing at the moment, and it is unclear whether other environments will support it, so it'd make more sense to have 'hadoop' in the name. A lot of Spark users don't run Hadoop.
Personally I don't like using spark.hadoop. for this when we know that prefix is already being used, and these configs would then always be added to the Hadoop config when not needed. I guess perhaps we messed up in naming that prefix for putting things into the Hadoop configuration.
I would say we should change spark.hadoop being applied to the Hadoop conf (to something like spark.hadoopConf.), but even though I don't see it documented anywhere, I think it would be too painful to change since I know people are using it.
What about spark.log.hadoop.callerContext? Although perhaps we should set a policy for this in general.
Hi, @rxin If you think spark.log.hadoop.callerContext is ok, I can submit a follow-up PR to rename spark.log.callerContext.
…xts of upstream applications into Spark
## What changes were proposed in this pull request?
Many applications take Spark as a computing engine and run on it. This PR adds a configuration property `spark.log.callerContext` that can be used by Spark's upstream applications (e.g. Oozie) to set up their caller contexts into Spark. In the end, Spark will combine its own caller context with the caller contexts of its upstream applications, and write them into Yarn RM log and HDFS audit log.
The audit log has a config to truncate the caller contexts passed in (default 128 characters). The caller contexts will be sent over RPC, so they should be concise. The caller context written into the HDFS audit log and YARN RM log consists of two parts: the information `A` specified by Spark itself and the value `B` of the `spark.log.callerContext` property. Currently `A` typically takes 64 to 74 characters, so `B` can have up to 50 characters (mentioned in the doc `running-on-yarn.md`).
## How was this patch tested?
Manual tests. I have run some Spark applications with `spark.log.callerContext` configuration in Yarn client/cluster mode, and verified that the caller contexts were written into Yarn RM log and HDFS audit log correctly.
The ways to configure `spark.log.callerContext` property:
- In spark-defaults.conf:
```
spark.log.callerContext infoSpecifiedByUpstreamApp
```
- In app's source code:
```
val spark = SparkSession
.builder
.appName("SparkKMeans")
.config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")
.getOrCreate()
```
When running in Spark YARN cluster mode, the driver is unable to pass `spark.log.callerContext` to the YARN client and AM, since the YARN client and AM have already started before the driver performs `.config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")`.
The following example shows the command line used to submit a SparkKMeans application and the corresponding records in Yarn RM log and HDFS audit log.
Command:
```
./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5
```
Yarn RM log:
<img width="1440" alt="screen shot 2016-10-19 at 9 12 03 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547050/7d2f278c-9649-11e6-9df8-8d5ff12609f0.png">
HDFS audit log:
<img width="1400" alt="screen shot 2016-10-19 at 10 18 14 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547102/096060ae-964a-11e6-981a-cb28efd5a058.png">
Author: Weiqing Yang <yangweiqing001@gmail.com>
Closes apache#15563 from weiqingy/SPARK-16759.