4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,8 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de
*/
protected abstract StructLike asStructLike(T data);

protected abstract StructLike asStructLikeKey(T data);

public void write(T row) throws IOException {
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());

@@ -167,7 +169,7 @@ public void delete(T row) throws IOException {
* @param key is the projected data whose columns are the same as the equality fields.
*/
public void deleteKey(T key) throws IOException {
if (!internalPosDelete(asStructLike(key))) {
if (!internalPosDelete(asStructLikeKey(key))) {
eqDeleteWriter.write(key);
}
}
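
For readers tracking the API change: delete(T row) still receives a full row, while the new deleteKey(T key) receives only the projected equality-field columns, which is why the lookup inside deleteKey goes through asStructLikeKey rather than asStructLike. A hedged sketch of the calling convention follows; the writer and projection variables are illustrative, not part of this PR.

```java
// Illustrative only: how the two delete entry points are meant to be used.
// 'writer' is a concrete BaseEqualityDeltaWriter; 'keyProjection' stands in
// for whatever projects a full row down to its equality-field columns.
writer.delete(fullRow);                          // pass the whole deleted row
writer.deleteKey(keyProjection.wrap(fullRow));   // pass only the key columns
```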
@@ -479,6 +479,11 @@ private GenericEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema
protected StructLike asStructLike(Record row) {
return row;
}

@Override
protected StructLike asStructLikeKey(Record data) {
return data;
}
}
}

@@ -450,7 +450,10 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD

@Override
protected Object get(RowData struct, int index) {
return fieldGetter[index].getFieldOrNull(struct);
if (struct.isNullAt(index)) {
return null;
}
Contributor: Why did this need to change? NPE in a test?

Contributor: There is an NPE in some test cases, yes.

Contributor: However, I'm going to investigate a bit further, as I do think it might be indicative of a bug.

I think if we use the correct deletion schema in all cases, the NPEs will go away. I am testing now.

Contributor: Or actually, I don't think this change is needed. If there's no fieldGetter for a given index, that's likely indicative of a bug.

Contributor: Yeah, this didn't need to be changed if we use the full schema as the deletion schema outside of the upsert case, as here: https://github.com/apache/iceberg/pull/4364/files#diff-bbdfbcbd83d2e6f53d402e804dcd0a6fd28ab39d1cc3f74a88641ae8763fde3bR75-R87

return this.fieldGetter[index].getFieldOrNull(struct);
}
}
}
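
The thread above hinges on where the NPE comes from. One plausible reading, offered here as an assumption rather than a confirmed diagnosis: Flink's generated field getters for NOT NULL columns skip their own null check, so reading a column that was never populated (for example, because the row was produced under a narrower delete schema) fails unless the caller guards with isNullAt(). A small self-contained illustration:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;

public class FieldGetterNullSketch {
  public static void main(String[] args) {
    // Getter for a NOT NULL int column: no built-in null check is generated.
    RowData.FieldGetter getter = RowData.createFieldGetter(new IntType(false), 0);
    GenericRowData row = new GenericRowData(1); // field 0 is left null

    // Calling getter.getFieldOrNull(row) directly would unbox null and throw
    // a NullPointerException; the isNullAt() guard added in this diff avoids that.
    Object value = row.isNullAt(0) ? null : getter.getFieldOrNull(row);
    System.out.println(value); // prints "null"
  }
}
```

As the reviewers note, if the writer is always handed a schema that matches the rows it receives, the guard should never be needed.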
@@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper wrapperDelete;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +60,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.wrapperDelete = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
Contributor: How about keyWrapper instead?

this.upsert = upsert;
}

@@ -66,6 +70,10 @@ RowDataWrapper wrapper() {
return wrapper;
}

RowDataWrapper wrapperDelete() {
Contributor: Why is there an accessor for this?

return wrapperDelete;
}

@Override
public void write(RowData row) throws IOException {
RowDataDeltaWriter writer = route(row);
@@ -74,7 +82,8 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
Contributor: This shouldn't create a new projection every time. Instead, it should create one and reuse it by calling wrap every time.

writer.deleteKey(wrap);
}
writer.write(row);
break;
@@ -83,10 +92,21 @@
if (upsert) {
break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
writer.delete(row);
if (deleteSchema != null) {
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
} else {
writer.delete(row);
}
Contributor: Why does this need to change? I think you just want to fix the upsert case.

Contributor: Additionally, in the upsert case, data doesn't come through as UPDATE_BEFORE, though this might be needed to keep CDC data in check.

We've been working on the PR in my fork, but I'll run some tests.

break;

case DELETE:
writer.delete(row);
if (deleteSchema != null) {
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
} else {
writer.delete(row);
}
break;

default:
@@ -103,5 +123,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
return wrapperDelete.wrap(data);
}
}
}
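
Pulling the review feedback on this file together, here is a hedged sketch of what write() could look like if the projection were created once and only the upsert path switched to deleteKey, with the CDC branches left on delete(row). The keyProjection field and the default branch are assumptions about the final shape, not the code in this PR.

```java
// Sketch combining the reviewers' suggestions; not the code in this PR.
// Assumes a field initialized once in the constructor:
//   this.keyProjection = RowDataProjection.create(schema, deleteSchema);
@Override
public void write(RowData row) throws IOException {
  RowDataDeltaWriter writer = route(row);

  switch (row.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
      if (upsert) {
        // Key-only equality delete; the projection is reused, not rebuilt per row.
        writer.deleteKey(keyProjection.wrap(row));
      }
      writer.write(row);
      break;

    case UPDATE_BEFORE:
      if (upsert) {
        break; // upsert streams do not rely on UPDATE_BEFORE
      }
      writer.delete(row); // CDC path keeps writing the full deleted row
      break;

    case DELETE:
      writer.delete(row);
      break;

    default:
      throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
  }
}
```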
@@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -70,9 +72,8 @@ public RowDataTaskWriterFactory(Table table,
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
} else {
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
Contributor: This is the correct schema for upsert, but it should not be used for delete when the row passed to the delete file is the deleted row.

}
}
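
Following up on that comment, one hedged way to keep both behaviors is to choose the equality-delete row schema by write mode. The upsert flag below is an assumption about how the factory could learn the mode; this PR does not currently pass it in.

```java
// Hypothetical sketch, not this PR's code: pick the equality-delete row schema by mode.
// In upsert mode only the key columns are written to equality-delete files, so the
// projected schema matches the rows; CDC-style deletes write the full deleted row,
// so the table schema should stay the delete row schema.
Schema eqDeleteRowSchema = upsert
    ? TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds))
    : schema;

this.appenderFactory = new FlinkAppenderFactory(
    schema, flinkSchema, table.properties(), spec,
    ArrayUtil.toIntArray(equalityFieldIds), eqDeleteRowSchema, null);
```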

@@ -450,7 +450,10 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD

@Override
protected Object get(RowData struct, int index) {
return fieldGetter[index].getFieldOrNull(struct);
if (struct.isNullAt(index)) {
return null;
}
return this.fieldGetter[index].getFieldOrNull(struct);
Contributor: Let's keep the work to just Flink 1.14 for now, so comments don't get duplicated on many things.

}
}
}
@@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper wrapperDelete;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +60,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.wrapperDelete = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
this.upsert = upsert;
}

@@ -66,6 +70,10 @@ RowDataWrapper wrapper() {
return wrapper;
}

RowDataWrapper wrapperDelete() {
return wrapperDelete;
}
Contributor: Can you revert the changes in all modules other than 1.14? That makes reviewing and updating for review comments much easier. Once this goes in, we can backport to 1.12 and 1.13.

Contributor: Yeah, agreed. I left a similar comment on another file because the changes were a bit much.

It keeps the discussion of a specific change all in one place.

Contributor: In this case, I believe that @hililiwei made the changes because existing tests might not pass in earlier versions of Flink.

But for something important, we should still keep the changes in one PR while reviewing. Otherwise it's difficult for others to review.


@Override
public void write(RowData row) throws IOException {
RowDataDeltaWriter writer = route(row);
@@ -74,7 +82,8 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
}
writer.write(row);
break;
@@ -83,10 +92,21 @@
if (upsert) {
break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
writer.delete(row);
if (deleteSchema != null) {
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
} else {
writer.delete(row);
}
break;

case DELETE:
writer.delete(row);
if (deleteSchema != null) {
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
} else {
writer.delete(row);
}
break;

default:
@@ -103,5 +123,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
return wrapperDelete.wrap(data);
}
}
}
@@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -72,7 +74,7 @@ public RowDataTaskWriterFactory(Table table,
} else {
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
}
}

@@ -450,7 +450,10 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD

@Override
protected Object get(RowData struct, int index) {
return fieldGetter[index].getFieldOrNull(struct);
if (struct.isNullAt(index)) {
return null;
}
return this.fieldGetter[index].getFieldOrNull(struct);
}
}
}
@@ -28,7 +28,9 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
private final RowDataWrapper wrapperDelete;
private final boolean upsert;

BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +60,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.wrapperDelete = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
this.upsert = upsert;
}

@@ -74,7 +78,8 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
}
writer.write(row);
break;
@@ -83,10 +88,21 @@
if (upsert) {
break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
writer.delete(row);
if (deleteSchema != null) {
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
} else {
writer.delete(row);
}
break;

case DELETE:
writer.delete(row);
if (deleteSchema != null) {
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
writer.deleteKey(wrap);
} else {
writer.delete(row);
}
break;

default:
@@ -103,5 +119,10 @@ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}

@Override
protected StructLike asStructLikeKey(RowData data) {
return wrapperDelete.wrap(data);
}
}
}
@@ -35,6 +35,8 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -70,9 +72,8 @@ public RowDataTaskWriterFactory(Table table,
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
} else {
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
}
}
