From 2dc990534d3b1bcdcc72e023ab289f6cc7cf0616 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 25 Sep 2019 09:55:10 +0800
Subject: [PATCH 1/2] access executorDataMap should be protected by lock

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c9b408bed1163..86accb36e82ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -537,7 +537,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   private def numExistingExecutors: Int = executorDataMap.size

-  override def getExecutorIds(): Seq[String] = {
+  override def getExecutorIds(): Seq[String] = synchronized {
     executorDataMap.keySet.toSeq
   }

@@ -545,14 +545,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
   }

-  override def maxNumConcurrentTasks(): Int = {
+  override def maxNumConcurrentTasks(): Int = synchronized {
     executorDataMap.values.map { executor =>
       executor.totalCores / scheduler.CPUS_PER_TASK
     }.sum
   }

   // this function is for testing only
-  def getExecutorAvailableResources(executorId: String): Map[String, ExecutorResourceInfo] = {
+  def getExecutorAvailableResources(
+      executorId: String): Map[String, ExecutorResourceInfo] = synchronized {
     executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
   }


From 08f29888554d839b1329de49624989911d366984 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 25 Sep 2019 16:18:14 +0800
Subject: [PATCH 2/2] address comments

---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 86accb36e82ed..b32accf028495 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,10 +68,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
   private val createTimeNs = System.nanoTime()

-  // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
-  // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
-  // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
-  // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
+  // Accessing `executorDataMap` in the inherited methods from ThreadSafeRpcEndpoint doesn't need
+  // any protection. But accessing `executorDataMap` out of the inherited methods must be
+  // protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
+  // be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]

@@ -535,7 +535,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Return the number of executors currently registered with this backend.
    */
-  private def numExistingExecutors: Int = executorDataMap.size
+  private def numExistingExecutors: Int = synchronized { executorDataMap.size }

   override def getExecutorIds(): Seq[String] = synchronized {
     executorDataMap.keySet.toSeq
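The patches above follow the locking rule documented in the updated comment: `executorDataMap` is mutated only inside the endpoint's message handlers (with the lock held), and every read from outside those handlers must synchronize on `CoarseGrainedSchedulerBackend.this`. As a rough illustration of that rule, here is a minimal, self-contained Scala sketch; the class and method names are made up for the example and are not Spark's API.

// Minimal sketch (not Spark code) of the locking discipline used in the patch:
// the mutable map is written only by the single endpoint thread, and every
// access from other threads goes through `synchronized` on the owning object.

import scala.collection.mutable.HashMap
import java.util.concurrent.Executors

object SynchronizedMapSketch {

  final case class ExecutorData(totalCores: Int)

  class Backend {
    // Written only from the (single-threaded) endpoint; read by arbitrary threads.
    private val executorDataMap = new HashMap[String, ExecutorData]

    // Called on the endpoint thread; still takes the lock so that concurrent
    // readers never observe a partially updated map.
    def registerExecutor(id: String, data: ExecutorData): Unit = synchronized {
      executorDataMap(id) = data
    }

    // Called from any thread: must hold the same lock as the writer.
    def getExecutorIds(): Seq[String] = synchronized {
      executorDataMap.keySet.toSeq
    }

    def maxNumConcurrentTasks(cpusPerTask: Int): Int = synchronized {
      executorDataMap.values.map(_.totalCores / cpusPerTask).sum
    }
  }

  def main(args: Array[String]): Unit = {
    val backend = new Backend
    val pool = Executors.newFixedThreadPool(2)

    // One thread plays the endpoint (writer), another an external caller (reader).
    pool.submit(new Runnable {
      def run(): Unit =
        (1 to 100).foreach(i => backend.registerExecutor(s"exec-$i", ExecutorData(totalCores = 4)))
    })
    pool.submit(new Runnable {
      def run(): Unit =
        (1 to 100).foreach(_ => backend.getExecutorIds())
    })

    pool.shutdown()
  }
}

Without the `synchronized` blocks, the reader thread could iterate the mutable `HashMap` while the writer is resizing it, which is the kind of race the patch closes for callers outside the endpoint.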