From fe27f88f5210df9d9fc9a00f1c8a853ad22f5fe9 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 8 Jun 2020 23:19:40 +0800 Subject: [PATCH 1/4] fix --- .../org/apache/spark/executor/Executor.scala | 48 ++++++++++++++++--- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 45cec726c4ca7..f8bff55de9fca 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -322,11 +322,15 @@ private[spark] class Executor( val taskId = taskDescription.taskId val threadName = s"Executor task launch worker for task $taskId" val taskName = taskDescription.name - val mdcProperties = taskDescription.properties.asScala - .filter(_._1.startsWith("mdc.")).map { item => + val mdcProperties = (taskDescription.properties.asScala ++ + Seq((Executor.TASK_MDC_KEY, taskName))) + .filter(_._1.startsWith(Executor.MDC_KEY)).map { item => val key = item._1.substring(4) + if (key == Executor.TASK_MDC_KEY && item._2 != taskName) { + logWarning(s"Override mdc.taskName is not allowed, ignore ${item._2}") + } (key, item._2) - }.toSeq + }.toMap /** If specified, this task has been killed and this option contains the reason. 
*/ @volatile private var reasonIfKilled: Option[String] = None @@ -401,9 +405,22 @@ private[spark] class Executor( } override def run(): Unit = { + val oldMdcProperties = mdcProperties.keys.map(k => (k, MDC.get(k))) + try { + mdcProperties.foreach { case (k, v) => MDC.put(k, v) } + runInternal() + } finally { + oldMdcProperties.foreach { case (k, v) => + if (v == null) { + MDC.remove(k) + } else { + MDC.put(k, v) + } + } + } + } - setMDCForTask(taskName, mdcProperties) - + private def runInternal(): Unit = { threadId = Thread.currentThread.getId Thread.currentThread.setName(threadName) val threadMXBean = ManagementFactory.getThreadMXBean @@ -750,9 +767,23 @@ private[spark] class Executor( private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP) override def run(): Unit = { + val mdcProperties = taskRunner.mdcProperties + val oldMdcProperties = mdcProperties.keys.map(k => (k, MDC.get(k))) + try { + mdcProperties.foreach { case (k, v) => MDC.put(k, v) } + runInternal() + } finally { + oldMdcProperties.foreach { case (k, v) => + if (v == null) { + MDC.remove(k) + } else { + MDC.put(k, v) + } + } + } + } - setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties) - + private def runInternal(): Unit = { val startTimeNs = System.nanoTime() def elapsedTimeNs = System.nanoTime() - startTimeNs def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs @@ -969,4 +1000,7 @@ private[spark] object Executor { // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be // used instead. val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties] + + val MDC_KEY = "mdc." 
+ val TASK_MDC_KEY = s"${MDC_KEY}taskName" } From be3d7521eb68a9ded5fc4f3cf4b71ca4d1d226be Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 8 Jun 2020 23:22:40 +0800 Subject: [PATCH 2/4] remove setMDC --- .../main/scala/org/apache/spark/executor/Executor.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index f8bff55de9fca..d095392852cb6 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -719,14 +719,6 @@ private[spark] class Executor( } } - private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = { - MDC.put("taskName", taskName) - - mdc.foreach { case (key, value) => - MDC.put(key, value) - } - } - /** * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally * sending a Thread.interrupt(), and monitoring the task until it finishes. 
From 0cefc616952a22f3d66d79e793e8efb2e62d009a Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 11 Jun 2020 17:06:01 +0800 Subject: [PATCH 3/4] address comments --- .../org/apache/spark/executor/Executor.scala | 43 ++++++------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d095392852cb6..c0f1a34cf8b98 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -322,15 +322,11 @@ private[spark] class Executor( val taskId = taskDescription.taskId val threadName = s"Executor task launch worker for task $taskId" val taskName = taskDescription.name - val mdcProperties = (taskDescription.properties.asScala ++ - Seq((Executor.TASK_MDC_KEY, taskName))) - .filter(_._1.startsWith(Executor.MDC_KEY)).map { item => + val mdcProperties = taskDescription.properties.asScala + .filter(_._1.startsWith("mdc.")).map { item => val key = item._1.substring(4) - if (key == Executor.TASK_MDC_KEY && item._2 != taskName) { - logWarning(s"Override mdc.taskName is not allowed, ignore ${item._2}") - } (key, item._2) - }.toMap + }.toSeq /** If specified, this task has been killed and this option contains the reason. 
*/ @volatile private var reasonIfKilled: Option[String] = None @@ -405,18 +401,11 @@ private[spark] class Executor( } override def run(): Unit = { - val oldMdcProperties = mdcProperties.keys.map(k => (k, MDC.get(k))) try { - mdcProperties.foreach { case (k, v) => MDC.put(k, v) } + setMDCForTask(taskName, mdcProperties) runInternal() } finally { - oldMdcProperties.foreach { case (k, v) => - if (v == null) { - MDC.remove(k) - } else { - MDC.put(k, v) - } - } + MDC.clear() } } @@ -719,6 +708,13 @@ private[spark] class Executor( } } + private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = { + mdc.foreach { case (key, value) => + MDC.put(key, value) + } + MDC.put("taskName", taskName) + } + /** * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally * sending a Thread.interrupt(), and monitoring the task until it finishes. @@ -759,19 +755,11 @@ private[spark] class Executor( private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP) override def run(): Unit = { - val mdcProperties = taskRunner.mdcProperties - val oldMdcProperties = mdcProperties.keys.map(k => (k, MDC.get(k))) try { - mdcProperties.foreach { case (k, v) => MDC.put(k, v) } + setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties) runInternal() } finally { - oldMdcProperties.foreach { case (k, v) => - if (v == null) { - MDC.remove(k) - } else { - MDC.put(k, v) - } - } + MDC.clear() } } @@ -992,7 +980,4 @@ private[spark] object Executor { // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be // used instead. val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties] - - val MDC_KEY = "mdc." 
- val TASK_MDC_KEY = s"${MDC_KEY}taskName" } From 2f1b86b59d92c3a163034e8a478536d1fc13fcac Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 11 Jun 2020 17:27:08 +0800 Subject: [PATCH 4/4] clear MDC at the beginning --- .../org/apache/spark/executor/Executor.scala | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index c0f1a34cf8b98..93d1acdd2d156 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -401,15 +401,7 @@ private[spark] class Executor( } override def run(): Unit = { - try { - setMDCForTask(taskName, mdcProperties) - runInternal() - } finally { - MDC.clear() - } - } - - private def runInternal(): Unit = { + setMDCForTask(taskName, mdcProperties) threadId = Thread.currentThread.getId Thread.currentThread.setName(threadName) val threadMXBean = ManagementFactory.getThreadMXBean @@ -709,9 +701,10 @@ private[spark] class Executor( } private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = { - mdc.foreach { case (key, value) => - MDC.put(key, value) - } + // make sure we run the task with the user-specified mdc properties only + MDC.clear() + mdc.foreach { case (key, value) => MDC.put(key, value) } + // avoid overriding the taskName by the user MDC.put("taskName", taskName) } @@ -755,15 +748,7 @@ private[spark] class Executor( private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP) override def run(): Unit = { - try { - setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties) - runInternal() - } finally { - MDC.clear() - } - } - - private def runInternal(): Unit = { + setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties) val startTimeNs = System.nanoTime() def elapsedTimeNs = System.nanoTime() - startTimeNs def timeoutExceeded(): Boolean = killTimeoutNs > 0
&& elapsedTimeNs > killTimeoutNs