2 changes: 2 additions & 0 deletions build.gradle
@@ -337,6 +337,7 @@ project(':iceberg-flink') {

testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')

// By default, hive-exec is a fat/uber jar and it exports a guava library
@@ -766,6 +767,7 @@ project(':iceberg-spark') {
}
testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
}

80 changes: 80 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DataWriter.java
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DataWriter<T> implements Closeable {
private final FileAppender<T> appender;
private final FileFormat format;
private final String location;
private final PartitionSpec spec;
private final StructLike partition;
private final ByteBuffer keyMetadata;
private DataFile dataFile = null;

public DataWriter(FileAppender<T> appender, FileFormat format, String location,
PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) {
this.appender = appender;
this.format = format;
this.location = location;
this.spec = spec;
this.partition = partition;
this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
}

public void add(T row) {
appender.add(row);
}

public long length() {
return appender.length();
}

@Override
public void close() throws IOException {
if (dataFile == null) {
appender.close();
this.dataFile = DataFiles.builder(spec)
.withFormat(format)
.withPath(location)
.withPartition(partition)
.withEncryptionKeyMetadata(keyMetadata)
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.build();
}
}

public DataFile toDataFile() {
Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer");
return dataFile;
}
}
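
For context, a minimal sketch of how the new DataWriter could be used (not part of this diff; the schema, record, and local output path are assumptions for illustration):

// Writes one record to a Parquet file and returns its DataFile metadata.
DataFile writeDataFile(Schema schema, Record record) throws IOException {
  OutputFile out = Files.localOutput("/tmp/data.parquet");  // hypothetical path
  FileAppender<Record> appender = Parquet.write(out)
      .schema(schema)
      .createWriterFunc(GenericParquetWriter::buildWriter)
      .build();

  DataWriter<Record> writer = new DataWriter<>(
      appender, FileFormat.PARQUET, out.location(),
      PartitionSpec.unpartitioned(), null, null);  // unpartitioned, unencrypted
  try {
    writer.add(record);
  } finally {
    writer.close();  // close() finalizes metrics and builds the DataFile
  }
  return writer.toDataFile();  // valid only after close()
}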
48 changes: 48 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DeleteSchemaUtil {
private DeleteSchemaUtil() {
}

private static Schema pathPosSchema(Schema rowSchema) {
return new Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
Types.NestedField.required(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC));
}

public static Schema pathPosSchema() {
return new Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS);
}

public static Schema posDeleteSchema(Schema rowSchema) {
return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema);
}
}
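
A short sketch of the shapes posDeleteSchema produces, assuming a table whose row schema has a single string column named "data":

// Position deletes that also carry the deleted row:
Schema rowSchema = new Schema(
    Types.NestedField.required(1, "data", Types.StringType.get()));
Schema withRow = DeleteSchemaUtil.posDeleteSchema(rowSchema);
// => file_path: string, pos: long, row: struct<data: string>

// Position deletes without row data:
Schema pathPosOnly = DeleteSchemaUtil.posDeleteSchema(null);
// => file_path: string, pos: long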
34 changes: 34 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -20,6 +20,10 @@
package org.apache.iceberg.io;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;

/**
* Factory to create a new {@link FileAppender} to write records.
@@ -36,4 +40,34 @@ public interface FileAppenderFactory<T> {
* @return a newly created {@link FileAppender}
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

/**
* Create a new {@link DataWriter}.
*
* @param outputFile an EncryptedOutputFile used to create an output stream
* @param format a file format
* @param partition a tuple of partition values
* @return a newly created {@link DataWriter} for rows
*/
DataWriter<T> newDataWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);

/**
* Create a new {@link EqualityDeleteWriter}.
*
* @param outputFile an EncryptedOutputFile used to create an output stream
* @param format a file format
* @param partition a tuple of partition values
* @return a newly created {@link EqualityDeleteWriter} for equality deletes
*/
EqualityDeleteWriter<T> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);

/**
* Create a new {@link PositionDeleteWriter}.
*
* @param outputFile an EncryptedOutputFile used to create an output stream
* @param format a file format
* @param partition a tuple of partition values
* @return a newly created {@link PositionDeleteWriter} for position deletes
*/
PositionDeleteWriter<T> newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition);
}
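
A sketch of how a caller might drive the new position-delete method, e.g. through the GenericAppenderFactory changed below (the table plumbing, file name, and delete coordinates are assumptions for illustration):

// Marks one row of an existing data file as deleted via a path/pos delete file.
DeleteFile writePosDeletes(Table table, Schema schema, PartitionSpec spec,
    StructLike partition, CharSequence dataFilePath, long rowPosition) throws IOException {
  // Null row schemas: the writer emits path/pos-only position deletes.
  FileAppenderFactory<Record> factory = new GenericAppenderFactory(
      schema, spec, null, null, null);

  // Hypothetical plumbing: encrypt a fresh output file from the table's FileIO.
  EncryptedOutputFile out = table.encryption().encrypt(
      table.io().newOutputFile(table.locationProvider().newDataLocation("pos-deletes.parquet")));

  PositionDeleteWriter<Record> writer =
      factory.newPosDeleteWriter(out, FileFormat.PARQUET, partition);
  try {
    writer.delete(dataFilePath, rowPosition);  // delete the row at rowPosition in dataFilePath
  } finally {
    writer.close();
  }
  return writer.toDeleteFile();  // commit later, e.g. via table.newRowDelta()
}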
8 changes: 4 additions & 4 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -50,7 +50,7 @@ public class TableTestBase {
);

// Partition spec used to create tables
static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
.bucket("data", 16)
.build();

@@ -104,8 +104,8 @@ public class TableTestBase {
@Rule
public TemporaryFolder temp = new TemporaryFolder();

File tableDir = null;
File metadataDir = null;
protected File tableDir = null;
protected File metadataDir = null;
public TestTables.TestTable table = null;

protected final int formatVersion;
@@ -143,7 +143,7 @@ List<File> listManifestFiles(File tableDirToList) {
!name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro")));
}

TestTables.TestTable create(Schema schema, PartitionSpec spec) {
protected TestTables.TestTable create(Schema schema, PartitionSpec spec) {
return TestTables.create(tableDir, "test", schema, spec, formatVersion);
}

@@ -24,16 +24,22 @@
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
@@ -42,10 +48,29 @@
public class GenericAppenderFactory implements FileAppenderFactory<Record> {

private final Schema schema;
private final PartitionSpec spec;
private final int[] equalityFieldIds;
private final Schema eqDeleteRowSchema;
private final Schema posDeleteRowSchema;
private final Map<String, String> config = Maps.newHashMap();

public GenericAppenderFactory(Schema schema) {
this(schema, PartitionSpec.unpartitioned(), null, null, null);
}

public GenericAppenderFactory(Schema schema, PartitionSpec spec) {
this(schema, spec, null, null, null);
}

public GenericAppenderFactory(Schema schema, PartitionSpec spec,
int[] equalityFieldIds,
Schema eqDeleteRowSchema,
Schema posDeleteRowSchema) {
this.schema = schema;
this.spec = spec;
this.equalityFieldIds = equalityFieldIds;
this.eqDeleteRowSchema = eqDeleteRowSchema;
this.posDeleteRowSchema = posDeleteRowSchema;
}

public GenericAppenderFactory set(String property, String value) {
@@ -89,7 +114,97 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
.build();

default:
throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
throw new UnsupportedOperationException("Cannot write unknown file format: " + fileFormat);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public org.apache.iceberg.io.DataWriter<Record> newDataWriter(EncryptedOutputFile file, FileFormat format,
StructLike partition) {
return new org.apache.iceberg.io.DataWriter<>(
newAppender(file.encryptingOutputFile(), format), format,
file.encryptingOutputFile().location(), spec, partition, file.keyMetadata());
}

@Override
public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile file, FileFormat format,
StructLike partition) {
Preconditions.checkState(equalityFieldIds != null && equalityFieldIds.length > 0,
"Equality field ids shouldn't be null or empty when creating equality-delete writer");
Preconditions.checkNotNull(eqDeleteRowSchema,
"Equality delete row schema shouldn't be null when creating equality-delete writer");

MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
try {
switch (format) {
case AVRO:
return Avro.writeDeletes(file.encryptingOutputFile())
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.overwrite()
.setAll(config)
.rowSchema(eqDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.equalityFieldIds(equalityFieldIds)
.buildEqualityWriter();

case PARQUET:
return Parquet.writeDeletes(file.encryptingOutputFile())
.createWriterFunc(GenericParquetWriter::buildWriter)
.withPartition(partition)
.overwrite()
.setAll(config)
.metricsConfig(metricsConfig)
.rowSchema(eqDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.equalityFieldIds(equalityFieldIds)
.buildEqualityWriter();

default:
throw new UnsupportedOperationException(
"Cannot write equality-deletes for unsupported file format: " + format);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public PositionDeleteWriter<Record> newPosDeleteWriter(EncryptedOutputFile file, FileFormat format,
StructLike partition) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
try {
switch (format) {
case AVRO:
return Avro.writeDeletes(file.encryptingOutputFile())
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.overwrite()
.setAll(config)
.rowSchema(posDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();

case PARQUET:
return Parquet.writeDeletes(file.encryptingOutputFile())
.createWriterFunc(GenericParquetWriter::buildWriter)
.withPartition(partition)
.overwrite()
.setAll(config)
.metricsConfig(metricsConfig)
.rowSchema(posDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();

default:
throw new UnsupportedOperationException("Cannot write pos-deletes for unsupported file format: " + format);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
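
A companion sketch for the equality-delete path (the equality field id, key column name, and file name are assumptions; any row whose key column matches the delete record is treated as deleted):

// Writes an equality-delete file keyed on a single hypothetical "data" column.
DeleteFile writeEqDeletes(Table table, Schema schema, PartitionSpec spec,
    StructLike partition, Record deleteRecord) throws IOException {
  int[] equalityFieldIds = new int[] { 1 };          // assumed field id of "data"
  Schema eqDeleteRowSchema = schema.select("data");  // hypothetical key column

  FileAppenderFactory<Record> factory = new GenericAppenderFactory(
      schema, spec, equalityFieldIds, eqDeleteRowSchema, null);
  EncryptedOutputFile out = table.encryption().encrypt(
      table.io().newOutputFile(table.locationProvider().newDataLocation("eq-deletes.avro")));

  EqualityDeleteWriter<Record> writer =
      factory.newEqDeleteWriter(out, FileFormat.AVRO, partition);
  try {
    writer.delete(deleteRecord);  // a Record matching eqDeleteRowSchema
  } finally {
    writer.close();
  }
  return writer.toDeleteFile();
}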