9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -137,4 +137,13 @@ default boolean tableExists(TableIdentifier identifier) {
* @throws NoSuchTableException if the table does not exist
*/
Table loadTable(TableIdentifier identifier);

/**
* Register a table.
*
* @param identifier a table identifier
* @param metadataFileLocation the location of a metadata file
* @return a Table instance
*/
Table registerTable(TableIdentifier identifier, String metadataFileLocation);
Contributor Author:
Maybe "import" would be a better name.

}
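
For illustration, here is a minimal usage sketch of the new API. The database name, table name, and metadata path are hypothetical, and any Catalog implementation that supports registerTable would work in place of HiveCatalog:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

public class RegisterTableExample {
  public static void main(String[] args) {
    // HiveCatalog is the implementation this PR adds registerTable to.
    HiveCatalog catalog = new HiveCatalog(new Configuration());

    // Hypothetical identifier and metadata file location.
    TableIdentifier identifier = TableIdentifier.of("db", "events");
    String metadataFileLocation = "file:/warehouse/db/events/metadata/v1.metadata.json";

    // Points the catalog entry at the existing metadata and returns a Table handle.
    Table table = catalog.registerTable(identifier, metadataFileLocation);
    System.out.println("Registered table at: " + table.location());
  }
}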
16 changes: 5 additions & 11 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -21,19 +21,13 @@

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;

public abstract class BaseMetastoreCatalog implements Catalog {
private final Configuration conf;

protected BaseMetastoreCatalog(Configuration conf) {
this.conf = conf;
}

@Override
public Table createTable(
@@ -42,7 +36,7 @@ public Table createTable(
PartitionSpec spec,
String location,
Map<String, String> properties) {
TableOperations ops = newTableOps(conf, identifier);
TableOperations ops = newTableOps(identifier);
if (ops.current() != null) {
throw new AlreadyExistsException("Table already exists: " + identifier);
}
@@ -51,7 +45,7 @@ public Table createTable(
if (location != null) {
baseLocation = location;
} else {
baseLocation = defaultWarehouseLocation(conf, identifier);
baseLocation = defaultWarehouseLocation(identifier);
}

TableMetadata metadata = TableMetadata.newTableMetadata(
@@ -68,15 +62,15 @@

@Override
public Table loadTable(TableIdentifier identifier) {
TableOperations ops = newTableOps(conf, identifier);
TableOperations ops = newTableOps(identifier);
if (ops.current() == null) {
throw new NoSuchTableException("Table does not exist: " + identifier);
}

return new BaseTable(ops, identifier.toString());
}

protected abstract TableOperations newTableOps(Configuration newConf, TableIdentifier tableIdentifier);
protected abstract TableOperations newTableOps(TableIdentifier tableIdentifier);

protected abstract String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier);
protected abstract String defaultWarehouseLocation(TableIdentifier tableIdentifier);
}
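
To make the refactored contract concrete, here is a hedged sketch of a subclass under the new signatures. The class name and warehouse layout are hypothetical, and the remaining Catalog methods (dropTable, renameTable, registerTable) are omitted, which is why the sketch stays abstract:

import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical catalog: the base class no longer holds a Hadoop Configuration,
// so each subclass carries whatever state its metastore needs.
public abstract class ExampleCatalog extends BaseMetastoreCatalog {
  private final String warehousePath;

  protected ExampleCatalog(String warehousePath) {
    this.warehousePath = warehousePath;
  }

  @Override
  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
    // Would return TableOperations backed by this catalog's own metastore client.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
    // Derives a location from subclass state instead of a passed-in Configuration.
    return warehousePath + "/" + tableIdentifier.toString().replace('.', '/');
  }
}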
24 changes: 19 additions & 5 deletions hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -26,18 +26,23 @@
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.thrift.TException;

public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {

private final Configuration conf;
private final HiveClientPool clients;

public HiveCatalog(Configuration conf) {
super(conf);
this.conf = conf;
this.clients = new HiveClientPool(2, conf);
}

@@ -113,14 +118,23 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
}

@Override
public TableOperations newTableOps(Configuration configuration, TableIdentifier tableIdentifier) {
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
TableOperations ops = newTableOps(identifier);
HadoopInputFile metadataFile = HadoopInputFile.fromLocation(metadataFileLocation, conf);
TableMetadata metadata = TableMetadataParser.read(ops, metadataFile);
ops.commit(null, metadata);
return new BaseTable(ops, identifier.toString());
}

@Override
public TableOperations newTableOps(TableIdentifier tableIdentifier) {
String dbName = tableIdentifier.namespace().level(0);
String tableName = tableIdentifier.name();
return new HiveTableOperations(configuration, clients, dbName, tableName);
return new HiveTableOperations(conf, clients, dbName, tableName);
}

protected String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier) {
String warehouseLocation = hadoopConf.get("hive.metastore.warehouse.dir");
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
String warehouseLocation = conf.get("hive.metastore.warehouse.dir");
Preconditions.checkNotNull(
warehouseLocation,
"Warehouse location is not set: hive.metastore.warehouse.dir=null");
hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -119,7 +119,9 @@ public void commit(TableMetadata base, TableMetadata metadata) {
return;
}

String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
String newMetadataLocation = metadata.file() == null ?
writeNewMetadata(metadata, currentVersion() + 1) :
metadata.file().location();

boolean threw = true;
Optional<Long> lockId = Optional.empty();
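
Read together with registerTable above: when the TableMetadata was parsed from an existing file (so metadata.file() is non-null), commit reuses that location instead of writing a new version file. A commented restatement of the decision, under that assumption:

// Assumption: metadata.file() is non-null only when the metadata was read
// from an existing file, as happens in HiveCatalog.registerTable.
String newMetadataLocation;
if (metadata.file() == null) {
  // Normal commit: write the next metadata version to a new file.
  newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
} else {
  // Register path: the metadata file already exists, so point the catalog at it.
  newMetadataLocation = metadata.file().location();
}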
80 changes: 80 additions & 0 deletions hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -52,6 +52,7 @@

import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;

public class HiveTableTest extends HiveTableBaseTest {
@@ -223,4 +224,83 @@ public void testConcurrentFastAppends() {
icebergTable.refresh();
Assert.assertEquals(20, icebergTable.currentSnapshot().manifests().size());
}

@Test
public void testRegisterTable() throws TException {
org.apache.hadoop.hive.metastore.api.Table originalTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);

Map<String, String> originalParams = originalTable.getParameters();
Assert.assertNotNull(originalParams);
Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(originalParams.get(TABLE_TYPE_PROP)));
Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(originalTable.getTableType()));

catalog.dropTable(TABLE_IDENTIFIER);
Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));

List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
Assert.assertEquals(1, metadataVersionFiles.size());

catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0));

org.apache.hadoop.hive.metastore.api.Table newTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);

Map<String, String> newTableParameters = newTable.getParameters();
Assert.assertNull(newTableParameters.get(PREVIOUS_METADATA_LOCATION_PROP));
Assert.assertEquals(originalParams.get(TABLE_TYPE_PROP), newTableParameters.get(TABLE_TYPE_PROP));
Assert.assertEquals(originalParams.get(METADATA_LOCATION_PROP), newTableParameters.get(METADATA_LOCATION_PROP));
Assert.assertEquals(originalTable.getSd(), newTable.getSd());
}

@Test
public void testCloneTable() throws IOException {
Table table = catalog.loadTable(TABLE_IDENTIFIER);

List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
Assert.assertEquals(1, metadataVersionFiles.size());

TableIdentifier anotherTableIdentifier = TableIdentifier.of(DB_NAME, TABLE_NAME + "_new");
Table anotherTable = catalog.registerTable(anotherTableIdentifier, metadataVersionFiles.get(0));

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
List<GenericData.Record> records = Lists.newArrayList(
recordBuilder.set("id", 1L).build(),
recordBuilder.set("id", 2L).build(),
recordBuilder.set("id", 3L).build()
);

String fileLocation = table.location().replace("file:", "") + "/data/file-1.avro";
try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
.schema(schema)
.named("test")
.build()) {
for (GenericData.Record rec : records) {
writer.add(rec);
}
}
DataFile file = DataFiles.builder(table.spec())
.withRecordCount(3)
.withPath(fileLocation)
.withFileSizeInBytes(Files.localInput(fileLocation).getLength())
.build();
table.newAppend().appendFile(file).commit();

String anotherFileLocation = anotherTable.location().replace("file:", "") + "/data/file-2.avro";
try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(anotherFileLocation))
.schema(schema)
.named("test")
.build()) {
for (GenericData.Record rec : records) {
writer.add(rec);
}
}
DataFile anotherFile = DataFiles.builder(anotherTable.spec())
.withRecordCount(3)
.withPath(anotherFileLocation)
.withFileSizeInBytes(Files.localInput(anotherFileLocation).getLength())
.build();
anotherTable.newAppend().appendFile(anotherFile).commit();

// verify that both tables continue to function independently
Assert.assertNotEquals(table.currentSnapshot().manifests(), anotherTable.currentSnapshot().manifests());
Contributor Author:
I have to say that the registered table won't have previous_metadata_location in its table properties. This doesn't seem like an issue to me.

}
}