From 251dbfab93757a4ea80374711cec227b250342f7 Mon Sep 17 00:00:00 2001 From: userzhy <48518279+userzhy@users.noreply.github.com> Date: Fri, 26 Dec 2025 14:48:26 +0000 Subject: [PATCH] [spark] Support compact_database procedure (#6328) --- .../apache/paimon/spark/SparkProcedures.java | 2 + .../procedure/CompactDatabaseProcedure.java | 217 ++++++++++++++++++ .../CompactDatabaseProcedureTest.scala | 204 ++++++++++++++++ 3 files changed, 423 insertions(+) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index 99ed1e83a3ed..10d9792317f0 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -21,6 +21,7 @@ import org.apache.paimon.spark.procedure.AlterFunctionProcedure; import org.apache.paimon.spark.procedure.AlterViewDialectProcedure; import org.apache.paimon.spark.procedure.ClearConsumersProcedure; +import org.apache.paimon.spark.procedure.CompactDatabaseProcedure; import org.apache.paimon.spark.procedure.CompactManifestProcedure; import org.apache.paimon.spark.procedure.CompactProcedure; import org.apache.paimon.spark.procedure.CopyFilesProcedure; @@ -96,6 +97,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("create_global_index", CreateGlobalIndexProcedure::builder); procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder); procedureBuilders.put("compact", CompactProcedure::builder); + procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder); procedureBuilders.put("rescale", RescaleProcedure::builder); procedureBuilders.put("migrate_database", MigrateDatabaseProcedure::builder); procedureBuilders.put("migrate_table", MigrateTableProcedure::builder); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java new file mode 100644 index 000000000000..44889a8cb5a4 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.spark.catalog.WithPaimonCatalog; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.StringUtils; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Compact database procedure. Usage: + * + *
+ * <pre><code>
+ *  -- compact all databases
+ *  CALL sys.compact_database()
+ *
+ *  -- compact some databases (accepts a regular expression)
+ *  CALL sys.compact_database(including_databases => 'db1|db2')
+ *
+ *  -- compact some tables (accepts a regular expression)
+ *  CALL sys.compact_database(including_databases => 'db1', including_tables => 'table1|table2')
+ *
+ *  -- exclude some tables (accepts a regular expression)
+ *  CALL sys.compact_database(including_databases => 'db1', including_tables => '.*', excluding_tables => 'ignore_table')
+ *
+ *  -- set table options ('k=v,...')
+ *  CALL sys.compact_database(including_databases => 'db1', options => 'key1=value1,key2=value2')
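+ *
+ *  -- combine filters and options (illustrative example; all parameters are optional and can be used together)
+ *  CALL sys.compact_database(including_databases => 'db1|db2', including_tables => 'ods_.*', excluding_tables => '.*_tmp', options => 'key1=value1')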
+ * </code></pre>
+ */ +public class CompactDatabaseProcedure extends BaseProcedure { + + private static final Logger LOG = LoggerFactory.getLogger(CompactDatabaseProcedure.class); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.optional("including_databases", StringType), + ProcedureParameter.optional("including_tables", StringType), + ProcedureParameter.optional("excluding_tables", StringType), + ProcedureParameter.optional("options", StringType), + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.StringType, true, Metadata.empty()) + }); + + protected CompactDatabaseProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + String includingDatabases = args.isNullAt(0) ? ".*" : args.getString(0); + String includingTables = args.isNullAt(1) ? ".*" : args.getString(1); + String excludingTables = args.isNullAt(2) ? null : args.getString(2); + String options = args.isNullAt(3) ? null : args.getString(3); + + Pattern databasePattern = Pattern.compile(includingDatabases); + Pattern includingPattern = Pattern.compile(includingTables); + Pattern excludingPattern = + StringUtils.isNullOrWhitespaceOnly(excludingTables) + ? null + : Pattern.compile(excludingTables); + + Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); + + int successCount = 0; + int failedCount = 0; + + try { + List databases = paimonCatalog.listDatabases(); + for (String databaseName : databases) { + Matcher databaseMatcher = databasePattern.matcher(databaseName); + if (!databaseMatcher.matches()) { + LOG.debug("Database '{}' is excluded by pattern.", databaseName); + continue; + } + + List tables = paimonCatalog.listTables(databaseName); + for (String tableName : tables) { + String fullTableName = String.format("%s.%s", databaseName, tableName); + + if (!shouldCompactTable(fullTableName, includingPattern, excludingPattern)) { + LOG.debug("Table '{}' is excluded by pattern.", fullTableName); + continue; + } + + try { + Table table = + paimonCatalog.getTable(Identifier.create(databaseName, tableName)); + if (!(table instanceof FileStoreTable)) { + LOG.warn( + "Only FileStoreTable supports compact action. " + + "Table '{}' type is '{}'.", + fullTableName, + table.getClass().getName()); + continue; + } + + compactTable(fullTableName, options); + successCount++; + LOG.info("Successfully compacted table: {}", fullTableName); + } catch (Exception e) { + failedCount++; + LOG.error("Failed to compact table: {}", fullTableName, e); + } + } + } + } catch (Catalog.DatabaseNotExistException e) { + throw new RuntimeException(e); + } + + String result = + String.format( + "Compact database finished. 
Success: %d, Failed: %d", + successCount, failedCount); + return new InternalRow[] {newInternalRow(UTF8String.fromString(result))}; + } + + private boolean shouldCompactTable( + String fullTableName, Pattern includingPattern, Pattern excludingPattern) { + boolean shouldCompact = includingPattern.matcher(fullTableName).matches(); + if (excludingPattern != null) { + shouldCompact = shouldCompact && !excludingPattern.matcher(fullTableName).matches(); + } + return shouldCompact; + } + + private void compactTable(String tableName, String options) throws Exception { + LOG.info("Start to compact table: {}", tableName); + + // Build CompactProcedure and call it for each table + CompactProcedure compactProcedure = + (CompactProcedure) + CompactProcedure.builder().withTableCatalog(tableCatalog()).build(); + + // Create InternalRow with the parameters for CompactProcedure + // Parameters: table, partitions, compact_strategy, order_strategy, order_by, where, + // options, partition_idle_time + InternalRow compactArgs = + newInternalRow( + UTF8String.fromString(tableName), // table + null, // partitions + null, // compact_strategy + null, // order_strategy + null, // order_by + null, // where + options == null ? null : UTF8String.fromString(options), // options + null // partition_idle_time + ); + + InternalRow[] result = compactProcedure.call(compactArgs); + + if (result.length > 0 && !result[0].getBoolean(0)) { + throw new RuntimeException("Compact failed for table: " + tableName); + } + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public CompactDatabaseProcedure doBuild() { + return new CompactDatabaseProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "This procedure executes compact action on all tables in database(s)."; + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala new file mode 100644 index 000000000000..5bc4802f999f --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.Snapshot.CommitKind +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.FileStoreTable + +import org.assertj.core.api.Assertions + +/** Test compact_database procedure. See [[CompactDatabaseProcedure]]. 
*/ +class CompactDatabaseProcedureTest extends PaimonSparkTestBase { + + def lastSnapshotCommand(table: FileStoreTable): CommitKind = { + table.snapshotManager().latestSnapshot().commitKind() + } + + def lastSnapshotId(table: FileStoreTable): Long = { + table.snapshotManager().latestSnapshotId() + } + + test("Paimon Procedure: compact database - basic test") { + spark.sql("CREATE DATABASE IF NOT EXISTS test_db1") + spark.sql("CREATE DATABASE IF NOT EXISTS test_db2") + + withTable("test_db1.T1", "test_db1.T2", "test_db2.T3") { + // Create tables in test_db1 + spark.sql(s""" + |CREATE TABLE test_db1.T1 (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE test_db1.T2 (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + // Create table in test_db2 + spark.sql(s""" + |CREATE TABLE test_db2.T3 (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + // Insert data multiple times to create multiple files that need compaction + spark.sql("INSERT INTO test_db1.T1 VALUES (1, 'a'), (2, 'b')") + spark.sql("INSERT INTO test_db1.T1 VALUES (3, 'c'), (4, 'd')") + + spark.sql("INSERT INTO test_db1.T2 VALUES (1, 'x'), (2, 'y')") + spark.sql("INSERT INTO test_db1.T2 VALUES (3, 'z'), (4, 'w')") + + spark.sql("INSERT INTO test_db2.T3 VALUES (1, 'm'), (2, 'n')") + spark.sql("INSERT INTO test_db2.T3 VALUES (3, 'o'), (4, 'p')") + + val table1 = loadTable("test_db1", "T1") + val table2 = loadTable("test_db1", "T2") + val table3 = loadTable("test_db2", "T3") + + Assertions.assertThat(lastSnapshotId(table1)).isEqualTo(2) + Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2) + Assertions.assertThat(lastSnapshotId(table3)).isEqualTo(2) + + // Compact all databases + val result = spark.sql("CALL sys.compact_database()").collect() + Assertions.assertThat(result.length).isEqualTo(1) + Assertions.assertThat(result(0).getString(0)).contains("Success") + + // Verify compaction happened - reload table to get new snapshot + val table1After = loadTable("test_db1", "T1") + val table2After = loadTable("test_db1", "T2") + val table3After = loadTable("test_db2", "T3") + Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotCommand(table2After).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotCommand(table3After).equals(CommitKind.COMPACT)).isTrue + } + + spark.sql("DROP DATABASE IF EXISTS test_db1 CASCADE") + spark.sql("DROP DATABASE IF EXISTS test_db2 CASCADE") + } + + test("Paimon Procedure: compact database - with database filter") { + spark.sql("CREATE DATABASE IF NOT EXISTS db_include") + spark.sql("CREATE DATABASE IF NOT EXISTS db_exclude") + + withTable("db_include.T1", "db_exclude.T2") { + spark.sql(s""" + |CREATE TABLE db_include.T1 (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE db_exclude.T2 (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + spark.sql("INSERT INTO db_include.T1 VALUES (1, 'a'), (2, 'b')") + spark.sql("INSERT INTO db_include.T1 VALUES (3, 'c'), (4, 'd')") + + spark.sql("INSERT INTO db_exclude.T2 VALUES (1, 'x'), (2, 'y')") + spark.sql("INSERT INTO db_exclude.T2 VALUES (3, 'z'), (4, 'w')") + + val table1 = 
loadTable("db_include", "T1") + val table2 = loadTable("db_exclude", "T2") + + // Compact only db_include database + spark.sql("CALL sys.compact_database(including_databases => 'db_include')") + + // Only table1 should be compacted - reload table to get new snapshot + val table1After = loadTable("db_include", "T1") + Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2) // Still only 2 snapshots + } + + spark.sql("DROP DATABASE IF EXISTS db_include CASCADE") + spark.sql("DROP DATABASE IF EXISTS db_exclude CASCADE") + } + + test("Paimon Procedure: compact database - with table filter") { + spark.sql("CREATE DATABASE IF NOT EXISTS filter_db") + + withTable("filter_db.include_table", "filter_db.exclude_table") { + spark.sql(s""" + |CREATE TABLE filter_db.include_table (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE filter_db.exclude_table (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + spark.sql("INSERT INTO filter_db.include_table VALUES (1, 'a'), (2, 'b')") + spark.sql("INSERT INTO filter_db.include_table VALUES (3, 'c'), (4, 'd')") + + spark.sql("INSERT INTO filter_db.exclude_table VALUES (1, 'x'), (2, 'y')") + spark.sql("INSERT INTO filter_db.exclude_table VALUES (3, 'z'), (4, 'w')") + + val includeTable = loadTable("filter_db", "include_table") + val excludeTable = loadTable("filter_db", "exclude_table") + + // Compact only include_table using including_tables pattern + spark.sql( + "CALL sys.compact_database(including_databases => 'filter_db', including_tables => '.*include.*')") + + val includeTableAfter = loadTable("filter_db", "include_table") + Assertions + .assertThat(lastSnapshotCommand(includeTableAfter).equals(CommitKind.COMPACT)) + .isTrue + Assertions.assertThat(lastSnapshotId(excludeTable)).isEqualTo(2) + } + + spark.sql("DROP DATABASE IF EXISTS filter_db CASCADE") + } + + test("Paimon Procedure: compact database - with excluding_tables filter") { + spark.sql("CREATE DATABASE IF NOT EXISTS exclude_db") + + withTable("exclude_db.T1", "exclude_db.T2") { + spark.sql(s""" + |CREATE TABLE exclude_db.T1 (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE exclude_db.T2 (id INT, value STRING) + |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true') + |""".stripMargin) + + spark.sql("INSERT INTO exclude_db.T1 VALUES (1, 'a'), (2, 'b')") + spark.sql("INSERT INTO exclude_db.T1 VALUES (3, 'c'), (4, 'd')") + + spark.sql("INSERT INTO exclude_db.T2 VALUES (1, 'x'), (2, 'y')") + spark.sql("INSERT INTO exclude_db.T2 VALUES (3, 'z'), (4, 'w')") + + val table1 = loadTable("exclude_db", "T1") + val table2 = loadTable("exclude_db", "T2") + + // Compact all tables except T2 + spark.sql( + "CALL sys.compact_database(including_databases => 'exclude_db', excluding_tables => '.*T2')") + + val table1After = loadTable("exclude_db", "T1") + Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2) + } + + spark.sql("DROP DATABASE IF EXISTS exclude_db CASCADE") + } +}