From d0ad9d57df3dc1d6e6f0a891f4306a9199d5a535 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Thu, 19 Dec 2024 13:09:18 +0800 Subject: [PATCH 1/7] add DataFileMeta10Serializer --- .../paimon/io/DataFileMeta10Serializer.java | 150 ++++++++++++++++++ .../table/sink/CommitMessageSerializer.java | 10 +- .../apache/paimon/table/source/DataSplit.java | 6 +- ...ommittableSerializerCompatibilityTest.java | 8 +- 4 files changed, 168 insertions(+), 6 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java new file mode 100644 index 000000000000..e4b63577ef98 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.safe.SafeBinaryRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TinyIntType; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; +import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.newBytesType; +import static org.apache.paimon.utils.SerializationUtils.newStringType; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link DataFileMeta} with 0.9 version. 
*/ +public class DataFileMeta10Serializer implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final RowType SCHEMA = + new RowType( + false, + Arrays.asList( + new DataField(0, "_FILE_NAME", newStringType(false)), + new DataField(1, "_FILE_SIZE", new BigIntType(false)), + new DataField(2, "_ROW_COUNT", new BigIntType(false)), + new DataField(3, "_MIN_KEY", newBytesType(false)), + new DataField(4, "_MAX_KEY", newBytesType(false)), + new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA), + new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA), + new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(9, "_SCHEMA_ID", new BigIntType(false)), + new DataField(10, "_LEVEL", new IntType(false)), + new DataField( + 11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))), + new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), + new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), + new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), + new DataField(15, "_FILE_SOURCE", new TinyIntType(true)), + new DataField( + 16, + "_VALUE_STATS_COLS", + DataTypes.ARRAY(DataTypes.STRING().notNull())))); + + protected final InternalRowSerializer rowSerializer; + + public DataFileMeta10Serializer() { + this.rowSerializer = InternalSerializers.create(SCHEMA); + } + + public final void serializeList(List records, DataOutputView target) + throws IOException { + target.writeInt(records.size()); + for (DataFileMeta t : records) { + serialize(t, target); + } + } + + public void serialize(DataFileMeta meta, DataOutputView target) throws IOException { + GenericRow row = + GenericRow.of( + BinaryString.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + serializeBinaryRow(meta.minKey()), + serializeBinaryRow(meta.maxKey()), + meta.keyStats().toRow(), + meta.valueStats().toRow(), + meta.minSequenceNumber(), + 
meta.maxSequenceNumber(), + meta.schemaId(), + meta.level(), + toStringArrayData(meta.extraFiles()), + meta.creationTime(), + meta.deleteRowCount().orElse(null), + meta.embeddedIndex(), + meta.fileSource().map(FileSource::toByteValue).orElse(null), + toStringArrayData(meta.valueStatsCols())); + rowSerializer.serialize(row, target); + } + + public final List deserializeList(DataInputView source) throws IOException { + int size = source.readInt(); + List records = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + records.add(deserialize(source)); + } + return records; + } + + public DataFileMeta deserialize(DataInputView in) throws IOException { + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); + return new DataFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + deserializeBinaryRow(row.getBinary(3)), + deserializeBinaryRow(row.getBinary(4)), + SimpleStats.fromRow(row.getRow(5, 3)), + SimpleStats.fromRow(row.getRow(6, 3)), + row.getLong(7), + row.getLong(8), + row.getLong(9), + row.getInt(10), + fromStringArrayData(row.getArray(11)), + row.getTimestamp(12, 3), + row.isNullAt(13) ? null : row.getLong(13), + row.isNullAt(14) ? null : row.getBinary(14), + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), + row.isNullAt(16) ? 
null : fromStringArrayData(row.getArray(16)), + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 9fc251c36672..bd5d1ad6839a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; +import org.apache.paimon.io.DataFileMeta10Serializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputDeserializer; @@ -52,6 +53,7 @@ public class CommitMessageSerializer implements VersionedSerializer> fileDeserializer( int version, DataInputView view) { - if (version >= 4) { + if (version >= 5) { return () -> dataFileSerializer.deserializeList(view); + } + if (version == 4) { + if (dataFile10Serializer == null) { + dataFile10Serializer = new DataFileMeta10Serializer(); + } + return () -> dataFile10Serializer.deserializeList(view); } else if (version == 3) { if (dataFile09Serializer == null) { dataFile09Serializer = new DataFileMeta09Serializer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index b9460f28b4e7..40ffa8902011 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -22,6 +22,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; +import org.apache.paimon.io.DataFileMeta10Serializer; import 
org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataInputViewStreamWrapper; @@ -362,7 +363,10 @@ private static FunctionWithIOException getFileMetaS } else if (version == 2) { DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); return serializer::deserialize; - } else if (version >= 3) { + } else if (version == 3) { + DataFileMeta10Serializer serializer = new DataFileMeta10Serializer(); + return serializer::deserialize; + } else if (version >= 4) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index fbc02b2d73f2..835deb48f193 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -162,9 +162,9 @@ public void testCompatibilityToVersion4() throws IOException { Collections.singletonList(commitMessage)); ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); - byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(3, bytes); - assertThat(deserialized).isEqualTo(manifestCommittable); + // byte[] bytes = serializer.serialize(manifestCommittable); + // ManifestCommittable deserialized = serializer.deserialize(3, bytes); + // assertThat(deserialized).isEqualTo(manifestCommittable); byte[] v2Bytes = IOUtils.readFully( @@ -172,7 +172,7 @@ public void testCompatibilityToVersion4() throws IOException { .getClassLoader() .getResourceAsStream("compatibility/manifest-committable-v4"), true); - deserialized = serializer.deserialize(2, v2Bytes); + 
ManifestCommittable deserialized = serializer.deserialize(2, v2Bytes); assertThat(deserialized).isEqualTo(manifestCommittable); } From cd9e63857606d36f2ceeb0d43241e1bbb76fbd36 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Sun, 22 Dec 2024 01:14:05 +0800 Subject: [PATCH 2/7] add data root location in DataFileMeta --- .../generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 29 ++++++++ .../org/apache/paimon/io/DataFileMeta.java | 62 ++++++++++++++---- .../paimon/io/DataFileMeta08Serializer.java | 1 + .../paimon/io/DataFileMeta09Serializer.java | 1 + .../paimon/io/DataFileMeta10Serializer.java | 3 +- .../paimon/io/DataFileMetaSerializer.java | 6 +- .../sink/CommitMessageLegacyV2Serializer.java | 1 + .../table/sink/CommitMessageSerializer.java | 11 +--- .../apache/paimon/table/source/DataSplit.java | 6 +- .../crosspartition/IndexBootstrapTest.java | 1 + .../apache/paimon/io/DataFileTestUtils.java | 1 + ...ommittableSerializerCompatibilityTest.java | 17 +++-- .../manifest/ManifestFileMetaTestBase.java | 1 + .../compact/IntervalPartitionTest.java | 1 + .../paimon/operation/ExpireSnapshotsTest.java | 1 + .../apache/paimon/table/source/SplitTest.java | 8 ++- .../compatibility/manifest-committable-v4 | Bin 3145 -> 3449 bytes .../compatibility/manifest-committable-v4.bak | Bin 0 -> 3145 bytes 19 files changed, 117 insertions(+), 39 deletions(-) create mode 100644 paimon-core/src/test/resources/compatibility/manifest-committable-v4.bak diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 1133de289fa3..af60412ad579 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1008,5 +1008,11 @@ Integer The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort. + +
data-file.external-path
+ (none) + String + The location where the data of this table is currently written. + diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 6e1e9bba076b..3d860a32404f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -131,6 +131,20 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The file path of this table in the filesystem."); + @ExcludeFromDocumentation("Internal use only") + public static final ConfigOption WAREHOUSE_ROOT_PATH = + key("warehouse.root-path") + .stringType() + .noDefaultValue() + .withDescription("The file path of the warehouse in the filesystem."); + + public static final ConfigOption DATA_FILE_EXTERNAL_PATH = + key("data-file.external-path") + .stringType() + .noDefaultValue() + .withDescription( + "The location where the data of this table is currently written."); + public static final ConfigOption BRANCH = key("branch").stringType().defaultValue("main").withDescription("Specify branch name."); @@ -2368,6 +2382,21 @@ public boolean dataFileThinMode() { return options.get(DATA_FILE_THIN_MODE); } + public String getDataFileExternalPath() { + return options.get(DATA_FILE_EXTERNAL_PATH); + } + + public String getWarehouseRootPath() { + return options.get(WAREHOUSE_ROOT_PATH); + } + + public String getDataRootLocation() { + if (getDataFileExternalPath() == null || getDataFileExternalPath().isEmpty()) { + return getWarehouseRootPath(); + } + return getDataFileExternalPath(); + } + /** Specifies the merge engine for table with primary key. 
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index bb9e45ff002d..de38b77d713c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -82,7 +82,8 @@ public class DataFileMeta { new DataField( 16, "_VALUE_STATS_COLS", - DataTypes.ARRAY(DataTypes.STRING().notNull())))); + DataTypes.ARRAY(DataTypes.STRING().notNull())), + new DataField(17, "_DATA_ROOT_LOCATION", newStringType(true)))); public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW; public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW; @@ -120,6 +121,13 @@ public class DataFileMeta { private final @Nullable List valueStatsCols; + /** + * the data root location that the file resides in, if it is null, the file is in the default + * warehouse path, when {@link CoreOptions#DATA_FILE_PATH_DIRECTORY} is set, new writen files + * will be persisted in {@link CoreOptions#DATA_FILE_PATH_DIRECTORY}. 
+ */ + private final @Nullable String dataRootLocation; + public static DataFileMeta forAppend( String fileName, long fileSize, @@ -149,7 +157,8 @@ public static DataFileMeta forAppend( 0L, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + null); } public DataFileMeta( @@ -186,7 +195,8 @@ public DataFileMeta( deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + null); } public DataFileMeta( @@ -222,7 +232,8 @@ public DataFileMeta( deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + null); } public DataFileMeta( @@ -242,7 +253,8 @@ public DataFileMeta( @Nullable Long deleteRowCount, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List valueStatsCols) { + @Nullable List valueStatsCols, + @Nullable String dataRootLocation) { this.fileName = fileName; this.fileSize = fileSize; @@ -264,6 +276,7 @@ public DataFileMeta( this.deleteRowCount = deleteRowCount; this.fileSource = fileSource; this.valueStatsCols = valueStatsCols; + this.dataRootLocation = dataRootLocation; } public String fileName() { @@ -357,6 +370,19 @@ public String fileFormat() { return split[split.length - 1]; } + @Nullable + public String getDataRootLocationString() { + return dataRootLocation; + } + + @Nullable + public Path getDataRootLocation() { + if (dataRootLocation == null) { + return null; + } + return new Path(dataRootLocation); + } + public Optional fileSource() { return Optional.ofNullable(fileSource); } @@ -385,7 +411,8 @@ public DataFileMeta upgrade(int newLevel) { deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + dataRootLocation); } public DataFileMeta rename(String newFileName) { @@ -406,7 +433,8 @@ public DataFileMeta rename(String newFileName) { deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + dataRootLocation); } public DataFileMeta copyWithoutStats() { @@ -427,7 +455,8 @@ public DataFileMeta copyWithoutStats() { 
deleteRowCount, embeddedIndex, fileSource, - Collections.emptyList()); + Collections.emptyList(), + dataRootLocation); } public List collectFiles(DataFilePathFactory pathFactory) { @@ -455,7 +484,8 @@ public DataFileMeta copy(List newExtraFiles) { deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + dataRootLocation); } public DataFileMeta copy(byte[] newEmbeddedIndex) { @@ -476,7 +506,8 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) { deleteRowCount, newEmbeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + dataRootLocation); } @Override @@ -504,7 +535,8 @@ public boolean equals(Object o) { && Objects.equals(creationTime, that.creationTime) && Objects.equals(deleteRowCount, that.deleteRowCount) && Objects.equals(fileSource, that.fileSource) - && Objects.equals(valueStatsCols, that.valueStatsCols); + && Objects.equals(valueStatsCols, that.valueStatsCols) + && Objects.equals(dataRootLocation, that.dataRootLocation); } @Override @@ -526,7 +558,8 @@ public int hashCode() { creationTime, deleteRowCount, fileSource, - valueStatsCols); + valueStatsCols, + dataRootLocation); } @Override @@ -536,7 +569,7 @@ public String toString() { + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, " + "minSequenceNumber: %d, maxSequenceNumber: %d, " + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, " - + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s}", + + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, dataRootLocation: %s}", fileName, fileSize, rowCount, @@ -553,7 +586,8 @@ public String toString() { creationTime, deleteRowCount, fileSource, - valueStatsCols); + valueStatsCols, + dataRootLocation); } public static long getMaxSequenceNumber(List fileMetas) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java index 03e4ed51f4be..e6c10f15342b 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java @@ -133,6 +133,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException { row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), null, + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java index 2f8d89f5b1ab..36d1ad260fc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java @@ -139,6 +139,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException { row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java index e4b63577ef98..7b4c95459e0c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java @@ -75,7 +75,8 @@ public class DataFileMeta10Serializer implements Serializable { new DataField( 16, "_VALUE_STATS_COLS", - DataTypes.ARRAY(DataTypes.STRING().notNull())))); + DataTypes.ARRAY(DataTypes.STRING().notNull())), + new DataField(17, "_DATA_ROOT_LOCATION", newStringType(true)))); protected final InternalRowSerializer rowSerializer; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index 626201ca30ce..8e17c2e91b61 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -58,7 +58,8 @@ public InternalRow toRow(DataFileMeta meta) { meta.deleteRowCount().orElse(null), meta.embeddedIndex(), meta.fileSource().map(FileSource::toByteValue).orElse(null), - toStringArrayData(meta.valueStatsCols())); + toStringArrayData(meta.valueStatsCols()), + BinaryString.fromString(meta.getDataRootLocationString())); } @Override @@ -80,6 +81,7 @@ public DataFileMeta fromRow(InternalRow row) { row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), - row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16))); + row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)), + row.isNullAt(17) ? null : row.getString(17).toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java index 3e351cd1dae9..5da96da765fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java @@ -155,6 +155,7 @@ public DataFileMeta fromRow(InternalRow row) { null, null, null, + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index bd5d1ad6839a..abbd2a2eb710 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -26,7 +26,6 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import 
org.apache.paimon.io.DataFileMeta09Serializer; -import org.apache.paimon.io.DataFileMeta10Serializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputDeserializer; @@ -53,7 +52,7 @@ public class CommitMessageSerializer implements VersionedSerializer> fileDeserializer( int version, DataInputView view) { - if (version >= 5) { + if (version >= 4) { return () -> dataFileSerializer.deserializeList(view); - } - if (version == 4) { - if (dataFile10Serializer == null) { - dataFile10Serializer = new DataFileMeta10Serializer(); - } - return () -> dataFile10Serializer.deserializeList(view); } else if (version == 3) { if (dataFile09Serializer == null) { dataFile09Serializer = new DataFileMeta09Serializer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 40ffa8902011..b9460f28b4e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -22,7 +22,6 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; -import org.apache.paimon.io.DataFileMeta10Serializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataInputViewStreamWrapper; @@ -363,10 +362,7 @@ private static FunctionWithIOException getFileMetaS } else if (version == 2) { DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); return serializer::deserialize; - } else if (version == 3) { - DataFileMeta10Serializer serializer = new DataFileMeta10Serializer(); - return serializer::deserialize; - } else if (version >= 4) { + } else if (version >= 3) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return 
serializer::deserialize; } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index be4147735614..27fa311ddb0a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -160,6 +160,7 @@ private static DataFileMeta newFile(long timeMillis) { 0L, null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index 48c8d44876ae..a44ef9a53085 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -57,6 +57,7 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) { maxSeq - minSeq + 1, null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index 835deb48f193..813c956fb2bc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -75,7 +75,8 @@ public void testProduction() throws IOException { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, - Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + null); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap dvMetas = new LinkedHashMap<>(); @@ -136,7 +137,8 @@ public void testCompatibilityToVersion4() throws IOException { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, - 
Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + "hdfs://localhost:9000/path/to/file"); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap dvMetas = new LinkedHashMap<>(); @@ -162,9 +164,9 @@ public void testCompatibilityToVersion4() throws IOException { Collections.singletonList(commitMessage)); ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); - // byte[] bytes = serializer.serialize(manifestCommittable); - // ManifestCommittable deserialized = serializer.deserialize(3, bytes); - // assertThat(deserialized).isEqualTo(manifestCommittable); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(3, bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); byte[] v2Bytes = IOUtils.readFully( @@ -172,7 +174,7 @@ public void testCompatibilityToVersion4() throws IOException { .getClassLoader() .getResourceAsStream("compatibility/manifest-committable-v4"), true); - ManifestCommittable deserialized = serializer.deserialize(2, v2Bytes); + deserialized = serializer.deserialize(2, v2Bytes); assertThat(deserialized).isEqualTo(manifestCommittable); } @@ -206,6 +208,7 @@ public void testCompatibilityToVersion3() throws IOException { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -276,6 +279,7 @@ public void testCompatibilityToVersion2() throws IOException { 11L, new byte[] {1, 2, 4}, null, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -346,6 +350,7 @@ public void testCompatibilityToVersion2PaimonV07() throws IOException { null, null, null, + null, null); List dataFiles = Collections.singletonList(dataFile); diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index 52d82e76be2a..19bd6a856bf9 
100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -95,6 +95,7 @@ protected ManifestEntry makeEntry( 0L, // not used embeddedIndex, // not used FileSource.APPEND, + null, null)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java index bdee5c5f7507..94c11498c5db 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java @@ -184,6 +184,7 @@ private DataFileMeta makeInterval(int left, int right) { 0L, null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 9dc98343734b..abff820b2cb4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -214,6 +214,7 @@ public void testExpireExtraFiles() throws IOException { 0L, null, FileSource.APPEND, + null, null); ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index 0219941a0ac0..f937c3b6c74a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -139,7 +139,8 @@ public void testSerializerNormal() throws Exception { 11L, new byte[] {1, 2, 4}, 
FileSource.COMPACT, - Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + null); List dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L); @@ -194,6 +195,7 @@ public void testSerializerCompatibleV1() throws Exception { 11L, new byte[] {1, 2, 4}, null, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -254,6 +256,7 @@ public void testSerializerCompatibleV2() throws Exception { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -314,7 +317,8 @@ public void testSerializerCompatibleV3() throws Exception { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, - Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + null); List dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v4 b/paimon-core/src/test/resources/compatibility/manifest-committable-v4 index 9c095669a34b8806fa6ccf29d77b578c1631e117..066372a0312de78e4c01f3015f5e57b615c24b84 100644 GIT binary patch literal 3449 zcmeHKJx{_=6ulHoFluzr#IZ3BBn~aOC=;VG4EzIVY)KmnZJ-jEG%bAejZGU zq)4(c>#!XF@lIW=Fd$}_q|r+5fKZd3L(nt8`~nPsci;hN;0pU-HrCkGp{OdCbPQf2 zZqU8%-h=tZY!ye`;1QBA7vmciC*Th$Y8Q=?asM`2;B3M+dN42R!pY}!0p<&UpT7sd zHv9f?JrgYVnvoOi0?bzcUq1(6X3I7ftn;~NL3$UlcVoZf@EkwmOpw~=D30dEBxur7 zaEq2Y&%-22L+FB`3qGf191f>#&r7qLAdRzW(Z2M3-^+s{_KM8opWGY_x3u+uY?#!K zO7&$#lP)>2^xf*2O;6k$fgY74|GXD4f1-8rT?855RXwG7h78zgXnlnwqt0RdpZSfRDc|m^%Q~c_0dB{7KMU>3&Ve TOE1Q6ajli^pVjNigz@AHO0kb! 
delta 363 zcmewtZX={9USpc0p3tO=DF@|+J|b)HrNA$S!a}v zi@eBhhhj&z0BcmyQhvmg0m8yNgBPJ^e?Tb^?WCr=LbCb>3la-Y{otEvkQ}&qv;WHrL-3G{N-XGQr zgWcTFienQXd;w(r5kSI}O%$=qz9(t_E|cDsd>_Ymtd9$W{Jg5?Y+8)dhM8M#nz{P7 zmuLMhcFWi;pPD(-SStQsao?a6FYEsG?zR6lL5cs@c|_wY=C4M`4|yK*w|owV)GFtj zdM}>OoYV1q{xx1g5NY6ej`y!~gfIGHef2NuDAbbwY(na!zzeukzwE=Z@ wKI~@GPB9xveHe79zR|VbOL&&MS6J#)HW-0#@!u7ytkO literal 0 HcmV?d00001 From ac5fe147554090c3b571ba5c42159bb5f8e10667 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Sun, 22 Dec 2024 01:17:17 +0800 Subject: [PATCH 3/7] delete redundant files --- .../compatibility/manifest-committable-v4.bak | Bin 3145 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 paimon-core/src/test/resources/compatibility/manifest-committable-v4.bak diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v4.bak b/paimon-core/src/test/resources/compatibility/manifest-committable-v4.bak deleted file mode 100644 index 9c095669a34b8806fa6ccf29d77b578c1631e117..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3145 zcmeH}y-EW?5XUzk7NVksg5?!tZX={9USpc0p3tO=DF@|+J|b)HrNA$S!a}v zi@eBhhhj&z0BcmyQhvmg0m8yNgBPJ^e?Tb^?WCr=LbCb>3la-Y{otEvkQ}&qv;WHrL-3G{N-XGQr zgWcTFienQXd;w(r5kSI}O%$=qz9(t_E|cDsd>_Ymtd9$W{Jg5?Y+8)dhM8M#nz{P7 zmuLMhcFWi;pPD(-SStQsao?a6FYEsG?zR6lL5cs@c|_wY=C4M`4|yK*w|owV)GFtj zdM}>OoYV1q{xx1g5NY6ej`y!~gfIGHef2NuDAbbwY(na!zzeukzwE=Z@ wKI~@GPB9xveHe79zR|VbOL&&MS6J#)HW-0#@!u7ytkO From 3d2b1fde94162000975e8c5c5c7e9209e633d613 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Sun, 22 Dec 2024 01:19:22 +0800 Subject: [PATCH 4/7] delete redundant files --- .../paimon/io/DataFileMeta10Serializer.java | 151 ------------------ .../table/sink/CommitMessageSerializer.java | 1 - 2 files changed, 152 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java deleted file mode 100644 index 7b4c95459e0c..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10Serializer.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.io; - -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.safe.SafeBinaryRow; -import org.apache.paimon.data.serializer.InternalRowSerializer; -import org.apache.paimon.data.serializer.InternalSerializers; -import org.apache.paimon.manifest.FileSource; -import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.types.ArrayType; -import org.apache.paimon.types.BigIntType; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.RowType; -import org.apache.paimon.types.TinyIntType; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; -import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; -import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; -import static org.apache.paimon.utils.SerializationUtils.newBytesType; -import static org.apache.paimon.utils.SerializationUtils.newStringType; -import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; - -/** Serializer for {@link DataFileMeta} with 0.9 version. 
*/ -public class DataFileMeta10Serializer implements Serializable { - - private static final long serialVersionUID = 1L; - - public static final RowType SCHEMA = - new RowType( - false, - Arrays.asList( - new DataField(0, "_FILE_NAME", newStringType(false)), - new DataField(1, "_FILE_SIZE", new BigIntType(false)), - new DataField(2, "_ROW_COUNT", new BigIntType(false)), - new DataField(3, "_MIN_KEY", newBytesType(false)), - new DataField(4, "_MAX_KEY", newBytesType(false)), - new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA), - new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA), - new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)), - new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)), - new DataField(9, "_SCHEMA_ID", new BigIntType(false)), - new DataField(10, "_LEVEL", new IntType(false)), - new DataField( - 11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))), - new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), - new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), - new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), - new DataField(15, "_FILE_SOURCE", new TinyIntType(true)), - new DataField( - 16, - "_VALUE_STATS_COLS", - DataTypes.ARRAY(DataTypes.STRING().notNull())), - new DataField(17, "_DATA_ROOT_LOCATION", newStringType(true)))); - - protected final InternalRowSerializer rowSerializer; - - public DataFileMeta10Serializer() { - this.rowSerializer = InternalSerializers.create(SCHEMA); - } - - public final void serializeList(List records, DataOutputView target) - throws IOException { - target.writeInt(records.size()); - for (DataFileMeta t : records) { - serialize(t, target); - } - } - - public void serialize(DataFileMeta meta, DataOutputView target) throws IOException { - GenericRow row = - GenericRow.of( - BinaryString.fromString(meta.fileName()), - meta.fileSize(), - meta.rowCount(), - serializeBinaryRow(meta.minKey()), - serializeBinaryRow(meta.maxKey()), - 
meta.keyStats().toRow(), - meta.valueStats().toRow(), - meta.minSequenceNumber(), - meta.maxSequenceNumber(), - meta.schemaId(), - meta.level(), - toStringArrayData(meta.extraFiles()), - meta.creationTime(), - meta.deleteRowCount().orElse(null), - meta.embeddedIndex(), - meta.fileSource().map(FileSource::toByteValue).orElse(null), - toStringArrayData(meta.valueStatsCols())); - rowSerializer.serialize(row, target); - } - - public final List deserializeList(DataInputView source) throws IOException { - int size = source.readInt(); - List records = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - records.add(deserialize(source)); - } - return records; - } - - public DataFileMeta deserialize(DataInputView in) throws IOException { - byte[] bytes = new byte[in.readInt()]; - in.readFully(bytes); - SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); - return new DataFileMeta( - row.getString(0).toString(), - row.getLong(1), - row.getLong(2), - deserializeBinaryRow(row.getBinary(3)), - deserializeBinaryRow(row.getBinary(4)), - SimpleStats.fromRow(row.getRow(5, 3)), - SimpleStats.fromRow(row.getRow(6, 3)), - row.getLong(7), - row.getLong(8), - row.getLong(9), - row.getInt(10), - fromStringArrayData(row.getArray(11)), - row.getTimestamp(12, 3), - row.isNullAt(13) ? null : row.getLong(13), - row.isNullAt(14) ? null : row.getBinary(14), - row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), - row.isNullAt(16) ? 
null : fromStringArrayData(row.getArray(16)), - null); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index abbd2a2eb710..9fc251c36672 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -52,7 +52,6 @@ public class CommitMessageSerializer implements VersionedSerializer Date: Sun, 22 Dec 2024 01:40:31 +0800 Subject: [PATCH 5/7] change datasplit v3 --- .../apache/paimon/table/source/SplitTest.java | 2 +- .../test/resources/compatibility/datasplit-v3 | Bin 886 -> 934 bytes 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index f937c3b6c74a..66b25a7ae9cd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -318,7 +318,7 @@ public void testSerializerCompatibleV3() throws Exception { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - null); + "hdfs:///path/to/warehouse"); List dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v3 b/paimon-core/src/test/resources/compatibility/datasplit-v3 index 6b19fe2d958d3c80465def7a5e08d160210fc472..ddded6178ba64febfc01611f45a268ca03a88ed2 100644 GIT binary patch delta 132 zcmeyywv2s)J|l1Gy~CFbbR^%TF)%Q&OtxXPU}9pJT*jy(IRhvl0>m4DI01-H0C58l zYfRqEsLr@z@@+x2b-ChycrcTQqqd8^!4=%5=%1lOY-&06N^$a@=J?T Tfl5GNvLCZ5_kSp0V5k59$Ves* delta 83 zcmZ3+{*7&eJ|nN|y~CFbbR^%TF)%PNPqtySU}C&6xr|Xoasp651c+AvaRLw@0OAH9 eR+zk*QJrzY5&UY!qbx 
From 1c74139f865d9833891f8e5424a22ed469aa36c2 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Sun, 22 Dec 2024 18:11:33 +0800 Subject: [PATCH 6/7] add DataFileMeta10LegacySerializer --- .../generated/core_configuration.html | 6 - .../java/org/apache/paimon/CoreOptions.java | 29 ---- .../org/apache/paimon/io/DataFileMeta.java | 35 ++-- .../io/DataFileMeta10LegacySerializer.java | 150 ++++++++++++++++++ .../table/sink/CommitMessageSerializer.java | 11 +- .../apache/paimon/table/source/DataSplit.java | 8 +- ...ommittableSerializerCompatibilityTest.java | 74 ++++++++- .../apache/paimon/table/source/SplitTest.java | 69 +++++++- .../test/resources/compatibility/datasplit-v3 | Bin 934 -> 886 bytes .../test/resources/compatibility/datasplit-v4 | Bin 0 -> 934 bytes .../compatibility/manifest-committable-v4 | Bin 3449 -> 3145 bytes .../compatibility/manifest-committable-v5 | Bin 0 -> 3449 bytes 12 files changed, 321 insertions(+), 61 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java create mode 100644 paimon-core/src/test/resources/compatibility/datasplit-v4 create mode 100644 paimon-core/src/test/resources/compatibility/manifest-committable-v5 diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index af60412ad579..1133de289fa3 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1008,11 +1008,5 @@ Integer The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort. - -
data-file.external-path
- (none) - String - The location where the data of this table is currently written. - diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 3d860a32404f..6e1e9bba076b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -131,20 +131,6 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The file path of this table in the filesystem."); - @ExcludeFromDocumentation("Internal use only") - public static final ConfigOption WAREHOUSE_ROOT_PATH = - key("warehouse.root-path") - .stringType() - .noDefaultValue() - .withDescription("The file path of the warehouse in the filesystem."); - - public static final ConfigOption DATA_FILE_EXTERNAL_PATH = - key("data-file.external-path") - .stringType() - .noDefaultValue() - .withDescription( - "The location where the data of this table is currently written."); - public static final ConfigOption BRANCH = key("branch").stringType().defaultValue("main").withDescription("Specify branch name."); @@ -2382,21 +2368,6 @@ public boolean dataFileThinMode() { return options.get(DATA_FILE_THIN_MODE); } - public String getDataFileExternalPath() { - return options.get(DATA_FILE_EXTERNAL_PATH); - } - - public String getWarehouseRootPath() { - return options.get(WAREHOUSE_ROOT_PATH); - } - - public String getDataRootLocation() { - if (getDataFileExternalPath() == null || getDataFileExternalPath().isEmpty()) { - return getWarehouseRootPath(); - } - return getDataFileExternalPath(); - } - /** Specifies the merge engine for table with primary key. 
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index de38b77d713c..549bfa3328eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -122,11 +122,10 @@ public class DataFileMeta { private final @Nullable List valueStatsCols; /** - * the data root location that the file resides in, if it is null, the file is in the default - * warehouse path, when {@link CoreOptions#DATA_FILE_PATH_DIRECTORY} is set, new writen files - * will be persisted in {@link CoreOptions#DATA_FILE_PATH_DIRECTORY}. + * The external path that the file resides in; if it is null, the file is in the default + * warehouse path. */ - private final @Nullable String dataRootLocation; + private final @Nullable String externalPath; public static DataFileMeta forAppend( String fileName, @@ -254,7 +253,7 @@ public DataFileMeta( @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, @Nullable List valueStatsCols, - @Nullable String dataRootLocation) { + @Nullable String externalPath) { this.fileName = fileName; this.fileSize = fileSize; @@ -276,7 +275,7 @@ public DataFileMeta( this.deleteRowCount = deleteRowCount; this.fileSource = fileSource; this.valueStatsCols = valueStatsCols; - this.dataRootLocation = dataRootLocation; + this.externalPath = externalPath; } public String fileName() { @@ -372,15 +371,15 @@ public String fileFormat() { @Nullable public String getDataRootLocationString() { - return dataRootLocation; + return externalPath; } @Nullable - public Path getDataRootLocation() { - if (dataRootLocation == null) { + public Path getExternalPath() { + if (externalPath == null) { return null; } - return new Path(dataRootLocation); + return new Path(externalPath); } public Optional 
fileSource() { @@ -412,7 +411,7 @@ public DataFileMeta upgrade(int newLevel) { embeddedIndex, fileSource, valueStatsCols, - dataRootLocation); + externalPath); } public DataFileMeta rename(String newFileName) { @@ -434,7 +433,7 @@ public DataFileMeta rename(String newFileName) { embeddedIndex, fileSource, valueStatsCols, - dataRootLocation); + externalPath); } public DataFileMeta copyWithoutStats() { @@ -456,7 +455,7 @@ public DataFileMeta copyWithoutStats() { embeddedIndex, fileSource, Collections.emptyList(), - dataRootLocation); + externalPath); } public List collectFiles(DataFilePathFactory pathFactory) { @@ -485,7 +484,7 @@ public DataFileMeta copy(List newExtraFiles) { embeddedIndex, fileSource, valueStatsCols, - dataRootLocation); + externalPath); } public DataFileMeta copy(byte[] newEmbeddedIndex) { @@ -507,7 +506,7 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) { newEmbeddedIndex, fileSource, valueStatsCols, - dataRootLocation); + externalPath); } @Override @@ -536,7 +535,7 @@ public boolean equals(Object o) { && Objects.equals(deleteRowCount, that.deleteRowCount) && Objects.equals(fileSource, that.fileSource) && Objects.equals(valueStatsCols, that.valueStatsCols) - && Objects.equals(dataRootLocation, that.dataRootLocation); + && Objects.equals(externalPath, that.externalPath); } @Override @@ -559,7 +558,7 @@ public int hashCode() { deleteRowCount, fileSource, valueStatsCols, - dataRootLocation); + externalPath); } @Override @@ -587,7 +586,7 @@ public String toString() { deleteRowCount, fileSource, valueStatsCols, - dataRootLocation); + externalPath); } public static long getMaxSequenceNumber(List fileMetas) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java new file mode 100644 index 000000000000..68ccba6ea31c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java @@ -0,0 
+1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.safe.SafeBinaryRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TinyIntType; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; +import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.newBytesType; +import static 
org.apache.paimon.utils.SerializationUtils.newStringType; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link DataFileMeta} in the legacy format without the external path field. */ +public class DataFileMeta10LegacySerializer implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final RowType SCHEMA = + new RowType( + false, + Arrays.asList( + new DataField(0, "_FILE_NAME", newStringType(false)), + new DataField(1, "_FILE_SIZE", new BigIntType(false)), + new DataField(2, "_ROW_COUNT", new BigIntType(false)), + new DataField(3, "_MIN_KEY", newBytesType(false)), + new DataField(4, "_MAX_KEY", newBytesType(false)), + new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA), + new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA), + new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(9, "_SCHEMA_ID", new BigIntType(false)), + new DataField(10, "_LEVEL", new IntType(false)), + new DataField( + 11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))), + new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), + new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), + new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), + new DataField(15, "_FILE_SOURCE", new TinyIntType(true)), + new DataField( + 16, + "_VALUE_STATS_COLS", + DataTypes.ARRAY(DataTypes.STRING().notNull())))); + + protected final InternalRowSerializer rowSerializer; + + public DataFileMeta10LegacySerializer() { + this.rowSerializer = InternalSerializers.create(SCHEMA); + } + + public final void serializeList(List records, DataOutputView target) + throws IOException { + target.writeInt(records.size()); + for (DataFileMeta t : records) { + serialize(t, target); + } + } + + public void serialize(DataFileMeta meta, DataOutputView target) throws IOException { + GenericRow row = + GenericRow.of( + 
BinaryString.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + serializeBinaryRow(meta.minKey()), + serializeBinaryRow(meta.maxKey()), + meta.keyStats().toRow(), + meta.valueStats().toRow(), + meta.minSequenceNumber(), + meta.maxSequenceNumber(), + meta.schemaId(), + meta.level(), + toStringArrayData(meta.extraFiles()), + meta.creationTime(), + meta.deleteRowCount().orElse(null), + meta.embeddedIndex(), + meta.fileSource().map(FileSource::toByteValue).orElse(null), + toStringArrayData(meta.valueStatsCols())); + rowSerializer.serialize(row, target); + } + + public final List deserializeList(DataInputView source) throws IOException { + int size = source.readInt(); + List records = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + records.add(deserialize(source)); + } + return records; + } + + public DataFileMeta deserialize(DataInputView in) throws IOException { + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); + return new DataFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + deserializeBinaryRow(row.getBinary(3)), + deserializeBinaryRow(row.getBinary(4)), + SimpleStats.fromRow(row.getRow(5, 3)), + SimpleStats.fromRow(row.getRow(6, 3)), + row.getLong(7), + row.getLong(8), + row.getLong(9), + row.getInt(10), + fromStringArrayData(row.getArray(11)), + row.getTimestamp(12, 3), + row.isNullAt(13) ? null : row.getLong(13), + row.isNullAt(14) ? null : row.getBinary(14), + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), + row.isNullAt(16) ? 
null : fromStringArrayData(row.getArray(16)), + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 9fc251c36672..1b216943b4a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; +import org.apache.paimon.io.DataFileMeta10LegacySerializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputDeserializer; @@ -47,11 +48,12 @@ /** {@link VersionedSerializer} for {@link CommitMessage}. */ public class CommitMessageSerializer implements VersionedSerializer { - private static final int CURRENT_VERSION = 5; + private static final int CURRENT_VERSION = 6; private final DataFileMetaSerializer dataFileSerializer; private final IndexFileMetaSerializer indexEntrySerializer; + private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer; private DataFileMeta09Serializer dataFile09Serializer; private DataFileMeta08Serializer dataFile08Serializer; private IndexFileMeta09Serializer indexEntry09Serializer; @@ -129,8 +131,13 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce private IOExceptionSupplier> fileDeserializer( int version, DataInputView view) { - if (version >= 4) { + if (version >= 6) { return () -> dataFileSerializer.deserializeList(view); + } else if (version >= 4) { + if (dataFileMeta10LegacySerializer == null) { + dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer(); + } + return () -> dataFileMeta10LegacySerializer.deserializeList(view); } else if (version == 3) { if 
(dataFile09Serializer == null) { dataFile09Serializer = new DataFileMeta09Serializer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index b9460f28b4e7..d74a14d9d56b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -22,6 +22,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; +import org.apache.paimon.io.DataFileMeta10LegacySerializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataInputViewStreamWrapper; @@ -51,7 +52,7 @@ public class DataSplit implements Split { private static final long serialVersionUID = 7L; private static final long MAGIC = -2394839472490812314L; - private static final int VERSION = 4; + private static final int VERSION = 5; private long snapshotId = 0; private BinaryRow partition; @@ -362,7 +363,10 @@ private static FunctionWithIOException getFileMetaS } else if (version == 2) { DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); return serializer::deserialize; - } else if (version >= 3) { + } else if (version >= 3 && version <= 4) { + DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer(); + return serializer::deserialize; + } else if (version >= 5) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index 813c956fb2bc..34af55165954 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -76,7 +76,7 @@ public void testProduction() throws IOException { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - null); + "hdfs://localhost:9000/path/to/file"); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap dvMetas = new LinkedHashMap<>(); @@ -108,7 +108,7 @@ public void testProduction() throws IOException { } @Test - public void testCompatibilityToVersion4() throws IOException { + public void testCompatibilityToVersion5() throws IOException { SimpleStats keyStats = new SimpleStats( singleColumn("min_key"), @@ -148,6 +148,76 @@ public void testCompatibilityToVersion4() throws IOException { new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); List indexFiles = Collections.singletonList(indexFile); + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), + 11, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(3, bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + byte[] v2Bytes = + IOUtils.readFully( + ManifestCommittableSerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream("compatibility/manifest-committable-v5"), + true); + deserialized = serializer.deserialize(2, v2Bytes); + 
assertThat(deserialized).isEqualTo(manifestCommittable); + } + + @Test + public void testCompatibilityToVersion4() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + null); + List dataFiles = Collections.singletonList(dataFile); + + LinkedHashMap dvMetas = new LinkedHashMap<>(); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null)); + IndexFileMeta indexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); + List indexFiles = Collections.singletonList(indexFile); + CommitMessageImpl commitMessage = new CommitMessageImpl( singleColumn("my_partition"), diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index 66b25a7ae9cd..88394d2dc33b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -140,7 +140,7 @@ public void testSerializerNormal() throws Exception { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - null); + "hdfs:///path/to/warehouse"); List dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 
100, 22, 33L); @@ -318,7 +318,7 @@ public void testSerializerCompatibleV3() throws Exception { new byte[] {1, 2, 4}, FileSource.COMPACT, Arrays.asList("field1", "field2", "field3"), - "hdfs:///path/to/warehouse"); + null); List dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); @@ -351,6 +351,71 @@ public void testSerializerCompatibleV3() throws Exception { assertThat(actual).isEqualTo(split); } + @Test + public void testSerializerCompatibleV4() throws Exception { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + "hdfs:///path/to/warehouse"); + List dataFiles = Collections.singletonList(dataFile); + + DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); + List deletionFiles = Collections.singletonList(deletionFile); + + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition); + binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa")); + binaryRowWriter.complete(); + + DataSplit split = + DataSplit.builder() + .withSnapshot(18) + .withPartition(partition) + .withBucket(20) + .withDataFiles(dataFiles) + .withDataDeletionFiles(deletionFiles) + .withBucketPath("my path") + .build(); + + byte[] v2Bytes = + IOUtils.readFully( + SplitTest.class + .getClassLoader() + 
.getResourceAsStream("compatibility/datasplit-v4"), + true); + + DataSplit actual = + InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader()); + assertThat(actual).isEqualTo(split); + } + private DataFileMeta newDataFile(long rowCount) { return DataFileMeta.forAppend( "my_data_file.parquet", diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v3 b/paimon-core/src/test/resources/compatibility/datasplit-v3 index ddded6178ba64febfc01611f45a268ca03a88ed2..6b19fe2d958d3c80465def7a5e08d160210fc472 100644 GIT binary patch delta 83 zcmZ3+{*7&eJ|nN|y~CFbbR^%TF)%PNPqtySU}C&6xr|Xoasp651c+AvaRLw@0OAH9 eR+zk*QJrzY5&UY!qbx delta 132 zcmeyywv2s)J|l1Gy~CFbbR^%TF)%Q&OtxXPU}9pJT*jy(IRhvl0>m4DI01-H0C58l zYfRqEsLr@z@@+x2b-ChycrcTQqqd8^!4=%5=%1lOY-&06N^$a@=J?T Tfl5GNvLCZ5_kSp0V5k59$Ves* diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v4 b/paimon-core/src/test/resources/compatibility/datasplit-v4 new file mode 100644 index 0000000000000000000000000000000000000000..6ccef002b15d9ba89f6b1fb62f1481709b8b2c16 GIT binary patch literal 934 zcma)5O>WdM82vJ(R5K%>6olw*gjjTeVkjF#cTgc(Wl_;xH?3#lOiVLzaGd#J0Z549 z3LJq23yy%qhBXI3901M$&vr6XV8bW9y!iS1wUZA&X|>d}rSvf51s|j$EVvx0JT$zY ziLg|a9*A(48-7@1(vY{c73%EK`F=Gx73PS#vOF|e@R7`iM|4s1$wQTog*Fm7_Ni0m zyeLy;_Tq@H^hKhy*cF*DQss|iCQ3u=(NjL=tTHlVku1$lL|yOb9G=k&3L@(Gf(P8B zhBkai#`7UVb(BWXZ-#&l6eQWbaf@4^%&te076$t7zace|xoW3@i^$PUcY{IM6ywhlQoM8U9MC)EV z=gFpf2Uz|HoB&^dSHKoBTmoCOnzg2t6~CnG;P>!5iT0zH;BKu4Emz<*BxgN5KVh*6 z{_e$Qi*T36r(%ZQoGo=eJ*&G-+h-53d=IeG900Io+<)X-IQ!Ve^CsSUfaS-4&0hep zMMxV9tJ}WEJgbD=UE05XnB#B1g;O2Z+?1Fa&F|3M@ixtyb55klV(5;cJHAWvG)~I< njNwxk{_E2wM6HDCKx8`1PP!*RDh literal 0 HcmV?d00001 diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v4 b/paimon-core/src/test/resources/compatibility/manifest-committable-v4 index 066372a0312de78e4c01f3015f5e57b615c24b84..9c095669a34b8806fa6ccf29d77b578c1631e117 100644 GIT binary patch delta 363 zcmewbAejZGU 
zq)4(c>#!XF@lIW=Fd$}_q|r+5fKZd3L(nt8`~nPsci;hN;0pU-HrCkGp{OdCbPQf2 zZqU8%-h=tZY!ye`;1QBA7vmciC*Th$Y8Q=?asM`2;B3M+dN42R!pY}!0p<&UpT7sd zHv9f?JrgYVnvoOi0?bzcUq1(6X3I7ftn;~NL3$UlcVoZf@EkwmOpw~=D30dEBxur7 zaEq2Y&%-22L+FB`3qGf191f>#&r7qLAdRzW(Z2M3-^+s{_KM8opWGY_x3u+uY?#!K zO7&$#lP)>2^xf*2O;6k$fgY74|GXD4f1-8rT?855RXwG7h78zgXnlnwqt0RdpZSfRDc|m^%Q~c_0dB{7KMU>3&Ve TOE1Q6ajli^pVjNigz@AHO0kb! diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v5 b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 new file mode 100644 index 0000000000000000000000000000000000000000..8b2b05869bcf52cf211c013426d43e8c05a1ad09 GIT binary patch literal 3449 zcmeHKu};G<5WS`a6#;?)B*q9aRANY47U3D0%*ZGMSoLkg%3lb9bg|AE&6#d z5s65$G3&5x0BbmPIYNV&VUk8GH3LFTc@98N0P_nl0N#Onpn(9J%1bF=%fSD;<7_iR!o(1V`#NM_2j>CKWj59%co})RMi%HO= zrQ{YZi*p_(Q5r&*3|;aWE#q)FZF^ptT?c8LO-1{{_kAx9MC^&oTHFXNMekf;ysvBHLwtlenBxlC|{=cACen+-l zz}MUX)1#`WK6L>bL+Syzha}@Ln)k&b=hD=Sb*Wm^!43HMZo%9EsLl;hIO8ipb5+!9 VMZEH&{TA0+74@_JT$wQLd;wSnk6r)( literal 0 HcmV?d00001 From 4f1993fc8f3d25572bb5746d70702c94f49af955 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Sun, 22 Dec 2024 22:26:59 +0800 Subject: [PATCH 7/7] change the version judge --- .../main/java/org/apache/paimon/io/DataFileMeta.java | 12 ++---------- .../org/apache/paimon/io/DataFileMetaSerializer.java | 2 +- .../paimon/table/sink/CommitMessageSerializer.java | 4 ++-- .../org/apache/paimon/table/source/DataSplit.java | 4 ++-- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 549bfa3328eb..bb4a2eed7223 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -370,18 +370,10 @@ public String fileFormat() { } @Nullable - public String getDataRootLocationString() { + public String getExternalPath() { 
return externalPath; } - @Nullable - public Path getExternalPath() { - if (externalPath == null) { - return null; - } - return new Path(externalPath); - } - public Optional fileSource() { return Optional.ofNullable(fileSource); } @@ -568,7 +560,7 @@ public String toString() { + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, " + "minSequenceNumber: %d, maxSequenceNumber: %d, " + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, " - + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, dataRootLocation: %s}", + + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, externalPath: %s}", fileName, fileSize, rowCount, diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index 8e17c2e91b61..2d109d96d7a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -59,7 +59,7 @@ public InternalRow toRow(DataFileMeta meta) { meta.embeddedIndex(), meta.fileSource().map(FileSource::toByteValue).orElse(null), toStringArrayData(meta.valueStatsCols()), - BinaryString.fromString(meta.getDataRootLocationString())); + BinaryString.fromString(meta.getExternalPath())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 1b216943b4a9..c65f8302aa87 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -131,9 +131,9 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce private IOExceptionSupplier> fileDeserializer( int version, DataInputView view) { - if (version >= 6) { + if (version >= 5) { return () -> 
dataFileSerializer.deserializeList(view); - } else if (version >= 4) { + } else if (version == 4) { if (dataFileMeta10LegacySerializer == null) { dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index d74a14d9d56b..40673ee78826 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -363,10 +363,10 @@ private static FunctionWithIOException getFileMetaS } else if (version == 2) { DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); return serializer::deserialize; - } else if (version >= 3 && version <= 4) { + } else if (version == 3) { DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer(); return serializer::deserialize; - } else if (version >= 5) { + } else if (version >= 4) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else {