diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c9b408bed1163..b32accf028495 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,10 +68,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
   private val createTimeNs = System.nanoTime()
 
-  // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
-  // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
-  // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
-  // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
+  // Accessing `executorDataMap` in the inherited methods from ThreadSafeRpcEndpoint doesn't need
+  // any protection. But accessing `executorDataMap` out of the inherited methods must be
+  // protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
+  // be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]
 
@@ -535,9 +535,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Return the number of executors currently registered with this backend.
    */
-  private def numExistingExecutors: Int = executorDataMap.size
+  private def numExistingExecutors: Int = synchronized { executorDataMap.size }
 
-  override def getExecutorIds(): Seq[String] = {
+  override def getExecutorIds(): Seq[String] = synchronized {
     executorDataMap.keySet.toSeq
   }
 
@@ -545,14 +545,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
   }
 
-  override def maxNumConcurrentTasks(): Int = {
+  override def maxNumConcurrentTasks(): Int = synchronized {
     executorDataMap.values.map { executor =>
       executor.totalCores / scheduler.CPUS_PER_TASK
     }.sum
   }
 
   // this function is for testing only
-  def getExecutorAvailableResources(executorId: String): Map[String, ExecutorResourceInfo] = {
+  def getExecutorAvailableResources(
+      executorId: String): Map[String, ExecutorResourceInfo] = synchronized {
     executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
   }
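
For context, the patch enforces a single locking discipline around `executorDataMap`: the map is only mutated from the endpoint's single-threaded message handling, while any read that can run on another thread (the allocation manager, test helpers, etc.) must hold the `CoarseGrainedSchedulerBackend.this` lock. Below is a minimal standalone sketch of that pattern; the `Backend` class and its `registerExecutor`/`snapshotIds`/`totalCores` methods are illustrative names, not Spark APIs.

import scala.collection.mutable.HashMap

class Backend {
  // Mutated only from the (single-threaded) message loop, mirroring executorDataMap.
  private val executorDataMap = new HashMap[String, Int]

  // Mutation still takes the lock so that readers on other threads see a consistent map.
  def registerExecutor(id: String, cores: Int): Unit = synchronized {
    executorDataMap(id) = cores
  }

  // Callable from arbitrary threads: reads synchronize on `this`, like the patched
  // getExecutorIds() and maxNumConcurrentTasks().
  def snapshotIds(): Seq[String] = synchronized {
    executorDataMap.keySet.toSeq
  }

  def totalCores(cpusPerTask: Int): Int = synchronized {
    executorDataMap.values.map(_ / cpusPerTask).sum
  }
}

Because `executorDataMap` is a plain mutable `HashMap`, synchronizing every external reader is what keeps traversals such as `keySet.toSeq` and the `values` fold from racing with a concurrent insert on the message loop.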