Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private[history] abstract class ApplicationHistoryProvider {
*
* @return List of all know applications.
*/
def getListing(): Iterable[ApplicationHistoryInfo]
def getListing(): Iterator[ApplicationHistoryInfo]

/**
* Returns the Spark UI for a specific application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator

override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = {
applications.get(appId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

val allApps = parent.getApplicationList()
.filter(_.completed != requestedIncomplete)
val allAppsSize = allApps.size

val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
val providerConfig = parent.getProviderConfig()
val content =
<div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ class HistoryServer(
*
* @return List of all known applications.
*/
def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
def getApplicationList(): Iterator[ApplicationHistoryInfo] = {
provider.getListing()
}

def getApplicationInfoList: Iterator[ApplicationInfo] = {
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}

def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.status.api.v1

import java.util.{Arrays, Date, List => JList}
import java.util.{Date, List => JList}
import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType

Expand All @@ -32,33 +32,21 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) {
@DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam,
@QueryParam("limit") limit: Integer)
: Iterator[ApplicationInfo] = {
val allApps = uiRoot.getApplicationInfoList
val adjStatus = {
if (status.isEmpty) {
Arrays.asList(ApplicationStatus.values(): _*)
} else {
status
}
}
val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
val appList = allApps.filter { app =>

val numApps = Option(limit).map(_.toInt).getOrElse(Integer.MAX_VALUE)
val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED)
val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)

uiRoot.getApplicationInfoList.filter { app =>
val anyRunning = app.attempts.exists(!_.completed)
// if any attempt is still running, we consider the app to also still be running
val statusOk = (!anyRunning && includeCompleted) ||
(anyRunning && includeRunning)
// if any attempt is still running, we consider the app to also still be running;
// keep the app if *any* attempts fall in the right time window
val dateOk = app.attempts.exists { attempt =>
attempt.startTime.getTime >= minDate.timestamp &&
attempt.startTime.getTime <= maxDate.timestamp
((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) &&
app.attempts.exists { attempt =>
val start = attempt.startTime.getTime
start >= minDate.timestamp && start <= maxDate.timestamp
}
statusOk && dateOk
}
if (limit != null) {
appList.take(limit)
} else {
appList
}
}.take(numApps)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,15 +447,15 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
}
val jobcount = getNumJobs("/jobs")
assert(!provider.getListing().head.completed)
assert(!provider.getListing().next.completed)

listApplications(false) should contain(appId)

// stop the spark context
resetSparkContext()
// check the app is now found as completed
eventually(stdTimeout, stdInterval) {
assert(provider.getListing().head.completed,
assert(provider.getListing().next.completed,
s"application never completed, server=$server\n")
}

Expand Down
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ object MimaExcludes {
// Exclude rules for 2.1.x
lazy val v21excludes = v20excludes ++ {
Seq(
// [SPARK-17671] Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.deploy.history.HistoryServer.getApplicationList"),
// [SPARK-14743] Improve delegation token handling in secure cluster
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal"),
// [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
Expand Down