Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.StringUtils;

Expand All @@ -44,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -176,6 +181,36 @@ public static void validateTableSchema(TableSchema schema) {
}
}

        // Validate the optional time field used for record-level expiration:
        // it must exist in the schema, have a supported type, and be NOT NULL.
        String recordLevelTimeField = options.recordLevelTimeField();
        if (recordLevelTimeField != null) {
            Optional<DataField> field =
                    schema.fields().stream()
                            .filter(dataField -> dataField.name().equals(recordLevelTimeField))
                            .findFirst();
            // The configured name must match an existing column.
            if (!field.isPresent()) {
                throw new IllegalArgumentException(
                        String.format(
                                "Can not find time field %s for record level expire.",
                                recordLevelTimeField));
            }
            DataType dataType = field.get().type();
            // Only integer and timestamp types can be interpreted as an expiration time.
            if (!(dataType instanceof IntType
                    || dataType instanceof BigIntType
                    || dataType instanceof TimestampType
                    || dataType instanceof LocalZonedTimestampType)) {
                throw new IllegalArgumentException(
                        String.format(
                                "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.",
                                dataType));
            }
            // A nullable column could leave records without an expiration timestamp.
            if (dataType.isNullable()) {
                throw new IllegalArgumentException(
                        String.format(
                                "Time field %s for record-level expire should be not null.",
                                recordLevelTimeField));
            }
        }

if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
if (options.changelogProducer() != ChangelogProducer.LOOKUP
&& options.changelogProducer() != ChangelogProducer.NONE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private void validateTableSchemaExec(Map<String, String> options) {
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
new DataField(1, "f1", DataTypes.INT()),
new DataField(2, "f2", DataTypes.INT()));
new DataField(2, "f2", DataTypes.INT()),
new DataField(3, "f3", DataTypes.STRING()));
List<String> partitionKeys = Collections.singletonList("f0");
List<String> primaryKeys = Collections.singletonList("f1");
options.put(BUCKET.key(), String.valueOf(-1));
Expand Down Expand Up @@ -103,4 +104,22 @@ public void testFromSnapshotConflict() {
.hasMessageContaining(
"[scan.snapshot-id] must be null when you set [scan.timestamp-millis,scan.timestamp]");
}

@Test
public void testRecordLevelTimeField() {
Map<String, String> options = new HashMap<>(2);
options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f0");
options.put(CoreOptions.RECORD_LEVEL_EXPIRE_TIME.key(), "1 m");
assertThatThrownBy(() -> validateTableSchemaExec(options))
.hasMessageContaining("Time field f0 for record-level expire should be not null");

options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f10");
assertThatThrownBy(() -> validateTableSchemaExec(options))
.hasMessageContaining("Can not find time field f10 for record level expire.");

options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f3");
assertThatThrownBy(() -> validateTableSchemaExec(options))
.hasMessageContaining(
"The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is STRING.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,51 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

class RecordLevelExpireTest extends PrimaryKeyTableTestBase {

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a blank line before @Override.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@BeforeEach
public void beforeEachBase() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data type of column "col1" is DataTypes.INT() in PrimaryKeyTableTestBase, but it should be DataTypes.INT().notNull() in RecordLevelExpireTest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

CatalogContext context =
CatalogContext.create(
new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
Catalog catalog = CatalogFactory.createCatalog(context);
Identifier identifier = new Identifier("default", "T");
catalog.createDatabase(identifier.getDatabaseName(), true);
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.INT().notNull())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
.build();
catalog.createTable(identifier, schema, true);
table = (FileStoreTable) catalog.getTable(identifier);
commitUser = UUID.randomUUID().toString();
}

@Override
protected Options tableOptions() {
Options options = new Options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void beforeEachBase() throws Exception {
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.BIGINT())
.column("col1", DataTypes.BIGINT().notNull())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception {
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception {
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.TIMESTAMP())
.column("col1", DataTypes.TIMESTAMP().notNull())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
Expand Down
Loading