Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@
<td>Boolean</td>
<td>Whether to ignore delete records.</td>
</tr>
<tr>
<td><h5>ignore-update-before</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore update-before records.</td>
</tr>
<tr>
<td><h5>incremental-between</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,13 @@ public InlineElement getDescription() {
"partial-update.ignore-delete")
.withDescription("Whether to ignore delete records.");

@Immutable
public static final ConfigOption<Boolean> IGNORE_UPDATE_BEFORE =
key("ignore-update-before")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore update-before records.");

public static final ConfigOption<SortEngine> SORT_ENGINE =
key("sort-engine")
.enumType(SortEngine.class)
Expand Down Expand Up @@ -2288,6 +2295,10 @@ public boolean ignoreDelete() {
return options.get(IGNORE_DELETE);
}

public boolean ignoreUpdateBefore() {
return options.get(IGNORE_UPDATE_BEFORE);
}

public SortEngine sortEngine() {
return options.get(SORT_ENGINE);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.types.RowKind;

import javax.annotation.Nullable;

/** A class to filter row kinds. */
public class RowKindFilter {

private final boolean ignoreAllRetracts;
private final boolean ignoreUpdateBefore;

public RowKindFilter(boolean ignoreAllRetracts, boolean ignoreUpdateBefore) {
this.ignoreAllRetracts = ignoreAllRetracts;
this.ignoreUpdateBefore = ignoreUpdateBefore;
}

@Nullable
public static RowKindFilter of(CoreOptions options) {
boolean ignoreAllRetracts = options.ignoreDelete();
boolean ignoreUpdateBefore = options.ignoreUpdateBefore();
if (!ignoreAllRetracts && !ignoreUpdateBefore) {
return null;
}
return new RowKindFilter(ignoreAllRetracts, ignoreUpdateBefore);
}

public boolean test(RowKind rowKind) {
switch (rowKind) {
case DELETE:
if (ignoreAllRetracts) {
return false;
}
break;
case UPDATE_BEFORE:
if (ignoreUpdateBefore || ignoreAllRetracts) {
return false;
}
break;
default:
break;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.table.source.splitread.SplitReadConfig;
import org.apache.paimon.table.source.splitread.SplitReadProvider;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowKindFilter;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -139,7 +140,7 @@ public TableWriteImpl<InternalRow> newWrite(String commitUser, @Nullable Integer
return record.row();
},
rowKindGenerator(),
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
RowKindFilter.of(coreOptions()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowKindFilter;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -170,7 +171,7 @@ public TableWriteImpl<KeyValue> newWrite(String commitUser, @Nullable Integer wr
rowKind,
record.row()),
rowKindGenerator(),
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
RowKindFilter.of(coreOptions()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.RowKindFilter;

import javax.annotation.Nullable;

Expand All @@ -59,7 +60,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
@Nullable private final RowKindGenerator rowKindGenerator;
private final boolean ignoreDelete;
@Nullable private final RowKindFilter rowKindFilter;

private boolean batchCommitted = false;
private BucketMode bucketMode;
Expand All @@ -73,13 +74,13 @@ public TableWriteImpl(
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor,
@Nullable RowKindGenerator rowKindGenerator,
boolean ignoreDelete) {
@Nullable RowKindFilter rowKindFilter) {
this.rowType = rowType;
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
this.rowKindGenerator = rowKindGenerator;
this.ignoreDelete = ignoreDelete;
this.rowKindFilter = rowKindFilter;

List<String> notNullColumnNames =
rowType.getFields().stream()
Expand Down Expand Up @@ -183,7 +184,7 @@ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
checkNullability(row);
row = wrapDefaultValue(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
if (rowKindFilter != null && !rowKindFilter.test(rowKind)) {
return null;
}
SinkRecord record = bucket == -1 ? toSinkRecord(row) : toSinkRecord(row, bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowKindFilter;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand All @@ -56,6 +57,8 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import javax.annotation.Nullable;

import java.util.List;

import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
Expand All @@ -70,7 +73,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
private static final long serialVersionUID = 1L;

private final TableSchema schema;
private final boolean ignoreDelete;
private final @Nullable RowKindFilter rowKindFilter;

private transient Projection keyProjection;

Expand All @@ -87,7 +90,7 @@ private LocalMergeOperator(
schema.primaryKeys().size() > 0,
"LocalMergeOperator currently only support tables with primary keys");
this.schema = schema;
this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
this.rowKindFilter = RowKindFilter.of(CoreOptions.fromMap(schema.options()));
setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
}

Expand Down Expand Up @@ -170,7 +173,7 @@ public void processElement(StreamRecord<InternalRow> record) throws Exception {
InternalRow row = record.getValue();

RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
if (rowKindFilter != null && !rowKindFilter.test(rowKind)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ public void testIgnoreDelete() {
public void testIgnoreDeleteWithRowKindField() {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) "
+ "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')");
+ "WITH ('ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')");

sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I"));
Expand All @@ -631,6 +631,22 @@ public void testIgnoreDeleteWithRowKindField() {
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B", "+I"));
}

@Test
public void testIgnoreUpdateBeforeWithRowKindField() {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) "
+ "WITH ('ignore-update-before' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')");

sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I"));

sql("INSERT INTO ignore_delete VALUES (1, 'A', '-U')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I"));

sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')");
assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
}

@Test
public void testDeleteWithPkLookup() throws Exception {
sql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@
import java.util.Random;
import java.util.function.Consumer;

import static org.apache.paimon.CoreOptions.IGNORE_DELETE;
import static org.apache.paimon.CoreOptions.IGNORE_UPDATE_BEFORE;
import static org.apache.paimon.CoreOptions.LOCAL_MERGE_BUFFER_SIZE;
import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.types.RowKind.DELETE;
import static org.apache.paimon.types.RowKind.UPDATE_BEFORE;
import static org.assertj.core.api.Assertions.assertThat;

class LocalMergeOperatorTest {
Expand Down Expand Up @@ -114,6 +117,50 @@ public void testUserDefineSequence() throws Exception {
result.clear();
}

@Test
public void testIgnoreUpdateBefore() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(IGNORE_UPDATE_BEFORE.key(), "true");
prepareHashOperator(options);

List<String> result = new ArrayList<>();
setOutput(result);

processElement("a", 1);
processElement(UPDATE_BEFORE, "a", 1);
operator.prepareSnapshotPreBarrier(0);
assertThat(result).containsExactlyInAnyOrder("+I:a->1");
result.clear();

processElement("a", 1);
processElement(DELETE, "a", 1);
operator.prepareSnapshotPreBarrier(1);
assertThat(result).containsExactlyInAnyOrder("-D:a->1");
result.clear();
}

@Test
public void testIgnoreDelete() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(IGNORE_DELETE.key(), "true");
prepareHashOperator(options);

List<String> result = new ArrayList<>();
setOutput(result);

processElement("a", 1);
processElement(UPDATE_BEFORE, "a", 1);
operator.prepareSnapshotPreBarrier(0);
assertThat(result).containsExactlyInAnyOrder("+I:a->1");
result.clear();

processElement("a", 1);
processElement(DELETE, "a", 1);
operator.prepareSnapshotPreBarrier(1);
assertThat(result).containsExactlyInAnyOrder("+I:a->1");
result.clear();
}

@Test
public void testHashSpill() throws Exception {
Map<String, String> options = new HashMap<>();
Expand Down
Loading