diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 20cbdea66d9a..4bddcdd72d7e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -28,11 +28,15 @@ import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.StringUtils; @@ -44,6 +48,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -176,6 +181,36 @@ public static void validateTableSchema(TableSchema schema) { } } + String recordLevelTimeField = options.recordLevelTimeField(); + if (recordLevelTimeField != null) { + Optional field = + schema.fields().stream() + .filter(dataField -> dataField.name().equals(recordLevelTimeField)) + .findFirst(); + if (!field.isPresent()) { + throw new IllegalArgumentException( + String.format( + "Can not find time field %s for record level expire.", + recordLevelTimeField)); + } + DataType dataType = field.get().type(); + if (!(dataType instanceof IntType + || dataType instanceof BigIntType + || dataType instanceof TimestampType + || dataType instanceof LocalZonedTimestampType)) { + throw new IllegalArgumentException( + String.format( + "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.", + dataType)); + } + if (dataType.isNullable()) { + throw new IllegalArgumentException( + String.format( + "Time field %s for record-level expire should be not null.", + recordLevelTimeField)); + } + } + if (options.mergeEngine() == MergeEngine.FIRST_ROW) { if (options.changelogProducer() != ChangelogProducer.LOOKUP && options.changelogProducer() != ChangelogProducer.NONE) { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 1bfe45879acb..9dbfff200e7b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -46,7 +46,8 @@ private void validateTableSchemaExec(Map options) { Arrays.asList( new DataField(0, "f0", DataTypes.INT()), new DataField(1, "f1", DataTypes.INT()), - new DataField(2, "f2", DataTypes.INT())); + new DataField(2, "f2", DataTypes.INT()), + new DataField(3, "f3", DataTypes.STRING())); List partitionKeys = Collections.singletonList("f0"); List primaryKeys = Collections.singletonList("f1"); options.put(BUCKET.key(), String.valueOf(-1)); @@ -103,4 +104,22 @@ public void testFromSnapshotConflict() { .hasMessageContaining( "[scan.snapshot-id] must be null when you set [scan.timestamp-millis,scan.timestamp]"); } + + @Test + public void testRecordLevelTimeField() { + Map options = new HashMap<>(2); + options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f0"); + options.put(CoreOptions.RECORD_LEVEL_EXPIRE_TIME.key(), "1 m"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining("Time field f0 for record-level expire should be not null"); + + options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f10"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining("Can not find time field f10 for record level expire."); + + options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f3"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining( + "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is STRING."); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java index 367a30b6676c..08c8ac548003 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java @@ -19,18 +19,51 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PrimaryKeyTableTestBase; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.TraceableFileIO; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; class RecordLevelExpireTest extends PrimaryKeyTableTestBase { + @Override + @BeforeEach + public void beforeEachBase() throws Exception { + CatalogContext context = + CatalogContext.create( + new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString())); + Catalog catalog = CatalogFactory.createCatalog(context); + Identifier identifier = new Identifier("default", "T"); + catalog.createDatabase(identifier.getDatabaseName(), true); + Schema schema = + Schema.newBuilder() + .column("pt", DataTypes.INT()) + .column("pk", DataTypes.INT()) + .column("col1", DataTypes.INT().notNull()) + .partitionKeys("pt") + .primaryKey("pk", "pt") + .options(tableOptions().toMap()) + .build(); + catalog.createTable(identifier, schema, true); + table = (FileStoreTable) catalog.getTable(identifier); + commitUser = UUID.randomUUID().toString(); + } + @Override protected Options tableOptions() { Options options = new Options(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java index 15b6556c3481..393683fc92ea 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java @@ -53,7 +53,7 @@ public void beforeEachBase() throws Exception { Schema.newBuilder() .column("pt", DataTypes.INT()) .column("pk", DataTypes.INT()) - .column("col1", DataTypes.BIGINT()) + .column("col1", DataTypes.BIGINT().notNull()) .partitionKeys("pt") .primaryKey("pk", "pt") .options(tableOptions().toMap()) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java index af834af276c4..c256171c6f03 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception { Schema.newBuilder() .column("pt", DataTypes.INT()) .column("pk", DataTypes.INT()) - .column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) + .column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull()) .partitionKeys("pt") .primaryKey("pk", "pt") .options(tableOptions().toMap()) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java index 3c4add8914f8..697053c0e7ac 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception { Schema.newBuilder() .column("pt", DataTypes.INT()) .column("pk", DataTypes.INT()) - .column("col1", DataTypes.TIMESTAMP()) + .column("col1", DataTypes.TIMESTAMP().notNull()) .partitionKeys("pt") .primaryKey("pk", "pt") .options(tableOptions().toMap())