From dd4816c50dffb023cd632ae15c53bb80ebc2058f Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 9 Jan 2025 12:59:43 +0800 Subject: [PATCH 1/6] add ut --- .../sql/DDLWithHiveCatalogTestBase.scala | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index b90fe8654925..3599300960bd 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -134,6 +134,69 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { } } + test( + "Paimon partition expire test with hive catalog: expire partition for paimon table sparkCatalogName") { + Seq(paimonHiveCatalogName).foreach { + catalogName => + spark.sql(s"USE $catalogName") + withTempDir { + dBLocation => + withDatabase("paimon_db") { + val comment = "this is a test comment" + spark.sql( + s"CREATE DATABASE paimon_db LOCATION '${dBLocation.getCanonicalPath}' COMMENT '$comment'") + Assertions.assertEquals(getDatabaseLocation("paimon_db"), dBLocation.getCanonicalPath) + Assertions.assertEquals(getDatabaseComment("paimon_db"), comment) + + withTable("paimon_db.paimon_tbl") { + spark.sql(s""" + |CREATE TABLE paimon_db.paimon_tbl (id STRING, name STRING, pt STRING) + |USING PAIMON + |PARTITIONED BY (pt) + |TBLPROPERTIES('metastore.partitioned-table' = 'false') + |""".stripMargin) + Assertions.assertEquals( + getTableLocation("paimon_db.paimon_tbl"), + s"${dBLocation.getCanonicalPath}/paimon_tbl") + spark.sql("insert into paimon_db.paimon_tbl select '1', 'n', '2024-11-01'") + + spark.sql("insert into paimon_db.paimon_tbl select '2', 'n', '9999-11-01'") + + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'paimon_db.paimon_tbl', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + + checkAnswer( + spark.sql("SELECT * FROM paimon_db.paimon_tbl"), + Row("1", "n", "9999-11-01") :: Nil) + } + + withTable("paimon_db.paimon_tbl2") { + spark.sql(s""" + |CREATE TABLE paimon_db.paimon_tbl2 (id STRING, name STRING, pt STRING) + |USING PAIMON + |PARTITIONED BY (pt) + |TBLPROPERTIES('metastore.partitioned-table' = 'true') + |""".stripMargin) + Assertions.assertEquals( + getTableLocation("paimon_db.paimon_tbl2"), + s"${dBLocation.getCanonicalPath}/paimon_tbl2") + spark.sql("insert into paimon_db.paimon_tbl2 select '1', 'n', '2024-11-01'") + + spark.sql("insert into paimon_db.paimon_tbl2 select '2', 'n', '9999-11-01'") + + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'paimon_db.paimon_tbl2', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + + checkAnswer( + spark.sql("SELECT * FROM paimon_db.paimon_tbl2"), + Row("1", "n", "9999-11-01") :: Nil) + } + + } + } + } + } + test("Paimon DDL with hive catalog: create partition for paimon table sparkCatalogName") { Seq(paimonHiveCatalogName).foreach { catalogName => From 8fd77104b8c7f7766860b86ebf1921e1d3818640 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 9 Jan 2025 14:06:47 +0800 Subject: [PATCH 2/6] fix ut --- .../paimon/spark/sql/DDLWithHiveCatalogTestBase.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index 3599300960bd..81de0d6708ce 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -163,11 +163,11 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { spark.sql("insert into paimon_db.paimon_tbl select '2', 'n', '9999-11-01'") spark.sql( - "CALL paimon.sys.expire_partitions(table => 'paimon_db.paimon_tbl', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + "CALL paimon_hive.sys.expire_partitions(table => 'paimon_db.paimon_tbl', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") checkAnswer( spark.sql("SELECT * FROM paimon_db.paimon_tbl"), - Row("1", "n", "9999-11-01") :: Nil) + Row("2", "n", "9999-11-01") :: Nil) } withTable("paimon_db.paimon_tbl2") { @@ -185,11 +185,11 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { spark.sql("insert into paimon_db.paimon_tbl2 select '2', 'n', '9999-11-01'") spark.sql( - "CALL paimon.sys.expire_partitions(table => 'paimon_db.paimon_tbl2', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + "CALL paimon_hive.sys.expire_partitions(table => 'paimon_db.paimon_tbl2', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") checkAnswer( spark.sql("SELECT * FROM paimon_db.paimon_tbl2"), - Row("1", "n", "9999-11-01") :: Nil) + Row("2", "n", "9999-11-01") :: Nil) } } From b0993ccec3a775b1d947a5ecd6c12bf90ed3a9e6 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 9 Jan 2025 15:04:26 +0800 Subject: [PATCH 3/6] fix conflict --- .../paimon/flink/procedure/ExpirePartitionsProcedure.java | 8 +++++++- .../paimon/flink/procedure/ExpirePartitionsProcedure.java | 8 +++++++- .../paimon/spark/procedure/ExpirePartitionsProcedure.java | 8 +++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index 36e2bd1f0964..f4c2ca178e5c 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -78,6 +79,11 @@ public String[] call( map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); + PartitionHandler partitionHandler = + fileStore.options().partitionedTableInMetastore() + ? fileStoreTable.catalogEnvironment().partitionHandler() + : null; + PartitionExpire partitionExpire = new PartitionExpire( TimeUtils.parseDuration(expirationTime), @@ -86,7 +92,7 @@ public String[] call( CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - fileStoreTable.catalogEnvironment().partitionHandler(), + partitionHandler, fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index b9435e12ed44..1003d07cedfb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.annotation.ArgumentHint; @@ -82,6 +83,11 @@ public String identifier() { map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); + PartitionHandler partitionHandler = + fileStore.options().partitionedTableInMetastore() + ? fileStoreTable.catalogEnvironment().partitionHandler() + : null; + PartitionExpire partitionExpire = new PartitionExpire( TimeUtils.parseDuration(expirationTime), @@ -90,7 +96,7 @@ public String identifier() { CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - fileStoreTable.catalogEnvironment().partitionHandler(), + partitionHandler, fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 4b9d50db8d94..4001ac83d62b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -22,6 +22,7 @@ import org.apache.paimon.FileStore; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.utils.TimeUtils; import org.apache.spark.sql.catalyst.InternalRow; @@ -92,6 +93,11 @@ public InternalRow[] call(InternalRow args) { map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); + PartitionHandler partitionHandler = + fileStore.options().partitionedTableInMetastore() + ? fileStoreTable.catalogEnvironment().partitionHandler() + : null; + PartitionExpire partitionExpire = new PartitionExpire( TimeUtils.parseDuration(expirationTime), @@ -100,7 +106,7 @@ public InternalRow[] call(InternalRow args) { CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - fileStoreTable.catalogEnvironment().partitionHandler(), + partitionHandler, fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); From 08966de96c10ee0c892379bf8c94e37f71799310 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 9 Jan 2025 15:45:58 +0800 Subject: [PATCH 4/6] restore ut for hms --- .../paimon/flink/procedure/ExpirePartitionsProcedure.java | 7 +------ .../paimon/flink/procedure/ExpirePartitionsProcedure.java | 8 +------- .../paimon/spark/procedure/ExpirePartitionsProcedure.java | 8 +------- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index f4c2ca178e5c..c1d708189859 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -79,11 +79,6 @@ public String[] call( map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); - PartitionHandler partitionHandler = - fileStore.options().partitionedTableInMetastore() - ? fileStoreTable.catalogEnvironment().partitionHandler() - : null; - PartitionExpire partitionExpire = new PartitionExpire( TimeUtils.parseDuration(expirationTime), @@ -92,7 +87,7 @@ public String[] call( CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - partitionHandler, + fileStoreTable.catalogEnvironment().partitionHandler(), fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index 1003d07cedfb..b9435e12ed44 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -23,7 +23,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.annotation.ArgumentHint; @@ -83,11 +82,6 @@ public String identifier() { map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); - PartitionHandler partitionHandler = - fileStore.options().partitionedTableInMetastore() - ? fileStoreTable.catalogEnvironment().partitionHandler() - : null; - PartitionExpire partitionExpire = new PartitionExpire( TimeUtils.parseDuration(expirationTime), @@ -96,7 +90,7 @@ public String identifier() { CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - partitionHandler, + fileStoreTable.catalogEnvironment().partitionHandler(), fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 4001ac83d62b..4b9d50db8d94 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -22,7 +22,6 @@ import org.apache.paimon.FileStore; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.utils.TimeUtils; import org.apache.spark.sql.catalyst.InternalRow; @@ -93,11 +92,6 @@ public InternalRow[] call(InternalRow args) { map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter); map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern); - PartitionHandler partitionHandler = - fileStore.options().partitionedTableInMetastore() - ? fileStoreTable.catalogEnvironment().partitionHandler() - : null; - PartitionExpire partitionExpire = new PartitionExpire( TimeUtils.parseDuration(expirationTime), @@ -106,7 +100,7 @@ public InternalRow[] call(InternalRow args) { CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - partitionHandler, + fileStoreTable.catalogEnvironment().partitionHandler(), fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); From 4f122a11cd0e4299bd910b9463447d1ded22c43c Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 9 Jan 2025 15:46:41 +0800 Subject: [PATCH 5/6] restore ut for hms --- .../apache/paimon/flink/procedure/ExpirePartitionsProcedure.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index c1d708189859..36e2bd1f0964 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -23,7 +23,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.procedure.ProcedureContext; From 1c91400d8599f341bb6d5d0a131d9b79db8f52bc Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Thu, 9 Jan 2025 16:08:20 +0800 Subject: [PATCH 6/6] opt code --- .../sql/DDLWithHiveCatalogTestBase.scala | 89 ++++++++----------- 1 file changed, 37 insertions(+), 52 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index 81de0d6708ce..0c3db9a20d90 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -136,63 +136,48 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { test( "Paimon partition expire test with hive catalog: expire partition for paimon table sparkCatalogName") { - Seq(paimonHiveCatalogName).foreach { - catalogName => - spark.sql(s"USE $catalogName") - withTempDir { - dBLocation => - withDatabase("paimon_db") { - val comment = "this is a test comment" - spark.sql( - s"CREATE DATABASE paimon_db LOCATION '${dBLocation.getCanonicalPath}' COMMENT '$comment'") - Assertions.assertEquals(getDatabaseLocation("paimon_db"), dBLocation.getCanonicalPath) - Assertions.assertEquals(getDatabaseComment("paimon_db"), comment) - - withTable("paimon_db.paimon_tbl") { - spark.sql(s""" - |CREATE TABLE paimon_db.paimon_tbl (id STRING, name STRING, pt STRING) - |USING PAIMON - |PARTITIONED BY (pt) - |TBLPROPERTIES('metastore.partitioned-table' = 'false') - |""".stripMargin) - Assertions.assertEquals( - getTableLocation("paimon_db.paimon_tbl"), - s"${dBLocation.getCanonicalPath}/paimon_tbl") - spark.sql("insert into paimon_db.paimon_tbl select '1', 'n', '2024-11-01'") - - spark.sql("insert into paimon_db.paimon_tbl select '2', 'n', '9999-11-01'") - - spark.sql( - "CALL paimon_hive.sys.expire_partitions(table => 'paimon_db.paimon_tbl', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") - - checkAnswer( - spark.sql("SELECT * FROM paimon_db.paimon_tbl"), - Row("2", "n", "9999-11-01") :: Nil) - } + spark.sql(s"USE $paimonHiveCatalogName") + withTempDir { + dBLocation => + withDatabase("paimon_db") { + spark.sql(s"CREATE DATABASE paimon_db LOCATION '${dBLocation.getCanonicalPath}'") + withTable("paimon_db.paimon_tbl") { + spark.sql(s""" + |CREATE TABLE paimon_db.paimon_tbl (id STRING, name STRING, pt STRING) + |USING PAIMON + |PARTITIONED BY (pt) + |TBLPROPERTIES('metastore.partitioned-table' = 'false') + |""".stripMargin) + spark.sql("insert into paimon_db.paimon_tbl select '1', 'n', '2024-11-01'") + spark.sql("insert into paimon_db.paimon_tbl select '2', 'n', '9999-11-01'") + + spark.sql( + "CALL paimon_hive.sys.expire_partitions(table => 'paimon_db.paimon_tbl', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + + checkAnswer( + spark.sql("SELECT * FROM paimon_db.paimon_tbl"), + Row("2", "n", "9999-11-01") :: Nil) + } - withTable("paimon_db.paimon_tbl2") { - spark.sql(s""" - |CREATE TABLE paimon_db.paimon_tbl2 (id STRING, name STRING, pt STRING) - |USING PAIMON - |PARTITIONED BY (pt) - |TBLPROPERTIES('metastore.partitioned-table' = 'true') - |""".stripMargin) - Assertions.assertEquals( - getTableLocation("paimon_db.paimon_tbl2"), - s"${dBLocation.getCanonicalPath}/paimon_tbl2") - spark.sql("insert into paimon_db.paimon_tbl2 select '1', 'n', '2024-11-01'") + withTable("paimon_db.paimon_tbl2") { + spark.sql(s""" + |CREATE TABLE paimon_db.paimon_tbl2 (id STRING, name STRING, pt STRING) + |USING PAIMON + |PARTITIONED BY (pt) + |TBLPROPERTIES('metastore.partitioned-table' = 'true') + |""".stripMargin) + spark.sql("insert into paimon_db.paimon_tbl2 select '1', 'n', '2024-11-01'") - spark.sql("insert into paimon_db.paimon_tbl2 select '2', 'n', '9999-11-01'") + spark.sql("insert into paimon_db.paimon_tbl2 select '2', 'n', '9999-11-01'") - spark.sql( - "CALL paimon_hive.sys.expire_partitions(table => 'paimon_db.paimon_tbl2', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") + spark.sql( + "CALL paimon_hive.sys.expire_partitions(table => 'paimon_db.paimon_tbl2', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')") - checkAnswer( - spark.sql("SELECT * FROM paimon_db.paimon_tbl2"), - Row("2", "n", "9999-11-01") :: Nil) - } + checkAnswer( + spark.sql("SELECT * FROM paimon_db.paimon_tbl2"), + Row("2", "n", "9999-11-01") :: Nil) + } - } } } }