From 616b5e3237cb794cc87e144d830607dafedc6506 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Mon, 18 Aug 2025 16:17:41 -0700
Subject: [PATCH 1/5] SPARK-53413: Fix shuffle cleanup for commands

---
 .../spark/sql/execution/QueryExecution.scala  |  2 +-
 .../spark/sql/execution/SQLExecution.scala    | 23 +++++++++++++------
 .../internal/BaseSessionStateBuilder.scala    |  8 +++----
 .../spark/sql/internal/SessionState.scala     |  8 ++++---
 .../sql/execution/QueryExecutionSuite.scala   | 18 ++++++++++++++-
 5 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9f59bded94fe1..b280b7fa86b2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -150,7 +150,7 @@ class QueryExecution(
         // with the rest of processing of the root plan being just outputting command results,
         // for eagerly executed commands we mark this place as beginning of execution.
         tracker.setReadyForExecution()
-        val qe = sparkSession.sessionState.executePlan(p, mode)
+        val qe = sparkSession.sessionState.executePlan(p, mode, shuffleCleanupMode)
         val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
           SQLExecution.withNewExecutionId(qe, Some(name)) {
             qe.executedPlan.executeCollect()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index c5c2f9bb6a6f6..d036283f0a13a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PRE
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
 import org.apache.spark.sql.internal.SQLConf
@@ -68,6 +69,17 @@ object SQLExecution extends Logging {
     }
   }
 
+  private def extractShuffleIds(plan: SparkPlan): Seq[Int] = {
+    plan match {
+      case ae: AdaptiveSparkPlanExec =>
+        ae.context.shuffleIds.asScala.keys.toSeq
+      case nonAdaptivePlan =>
+        nonAdaptivePlan.collect {
+          case exec: ShuffleExchangeLike => exec.shuffleId
+        }
+    }
+  }
+
   /**
    * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
    * we can connect them with an execution.
@@ -177,13 +189,10 @@ object SQLExecution extends Logging {
       if (queryExecution.shuffleCleanupMode != DoNotCleanup
           && isExecutedPlanAvailable) {
         val shuffleIds = queryExecution.executedPlan match {
-          case ae: AdaptiveSparkPlanExec =>
-            ae.context.shuffleIds.asScala.keys
-          case nonAdaptivePlan =>
-            nonAdaptivePlan.collect {
-              case exec: ShuffleExchangeLike =>
-                exec.shuffleId
-            }
+          case command: V2CommandExec =>
+            command.children.flatMap(extractShuffleIds)
+          case plan =>
+            extractShuffleIds(plan)
         }
         shuffleIds.foreach { shuffleId =>
           queryExecution.shuffleCleanupMode match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index ada30cde27cd0..eb74da2d50b75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingQueryManager, UDFRegistration}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, ShuffleCleanupMode, SparkOptimizer, SparkPlanner, SparkSqlParser}
 import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
 import org.apache.spark.sql.execution.aggregate.{ResolveEncodersInScalaAgg, ScalaUDAF}
 import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
@@ -406,9 +406,9 @@ abstract class BaseSessionStateBuilder(
    * Create a query execution object.
    */
   protected def createQueryExecution:
-    (LogicalPlan, CommandExecutionMode.Value) => QueryExecution =
-    (plan, mode) => new QueryExecution(session, plan, mode = mode,
-      shuffleCleanupMode = QueryExecution.determineShuffleCleanupMode(session.sessionState.conf))
+    (LogicalPlan, CommandExecutionMode.Value, ShuffleCleanupMode) => QueryExecution =
+    (plan, mode, cleanupMode) => new QueryExecution(session, plan, mode = mode,
+      shuffleCleanupMode = cleanupMode)
 
   /**
    * Interface to start and stop streaming queries.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 440148989ffb9..e20cf0c95f6ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -86,7 +86,8 @@ private[sql] class SessionState(
     val streamingQueryManagerBuilder: () => StreamingQueryManager,
     val listenerManager: ExecutionListenerManager,
     resourceLoaderBuilder: () => SessionResourceLoader,
-    createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution,
+    createQueryExecution: (LogicalPlan, CommandExecutionMode.Value,
+      ShuffleCleanupMode) => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState,
     val columnarRules: Seq[ColumnarRule],
     val adaptiveRulesHolder: AdaptiveRulesHolder,
@@ -135,8 +136,9 @@ private[sql] class SessionState(
 
   def executePlan(
       plan: LogicalPlan,
-      mode: CommandExecutionMode.Value = CommandExecutionMode.ALL): QueryExecution =
-    createQueryExecution(plan, mode)
+      mode: CommandExecutionMode.Value = CommandExecutionMode.ALL,
+      shuffleCleanUpMode: ShuffleCleanupMode = DoNotCleanup): QueryExecution =
+    createQueryExecution(plan, mode, shuffleCleanUpMode)
 }
 
 private[sql] object SessionState {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index a4e4a407255c0..77819efaa9fe9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -20,7 +20,7 @@ import scala.collection.mutable
 import scala.io.Source
 import scala.util.Try
 
-import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, FastOperator}
+import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, FastOperator, SaveMode}
 import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, UnsafeRow}
@@ -327,6 +327,22 @@ class QueryExecutionSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-53413: Cleanup shuffle dependencies for commands") {
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString),
+        (SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "true")) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
+        df.write.format("noop").mode(SaveMode.Overwrite).save()
+
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
+      }
+    }
+    }
+  }
+
   test("SPARK-47764: Cleanup shuffle dependencies - DoNotCleanup mode") {
     Seq(true, false).foreach { adaptiveEnabled => {
       withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
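
Note on patch 1: the cleanup hook in SQLExecution previously matched only on the
root of the executed plan, so an eagerly executed command — whose root is a
command node wrapping the real query — reported no shuffle IDs, and its shuffle
files were never removed. A minimal sketch of the gap this patch closes, assuming
a local build with this series applied (the config key is the one whose doc is
updated in patch 2 below; it defaults to off outside of testing):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "true")
      .getOrCreate()

    // repartition introduces a shuffle; the noop sink executes the plan as a
    // command, which is the code path this patch teaches the cleanup hook about.
    spark.range(100).repartition(10)
      .write.format("noop").mode("overwrite").save()
    // With the fix, the shuffle files behind the repartition are removed when
    // the execution ends instead of lingering until context shutdown.
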
From 602dfb19e78ca6e12d4c6918e0842381111dbb9c Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Thu, 28 Aug 2025 23:03:46 -0700
Subject: [PATCH 2/5] Add logs, handle DataWritingCommandExec

---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 3 ++-
 .../org/apache/spark/sql/execution/QueryExecution.scala  | 3 ++-
 .../org/apache/spark/sql/execution/SQLExecution.scala    | 7 +++++++
 .../spark/sql/internal/BaseSessionStateBuilder.scala     | 8 ++++----
 .../org/apache/spark/sql/internal/SessionState.scala     | 8 +++-----
 5 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 86807914c2362..557235852fbdb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3586,7 +3586,8 @@ object SQLConf {
   val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
     buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled")
       .doc("When enabled, shuffle files will be cleaned up at the end of classic " +
-        "SQL executions.")
+        "SQL executions. Note that this cleanup may cause stage retries and regenerate " +
+        "shuffle files if the same DataFrame reference is executed again.")
       .version("4.1.0")
       .booleanConf
       .createWithDefault(Utils.isTesting)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b280b7fa86b2a..6d27740bcea90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -150,7 +150,8 @@ class QueryExecution(
         // with the rest of processing of the root plan being just outputting command results,
         // for eagerly executed commands we mark this place as beginning of execution.
         tracker.setReadyForExecution()
-        val qe = sparkSession.sessionState.executePlan(p, mode, shuffleCleanupMode)
+        val qe = new QueryExecution(sparkSession, p, mode = mode,
+          shuffleCleanupMode = shuffleCleanupMode)
         val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
           SQLExecution.withNewExecutionId(qe, Some(name)) {
             qe.executedPlan.executeCollect()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index d036283f0a13a..6f81a8d812343 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PRE
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -191,6 +192,12 @@ object SQLExecution extends Logging {
         val shuffleIds = queryExecution.executedPlan match {
           case command: V2CommandExec =>
             command.children.flatMap(extractShuffleIds)
+          case dataWritingCommand: DataWritingCommandExec =>
+            // TODO: Write tests
+            extractShuffleIds(dataWritingCommand.child)
+          case _: ExecutedCommandExec =>
+            // TODO: Discuss
+            Seq()
           case plan =>
             extractShuffleIds(plan)
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index eb74da2d50b75..ada30cde27cd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingQueryManager, UDFRegistration}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, ShuffleCleanupMode, SparkOptimizer, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
 import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
 import org.apache.spark.sql.execution.aggregate.{ResolveEncodersInScalaAgg, ScalaUDAF}
 import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
@@ -406,9 +406,9 @@ abstract class BaseSessionStateBuilder(
    * Create a query execution object.
    */
   protected def createQueryExecution:
-    (LogicalPlan, CommandExecutionMode.Value, ShuffleCleanupMode) => QueryExecution =
-    (plan, mode, cleanupMode) => new QueryExecution(session, plan, mode = mode,
-      shuffleCleanupMode = cleanupMode)
+    (LogicalPlan, CommandExecutionMode.Value) => QueryExecution =
+    (plan, mode) => new QueryExecution(session, plan, mode = mode,
+      shuffleCleanupMode = QueryExecution.determineShuffleCleanupMode(session.sessionState.conf))
 
   /**
    * Interface to start and stop streaming queries.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index e20cf0c95f6ce..440148989ffb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -86,8 +86,7 @@ private[sql] class SessionState(
     val streamingQueryManagerBuilder: () => StreamingQueryManager,
     val listenerManager: ExecutionListenerManager,
     resourceLoaderBuilder: () => SessionResourceLoader,
-    createQueryExecution: (LogicalPlan, CommandExecutionMode.Value,
-      ShuffleCleanupMode) => QueryExecution,
+    createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState,
     val columnarRules: Seq[ColumnarRule],
     val adaptiveRulesHolder: AdaptiveRulesHolder,
@@ -136,9 +135,8 @@ private[sql] class SessionState(
 
   def executePlan(
       plan: LogicalPlan,
-      mode: CommandExecutionMode.Value = CommandExecutionMode.ALL,
-      shuffleCleanUpMode: ShuffleCleanupMode = DoNotCleanup): QueryExecution =
-    createQueryExecution(plan, mode, shuffleCleanUpMode)
+      mode: CommandExecutionMode.Value = CommandExecutionMode.ALL): QueryExecution =
+    createQueryExecution(plan, mode)
 }
 
 private[sql] object SessionState {
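
Note on patch 2: a DataFrame write can plan to either of two command shapes
depending on which data source API the format implements, which is why the hook
now unwraps both. Roughly — a sketch, not an exhaustive list of exec nodes, and
the output path below is a placeholder:

    // V1 file-source write: plans as DataWritingCommandExec(cmd, child), with
    // the shuffle sitting under `child`.
    spark.range(100).repartition(10)
      .write.format("csv").mode("overwrite").save("/tmp/spark-53413-demo")

    // V2 write: plans as a V2CommandExec subclass whose children hold the
    // query, hence the flatMap over `children` in the hunk above.
    spark.range(100).repartition(10)
      .write.format("noop").mode("overwrite").save()

In both cases the root of the executed plan is the command node rather than the
shuffled query, so extractShuffleIds has to be applied below the root.
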
From 6cef7bd959b12a78a2b48e970c5918e4271dc172 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Fri, 29 Aug 2025 13:41:18 -0700
Subject: [PATCH 3/5] Write test for DataWritingCommandExec

---
 .../spark/sql/execution/SQLExecution.scala    |  1 -
 .../sql/execution/QueryExecutionSuite.scala   | 23 +++++++++++++++++--
 2 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 6f81a8d812343..31cc353389e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -193,7 +193,6 @@ object SQLExecution extends Logging {
           case command: V2CommandExec =>
             command.children.flatMap(extractShuffleIds)
           case dataWritingCommand: DataWritingCommandExec =>
-            // TODO: Write tests
             extractShuffleIds(dataWritingCommand.child)
           case _: ExecutedCommandExec =>
             // TODO: Discuss
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 77819efaa9fe9..26736efdc1933 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -330,9 +330,9 @@ class QueryExecutionSuite extends SharedSparkSession {
   test("SPARK-53413: Cleanup shuffle dependencies for commands") {
     Seq(true, false).foreach { adaptiveEnabled => {
       withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString),
-        (SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "true")) {
+        (SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, true.toString)) {
         val plan = spark.range(100).repartition(10).logicalPlan
-        val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
+        val df = Dataset.ofRows(spark, plan)
         df.write.format("noop").mode(SaveMode.Overwrite).save()
 
         val blockManager = spark.sparkContext.env.blockManager
@@ -343,6 +343,25 @@ class QueryExecutionSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-53413: Cleanup shuffle dependencies for DataWritingCommandExec") {
+    withTempDir { dir =>
+      Seq(true, false).foreach { adaptiveEnabled => {
+        withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString),
+          (SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, true.toString)) {
+          val plan = spark.range(100).repartition(10).logicalPlan
+          val df = Dataset.ofRows(spark, plan)
+          // V1 API write
+          df.write.format("parquet").mode(SaveMode.Overwrite).save(dir.getCanonicalPath)
+
+          val blockManager = spark.sparkContext.env.blockManager
+          assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+          assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
+        }
+      }
+      }
+    }
+  }
+
   test("SPARK-47764: Cleanup shuffle dependencies - DoNotCleanup mode") {
     Seq(true, false).foreach { adaptiveEnabled => {
       withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {

From 6850ebf0a515718570153533840224e19b994a45 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Mon, 1 Sep 2025 09:35:24 -0700
Subject: [PATCH 4/5] Address review comments: Remove unused code, fix test to
 write csv

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala   | 5 +----
 .../org/apache/spark/sql/execution/QueryExecutionSuite.scala  | 2 +-
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 31cc353389e3d..1cab0f8d35af5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PRE
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -194,9 +194,6 @@ object SQLExecution extends Logging {
             command.children.flatMap(extractShuffleIds)
           case dataWritingCommand: DataWritingCommandExec =>
             extractShuffleIds(dataWritingCommand.child)
-          case _: ExecutedCommandExec =>
-            // TODO: Discuss
-            Seq()
           case plan =>
             extractShuffleIds(plan)
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 26736efdc1933..0d0a0f2f31007 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -351,7 +351,7 @@ class QueryExecutionSuite extends SharedSparkSession {
           val plan = spark.range(100).repartition(10).logicalPlan
           val df = Dataset.ofRows(spark, plan)
           // V1 API write
-          df.write.format("parquet").mode(SaveMode.Overwrite).save(dir.getCanonicalPath)
+          df.write.format("csv").mode(SaveMode.Overwrite).save(dir.getCanonicalPath)
 
           val blockManager = spark.sparkContext.env.blockManager
           assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
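
For readability, the net effect of patches 1-4 on the dispatch in SQLExecution
is the following (assembled from the hunks above, not a verbatim copy of the
final file):

    val shuffleIds = queryExecution.executedPlan match {
      case command: V2CommandExec =>
        // V2 commands keep the query plan in their children
        command.children.flatMap(extractShuffleIds)
      case dataWritingCommand: DataWritingCommandExec =>
        // V1 write commands keep the query plan in `child`
        extractShuffleIds(dataWritingCommand.child)
      case plan =>
        extractShuffleIds(plan)
    }

ExecutedCommandExec was given an explicit no-op case in patch 2 and, after
review, falls through to the default case from patch 4 onward.
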
From 4fa440c1444a21ca2c749ba677f8dfd80551bf59 Mon Sep 17 00:00:00 2001
From: Karuppayya
Date: Tue, 2 Sep 2025 21:51:21 -0700
Subject: [PATCH 5/5] Trigger CI