From b3fb5ef2e8587f57b71f08711bb91bc52cd9a12f Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Fri, 15 Aug 2025 23:02:13 +0800
Subject: [PATCH 1/5] Invoke cancelJobGroup before start passing the
 ExecutionEnd event to prevent execution of canceled tasks.

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 1cab0f8d35af5..479e6b0077626 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -124,6 +124,8 @@ object SQLExecution extends Logging {
           redactedStr.substring(0, Math.min(truncateLength, redactedStr.length))
         }.getOrElse(callSite.shortForm)

+      sparkSession.sparkContext.setJobGroup(executionId.toString, desc, true)
+
       val globalConfigs = sparkSession.sharedState.conf.getAll.toMap
       val modifiedConfigs = sparkSession.sessionState.conf.getAllConfs
         .filterNot { case (key, value) =>
@@ -211,6 +213,9 @@ object SQLExecution extends Logging {
           }
         }
       }
+
+      sparkSession.sparkContext.cancelJobGroup(executionId.toString)
+
       val event = SparkListenerSQLExecutionEnd(
         executionId,
         System.currentTimeMillis(),

From 3b57ceb942dc5656d386db26017702f81c107735 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Mon, 18 Aug 2025 20:56:38 +0800
Subject: [PATCH 2/5] Resolve comments

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 479e6b0077626..71fb95af44333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -124,8 +124,6 @@ object SQLExecution extends Logging {
           redactedStr.substring(0, Math.min(truncateLength, redactedStr.length))
         }.getOrElse(callSite.shortForm)

-      sparkSession.sparkContext.setJobGroup(executionId.toString, desc, true)
-
       val globalConfigs = sparkSession.sharedState.conf.getAll.toMap
       val modifiedConfigs = sparkSession.sessionState.conf.getAllConfs
         .filterNot { case (key, value) =>
@@ -214,7 +212,8 @@ object SQLExecution extends Logging {
         }
       }

-      sparkSession.sparkContext.cancelJobGroup(executionId.toString)
+      sparkSession.sparkContext.cancelJobsWithTag(
+        executionIdJobTag(sparkSession, executionId))

       val event = SparkListenerSQLExecutionEnd(
         executionId,

From 3b27093fbf437eb012c61bc2b9886b30e7f0aea7 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Tue, 19 Aug 2025 20:49:38 +0800
Subject: [PATCH 3/5] Resolve comments

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 71fb95af44333..e094de0ff47ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -212,8 +212,10 @@ object SQLExecution extends Logging {
         }
       }

-      sparkSession.sparkContext.cancelJobsWithTag(
-        executionIdJobTag(sparkSession, executionId))
+      if (executionId == rootExecutionId) {
+        sparkSession.sparkContext.cancelJobsWithTag(
+          executionIdJobTag(sparkSession, executionId))
+      }

       val event = SparkListenerSQLExecutionEnd(
         executionId,

From 9794463a03d1dbbe3e6a724eee56f412ca149021 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Wed, 20 Aug 2025 17:00:03 +0800
Subject: [PATCH 4/5] Resolve comments

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index e094de0ff47ad..acac346fa7391 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -212,6 +212,11 @@ object SQLExecution extends Logging {
         }
       }

+      // Cancel all spark jobs associated with this executionID, but only if it's the
+      // root execution.
+
+      // TODO: Consider enhancing this logic to cancel jobs earlier when nested
+      // query executions are completed.
       if (executionId == rootExecutionId) {
         sparkSession.sparkContext.cancelJobsWithTag(
           executionIdJobTag(sparkSession, executionId))

From 8c6344fb2c66e968bfbf97fc86874ad782f6ccc7 Mon Sep 17 00:00:00 2001
From: Ke Jia
Date: Wed, 12 Nov 2025 16:37:16 +0800
Subject: [PATCH 5/5] log

---
 .../spark/sql/execution/SQLExecution.scala | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index acac346fa7391..94b18b17cd2f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -100,6 +100,20 @@ object SQLExecution extends Logging {
     if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
       sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
       sc.addJobTag(executionIdJobTag(sparkSession, executionId))
+      logWarning("EXECUTION_ROOT_ID_KEY is null and addJobTag :" +
+        " " + executionIdJobTag(sparkSession, executionId) + " " + " " +
+        "the executionId is " + executionId + "" +
+        "and the rootExecutionId is " +
+        "" + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) +
+        " and the thread is " + Thread.currentThread().getId + "" +
+        " local property hash is " + sc.localProperties.hashCode())
+    } else {
+      logWarning("EXECUTION_ROOT_ID_KEY is not null and " +
+        "the executionId is " + executionId + "" +
+        "and the rootExecutionId is " +
+        "" + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) + " " +
+        " and the thread is " + Thread.currentThread().getId + "" +
+        " local property hash is " + sc.localProperties.hashCode())
     }
     val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
     executionIdToQueryExecution.put(executionId, queryExecution)
@@ -249,6 +263,17 @@ object SQLExecution extends Logging {
       if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
         sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
         sc.removeJobTag(executionIdJobTag(sparkSession, executionId))
+        logWarning("if branch removing the jobTag " +
+          "" + executionIdJobTag(sparkSession, executionId) + " thread is " +
+          "" + Thread.currentThread().getId + " executionId is " + executionId +
+          " rootExecutionId is " + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) + "" +
+          " local property hash is " + sc.localProperties.hashCode())
+      } else {
+        logWarning("not removing the jobTag " +
+          "" + executionIdJobTag(sparkSession, executionId) + " thread is " +
+          "" + Thread.currentThread().getId + " executionId is " + executionId +
+          " rootExecutionId is " + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) + "" +
+          " local property hash is " + sc.localProperties.hashCode())
       }
       sc.setLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL, originalInterruptOnCancel)
     }
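
Taken together, the series moves from the job-group approach of the first revision to job tags: the root SQL execution tags every job launched under it, and when the root execution ends, anything still running with that tag is cancelled. What follows is a minimal, self-contained sketch of that tag-based cancellation pattern using the public SparkContext.addJobTag / cancelJobsWithTag / removeJobTag API; the tag value, app name, and the whole driver program are illustrative assumptions, not the executionIdJobTag helper or the SQLExecution code path touched by the patches.

// Minimal sketch (not part of the patch): cancelling Spark jobs by tag, the same
// mechanism the final revisions rely on. Tag value, app name and the driver
// program below are illustrative assumptions.
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession

object JobTagCancellationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-tag-cancellation-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical tag; the patch derives the real one from executionIdJobTag(...).
    val tag = "execution-root-id-42"

    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

    val longRunning = Future {
      sc.addJobTag(tag) // jobs submitted from this thread now carry the tag
      try {
        // A deliberately slow job standing in for the query body.
        spark.range(0, 10000).rdd.map { i => Thread.sleep(10); i }.count()
      } finally {
        sc.removeJobTag(tag) // mirrors the cleanup in the finally block of the patch
      }
    }

    Thread.sleep(2000) // let some tasks start
    sc.cancelJobsWithTag(tag) // what the patch does once the root execution ends

    // The tagged job is cancelled instead of running to completion, so the
    // future completes with a cancellation failure.
    println(Await.ready(longRunning, 1.minute).value)

    spark.stop()
  }
}

Compared with the setJobGroup / cancelJobGroup pair from the first revision, tags are additive: a thread can carry several tags at once, so tagging the root execution does not clobber any job group or description the caller may already have set.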