Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
import org.apache.spark.ui.{UIUtils, WebUIPage}

private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to define a constant for "spark.mesos.dispatcher.historyServer.url" (as is done in the SQL and YARN modules)? See https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I wasn't aware of that class. We should definitely use it, but I'd like to make that change consistently. I made a JIRA for the migration: https://issues.apache.org/jira/browse/SPARK-16881


def render(request: HttpServletRequest): Seq[Node] = {
val state = parent.scheduler.getSchedulerState()
val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
val driverHeaders = queuedHeaders ++

val driverHeader = Seq("Driver ID")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is a header a Seq?!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I can concatenate the sequences to produce a list of headers

val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")

val queuedHeaders = driverHeader ++ submissionHeader
val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
Seq("Start Date", "Mesos Slave ID", "State")
val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
Expand Down Expand Up @@ -68,8 +75,18 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(

private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
val id = state.driverDescription.submissionId

val historyCol = if (historyServerURL.isDefined) {
<td>
<a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
{state.frameworkId}
</a>
</td>
} else Nil

<tr>
<td><a href={s"driver?id=$id"}>{id}</a></td>
{historyCol}
<td>{state.driverDescription.submissionDate}</td>
<td>{state.driverDescription.command.mainClass}</td>
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.ui.JettyUtils._
private[spark] class MesosClusterUI(
securityManager: SecurityManager,
port: Int,
conf: SparkConf,
val conf: SparkConf,
dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler)
extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,30 @@ import org.apache.spark.util.Utils
* @param slaveId Slave ID that the task is assigned to
* @param mesosTaskStatus The last known task status update.
* @param startDate The date the task was launched
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FinishDate is missing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* @param finishDate The date the task finished
* @param frameworkId Mesos framework ID the task registers with
*/
private[spark] class MesosClusterSubmissionState(
    val driverDescription: MesosDriverDescription,
    val taskId: TaskID,
    val slaveId: SlaveID,
    var mesosTaskStatus: Option[TaskStatus],
    var startDate: Date,
    var finishDate: Option[Date],
    val frameworkId: String)
  extends Serializable {

  /**
   * Creates a new state object carrying the current value of every constructor field,
   * including `frameworkId`. Field values are passed through as-is, not deep-copied.
   *
   * @return a fresh `MesosClusterSubmissionState` with the same field values as this one
   */
  def copy(): MesosClusterSubmissionState = {
    new MesosClusterSubmissionState(
      driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
  }
}

/**
* Tracks the retry state of a driver, which includes the next time it should be scheduled
* and necessary information to do exponential backoff.
* This class is not thread-safe, and we expect the caller to handle synchronizing state.
*
* @param lastFailureStatus Last Task status when it failed.
* @param retries Number of times it has been retried.
* @param nextRetry Time at which it should be retried next
Expand All @@ -80,6 +84,7 @@ private[spark] class MesosClusterRetryState(
/**
* The full state of the cluster scheduler, currently being used for displaying
* information on the UI.
*
* @param frameworkId Mesos Framework id for the cluster scheduler.
* @param masterUrl The Mesos master url
* @param queuedDrivers All drivers queued to be launched
Expand Down Expand Up @@ -355,7 +360,15 @@ private[spark] class MesosClusterScheduler(

/**
 * Looks up the executor URI for the driver described by `desc`.
 *
 * The `spark.executor.uri` entry of the driver's conf takes precedence; when it is absent,
 * the `SPARK_EXECUTOR_URI` entry of the driver command's environment is used instead.
 *
 * @param desc description of the submitted driver
 * @return the first value found, or `None` when neither source defines one
 */
private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
  // A single fallback is enough: chaining the same environment lookup a second time
  // could never yield a different result.
  desc.conf.getOption("spark.executor.uri")
    .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
}

/**
 * Produces a map like `m` in which the entry for `k` has been transformed by `f`.
 *
 * When `m` has no entry for `k`, `f` is applied to `default` and the result is stored
 * under `k`. All other entries are carried over unchanged.
 *
 * @param m map to derive the result from
 * @param k key whose value is transformed
 * @param default value fed to `f` when `k` is absent from `m`
 * @param f transformation applied to the current (or default) value
 */
private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
  val current = m.getOrElse(k, default)
  m.updated(k, f(current))
}

/**
 * Derives the framework ID for the driver described by `desc`: this scheduler's
 * `frameworkId`, a dash, and the driver's submission ID.
 *
 * @param desc description of the submitted driver
 * @return the combined `<frameworkId>-<submissionId>` string
 */
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
  val submissionId = desc.submissionId
  s"$frameworkId-$submissionId"
}

private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
Expand All @@ -364,7 +377,11 @@ private[spark] class MesosClusterScheduler(
val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")

driverEnv ++ executorEnv ++ desc.command.environment
var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
)

driverEnv ++ executorEnv ++ commandEnv
}

val envBuilder = Environment.newBuilder()
Expand Down Expand Up @@ -552,7 +569,7 @@ private[spark] class MesosClusterScheduler(
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None)
None, new Date(), None, getDriverFrameworkID(submission))
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.sparkUser,
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind introducing a constant for spark.mesos.driver.webui.url?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but I also want to make this change consistently, so let's make this part of https://issues.apache.org/jira/browse/SPARK-16881

None,
None,
sc.conf.getOption("spark.mesos.driver.frameworkId")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind introducing a constant for spark.mesos.driver.frameworkId?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

)

unsetFrameworkID(sc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the driver creates two Spark contexts, shouldn't it use two different Spark confs with that property set up differently?
If no id is specified what will happen? I guess the dispatcher will not know anything. What if you could use a unique id as a frameworkId plus an increment, so the scheduler can detect all frameworks, even the ones coming from multiple contexts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no id is specified what will happen?

Mesos generates a unique framework ID when none is specified. This PR only sets the framework ID on the first spark context created by the driver.

What if you could use a unique id as a frameworkId plus an increment, so the scheduler can detect all frameworks, even the ones coming from multiple contexts?

We can't guarantee no collisions with that approach. It would also be difficult for the dispatcher to determine how many frameworks have been started.

startScheduler(driver)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.sparkUser,
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
Option.empty,
Option.empty,
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

unsetFrameworkID(sc)
startScheduler(driver)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
}

/**
 * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
 * submissions with frameworkIDs. However, this causes issues when a driver process launches
 * more than one framework (more than one SparkContext), because they all try to register with
 * the same frameworkID. To enforce that only the first driver registers with the configured
 * framework ID, the driver calls this method after the first registration.
 *
 * The setting is removed from both the given context's conf and the JVM system properties.
 *
 * @param sc context whose conf should no longer carry the dispatcher-assigned framework ID
 */
def unsetFrameworkID(sc: SparkContext): Unit = {
  sc.conf.remove("spark.mesos.driver.frameworkId")
  // NOTE(review): presumably cleared from the system properties as well so that a conf
  // created later in this JVM does not pick the value up again - confirm.
  System.clearProperty("spark.mesos.driver.frameworkId")
}
}
10 changes: 10 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,16 @@ See the [configuration page](configuration.html) for information on Spark config
If unset it will point to Spark's internal web UI.
</td>
</tr>
<tr>
<td><code>spark.mesos.dispatcher.historyServer.url</code></td>
<td><code>(none)</code></td>
<td>
Set the URL of the <a href="http://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact">history
server</a>. The dispatcher will then link each driver to its entry
in the history server.
</td>
</tr>

</table>

# Troubleshooting and Debugging
Expand Down