From 2f832c73bc4bb8aff00234a62055aaf5ef9bb8f5 Mon Sep 17 00:00:00 2001 From: Xiangyu Feng Date: Mon, 6 Jan 2025 10:39:52 +0800 Subject: [PATCH] [core] remove nullable check for record-level.time-field --- .../org/apache/paimon/schema/SchemaValidation.java | 6 ------ .../paimon/catalog/PrimaryKeyTableTestBase.java | 6 +++++- .../apache/paimon/schema/SchemaValidationTest.java | 4 ++-- .../apache/paimon/table/RecordLevelExpireTest.java | 12 +++++++++++- .../table/RecordLevelExpireWithMillisecondTest.java | 11 ++++++++++- .../RecordLevelExpireWithTimestampBaseTest.java | 9 +++++++++ .../table/RecordLevelExpireWithTimestampLTZTest.java | 2 +- .../table/RecordLevelExpireWithTimestampTest.java | 2 +- 8 files changed, 39 insertions(+), 13 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 4bddcdd72d7e..f0f9284ed7d6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -203,12 +203,6 @@ public static void validateTableSchema(TableSchema schema) { "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.", dataType)); } - if (dataType.isNullable()) { - throw new IllegalArgumentException( - String.format( - "Time field %s for record-level expire should be not null.", - recordLevelTimeField)); - } } if (options.mergeEngine() == MergeEngine.FIRST_ROW) { diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java index 54152c8965d6..e648699aed86 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java @@ -123,7 +123,11 @@ protected List query(int[] projection) throws Exception { r -> { GenericRow newR = new GenericRow(projection.length); for (int i = 0; i < projection.length; i++) { - newR.setField(i, r.getInt(i)); + if (r.isNullAt(i)) { + newR.setField(i, null); + } else { + newR.setField(i, r.getInt(i)); + } } rows.add(newR); }); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 9dbfff200e7b..501b9f588dbc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -36,6 +36,7 @@ import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; import static org.apache.paimon.schema.SchemaValidation.validateTableSchema; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -110,8 +111,7 @@ public void testRecordLevelTimeField() { Map options = new HashMap<>(2); options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f0"); options.put(CoreOptions.RECORD_LEVEL_EXPIRE_TIME.key(), "1 m"); - assertThatThrownBy(() -> validateTableSchemaExec(options)) - .hasMessageContaining("Time field f0 for record-level expire should be not null"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f10"); assertThatThrownBy(() -> validateTableSchemaExec(options)) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java index 08c8ac548003..bd13b0ecf8fa 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java @@ -38,6 +38,7 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; class RecordLevelExpireTest extends PrimaryKeyTableTestBase { @@ -54,7 +55,7 @@ public void beforeEachBase() throws Exception { Schema.newBuilder() .column("pt", DataTypes.INT()) .column("pk", DataTypes.INT()) - .column("col1", DataTypes.INT().notNull()) + .column("col1", DataTypes.INT()) .partitionKeys("pt") .primaryKey("pk", "pt") .options(tableOptions().toMap()) @@ -99,5 +100,14 @@ public void test() throws Exception { assertThat(query()).containsExactlyInAnyOrder(GenericRow.of(1, 4, currentSecs + 60 * 60)); assertThat(query(new int[] {2})) .containsExactlyInAnyOrder(GenericRow.of(currentSecs + 60 * 60)); + + writeCommit(GenericRow.of(1, 5, null)); + assertThat(query()) + .containsExactlyInAnyOrder( + GenericRow.of(1, 4, currentSecs + 60 * 60), GenericRow.of(1, 5, null)); + + // null time field for record-level expire is not supported yet. + assertThatThrownBy(() -> compact(1)) + .hasMessageContaining("Time field for record-level expire should not be null."); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java index 393683fc92ea..295058bfbd22 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java @@ -38,6 +38,7 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase { @Override @@ -53,7 +54,7 @@ public void beforeEachBase() throws Exception { Schema.newBuilder() .column("pt", DataTypes.INT()) .column("pk", DataTypes.INT()) - .column("col1", DataTypes.BIGINT().notNull()) + .column("col1", DataTypes.BIGINT()) .partitionKeys("pt") .primaryKey("pk", "pt") .options(tableOptions().toMap()) @@ -96,5 +97,13 @@ public void test() throws Exception { // compact, expired compact(1); assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 4)); + + writeCommit(GenericRow.of(1, 5, null)); + assertThat(query(new int[] {0, 1})) + .containsExactlyInAnyOrder(GenericRow.of(1, 4), GenericRow.of(1, 5)); + + // null time field for record-level expire is not supported yet. + assertThatThrownBy(() -> compact(1)) + .hasMessageContaining("Time field for record-level expire should not be null."); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java index abcb8c1c76ca..f352693759a7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java @@ -29,6 +29,7 @@ import java.time.Duration; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTestBase { @@ -61,5 +62,13 @@ public void testTimestampTypeExpire() throws Exception { // compact, expired compact(1); assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 3)); + + writeCommit(GenericRow.of(1, 5, null)); + assertThat(query(new int[] {0, 1})) + .containsExactlyInAnyOrder(GenericRow.of(1, 3), GenericRow.of(1, 5)); + + // null time field for record-level expire is not supported yet. + assertThatThrownBy(() -> compact(1)) + .hasMessageContaining("Time field for record-level expire should not be null."); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java index c256171c6f03..af834af276c4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception { Schema.newBuilder() .column("pt", DataTypes.INT()) .column("pk", DataTypes.INT()) - .column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull()) + .column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) .partitionKeys("pt") .primaryKey("pk", "pt") .options(tableOptions().toMap()) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java index 697053c0e7ac..3c4add8914f8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception { Schema.newBuilder() .column("pt", DataTypes.INT()) .column("pk", DataTypes.INT()) - .column("col1", DataTypes.TIMESTAMP().notNull()) + .column("col1", DataTypes.TIMESTAMP()) .partitionKeys("pt") .primaryKey("pk", "pt") .options(tableOptions().toMap())