diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
index b95ba910b63c..bd61f101cd95 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
@@ -41,13 +41,15 @@ class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
 
   private final GenericRecord rowDataTemplate;
+  private final boolean skipRowData;
 
   HiveIcebergDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileWriterFactory<Record> writerFactory,
       OutputFileFactory fileFactory, FileIO io,
-      long targetFileSize) {
+      long targetFileSize, boolean skipRowData) {
     super(schema, specs, io, new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSize));
     rowDataTemplate = GenericRecord.create(schema);
+    this.skipRowData = skipRowData;
   }
 
   @Override
@@ -55,7 +57,12 @@ public void write(Writable row) throws IOException {
     Record rec = ((Container<Record>) row).get();
     PositionDelete<Record> positionDelete = IcebergAcidUtil.getPositionDelete(rec, rowDataTemplate);
     int specId = IcebergAcidUtil.parseSpecId(rec);
-    writer.write(positionDelete, specs.get(specId), partition(positionDelete.row(), specId));
+    Record rowData = positionDelete.row();
+    if (skipRowData) {
+      // Set the row data to null, since we want to avoid writing the actual row data into the delete file.
+      positionDelete.set(positionDelete.path(), positionDelete.pos(), null);
+    }
+    writer.write(positionDelete, specs.get(specId), partition(rowData, specId));
   }
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
index 64b4cdd32f20..31073f640186 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
@@ -47,6 +47,10 @@ public class WriterBuilder {
   // A task may write multiple output files using multiple writers. Each of them must have a unique operationId.
   private static AtomicInteger operationNum = new AtomicInteger(0);
 
+  // Specifies whether to skip writing the actual row data into the position delete files.
+  public static final String ICEBERG_DELETE_SKIPROWDATA = "iceberg.delete.skiprowdata";
+  public static final String ICEBERG_DELETE_SKIPROWDATA_DEFAULT = "true";
+
   private WriterBuilder(Table table) {
     this.table = table;
   }
@@ -92,6 +96,9 @@ public HiveIcebergWriter build() {
     long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
 
+    boolean skipRowData =
+        Boolean.parseBoolean(properties.getOrDefault(ICEBERG_DELETE_SKIPROWDATA, ICEBERG_DELETE_SKIPROWDATA_DEFAULT));
+
     Schema dataSchema = table.schema();
     FileIO io = table.io();
     Map<Integer, PartitionSpec> specs = table.specs();
@@ -109,14 +116,15 @@ public HiveIcebergWriter build() {
         .operationId("delete-" + operationId)
         .build();
 
-    HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null,
-        deleteFileFormat, null, null, null, dataSchema);
+    HiveFileWriterFactory writerFactory =
+        new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null, deleteFileFormat, null, null, null,
+            skipRowData ? null : dataSchema);
 
     HiveIcebergWriter writer;
     switch (operation) {
       case DELETE:
         writer = new HiveIcebergDeleteWriter(dataSchema, specs, writerFactory, deleteOutputFileFactory,
-            io, targetFileSize);
+            io, targetFileSize, skipRowData);
         break;
       case OTHER:
         writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
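Reviewer note: the core of the change is that a position delete record keeps its (file_path, pos) coordinates while the row payload is dropped. A minimal sketch of the two modes using Iceberg's PositionDelete API; the data file path and the deletedRow record below are made up for illustration and are not part of the patch:

    // Sketch only: what the delete writer emits in the two modes.
    PositionDelete<Record> positionDelete = PositionDelete.create();
    // iceberg.delete.skiprowdata=false: coordinates plus a copy of the deleted row
    positionDelete.set("/warehouse/ice01/data/00000-0.orc", 42L, deletedRow);
    // iceberg.delete.skiprowdata=true: coordinates only, the row payload stays null
    positionDelete.set("/warehouse/ice01/data/00000-0.orc", 42L, null);

Skipping the payload makes the delete files considerably smaller; the trade-off is that the deleted rows can no longer be recovered from the delete file alone.
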
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
index 1a818a131a04..bd3d3aefce23 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
@@ -22,6 +22,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -83,16 +84,24 @@ public class HiveIcebergWriterTestBase {
   @Parameterized.Parameter(1)
   public boolean partitioned;
 
-  @Parameterized.Parameters(name = "fileFormat={0}, partitioned={1}")
+  @Parameterized.Parameter(2)
+  public boolean skipRowData;
+
+  @Parameterized.Parameters(name = "fileFormat={0}, partitioned={1}, skipRowData={2}")
   public static Collection<Object[]> parameters() {
     return Lists.newArrayList(new Object[][] {
-        { FileFormat.PARQUET, true },
-        { FileFormat.ORC, true },
-        { FileFormat.AVRO, true },
-        { FileFormat.PARQUET, false },
+        { FileFormat.PARQUET, true, true },
+        { FileFormat.ORC, true, true },
+        { FileFormat.AVRO, true, true },
+        { FileFormat.PARQUET, false, true },
+        { FileFormat.PARQUET, true, false },
+        { FileFormat.ORC, true, false },
+        { FileFormat.AVRO, true, false },
+        { FileFormat.PARQUET, false, false },
         // Skip this until the ORC reader is fixed - test only issue
         // { FileFormat.ORC, false },
-        { FileFormat.AVRO, false }
+        { FileFormat.AVRO, false, true },
+        { FileFormat.AVRO, false, false }
     });
   }
@@ -105,7 +114,8 @@ public void init() throws IOException {
         PartitionSpec.builderFor(SCHEMA)
             .bucket("data", 3)
             .build();
-    this.helper = new TestHelper(new HiveConf(), tables, location.toString(), SCHEMA, spec, fileFormat, temp);
+    this.helper = new TestHelper(new HiveConf(), tables, location.toString(), SCHEMA, spec, fileFormat,
+        Collections.singletonMap(WriterBuilder.ICEBERG_DELETE_SKIPROWDATA, String.valueOf(skipRowData)), temp);
     this.table = helper.createTable();
 
     helper.appendToTable(RECORDS);
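The parameter matrix now runs every file format with skipRowData both on and off. A hypothetical follow-up assertion (not in the patch; readDeleteRecords is a made-up helper) that would pin down the expected shape of the delete records:

    // Made-up sketch: position delete records expose file_path, pos and an optional "row" struct.
    for (Record deleteRecord : readDeleteRecords(table)) {
      if (skipRowData) {
        Assert.assertNull("row payload should be dropped", deleteRecord.getField("row"));
      } else {
        Assert.assertNotNull("row payload should be kept", deleteRecord.getField("row"));
      }
    }
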
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_v2_deletes.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_v2_deletes.q
new file mode 100644
index 000000000000..43a1a8033caf
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_v2_deletes.q
@@ -0,0 +1,83 @@
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+
+-- create an unpartitioned table with skip row data set to false
+create table ice01 (id int) Stored by Iceberg stored as ORC
+  TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false');
+
+-- check the property value
+show create table ice01;
+
+-- insert some values
+insert into ice01 values (1),(2),(3),(4);
+
+-- check the inserted values
+select * from ice01;
+
+-- delete some values
+delete from ice01 where id>2;
+
+-- check the values, the deleted values shouldn't be there
+select * from ice01 order by id;
+
+-- insert some more data
+insert into ice01 values (5),(6),(7),(8);
+
+-- check the values, only the deleted values shouldn't be there
+select * from ice01 order by id;
+
+-- delete one value
+delete from ice01 where id=7;
+
+-- change to skip the row data now
+Alter table ice01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true');
+
+-- check the property value
+show create table ice01;
+
+-- delete some more rows now
+delete from ice01 where id=5;
+
+-- check the entries, the deleted entries shouldn't be there
+select * from ice01 order by id;
+
+-- create a partitioned table with skip row data set to false
+create table icepart01 (id int) partitioned by (part int) Stored by Iceberg stored as ORC
+  TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false');
+
+-- insert some values
+insert into icepart01 values (1,1),(2,1),(3,2),(4,2);
+
+-- check the inserted values
+select * from icepart01 order by id;
+
+-- delete some values
+delete from icepart01 where id>=2 AND id<4;
+
+-- check the values, the deleted values shouldn't be there
+select * from icepart01;
+
+-- insert some more data
+insert into icepart01 values (5,1),(6,2),(7,1),(8,2);
+
+-- check the values, only the deleted values shouldn't be there
+select * from icepart01 order by id;
+
+-- delete one value
+delete from icepart01 where id=7;
+
+-- change to skip the row data now
+Alter table icepart01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true');
+
+-- check the property value
+show create table icepart01;
+
+-- delete some more rows now
+delete from icepart01 where id=5;
+
+-- check the entries, the deleted entries shouldn't be there
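The q-file verifies only the visible row counts; to see the effect of the property on what lands on disk, one could list the delete files attached to the table's scan tasks. A hedged sketch using the Iceberg core API (loading `table` from the catalog that owns ice01 is assumed and not shown):

    // Sketch: enumerate the position delete files the DELETE statements created.
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        for (DeleteFile delete : task.deletes()) {
          // With iceberg.delete.skiprowdata=true these files carry only file_path/pos columns.
          System.out.println(delete.path() + " -> " + delete.recordCount() + " position deletes");
        }
      }
    }
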
+select * from icepart01 order by id;
+
+-- clean up
+drop table ice01;
+drop table icepart01;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_v2_deletes.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_v2_deletes.q.out
new file mode 100644
index 000000000000..bf7bb4d4bc78
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_v2_deletes.q.out
@@ -0,0 +1,325 @@
+PREHOOK: query: create table ice01 (id int) Stored by Iceberg stored as ORC
+  TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice01
+POSTHOOK: query: create table ice01 (id int) Stored by Iceberg stored as ORC
+  TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice01
+PREHOOK: query: show create table ice01
+PREHOOK: type: SHOW_CREATETABLE
+PREHOOK: Input: default@ice01
+POSTHOOK: query: show create table ice01
+POSTHOOK: type: SHOW_CREATETABLE
+POSTHOOK: Input: default@ice01
+CREATE TABLE `ice01`(
+  `id` int)
+ROW FORMAT SERDE
+  'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
+STORED BY
+  'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+
+LOCATION
+  'hdfs://### HDFS PATH ###'
+TBLPROPERTIES (
+  'bucketing_version'='2',
+  'engine.hive.enabled'='true',
+  'format-version'='2',
+  'iceberg.delete.skiprowdata'='false',
+  'iceberg.orc.files.only'='true',
+  'metadata_location'='hdfs://### HDFS PATH ###',
+  'serialization.format'='1',
+  'table_type'='ICEBERG',
+#### A masked pattern was here ####
+  'uuid'='#Masked#',
+  'write.delete.mode'='merge-on-read',
+  'write.format.default'='orc',
+  'write.merge.mode'='merge-on-read',
+  'write.update.mode'='merge-on-read')
+PREHOOK: query: insert into ice01 values (1),(2),(3),(4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: insert into ice01 values (1),(2),(3),(4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+2
+3
+4
+PREHOOK: query: delete from ice01 where id>2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where id>2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01 order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01 order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+2
+PREHOOK: query: insert into ice01 values (5),(6),(7),(8)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: insert into ice01 values (5),(6),(7),(8)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01 order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01 order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+2
+5
+6
+7
+8
+PREHOOK: query: delete from ice01 where id=7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where id=7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: Alter table ice01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: Alter table ice01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: show create table ice01
+PREHOOK: type: SHOW_CREATETABLE
+PREHOOK: Input: default@ice01
+POSTHOOK: query: show create table ice01
+POSTHOOK: type: SHOW_CREATETABLE
+POSTHOOK: Input: default@ice01
+CREATE TABLE `ice01`(
+  `id` int)
+ROW FORMAT SERDE
+  'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
+STORED BY
+  'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+
+LOCATION
+  'hdfs://### HDFS PATH ###'
+TBLPROPERTIES (
+  'bucketing_version'='2',
+  'engine.hive.enabled'='true',
+  'format-version'='2',
+  'iceberg.delete.skiprowdata'='true',
+  'iceberg.orc.files.only'='true',
+#### A masked pattern was here ####
+  'metadata_location'='hdfs://### HDFS PATH ###',
+  'previous_metadata_location'='hdfs://### HDFS PATH ###',
+  'serialization.format'='1',
+  'table_type'='ICEBERG',
+#### A masked pattern was here ####
+  'uuid'='#Masked#',
+  'write.delete.mode'='merge-on-read',
+  'write.format.default'='orc',
+  'write.merge.mode'='merge-on-read',
+  'write.update.mode'='merge-on-read')
+PREHOOK: query: delete from ice01 where id=5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from ice01 where id=5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from ice01 order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01 order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+2
+6
+8
+PREHOOK: query: create table icepart01 (id int) partitioned by (part int) Stored by Iceberg stored as ORC
+  TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: create table icepart01 (id int) partitioned by (part int) Stored by Iceberg stored as ORC
+  TBLPROPERTIES('format-version'='2', 'iceberg.delete.skiprowdata'='false')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@icepart01
+PREHOOK: query: insert into icepart01 values (1,1),(2,1),(3,2),(4,2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: insert into icepart01 values (1,1),(2,1),(3,2),(4,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@icepart01
+PREHOOK: query: select * from icepart01 order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from icepart01 order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 1
+2 1
+3 2
+4 2
+PREHOOK: query: delete from icepart01 where id>=2 AND id<4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: delete from icepart01 where id>=2 AND id<4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: default@icepart01
+PREHOOK: query: select * from icepart01
+PREHOOK: type: QUERY
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from icepart01
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 1
+4 2
+PREHOOK: query: insert into icepart01 values (5,1),(6,2),(7,1),(8,2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: insert into icepart01 values (5,1),(6,2),(7,1),(8,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@icepart01
+PREHOOK: query: select * from icepart01 order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from icepart01 order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 1
+4 2
+5 1
+6 2
+7 1
+8 2
+PREHOOK: query: delete from icepart01 where id=7
+PREHOOK: type: QUERY
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: delete from icepart01 where id=7
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: default@icepart01
+PREHOOK: query: Alter table icepart01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: Alter table icepart01 set TBLPROPERTIES('iceberg.delete.skiprowdata'='true')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: default@icepart01
+PREHOOK: query: show create table icepart01
+PREHOOK: type: SHOW_CREATETABLE
+PREHOOK: Input: default@icepart01
+POSTHOOK: query: show create table icepart01
+POSTHOOK: type: SHOW_CREATETABLE
+POSTHOOK: Input: default@icepart01
+CREATE TABLE `icepart01`(
+  `id` int,
+  `part` int)
+PARTITIONED BY SPEC (
+part)
+ROW FORMAT SERDE
+  'org.apache.iceberg.mr.hive.HiveIcebergSerDe'
+STORED BY
+  'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+
+LOCATION
+  'hdfs://### HDFS PATH ###'
+TBLPROPERTIES (
+  'bucketing_version'='2',
+  'engine.hive.enabled'='true',
+  'format-version'='2',
+  'iceberg.delete.skiprowdata'='true',
+  'iceberg.orc.files.only'='true',
+#### A masked pattern was here ####
+  'metadata_location'='hdfs://### HDFS PATH ###',
+  'previous_metadata_location'='hdfs://### HDFS PATH ###',
+  'serialization.format'='1',
+  'table_type'='ICEBERG',
+#### A masked pattern was here ####
+  'uuid'='#Masked#',
+  'write.delete.mode'='merge-on-read',
+  'write.format.default'='orc',
+  'write.merge.mode'='merge-on-read',
+  'write.update.mode'='merge-on-read')
+PREHOOK: query: delete from icepart01 where id=5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: delete from icepart01 where id=5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: default@icepart01
+PREHOOK: query: select * from icepart01 order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from icepart01 order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 1
+4 2
+6 2
+8 2
+PREHOOK: query: drop table ice01
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: drop table ice01
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: drop table icepart01
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@icepart01
+PREHOOK: Output: default@icepart01
+POSTHOOK: query: drop table icepart01
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@icepart01
+POSTHOOK: Output: default@icepart01