diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java index 55fbab5158fa..9baa3d451777 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java @@ -90,6 +90,22 @@ public class IcebergOptions { .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient") .withDescription("Hive client class name for Iceberg Hive Catalog."); + public static final ConfigOption<String> HIVE_DATABASE = + key("metadata.iceberg.database") + .stringType() + .noDefaultValue() + .withDescription( + "Hive database name for Iceberg Hive Catalog. " + + "Set this as an iceberg database alias if using a centralized Catalog."); + + public static final ConfigOption<String> HIVE_TABLE = + key("metadata.iceberg.table") + .stringType() + .noDefaultValue() + .withDescription( + "Hive table name for Iceberg Hive Catalog. " + + "Set this as an iceberg table alias if using a centralized Catalog."); + /** Where to store Iceberg metadata. 
*/ public enum StorageType implements DescribedEnum { DISABLED("disabled", "Disable Iceberg compatibility support."), diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java index ddd21384cbc8..6e1da2c3eb78 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java @@ -61,6 +61,8 @@ public class IcebergHiveMetadataCommitter implements IcebergMetadataCommitter { private final FileStoreTable table; private final Identifier identifier; private final ClientPool clients; + private final String icebergHiveDatabase; + private final String icebergHiveTable; public IcebergHiveMetadataCommitter(FileStoreTable table) { this.table = table; @@ -78,6 +80,8 @@ public IcebergHiveMetadataCommitter(FileStoreTable table) { String uri = options.get(IcebergOptions.URI); String hiveConfDir = options.get(IcebergOptions.HIVE_CONF_DIR); String hadoopConfDir = options.get(IcebergOptions.HADOOP_CONF_DIR); + String icebergDatabase = options.get(IcebergOptions.HIVE_DATABASE); + String icebergTable = options.get(IcebergOptions.HIVE_TABLE); Configuration hadoopConf = new Configuration(); hadoopConf.setClassLoader(IcebergHiveMetadataCommitter.class.getClassLoader()); HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf); @@ -97,6 +101,15 @@ public IcebergHiveMetadataCommitter(FileStoreTable table) { IcebergOptions.URI.key()); } + this.icebergHiveDatabase = + icebergDatabase != null && !icebergDatabase.isEmpty() + ? icebergDatabase + : this.identifier.getDatabaseName(); + this.icebergHiveTable = + icebergTable != null && !icebergTable.isEmpty() + ? 
icebergTable + : this.identifier.getTableName(); + this.clients = new CachedClientPool( hiveConf, options, options.getString(IcebergOptions.HIVE_CLIENT_CLASS)); @@ -113,18 +126,14 @@ public void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath private void commitMetadataImpl(Path newMetadataPath, @Nullable Path baseMetadataPath) throws Exception { - if (!databaseExists(identifier.getDatabaseName())) { - createDatabase(identifier.getDatabaseName()); + if (!databaseExists(icebergHiveDatabase)) { + createDatabase(icebergHiveDatabase); } Table hiveTable; - if (tableExists(identifier)) { + if (tableExists()) { hiveTable = - clients.run( - client -> - client.getTable( - identifier.getDatabaseName(), - identifier.getTableName())); + clients.run(client -> client.getTable(icebergHiveDatabase, icebergHiveTable)); } else { hiveTable = createTable(newMetadataPath); } @@ -138,11 +147,7 @@ private void commitMetadataImpl(Path newMetadataPath, @Nullable Path baseMetadat clients.execute( client -> - client.alter_table( - identifier.getDatabaseName(), - identifier.getTableName(), - hiveTable, - true)); + client.alter_table(icebergHiveDatabase, icebergHiveTable, hiveTable, true)); } private boolean databaseExists(String databaseName) throws Exception { @@ -161,19 +166,16 @@ private void createDatabase(String databaseName) throws Exception { clients.execute(client -> client.createDatabase(database)); } - private boolean tableExists(Identifier identifier) throws Exception { - return clients.run( - client -> - client.tableExists( - identifier.getDatabaseName(), identifier.getTableName())); + private boolean tableExists() throws Exception { + return clients.run(client -> client.tableExists(icebergHiveDatabase, icebergHiveTable)); } private Table createTable(Path metadataPath) throws Exception { long currentTimeMillis = System.currentTimeMillis(); Table hiveTable = new Table( - identifier.getTableName(), - identifier.getDatabaseName(), + icebergHiveTable, + 
icebergHiveDatabase, // current linux user System.getProperty("user.name"), (int) (currentTimeMillis / 1000), diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java index d0c64c5d3b7f..c97f6f209048 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java @@ -59,6 +59,7 @@ public void before() throws Exception { @After public void after() { hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE"); + hiveShell.execute("DROP DATABASE IF EXISTS test_db_iceberg CASCADE"); } @Test @@ -110,6 +111,45 @@ public void testPrimaryKeyTable() throws Exception { .executeQuery("DESC DATABASE EXTENDED test_db") .toString() .contains("iceberg/test_db")); + + // specify a dedicated hive database and table for paimon iceberg committer + tEnv.executeSql( + "CREATE TABLE my_paimon.test_db.t1 ( pt INT, id INT, data STRING, PRIMARY KEY (pt, id) NOT ENFORCED ) " + + "PARTITIONED BY (pt) WITH " + + "( 'metadata.iceberg.storage' = 'hive-catalog', 'metadata.iceberg.uri' = '', 'file.format' = 'avro', " + + " 'metadata.iceberg.database' = 'test_db_iceberg', 'metadata.iceberg.table' = 't1_iceberg'," + // make sure all changes are visible in iceberg metadata + + " 'full-compaction.delta-commits' = '1' )"); + tEnv.executeSql( + "INSERT INTO my_paimon.test_db.t1 VALUES " + + "(1, 1, 'apple'), (1, 2, 'pear'), (2, 1, 'cat'), (2, 2, 'dog')") + .await(); + + Assert.assertEquals( + Arrays.asList(Row.of("pear", 2, 1), Row.of("dog", 2, 2)), + collect( + tEnv.executeSql( + "SELECT data, id, pt FROM my_iceberg.test_db_iceberg.t1_iceberg WHERE id = 2 ORDER BY pt, id"))); + + tEnv.executeSql( 
+ "INSERT INTO my_paimon.test_db.t1 VALUES " + + "(1, 1, 'cherry'), (2, 2, 'elephant')") + .await(); + Assert.assertEquals( + Arrays.asList( + Row.of(1, 1, "cherry"), + Row.of(1, 2, "pear"), + Row.of(2, 1, "cat"), + Row.of(2, 2, "elephant")), + collect( + tEnv.executeSql( + "SELECT * FROM my_iceberg.test_db_iceberg.t1_iceberg ORDER BY pt, id"))); + + Assert.assertTrue( + hiveShell + .executeQuery("DESC DATABASE EXTENDED test_db_iceberg") + .toString() + .contains("iceberg/test_db_iceberg")); } @Test @@ -154,6 +194,36 @@ public void testAppendOnlyTable() throws Exception { collect( tEnv.executeSql( "SELECT data, id, pt FROM my_iceberg.test_db.t WHERE id > 1 ORDER BY pt, id"))); + + // specify a dedicated hive database and table for paimon iceberg committer + tEnv.executeSql( + "CREATE TABLE my_paimon.test_db.t1 ( pt INT, id INT, data STRING ) PARTITIONED BY (pt) WITH " + + "( 'metadata.iceberg.storage' = 'hive-catalog', 'metadata.iceberg.uri' = '', 'file.format' = 'avro', " + + "'metadata.iceberg.database' = 'test_db_iceberg', 'metadata.iceberg.table' = 't1_iceberg')"); + tEnv.executeSql( + "INSERT INTO my_paimon.test_db.t1 VALUES " + + "(1, 1, 'apple'), (1, 2, 'pear'), (2, 1, 'cat'), (2, 2, 'dog')") + .await(); + + Assert.assertEquals( + Arrays.asList(Row.of("pear", 2, 1), Row.of("dog", 2, 2)), + collect( + tEnv.executeSql( + "SELECT data, id, pt FROM my_iceberg.test_db_iceberg.t1_iceberg WHERE id = 2 ORDER BY pt, id"))); + + tEnv.executeSql( + "INSERT INTO my_paimon.test_db.t1 VALUES " + + "(1, 3, 'cherry'), (2, 3, 'elephant')") + .await(); + Assert.assertEquals( + Arrays.asList( + Row.of("pear", 2, 1), + Row.of("cherry", 3, 1), + Row.of("dog", 2, 2), + Row.of("elephant", 3, 2)), + collect( + tEnv.executeSql( + "SELECT data, id, pt FROM my_iceberg.test_db_iceberg.t1_iceberg WHERE id > 1 ORDER BY pt, id"))); } @Test