2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1603,7 +1603,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
-   * being used.
+   * could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
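In other words: after this change the value reflects the total slot capacity of the profile's executors, counting slots that are occupied by running tasks at the moment of the call, rather than only the currently free slots.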
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
     "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
       "more slots than the total number of slots in the cluster currently. Please init a new " +
-      "cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
-      "slots required to run this barrier stage."
+      "cluster with more resources(e.g. CPU, GPU) or repartition the input RDD(s) to reduce " +
+      "the number of slots required to run this barrier stage."
 }
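The remediation the new message suggests can be sketched from the user side. A minimal sketch, assuming a `sc: SparkContext` in scope and a cluster that can run only 4 barrier tasks at once (both assumptions, not part of the patch):

    // Shrink the barrier stage so it needs no more simultaneous slots than
    // the cluster can offer: 4 partitions => 4 barrier tasks.
    val input = sc.parallelize(1 to 100, numSlices = 16)
    input
      .coalesce(4)
      .barrier()
      .mapPartitions(iter => iter) // all 4 tasks are launched together, or none are
      .collect()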
@@ -480,10 +480,12 @@ private[spark] class DAGScheduler(
    * submission.
    */
   private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = {
-    val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
-    if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
-      throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+    if (rdd.isBarrier()) {
+      val numPartitions = rdd.getNumPartitions
+      val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
+      if (numPartitions > maxNumConcurrentTasks) {
+        throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+      }
     }
   }
 
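Hoisting the `rdd.isBarrier()` guard to the top also means `sc.maxNumConcurrentTasks(rp)`, which after this patch walks every active executor's resources under a lock, is only evaluated for barrier stages rather than on every stage submission.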
@@ -36,4 +36,5 @@ private[spark] class ExecutorResourceInfo(
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
   override protected def slotsPerAddress: Int = numParts
+  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
 }
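The new `totalAddressAmount` is plain multiplication over the slot-sliced addresses. A standalone sketch with made-up numbers (two GPU addresses, each split into two slots, e.g. by a task amount of 0.5):

    val resourceAddresses = Seq("0", "1") // two physical GPU addresses
    val slotsPerAddress = 2               // each address serves two tasks at once
    val totalAddressAmount = resourceAddresses.length * slotsPerAddress
    assert(totalAddressAmount == 4)       // 4 schedulable GPU slots on this executor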
@@ -83,7 +83,7 @@ private[spark] trait SchedulerBackend {
 
   /**
    * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
-   * being used.
+   * could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
113 changes: 65 additions & 48 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,51 +468,6 @@ private[spark] class TaskSchedulerImpl(
     Some(localTaskReqAssign.toMap)
   }
 
-  // Use the resource that the resourceProfile has as the limiting resource to calculate the
-  // total number of slots available based on the current offers.
-  private def calculateAvailableSlots(
-      resourceProfileIds: Array[Int],
-      availableCpus: Array[Int],
-      availableResources: Array[Map[String, Buffer[String]]],
-      taskSet: TaskSetManager): Int = {
-    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
-      taskSet.taskSet.resourceProfileId)
-    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
-      (id == resourceProfile.id)
-    }
-    val coresKnown = resourceProfile.isCoresLimitKnown
-    var limitingResource = resourceProfile.limitingResource(conf)
-    val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
-
-    offersForResourceProfile.map { case (o, index) =>
-      val numTasksPerExecCores = availableCpus(index) / taskCpus
-      // if limiting resource is empty then we have no other resources, so it has to be CPU
-      if (limitingResource == ResourceProfile.CPUS || limitingResource.isEmpty) {
-        numTasksPerExecCores
-      } else {
-        val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse {
-            val errorMsg = "limitingResource returns from ResourceProfile " +
-              s"$resourceProfile doesn't actually contain that task resource!"
Review comment (Member Author):
@tgravescs This actually should not happen, right? According to:

    if (taskResourcesToCheck.nonEmpty) {
      throw new SparkException("No executor resource configs were not specified for the " +
        s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
    }

Review comment (Contributor):
Correct, I believe it was there just as a double check, to make sure nothing broke in the future.
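The replacement implementation further below follows this conclusion: it reads the task amount for the limiting resource with a plain `.get` instead of this defensive `getOrElse`/abort branch.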

-            taskSet.abort(errorMsg)
-            throw new SparkException(errorMsg)
-          }
-        // available addresses already takes into account if there are fractional
-        // task resource requests
-        val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
-        val resourceLimit = (availAddrs / taskLimit).toInt
-        if (!coresKnown) {
-          // when executor cores config isn't set, we can't calculate the real limiting resource
-          // and number of tasks per executor ahead of time, so calculate it now based on what
-          // is available.
-          if (numTasksPerExecCores <= resourceLimit) numTasksPerExecCores else resourceLimit
-        } else {
-          resourceLimit
-        }
-      }
-    }.sum
-  }
-
   private def minTaskLocality(
       l1: Option[TaskLocality],
       l2: Option[TaskLocality]) : Option[TaskLocality] = {

@@ -591,9 +546,14 @@ private[spark] class TaskSchedulerImpl(
     // we only need to calculate available slots if using barrier scheduling, otherwise the
     // value is -1
     val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
-      val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources,
-        taskSet)
-      slots
+      val rpId = taskSet.taskSet.resourceProfileId
+      val availableResourcesAmount = availableResources.map { resourceMap =>
+        // available addresses already takes into account if there are fractional
+        // task resource requests
+        resourceMap.map { case (name, addresses) => (name, addresses.length) }
+      }
+      calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus,
+        availableResourcesAmount)
     } else {
       -1
     }
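The address-buffer-to-count conversion can be replayed in isolation. Toy data only; the repeated address stands for a fractional task request that splits one GPU into several slots:

    import scala.collection.mutable.Buffer

    val availableResources: Array[Map[String, Buffer[String]]] =
      Array(Map("gpu" -> Buffer("0", "1", "1"))) // "1" listed twice: two slots left on that GPU
    val availableResourcesAmount: Array[Map[String, Int]] =
      availableResources.map(_.map { case (name, addrs) => (name, addrs.length) })
    // => Array(Map("gpu" -> 3)); from here on the scheduler only needs the counts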
@@ -1166,6 +1126,63 @@ private[spark] object TaskSchedulerImpl {
 
   val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
+  /**
+   * Calculate the max available task slots given the `availableCpus` and `availableResources`
+   * from a collection of ResourceProfiles. And only those ResourceProfiles who has the
+   * same id with the `rpId` can be used to calculate the task slots.
+   *
+   * @param scheduler the TaskSchedulerImpl instance
+   * @param conf SparkConf used to calculate the limiting resource and get the cpu amount per task
+   * @param rpId the target ResourceProfile id. Only those ResourceProfiles who has the same id
+   *             with it can be used to calculate the task slots.
+   * @param availableRPIds an Array of ids of the available ResourceProfiles from the executors.
+   * @param availableCpus an Array of the amount of available cpus from the executors.
+   * @param availableResources an Array of the resources map from the executors. In the resource
+   *                           map, it maps from the resource name to its amount.
+   * @return the number of max task slots
+   */
+  def calculateAvailableSlots(
+      scheduler: TaskSchedulerImpl,
+      conf: SparkConf,
+      rpId: Int,
+      availableRPIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Int]]): Int = {
+    val resourceProfile = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    val (limitingResource, limitedByCpu) = {
+      val limiting = resourceProfile.limitingResource(conf)
+      // if limiting resource is empty then we have no other resources, so it has to be CPU
+      if (limiting == ResourceProfile.CPUS || limiting.isEmpty) {
+        (ResourceProfile.CPUS, true)
+      } else {
+        (limiting, false)
+      }
+    }
+    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
+    val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get
+
+    availableCpus.zip(availableResources).zip(availableRPIds)
+      .filter { case (_, id) => id == rpId }
+      .map { case ((cpu, resources), _) =>
+        val numTasksPerExecCores = cpu / cpusPerTask
+        if (limitedByCpu) {
+          numTasksPerExecCores
+        } else {
+          val availAddrs = resources.getOrElse(limitingResource, 0)
+          val resourceLimit = (availAddrs / taskLimit).toInt
+          // when executor cores config isn't set, we can't calculate the real limiting resource
+          // and number of tasks per executor ahead of time, so calculate it now based on what
+          // is available.
+          if (!coresKnown && numTasksPerExecCores <= resourceLimit) {
+            numTasksPerExecCores
+          } else {
+            resourceLimit
+          }
+        }
+      }.sum
+  }

   /**
    * Used to balance containers across hosts.
    *
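The per-executor arithmetic inside the new `calculateAvailableSlots` can be replayed standalone. A minimal sketch under assumed numbers (one executor with 8 CPUs and 2 GPU slots, each task needing 2 CPUs and 1 GPU, so the GPU is the limiting resource); `SlotMathSketch` is a hypothetical name, not Spark code:

    object SlotMathSketch {
      def main(args: Array[String]): Unit = {
        val availableCpus = 8
        val cpusPerTask = 2
        val availableGpuSlots = 2
        val gpusPerTask = 1.0 // task amounts may be fractional, hence a Double

        val slotsByCpu = availableCpus / cpusPerTask             // 8 / 2   = 4
        val slotsByGpu = (availableGpuSlots / gpusPerTask).toInt // 2 / 1.0 = 2

        // The limiting resource is by definition the one yielding the fewest
        // slots, so the minimum reproduces the scheduler's per-executor result.
        val slots = math.min(slotsByCpu, slotsByGpu)
        println(s"cpu-bound: $slotsByCpu, gpu-bound: $slotsByGpu, slots: $slots") // slots: 2
      }
    }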
@@ -632,10 +632,28 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
+  /**
+   * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
+   * could be used, even if some of them are being used at the moment.
+   * Note that please don't cache the value returned by this method, because the number can change
+   * due to add/remove executors.
+   *
+   * @param rp ResourceProfile which to use to calculate max concurrent tasks.
+   * @return The max number of tasks that can be concurrent launched currently.
+   */
   override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
-    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
-    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
-    executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
+    val (rpIds, cpus, resources) = {
+      executorDataMap
+        .filter { case (id, _) => isExecutorActive(id) }
+        .values.toArray.map { executor =>
+          (
+            executor.resourceProfileId,
+            executor.totalCores,
+            executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) }
+          )
+        }.unzip3
+    }
+    TaskSchedulerImpl.calculateAvailableSlots(scheduler, conf, rp.id, rpIds, cpus, resources)
   }

   // this function is for testing only
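The `unzip3` above only reshapes one tuple per executor into three index-aligned arrays, matching the parameters of `calculateAvailableSlots`. A toy illustration with fabricated executor data:

    // (resourceProfileId, totalCores, resource name -> total slot amount)
    val perExecutor = Array(
      (0, 8, Map("gpu" -> 2)),
      (0, 4, Map("gpu" -> 1)))
    val (rpIds, cpus, resources) = perExecutor.unzip3
    // rpIds     = Array(0, 0)
    // cpus      = Array(8, 4)
    // resources = Array(Map("gpu" -> 2), Map("gpu" -> 1))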
@@ -19,9 +19,12 @@ package org.apache.spark
 
 import scala.concurrent.duration._
 
+import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, TASK_GPU_ID, WORKER_GPU_ID}
 import org.apache.spark.scheduler.BarrierJobAllocationFailed._
+import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
+
+  test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
+    "consider all kinds of resources for the barrier stage") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+      val conf = new SparkConf()
+        // Setup a local cluster which would only has one executor with 2 CPUs and 1 GPU.
+        .setMaster("local-cluster[1, 2, 1024]")
+        .setAppName("test-cluster")
+        .set(WORKER_GPU_ID.amountConf, "1")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "1")
+        .set(TASK_GPU_ID.amountConf, "1")
+        // disable barrier stage retry to fail the application as soon as possible
+        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+      sc = new SparkContext(conf)
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+      val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+        // Setup a barrier stage which contains 2 tasks and each task requires 1 CPU and 1 GPU.
+        // Therefore, the total resources requirement (2 CPUs and 2 GPUs) of this barrier stage
+        // can not be satisfied since the cluster only has 2 CPUs and 1 GPU in total.
+        sc.parallelize(Range(1, 10), 2)
+          .barrier()
+          .mapPartitions { iter => iter }
+          .collect()
+      }
+      assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
+        "mode does not allow run a barrier stage that requires more slots"))
+    }
+  }
 }
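The arithmetic behind the expected failure, restated standalone (the same numbers the test sets up; before this patch only the CPU line would have been computed and the check would have passed):

    val numPartitions = 2                          // barrier tasks requested
    val slotsCpuOnly = 2 / 1                       // old logic: 2 CPUs / 1 per task = 2
    val slotsAllResources = math.min(2 / 1, 1 / 1) // new logic: the single GPU caps it at 1
    assert(numPartitions > slotsAllResources)      // 2 > 1 => BarrierJobSlotsNumberCheckFailed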