Closed

Changes from all commits (21 commits):
1977934  Start getting rid of references to slave (holdenk, Jun 16, 2020)
0441b81  I think agent is clearer given how we already use worker for other th… (holdenk, Jun 18, 2020)
d1244b2  Rename the blockmanager slave to replica (holdenk, Jun 18, 2020)
27a3762  Finish up in core (holdenk, Jun 18, 2020)
110e244  Fix in the main places (holdenk, Jun 18, 2020)
e479429  Fix in SQL (holdenk, Jun 18, 2020)
578da6c  Update the resource managers (holdenk, Jun 18, 2020)
dd5716a  fix a long line (holdenk, Jun 18, 2020)
b263035  Actually this is the block manager not the exec (holdenk, Jun 18, 2020)
74e7bde  one more long line (holdenk, Jun 18, 2020)
16305a0  Cleanups and rename the sbin scripts (holdenk, Jun 18, 2020)
83cb4e8  Add the transitional shell scripts (holdenk, Jun 18, 2020)
a8c35d6  Update AgentLost to ExecutorProcessLost, note we still use agent/agen… (holdenk, Jun 23, 2020)
9aa5676  Rename the sample config to match (holdenk, Jun 23, 2020)
99375d7  Update the markdown files in docs to use more accurate words (holdenk, Jun 23, 2020)
11228bc  Fix up Mesos to still use the SlaveID protos since while their http A… (holdenk, Jun 24, 2020)
eaf29d8  Update the JsonProtocolSuite to match that we're outputting the new an… (holdenk, Jun 24, 2020)
6f5f3cc  Code review feedback (holdenk, Jun 29, 2020)
14d40bb  Implement a common trait as suggested by ryanblue (did not add to exi… (holdenk, Jun 29, 2020)
d495203  Merge branch 'master' into SPARK-32004-drop-references-to-slave (holdenk, Jul 1, 2020)
9c43172  Merge branch 'master' into SPARK-32004-drop-references-to-slave (holdenk, Jul 13, 2020)
File renamed without changes.
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,18 +80,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new HashMap[String, Long]

- private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
+ private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)

private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)

require(checkTimeoutIntervalMs <= executorTimeoutMs,
s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
s"equal to ${config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key}.")
s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}.")
require(executorHeartbeatIntervalMs <= executorTimeoutMs,
s"${config.EXECUTOR_HEARTBEAT_INTERVAL.key} should be less than or " +
s"equal to ${config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key}")
s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}")

private var timeoutCheckingTask: ScheduledFuture[_] = null

@@ -218,7 +218,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
sc.schedulerBackend match {
case backend: CoarseGrainedSchedulerBackend =>
backend.driverEndpoint.send(RemoveExecutor(executorId,
SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
ExecutorProcessLost(
s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))

// LocalSchedulerBackend is used locally and only has one single executor
case _: LocalSchedulerBackend =>
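For context, the two require() checks above constrain user configuration against the renamed timeout key. A minimal sketch (not part of the diff, values are illustrative) of settings that satisfy both checks:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: both intervals must be <= the block manager
// heartbeat timeout, per the require() checks in HeartbeatReceiver.
val conf = new SparkConf()
  .set("spark.storage.blockManagerHeartbeatTimeoutMs", "120s") // key renamed in this PR
  .set("spark.network.timeoutInterval", "60s")                 // Network.NETWORK_TIMEOUT_INTERVAL
  .set("spark.executor.heartbeatInterval", "10s")              // config.EXECUTOR_HEARTBEAT_INTERVAL
```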
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1732,7 +1732,7 @@ class SparkContext(config: SparkConf) extends Logging {
def version: String = SPARK_VERSION

/**
- * Return a map from the slave to the max memory available for caching and the remaining
+ * Return a map from the block manager to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
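A short usage sketch (not part of the diff) of the API whose scaladoc is reworded above; the map is keyed by block manager host:port, and an active SparkContext named sc is assumed:

```scala
// Print the max and remaining cache memory reported per block manager.
sc.getExecutorMemoryStatus.foreach { case (blockManager, (maxMem, remainingMem)) =>
  println(s"$blockManager: max=$maxMem bytes, remaining=$remainingMem bytes")
}
```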
@@ -2830,14 +2830,14 @@ object SparkContext extends Logging {
scheduler.initialize(backend)
(backend, scheduler)

- case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
- checkResourcesPerTask(coresPerSlave.toInt)
- // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
- val memoryPerSlaveInt = memoryPerSlave.toInt
- if (sc.executorMemory > memoryPerSlaveInt) {
+ case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
+ checkResourcesPerTask(coresPerWorker.toInt)
+ // Check to make sure memory requested <= memoryPerWorker. Otherwise Spark will just hang.
+ val memoryPerWorkerInt = memoryPerWorker.toInt
+ if (sc.executorMemory > memoryPerWorkerInt) {
throw new SparkException(
"Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
- memoryPerSlaveInt, sc.executorMemory))
+ memoryPerWorkerInt, sc.executorMemory))
}

// For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
@@ -2850,7 +2850,7 @@

val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
- numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
+ numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
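For reference, the regex groups renamed above parse the local-cluster master URL. A minimal sketch (app name and values are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// local-cluster[numWorkers, coresPerWorker, memoryPerWorker (MiB)]
// Here: 2 workers, 1 core each, 1024 MiB each. spark.executor.memory must not
// exceed the per-worker memory, or SparkContext throws the exception shown above.
val conf = new SparkConf()
  .setMaster("local-cluster[2,1,1024]")
  .setAppName("local-cluster-demo")
val sc = new SparkContext(conf)
sc.stop()
```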
@@ -74,7 +74,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
- * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param sparkHome The SPARK_HOME directory on the worker nodes
* @param jarFile JAR file to send to the cluster. This can be a path on the local file system
* or an HDFS, HTTP, HTTPS, or FTP URL.
*/
@@ -84,7 +84,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
- * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param sparkHome The SPARK_HOME directory on the worker nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
@@ -94,7 +94,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
- * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param sparkHome The SPARK_HOME directory on the worker nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -90,19 +90,23 @@ private[deploy] object JsonProtocol {
* `name` the description of the application
* `cores` total cores granted to the application
* `user` name of the user who submitted the application
- * `memoryperslave` minimal memory in MB required to each executor
- * `resourcesperslave` minimal resources required to each executor
+ * `memoryperexecutor` minimal memory in MB required to each executor
+ * `resourcesperexecutor` minimal resources required to each executor
* `submitdate` time in Date that the application is submitted
* `state` state of the application, see [[ApplicationState]]
* `duration` time in milliseconds that the application has been running
+ * For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
*/
def writeApplicationInfo(obj: ApplicationInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime) ~
("name" -> obj.desc.name) ~
("cores" -> obj.coresGranted) ~
("user" -> obj.desc.user) ~
("memoryperexecutor" -> obj.desc.memoryPerExecutorMB) ~
("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("resourcesperexecutor" -> obj.desc.resourceReqsPerExecutor
.toList.map(writeResourceRequirement)) ~
("resourcesperslave" -> obj.desc.resourceReqsPerExecutor
.toList.map(writeResourceRequirement)) ~
("submitdate" -> obj.submitDate.toString) ~
@@ -117,14 +121,17 @@
* @return a Json object containing the following fields:
* `name` the description of the application
* `cores` max cores that can be allocated to the application, 0 means unlimited
- * `memoryperslave` minimal memory in MB required to each executor
- * `resourcesperslave` minimal resources required to each executor
+ * `memoryperexecutor` minimal memory in MB required to each executor
+ * `resourcesperexecutor` minimal resources required to each executor
* `user` name of the user who submitted the application
* `command` the command string used to submit the application
+ * For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
*/
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
("cores" -> obj.maxCores.getOrElse(0)) ~
("memoryperexecutor" -> obj.memoryPerExecutorMB) ~
("resourcesperexecutor" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
("memoryperslave" -> obj.memoryPerExecutorMB) ~
("resourcesperslave" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
("user" -> obj.user) ~
@@ -459,9 +459,10 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")

- private[spark] val STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT =
- ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
+ private[spark] val STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT =
+ ConfigBuilder("spark.storage.blockManagerHeartbeatTimeoutMs")
.version("0.7.0")
+ .withAlternative("spark.storage.blockManagerSlaveTimeoutMs")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
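Because the renamed entry registers the old key via withAlternative, configurations that still set the deprecated name keep working. A hedged sketch of that behavior (the actual resolution happens inside Spark's internal ConfigEntry machinery):

```scala
import org.apache.spark.SparkConf

// A job that still uses the pre-rename key.
val legacyConf = new SparkConf()
  .set("spark.storage.blockManagerSlaveTimeoutMs", "90s")
// When Spark reads STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT it falls back to the
// alternative key above, so the effective timeout is still 90 seconds.
```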

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -81,7 +81,7 @@ private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: Inp
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
- * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
+ * Otherwise, a new JobConf will be created on each executor using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
* creates.
* @param inputFormatClass Storage format of the data to be read.
@@ -140,7 +140,7 @@ class HadoopRDD[K, V](

private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

- // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+ // Returns a JobConf that will be used on executors to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (shouldCloneJobConf) {
@@ -1912,9 +1912,9 @@ private[spark] class DAGScheduler(
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
* We will also assume that we've lost all shuffle blocks associated with the executor if the
- * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
- * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
- * presume all shuffle data related to this executor to be lost.
+ * executor serves its own blocks (i.e., we're not using external shuffle), the entire executor
+ * process is lost (likely including the shuffle service), or a FetchFailed occurred, in which
+ * case we presume all shuffle data related to this executor to be lost.
*
* Optionally the epoch during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
@@ -2273,7 +2273,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler

case ExecutorLost(execId, reason) =>
val workerLost = reason match {
- case SlaveLost(_, true) => true
+ case ExecutorProcessLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, workerLost)
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
import org.apache.spark.executor.ExecutorExitCode

/**
- * Represents an explanation for an executor or whole slave failing or exiting.
+ * Represents an explanation for an executor or whole process failing or exiting.
*/
private[spark]
class ExecutorLossReason(val message: String) extends Serializable {
@@ -56,7 +56,7 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
* @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
*/
private[spark]
- case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
+ case class ExecutorProcessLost(_message: String = "Worker lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)

Review comment: I think the default message should be "Executor Process Lost". Having it be "Worker lost" signals that the workerLost flag is true, i.e. it would look confusing if the message is "Worker lost" while workerLost = false.
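A simplified sketch (stand-ins, not the private[spark] Spark source) of how callers distinguish a confirmed worker loss from a plain executor-process loss after the rename, mirroring the DAGSchedulerEventProcessLoop match above:

```scala
// Simplified stand-ins for the renamed loss-reason classes.
class ExecutorLossReason(val message: String) extends Serializable
case class ExecutorProcessLost(
    _message: String = "Worker lost",
    workerLost: Boolean = false) extends ExecutorLossReason(_message)

// workerLost = true means the whole worker (including its shuffle service) is
// confirmed gone, not just the executor process.
def wholeWorkerLost(reason: ExecutorLossReason): Boolean = reason match {
  case ExecutorProcessLost(_, true) => true
  case _ => false
}
```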

/**
@@ -45,7 +45,7 @@ private[spark] trait TaskScheduler {

// Invoked after system has successfully initialized (typically in spark context).
// Yarn uses this to bootstrap allocation of resources based on preferred locations,
- // wait for slave registrations, etc.
+ // wait for executor registrations, etc.
def postStartHook(): Unit = { }

// Disconnect from the cluster.
@@ -526,14 +526,14 @@ private[spark] class TaskSchedulerImpl(
}

/**
- * Called by cluster manager to offer resources on slaves. We respond by asking our active task
+ * Called by cluster manager to offer resources on workers. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*/
def resourceOffers(
offers: IndexedSeq[WorkerOffer],
isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
- // Mark each slave as alive and remember its hostname
+ // Mark each worker as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
@@ -765,7 +765,8 @@
})
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
ExecutorProcessLost(
s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
@@ -936,7 +937,7 @@

case None =>
// We may get multiple executorLost() calls with different loss reasons. For example,
- // one may be triggered by a dropped connection from the slave while another may be a
+ // one may be triggered by a dropped connection from the worker while another may be a
// report of executor termination from Mesos. We produce log messages for both so we
// eventually report the termination reason.
logError(s"Lost an executor $executorId (already removed): $reason")
@@ -316,9 +316,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
addressToExecutorId
.get(remoteAddress)
- .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " +
- "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
- "messages.")))
+ .foreach(removeExecutor(_,
+ ExecutorProcessLost("Remote RPC client disassociated. Likely due to " +
+ "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
+ "messages.")))
}

// Make fake resource offers on just one executor
@@ -382,7 +383,7 @@
}
}

- // Remove a disconnected slave from the cluster
+ // Remove a disconnected executor from the cluster
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
@@ -556,7 +557,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Remove all the lingering executors that should be removed but not yet. The reason might be
// because (1) disconnected event is not yet received; (2) executors die silently.
executors.foreach { eid =>
- removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
+ removeExecutor(eid,
+ ExecutorProcessLost("Stale executor after cluster manager re-registered."))
}
}

@@ -168,7 +168,7 @@ private[spark] class StandaloneSchedulerBackend(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
- case None => SlaveLost(message, workerLost = workerLost)
+ case None => ExecutorProcessLost(message, workerLost = workerLost)
}
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason)