diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8b50abbe4052c..2f7706c859bae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3822,6 +3822,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)
 
+  val THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
+    buildConf("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled")
+      .doc("When enabled, shuffle files will be cleaned up at the end of Thrift server " +
+        "SQL executions.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(Utils.isTesting)
+
   val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
       .internal()
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 8b9b7352fdca2..7a220d516757a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -30,7 +30,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.COMMAND
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
-import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
@@ -67,8 +66,23 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
         new VariableSubstitution().substitute(command)
       }
       sparkSession.sparkContext.setJobDescription(substitutorCommand)
-      val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
-      // The SQL command has been executed above via `executePlan`, therefore we don't need to
+
+      val logicalPlan = sparkSession.sessionState.sqlParser.parsePlan(substitutorCommand)
+      val conf = sparkSession.sessionState.conf
+
+      val shuffleCleanupMode =
+        if (conf.getConf(SQLConf.THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
+          org.apache.spark.sql.execution.RemoveShuffleFiles
+        } else {
+          org.apache.spark.sql.execution.DoNotCleanup
+        }
+
+      val execution = new QueryExecution(
+        sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+        logicalPlan,
+        shuffleCleanupMode = shuffleCleanupMode)
+
+      // the above execution already has an execution ID, therefore we don't need to
       // wrap it again with a new execution ID when getting Hive result.
       execution.logical match {
         case _: CommandResult =>
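
Note (not part of the patch): the new flag is read from the session conf on each run() call, so it can be set at launch with --conf spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled=true or, presumably, per session via SET. Below is a minimal sketch of exercising the changed driver path; it assumes a Spark build containing this patch and must be compiled inside the org.apache.spark.sql.hive.thriftserver package because SparkSQLDriver is private[hive]. The demo object and the query are hypothetical.

package org.apache.spark.sql.hive.thriftserver

import org.apache.spark.sql.SparkSession

object ShuffleCleanupDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session. The flag defaults to Utils.isTesting,
    // so real deployments have to opt in explicitly.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled", "true")
      .getOrCreate()

    // With the flag on, run() parses the command itself and builds its
    // QueryExecution with shuffleCleanupMode = RemoveShuffleFiles, so the
    // shuffle files behind this aggregation become eligible for cleanup
    // once the execution finishes.
    val driver = new SparkSQLDriver(spark)
    driver.run("SELECT id % 10 AS k, count(*) FROM range(1000) GROUP BY k")
    driver.close()
  }
}

Defaulting to Utils.isTesting keeps the behavior change opt-in for users while letting the test suites exercise the cleanup path by default.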