diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 2efcf80d2b9b..0139ec4f610b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -286,7 +286,9 @@ private Transaction newReplaceTableTransaction(boolean orCreate) { * * @param io a FileIO to use for deletes * @param metadata the last valid TableMetadata instance for a dropped table. + * @deprecated will be removed in 0.11.0; use CatalogUtil.dropTableData instead. */ + @Deprecated protected static void dropTableData(FileIO io, TableMetadata metadata) { // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete // as much of the delete work as possible and avoid orphaned data or manifest files. diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java new file mode 100644 index 000000000000..e1a1f28717a1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.MapMaker; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CatalogUtil { + private static final Logger LOG = LoggerFactory.getLogger(CatalogUtil.class); + + private CatalogUtil() { + } + + /** + * Drops all data and metadata files referenced by TableMetadata. + *
<p>
+   * This should be called by dropTable implementations to clean up table files once the table has been dropped in the
+   * metastore.
+   *
+   * @param io a FileIO to use for deletes
+   * @param metadata the last valid TableMetadata instance for a dropped table.
+   */
+  public static void dropTableData(FileIO io, TableMetadata metadata) {
+    // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete
+    // as much of the delete work as possible and avoid orphaned data or manifest files.
+
+    Set<String> manifestListsToDelete = Sets.newHashSet();
+    Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
+    for (Snapshot snapshot : metadata.snapshots()) {
+      // add all manifests to the delete set because both data and delete files should be removed
+      Iterables.addAll(manifestsToDelete, snapshot.allManifests());
+      // add the manifest list to the delete set, if present
+      if (snapshot.manifestListLocation() != null) {
+        manifestListsToDelete.add(snapshot.manifestListLocation());
+      }
+    }
+
+    LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
+
+    // run all of the deletes
+
+    deleteFiles(io, manifestsToDelete);
+
+    Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc))
+        .run(io::deleteFile);
+
+    Tasks.foreach(manifestListsToDelete)
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc))
+        .run(io::deleteFile);
+
+    Tasks.foreach(metadata.metadataFileLocation())
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file: {}", list, exc))
+        .run(io::deleteFile);
+  }
+
+  private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
+    // keep track of deleted files in a map that can be cleaned up when memory runs low
+    Map<String, Boolean> deletedFiles = new MapMaker()
+        .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
+        .weakKeys()
+        .makeMap();
+
+    Tasks.foreach(allManifests)
+        .noRetry().suppressFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
+        .run(manifest -> {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
+            for (ManifestEntry<?> entry : reader.entries()) {
+              // intern the file path because the weak key map uses identity (==) instead of equals
+              String path = entry.file().path().toString().intern();
+              Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
+              if (alreadyDeleted == null || !alreadyDeleted) {
+                try {
+                  io.deleteFile(path);
+                } catch (RuntimeException e) {
+                  // this may happen if the map of deleted files gets cleaned up by gc
+                  LOG.warn("Delete failed for data file: {}", path, e);
+                }
+              }
+            }
+          } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest.path());
+          }
+        });
+  }
+}
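
For orientation, the call pattern this helper expects is the one HadoopCatalog and HiveCatalog switch to below: capture the last TableMetadata before the table is removed from the metastore, then let CatalogUtil purge the files. A minimal sketch, assuming a catalog-specific newTableOps lookup and a hypothetical deleteFromMetastore helper (neither is part of this change):

    // illustrative sketch only, not part of the diff
    public boolean dropTable(TableIdentifier identifier, boolean purge) {
      TableOperations ops = newTableOps(identifier);              // catalog-specific lookup (assumed)
      TableMetadata lastMetadata = purge ? ops.current() : null;  // capture metadata before dropping
      boolean dropped = deleteFromMetastore(identifier);          // hypothetical metastore delete
      if (dropped && purge && lastMetadata != null) {
        // deletes data files, manifests, manifest lists and metadata files
        CatalogUtil.dropTableData(ops.io(), lastMetadata);
      }
      return dropped;
    }
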
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
index 235beff36567..e9408d7f2096 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogUtil;
 import
org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; @@ -198,7 +199,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { if (purge && lastMetadata != null) { // Since the data files and the metadata files may store in different locations, // so it has to call dropTableData to force delete the data file. - dropTableData(ops.io(), lastMetadata); + CatalogUtil.dropTableData(ops.io(), lastMetadata); } fs.delete(tablePath, true /* recursive */); return true; diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java index d0667ebfa118..f8c59f48a881 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java @@ -19,11 +19,14 @@ package org.apache.iceberg.hadoop; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.PartitionSpec; @@ -144,6 +147,50 @@ public Table create(Schema schema, PartitionSpec spec, SortOrder order, return new BaseTable(ops, location); } + /** + * Drop a table and delete all data and metadata files. + * + * @param location a path URI (e.g. hdfs:///warehouse/my_table) + * @return true if the table was dropped, false if it did not exist + */ + public boolean dropTable(String location) { + return dropTable(location, true); + } + + /** + * Drop a table; optionally delete data and metadata files. + *
<p>
+ * If purge is set to true the implementation should delete all data and metadata files. + * + * @param location a path URI (e.g. hdfs:///warehouse/my_table) + * @param purge if true, delete all data and metadata files in the table + * @return true if the table was dropped, false if it did not exist + */ + public boolean dropTable(String location, boolean purge) { + TableOperations ops = newTableOps(location); + TableMetadata lastMetadata = null; + if (ops.current() != null) { + if (purge) { + lastMetadata = ops.current(); + } + } else { + return false; + } + + try { + if (purge && lastMetadata != null) { + // Since the data files and the metadata files may store in different locations, + // so it has to call dropTableData to force delete the data file. + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } + Path tablePath = new Path(location); + Util.getFs(tablePath, conf).delete(tablePath, true /* recursive */); + return true; + } catch (IOException e) { + throw new UncheckedIOException("Failed to delete file: " + location, e); + } + } + @VisibleForTesting TableOperations newTableOps(String location) { if (location.contains(METADATA_JSON)) { diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTablesSortOrder.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTables.java similarity index 52% rename from core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTablesSortOrder.java rename to core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTables.java index 0b4e714655ac..7af55c7151e2 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTablesSortOrder.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopTables.java @@ -20,10 +20,20 @@ package org.apache.iceberg.hadoop; import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; @@ -38,7 +48,7 @@ import static org.apache.iceberg.SortDirection.ASC; import static org.apache.iceberg.types.Types.NestedField.required; -public class TestHadoopTablesSortOrder { +public class TestHadoopTables { private static final HadoopTables TABLES = new HadoopTables(); private static final Schema SCHEMA = new Schema( @@ -48,12 +58,54 @@ public class TestHadoopTablesSortOrder { @Rule public TemporaryFolder temp = new TemporaryFolder(); - private String tableLocation = null; + private File tableDir = null; @Before public void setupTableLocation() throws Exception { - File tableDir = temp.newFolder(); - this.tableLocation = tableDir.toURI().toString(); + tableDir = temp.newFolder(); + } + + @Test + public void testDropTable() { + TABLES.create(SCHEMA, tableDir.toURI().toString()); + TABLES.dropTable(tableDir.toURI().toString()); + AssertHelpers.assertThrows( + "Should complain about missing table", NoSuchTableException.class, + "Table does not exist", () -> TABLES.load(tableDir.toURI().toString())); + } + + @Test + public void testDropTableWithPurge() throws 
IOException { + File dataDir = temp.newFolder(); + + createDummyTable(tableDir, dataDir); + + TABLES.dropTable(tableDir.toURI().toString(), true); + AssertHelpers.assertThrows( + "Should complain about missing table", NoSuchTableException.class, + "Table does not exist", () -> TABLES.load(tableDir.toURI().toString())); + + Assert.assertEquals(0, dataDir.listFiles().length); + Assert.assertFalse(tableDir.exists()); + + Assert.assertFalse(TABLES.dropTable(tableDir.toURI().toString())); + } + + @Test + public void testDropTableWithoutPurge() throws IOException { + File dataDir = temp.newFolder(); + + createDummyTable(tableDir, dataDir); + + TABLES.dropTable(tableDir.toURI().toString(), false); + AssertHelpers.assertThrows( + "Should complain about missing table", NoSuchTableException.class, + "Table does not exist", () -> TABLES.load(tableDir.toURI().toString())); + + Assert.assertEquals(1, dataDir.listFiles().length); + Assert.assertFalse(tableDir.exists()); + + Assert.assertFalse(TABLES.dropTable(tableDir.toURI().toString())); } @Test @@ -61,7 +113,7 @@ public void testDefaultSortOrder() { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) .bucket("data", 16) .build(); - Table table = TABLES.create(SCHEMA, spec, tableLocation); + Table table = TABLES.create(SCHEMA, spec, tableDir.toURI().toString()); SortOrder sortOrder = table.sortOrder(); Assert.assertEquals("Order ID must match", 0, sortOrder.orderId()); @@ -76,7 +128,7 @@ public void testCustomSortOrder() { SortOrder order = SortOrder.builderFor(SCHEMA) .asc("id", NULLS_FIRST) .build(); - Table table = TABLES.create(SCHEMA, spec, order, Maps.newHashMap(), tableLocation); + Table table = TABLES.create(SCHEMA, spec, order, Maps.newHashMap(), tableDir.toURI().toString()); SortOrder sortOrder = table.sortOrder(); Assert.assertEquals("Order ID must match", 1, sortOrder.orderId()); @@ -86,4 +138,22 @@ public void testCustomSortOrder() { Transform transform = Transforms.identity(Types.IntegerType.get()); Assert.assertEquals("Transform must match", transform, sortOrder.fields().get(0).transform()); } + + private static void createDummyTable(File tableDir, File dataDir) throws IOException { + Table table = TABLES.create(SCHEMA, tableDir.toURI().toString()); + AppendFiles append = table.newAppend(); + String data = dataDir.getPath() + "/data.parquet"; + Files.write(Paths.get(data), new ArrayList<>(), StandardCharsets.UTF_8); + DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(data) + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + append.appendFile(dataFile); + append.commit(); + + // Make sure that the data file and the manifest dir is created + Assert.assertEquals(1, dataDir.listFiles().length); + Assert.assertEquals(1, tableDir.listFiles().length); + } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 4862266749e8..eca5efd89b95 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Namespace; @@ -140,7 +141,7 @@ public boolean dropTable(TableIdentifier 
identifier, boolean purge) { }); if (purge && lastMetadata != null) { - dropTableData(ops.io(), lastMetadata); + CatalogUtil.dropTableData(ops.io(), lastMetadata); } LOG.info("Dropped table: {}", identifier); diff --git a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java index 75790479f2ee..aba158add705 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java +++ b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java @@ -19,9 +19,16 @@ package org.apache.iceberg.mr; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -32,9 +39,20 @@ import org.apache.iceberg.hive.HiveCatalogs; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Class for catalog resolution and accessing the common functions for {@link Catalog} API. + *
<p>
+ * Catalog resolution happens in this order:
+ * <ul>
+ *   <li>1. Custom catalog if specified by {@link InputFormatConfig#CATALOG_LOADER_CLASS}
+ *   <li>2. Hadoop or Hive catalog if specified by {@link InputFormatConfig#CATALOG}
+ *   <li>3. Hadoop Tables
+ * </ul>
+ */
 public final class Catalogs {
 
   private static final Logger LOG = LoggerFactory.getLogger(Catalogs.class);
@@ -44,15 +62,14 @@ public final class Catalogs {
   private static final String NAME = "name";
   private static final String LOCATION = "location";
 
+  private static final Set<String> PROPERTIES_TO_REMOVE =
+      ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME);
+
   private Catalogs() {
   }
 
   /**
    * Load an Iceberg table using the catalog and table identifier (or table path) specified by the configuration.
-   * Catalog resolution happens in this order:
-   * 1. Custom catalog if specified by {@link InputFormatConfig#CATALOG_LOADER_CLASS}
-   * 2. Hadoop or Hive catalog if specified by {@link InputFormatConfig#CATALOG}
-   * 3. Hadoop Tables
    * @param conf a Hadoop conf
    * @return an Iceberg table
    */
@@ -60,7 +77,17 @@ public static Table loadTable(Configuration conf) {
     return loadTable(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER), conf.get(InputFormatConfig.TABLE_LOCATION));
   }
 
-  // For use in HiveIcebergSerDe and HiveIcebergStorageHandler
+  /**
+   * Load an Iceberg table using the catalog specified by the configuration.
+   *
<p>
+ * The table identifier ({@link Catalogs#NAME}) or table path ({@link Catalogs#LOCATION}) should be specified by + * the controlling properties. + *
<p>
+   * Used by HiveIcebergSerDe and HiveIcebergStorageHandler
+   * @param conf a Hadoop conf
+   * @param props the controlling properties
+   * @return an Iceberg table
+   */
+  public static Table loadTable(Configuration conf, Properties props) {
     return loadTable(conf, props.getProperty(NAME), props.getProperty(LOCATION));
   }
@@ -77,6 +104,79 @@ private static Table loadTable(Configuration conf, String tableIdentifier, Strin
     return new HadoopTables(conf).load(tableLocation);
   }
 
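As a quick illustration of the controlling properties the loadTable(conf, props) entry point above reads, a hedged sketch (the identifier and path values are made up):

    // illustrative sketch only, not part of the diff
    Properties props = new Properties();
    props.setProperty("name", "default.sample");                   // Catalogs.NAME, used when a catalog is resolved
    // props.setProperty("location", "hdfs:///warehouse/sample");  // Catalogs.LOCATION, used for HadoopTables
    Table table = Catalogs.loadTable(conf, props);                 // conf decides which catalog is consulted
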
+  /**
+   * Creates an Iceberg table using the catalog specified by the configuration.
+   *
<p>
+ * The properties should contain the following values: + *
<ul>
+   * <li>Table identifier ({@link Catalogs#NAME}) or table path ({@link Catalogs#LOCATION}) is required
+   * <li>Table schema ({@link InputFormatConfig#TABLE_SCHEMA}) is required
+   * <li>Partition specification ({@link InputFormatConfig#PARTITION_SPEC}) is optional. Table will be unpartitioned if
+   *  not provided
+   * </ul><p>
+   * Other properties will be handed over to the Table creation. The controlling properties above will not be
+   * propagated.
+   * @param conf a Hadoop conf
+   * @param props the controlling properties
+   * @return the created Iceberg table
+   */
+  public static Table createTable(Configuration conf, Properties props) {
+    String schemaString = props.getProperty(InputFormatConfig.TABLE_SCHEMA);
+    Preconditions.checkNotNull(schemaString, "Table schema not set");
+    Schema schema = SchemaParser.fromJson(props.getProperty(InputFormatConfig.TABLE_SCHEMA));
+
+    String specString = props.getProperty(InputFormatConfig.PARTITION_SPEC);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    if (specString != null) {
+      spec = PartitionSpecParser.fromJson(schema, specString);
+    }
+
+    String location = props.getProperty(LOCATION);
+
+    // Create a table property map without the controlling properties
+    Map<String, String> map = new HashMap<>(props.size());
+    for (Object key : props.keySet()) {
+      if (!PROPERTIES_TO_REMOVE.contains(key)) {
+        map.put(key.toString(), props.get(key).toString());
+      }
+    }
+
+    Optional<Catalog> catalog = loadCatalog(conf);
+
+    if (catalog.isPresent()) {
+      String name = props.getProperty(NAME);
+      Preconditions.checkNotNull(name, "Table identifier not set");
+      return catalog.get().createTable(TableIdentifier.parse(name), schema, spec, location, map);
+    }
+
+    Preconditions.checkNotNull(location, "Table location not set");
+    return new HadoopTables(conf).create(schema, spec, map, location);
+  }
+
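A hedged sketch of what a caller assembles before invoking createTable; the schema, identifier and the extra "dummy" property mirror the tests below and are illustrative only:

    // illustrative sketch only, not part of the diff
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("id").build();

    Properties props = new Properties();
    props.setProperty("name", "default.sample");                                          // or "location" for HadoopTables
    props.setProperty(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
    props.setProperty(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
    props.setProperty("dummy", "test");                                                   // handed over to the new table
    Table created = Catalogs.createTable(conf, props);
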
+  /**
+   * Drops an Iceberg table using the catalog specified by the configuration.
+   * <p>
+   * The table identifier ({@link Catalogs#NAME}) or table path ({@link Catalogs#LOCATION}) should be specified by
+   * the controlling properties.
+   * @param conf a Hadoop conf
+   * @param props the controlling properties
+   * @return true if the table was dropped, false otherwise
+   */
+  public static boolean dropTable(Configuration conf, Properties props) {
+    String location = props.getProperty(LOCATION);
+
+    Optional<Catalog> catalog = loadCatalog(conf);
+
+    if (catalog.isPresent()) {
+      String name = props.getProperty(NAME);
+      Preconditions.checkNotNull(name, "Table identifier not set");
+      return catalog.get().dropTable(TableIdentifier.parse(name));
+    }
+
+    Preconditions.checkNotNull(location, "Table location not set");
+    return new HadoopTables(conf).dropTable(location);
+  }
+
   @VisibleForTesting
   static Optional<Catalog> loadCatalog(Configuration conf) {
     String catalogLoaderClass = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index c7d45453b172..1f2502559356 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -43,6 +43,7 @@ private InputFormatConfig() {
   public static final String TABLE_IDENTIFIER = "iceberg.mr.table.identifier";
   public static final String TABLE_LOCATION = "iceberg.mr.table.location";
   public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
+  public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
   public static final String LOCALITY = "iceberg.mr.locality";
   public static final String CATALOG = "iceberg.mr.catalog";
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
index f92ba2f4afb5..2cb27d0029f2 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -20,14 +20,20 @@
 package org.apache.iceberg.mr;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Optional;
+import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -43,6 +49,7 @@ public class TestCatalogs {
 
   private static final Schema SCHEMA = new Schema(required(1, "foo", Types.StringType.get()));
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("foo").build();
 
   private Configuration conf;
 
@@ -85,6 +92,96 @@ public void testLoadTableFromCatalog() throws IOException {
     Assert.assertEquals(hadoopCatalogTable.location(), Catalogs.loadTable(conf).location());
   }
 
+  @Test
+  public void testCreateDropTableToLocation() throws IOException {
+    Properties missingSchema = new Properties();
+    missingSchema.put("location", temp.newFolder("hadoop_tables").toString());
+
AssertHelpers.assertThrows( + "Should complain about table schema not set", NullPointerException.class, + "schema not set", () -> Catalogs.createTable(conf, missingSchema)); + + Properties missingLocation = new Properties(); + missingLocation.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA)); + AssertHelpers.assertThrows( + "Should complain about table location not set", NullPointerException.class, + "location not set", () -> Catalogs.createTable(conf, missingLocation)); + + Properties properties = new Properties(); + properties.put("location", temp.toString() + "/hadoop_tables"); + properties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA)); + properties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC)); + properties.put("dummy", "test"); + + Catalogs.createTable(conf, properties); + + HadoopTables tables = new HadoopTables(); + Table table = tables.load(properties.getProperty("location")); + + Assert.assertEquals(properties.getProperty("location"), table.location()); + Assert.assertEquals(SchemaParser.toJson(SCHEMA), SchemaParser.toJson(table.schema())); + Assert.assertEquals(PartitionSpecParser.toJson(SPEC), PartitionSpecParser.toJson(table.spec())); + Assert.assertEquals(Collections.singletonMap("dummy", "test"), table.properties()); + + AssertHelpers.assertThrows( + "Should complain about table location not set", NullPointerException.class, + "location not set", () -> Catalogs.dropTable(conf, new Properties())); + + Properties dropProperties = new Properties(); + dropProperties.put("location", temp.toString() + "/hadoop_tables"); + Catalogs.dropTable(conf, dropProperties); + + AssertHelpers.assertThrows( + "Should complain about table not found", NoSuchTableException.class, + "Table does not exist", () -> Catalogs.loadTable(conf, dropProperties)); + } + + @Test + public void testCreateDropTableToCatalog() throws IOException { + TableIdentifier identifier = TableIdentifier.of("test", "table"); + + conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString()); + conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalogLoader.class, CatalogLoader.class); + + Properties missingSchema = new Properties(); + missingSchema.put("name", identifier.toString()); + AssertHelpers.assertThrows( + "Should complain about table schema not set", NullPointerException.class, + "schema not set", () -> Catalogs.createTable(conf, missingSchema)); + + Properties missingIdentifier = new Properties(); + missingIdentifier.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA)); + AssertHelpers.assertThrows( + "Should complain about table identifier not set", NullPointerException.class, + "identifier not set", () -> Catalogs.createTable(conf, missingIdentifier)); + + Properties properties = new Properties(); + properties.put("name", identifier.toString()); + properties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA)); + properties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC)); + properties.put("dummy", "test"); + + Catalogs.createTable(conf, properties); + + HadoopCatalog catalog = new CustomHadoopCatalog(conf); + Table table = catalog.loadTable(identifier); + + Assert.assertEquals(SchemaParser.toJson(SCHEMA), SchemaParser.toJson(table.schema())); + Assert.assertEquals(PartitionSpecParser.toJson(SPEC), PartitionSpecParser.toJson(table.spec())); + Assert.assertEquals(Collections.singletonMap("dummy", "test"), table.properties()); + + AssertHelpers.assertThrows( + "Should 
complain about table identifier not set", NullPointerException.class, + "identifier not set", () -> Catalogs.dropTable(conf, new Properties())); + + Properties dropProperties = new Properties(); + dropProperties.put("name", identifier.toString()); + Catalogs.dropTable(conf, dropProperties); + + AssertHelpers.assertThrows( + "Should complain about table not found", NoSuchTableException.class, + "Table does not exist", () -> Catalogs.loadTable(conf, dropProperties)); + } + @Test public void testLoadCatalog() throws IOException { Assert.assertFalse(Catalogs.loadCatalog(conf).isPresent());
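
To round out the surface exercised by these tests, a hedged sketch of the two drop paths (identifiers and paths are illustrative):

    // illustrative sketch only, not part of the diff
    Properties props = new Properties();
    props.setProperty("name", "default.sample");                // with a resolved catalog, delegates to Catalog.dropTable
    // props.setProperty("location", "/tmp/warehouse/sample");  // without one, falls back to HadoopTables
    boolean dropped = Catalogs.dropTable(conf, props);

    // HadoopTables can also drop and purge a path-based table directly
    boolean purged = new HadoopTables(conf).dropTable("/tmp/warehouse/other_table", true /* purge */);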