From f1595b0dfb3aea7c9d41fac470c053bb7ae5c365 Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Thu, 4 Jul 2019 14:49:38 +0100
Subject: [PATCH] Register existing tables in Iceberg HiveCatalog

---
 .../org/apache/iceberg/catalog/Catalog.java   |  9 +++
 .../apache/iceberg/BaseMetastoreCatalog.java  | 16 ++--
 .../org/apache/iceberg/hive/HiveCatalog.java  | 24 ++++--
 .../iceberg/hive/HiveTableOperations.java     |  4 +-
 .../apache/iceberg/hive/HiveTableTest.java    | 80 +++++++++++++++++++
 5 files changed, 116 insertions(+), 17 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index dfe2c30d932c..fa6330533b0b 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -137,4 +137,13 @@ default boolean tableExists(TableIdentifier identifier) {
    * @throws NoSuchTableException if the table does not exist
    */
   Table loadTable(TableIdentifier identifier);
+
+  /**
+   * Register a table.
+   *
+   * @param identifier a table identifier
+   * @param metadataFileLocation the location of a metadata file
+   * @return a Table instance
+   */
+  Table registerTable(TableIdentifier identifier, String metadataFileLocation);
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 9e2dcdd2390c..a2f5b6c3a50c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -21,7 +21,6 @@
 
 import com.google.common.collect.Maps;
 import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -29,11 +28,6 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 
 public abstract class BaseMetastoreCatalog implements Catalog {
-  private final Configuration conf;
-
-  protected BaseMetastoreCatalog(Configuration conf) {
-    this.conf = conf;
-  }
 
   @Override
   public Table createTable(
@@ -42,7 +36,7 @@ public Table createTable(
       PartitionSpec spec,
       String location,
       Map<String, String> properties) {
-    TableOperations ops = newTableOps(conf, identifier);
+    TableOperations ops = newTableOps(identifier);
     if (ops.current() != null) {
       throw new AlreadyExistsException("Table already exists: " + identifier);
     }
@@ -51,7 +45,7 @@ public Table createTable(
     if (location != null) {
       baseLocation = location;
     } else {
-      baseLocation = defaultWarehouseLocation(conf, identifier);
+      baseLocation = defaultWarehouseLocation(identifier);
     }
 
     TableMetadata metadata = TableMetadata.newTableMetadata(
@@ -68,7 +62,7 @@ public Table createTable(
 
   @Override
   public Table loadTable(TableIdentifier identifier) {
-    TableOperations ops = newTableOps(conf, identifier);
+    TableOperations ops = newTableOps(identifier);
     if (ops.current() == null) {
       throw new NoSuchTableException("Table does not exist: " + identifier);
     }
@@ -76,7 +70,7 @@ public Table loadTable(TableIdentifier identifier) {
     return new BaseTable(ops, identifier.toString());
   }
 
-  protected abstract TableOperations newTableOps(Configuration newConf, TableIdentifier tableIdentifier);
+  protected abstract TableOperations newTableOps(TableIdentifier tableIdentifier);
 
-  protected abstract String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier);
+  protected abstract String defaultWarehouseLocation(TableIdentifier tableIdentifier);
 }
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 837c589d33af..fce0efc89550 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -26,18 +26,23 @@
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.thrift.TException;
 
 public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
+  private final Configuration conf;
   private final HiveClientPool clients;
 
   public HiveCatalog(Configuration conf) {
-    super(conf);
+    this.conf = conf;
     this.clients = new HiveClientPool(2, conf);
   }
 
@@ -113,14 +118,23 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
   }
 
   @Override
-  public TableOperations newTableOps(Configuration configuration, TableIdentifier tableIdentifier) {
+  public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
+    TableOperations ops = newTableOps(identifier);
+    HadoopInputFile metadataFile = HadoopInputFile.fromLocation(metadataFileLocation, conf);
+    TableMetadata metadata = TableMetadataParser.read(ops, metadataFile);
+    ops.commit(null, metadata);
+    return new BaseTable(ops, identifier.toString());
+  }
+
+  @Override
+  public TableOperations newTableOps(TableIdentifier tableIdentifier) {
     String dbName = tableIdentifier.namespace().level(0);
     String tableName = tableIdentifier.name();
-    return new HiveTableOperations(configuration, clients, dbName, tableName);
+    return new HiveTableOperations(conf, clients, dbName, tableName);
   }
 
-  protected String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier) {
-    String warehouseLocation = hadoopConf.get("hive.metastore.warehouse.dir");
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String warehouseLocation = conf.get("hive.metastore.warehouse.dir");
     Preconditions.checkNotNull(
         warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null");
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 1d130244720c..bccc9d1cf15d 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -119,7 +119,9 @@ public void commit(TableMetadata base, TableMetadata metadata) {
       return;
     }
 
-    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    String newMetadataLocation = metadata.file() == null ?
+        writeNewMetadata(metadata, currentVersion() + 1) :
+        metadata.file().location();
 
     boolean threw = true;
     Optional<Long> lockId = Optional.empty();
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 3459e99bd578..257ba10d67e5 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -52,6 +52,7 @@
 
 import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
 import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
 import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
 
 public class HiveTableTest extends HiveTableBaseTest {
@@ -223,4 +224,83 @@ public void testConcurrentFastAppends() {
     icebergTable.refresh();
     Assert.assertEquals(20, icebergTable.currentSnapshot().manifests().size());
   }
+
+  @Test
+  public void testRegisterTable() throws TException {
+    org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+    Map<String, String> originalParams = originalTable.getParameters();
+    Assert.assertNotNull(originalParams);
+    Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(originalParams.get(TABLE_TYPE_PROP)));
+    Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(originalTable.getTableType()));
+
+    catalog.dropTable(TABLE_IDENTIFIER);
+    Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
+
+    List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+    Assert.assertEquals(1, metadataVersionFiles.size());
+
+    catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0));
+
+    org.apache.hadoop.hive.metastore.api.Table newTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+    Map<String, String> newTableParameters = newTable.getParameters();
+    Assert.assertNull(newTableParameters.get(PREVIOUS_METADATA_LOCATION_PROP));
+    Assert.assertEquals(originalParams.get(TABLE_TYPE_PROP), newTableParameters.get(TABLE_TYPE_PROP));
+    Assert.assertEquals(originalParams.get(METADATA_LOCATION_PROP), newTableParameters.get(METADATA_LOCATION_PROP));
+    Assert.assertEquals(originalTable.getSd(), newTable.getSd());
+  }
+
+  @Test
+  public void testCloneTable() throws IOException {
+    Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+    List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+    Assert.assertEquals(1, metadataVersionFiles.size());
+
+    TableIdentifier anotherTableIdentifier = TableIdentifier.of(DB_NAME, TABLE_NAME + "_new");
+    Table anotherTable = catalog.registerTable(anotherTableIdentifier, metadataVersionFiles.get(0));
+
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
+    List<GenericData.Record> records = Lists.newArrayList(
+        recordBuilder.set("id", 1L).build(),
+        recordBuilder.set("id", 2L).build(),
+        recordBuilder.set("id", 3L).build()
+    );
+
+    String fileLocation = table.location().replace("file:", "") + "/data/file-1.avro";
+    try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
+        .schema(schema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : records) {
+        writer.add(rec);
+      }
+    }
+    DataFile file = DataFiles.builder(table.spec())
+        .withRecordCount(3)
+        .withPath(fileLocation)
+        .withFileSizeInBytes(Files.localInput(fileLocation).getLength())
+        .build();
+    table.newAppend().appendFile(file).commit();
+
+    String anotherFileLocation = anotherTable.location().replace("file:", "") + "/data/file-2.avro";
+    try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(anotherFileLocation))
+        .schema(schema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : records) {
+        writer.add(rec);
+      }
+    }
+    DataFile anotherFile = DataFiles.builder(anotherTable.spec())
+        .withRecordCount(3)
+        .withPath(anotherFileLocation)
+        .withFileSizeInBytes(Files.localInput(anotherFileLocation).getLength())
+        .build();
+    anotherTable.newAppend().appendFile(anotherFile).commit();
+
+    // verify that both tables continue to function independently
+    Assert.assertNotEquals(table.currentSnapshot().manifests(), anotherTable.currentSnapshot().manifests());
+  }
 }
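
Note (not part of the patch): the sketch below illustrates how the registerTable API introduced above might be called against a HiveCatalog. It is a minimal example assuming a reachable Hive metastore; the metastore URI, warehouse directory, database and table names, and the metadata file path are all placeholders, not values taken from this change.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

public class RegisterTableExample {
  public static void main(String[] args) throws IOException {
    // Hadoop configuration pointing at the target metastore; both values
    // below are placeholders for illustration only.
    Configuration conf = new Configuration();
    conf.set("hive.metastore.uris", "thrift://localhost:9083");
    conf.set("hive.metastore.warehouse.dir", "file:/tmp/hive/warehouse");

    // HiveCatalog implements Closeable, so try-with-resources releases the client pool.
    try (HiveCatalog catalog = new HiveCatalog(conf)) {
      // Point the catalog at an existing metadata JSON file. Per the
      // HiveTableOperations change above, metadata that already has a file
      // location is committed as-is rather than rewritten.
      TableIdentifier identifier = TableIdentifier.of("db", "existing_table");
      String metadataFileLocation =
          "file:/tmp/hive/warehouse/db.db/existing_table/metadata/00001-example.metadata.json"; // placeholder
      Table table = catalog.registerTable(identifier, metadataFileLocation);

      System.out.println("Registered " + identifier + " at " + table.location());
    }
  }
}

As testCloneTable above shows, the same call can also register an existing table's metadata under a new identifier; the two tables then share history up to that point but commit new snapshots independently.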