--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3822,6 +3822,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)
 
+  val THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
+    buildConf("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled")
+      .doc("When enabled, shuffle files will be cleaned up at the end of Thrift server " +
+        "SQL executions.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(Utils.isTesting)
+
   val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
       .internal()
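
Note: a minimal usage sketch of the new flag, assuming a standard Spark 4.x session; the builder settings below are illustrative and not part of this diff.

    import org.apache.spark.sql.SparkSession

    // Illustrative local session; master and app name are placeholders.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("thriftserver-cleanup-demo")
      .config("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled", "true")
      .getOrCreate()

    // Reads back "true" here; when unset, the default is Utils.isTesting per the conf above.
    println(spark.conf.get("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled"))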
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -30,7 +30,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.COMMAND
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
-import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
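
Note: the removed ClassicConversions._ import previously provided an implicit widening to the classic session type; the rewrite below replaces it with one explicit cast. A minimal sketch of that style, assuming org.apache.spark.sql.classic.SparkSession is the concrete 4.x session class (the helper name toClassic is invented for illustration):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.classic.{SparkSession => ClassicSparkSession}

    // Hypothetical helper: one explicit downcast at the call site that needs
    // classic-only APIs, instead of an implicit conversion imported file-wide.
    def toClassic(session: SparkSession): ClassicSparkSession =
      session.asInstanceOf[ClassicSparkSession]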
@@ -67,8 +66,23 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
         new VariableSubstitution().substitute(command)
       }
       sparkSession.sparkContext.setJobDescription(substitutorCommand)
-      val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
-      // The SQL command has been executed above via `executePlan`, therefore we don't need to
+
+      val logicalPlan = sparkSession.sessionState.sqlParser.parsePlan(substitutorCommand)
+      val conf = sparkSession.sessionState.conf
+
+      val shuffleCleanupMode =
+        if (conf.getConf(SQLConf.THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
+          org.apache.spark.sql.execution.RemoveShuffleFiles
+        } else {
+          org.apache.spark.sql.execution.DoNotCleanup
+        }
+
+      val execution = new QueryExecution(
+        sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+        logicalPlan,
+        shuffleCleanupMode = shuffleCleanupMode)
+
+      // The above execution already has an execution ID, therefore we don't need to
       // wrap it again with a new execution ID when getting Hive result.
       execution.logical match {
         case _: CommandResult =>
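
Note: the if/else above maps the boolean conf onto a shuffle cleanup mode. A condensed, hypothetical helper showing the same selection in isolation (cleanupModeFor is invented; it assumes RemoveShuffleFiles, DoNotCleanup, and the ShuffleCleanupMode trait in org.apache.spark.sql.execution are accessible from the caller's package):

    import org.apache.spark.sql.execution.{DoNotCleanup, RemoveShuffleFiles, ShuffleCleanupMode}
    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical helper: pick the cleanup mode from the new Thrift server flag.
    def cleanupModeFor(conf: SQLConf): ShuffleCleanupMode =
      if (conf.getConf(SQLConf.THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
        RemoveShuffleFiles // delete shuffle files once the SQL execution finishes
      } else {
        DoNotCleanup // leave shuffle files to the normal cleanup lifecycle
      }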