@@ -54,6 +54,7 @@ private InputFormatConfig() {
public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
public static final String LOCALITY = "iceberg.mr.locality";
public static final String WRITE_FANOUT_ENABLED = "write.fanout.enabled";
public static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";

public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
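The new VARIANT_SHREDDING_ENABLED constant mirrors the table property that turns variant shredding on for the Hive writers. As a rough sketch of how such a property could be read off a table: the key comes from the hunk above, while the helper class, the false default, and the Boolean.parseBoolean call are illustrative assumptions, not code from this PR.

import java.util.Map;
import org.apache.iceberg.Table;

// Illustrative sketch only: reading the new table property off an Iceberg table.
// The property key comes from the diff; everything else here is assumed.
final class VariantShreddingCheck {
  static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";

  private VariantShreddingCheck() {
  }

  static boolean isEnabled(Table table) {
    Map<String, String> props = table.properties();
    // Treat a missing property as disabled; the real default is not stated in this hunk.
    return Boolean.parseBoolean(props.getOrDefault(VARIANT_SHREDDING_ENABLED, "false"));
  }
}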
@@ -20,6 +20,7 @@
package org.apache.iceberg.mr.hive.serde.objectinspector;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -61,11 +62,13 @@ public Object getStructFieldData(Object data, StructField fieldRef) {

switch (field.getFieldID()) {
case 0: // "metadata" field (binary)
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.metadata().writeTo(metadata, 0);
return metadata.array();
case 1: // "value" field (binary)
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.value().writeTo(value, 0);
return value.array();
default:
@@ -79,10 +82,12 @@ public List<Object> getStructFieldsDataAsList(Object data) {
return null;
}
Variant variant = (Variant) data;
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.metadata().writeTo(metadata, 0);

ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.value().writeTo(value, 0);

// Return the data for our fields in the correct order: metadata, value
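A note on the ByteOrder change in this file: java.nio.ByteBuffer defaults to big-endian, so any multi-byte puts performed by writeTo would come out byte-swapped relative to the little-endian layout the variant encoding uses, which is why the buffers now set the order explicitly. A self-contained illustration of the difference (plain JDK behavior, not Iceberg code):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class ByteOrderDemo {
  public static void main(String[] args) {
    // Default order is BIG_ENDIAN: the most significant byte comes first.
    byte[] bigEndian = ByteBuffer.allocate(4).putInt(1).array();
    // With LITTLE_ENDIAN the least significant byte comes first,
    // matching what the object inspector now requests before writeTo.
    byte[] littleEndian =
        ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(1).array();

    System.out.println(Arrays.toString(bigEndian));    // [0, 0, 0, 1]
    System.out.println(Arrays.toString(littleEndian)); // [1, 0, 0, 0]
  }
}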
@@ -19,6 +19,8 @@

package org.apache.iceberg.mr.hive.writer;

import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
@@ -31,9 +33,13 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.VariantUtil;

class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {

private final Map<String, String> properties;
private Supplier<Record> sampleRecord = null;

HiveFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -54,6 +60,7 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
equalityDeleteRowSchema,
equalityDeleteSortOrder,
positionDeleteRowSchema);
properties = table.properties();
}

static Builder builderFor(Table table) {
@@ -78,6 +85,9 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
// Configure variant shredding function if conditions are met:
VariantUtil.variantShreddingFunc(dataSchema(), sampleRecord, properties)
.ifPresent(builder::variantShreddingFunc);
}

@Override
@@ -149,4 +159,14 @@ HiveFileWriterFactory build() {
positionDeleteRowSchema);
}
}

/**
* Set a sample record to use for data-driven variant shredding schema generation.
* Should be called before the Parquet writer is created.
*/
public void initialize(Supplier<Record> record) {
if (sampleRecord == null) {
sampleRecord = record;

Review comment:

It only needs to initialize when the sampleRecord is null? Wouldn't it be easier to just always initialize? Maybe there is a special place for the caller to handle this.
deniskuzZ (Member, Author), Dec 12, 2025:

It captures the first record being written and stores it in sampleRecord. The same strategy is applied in Spark to perform variant shredding.

Review comment on lines +168 to +169 (Member):

Thinking about this: we are taking the first record. Can that lead to task-level non-determinism? For example, one insert can lead to multiple inserts, and each task captures its own first record and schema.

But I think there wasn't a better way; maybe later we could let the user define the columns to be shredded.

deniskuzZ (Member, Author), Dec 14, 2025:

Yes, something like this; an explicit shredded schema would be a better solution.

}
}
}
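To make the thread above concrete, here is a stripped-down sketch of the capture-the-first-record pattern that initialize() implements: every write offers a record supplier, only the first one is retained, and later calls are no-ops. This is not the PR's code and the names are illustrative.

import java.util.function.Supplier;

// Illustrative sketch of the pattern discussed above; not the PR's API.
class FirstRecordCapture<T> {
  private Supplier<T> sample = null;

  // Called on every write; only the first supplier is kept, mirroring initialize().
  void offer(Supplier<T> recordSupplier) {
    if (sample == null) {
      sample = recordSupplier;
    }
  }

  // Read later, once a shredding schema needs to be derived from real data.
  T sampleOrNull() {
    return sample == null ? null : sample.get();
  }
}

In the PR itself the retained record is then passed to VariantUtil.variantShreddingFunc(...) when the Parquet data writer is configured, as shown in configureDataWrite above.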
@@ -21,35 +21,48 @@

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {

private final int currentSpecId;
private final Set<String> missingColumns;
private final List<Types.NestedField> missingOrStructFields;

private final GenericRecord rowDataTemplate;
private final List<DataFile> replacedDataFiles;

private final HiveFileWriterFactory fileWriterFactory;

HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory,
OutputFileFactory deleteFileFactory, Context context) {
super(table, newDataWriter(table, writerFactory, deleteFileFactory, context));

this.currentSpecId = table.spec().specId();
this.rowDataTemplate = GenericRecord.create(table.schema());
this.replacedDataFiles = Lists.newArrayList();

this.missingColumns = context.missingColumns();
this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
.filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
.toList();
this.fileWriterFactory = writerFactory;
}

@Override
Expand All @@ -69,6 +82,8 @@ public void write(Writable row) throws IOException {
.build();
replacedDataFiles.add(dataFile);
} else {
HiveSchemaUtil.setDefaultValues(rowData, missingOrStructFields, missingColumns);
fileWriterFactory.initialize(() -> rowData);
writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
}
}
@@ -40,25 +40,28 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
private final Set<String> missingColumns;
private final List<Types.NestedField> missingOrStructFields;

private final HiveFileWriterFactory fileWriterFactory;

HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
OutputFileFactory dataFileFactory, Context context) {
super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));

this.currentSpecId = table.spec().specId();
this.missingColumns = context.missingColumns();
this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
.filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
.filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
.toList();
this.fileWriterFactory = fileWriterFactory;
}

@Override
public void write(Writable row) throws IOException {
Record record = ((Container<Record>) row).get();
HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);

fileWriterFactory.initialize(() -> record);
writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
}


@Override
public FilesForCommit files() {
List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();