diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
index 585c8cc2ae6d4..4b3fbacc47f9c 100644
--- a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
+++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
@@ -137,6 +137,53 @@ object ListenerEventsTestHelper {
     SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates)
   }
 
+  case class JobInfo(
+      stageIds: Seq[Int],
+      stageToTaskIds: Map[Int, Seq[Long]],
+      stageToRddIds: Map[Int, Seq[Int]])
+
+  def pushJobEventsWithoutJobEnd(
+      listener: SparkListener,
+      jobId: Int,
+      jobProps: Properties,
+      execIds: Array[String],
+      time: Long): JobInfo = {
+    // Start a job with 1 stage / 4 tasks
+    val rddsForStage = createRdds(2)
+    val stage = createStage(rddsForStage, Nil)
+
+    listener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage), jobProps))
+
+    // Submit stage
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, jobProps))
+
+    // Start tasks from stage
+    val s1Tasks = createTasks(4, execIds, time)
+    s1Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
+        stage.attemptNumber(), task))
+    }
+
+    // Succeed all tasks in stage.
+    val s1Metrics = TaskMetrics.empty
+    s1Metrics.setExecutorCpuTime(2L)
+    s1Metrics.setExecutorRunTime(4L)
+
+    s1Tasks.foreach { task =>
+      task.markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber,
+        "taskType", Success, task, new ExecutorMetrics, s1Metrics))
+    }
+
+    // End stage.
+    stage.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage))
+
+    JobInfo(Seq(stage.stageId), Map(stage.stageId -> s1Tasks.map(_.taskId)),
+      Map(stage.stageId -> rddsForStage.map(_.id)))
+  }
+
   private def nextTaskId(): Long = {
     taskIdTracker += 1
     taskIdTracker
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder b/sql/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder
new file mode 100644
index 0000000000000..5025616b752d1
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.history.SQLEventFilterBuilder
\ No newline at end of file
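The one-line registration above makes SQLEventFilterBuilder discoverable under the org.apache.spark.deploy.history.EventFilterBuilder service type via Java's ServiceLoader, so the history-side code can find it without a compile-time reference to sql/core. A minimal sketch of how such a registration is typically consumed; the loading and replay site below is an assumption for illustration and not part of this patch:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    import org.apache.spark.deploy.history.EventFilterBuilder

    // Every registered builder is a SparkListener: replay the original event log
    // through it first, then ask it for the filter it has accumulated.
    val builders = ServiceLoader.load(classOf[EventFilterBuilder],
      Thread.currentThread().getContextClassLoader).asScala.toSeq
    // ... post each replayed SparkListenerEvent to every builder here ...
    val filters = builders.map(_.createFilter())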
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala
new file mode 100644
index 0000000000000..fbd729b9d7c37
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.history
+
+import scala.collection.mutable
+
+import org.apache.spark.deploy.history.{EventFilter, EventFilterBuilder, JobEventFilter}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.ui._
+import org.apache.spark.sql.streaming.StreamingQueryListener
+
+/**
+ * This class tracks live SQL executions and passes the list to [[SQLLiveEntitiesEventFilter]],
+ * so that the filter can accept events for live SQL executions as well as for the relevant
+ * jobs (plus stages/tasks/RDDs).
+ *
+ * Note that this class only tracks jobs which are relevant to SQL executions, since a job
+ * cannot be classified as finished or live without relating it to a SQL execution.
+ */
+private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
+  private val _liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
+  private val _jobToStages = new mutable.HashMap[Int, Set[Int]]
+  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
+  private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]]
+  private val stages = new mutable.HashSet[Int]
+
+  def liveSQLExecutions: Set[Long] = _liveExecutionToJobs.keySet.toSet
+  def liveJobs: Set[Int] = _liveExecutionToJobs.values.flatten.toSet
+  def liveStages: Set[Int] = _stageToRDDs.keySet.toSet
+  def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet
+  def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet
+
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionIdString == null) {
+      // This is not a job created by SQL
+      return
+    }
+
+    val executionId = executionIdString.toLong
+    val jobId = jobStart.jobId
+
+    val jobsForExecution = _liveExecutionToJobs.getOrElseUpdate(executionId,
+      mutable.HashSet[Int]())
+    jobsForExecution += jobId
+
+    _jobToStages += jobStart.jobId -> jobStart.stageIds.toSet
+    stages ++= jobStart.stageIds
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+    val stageId = stageSubmitted.stageInfo.stageId
+    if (stages.contains(stageId)) {
+      _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
+      _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
+    }
+  }
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+    _stageToTasks.get(taskStart.stageId).foreach { tasks =>
+      tasks += taskStart.taskInfo.taskId
+    }
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+    case _ => // Ignore
+  }
+
+  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+    _liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
+  }
+
+  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    _liveExecutionToJobs.remove(event.executionId).foreach { jobs =>
+      val stagesToDrop = _jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
+      _jobToStages --= jobs
+      stages --= stagesToDrop
+      _stageToTasks --= stagesToDrop
+      _stageToRDDs --= stagesToDrop
+    }
+  }
+
+  override def createFilter(): EventFilter = {
+    new SQLLiveEntitiesEventFilter(liveSQLExecutions, liveJobs, liveStages, liveTasks, liveRDDs)
+  }
+}
+
+/**
+ * This class accepts events which are related to the live SQL executions, based on the given
+ * information.
+ *
+ * Note that for job-related events, acceptFn leaves the event unmatched ("don't mind") instead
+ * of returning false, because it cannot determine whether the job belongs to a finished SQL
+ * execution or is simply unrelated to any SQL execution. In that case, it gives up the
+ * decision and lets other filters decide.
+ */
+private[spark] class SQLLiveEntitiesEventFilter(
+    liveSQLExecutions: Set[Long],
+    _liveJobs: Set[Int],
+    _liveStages: Set[Int],
+    _liveTasks: Set[Long],
+    _liveRDDs: Set[Int])
+  extends JobEventFilter(None, _liveJobs, _liveStages, _liveTasks, _liveRDDs) with Logging {
+
+  logDebug(s"live SQL executions : $liveSQLExecutions")
+
+  private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = {
+    case e: SparkListenerSQLExecutionStart =>
+      liveSQLExecutions.contains(e.executionId)
+    case e: SparkListenerSQLAdaptiveExecutionUpdate =>
+      liveSQLExecutions.contains(e.executionId)
+    case e: SparkListenerSQLExecutionEnd =>
+      liveSQLExecutions.contains(e.executionId)
+    case e: SparkListenerDriverAccumUpdates =>
+      liveSQLExecutions.contains(e.executionId)
+
+    case e if acceptFnForJobEvents.lift(e).contains(true) =>
+      // NOTE: if acceptFnForJobEvents(e) returns false, we should leave the event unmatched,
+      // because we don't know whether the job belongs to a SQL execution which is finished,
+      // or the job is simply not related to any SQL execution.
+      true
+
+    // these events are for finished batches so safer to ignore
+    case _: StreamingQueryListener.QueryProgressEvent => false
+  }
+
+  override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = _acceptFn
+}
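Since acceptFn() is a PartialFunction, a caller sees three outcomes per event: Some(true) (keep), Some(false) (reject), and None (no opinion, which is what this filter deliberately returns for job-level events it cannot classify). A sketch of one way several filters could be combined under these semantics; the combination policy shown is an assumption for illustration, not necessarily what the compactor actually does:

    import org.apache.spark.deploy.history.EventFilter
    import org.apache.spark.scheduler.SparkListenerEvent

    // Keep an event unless at least one filter rejects it and no filter accepts it.
    def shouldKeep(filters: Seq[EventFilter], event: SparkListenerEvent): Boolean = {
      val decisions = filters.flatMap(_.acceptFn().lift(event))
      decisions.contains(true) || !decisions.contains(false)
    }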
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
new file mode 100644
index 0000000000000..5f3d750e8f271
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.history
+
+import java.util.Properties
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.status.ListenerEventsTestHelper
+
+class SQLEventFilterBuilderSuite extends SparkFunSuite {
+  import ListenerEventsTestHelper._
+
+  override protected def beforeEach(): Unit = {
+    ListenerEventsTestHelper.reset()
+  }
+
+  test("track live SQL executions") {
+    var time = 0L
+
+    val listener = new SQLEventFilterBuilder
+
+    listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion"))
+
+    // Start the application.
+    time += 1
+    listener.onApplicationStart(SparkListenerApplicationStart(
+      "name",
+      Some("id"),
+      time,
+      "user",
+      Some("attempt"),
+      None))
+
+    // Start a couple of executors.
+    time += 1
+    val execIds = Array("1", "2")
+    execIds.foreach { id =>
+      listener.onExecutorAdded(createExecutorAddedEvent(id, time))
+    }
+
+    // Start SQL Execution
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
+      new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time))
+
+    time += 1
+
+    // job 1, 2: coupled with SQL execution 1, finished
+    val jobProp = createJobProps()
+    val jobPropWithSqlExecution = new Properties(jobProp)
+    jobPropWithSqlExecution.setProperty(SQLExecution.EXECUTION_ID_KEY, "1")
+    val jobInfoForJob1 = pushJobEventsWithoutJobEnd(listener, 1, jobPropWithSqlExecution,
+      execIds, time)
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    val jobInfoForJob2 = pushJobEventsWithoutJobEnd(listener, 2, jobPropWithSqlExecution,
+      execIds, time)
+    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+
+    // job 3: not coupled with SQL execution 1, finished
+    pushJobEventsWithoutJobEnd(listener, 3, jobProp, execIds, time)
+    listener.onJobEnd(SparkListenerJobEnd(3, time, JobSucceeded))
+
+    // job 4: not coupled with SQL execution 1, not finished
+    pushJobEventsWithoutJobEnd(listener, 4, jobProp, execIds, time)
+
+    assert(listener.liveSQLExecutions === Set(1))
+
+    // only jobs related to SQL executions are tracked
+    assert(listener.liveJobs === Set(1, 2))
+    assert(listener.liveStages ===
+      (jobInfoForJob1.stageIds ++ jobInfoForJob2.stageIds).toSet)
+    assert(listener.liveTasks ===
+      (jobInfoForJob1.stageToTaskIds.values.flatten ++
+        jobInfoForJob2.stageToTaskIds.values.flatten).toSet)
+    assert(listener.liveRDDs ===
+      (jobInfoForJob1.stageToRddIds.values.flatten ++
+        jobInfoForJob2.stageToRddIds.values.flatten).toSet)
+
+    // End SQL execution
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(1, 0))
+
+    assert(listener.liveSQLExecutions.isEmpty)
+    assert(listener.liveJobs.isEmpty)
+    assert(listener.liveStages.isEmpty)
+    assert(listener.liveTasks.isEmpty)
+    assert(listener.liveRDDs.isEmpty)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
new file mode 100644
index 0000000000000..46fdaba413c6e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.history
+
+import org.apache.spark.{SparkFunSuite, Success, TaskState}
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.ui.{SparkListenerDriverAccumUpdates, SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.status.ListenerEventsTestHelper.{createRddsWithId, createStage, createTasks}
+
+class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
+  test("filter in events for jobs related to live SQL execution") {
+    // assume finished job 1 with stage 1, tasks (1, 2), rdds (1, 2) and finished sql execution
+    // id 1, and live job 2 with stages (2, 3), tasks (3, 4, 5, 6), rdds (3, 4, 5, 6), where
+    // job 2 belongs to the live sql execution id 2
+
+    val liveSQLExecutions = Set(2L)
+    val liveJobs = Set(2)
+    val liveStages = Set(2, 3)
+    val liveTasks = Set(3L, 4L, 5L, 6L)
+    val liveRDDs = Set(3, 4, 5, 6)
+    val liveExecutors: Set[String] = Set("1", "2")
+
+    val filter = new SQLLiveEntitiesEventFilter(liveSQLExecutions, liveJobs, liveStages, liveTasks,
+      liveRDDs)
+    val acceptFn = filter.acceptFn().lift
+
+    // Verifying with finished SQL execution 1
+    assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1",
+      "plan", null, 0)))
+    assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
+    assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
+    assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty)))
+
+    // Verifying with finished job 1
+    val rddsForStage1 = createRddsWithId(1 to 2)
+    val stage1 = createStage(1, rddsForStage1, Nil)
+    val tasksForStage1 = createTasks(Seq(1L, 2L), liveExecutors.toArray, 0)
+    tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) }
+
+    val jobStartEventForJob1 = SparkListenerJobStart(1, 0, Seq(stage1))
+    val jobEndEventForJob1 = SparkListenerJobEnd(1, 0, JobSucceeded)
+    val stageSubmittedEventsForJob1 = SparkListenerStageSubmitted(stage1)
+    val stageCompletedEventsForJob1 = SparkListenerStageCompleted(stage1)
+    val unpersistRDDEventsForJob1 = (1 to 2).map(SparkListenerUnpersistRDD)
+
+    // job events for finished job should be considered as "don't know"
+    assert(None === acceptFn(jobStartEventForJob1))
+    assert(None === acceptFn(jobEndEventForJob1))
+
+    // stage events for finished job should be considered as "don't know"
+    assert(None === acceptFn(stageSubmittedEventsForJob1))
+    assert(None === acceptFn(stageCompletedEventsForJob1))
+    unpersistRDDEventsForJob1.foreach { event =>
+      assert(None === acceptFn(event))
+    }
+
+    val taskSpeculativeTaskSubmittedEvent = SparkListenerSpeculativeTaskSubmitted(stage1.stageId,
+      stageAttemptId = 1)
+    assert(None === acceptFn(taskSpeculativeTaskSubmittedEvent))
+
+    // task events for finished job should be considered as "don't know"
+    tasksForStage1.foreach { task =>
+      val taskStartEvent = SparkListenerTaskStart(stage1.stageId, 0, task)
+      assert(None === acceptFn(taskStartEvent))
+
+      val taskGettingResultEvent = SparkListenerTaskGettingResult(task)
+      assert(None === acceptFn(taskGettingResultEvent))
+
+      val taskEndEvent = SparkListenerTaskEnd(stage1.stageId, 0, "taskType",
+        Success, task, new ExecutorMetrics, null)
+      assert(None === acceptFn(taskEndEvent))
+    }
+
+    // Verifying with live SQL execution 2
+    assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2",
+      "plan", null, 0)))
+    assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
+    assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
+    assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty)))
+
+    // Verifying with live job 2
+    val rddsForStage2 = createRddsWithId(3 to 4)
+    val stage2 = createStage(2, rddsForStage2, Nil)
+    val tasksForStage2 = createTasks(Seq(3L, 4L), liveExecutors.toArray, 0)
+    tasksForStage2.foreach { task => task.markFinished(TaskState.FINISHED, 5) }
+
+    val jobStartEventForJob2 = SparkListenerJobStart(2, 0, Seq(stage2))
+    val stageSubmittedEventsForJob2 = SparkListenerStageSubmitted(stage2)
+    val stageCompletedEventsForJob2 = SparkListenerStageCompleted(stage2)
+    val unpersistRDDEventsForJob2 = rddsForStage2.map { rdd => SparkListenerUnpersistRDD(rdd.id) }
+
+    // job events for live job should be accepted
+    assert(Some(true) === acceptFn(jobStartEventForJob2))
+
+    // stage events for live job should be accepted
+    assert(Some(true) === acceptFn(stageSubmittedEventsForJob2))
+    assert(Some(true) === acceptFn(stageCompletedEventsForJob2))
+    unpersistRDDEventsForJob2.foreach { event =>
+      assert(Some(true) === acceptFn(event))
+    }
+
+    val taskSpeculativeTaskSubmittedEvent2 = SparkListenerSpeculativeTaskSubmitted(stage2.stageId,
+      stageAttemptId = 1)
+    assert(Some(true) === acceptFn(taskSpeculativeTaskSubmittedEvent2))
+
+    // task events for live job should be accepted
+    tasksForStage2.foreach { task =>
+      val taskStartEvent = SparkListenerTaskStart(stage2.stageId, 0, task)
+      assert(Some(true) === acceptFn(taskStartEvent))
+
+      val taskGettingResultEvent = SparkListenerTaskGettingResult(task)
+      assert(Some(true) === acceptFn(taskGettingResultEvent))
+
+      val taskEndEvent = SparkListenerTaskEnd(stage2.stageId, 0, "taskType",
+        Success, task, new ExecutorMetrics, null)
+      assert(Some(true) === acceptFn(taskEndEvent))
+    }
+  }
+}
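For reference, the round trip exercised by the two suites can be condensed into a short driver-style sketch using only the classes touched by this patch; the values are illustrative, not taken from the tests above:

    import java.util.Properties

    import org.apache.spark.scheduler.SparkListenerJobStart
    import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
    import org.apache.spark.sql.execution.history.SQLEventFilterBuilder
    import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

    val planInfo = new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty)
    val sqlJobProps = new Properties()
    sqlJobProps.setProperty(SQLExecution.EXECUTION_ID_KEY, "1")

    val builder = new SQLEventFilterBuilder
    builder.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc", "details", "plan", planInfo, 0L))
    builder.onJobStart(SparkListenerJobStart(1, 0L, Seq.empty, sqlJobProps))

    // While execution 1 is live, the derived filter keeps its events.
    assert(builder.createFilter().acceptFn().lift(SparkListenerSQLExecutionEnd(1, 0L)) == Some(true))

    // Once the execution ends, a freshly built filter rejects them.
    builder.onOtherEvent(SparkListenerSQLExecutionEnd(1, 0L))
    assert(builder.createFilter().acceptFn().lift(SparkListenerSQLExecutionEnd(1, 0L)) == Some(false))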