From cd20e1aff121ad02d3479f76dcc4b4db42231e8d Mon Sep 17 00:00:00 2001 From: Arina Ielchiieva Date: Sun, 21 Jul 2019 18:38:21 +0300 Subject: [PATCH] Support write mode for Parquet and Avro using table property --- .../org/apache/iceberg/TableProperties.java | 6 + .../java/org/apache/iceberg/avro/Avro.java | 25 +++- .../apache/iceberg/avro/AvroFileAppender.java | 6 +- .../apache/iceberg/avro/TestAvroWrite.java | 116 ++++++++++++++++++ .../org/apache/iceberg/parquet/Parquet.java | 15 ++- .../apache/iceberg/parquet/ParquetWriter.java | 5 +- .../parquet/BaseParquetWritingTest.java | 25 ++-- .../apache/iceberg/parquet/TestParquet.java | 111 ++++++++++++++--- site/docs/configuration.md | 2 + 9 files changed, 273 insertions(+), 38 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/avro/TestAvroWrite.java diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index d426dd3ba878..848aeb8c4fb6 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -56,9 +56,15 @@ private TableProperties() {} public static final String PARQUET_COMPRESSION = "write.parquet.compression-codec"; public static final String PARQUET_COMPRESSION_DEFAULT = "gzip"; + public static final String PARQUET_WRITE_MODE = "write.parquet.write-mode"; + public static final String PARQUET_WRITE_MODE_DEFAULT = "overwrite"; + public static final String AVRO_COMPRESSION = "write.avro.compression-codec"; public static final String AVRO_COMPRESSION_DEFAULT = "gzip"; + public static final String AVRO_WRITE_MODE = "write.avro.write-mode"; + public static final String AVRO_WRITE_MODE_DEFAULT = "overwrite"; + public static final String SPLIT_SIZE = "read.split.target-size"; public static final long SPLIT_SIZE_DEFAULT = 134217728; // 128 MB diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 65ba9d833cef..c26f41c57fe9 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -34,12 +34,15 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificData; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_DEFAULT; +import static org.apache.iceberg.TableProperties.AVRO_WRITE_MODE; +import static org.apache.iceberg.TableProperties.AVRO_WRITE_MODE_DEFAULT; public class Avro { private Avro() { @@ -73,6 +76,11 @@ public CodecFactory get() { DEFAULT_MODEL.addLogicalTypeConversion(new UUIDConversion()); } + public enum WriteMode { + CREATE, + OVERWRITE + } + public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); } @@ -89,6 +97,12 @@ private WriteBuilder(OutputFile file) { this.file = file; } + public WriteBuilder forTable(Table table) { + schema(table.schema()); + setAll(table.properties()); + return this; + } + public WriteBuilder schema(org.apache.iceberg.Schema newSchema) { this.schema = newSchema; return this; @@ -133,6 +147,15 @@ private CodecFactory codec() { } } + private WriteMode writeMode() { + String writeMode = config.getOrDefault(AVRO_WRITE_MODE, AVRO_WRITE_MODE_DEFAULT); + try { + return WriteMode.valueOf(writeMode.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unsupported write mode: " + writeMode); + } + } + public FileAppender build() throws IOException { Preconditions.checkNotNull(schema, "Schema is required"); Preconditions.checkNotNull(name, "Table name is required and cannot be null"); @@ -141,7 +164,7 @@ public FileAppender build() throws IOException { meta("iceberg.schema", SchemaParser.toJson(schema)); return new AvroFileAppender<>( - AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata); + AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, writeMode()); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java index 8cc0712da9b1..c5a347daca92 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java @@ -39,8 +39,9 @@ class AvroFileAppender implements FileAppender { AvroFileAppender(Schema schema, OutputFile file, Function> createWriterFunc, - CodecFactory codec, Map metadata) throws IOException { - this.stream = file.create(); + CodecFactory codec, Map metadata, + Avro.WriteMode writeMode) throws IOException { + this.stream = writeMode == Avro.WriteMode.CREATE ? file.create() : file.createOrOverwrite(); this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata); } @@ -92,7 +93,6 @@ private static DataFileWriter newAvroWriter( writer.setMeta(entry.getKey(), entry.getValue()); } - // TODO: support overwrite return writer.create(schema, stream); } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroWrite.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroWrite.java new file mode 100644 index 000000000000..bc046d939bae --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroWrite.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.avro; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +public class TestAvroWrite { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testWriteCreate() throws IOException { + Map properties = ImmutableMap.of(TableProperties.AVRO_WRITE_MODE, "create"); + + File file = write(10, properties, null); + + thrown.expect(AlreadyExistsException.class); + write(5, properties, file); + } + + @Test + public void testWriteOverwrite() throws IOException { + Map properties = ImmutableMap.of(TableProperties.AVRO_WRITE_MODE, "overwrite"); + + File file = write(10, properties, null); + Assert.assertEquals(10, Lists.newArrayList(Avro.read(Files.localInput(file)) + .project(schema()) + .build() + ).size()); + + write(5, properties, file); + Assert.assertEquals(5, Lists.newArrayList(Avro.read(Files.localInput(file)) + .project(schema()) + .build() + ).size()); + } + + @Test + public void testUnsupportedWriteMode() throws IOException { + thrown.expect(IllegalArgumentException.class); + write(10, ImmutableMap.of(TableProperties.AVRO_WRITE_MODE, "abc"), null); + } + + private Schema schema() { + return new Schema( + Types.NestedField.required(0, "col_int", Types.IntegerType.get()) + ); + } + + private File write(int recordsNumber, Map properties, File file) throws IOException { + Schema schema = schema(); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema, "table"); + + List records = IntStream.rangeClosed(1, recordsNumber) + .mapToObj(index -> { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("col_int", index); + return record; + }) + .collect(Collectors.toList()); + + File outputFile = file; + if (outputFile == null) { + outputFile = temp.newFile(); + Assert.assertTrue("File should have been deleted", outputFile.delete()); + } + + try (FileAppender appender = Avro.write(Files.localOutput(outputFile)) + .schema(schema) + .setAll(properties) + .build()) { + appender.addAll(records); + } + + return outputFile; + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 494f2c57885e..117d147842bf 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -64,6 +64,8 @@ import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_WRITE_MODE; +import static org.apache.iceberg.TableProperties.PARQUET_WRITE_MODE_DEFAULT; public class Parquet { private Parquet() { @@ -159,6 +161,15 @@ private CompressionCodecName codec() { } } + private ParquetFileWriter.Mode writeMode() { + String writeMode = config.getOrDefault(PARQUET_WRITE_MODE, PARQUET_WRITE_MODE_DEFAULT); + try { + return ParquetFileWriter.Mode.valueOf(writeMode.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unsupported write mode: " + writeMode); + } + } + public FileAppender build() throws IOException { Preconditions.checkNotNull(schema, "Schema is required"); Preconditions.checkNotNull(name, "Table name is required and cannot be null"); @@ -201,7 +212,7 @@ public FileAppender build() throws IOException { return new org.apache.iceberg.parquet.ParquetWriter<>( conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(), - parquetProperties, metricsConfig); + parquetProperties, metricsConfig, writeMode()); } else { return new ParquetWriteAdapter<>(new ParquetWriteBuilder(ParquetIO.file(file)) .withWriterVersion(writerVersion) @@ -210,7 +221,7 @@ conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(), .setKeyValueMetadata(metadata) .setWriteSupport(getWriteSupport(type)) .withCompressionCodec(codec()) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // TODO: support modes + .withWriteMode(writeMode()) .withRowGroupSize(rowGroupSize) .withPageSize(pageSize) .withDictionaryPageSize(dictionaryPageSize) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index c0956a11684e..f08e9dedf288 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -83,7 +83,8 @@ class ParquetWriter implements FileAppender, Closeable { Function> createWriterFunc, CompressionCodecName codec, ParquetProperties properties, - MetricsConfig metricsConfig) { + MetricsConfig metricsConfig, + ParquetFileWriter.Mode writeMode) { this.output = output; this.targetRowGroupSize = rowGroupSize; this.props = properties; @@ -95,7 +96,7 @@ class ParquetWriter implements FileAppender, Closeable { try { this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema, - ParquetFileWriter.Mode.OVERWRITE, rowGroupSize, 0); + writeMode, rowGroupSize, 0); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create Parquet file"); } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java b/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java index d587b6442706..5fa2f2d4a706 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/BaseParquetWritingTest.java @@ -45,23 +45,28 @@ public abstract class BaseParquetWritingTest { public TemporaryFolder temp = new TemporaryFolder(); File writeRecords(Schema schema, GenericData.Record... records) throws IOException { - return writeRecords(schema, Collections.emptyMap(), null, records); + return writeRecords(schema, Collections.emptyMap(), null, null, records); } - File writeRecords( - Schema schema, Map properties, - Function> createWriterFunc, - GenericData.Record... records) throws IOException { - File tmpFolder = temp.newFolder("parquet"); - String filename = UUID.randomUUID().toString(); - File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename)); - try (FileAppender writer = Parquet.write(localOutput(file)) + File writeRecords(Schema schema, + Map properties, + Function> createWriterFunc, + File file, + GenericData.Record... records) throws IOException { + File outputFile = file; + if (outputFile == null) { + File tmpFolder = temp.newFolder("parquet"); + String filename = UUID.randomUUID().toString(); + outputFile = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename)); + } + + try (FileAppender writer = Parquet.write(localOutput(outputFile)) .schema(schema) .setAll(properties) .createWriterFunc(createWriterFunc) .build()) { writer.addAll(Lists.newArrayList(records)); } - return file; + return outputFile; } } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java index 9429e45cdd30..1df62a917f1c 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java @@ -22,17 +22,24 @@ import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.apache.avro.generic.GenericData; import org.apache.iceberg.Schema; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.types.Types.IntegerType; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.schema.MessageType; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import static org.apache.iceberg.Files.localInput; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; @@ -40,6 +47,9 @@ public class TestParquet extends BaseParquetWritingTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test public void testRowGroupSizeConfigurable() throws IOException { // Without an explicit writer function @@ -59,32 +69,93 @@ public void testRowGroupSizeConfigurableWithWriter() throws IOException { } } + @Test + public void testCreateModeWithoutWriterFunc() throws IOException { + Map properties = ImmutableMap.of(TableProperties.PARQUET_WRITE_MODE, "create"); + + File file = write(null, properties, 10, null); + + thrown.expect(AlreadyExistsException.class); + write(null, properties, 5, file); + } + + @Test + public void testCreateModeWithWriterFunc() throws IOException { + Map properties = ImmutableMap.of(TableProperties.PARQUET_WRITE_MODE, "create"); + + File file = write(ParquetAvroWriter::buildWriter, properties, 10, null); + + thrown.expect(AlreadyExistsException.class); + write(ParquetAvroWriter::buildWriter, properties, 5, file); + } + + @Test + public void testOverwriteModeWithoutWriterFunc() throws IOException { + Map properties = ImmutableMap.of(TableProperties.PARQUET_WRITE_MODE, "overwrite"); + + File file = write(null, properties, 10, null); + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) { + Assert.assertEquals(10, reader.getRecordCount()); + } + + write(null, properties, 5, file); + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) { + Assert.assertEquals(5, reader.getRecordCount()); + } + } + + @Test + public void testOverwriteModeWithWriterFunc() throws IOException { + Map properties = ImmutableMap.of(TableProperties.PARQUET_WRITE_MODE, "overwrite"); + + File file = write(ParquetAvroWriter::buildWriter, properties, 10, null); + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) { + Assert.assertEquals(10, reader.getRecordCount()); + } + + write(ParquetAvroWriter::buildWriter, properties, 5, file); + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) { + Assert.assertEquals(5, reader.getRecordCount()); + } + } + + @Test + public void testUnsupportedWriteMode() throws IOException { + thrown.expect(IllegalArgumentException.class); + write(null, ImmutableMap.of(TableProperties.PARQUET_WRITE_MODE, "abc"), 10, null); + } + private File generateFileWithTwoRowGroups(Function> createWriterFunc) throws IOException { - Schema schema = new Schema( - optional(1, "intCol", IntegerType.get()) - ); - int minimumRowGroupRecordCount = 100; int desiredRecordCount = minimumRowGroupRecordCount + 1; - List records = new ArrayList<>(desiredRecordCount); - org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); - for (int i = 1; i <= desiredRecordCount; i++) { - GenericData.Record record = new GenericData.Record(avroSchema); - record.put("intCol", i); - records.add(record); - } - // Force multiple row groups by making the byte size very small // Note there'a also minimumRowGroupRecordCount which cannot be configured so we have to write // at least that many records for a new row group to occur - return writeRecords( - schema, - ImmutableMap.of( - PARQUET_ROW_GROUP_SIZE_BYTES, - Integer.toString(minimumRowGroupRecordCount * Integer.BYTES)), - createWriterFunc, - records.toArray(new GenericData.Record[] {})); + Map properties = ImmutableMap.of( + PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(minimumRowGroupRecordCount * Integer.BYTES)); + + return write(createWriterFunc, properties, desiredRecordCount, null); + } + + private File write(Function> createWriterFunc, + Map properties, + int recordsNumber, + File file) throws IOException { + Schema schema = new Schema( + optional(1, "intCol", IntegerType.get()) + ); + + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + List records = IntStream.rangeClosed(1, recordsNumber) + .mapToObj(index -> { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", index); + return record; + }) + .collect(Collectors.toList()); + + return writeRecords(schema, properties, createWriterFunc, file, records.toArray(new GenericData.Record[] {})); } } diff --git a/site/docs/configuration.md b/site/docs/configuration.md index 3b54405889f9..449aa2d1b1ff 100644 --- a/site/docs/configuration.md +++ b/site/docs/configuration.md @@ -21,7 +21,9 @@ Iceberg tables support table properties to configure table behavior, like the de | write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size | | write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size | | write.parquet.compression-codec | gzip | Parquet compression codec | +| write.parquet.write-mode | overwrite | Parquet write mode | | write.avro.compression-codec | gzip | Avro compression codec | +| write.avro.write-mode | overwrite | Avro write mode | | write.metadata.compression-codec | none | Metadata compression codec; none or gzip | | write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full | | write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |