diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 2ee6627789b5..a7ba33624b30 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -54,6 +54,7 @@ private InputFormatConfig() {
   public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
   public static final String LOCALITY = "iceberg.mr.locality";
   public static final String WRITE_FANOUT_ENABLED = "write.fanout.enabled";
+  public static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";
   public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
   public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
index 192d4b25bd02..8afae4c4895c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive.serde.objectinspector;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.List;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -61,11 +62,13 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
     switch (field.getFieldID()) {
       case 0: // "metadata" field (binary)
-        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.metadata().writeTo(metadata, 0);
         return metadata.array();
       case 1: // "value" field (binary)
-        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.value().writeTo(value, 0);
         return value.array();
       default:
@@ -79,10 +82,12 @@ public List<Object> getStructFieldsDataAsList(Object data) {
       return null;
     }
     Variant variant = (Variant) data;
-    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.metadata().writeTo(metadata, 0);
-    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.value().writeTo(value, 0);
 
     // Return the data for our fields in the correct order: metadata, value
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
index 7f106f254804..6489c78f9edf 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -31,9 +33,13 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.VariantUtil;
 
 class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
+  private final Map<String, String> properties;
+  private Supplier<Record> sampleRecord = null;
+
   HiveFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -54,6 +60,7 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
         positionDeleteRowSchema);
+    properties = table.properties();
   }
 
   static Builder builderFor(Table table) {
@@ -78,6 +85,9 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
     builder.createWriterFunc(GenericParquetWriter::create);
+    // Configure variant shredding function if conditions are met:
+    VariantUtil.variantShreddingFunc(dataSchema(), sampleRecord, properties)
+        .ifPresent(builder::variantShreddingFunc);
   }
 
   @Override
@@ -149,4 +159,14 @@ HiveFileWriterFactory build() {
           positionDeleteRowSchema);
     }
   }
+
+  /**
+   * Set a sample record to use for data-driven variant shredding schema generation.
+   * Should be called before the Parquet writer is created.
+   */
+  public void initialize(Supplier<Record> record) {
+    if (sampleRecord == null) {
+      sampleRecord = record;
+    }
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
index f3c04c279e74..e8d5ed2a8e4b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -28,6 +29,7 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.DataWriteResult;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.hive.FilesForCommit;
@@ -35,14 +37,19 @@
 import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 
 class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
 
   private final int currentSpecId;
+  private final Set<String> missingColumns;
+  private final List<Types.NestedField> missingOrStructFields;
   private final GenericRecord rowDataTemplate;
   private final List<DataFile> replacedDataFiles;
+  private final HiveFileWriterFactory fileWriterFactory;
+
   HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory,
       OutputFileFactory deleteFileFactory, Context context) {
     super(table, newDataWriter(table, writerFactory, deleteFileFactory, context));
@@ -50,6 +57,12 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
     this.currentSpecId = table.spec().specId();
     this.rowDataTemplate = GenericRecord.create(table.schema());
     this.replacedDataFiles = Lists.newArrayList();
+
+    this.missingColumns = context.missingColumns();
+    this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
+        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+        .toList();
+    this.fileWriterFactory = writerFactory;
   }
 
   @Override
@@ -69,6 +82,8 @@ public void write(Writable row) throws IOException {
           .build();
       replacedDataFiles.add(dataFile);
     } else {
+      HiveSchemaUtil.setDefaultValues(rowData, missingOrStructFields, missingColumns);
+      fileWriterFactory.initialize(() -> rowData);
       writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
     }
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 9adfd15dc20c..ca9d232e3d58 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -40,6 +40,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
   private final Set<String> missingColumns;
   private final List<Types.NestedField> missingOrStructFields;
+  private final HiveFileWriterFactory fileWriterFactory;
+
   HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
       OutputFileFactory dataFileFactory, Context context) {
     super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
@@ -47,18 +49,19 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
     this.currentSpecId = table.spec().specId();
     this.missingColumns = context.missingColumns();
     this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
-        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
+        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+        .toList();
+    this.fileWriterFactory = fileWriterFactory;
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container) row).get();
     HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
-
+    fileWriterFactory.initialize(() -> record);
     writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
   }
-
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
new file mode 100644
index 000000000000..56e3f3480c74
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -0,0 +1,1524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.parquet; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.InternalData; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionInputFile; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.hadoop.HadoopOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter; +import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroWriteSupport; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.conf.PlainParquetConfiguration; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.hadoop.ParquetFileReader; 
+import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type.ID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX; +import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; +import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT; +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT; + +// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153) + +public class Parquet { + private static final Logger LOG = LoggerFactory.getLogger(Parquet.class); + + private Parquet() { + } + + private static final Collection READ_PROPERTIES_TO_REMOVE = + Sets.newHashSet( + "parquet.read.filter", + "parquet.private.read.filter.predicate", + "parquet.read.support.class", + "parquet.crypto.factory.class"); + + public static WriteBuilder write(OutputFile file) { + if (file instanceof EncryptedOutputFile) { + 
return write((EncryptedOutputFile) file); + } + + return new WriteBuilder(file); + } + + public static WriteBuilder write(EncryptedOutputFile file) { + if (file instanceof NativeEncryptionOutputFile) { + NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; + return write(nativeFile.plainOutputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return write(file.encryptingOutputFile()); + } + } + + public static class WriteBuilder implements InternalData.WriteBuilder { + private final OutputFile file; + private final Configuration conf; + private final Map metadata = Maps.newLinkedHashMap(); + private final Map config = Maps.newLinkedHashMap(); + private Schema schema = null; + private VariantShreddingFunction variantShreddingFunc = null; + private String name = "table"; + private WriteSupport writeSupport = null; + private BiFunction> createWriterFunc = null; + private MetricsConfig metricsConfig = MetricsConfig.getDefault(); + private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE; + private WriterVersion writerVersion = WriterVersion.PARQUET_1_0; + private Function, Context> createContextFunc = Context::dataContext; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + private WriteBuilder(OutputFile file) { + this.file = file; + if (file instanceof HadoopOutputFile) { + this.conf = new Configuration(((HadoopOutputFile) file).getConf()); + } else { + this.conf = new Configuration(); + } + } + + public WriteBuilder forTable(Table table) { + schema(table.schema()); + setAll(table.properties()); + metricsConfig(MetricsConfig.forTable(table)); + return this; + } + + @Override + public WriteBuilder schema(Schema newSchema) { + this.schema = newSchema; + return this; + } + + /** + * Set a {@link VariantShreddingFunction} that is called with each variant field's name and + * field ID to produce the shredding type as a {@code typed_value} field. This field is added to + * the result variant struct alongside the {@code metadata} and {@code value} fields. 
+ * + * @param func {@link VariantShreddingFunction} that produces a shredded {@code typed_value} + * @return this for method chaining + */ + public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) { + this.variantShreddingFunc = func; + return this; + } + + @Override + public WriteBuilder named(String newName) { + this.name = newName; + return this; + } + + public WriteBuilder writeSupport(WriteSupport newWriteSupport) { + this.writeSupport = newWriteSupport; + return this; + } + + @Override + public WriteBuilder set(String property, String value) { + config.put(property, value); + return this; + } + + public WriteBuilder setAll(Map properties) { + config.putAll(properties); + return this; + } + + @Override + public WriteBuilder meta(String property, String value) { + metadata.put(property, value); + return this; + } + + public WriteBuilder createWriterFunc( + Function> newCreateWriterFunc) { + if (newCreateWriterFunc != null) { + this.createWriterFunc = (icebergSchema, type) -> newCreateWriterFunc.apply(type); + } + return this; + } + + public WriteBuilder createWriterFunc( + BiFunction> newCreateWriterFunc) { + this.createWriterFunc = newCreateWriterFunc; + return this; + } + + public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { + this.metricsConfig = newMetricsConfig; + return this; + } + + @Override + public WriteBuilder overwrite() { + return overwrite(true); + } + + public WriteBuilder overwrite(boolean enabled) { + this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; + return this; + } + + public WriteBuilder writerVersion(WriterVersion version) { + this.writerVersion = version; + return this; + } + + public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + this.fileEncryptionKey = encryptionKey; + return this; + } + + public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + this.fileAADPrefix = aadPrefix; + return this; + } + + @SuppressWarnings("unchecked") + private WriteSupport getWriteSupport(MessageType type) { + if (writeSupport != null) { + return (WriteSupport) writeSupport; + } else { + return new AvroWriteSupport<>( + type, + ParquetAvro.parquetAvroSchema(AvroSchemaUtil.convert(schema, name)), + ParquetAvro.DEFAULT_MODEL); + } + } + + /* + * Sets the writer version. Default value is PARQUET_1_0 (v1). 
+ */ + @VisibleForTesting + WriteBuilder withWriterVersion(WriterVersion version) { + this.writerVersion = version; + return this; + } + + // supposed to always be a private method used strictly by data and delete write builders + private WriteBuilder createContextFunc( + Function, Context> newCreateContextFunc) { + this.createContextFunc = newCreateContextFunc; + return this; + } + + private void setBloomFilterConfig( + Context context, + Map colNameToParquetPathMap, + BiConsumer withBloomFilterEnabled, + BiConsumer withBloomFilterFPP) { + + context + .columnBloomFilterEnabled() + .forEach( + (colPath, isEnabled) -> { + String parquetColumnPath = colNameToParquetPathMap.get(colPath); + if (parquetColumnPath == null) { + LOG.warn("Skipping bloom filter config for missing field: {}", colPath); + return; + } + + withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled)); + String fpp = context.columnBloomFilterFpp().get(colPath); + if (fpp != null) { + withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp)); + } + }); + } + + private void setColumnStatsConfig( + Context context, + Map colNameToParquetPathMap, + BiConsumer withColumnStatsEnabled) { + + context + .columnStatsEnabled() + .forEach( + (colPath, isEnabled) -> { + String parquetColumnPath = colNameToParquetPathMap.get(colPath); + if (parquetColumnPath == null) { + LOG.warn("Skipping column statistics config for missing field: {}", colPath); + return; + } + withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled)); + }); + } + + @Override + public FileAppender build() throws IOException { + Preconditions.checkNotNull(schema, "Schema is required"); + Preconditions.checkNotNull(name, "Table name is required and cannot be null"); + + // add the Iceberg schema to keyValueMetadata + meta("iceberg.schema", SchemaParser.toJson(schema)); + + // Map Iceberg properties to pass down to the Parquet writer + Context context = createContextFunc.apply(config); + + int rowGroupSize = context.rowGroupSize(); + int pageSize = context.pageSize(); + int pageRowLimit = context.pageRowLimit(); + int dictionaryPageSize = context.dictionaryPageSize(); + String compressionLevel = context.compressionLevel(); + CompressionCodecName codec = context.codec(); + int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount(); + int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount(); + int bloomFilterMaxBytes = context.bloomFilterMaxBytes(); + boolean dictionaryEnabled = context.dictionaryEnabled(); + + if (compressionLevel != null) { + switch (codec) { + case GZIP: + config.put("zlib.compress.level", compressionLevel); + break; + case BROTLI: + config.put("compression.brotli.quality", compressionLevel); + break; + case ZSTD: + // keep "io.compression.codec.zstd.level" for backwards compatibility + config.put("io.compression.codec.zstd.level", compressionLevel); + config.put("parquet.compression.codec.zstd.level", compressionLevel); + break; + default: + // compression level is not supported; ignore it + } + } + + set("parquet.avro.write-old-list-structure", "false"); + MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc); + + FileEncryptionProperties fileEncryptionProperties = null; + if (fileEncryptionKey != null) { + byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey); + byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix); + + fileEncryptionProperties = + FileEncryptionProperties.builder(encryptionKeyArray) + 
.withAADPrefix(aadPrefixArray) + .withoutAADPrefixStorage() + .build(); + } else { + Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); + } + + Map colNameToParquetPathMap = + type.getColumns().stream() + .filter( + col -> { + ID id = col.getPrimitiveType().getId(); + return id != null && schema.findColumnName(id.intValue()) != null; + }) + .collect( + Collectors.toMap( + col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()), + col -> String.join(".", col.getPath()))); + + if (createWriterFunc != null) { + Preconditions.checkArgument( + writeSupport == null, "Cannot write with both write support and Parquet value writer"); + + for (Map.Entry entry : config.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + + ParquetProperties.Builder propsBuilder = + ParquetProperties.builder() + .withWriterVersion(writerVersion) + .withPageSize(pageSize) + .withPageRowCountLimit(pageRowLimit) + .withDictionaryEncoding(dictionaryEnabled) + .withDictionaryPageSize(dictionaryPageSize) + .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount) + .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount) + .withMaxBloomFilterBytes(bloomFilterMaxBytes); + + setBloomFilterConfig( + context, + colNameToParquetPathMap, + propsBuilder::withBloomFilterEnabled, + propsBuilder::withBloomFilterFPP); + + setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled); + + ParquetProperties parquetProperties = propsBuilder.build(); + + return new org.apache.iceberg.parquet.ParquetWriter<>( + conf, + file, + schema, + type, + rowGroupSize, + metadata, + createWriterFunc, + codec, + parquetProperties, + metricsConfig, + writeMode, + fileEncryptionProperties); + } else { + ParquetWriteBuilder parquetWriteBuilder = + new ParquetWriteBuilder(ParquetIO.file(file)) + .withWriterVersion(writerVersion) + .setType(type) + .setConfig(config) + .setKeyValueMetadata(metadata) + .setWriteSupport(getWriteSupport(type)) + .withCompressionCodec(codec) + .withWriteMode(writeMode) + .withRowGroupSize(rowGroupSize) + .withPageSize(pageSize) + .withPageRowCountLimit(pageRowLimit) + .withDictionaryEncoding(dictionaryEnabled) + .withDictionaryPageSize(dictionaryPageSize) + .withEncryption(fileEncryptionProperties); + + setBloomFilterConfig( + context, + colNameToParquetPathMap, + parquetWriteBuilder::withBloomFilterEnabled, + parquetWriteBuilder::withBloomFilterFPP); + + setColumnStatsConfig( + context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled); + + return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig); + } + } + + private static class Context { + private final int rowGroupSize; + private final int pageSize; + private final int pageRowLimit; + private final int dictionaryPageSize; + private final CompressionCodecName codec; + private final String compressionLevel; + private final int rowGroupCheckMinRecordCount; + private final int rowGroupCheckMaxRecordCount; + private final int bloomFilterMaxBytes; + private final Map columnBloomFilterFpp; + private final Map columnBloomFilterEnabled; + private final Map columnStatsEnabled; + private final boolean dictionaryEnabled; + + private Context( + int rowGroupSize, + int pageSize, + int pageRowLimit, + int dictionaryPageSize, + CompressionCodecName codec, + String compressionLevel, + int rowGroupCheckMinRecordCount, + int rowGroupCheckMaxRecordCount, + int bloomFilterMaxBytes, + Map columnBloomFilterFpp, + Map columnBloomFilterEnabled, + Map 
columnStatsEnabled, + boolean dictionaryEnabled) { + this.rowGroupSize = rowGroupSize; + this.pageSize = pageSize; + this.pageRowLimit = pageRowLimit; + this.dictionaryPageSize = dictionaryPageSize; + this.codec = codec; + this.compressionLevel = compressionLevel; + this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount; + this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount; + this.bloomFilterMaxBytes = bloomFilterMaxBytes; + this.columnBloomFilterFpp = columnBloomFilterFpp; + this.columnBloomFilterEnabled = columnBloomFilterEnabled; + this.columnStatsEnabled = columnStatsEnabled; + this.dictionaryEnabled = dictionaryEnabled; + } + + static Context dataContext(Map config) { + int rowGroupSize = + PropertyUtil.propertyAsInt( + config, PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT); + Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0"); + + int pageSize = + PropertyUtil.propertyAsInt( + config, PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT); + Preconditions.checkArgument(pageSize > 0, "Page size must be > 0"); + + int pageRowLimit = + PropertyUtil.propertyAsInt( + config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT); + Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0"); + + int dictionaryPageSize = + PropertyUtil.propertyAsInt( + config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT); + Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0"); + + String codecAsString = + config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT); + CompressionCodecName codec = toCodec(codecAsString); + + String compressionLevel = + config.getOrDefault(PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT); + + int rowGroupCheckMinRecordCount = + PropertyUtil.propertyAsInt( + config, + PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, + PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT); + Preconditions.checkArgument( + rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0"); + + int rowGroupCheckMaxRecordCount = + PropertyUtil.propertyAsInt( + config, + PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, + PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT); + Preconditions.checkArgument( + rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0"); + Preconditions.checkArgument( + rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount, + "Row group check maximum record count must be >= minimal record count"); + + int bloomFilterMaxBytes = + PropertyUtil.propertyAsInt( + config, PARQUET_BLOOM_FILTER_MAX_BYTES, PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT); + Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0"); + + Map columnBloomFilterFpp = + PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX); + + Map columnBloomFilterEnabled = + PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX); + + Map columnStatsEnabled = + PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX); + + boolean dictionaryEnabled = + PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true); + + return new Context( + rowGroupSize, + pageSize, + pageRowLimit, + dictionaryPageSize, + codec, + compressionLevel, + rowGroupCheckMinRecordCount, + rowGroupCheckMaxRecordCount, + bloomFilterMaxBytes, + columnBloomFilterFpp, + columnBloomFilterEnabled, + columnStatsEnabled, + 
dictionaryEnabled); + } + + static Context deleteContext(Map config) { + // default delete config using data config + Context dataContext = dataContext(config); + + int rowGroupSize = + PropertyUtil.propertyAsInt( + config, DELETE_PARQUET_ROW_GROUP_SIZE_BYTES, dataContext.rowGroupSize()); + Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0"); + + int pageSize = + PropertyUtil.propertyAsInt( + config, DELETE_PARQUET_PAGE_SIZE_BYTES, dataContext.pageSize()); + Preconditions.checkArgument(pageSize > 0, "Page size must be > 0"); + + int pageRowLimit = + PropertyUtil.propertyAsInt( + config, DELETE_PARQUET_PAGE_ROW_LIMIT, dataContext.pageRowLimit()); + Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0"); + + int dictionaryPageSize = + PropertyUtil.propertyAsInt( + config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize()); + Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0"); + + String codecAsString = config.get(DELETE_PARQUET_COMPRESSION); + CompressionCodecName codec = + codecAsString != null ? toCodec(codecAsString) : dataContext.codec(); + + String compressionLevel = + config.getOrDefault(DELETE_PARQUET_COMPRESSION_LEVEL, dataContext.compressionLevel()); + + int rowGroupCheckMinRecordCount = + PropertyUtil.propertyAsInt( + config, + DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, + dataContext.rowGroupCheckMinRecordCount()); + Preconditions.checkArgument( + rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0"); + + int rowGroupCheckMaxRecordCount = + PropertyUtil.propertyAsInt( + config, + DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, + dataContext.rowGroupCheckMaxRecordCount()); + Preconditions.checkArgument( + rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0"); + Preconditions.checkArgument( + rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount, + "Row group check maximum record count must be >= minimal record count"); + + boolean dictionaryEnabled = + PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true); + + return new Context( + rowGroupSize, + pageSize, + pageRowLimit, + dictionaryPageSize, + codec, + compressionLevel, + rowGroupCheckMinRecordCount, + rowGroupCheckMaxRecordCount, + PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT, + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableMap.of(), + dictionaryEnabled); + } + + private static CompressionCodecName toCodec(String codecAsString) { + try { + return CompressionCodecName.valueOf(codecAsString.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString); + } + } + + int rowGroupSize() { + return rowGroupSize; + } + + int pageSize() { + return pageSize; + } + + int pageRowLimit() { + return pageRowLimit; + } + + int dictionaryPageSize() { + return dictionaryPageSize; + } + + CompressionCodecName codec() { + return codec; + } + + String compressionLevel() { + return compressionLevel; + } + + int rowGroupCheckMinRecordCount() { + return rowGroupCheckMinRecordCount; + } + + int rowGroupCheckMaxRecordCount() { + return rowGroupCheckMaxRecordCount; + } + + int bloomFilterMaxBytes() { + return bloomFilterMaxBytes; + } + + Map columnBloomFilterFpp() { + return columnBloomFilterFpp; + } + + Map columnBloomFilterEnabled() { + return columnBloomFilterEnabled; + } + + Map columnStatsEnabled() { + return columnStatsEnabled; + } + + boolean 
dictionaryEnabled() { + return dictionaryEnabled; + } + } + } + + public static DataWriteBuilder writeData(OutputFile file) { + return new DataWriteBuilder(file); + } + + public static DataWriteBuilder writeData(EncryptedOutputFile file) { + if (file instanceof NativeEncryptionOutputFile) { + NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; + return writeData(nativeFile.plainOutputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return writeData(file.encryptingOutputFile()); + } + } + + public static class DataWriteBuilder { + private final WriteBuilder appenderBuilder; + private final String location; + private PartitionSpec spec = null; + private StructLike partition = null; + private EncryptionKeyMetadata keyMetadata = null; + private SortOrder sortOrder = null; + + private DataWriteBuilder(OutputFile file) { + this.appenderBuilder = write(file); + this.location = file.location(); + } + + public DataWriteBuilder forTable(Table table) { + schema(table.schema()); + withSpec(table.spec()); + setAll(table.properties()); + metricsConfig(MetricsConfig.forTable(table)); + return this; + } + + public DataWriteBuilder schema(Schema newSchema) { + appenderBuilder.schema(newSchema); + return this; + } + + public DataWriteBuilder set(String property, String value) { + appenderBuilder.set(property, value); + return this; + } + + public DataWriteBuilder setAll(Map properties) { + appenderBuilder.setAll(properties); + return this; + } + + public DataWriteBuilder meta(String property, String value) { + appenderBuilder.meta(property, value); + return this; + } + + public DataWriteBuilder overwrite() { + return overwrite(true); + } + + public DataWriteBuilder overwrite(boolean enabled) { + appenderBuilder.overwrite(enabled); + return this; + } + + public DataWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { + appenderBuilder.metricsConfig(newMetricsConfig); + return this; + } + + public DataWriteBuilder createWriterFunc( + Function> newCreateWriterFunc) { + appenderBuilder.createWriterFunc(newCreateWriterFunc); + return this; + } + + public DataWriteBuilder createWriterFunc( + BiFunction> newCreateWriterFunc) { + appenderBuilder.createWriterFunc(newCreateWriterFunc); + return this; + } + + public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction func) { + appenderBuilder.variantShreddingFunc(func); + return this; + } + + public DataWriteBuilder withSpec(PartitionSpec newSpec) { + this.spec = newSpec; + return this; + } + + public DataWriteBuilder withPartition(StructLike newPartition) { + this.partition = newPartition; + return this; + } + + public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) { + this.keyMetadata = metadata; + return this; + } + + public DataWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) { + appenderBuilder.withFileEncryptionKey(fileEncryptionKey); + return this; + } + + public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + appenderBuilder.withAADPrefix(aadPrefix); + return this; + } + + public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder; + return this; + } + + public DataWriter build() throws IOException { + Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null when creating data writer for partitioned spec"); + + 
FileAppender fileAppender = appenderBuilder.build(); + return new DataWriter<>( + fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder); + } + } + + public static DeleteWriteBuilder writeDeletes(OutputFile file) { + return new DeleteWriteBuilder(file); + } + + public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { + if (file instanceof NativeEncryptionOutputFile) { + NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; + return writeDeletes(nativeFile.plainOutputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return writeDeletes(file.encryptingOutputFile()); + } + } + + public static class DeleteWriteBuilder { + private final WriteBuilder appenderBuilder; + private final String location; + private BiFunction> createWriterFunc = null; + private Schema rowSchema = null; + private PartitionSpec spec = null; + private StructLike partition = null; + private EncryptionKeyMetadata keyMetadata = null; + private int[] equalityFieldIds = null; + private SortOrder sortOrder; + private Function pathTransformFunc = Function.identity(); + + private DeleteWriteBuilder(OutputFile file) { + this.appenderBuilder = write(file); + this.location = file.location(); + } + + public DeleteWriteBuilder forTable(Table table) { + rowSchema(table.schema()); + withSpec(table.spec()); + setAll(table.properties()); + metricsConfig(MetricsConfig.forTable(table)); + return this; + } + + public DeleteWriteBuilder set(String property, String value) { + appenderBuilder.set(property, value); + return this; + } + + public DeleteWriteBuilder setAll(Map properties) { + appenderBuilder.setAll(properties); + return this; + } + + public DeleteWriteBuilder meta(String property, String value) { + appenderBuilder.meta(property, value); + return this; + } + + public DeleteWriteBuilder overwrite() { + return overwrite(true); + } + + public DeleteWriteBuilder overwrite(boolean enabled) { + appenderBuilder.overwrite(enabled); + return this; + } + + public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { + appenderBuilder.metricsConfig(newMetricsConfig); + return this; + } + + public DeleteWriteBuilder createWriterFunc( + Function> newCreateWriterFunc) { + this.createWriterFunc = (ignored, fileSchema) -> newCreateWriterFunc.apply(fileSchema); + return this; + } + + public DeleteWriteBuilder createWriterFunc( + BiFunction> newCreateWriterFunc) { + this.createWriterFunc = newCreateWriterFunc; + return this; + } + + public DeleteWriteBuilder rowSchema(Schema newSchema) { + this.rowSchema = newSchema; + return this; + } + + public DeleteWriteBuilder withSpec(PartitionSpec newSpec) { + this.spec = newSpec; + return this; + } + + public DeleteWriteBuilder withPartition(StructLike key) { + this.partition = key; + return this; + } + + public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) { + this.keyMetadata = metadata; + return this; + } + + public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) { + appenderBuilder.withFileEncryptionKey(fileEncryptionKey); + return this; + } + + public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + appenderBuilder.withAADPrefix(aadPrefix); + return this; + } + + public DeleteWriteBuilder equalityFieldIds(List fieldIds) { + this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds); + return this; + } + + public DeleteWriteBuilder equalityFieldIds(int... 
fieldIds) { + this.equalityFieldIds = fieldIds; + return this; + } + + public DeleteWriteBuilder transformPaths(Function newPathTransformFunc) { + this.pathTransformFunc = newPathTransformFunc; + return this; + } + + public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { + this.sortOrder = newSortOrder; + return this; + } + + public EqualityDeleteWriter buildEqualityWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); + Preconditions.checkState( + createWriterFunc != null, + "Cannot create equality delete file unless createWriterFunc is set"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating equality delete writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + meta("delete-type", "equality"); + meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + // the appender uses the row schema without extra columns + appenderBuilder.schema(rowSchema); + appenderBuilder.createWriterFunc(createWriterFunc); + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new EqualityDeleteWriter<>( + appenderBuilder.build(), + FileFormat.PARQUET, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds); + } + + public PositionDeleteWriter buildPositionWriter() throws IOException { + Preconditions.checkState( + equalityFieldIds == null, "Cannot create position delete file using delete field ids"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating position delete writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + Preconditions.checkArgument( + rowSchema == null || createWriterFunc != null, + "Create function should be provided if we write row data"); + + meta("delete-type", "position"); + + if (rowSchema != null && createWriterFunc != null) { + // the appender uses the row schema wrapped with position fields + appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema)); + + appenderBuilder.createWriterFunc( + (schema, parquetSchema) -> { + ParquetValueWriter writer = createWriterFunc.apply(schema, parquetSchema); + if (writer instanceof StructWriter) { + return new PositionDeleteStructWriter( + (StructWriter) writer, pathTransformFunc); + } else { + throw new UnsupportedOperationException( + "Cannot wrap writer for position deletes: " + writer.getClass()); + } + }); + + } else { + appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema()); + + // We ignore the 'createWriterFunc' and 'rowSchema' even if is provided, since we do not + // write row data itself + appenderBuilder.createWriterFunc( + (schema, parquetSchema) -> + new PositionDeleteStructWriter( + (StructWriter) GenericParquetWriter.create(schema, parquetSchema), + Function.identity())); + } + + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new PositionDeleteWriter<>( + appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata); + } + } + + private static class ParquetWriteBuilder + extends ParquetWriter.Builder> { + private Map keyValueMetadata = Maps.newHashMap(); + private Map config = 
Maps.newHashMap(); + private MessageType type; + private WriteSupport writeSupport; + + private ParquetWriteBuilder(org.apache.parquet.io.OutputFile path) { + super(path); + } + + @Override + protected ParquetWriteBuilder self() { + return this; + } + + public ParquetWriteBuilder setKeyValueMetadata(Map keyValueMetadata) { + this.keyValueMetadata = keyValueMetadata; + return self(); + } + + public ParquetWriteBuilder setConfig(Map config) { + this.config = config; + return self(); + } + + public ParquetWriteBuilder setType(MessageType type) { + this.type = type; + return self(); + } + + public ParquetWriteBuilder setWriteSupport(WriteSupport writeSupport) { + this.writeSupport = writeSupport; + return self(); + } + + @Override + protected WriteSupport getWriteSupport(Configuration configuration) { + for (Map.Entry entry : config.entrySet()) { + configuration.set(entry.getKey(), entry.getValue()); + } + return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport); + } + } + + public static ReadBuilder read(InputFile file) { + if (file instanceof NativeEncryptionInputFile) { + NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file; + return new ReadBuilder(nativeFile.encryptedInputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return new ReadBuilder(file); + } + } + + public static class ReadBuilder implements InternalData.ReadBuilder { + private final InputFile file; + private final Map properties = Maps.newHashMap(); + private Long start = null; + private Long length = null; + private Schema schema = null; + private Expression filter = null; + private ReadSupport readSupport = null; + private Function> batchedReaderFunc = null; + private Function> readerFunc = null; + private BiFunction> readerFuncWithSchema = null; + private boolean filterRecords = true; + private boolean caseSensitive = true; + private boolean callInit = false; + private boolean reuseContainers = false; + private int maxRecordsPerBatch = 10000; + private NameMapping nameMapping = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; + + private ReadBuilder(InputFile file) { + this.file = file; + } + + /** + * Restricts the read to the given range: [start, start + length). 
+ * + * @param newStart the start position for this read + * @param newLength the length of the range this read should scan + * @return this builder for method chaining + */ + @Override + public ReadBuilder split(long newStart, long newLength) { + this.start = newStart; + this.length = newLength; + return this; + } + + @Override + public ReadBuilder project(Schema newSchema) { + this.schema = newSchema; + return this; + } + + public ReadBuilder caseInsensitive() { + return caseSensitive(false); + } + + public ReadBuilder caseSensitive(boolean newCaseSensitive) { + this.caseSensitive = newCaseSensitive; + return this; + } + + public ReadBuilder filterRecords(boolean newFilterRecords) { + this.filterRecords = newFilterRecords; + return this; + } + + public ReadBuilder filter(Expression newFilter) { + this.filter = newFilter; + return this; + } + + /** + * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead + */ + @Deprecated + public ReadBuilder readSupport(ReadSupport newFilterSupport) { + this.readSupport = newFilterSupport; + return this; + } + + public ReadBuilder createReaderFunc( + Function> newReaderFunction) { + Preconditions.checkArgument( + this.batchedReaderFunc == null, + "Cannot set reader function: batched reader function already set"); + Preconditions.checkArgument( + this.readerFuncWithSchema == null, + "Cannot set reader function: 2-argument reader function already set"); + this.readerFunc = newReaderFunction; + return this; + } + + public ReadBuilder createReaderFunc( + BiFunction> newReaderFunction) { + Preconditions.checkArgument( + this.readerFunc == null, + "Cannot set 2-argument reader function: reader function already set"); + Preconditions.checkArgument( + this.batchedReaderFunc == null, + "Cannot set 2-argument reader function: batched reader function already set"); + this.readerFuncWithSchema = newReaderFunction; + return this; + } + + public ReadBuilder createBatchedReaderFunc(Function> func) { + Preconditions.checkArgument( + this.readerFunc == null, + "Cannot set batched reader function: reader function already set"); + Preconditions.checkArgument( + this.readerFuncWithSchema == null, + "Cannot set batched reader function: 2-argument reader function already set"); + this.batchedReaderFunc = func; + return this; + } + + public ReadBuilder set(String key, String value) { + properties.put(key, value); + return this; + } + + /** + * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead + */ + @Deprecated + public ReadBuilder callInit() { + this.callInit = true; + return this; + } + + @Override + public ReadBuilder reuseContainers() { + this.reuseContainers = true; + return this; + } + + public ReadBuilder recordsPerBatch(int numRowsPerBatch) { + this.maxRecordsPerBatch = numRowsPerBatch; + return this; + } + + public ReadBuilder withNameMapping(NameMapping newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + + @Override + public ReadBuilder setRootType(Class rootClass) { + throw new UnsupportedOperationException("Custom types are not yet supported"); + } + + @Override + public ReadBuilder setCustomType(int fieldId, Class structClass) { + throw new UnsupportedOperationException("Custom types are not yet supported"); + } + + public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + this.fileEncryptionKey = encryptionKey; + return this; + } + + public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { + this.fileAADPrefix = aadPrefix; + return this; + } + + @Override + 
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) + public CloseableIterable build() { + FileDecryptionProperties fileDecryptionProperties = null; + if (fileEncryptionKey != null) { + byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey); + byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix); + fileDecryptionProperties = + FileDecryptionProperties.builder() + .withFooterKey(encryptionKeyArray) + .withAADPrefix(aadPrefixArray) + .build(); + } else { + Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); + } + + if (readerFunc != null || readerFuncWithSchema != null || batchedReaderFunc != null) { + ParquetReadOptions.Builder optionsBuilder; + if (file instanceof HadoopInputFile) { + // remove read properties already set that may conflict with this read + Configuration conf = new Configuration(((HadoopInputFile) file).getConf()); + for (String property : READ_PROPERTIES_TO_REMOVE) { + conf.unset(property); + } + optionsBuilder = HadoopReadOptions.builder(conf); + } else { + optionsBuilder = ParquetReadOptions.builder(new PlainParquetConfiguration()); + } + + for (Map.Entry entry : properties.entrySet()) { + optionsBuilder.set(entry.getKey(), entry.getValue()); + } + + if (start != null) { + optionsBuilder.withRange(start, start + length); + } + + if (fileDecryptionProperties != null) { + optionsBuilder.withDecryption(fileDecryptionProperties); + } + + ParquetReadOptions options = optionsBuilder.build(); + + NameMapping mapping; + if (nameMapping != null) { + mapping = nameMapping; + } else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) { + mapping = null; + } else { + mapping = NameMapping.empty(); + } + + if (batchedReaderFunc != null) { + return new VectorizedParquetReader<>( + file, + schema, + options, + batchedReaderFunc, + mapping, + filter, + reuseContainers, + caseSensitive, + maxRecordsPerBatch); + } else { + Function> readBuilder = + readerFuncWithSchema != null ? + fileType -> readerFuncWithSchema.apply(schema, fileType) : + readerFunc; + return new org.apache.iceberg.parquet.ParquetReader<>( + file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive); + } + } + + ParquetReadBuilder builder = new ParquetReadBuilder<>(ParquetIO.file(file)); + + builder.project(schema); + + if (readSupport != null) { + builder.readSupport((ReadSupport) readSupport); + } else { + builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL)); + } + + // default options for readers + builder + .set("parquet.strict.typing", "false") // allow type promotion + .set("parquet.avro.compatible", "false") // use the new RecordReader with Utf8 support + .set( + "parquet.avro.add-list-element-records", + "false"); // assume that lists use a 3-level schema + + for (Map.Entry entry : properties.entrySet()) { + builder.set(entry.getKey(), entry.getValue()); + } + + if (filter != null) { + // TODO: should not need to get the schema to push down before opening the file. 
+ // Parquet should allow setting a filter inside its read support + ParquetReadOptions decryptOptions = + ParquetReadOptions.builder(new PlainParquetConfiguration()) + .withDecryption(fileDecryptionProperties) + .build(); + MessageType type; + try (ParquetFileReader schemaReader = + ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { + type = schemaReader.getFileMetaData().getSchema(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + Schema fileSchema = ParquetSchemaUtil.convert(type); + builder + .useStatsFilter() + .useDictionaryFilter() + .useRecordFilter(filterRecords) + .useBloomFilter() + .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive)); + } else { + // turn off filtering + builder + .useStatsFilter(false) + .useDictionaryFilter(false) + .useBloomFilter(false) + .useRecordFilter(false); + } + + if (callInit) { + builder.callInit(); + } + + if (start != null) { + builder.withFileRange(start, start + length); + } + + if (nameMapping != null) { + builder.withNameMapping(nameMapping); + } + + if (fileDecryptionProperties != null) { + builder.withDecryption(fileDecryptionProperties); + } + + return new ParquetIterable<>(builder); + } + } + + private static class ParquetReadBuilder extends ParquetReader.Builder { + private Schema schema = null; + private ReadSupport readSupport = null; + private boolean callInit = false; + private NameMapping nameMapping = null; + + private ParquetReadBuilder(org.apache.parquet.io.InputFile file) { + super(file); + } + + public ParquetReadBuilder project(Schema newSchema) { + this.schema = newSchema; + return this; + } + + public ParquetReadBuilder withNameMapping(NameMapping newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + + public ParquetReadBuilder readSupport(ReadSupport newReadSupport) { + this.readSupport = newReadSupport; + return this; + } + + public ParquetReadBuilder callInit() { + this.callInit = true; + return this; + } + + @Override + protected ReadSupport getReadSupport() { + return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping); + } + } + + /** + * Combines several files into one + * + * @param inputFiles an {@link Iterable} of parquet files. 
The order of iteration determines the + * order in which content of files are read and written to the {@code outputFile} + * @param outputFile the output parquet file containing all the data from {@code inputFiles} + * @param rowGroupSize the row group size to use when writing the {@code outputFile} + * @param schema the schema of the data + * @param metadata extraMetadata to write at the footer of the {@code outputFile} + */ + public static void concat( + Iterable inputFiles, + File outputFile, + int rowGroupSize, + Schema schema, + Map metadata) + throws IOException { + OutputFile file = Files.localOutput(outputFile); + try (ParquetFileWriter writer = + new ParquetFileWriter( + ParquetIO.file(file), + ParquetSchemaUtil.convert(schema, "table"), + ParquetFileWriter.Mode.CREATE, + rowGroupSize, + 0)) { + writer.start(); + for (File inputFile : inputFiles) { + writer.appendFile(ParquetIO.file(Files.localInput(inputFile))); + } + writer.end(metadata); + } + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java new file mode 100644 index 000000000000..7d0f0feec8c0 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.parquet; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; + +public class VariantUtil { + + private VariantUtil() { + } + + /** + * Create a VariantShreddingFunction if variant shredding is enabled and the schema has variant columns. 
+ * + * @param schema The Iceberg schema + * @param sampleRecord A sample record to infer variant schemas from actual data (can be null) + * @param properties Table properties to check if variant shredding is enabled + * @return An Optional containing the VariantShreddingFunction if applicable + */ + public static Optional<VariantShreddingFunction> variantShreddingFunc( + Schema schema, + Supplier<Record> sampleRecord, + Map<String, String> properties) { + + // Preconditions: must have variant columns + property enabled + if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) { + return Optional.empty(); + } + + VariantShreddingFunction fn = + constructVariantShreddingFunc(sampleRecord.get(), schema); + + return Optional.of(fn); + } + + private static VariantShreddingFunction constructVariantShreddingFunc( + Record sampleRecord, Schema schema) { + + return (id, name) -> { + // Validate the field exists and is a variant type + Types.NestedField field = schema.findField(id); + + if (field == null || !(field.type() instanceof Types.VariantType)) { + return null; // Not a variant field, no shredding + } + + // If we have a sample record, try to generate schema from actual data + if (sampleRecord != null) { + try { + Object variantValue = sampleRecord.getField(name); + if (variantValue instanceof Variant variant) { + // Use ParquetVariantUtil to generate schema from actual variant value + return ParquetVariantUtil.toParquetSchema(variant.value()); + } + } catch (Exception e) { + // Fall through to default schema + } + } + return null; + }; + } + + /** + * Check if the schema contains any variant columns. + */ + private static boolean hasVariantColumns(Schema schema) { + return schema.columns().stream() + .anyMatch(field -> field.type() instanceof Types.VariantType); + } + + /** + * Check if variant shredding is enabled via table properties.
+ */ + private static boolean isVariantShreddingEnabled(Map properties) { + String shreddingEnabled = properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED); + return Boolean.parseBoolean(shreddingEnabled); + } + +} diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java index 8a1cf740067b..990d6e782fdb 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java @@ -22,6 +22,9 @@ import java.io.IOException; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -32,11 +35,16 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; import org.junit.Assert; import org.junit.Test; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assume.assumeTrue; /** @@ -187,6 +195,68 @@ public void testSpecialCharacters() { Assert.assertArrayEquals(new Object[]{"star", 2L}, result.get(1)); } + @Test + public void testVariantSelectProjection() throws IOException { + assumeTrue(fileFormat == FileFormat.PARQUET); + assumeTrue(!isVectorized); + + TableIdentifier table = TableIdentifier.of("default", "variant_projection"); + shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table)); + + shell.executeStatement( + String.format( + "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG STORED AS %s %s %s", + table, + fileFormat, + testTables.locationForCreateTableSQL(table), + testTables.propertiesForCreateTableSQL( + ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true")))); + + shell.executeStatement( + String.format( + "INSERT INTO %s VALUES " + + "(1, parse_json('{\"name\":\"Alice\",\"age\":30}'))," + + "(2, parse_json('{\"name\":\"Bob\"}'))", + table)); + + List rows = + shell.executeStatement( + String.format( + "SELECT id, " + + "variant_get(payload, '$.name') AS name, " + + "try_variant_get(payload, '$.age', 'int') AS age " + + "FROM %s ORDER BY id", + table)); + + Assert.assertEquals(2, rows.size()); + Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue()); + Assert.assertEquals("Alice", rows.get(0)[1]); + Assert.assertEquals(30, ((Number) rows.get(0)[2]).intValue()); + + Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue()); + Assert.assertEquals("Bob", rows.get(1)[1]); + Assert.assertNull(rows.get(1)[2]); + + Table icebergTable = testTables.loadTable(table); + Types.NestedField variantField = icebergTable.schema().findField("payload"); + Assert.assertNotNull("Variant column should exist", variantField); + DataFile dataFile = + StreamSupport.stream( + icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()).spliterator(), false) + .findFirst() + .orElseThrow(() -> new 
IllegalStateException("No data files written for test table")); + + Path parquetPath = new Path(dataFile.path().toString()); + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, shell.getHiveConf()))) { + MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema(); + GroupType variantType = parquetSchema.getType(variantField.name()).asGroupType(); + assertThat(variantType.containsField("typed_value")).isTrue(); + } + + shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table)); + } + @Test public void testScanTableCaseInsensitive() throws IOException { testTables.createTable(shell, "customers", diff --git a/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q new file mode 100644 index 000000000000..25d84dd0c0a1 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q @@ -0,0 +1,39 @@ +-- Mask random uuid +--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/ +-- Mask random snapshot id +--! qt:replace:/('current-snapshot-id'=')\d+/$1#SnapshotId#/ +-- Mask current-snapshot-timestamp-ms +--! qt:replace:/('current-snapshot-timestamp-ms'=')\d+/$1#Masked#/ + +-- SORT_QUERY_RESULTS +set hive.explain.user=false; +set hive.fetch.task.conversion=none; + +drop table if exists tbl_shredded_variant; + +-- Create test table +CREATE EXTERNAL TABLE tbl_shredded_variant ( + id INT, + data VARIANT +) STORED BY ICEBERG +tblproperties( + 'format-version'='3', + 'variant.shredding.enabled'='true' +); + +-- Insert JSON structures +INSERT INTO tbl_shredded_variant VALUES +(1, parse_json('{"name": "John", "age": 30, "active": true}')), +(2, parse_json('{"name": "Bill", "active": false}')), +(3, parse_json('{"name": "Henry", "age": 20}')); + +-- Disable vectorized execution until Variant type is supported +set hive.vectorized.execution.enabled=false; + +-- Retrieve and verify +SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant +WHERE variant_get(data, '$.age') > 25; + +EXPLAIN +SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant +WHERE variant_get(data, '$.age') > 25; diff --git a/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out new file mode 100644 index 000000000000..b51bc7495259 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out @@ -0,0 +1,101 @@ +PREHOOK: query: drop table if exists tbl_shredded_variant +PREHOOK: type: DROPTABLE +PREHOOK: Output: database:default +POSTHOOK: query: drop table if exists tbl_shredded_variant +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: database:default +PREHOOK: query: CREATE EXTERNAL TABLE tbl_shredded_variant ( + id INT, + data VARIANT +) STORED BY ICEBERG +tblproperties( + 'format-version'='3', + 'variant.shredding.enabled'='true' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_shredded_variant +POSTHOOK: query: CREATE EXTERNAL TABLE tbl_shredded_variant ( + id INT, + data VARIANT +) STORED BY ICEBERG +tblproperties( + 'format-version'='3', + 'variant.shredding.enabled'='true' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_shredded_variant +PREHOOK: query: INSERT INTO tbl_shredded_variant VALUES +(1, parse_json('{"name": "John", "age": 30, "active": true}')), +(2, 
parse_json('{"name": "Bill", "active": false}')), +(3, parse_json('{"name": "Henry", "age": 20}')) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_shredded_variant +POSTHOOK: query: INSERT INTO tbl_shredded_variant VALUES +(1, parse_json('{"name": "John", "age": 30, "active": true}')), +(2, parse_json('{"name": "Bill", "active": false}')), +(3, parse_json('{"name": "Henry", "age": 20}')) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_shredded_variant +PREHOOK: query: SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant +WHERE variant_get(data, '$.age') > 25 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_shredded_variant +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant +WHERE variant_get(data, '$.age') > 25 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_shredded_variant +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 John +PREHOOK: query: EXPLAIN +SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant +WHERE variant_get(data, '$.age') > 25 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_shredded_variant +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: EXPLAIN +SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant +WHERE variant_get(data, '$.age') > 25 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_shredded_variant +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tbl_shredded_variant + filterExpr: (UDFToDouble(variant_get(data, '$.age')) > 25.0D) (type: boolean) + Statistics: Num rows: 3 Data size: 1020 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(variant_get(data, '$.age')) > 25.0D) (type: boolean) + Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: id (type: int), try_variant_get(data, '$.name') (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml index 17d50dd91d62..ac4bf4a9b106 100644 --- a/iceberg/iceberg-shading/pom.xml +++ b/iceberg/iceberg-shading/pom.xml @@ -117,6 +117,7 @@ + *:* @@ -127,6 +128,13 @@ static/ + + + org.apache.iceberg:iceberg-parquet + + org/apache/iceberg/parquet/Parquet.class + + diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java b/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java index 88e9240140dc..3c92879ba0c7 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java @@ -301,7 +301,7 @@ public int getWritePos() { // - when `allowDuplicateKeys` is true, the field with the 
greatest offset value (the last // appended one) is kept. // - otherwise, throw an exception. - public void finishWritingObject(int start, ArrayList<FieldEntry> fields) { + public void finishWritingObject(int start, List<FieldEntry> fields) { int size = fields.size(); Collections.sort(fields); int maxId = size == 0 ? 0 : fields.getFirst().id;
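
Reviewer note: the Parquet read path touched in this patch is easiest to follow from the caller's side. Below is a minimal sketch, assuming the upstream Iceberg entry points Parquet.read, GenericParquetReaders.buildReader and Expressions (none of which are part of this diff); with a filter set, build() converts it against the file schema and enables stats/dictionary/bloom filtering, and without one those filters are switched off.

package org.apache.iceberg.parquet.sketch;

import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

public class ParquetReadSketch {
  // Opens a local Parquet file through the read builder whose build() is shown in the diff above.
  static CloseableIterable<Record> open(Schema schema, String path) {
    InputFile inputFile = Files.localInput(path);
    return Parquet.read(inputFile)
        .project(schema)
        // reader function: build an Iceberg generic reader for the projected schema
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        // with a filter present, build() wires up stats/dictionary/bloom filtering
        .filter(Expressions.greaterThan("id", 5))
        .reuseContainers()
        .build();
  }
}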
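
Similarly, a short sketch of the concat helper documented above; the file names and the 128 MB row-group size are placeholders, and the inputs are assumed to share the given schema.

package org.apache.iceberg.parquet.sketch;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

public class ParquetConcatSketch {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()));

    // Row groups of the inputs are appended, in iteration order, into one output file.
    Parquet.concat(
        Arrays.asList(new File("part-0.parquet"), new File("part-1.parquet")),
        new File("combined.parquet"),
        128 * 1024 * 1024,  // row-group size for the output writer
        schema,
        Map.of());          // extra key/value metadata written to the output footer
  }
}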
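
Finally, a hedged sketch of how a writer-side caller might consult the new VariantUtil helper. GenericRecord.create and Types.VariantType.get() are assumed from the Iceberg data/API modules; only the property name and the variantShreddingFunc signature come from this diff.

package org.apache.iceberg.parquet.sketch;

import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.parquet.VariantShreddingFunction;
import org.apache.iceberg.parquet.VariantUtil;
import org.apache.iceberg.types.Types;

public class VariantShreddingSketch {
  public static void main(String[] args) {
    // A schema with one variant column; shredding is only considered for such columns.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "payload", Types.VariantType.get()));

    // A sample row is used to infer a shredded ("typed_value") layout per variant field.
    Record sample = GenericRecord.create(schema);

    // The table property introduced by this patch gates the feature.
    Map<String, String> props = Map.of(InputFormatConfig.VARIANT_SHREDDING_ENABLED, "true");

    Optional<VariantShreddingFunction> shreddingFn =
        VariantUtil.variantShreddingFunc(schema, () -> sample, props);

    // Empty when the property is off or the schema has no variant column; when present,
    // the function is called per variant field (id, name) and may still return null if
    // nothing can be inferred from the sample value.
    System.out.println(shreddingFn.isPresent());
  }
}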