From dcc3404039147193b80f15a617fece02f7b8df22 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 16 Nov 2018 16:54:33 -0800 Subject: [PATCH 02/10] Support customizing the location where data is written in Spark. --- .../java/com/netflix/iceberg/TableProperties.java | 4 ++++ .../netflix/iceberg/spark/source/IcebergSource.java | 9 ++++++++- .../java/com/netflix/iceberg/spark/source/Writer.java | 11 ++++------- .../iceberg/spark/source/TestParquetWrite.java | 4 ++++ 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java index 6ca09a5475a4..4771822bf111 100644 --- a/core/src/main/java/com/netflix/iceberg/TableProperties.java +++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java @@ -66,4 +66,8 @@ public class TableProperties { public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false; public static final String OBJECT_STORE_PATH = "write.object-storage.path"; + + // This only applies to files written after this property is set. Files previously written aren't relocated to + // reflect this parameter. 
+ public static final String WRITE_NEW_DATA_LOCATION = "write.data.location"; } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index 7daa330d1271..2b4d53f7dce2 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -23,10 +23,13 @@ import com.netflix.iceberg.FileFormat; import com.netflix.iceberg.Schema; import com.netflix.iceberg.Table; +import com.netflix.iceberg.TableProperties; import com.netflix.iceberg.hadoop.HadoopTables; import com.netflix.iceberg.spark.SparkSchemaUtil; import com.netflix.iceberg.types.CheckCompatibility; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.sources.DataSourceRegister; @@ -89,7 +92,11 @@ public Optional createWriter(String jobId, StructType dfStruct .toUpperCase(Locale.ENGLISH)); } - return Optional.of(new Writer(table, lazyConf(), format)); + String dataLocation = options.get(TableProperties.WRITE_NEW_DATA_LOCATION) + .orElse(table.properties().getOrDefault( + TableProperties.WRITE_NEW_DATA_LOCATION, + new Path(new Path(table.location()), "data").toString())); + return Optional.of(new Writer(table, lazyConf(), format, dataLocation)); } protected Table findTable(DataSourceOptions options) { diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index e72947470ae4..7394745290af 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -94,16 +94,18 @@ class Writer implements DataSourceWriter, SupportsWriteInternalRow { private final Table table; private final Configuration conf; private final FileFormat format; + private final String dataLocation; - Writer(Table table, Configuration conf, FileFormat format) { + Writer(Table table, Configuration conf, FileFormat format, String dataLocation) { this.table = table; this.conf = conf; this.format = format; + this.dataLocation = dataLocation; } @Override public DataWriterFactory createInternalRowWriterFactory() { - return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf); + return new WriterFactory(table.spec(), format, dataLocation, table.properties(), conf); } @Override @@ -167,17 +169,12 @@ private int propertyAsInt(String property, int defaultValue) { return defaultValue; } - private String dataLocation() { - return new Path(new Path(table.location()), "data").toString(); - } - @Override public String toString() { return String.format("IcebergWrite(table=%s, type=%s, format=%s)", table, table.schema().asStruct(), format); } - private static class TaskCommit implements WriterCommitMessage { private final DataFile[] files; diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java index 4f71eada6392..bcad4cf29247 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java @@ -23,6 +23,7 @@ import com.netflix.iceberg.PartitionSpec; import com.netflix.iceberg.Schema; import com.netflix.iceberg.Table; +import 
com.netflix.iceberg.TableProperties; import com.netflix.iceberg.hadoop.HadoopTables; import com.netflix.iceberg.types.Types; import org.apache.hadoop.conf.Configuration; @@ -71,7 +72,9 @@ public static void stopSpark() { public void testBasicWrite() throws IOException { File parent = temp.newFolder("parquet"); File location = new File(parent, "test"); + File dataLocation = new File(parent, "test-data"); location.mkdirs(); + dataLocation.mkdirs(); HadoopTables tables = new HadoopTables(CONF); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -88,6 +91,7 @@ public void testBasicWrite() throws IOException { // TODO: incoming columns must be ordered according to the table's schema df.select("id", "data").write() .format("iceberg") + .option(TableProperties.WRITE_NEW_DATA_LOCATION, "file://" + dataLocation.getAbsolutePath()) .mode("append") .save(location.toString()); From 3e889589b79c9e704bc515891cbc8a0db84aae85 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 27 Nov 2018 14:42:28 -0800 Subject: [PATCH 03/10] Minor comments --- core/src/main/java/com/netflix/iceberg/TableProperties.java | 3 ++- .../java/com/netflix/iceberg/spark/source/IcebergSource.java | 1 - .../src/main/java/com/netflix/iceberg/spark/source/Writer.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java index 4771822bf111..956f91db6052 100644 --- a/core/src/main/java/com/netflix/iceberg/TableProperties.java +++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java @@ -69,5 +69,6 @@ public class TableProperties { // This only applies to files written after this property is set. Files previously written aren't relocated to // reflect this parameter. - public static final String WRITE_NEW_DATA_LOCATION = "write.data.location"; + // If not set, defaults to a "data" folder underneath the root path of the table. 
+ public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path"; } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java index 2b4d53f7dce2..dd9af7067ea4 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java @@ -29,7 +29,6 @@ import com.netflix.iceberg.types.CheckCompatibility; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; - import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.sources.DataSourceRegister; diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index 7394745290af..fda89062fc7c 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -19,7 +19,6 @@ package com.netflix.iceberg.spark.source; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -175,6 +174,7 @@ public String toString() { table, table.schema().asStruct(), format); } + private static class TaskCommit implements WriterCommitMessage { private final DataFile[] files; From 798cfb4478de4a09da648022bc1c39e48719eaf9 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 27 Nov 2018 19:11:27 -0800 Subject: [PATCH 04/10] Adjust tests. --- .../netflix/iceberg/spark/source/Writer.java | 2 +- .../spark/source/TestDataFrameWrites.java | 52 +++++++++++++++++++ .../spark/source/TestParquetWrite.java | 3 -- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index fda89062fc7c..b1cc50f3169f 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -174,7 +174,7 @@ public String toString() { table, table.schema().asStruct(), format); } - + private static class TaskCommit implements WriterCommitMessage { private final DataFile[] files; diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java index a93675f0695f..9012250b34c3 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java @@ -32,6 +32,7 @@ import com.netflix.iceberg.spark.data.AvroDataTest; import com.netflix.iceberg.spark.data.RandomData; import com.netflix.iceberg.spark.data.SparkAvroReader; +import com.netflix.iceberg.types.Types; import org.apache.avro.generic.GenericData.Record; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaRDD; @@ -43,10 +44,12 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.List; import static com.netflix.iceberg.spark.SparkSchemaUtil.convert; @@ -123,6 +126,55 @@ protected void writeAndValidate(Schema schema) throws 
IOException { } } + @Test + public void testWriteToOverriddenLocationByOptions() throws IOException { + File parent = temp.newFolder("parquet"); + File location = new File(parent, "test-metadata"); + Assert.assertTrue("Mkdir should succeed.", location.mkdirs()); + File tableDataLocation = new File(parent, "test-data-from-table"); + Assert.assertTrue("Mkdir should succeed.", tableDataLocation.mkdirs()); + File optionDataLocation = new File(parent, "test-data-from-options"); + Assert.assertTrue("Mkdir should succeed.", optionDataLocation.mkdirs()); + + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.MapType.ofOptional(2, 3, + Types.LongType.get(), + Types.StringType.get()))); + HadoopTables tables = new HadoopTables(CONF); + Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); + Schema tableSchema = table.schema(); // use the table schema because ids are reassigned + + table.updateProperties() + .set(TableProperties.DEFAULT_FILE_FORMAT, format) + .set(TableProperties.WRITE_NEW_DATA_LOCATION, tableDataLocation.getAbsolutePath()) + .commit(); + + List expected = RandomData.generateList(tableSchema, 100, 0L); + Dataset df = createDataset(expected, tableSchema); + + df.write() + .format("iceberg") + .mode("append") + .option(TableProperties.WRITE_NEW_DATA_LOCATION, optionDataLocation.getAbsolutePath()) + .save(location.toString()); + + table.refresh(); + table.currentSnapshot().addedFiles().forEach(file -> + Assert.assertTrue( + String.format("File should have the parent directory %s.", optionDataLocation.getAbsolutePath()), + URI.create(file.path().toString()).getPath().startsWith(optionDataLocation.getAbsolutePath()))); + Dataset result = spark.read() + .format("iceberg") + .load(location.toString()); + List actual = result.collectAsList(); + + Assert.assertEquals("Result size should match expected", expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i += 1) { + assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i)); + } + } + private Dataset createDataset(List records, Schema schema) throws IOException { // this uses the SparkAvroReader to create a DataFrame from the list of records // it assumes that SparkAvroReader is correct diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java index bcad4cf29247..ed1b5619c89d 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java @@ -72,9 +72,7 @@ public static void stopSpark() { public void testBasicWrite() throws IOException { File parent = temp.newFolder("parquet"); File location = new File(parent, "test"); - File dataLocation = new File(parent, "test-data"); location.mkdirs(); - dataLocation.mkdirs(); HadoopTables tables = new HadoopTables(CONF); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -91,7 +89,6 @@ public void testBasicWrite() throws IOException { // TODO: incoming columns must be ordered according to the table's schema df.select("id", "data").write() .format("iceberg") - .option(TableProperties.WRITE_NEW_DATA_LOCATION, "file://" + dataLocation.getAbsolutePath()) .mode("append") .save(location.toString()); From f917697ea28ae319828f83a54b4faa90739d65d8 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 27 Nov 2018 20:35:58 -0800 
Subject: [PATCH 05/10] Test various write locations --- .../spark/source/TestDataFrameWrites.java | 105 +++++++++--------- 1 file changed, 54 insertions(+), 51 deletions(-) diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java index 9012250b34c3..4c22cd0735d9 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -59,6 +60,9 @@ @RunWith(Parameterized.class) public class TestDataFrameWrites extends AvroDataTest { private static final Configuration CONF = new Configuration(); + private static final Schema BASIC_SCHEMA = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.ListType.ofOptional(2, Types.StringType.get()))); private String format = null; @@ -94,23 +98,55 @@ public static void stopSpark() { @Override protected void writeAndValidate(Schema schema) throws IOException { + writeAndValidateWithLocations(schema, false, false); + } + + @Test + public void testWrite_overridingDataLocation_tablePropertyOnly() throws IOException { + writeAndValidateWithLocations(BASIC_SCHEMA, true, false); + } + + @Test + public void testWrite_overridingDataLocation_sourceOptionOnly() throws IOException { + writeAndValidateWithLocations(BASIC_SCHEMA, false, true); + } + + @Test + public void testWrite_overridingDataLocation_sourceOptionTakesPrecedence() throws IOException { + writeAndValidateWithLocations(BASIC_SCHEMA, true, true); + } + + private void writeAndValidateWithLocations( + Schema schema, + boolean setTablePropertyDataLocation, + boolean setWriterOptionDataLocation) throws IOException { File parent = temp.newFolder("parquet"); File location = new File(parent, "test"); Assert.assertTrue("Mkdir should succeed", location.mkdirs()); + File tablePropertyDataLocation = new File(parent, "test-table-property-data-dir"); + Assert.assertTrue("Mkdir should succeed", tablePropertyDataLocation.mkdirs()); + File writerPropertyDataLocation = new File(parent, "test-source-option-data-dir"); + Assert.assertTrue("Mkdir should succeed", writerPropertyDataLocation.mkdirs()); + HadoopTables tables = new HadoopTables(CONF); Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); Schema tableSchema = table.schema(); // use the table schema because ids are reassigned table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); + if (setTablePropertyDataLocation) { + table.updateProperties().set( + TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit(); + } List expected = RandomData.generateList(tableSchema, 100, 0L); Dataset df = createDataset(expected, tableSchema); + DataFrameWriter writer = df.write().format("iceberg").mode("append"); + if (setWriterOptionDataLocation) { + writer = writer.option(TableProperties.WRITE_NEW_DATA_LOCATION, writerPropertyDataLocation.getAbsolutePath()); + } - df.write() - .format("iceberg") - .mode("append") - .save(location.toString()); + writer.save(location.toString()); 
table.refresh(); @@ -124,55 +160,22 @@ protected void writeAndValidate(Schema schema) throws IOException { for (int i = 0; i < expected.size(); i += 1) { assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i)); } - } - @Test - public void testWriteToOverriddenLocationByOptions() throws IOException { - File parent = temp.newFolder("parquet"); - File location = new File(parent, "test-metadata"); - Assert.assertTrue("Mkdir should succeed.", location.mkdirs()); - File tableDataLocation = new File(parent, "test-data-from-table"); - Assert.assertTrue("Mkdir should succeed.", tableDataLocation.mkdirs()); - File optionDataLocation = new File(parent, "test-data-from-options"); - Assert.assertTrue("Mkdir should succeed.", optionDataLocation.mkdirs()); - - Schema schema = new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.MapType.ofOptional(2, 3, - Types.LongType.get(), - Types.StringType.get()))); - HadoopTables tables = new HadoopTables(CONF); - Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); - Schema tableSchema = table.schema(); // use the table schema because ids are reassigned - - table.updateProperties() - .set(TableProperties.DEFAULT_FILE_FORMAT, format) - .set(TableProperties.WRITE_NEW_DATA_LOCATION, tableDataLocation.getAbsolutePath()) - .commit(); - - List expected = RandomData.generateList(tableSchema, 100, 0L); - Dataset df = createDataset(expected, tableSchema); - - df.write() - .format("iceberg") - .mode("append") - .option(TableProperties.WRITE_NEW_DATA_LOCATION, optionDataLocation.getAbsolutePath()) - .save(location.toString()); - - table.refresh(); - table.currentSnapshot().addedFiles().forEach(file -> - Assert.assertTrue( - String.format("File should have the parent directory %s.", optionDataLocation.getAbsolutePath()), - URI.create(file.path().toString()).getPath().startsWith(optionDataLocation.getAbsolutePath()))); - Dataset result = spark.read() - .format("iceberg") - .load(location.toString()); - List actual = result.collectAsList(); - - Assert.assertEquals("Result size should match expected", expected.size(), actual.size()); - for (int i = 0; i < expected.size(); i += 1) { - assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i)); + File expectedDataDir; + if (setWriterOptionDataLocation) { + expectedDataDir = writerPropertyDataLocation; + } else if (setTablePropertyDataLocation) { + expectedDataDir = tablePropertyDataLocation; + } else { + expectedDataDir = new File(location, "data"); } + table.currentSnapshot().addedFiles().forEach(dataFile -> + Assert.assertTrue( + String.format( + "File should have the parent directory %s, but has: %s.", + expectedDataDir.getAbsolutePath(), + dataFile.path()), + URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath()))); } private Dataset createDataset(List records, Schema schema) throws IOException { From 16efff2e011983858637925532623525ade795d9 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 10 Dec 2018 16:38:44 -0800 Subject: [PATCH 06/10] Don't allow write data location to be set in data source options. 
--- core/src/main/java/com/netflix/iceberg/TableProperties.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java index 03082b974992..1257a7bdb232 100644 --- a/core/src/main/java/com/netflix/iceberg/TableProperties.java +++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java @@ -66,15 +66,11 @@ public class TableProperties { public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false; public static final String OBJECT_STORE_PATH = "write.object-storage.path"; -<<<<<<< HEAD // This only applies to files written after this property is set. Files previously written aren't relocated to // reflect this parameter. public static final String WRITE_NEW_DATA_LOCATION = "write.data.location"; -||||||| merged common ancestors -======= public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled"; public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false; ->>>>>>> upstream-incubator/master } From 9755b098104ed2d893e4a271e262ac7bdb5b323b Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 10 Dec 2018 16:46:16 -0800 Subject: [PATCH 07/10] Address some comments --- .../main/java/com/netflix/iceberg/spark/source/Writer.java | 1 + .../com/netflix/iceberg/spark/source/TestParquetWrite.java | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index a1df67bf68a4..c9d3a7b31221 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -176,6 +176,7 @@ public String toString() { table, table.schema().asStruct(), format); } + private static class TaskCommit implements WriterCommitMessage { private final DataFile[] files; diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java index bcad4cf29247..20fb88dfe08e 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java @@ -72,9 +72,6 @@ public static void stopSpark() { public void testBasicWrite() throws IOException { File parent = temp.newFolder("parquet"); File location = new File(parent, "test"); - File dataLocation = new File(parent, "test-data"); - location.mkdirs(); - dataLocation.mkdirs(); HadoopTables tables = new HadoopTables(CONF); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -91,7 +88,6 @@ public void testBasicWrite() throws IOException { // TODO: incoming columns must be ordered according to the table's schema df.select("id", "data").write() .format("iceberg") - .option(TableProperties.WRITE_NEW_DATA_LOCATION, "file://" + dataLocation.getAbsolutePath()) .mode("append") .save(location.toString()); From a651062a1d3a2be0e1adda039a3b28de12bc36a9 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 10 Dec 2018 16:47:05 -0800 Subject: [PATCH 08/10] Don't import --- .../java/com/netflix/iceberg/spark/source/TestParquetWrite.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java index 20fb88dfe08e..a2d105d780e4 100644 --- 
a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java @@ -23,7 +23,6 @@ import com.netflix.iceberg.PartitionSpec; import com.netflix.iceberg.Schema; import com.netflix.iceberg.Table; -import com.netflix.iceberg.TableProperties; import com.netflix.iceberg.hadoop.HadoopTables; import com.netflix.iceberg.types.Types; import org.apache.hadoop.conf.Configuration; From e415c834cae743a2fd050589ab23c638cb440292 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 10 Dec 2018 17:18:16 -0800 Subject: [PATCH 09/10] Make test less convoluted --- .../iceberg/spark/data/AvroDataTest.java | 2 +- .../spark/source/TestDataFrameWrites.java | 63 +++++++------------ 2 files changed, 22 insertions(+), 43 deletions(-) diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java index f84c6fe8b763..bc74908d728d 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java @@ -38,7 +38,7 @@ public abstract class AvroDataTest { protected abstract void writeAndValidate(Schema schema) throws IOException; - private static final StructType SUPPORTED_PRIMITIVES = StructType.of( + protected static final StructType SUPPORTED_PRIMITIVES = StructType.of( required(100, "id", LongType.get()), optional(101, "data", Types.StringType.get()), required(102, "b", Types.BooleanType.get()), diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java index b8ab5e459fca..8fb7bc76add1 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java @@ -60,16 +60,15 @@ @RunWith(Parameterized.class) public class TestDataFrameWrites extends AvroDataTest { private static final Configuration CONF = new Configuration(); - private static final Schema BASIC_SCHEMA = new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.ListType.ofOptional(2, Types.StringType.get()))); - private String format = null; + private final String format; @Parameterized.Parameters public static Object[][] parameters() { return new Object[][] { new Object[] { "parquet" }, + new Object[] { "parquet" }, + new Object[] { "avro" }, new Object[] { "avro" } }; } @@ -97,53 +96,41 @@ public static void stopSpark() { @Override protected void writeAndValidate(Schema schema) throws IOException { - writeAndValidateWithLocations(schema, false, false); - } - - @Test - public void testWrite_overridingDataLocation_tablePropertyOnly() throws IOException { - writeAndValidateWithLocations(BASIC_SCHEMA, true, false); - } - - @Test - public void testWrite_overridingDataLocation_sourceOptionOnly() throws IOException { - writeAndValidateWithLocations(BASIC_SCHEMA, false, true); + File location = createTableFolder(); + Table table = createTable(schema, location); + writeAndValidateWithLocations(table, location, new File(location, "data")); } @Test - public void testWrite_overridingDataLocation_sourceOptionTakesPrecedence() throws IOException { - writeAndValidateWithLocations(BASIC_SCHEMA, true, true); + public void testWriteWithCustomDataLocation() throws IOException { + File location = createTableFolder(); + File 
tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir"); + Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location); + table.updateProperties().set( + TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit(); + writeAndValidateWithLocations(table, location, tablePropertyDataLocation); } - private void writeAndValidateWithLocations( - Schema schema, - boolean setTablePropertyDataLocation, - boolean setWriterOptionDataLocation) throws IOException { + private File createTableFolder() throws IOException { File parent = temp.newFolder("parquet"); File location = new File(parent, "test"); Assert.assertTrue("Mkdir should succeed", location.mkdirs()); + return location; + } - File tablePropertyDataLocation = new File(parent, "test-table-property-data-dir"); - Assert.assertTrue("Mkdir should succeed", tablePropertyDataLocation.mkdirs()); - File writerPropertyDataLocation = new File(parent, "test-source-option-data-dir"); - Assert.assertTrue("Mkdir should succeed", writerPropertyDataLocation.mkdirs()); - + private Table createTable(Schema schema, File location) { HadoopTables tables = new HadoopTables(CONF); - Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); + return tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); + } + + private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) throws IOException { Schema tableSchema = table.schema(); // use the table schema because ids are reassigned table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); - if (setTablePropertyDataLocation) { - table.updateProperties().set( - TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit(); - } List expected = RandomData.generateList(tableSchema, 100, 0L); Dataset df = createDataset(expected, tableSchema); DataFrameWriter writer = df.write().format("iceberg").mode("append"); - if (setWriterOptionDataLocation) { - writer = writer.option(TableProperties.WRITE_NEW_DATA_LOCATION, writerPropertyDataLocation.getAbsolutePath()); - } writer.save(location.toString()); @@ -160,14 +147,6 @@ private void writeAndValidateWithLocations( assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i)); } - File expectedDataDir; - if (setWriterOptionDataLocation) { - expectedDataDir = writerPropertyDataLocation; - } else if (setTablePropertyDataLocation) { - expectedDataDir = tablePropertyDataLocation; - } else { - expectedDataDir = new File(location, "data"); - } table.currentSnapshot().addedFiles().forEach(dataFile -> Assert.assertTrue( String.format( From cc1f33f0d001714f68b2d210450df99192df4f2e Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 11 Dec 2018 11:35:49 -0800 Subject: [PATCH 10/10] Remove duplicate tests --- .../com/netflix/iceberg/spark/source/TestDataFrameWrites.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java index 8fb7bc76add1..05f8f80b9039 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java @@ -67,8 +67,6 @@ public class TestDataFrameWrites extends AvroDataTest { public static Object[][] parameters() { return new Object[][] { new Object[] { "parquet" }, - new Object[] { "parquet" }, - 
new Object[] { "avro" }, new Object[] { "avro" } }; }
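A closing usage sketch, not part of the patch series: after these changes, new data files land under the location named by the TableProperties.WRITE_NEW_DATA_LOCATION table property when it is set, and under a "data" folder beneath the table root otherwise; the earlier per-write data source option was dropped, so the table property is the only knob. The example below is illustrative — the paths and the `schema` and `df` variables are placeholders — but the calls mirror those used in TestDataFrameWrites:

    import com.netflix.iceberg.PartitionSpec;
    import com.netflix.iceberg.Table;
    import com.netflix.iceberg.TableProperties;
    import com.netflix.iceberg.hadoop.HadoopTables;
    import org.apache.hadoop.conf.Configuration;

    // Create a table rooted at an illustrative warehouse path.
    HadoopTables tables = new HadoopTables(new Configuration());
    Table table = tables.create(schema, PartitionSpec.unpartitioned(), "/warehouse/db/events");

    // Route new data files to a separate directory. Only files written after
    // this commit use the new location; existing files are not relocated.
    table.updateProperties()
        .set(TableProperties.WRITE_NEW_DATA_LOCATION, "/mnt/fast-disk/events-data")
        .commit();

    // Append as usual; the writer picks the location up from the table property.
    df.write()
        .format("iceberg")
        .mode("append")
        .save("/warehouse/db/events");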