Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,28 @@
class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {

private final GenericRecord rowDataTemplate;
private final boolean skipRowData;

/**
 * Creates a delete writer that emits Iceberg position-delete files.
 *
 * @param schema         table schema used as the template for row data records
 * @param specs          all partition specs of the table, keyed by spec id
 * @param writerFactory  factory for the underlying file writers
 * @param fileFactory    factory producing the delete output files
 * @param io             table FileIO used for writing
 * @param targetFileSize target size for rolled delete files
 * @param skipRowData    when true, the row payload is omitted from the delete
 *                       files and only (file path, position) is persisted
 */
HiveIcebergDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs,
    FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io,
    long targetFileSize, boolean skipRowData) {
  super(schema, specs, io,
      new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSize));
  this.rowDataTemplate = GenericRecord.create(schema);
  this.skipRowData = skipRowData;
}

@Override
public void write(Writable row) throws IOException {
  Record rec = ((Container<Record>) row).get();
  PositionDelete<Record> positionDelete = IcebergAcidUtil.getPositionDelete(rec, rowDataTemplate);
  int specId = IcebergAcidUtil.parseSpecId(rec);
  // Capture the row data BEFORE it is possibly cleared below: the partition must
  // always be computed from the original row, even when the payload is skipped.
  Record rowData = positionDelete.row();
  if (skipRowData) {
    // Drop the row payload so only (path, pos) is written to the delete file;
    // this keeps position-delete files small when row data is not needed.
    positionDelete.set(positionDelete.path(), positionDelete.pos(), null);
  }
  writer.write(positionDelete, specs.get(specId), partition(rowData, specId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public class WriterBuilder {
// A task may write multiple output files using multiple writers. Each of them must have a unique operationId.
private static AtomicInteger operationNum = new AtomicInteger(0);

// To specify whether to write the actual row data while writing the delete files.
public static final String ICEBERG_DELETE_SKIPROWDATA = "iceberg.delete.skiprowdata";
public static final String ICEBERG_DELETE_SKIPROWDATA_DEFAULT = "true";

// Private: WriterBuilder instances are obtained via a static factory method
// (defined elsewhere in this class, outside this chunk), not direct construction.
private WriterBuilder(Table table) {
this.table = table;
}
Expand Down Expand Up @@ -92,6 +96,9 @@ public HiveIcebergWriter build() {
long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);

boolean skipRowData =
Boolean.parseBoolean(properties.getOrDefault(ICEBERG_DELETE_SKIPROWDATA, ICEBERG_DELETE_SKIPROWDATA_DEFAULT));

Schema dataSchema = table.schema();
FileIO io = table.io();
Map<Integer, PartitionSpec> specs = table.specs();
Expand All @@ -109,14 +116,15 @@ public HiveIcebergWriter build() {
.operationId("delete-" + operationId)
.build();

HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null,
deleteFileFormat, null, null, null, dataSchema);
HiveFileWriterFactory writerFactory =
new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null, deleteFileFormat, null, null, null,
skipRowData ? null : dataSchema);

HiveIcebergWriter writer;
switch (operation) {
case DELETE:
writer = new HiveIcebergDeleteWriter(dataSchema, specs, writerFactory, deleteOutputFileFactory,
io, targetFileSize);
io, targetFileSize, skipRowData);
break;
case OTHER:
writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -83,16 +84,24 @@ public class HiveIcebergWriterTestBase {
@Parameterized.Parameter(1)
public boolean partitioned;

@Parameterized.Parameters(name = "fileFormat={0}, partitioned={1}")
@Parameterized.Parameter(2)
public boolean skipRowData;

@Parameterized.Parameters(name = "fileFormat={0}, partitioned={1}, skipRowData={2}")
public static Collection<Object[]> parameters() {
return Lists.newArrayList(new Object[][] {
{ FileFormat.PARQUET, true },
{ FileFormat.ORC, true },
{ FileFormat.AVRO, true },
{ FileFormat.PARQUET, false },
{ FileFormat.PARQUET, true, true },
{ FileFormat.ORC, true, true },
{ FileFormat.AVRO, true, true },
{ FileFormat.PARQUET, false, true },
{ FileFormat.PARQUET, true, false },
{ FileFormat.ORC, true, false },
{ FileFormat.AVRO, true, false },
{ FileFormat.PARQUET, false, false },
// Skip this until the ORC reader is fixed - test only issue
// { FileFormat.ORC, false },
{ FileFormat.AVRO, false }
{ FileFormat.AVRO, false, true },
{ FileFormat.AVRO, false, false }
});
}

Expand All @@ -105,7 +114,8 @@ public void init() throws IOException {
PartitionSpec.builderFor(SCHEMA)
.bucket("data", 3)
.build();
this.helper = new TestHelper(new HiveConf(), tables, location.toString(), SCHEMA, spec, fileFormat, temp);
this.helper = new TestHelper(new HiveConf(), tables, location.toString(), SCHEMA, spec, fileFormat,
Collections.singletonMap(WriterBuilder.ICEBERG_DELETE_SKIPROWDATA, String.valueOf(skipRowData)), temp);
this.table = helper.createTable();
helper.appendToTable(RECORDS);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
-- Mask random uuid
--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/

-- create an unpartitioned table with skip delete data set to false
create table ice01 (id int) Stored by Iceberg stored as ORC
TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false');

-- check the property value
show create table ice01;

-- insert some values
insert into ice01 values (1),(2),(3),(4);

-- check the inserted values
select * from ice01;

-- delete some values
delete from ice01 where id>2;

-- check the values, the deleted values shouldn't be there
select * from ice01 order by id;

-- insert some more data
insert into ice01 values (5),(6),(7),(8);

-- check the values, only the deleted values shouldn't be there
select * from ice01 order by id;

-- delete one value
delete from ice01 where id=7;

-- change to skip the row data now
Alter table ice01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true');

-- check the property value
show create table ice01;

-- delete some more rows now
delete from ice01 where id=5;

-- check the entries, the deleted entries shouldn't be there.
select * from ice01 order by id;

-- create a partitioned table with skip row data set to false
create table icepart01 (id int) partitioned by (part int) Stored by Iceberg stored as ORC
TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false');

-- insert some values
insert into icepart01 values (1,1),(2,1),(3,2),(4,2);

-- check the inserted values
select * from icepart01 order by id;

-- delete some values
delete from icepart01 where id>=2 AND id<4;

-- check the values, the deleted values shouldn't be there
select * from icepart01;

-- insert some more data
insert into icepart01 values (5,1),(6,2),(7,1),(8,2);

-- check the values, only the deleted values shouldn't be there
select * from icepart01 order by id;

-- delete one value
delete from icepart01 where id=7;

-- change to skip the row data now
Alter table icepart01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true');

-- check the property value
show create table icepart01;

-- delete some more rows now
delete from icepart01 where id=5;

-- check the entries, the deleted entries shouldn't be there.
select * from icepart01 order by id;

-- clean up
drop table ice01;
drop table icepart01;
Loading