From c62c4edce1de27ba4c3c35b5429f4c69532f65d7 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 11 Jul 2025 09:48:12 -0700 Subject: [PATCH] Revert "[SPARK-52576][SDP] Drop/recreate on full refresh and MV update" This reverts commit 8b437570fd1e93ae3f3fad09482f62727cca77db. --- .../spark/sql/pipelines/graph/DatasetManager.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index d8bc3913e5ad3..0d393e307aaa7 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -179,13 +179,12 @@ object DatasetManager extends Logging { } // Wipe the data if we need to - val dropTable = (isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined - if (dropTable) { - catalog.dropTable(identifier) + if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) { + context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}") } // Alter the table if we need to - if (existingTableOpt.isDefined && !dropTable) { + if (existingTableOpt.isDefined) { val existingSchema = v2ColumnsToStructType(existingTableOpt.get.columns()) val targetSchema = if (table.isStreamingTable && !isFullRefresh) { @@ -200,7 +199,7 @@ object DatasetManager extends Logging { } // Create the table if we need to - if (dropTable || existingTableOpt.isEmpty) { + if (existingTableOpt.isEmpty) { catalog.createTable( identifier, new TableInfo.Builder()