From b8affa37d45e9a7288f4173b5daead89ba4f6137 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Thu, 11 Jun 2020 23:32:25 +0800
Subject: [PATCH 1/2] impr

---
 .../main/scala/org/apache/spark/executor/Executor.scala | 8 +++-----
 docs/configuration.md                                   | 8 ++++----
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 93d1acdd2d156..0c9afce13064c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -323,10 +323,7 @@ private[spark] class Executor(
     val threadName = s"Executor task launch worker for task $taskId"
     val taskName = taskDescription.name
     val mdcProperties = taskDescription.properties.asScala
-      .filter(_._1.startsWith("mdc.")).map { item =>
-        val key = item._1.substring(4)
-        (key, item._2)
-      }.toSeq
+      .filter(_._1.startsWith(Executor.MDC)).toSeq
 
     /** If specified, this task has been killed and this option contains the reason. */
     @volatile private var reasonIfKilled: Option[String] = None
@@ -705,7 +702,7 @@ private[spark] class Executor(
     MDC.clear()
     mdc.foreach { case (key, value) => MDC.put(key, value) }
     // avoid overriding the taskName by the user
-    MDC.put("taskName", taskName)
+    MDC.put(s"${Executor.MDC}taskName", taskName)
   }
 
   /**
@@ -965,4 +962,5 @@ private[spark] object Executor {
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
+  val MDC = "mdc."
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 420942f7b7bbb..706c2552b1d17 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2955,11 +2955,11 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can config
 `log4j.properties` file in the `conf` directory. One way to start is to copy the existing
 `log4j.properties.template` located there.
 
-By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): `taskName`, which shows something
-like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your patternLayout in
+By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): `mdc.taskName`, which shows something
+like `task 1.0 in stage 0.0`. You can add `%X{mdc.taskName}` to your patternLayout in
 order to print it in the logs.
-Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user specific data into MDC.
-The key in MDC will be the string after the `mdc.` prefix.
+Moreover, you can use `spark.sparkContext.setLocalProperty(s"mdc.$name", "value")` to add user-specific data into MDC.
+The key in MDC will be the string `mdc.$name`, prefix included.
 
 # Overriding configuration directory
 
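The net effect of PATCH 1/2 is that MDC keys keep their `mdc.` prefix instead of having it stripped by the old `substring(4)` mapping. A minimal standalone sketch of the filtering behavior (the property names and values below are invented for illustration, not taken from Spark):

    import java.util.Properties
    import scala.collection.JavaConverters._

    val demoProps = new Properties()
    demoProps.setProperty("mdc.appPhase", "ingest") // kept: carries the prefix
    demoProps.setProperty("spark.job.id", "42")     // dropped: no "mdc." prefix

    // Mirrors the patched code path: filter on the prefix, keep keys intact,
    // so "mdc.appPhase" (not "appPhase") is what later reaches MDC.put.
    val mdcProperties = demoProps.asScala
      .filter(_._1.startsWith("mdc.")).toSeq
    // mdcProperties == Seq(("mdc.appPhase", "ingest"))
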
From 2c77bbbb3ea11c299bc73b80aed925cc5b0b3b92 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 12 Jun 2020 11:59:57 +0800
Subject: [PATCH 2/2] fix

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0c9afce13064c..c8b1afeebac0d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -323,7 +323,7 @@ private[spark] class Executor(
     val threadName = s"Executor task launch worker for task $taskId"
     val taskName = taskDescription.name
     val mdcProperties = taskDescription.properties.asScala
-      .filter(_._1.startsWith(Executor.MDC)).toSeq
+      .filter(_._1.startsWith("mdc.")).toSeq
 
     /** If specified, this task has been killed and this option contains the reason. */
     @volatile private var reasonIfKilled: Option[String] = None
@@ -702,7 +702,7 @@ private[spark] class Executor(
     MDC.clear()
     mdc.foreach { case (key, value) => MDC.put(key, value) }
     // avoid overriding the taskName by the user
-    MDC.put(s"${Executor.MDC}taskName", taskName)
+    MDC.put("mdc.taskName", taskName)
   }
 
   /**
@@ -962,5 +962,4 @@ private[spark] object Executor {
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
-  val MDC = "mdc."
 }
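Taken together, the two patches leave the user-facing contract described in the docs hunk: prefix a local property with `mdc.` and the full key shows up in the executor's MDC. A minimal end-to-end sketch, assuming a local Spark 3.x session (the key `mdc.appPhase`, the app name, and the layout string are invented for illustration):

    import org.apache.spark.sql.SparkSession

    // Local session purely for the sketch.
    val spark = SparkSession.builder()
      .appName("mdc-demo")
      .master("local[2]")
      .getOrCreate()

    // Driver side: attach user data to the MDC of subsequently launched tasks.
    spark.sparkContext.setLocalProperty("mdc.appPhase", "ingest")
    spark.range(100).count() // executor logs for this job carry the MDC values

With a log4j patternLayout such as `%X{mdc.taskName} %X{mdc.appPhase} %m%n` in `conf/log4j.properties`, each executor log line then shows the built-in task name alongside the user-supplied value.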