diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f60d0ec91009..bf83eb687bdf 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1014,5 +1014,23 @@ Integer The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort. + +
data-file.external-paths
+ (none) + String + The external paths where the data of this table will be written, multiple elements separated by commas. + + +
data-file.external-paths.strategy
+ none +

Enum

+ The strategy of selecting an external path when writing data.

Possible values: + + +
data-file.external-paths.specific-fs
+ (none) + String + The specific file system of the external path when data-file.external-paths.strategy is set to specific-fs, should be the prefix scheme of the external path, now supported are s3 and oss. + diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index efd886501266..8c59cdcd4169 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -124,6 +124,33 @@ public class CoreOptions implements Serializable { + "if there is no primary key, the full row will be used.") .build()); + public static final ConfigOption DATA_FILE_EXTERNAL_PATHS = + key("data-file.external-paths") + .stringType() + .noDefaultValue() + .withDescription( + "The external paths where the data of this table will be written, " + + "multiple elements separated by commas."); + + public static final ConfigOption DATA_FILE_EXTERNAL_PATHS_STRATEGY = + key("data-file.external-paths.strategy") + .enumType(ExternalPathStrategy.class) + .defaultValue(ExternalPathStrategy.NONE) + .withDescription( + "The strategy of selecting an external path when writing data."); + + public static final ConfigOption DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS = + key("data-file.external-paths.specific-fs") + .stringType() + .noDefaultValue() + .withDescription( + "The specific file system of the external path when " + + DATA_FILE_EXTERNAL_PATHS_STRATEGY.key() + + " is set to " + + ExternalPathStrategy.SPECIFIC_FS + + ", should be the prefix scheme of the external path, now supported are s3 and oss."); + + // todo, this path is the table schema path, the name will be changed in the later PR. @ExcludeFromDocumentation("Internal use only") public static final ConfigOption PATH = key("path") @@ -2181,6 +2208,21 @@ public PartitionExpireStrategy partitionExpireStrategy() { return options.get(PARTITION_EXPIRATION_STRATEGY); } + @Nullable + public String dataFileExternalPaths() { + return options.get(DATA_FILE_EXTERNAL_PATHS); + } + + @Nullable + public ExternalPathStrategy externalPathStrategy() { + return options.get(DATA_FILE_EXTERNAL_PATHS_STRATEGY); + } + + @Nullable + public String externalSpecificFS() { + return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS); + } + public String partitionTimestampFormatter() { return options.get(PARTITION_TIMESTAMP_FORMATTER); } @@ -2988,6 +3030,40 @@ public InlineElement getDescription() { } } + /** Specifies the strategy for selecting external storage paths. */ + public enum ExternalPathStrategy implements DescribedEnum { + NONE( + "none", + "Do not choose any external storage, data will still be written to the default warehouse path."), + + SPECIFIC_FS( + "specific-fs", + "Select a specific file system as the external path. Currently supported are S3 and OSS."), + + ROUND_ROBIN( + "round-robin", + "When writing a new file, a path is chosen from data-file.external-paths in turn."); + + private final String value; + + private final String description; + + ExternalPathStrategy(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + } + /** Specifies the local file type for lookup. */ public enum LookupLocalFileType implements DescribedEnum { SORT("sort", "Construct a sorted file for lookup."), diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java new file mode 100644 index 000000000000..71e6f78184cd --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/DataFileExternalPathProvider.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.Optional; + +/** Provider for external data paths. */ +public class DataFileExternalPathProvider implements Serializable { + @Nullable private final TableExternalPathProvider tableExternalPathProvider; + private final Path relativeBucketPath; + + public DataFileExternalPathProvider( + @Nullable TableExternalPathProvider tableExternalPathProvider, + Path relativeBucketPath) { + this.tableExternalPathProvider = tableExternalPathProvider; + this.relativeBucketPath = relativeBucketPath; + } + + /** + * Get the next external data path. + * + * @return the next external data path + */ + public Optional getNextExternalDataPath() { + return Optional.ofNullable(tableExternalPathProvider) + .flatMap(TableExternalPathProvider::getNextExternalPath) + .map(path -> new Path(path, relativeBucketPath)); + } + + public boolean externalPathExists() { + return tableExternalPathProvider != null && tableExternalPathProvider.externalPathExists(); + } + + @Override + public final boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DataFileExternalPathProvider)) { + return false; + } + + DataFileExternalPathProvider that = (DataFileExternalPathProvider) o; + return Objects.equals(tableExternalPathProvider, that.tableExternalPathProvider) + && Objects.equals(relativeBucketPath, that.relativeBucketPath); + } + + @Override + public int hashCode() { + return Objects.hash(tableExternalPathProvider, relativeBucketPath); + } + + @Override + public String toString() { + return "DataFileExternalPathProvider{" + + " externalPathProvider=" + + tableExternalPathProvider + + ", relativeBucketPath=" + + relativeBucketPath + + "}"; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java new file mode 100644 index 000000000000..8ae94e22b72d --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/TableExternalPathProvider.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.CoreOptions.ExternalPathStrategy; +import org.apache.paimon.annotation.VisibleForTesting; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; + +/** Provider for external paths. */ +public class TableExternalPathProvider implements Serializable { + private final Map externalPathsMap; + private final List externalPathsList; + + private final ExternalPathStrategy externalPathStrategy; + private final String externalSpecificFS; + private int currentIndex = 0; + private boolean externalPathExists; + + public TableExternalPathProvider( + String externalPaths, + ExternalPathStrategy externalPathStrategy, + String externalSpecificFS) { + this.externalPathsMap = new HashMap<>(); + this.externalPathsList = new ArrayList<>(); + this.externalPathStrategy = externalPathStrategy; + if (externalSpecificFS != null) { + this.externalSpecificFS = externalSpecificFS.toLowerCase(); + } else { + this.externalSpecificFS = null; + } + initExternalPaths(externalPaths); + if (!externalPathsList.isEmpty()) { + this.currentIndex = new Random().nextInt(externalPathsList.size()); + } + } + + private void initExternalPaths(String externalPaths) { + if (externalPaths == null) { + return; + } + + String[] tmpArray = externalPaths.split(","); + for (String s : tmpArray) { + Path path = new Path(s.trim()); + String scheme = path.toUri().getScheme(); + if (scheme == null) { + throw new IllegalArgumentException("scheme should not be null: " + path); + } + scheme = scheme.toLowerCase(); + externalPathsMap.put(scheme, path); + externalPathsList.add(path); + } + + if (externalPathStrategy != null + && externalPathStrategy.equals(ExternalPathStrategy.SPECIFIC_FS)) { + if (externalSpecificFS == null) { + throw new IllegalArgumentException("external specific fs should not be null: "); + } + + if (!externalPathsMap.containsKey(externalSpecificFS)) { + throw new IllegalArgumentException( + "external specific fs not found: " + externalSpecificFS); + } + } + + if (!externalPathsMap.isEmpty() + && !externalPathsList.isEmpty() + && externalPathStrategy != ExternalPathStrategy.NONE) { + externalPathExists = true; + } + } + + /** + * Get the next external path. + * + * @return the next external path + */ + public Optional getNextExternalPath() { + if (externalPathsMap == null || externalPathsMap.isEmpty()) { + return Optional.empty(); + } + + switch (externalPathStrategy) { + case NONE: + return Optional.empty(); + case SPECIFIC_FS: + return getSpecificFSExternalPath(); + case ROUND_ROBIN: + return getRoundRobinPath(); + default: + return Optional.empty(); + } + } + + private Optional getSpecificFSExternalPath() { + if (!externalPathsMap.containsKey(externalSpecificFS)) { + return Optional.empty(); + } + return Optional.of(externalPathsMap.get(externalSpecificFS)); + } + + private Optional getRoundRobinPath() { + currentIndex = (currentIndex + 1) % externalPathsList.size(); + return Optional.of(externalPathsList.get(currentIndex)); + } + + public boolean externalPathExists() { + return externalPathExists; + } + + @VisibleForTesting + public Map getExternalPathsMap() { + return externalPathsMap; + } + + @VisibleForTesting + public List getExternalPathsList() { + return externalPathsList; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TableExternalPathProvider that = (TableExternalPathProvider) o; + return currentIndex == that.currentIndex + && externalPathExists == that.externalPathExists + && externalPathsMap.equals(that.externalPathsMap) + && externalPathsList.equals(that.externalPathsList) + && externalPathStrategy == that.externalPathStrategy + && Objects.equals(externalSpecificFS, that.externalSpecificFS); + } + + @Override + public String toString() { + return "ExternalPathProvider{" + + " externalPathsMap=" + + externalPathsMap + + ", externalPathsList=" + + externalPathsList + + ", externalPathStrategy=" + + externalPathStrategy + + ", externalSpecificFS='" + + externalSpecificFS + + '\'' + + ", currentIndex=" + + currentIndex + + ", externalPathExists=" + + externalPathExists + + "}"; + } + + @Override + public int hashCode() { + return Objects.hash( + externalPathsMap, + externalPathsList, + externalPathStrategy, + externalSpecificFS, + currentIndex, + externalPathExists); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java new file mode 100644 index 000000000000..0a444768cf6e --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fs/TableExternalPathProviderTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.CoreOptions.ExternalPathStrategy; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link TableExternalPathProvider}. */ +public class TableExternalPathProviderTest { + private TableExternalPathProvider provider; + + @BeforeEach + public void setUp() { + provider = + new TableExternalPathProvider( + "oss://bucket1/path1,s3://bucket2/path2", + ExternalPathStrategy.ROUND_ROBIN, + null); + } + + @Test + public void testInitExternalPaths() { + assertThat(provider.externalPathExists()).isTrue(); + assertThat(provider.getExternalPathsMap().size()).isEqualTo(2); + assertThat(provider.getExternalPathsList().size()).isEqualTo(2); + } + + @Test + public void testGetNextExternalPathRoundRobinSimple() { + String path1 = "s3://bucket2/path2"; + String path2 = "oss://bucket1/path1"; + List expectedPaths = new ArrayList(); + expectedPaths.add(path1); + expectedPaths.add(path2); + String externalPaths = path1 + "," + path2; + provider = + new TableExternalPathProvider( + externalPaths, ExternalPathStrategy.ROUND_ROBIN, null); + + // Collect the paths returned by getNextExternalPath + List actualPaths = new ArrayList<>(); + int expectIndex = 0; + for (int i = 0; i < 6; i++) { // Collect more paths to ensure all are covered + Optional path = provider.getNextExternalPath(); + assertThat(path.isPresent()).isTrue(); + actualPaths.add(path.get().toString()); + if (i == 0) { + if (path.get().toString().equals(path1)) { + expectIndex = 0; + } else if (path.get().toString().equals(path2)) { + expectIndex = 1; + } + } + + expectIndex = (expectIndex) % expectedPaths.size(); + assertThat(path.get().toString().equals(expectedPaths.get(expectIndex))).isTrue(); + expectIndex++; + } + + // Check that all expected paths are present in the actual paths + for (String expectedPath : expectedPaths) { + assertThat(actualPaths).contains(expectedPath); + } + } + + @Test + public void testGetNextExternalPathRoundRobinComplex() { + List expectedPathsList = new ArrayList(); + for (int i = 0; i < 20; i++) { + if (i % 2 == 0) { + expectedPathsList.add("oss://bucket1/path" + i); + } else { + expectedPathsList.add("s3://bucket2/path" + i); + } + } + String externalPaths = String.join(",", expectedPathsList); + provider = + new TableExternalPathProvider( + externalPaths, ExternalPathStrategy.ROUND_ROBIN, null); + + // Collect the paths returned by getNextExternalPath + List actualPaths = new ArrayList<>(); + int expectIndex = 0; + for (int i = 0; i < 40; i++) { // Collect more paths to ensure all are covered + Optional path = provider.getNextExternalPath(); + assertThat(path.isPresent()).isTrue(); + actualPaths.add(path.get().toString()); + if (i == 0) { + for (int j = 0; j < expectedPathsList.size(); j++) { + if (path.get().toString().equals(expectedPathsList.get(j))) { + expectIndex = j; + break; + } + } + } + expectIndex = (expectIndex) % expectedPathsList.size(); + assertThat(path.get().toString().equals(expectedPathsList.get(expectIndex))).isTrue(); + expectIndex++; + } + + // Check that all expected paths are present in the actual paths + for (String expectedPath : expectedPathsList) { + assertThat(actualPaths).contains(expectedPath); + } + } + + @Test + public void testGetNextExternalPathSpecificFS() { + provider = + new TableExternalPathProvider( + "oss://bucket1/path1,s3://bucket2/path2", + ExternalPathStrategy.SPECIFIC_FS, + "OSS"); + + Optional path = provider.getNextExternalPath(); + assertThat(path.isPresent()).isTrue(); + assertThat(path.get().toString()).isEqualTo("oss://bucket1/path1"); + } + + @Test + public void testGetNextExternalPathNone() { + provider = + new TableExternalPathProvider( + "oss://bucket1/path1,s3://bucket2/path2", ExternalPathStrategy.NONE, "OSS"); + + Optional path = provider.getNextExternalPath(); + assertThat(path.isPresent()).isFalse(); + } + + @Test + public void testUnsupportedExternalPath() { + Assertions.assertThatThrownBy( + () -> { + new TableExternalPathProvider( + "hdfs://bucket1/path1", + ExternalPathStrategy.SPECIFIC_FS, + "oss"); + }) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testUnsupportedExternalSpecificFS() { + assertThatThrownBy( + () -> { + provider = + new TableExternalPathProvider( + "oss://bucket1/path1", + ExternalPathStrategy.SPECIFIC_FS, + "S3"); + }) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "external specific fs not found: s3")); + } + + @Test + public void testExternalSpecificFSNull() { + Assertions.assertThatThrownBy( + () -> { + new TableExternalPathProvider( + "oss://bucket1/path1", ExternalPathStrategy.SPECIFIC_FS, null); + }) + .isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 58cc6f47a9af..655d75edfa96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -18,10 +18,12 @@ package org.apache.paimon; +import org.apache.paimon.CoreOptions.ExternalPathStrategy; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.TableExternalPathProvider; import org.apache.paimon.index.HashIndexFile; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.manifest.IndexManifestFile; @@ -119,7 +121,16 @@ protected FileStorePathFactory pathFactory(String format) { options.legacyPartitionName(), options.fileSuffixIncludeCompression(), options.fileCompression(), - options.dataFilePathDirectory()); + options.dataFilePathDirectory(), + getExternalPathProvider()); + } + + private TableExternalPathProvider getExternalPathProvider() { + String externalPaths = options.dataFileExternalPaths(); + ExternalPathStrategy externalPathStrategy = options.externalPathStrategy(); + String externalSpecificFS = options.externalSpecificFS(); + return new TableExternalPathProvider( + externalPaths, externalPathStrategy, externalSpecificFS); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java index 5955da6220f8..d4cfc0b5ece0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java @@ -154,6 +154,7 @@ private class IcebergManifestEntryWriter path, serializer::toRow, fileCompression, + false, false); this.partitionStatsCollector = new SimpleStatsCollector(partitionType); this.sequenceNumber = sequenceNumber; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index 19525ab6cd91..be32afa644e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -19,9 +19,11 @@ package org.apache.paimon.io; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.fs.DataFileExternalPathProvider; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.FileEntry; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.util.Optional; @@ -43,6 +45,8 @@ public class DataFilePathFactory { private final String changelogFilePrefix; private final boolean fileSuffixIncludeCompression; private final String fileCompression; + @Nullable private final DataFileExternalPathProvider dataFileExternalPathProvider; + private final boolean isExternalPath; public DataFilePathFactory( Path parent, @@ -50,7 +54,8 @@ public DataFilePathFactory( String dataFilePrefix, String changelogFilePrefix, boolean fileSuffixIncludeCompression, - String fileCompression) { + String fileCompression, + @Nullable DataFileExternalPathProvider dataFileExternalPathProvider) { this.parent = parent; this.uuid = UUID.randomUUID().toString(); this.pathCount = new AtomicInteger(0); @@ -59,6 +64,12 @@ public DataFilePathFactory( this.changelogFilePrefix = changelogFilePrefix; this.fileSuffixIncludeCompression = fileSuffixIncludeCompression; this.fileCompression = fileCompression; + this.dataFileExternalPathProvider = dataFileExternalPathProvider; + if (dataFileExternalPathProvider != null) { + this.isExternalPath = dataFileExternalPathProvider.externalPathExists(); + } else { + this.isExternalPath = false; + } } public Path newPath() { @@ -74,7 +85,10 @@ public String newChangelogFileName() { } public Path newPath(String prefix) { - return new Path(parent, newFileName(prefix)); + return Optional.ofNullable(dataFileExternalPathProvider) + .flatMap(DataFileExternalPathProvider::getNextExternalDataPath) + .map(path -> new Path(path, newFileName(prefix))) + .orElseGet(() -> new Path(parent, newFileName(prefix))); } private String newFileName(String prefix) { @@ -133,6 +147,10 @@ public static String formatIdentifier(String fileName) { return fileName.substring(index + 1); } + public boolean isExternalPath() { + return isExternalPath; + } + @VisibleForTesting String uuid() { return uuid; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index f78d7556487f..359855260e78 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -91,7 +91,8 @@ public KeyValueDataFileWriter( String compression, CoreOptions options, FileSource fileSource, - FileIndexOptions fileIndexOptions) { + FileIndexOptions fileIndexOptions, + boolean isExternalPath) { super( fileIO, factory, @@ -102,7 +103,8 @@ public KeyValueDataFileWriter( compression, StatsCollectorFactories.createStatsFactories( options, writeRowType.getFieldNames(), keyType.getFieldNames()), - options.asyncFileWrite()); + options.asyncFileWrite(), + isExternalPath); this.keyType = keyType; this.valueType = valueType; @@ -177,6 +179,7 @@ public DataFileMeta result() throws IOException { ? DataFileIndexWriter.EMPTY_RESULT : dataFileIndexWriter.result(); + String externalPath = isExternalPath ? path.toString() : null; return new DataFileMeta( path.getName(), fileIO.getFileSize(path), @@ -196,7 +199,7 @@ public DataFileMeta result() throws IOException { indexResult.embeddedIndexBytes(), fileSource, valueStatsPair.getKey(), - null); + externalPath); } abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java index 27a1aef64e36..5bfc322a50bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java @@ -52,7 +52,8 @@ public KeyValueDataFileWriterImpl( String compression, CoreOptions options, FileSource fileSource, - FileIndexOptions fileIndexOptions) { + FileIndexOptions fileIndexOptions, + boolean isExternalPath) { super( fileIO, factory, @@ -67,7 +68,8 @@ public KeyValueDataFileWriterImpl( compression, options, fileSource, - fileIndexOptions); + fileIndexOptions, + isExternalPath); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 7b6f4f0e3c56..feee72438985 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -95,7 +95,10 @@ public RollingFileWriter createRollingMergeTreeFileWrite return new RollingFileWriter<>( () -> createDataFileWriter( - formatContext.pathFactory(level).newPath(), level, fileSource), + formatContext.pathFactory(level).newPath(), + level, + fileSource, + formatContext.pathFactory(level).isExternalPath()), suggestedFileSize); } @@ -105,12 +108,13 @@ public RollingFileWriter createRollingChangelogFileWrite createDataFileWriter( formatContext.pathFactory(level).newChangelogPath(), level, - FileSource.APPEND), + FileSource.APPEND, + formatContext.pathFactory(level).isExternalPath()), suggestedFileSize); } private KeyValueDataFileWriter createDataFileWriter( - Path path, int level, FileSource fileSource) { + Path path, int level, FileSource fileSource, boolean isExternalPath) { return formatContext.thinModeEnabled() ? new KeyValueThinDataFileWriterImpl( fileIO, @@ -125,7 +129,8 @@ private KeyValueDataFileWriter createDataFileWriter( formatContext.compression(level), options, fileSource, - fileIndexOptions) + fileIndexOptions, + isExternalPath) : new KeyValueDataFileWriterImpl( fileIO, formatContext.writerFactory(level), @@ -139,7 +144,8 @@ private KeyValueDataFileWriter createDataFileWriter( formatContext.compression(level), options, fileSource, - fileIndexOptions); + fileIndexOptions, + isExternalPath); } public void deleteFile(DataFileMeta file) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java index dd7ebb006764..4b5e1c842c8d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java @@ -77,7 +77,8 @@ public KeyValueThinDataFileWriterImpl( String compression, CoreOptions options, FileSource fileSource, - FileIndexOptions fileIndexOptions) { + FileIndexOptions fileIndexOptions, + boolean isExternalPath) { super( fileIO, factory, @@ -92,7 +93,8 @@ public KeyValueThinDataFileWriterImpl( compression, options, fileSource, - fileIndexOptions); + fileIndexOptions, + isExternalPath); Map idToIndex = new HashMap<>(valueType.getFieldCount()); for (int i = 0; i < valueType.getFieldCount(); i++) { idToIndex.put(valueType.getFields().get(i).id(), i); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index cd46d67e3b60..97af4d067a3a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -66,7 +66,8 @@ public RowDataFileWriter( FileIndexOptions fileIndexOptions, FileSource fileSource, boolean asyncFileWrite, - boolean statsDenseStore) { + boolean statsDenseStore, + boolean isExternalPath) { super( fileIO, factory, @@ -76,7 +77,8 @@ public RowDataFileWriter( simpleStatsExtractor, fileCompression, statsCollectors, - asyncFileWrite); + asyncFileWrite, + isExternalPath); this.schemaId = schemaId; this.seqNumCounter = seqNumCounter; this.statsArraySerializer = new SimpleStatsConverter(writeSchema, statsDenseStore); @@ -111,6 +113,7 @@ public DataFileMeta result() throws IOException { dataFileIndexWriter == null ? DataFileIndexWriter.EMPTY_RESULT : dataFileIndexWriter.result(); + String externalPath = isExternalPath ? path.toString() : null; return DataFileMeta.forAppend( path.getName(), fileIO.getFileSize(path), @@ -125,6 +128,6 @@ public DataFileMeta result() throws IOException { indexResult.embeddedIndexBytes(), fileSource, statsPair.getKey(), - null); + externalPath); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java index b929a4ae22af..3b4b05243d50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java @@ -64,7 +64,8 @@ public RowDataRollingFileWriter( fileIndexOptions, fileSource, asyncFileWrite, - statsDenseStore), + statsDenseStore, + pathFactory.isExternalPath()), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java index f303e8597870..7a3cc4143ba7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java @@ -54,6 +54,7 @@ public abstract class SingleFileWriter implements FileWriter { private long recordCount; protected boolean closed; + protected boolean isExternalPath; public SingleFileWriter( FileIO fileIO, @@ -61,7 +62,8 @@ public SingleFileWriter( Path path, Function converter, String compression, - boolean asyncWrite) { + boolean asyncWrite, + boolean isExternalPath) { this.fileIO = fileIO; this.path = path; this.converter = converter; @@ -84,6 +86,7 @@ public SingleFileWriter( this.recordCount = 0; this.closed = false; + this.isExternalPath = isExternalPath; } public Path path() { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java index 67a3fa6d1ace..0ab2de8c2bbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java @@ -59,8 +59,9 @@ public StatsCollectingSingleFileWriter( @Nullable SimpleStatsExtractor simpleStatsExtractor, String compression, SimpleColStatsCollector.Factory[] statsCollectors, - boolean asyncWrite) { - super(fileIO, factory, path, converter, compression, asyncWrite); + boolean asyncWrite, + boolean isExternalPath) { + super(fileIO, factory, path, converter, compression, asyncWrite, isExternalPath); this.simpleStatsExtractor = simpleStatsExtractor; if (this.simpleStatsExtractor == null) { this.simpleStatsCollector = new SimpleStatsCollector(writeSchema, statsCollectors); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 1aba2ef19561..34bf77345a86 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -132,8 +132,8 @@ private class ManifestEntryWriter extends SingleFileWriter getHierarchicalPartitionPath(BinaryRow partition) { return PartitionPathUtils.generateHierarchicalPartitionPaths( partitionComputer.generatePartValues( diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 77570205327e..3e54835a0ef7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -523,7 +523,8 @@ private DataFilePathFactory createPathFactory() { CoreOptions.DATA_FILE_PREFIX.defaultValue(), CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); } private AppendOnlyWriter createEmptyWriter(long targetFileSize) { diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index c29519ce8b9b..1a7202e91125 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -72,7 +72,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception CoreOptions.DATA_FILE_PREFIX.defaultValue(), CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options()); LinkedList toCompact = new LinkedList<>(); CoreOptions options = new CoreOptions(new HashMap<>()); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java index 109f33c3dc1b..c406ae24f7c0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java @@ -40,7 +40,8 @@ public void testNoPartition() { CoreOptions.DATA_FILE_PREFIX.defaultValue(), CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); String uuid = pathFactory.uuid(); for (int i = 0; i < 20; i++) { @@ -69,7 +70,8 @@ public void testWithPartition() { CoreOptions.DATA_FILE_PREFIX.defaultValue(), CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); String uuid = pathFactory.uuid(); for (int i = 0; i < 20; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 8f2c815404cf..2d0efdb6d10a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -237,6 +237,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null); int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; FileIO fileIO = FileIOFinder.find(path); @@ -257,6 +258,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null)); return KeyValueFileWriterFactory.builder( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index 9e1de71451a8..1a58bfe46573 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -84,7 +84,8 @@ public void initialize(String identifier, boolean statsDenseStore) { .defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION .defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()) + CoreOptions.FILE_COMPRESSION.defaultValue(), + null) .newPath(), SCHEMA, fileFormat @@ -103,7 +104,8 @@ public void initialize(String identifier, boolean statsDenseStore) { new FileIndexOptions(), FileSource.APPEND, true, - statsDenseStore), + statsDenseStore, + false), TARGET_FILE_SIZE); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index 19bd6a856bf9..d27ab823544c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -151,6 +151,7 @@ protected ManifestFile createManifestFile(String pathStr) { CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null), Long.MAX_VALUE, null) diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java index 089e11656a99..4c949eae961d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java @@ -106,6 +106,7 @@ private ManifestFile createManifestFile(String pathStr) { CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null); int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; FileIO fileIO = FileIOFinder.find(path); diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java index 5bf01f32cb07..f8b69a04b096 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java @@ -110,6 +110,7 @@ private ManifestList createManifestList(String pathStr) { CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null); return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null) .create(); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java index c5cda2286dfb..bf5653603c0c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java @@ -92,6 +92,7 @@ public void testCreateDataFilePathFactoryWithPartition() { CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null); assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16"); @@ -134,6 +135,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) { CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java index e29f8ab56ad7..d05eeb16b299 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java @@ -21,13 +21,17 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.fs.local.LocalFileIOLoader; import org.apache.paimon.utils.BlockingIterator; +import org.apache.paimon.utils.TraceableFileIO; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Path; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Arrays; @@ -40,6 +44,8 @@ /** Test case for append-only managed table. */ public class AppendOnlyTableITCase extends CatalogITCaseBase { + @TempDir Path tempExternalPath1; + @TempDir Path tempExternalPath2; @Test public void testCreateUnawareBucketTableWithBucketKey() { @@ -87,6 +93,169 @@ public void testReadWrite() { assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB")); } + @Test + public void testReadWriteWithExternalPathRoundRobinStrategy1() { + String externalPaths = + TraceableFileIO.SCHEME + + "://" + + tempExternalPath1.toString() + + "," + + LocalFileIOLoader.SCHEME + + "://" + + tempExternalPath2.toString(); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths' = '" + + externalPaths + + "')"); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths.strategy' = 'round-robin')"); + + batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')"); + List rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB")); + + rows = batchSql("SELECT id FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + + rows = batchSql("SELECT data from append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB")); + + batchSql("INSERT INTO append_table VALUES (3, 'CCC')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(3); + assertThat(rows) + .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC")); + + batchSql("INSERT INTO append_table VALUES (4, 'DDD')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(4); + assertThat(rows) + .containsExactlyInAnyOrder( + Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"), Row.of(4, "DDD")); + } + + @Test + public void testReadWriteWithExternalPathRoundRobinStrategy2() { + batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')"); + List rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB")); + + rows = batchSql("SELECT id FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + + rows = batchSql("SELECT data from append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB")); + + String externalPaths = + TraceableFileIO.SCHEME + + "://" + + tempExternalPath1.toString() + + "," + + LocalFileIOLoader.SCHEME + + "://" + + tempExternalPath2.toString(); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths' = '" + + externalPaths + + "')"); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths.strategy' = 'round-robin')"); + + batchSql("INSERT INTO append_table VALUES (3, 'CCC')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(3); + assertThat(rows) + .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC")); + + batchSql("INSERT INTO append_table VALUES (4, 'DDD')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(4); + assertThat(rows) + .containsExactlyInAnyOrder( + Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"), Row.of(4, "DDD")); + } + + @Test + public void testReadWriteWithExternalPathSpecificFSStrategy() { + String externalPaths = TraceableFileIO.SCHEME + "://" + tempExternalPath1.toString(); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths' = '" + + externalPaths + + "')"); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths.strategy' = 'specific-fs')"); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths.specific-fs' = 'traceable')"); + + batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')"); + List rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB")); + + rows = batchSql("SELECT id FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + + rows = batchSql("SELECT data from append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB")); + + batchSql("INSERT INTO append_table VALUES (3, 'CCC')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(3); + assertThat(rows) + .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC")); + + batchSql("INSERT INTO append_table VALUES (4, 'DDD')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(4); + assertThat(rows) + .containsExactlyInAnyOrder( + Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"), Row.of(4, "DDD")); + } + + @Test + public void testReadWriteWithExternalPathNoneStrategy() { + String externalPaths = TraceableFileIO.SCHEME + "://" + tempExternalPath1.toString(); + batchSql( + "ALTER TABLE append_table SET ('data-file.external-paths' = '" + + externalPaths + + "')"); + batchSql("ALTER TABLE append_table SET ('data-file.external-paths.strategy' = 'none')"); + + batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')"); + List rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB")); + + rows = batchSql("SELECT id FROM append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + + rows = batchSql("SELECT data from append_table"); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB")); + + batchSql("INSERT INTO append_table VALUES (3, 'CCC')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(3); + assertThat(rows) + .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC")); + + batchSql("INSERT INTO append_table VALUES (4, 'DDD')"); + rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(4); + assertThat(rows) + .containsExactlyInAnyOrder( + Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"), Row.of(4, "DDD")); + } + @Test public void testReadUnwareBucketTableWithRebalanceShuffle() throws Exception { batchSql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 4b8cf7912192..87cc9b7f040c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -709,6 +709,7 @@ public void testCreateTableWithColumnOptions() throws Exception { Map expected = got.getOptions(); expected.remove("path"); + expected.remove("table.data.path"); expected.remove(FlinkCatalogOptions.REGISTER_TIMEOUT.key()); assertThat(catalogTable.getOptions()).isEqualTo(expected); } @@ -892,14 +893,14 @@ private void checkEquals( Map optionsToAdd, Set optionsToRemove) { Path tablePath; + Path tableDataPath; try { - tablePath = - new Path( - ((FlinkCatalog) catalog) - .catalog() - .getTable(FlinkCatalog.toIdentifier(path)) - .options() - .get(PATH.key())); + Map options = + ((FlinkCatalog) catalog) + .catalog() + .getTable(FlinkCatalog.toIdentifier(path)) + .options(); + tablePath = new Path(options.get(PATH.key())); } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 4a90415c191a..d0c7d35d778b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -23,7 +23,9 @@ import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.fs.local.LocalFileIOLoader; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.TraceableFileIO; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.execution.JobClient; @@ -66,10 +68,14 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase { // ------------------------------------------------------------------------ private String path; private Map tableDefaultProperties; + private String externalPath1; + private String externalPath2; @BeforeEach public void before() throws IOException { path = getTempDirPath(); + externalPath1 = getTempDirPath(); + externalPath2 = getTempDirPath(); ThreadLocalRandom random = ThreadLocalRandom.current(); tableDefaultProperties = new HashMap<>(); @@ -207,6 +213,112 @@ public void testLookupChangelog() throws Exception { innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'")); } + @Test + public void testTableReadWriteWithExternalPathRoundRobin() throws Exception { + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100) + .parallelism(1) + .build(); + + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + String externalPaths = + TraceableFileIO.SCHEME + + "://" + + externalPath1.toString() + + "," + + LocalFileIOLoader.SCHEME + + "://" + + externalPath2.toString(); + sEnv.executeSql( + "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) " + + "WITH ( " + + "'bucket' = '1'," + + "'data-file.external-paths' = '" + + externalPaths + + "'," + + "'data-file.external-paths.strategy' = 'round-robin'" + + ")"); + + CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM T2")); + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await(); + // read initial data + List actual = new ArrayList<>(); + for (int i = 0; i < 1; i++) { + actual.add(it.next().toString()); + } + assertThat(actual).containsExactlyInAnyOrder("+I[1, A]"); + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (2, 'B')").await(); + + for (int i = 0; i < 1; i++) { + actual.add(it.next().toString()); + } + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (3, 'C')").await(); + + for (int i = 0; i < 1; i++) { + actual.add(it.next().toString()); + } + + assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]"); + } + + @Test + public void testTableReadWriteWithExternalPathSpecificFS() throws Exception { + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100) + .parallelism(1) + .build(); + + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + String externalPaths = + TraceableFileIO.SCHEME + + "://" + + externalPath1.toString() + + "," + + "fake://" + + externalPath2.toString(); + sEnv.executeSql( + "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) " + + "WITH ( " + + "'bucket' = '1'," + + "'data-file.external-paths' = '" + + externalPaths + + "'," + + "'data-file.external-paths.strategy' = 'specific-fs'," + + "'data-file.external-paths.specific-fs' = 'traceable'" + + ")"); + + CloseableIterator it = collect(sEnv.executeSql("SELECT * FROM T2")); + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await(); + // read initial data + List actual = new ArrayList<>(); + for (int i = 0; i < 1; i++) { + actual.add(it.next().toString()); + } + assertThat(actual).containsExactlyInAnyOrder("+I[1, A]"); + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (2, 'B'), (3, 'C')").await(); + + for (int i = 0; i < 2; i++) { + actual.add(it.next().toString()); + } + assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]"); + } + @Test public void testTableReadWriteBranch() throws Exception { TableEnvironment sEnv = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 732e96454236..dc07817b45fd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -54,6 +54,7 @@ import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; @@ -87,6 +88,7 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQueryWithTableOptions; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery; +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.checkExternalFileStorePath; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.checkFileStorePath; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTable; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTemporaryTable; @@ -114,6 +116,8 @@ public class ReadWriteTableITCase extends AbstractTestBase { private final Map staticPartitionOverwrite = Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"); + @TempDir public static java.nio.file.Path externalPath1; + @BeforeEach public void setUp() { init(getTempDirPath()); @@ -215,6 +219,120 @@ public void testBatchReadWriteWithPartitionedRecordsWithPk() throws Exception { changelogRow("+I", "Euro", "2022-01-01"))); } + @Test + public void testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathRoundRobinStrategy() + throws Exception { + Map options = new HashMap<>(); + options.put( + CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "file://" + externalPath1.toString()); + options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "ROUND-ROBIN"); + checkExternalPathTestResult(options, externalPath1.toString()); + } + + @Test + public void testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathSpecificFStrategy() + throws Exception { + Map options = new HashMap<>(); + options.put( + CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "file://" + externalPath1.toString()); + options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "specific-fs"); + options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), "file"); + checkExternalPathTestResult(options, externalPath1.toString()); + } + + public void checkExternalPathTestResult(Map options, String externalPath) + throws Exception { + List initialRecords = + Arrays.asList( + // part = 2022-01-01 + changelogRow("+I", "US Dollar", 114L, "2022-01-01"), + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + // part = 2022-01-02 + changelogRow("+I", "Euro", 119L, "2022-01-02")); + + String table = + createTable( + Arrays.asList("currency STRING", "rate BIGINT", "dt String"), + Arrays.asList("currency", "dt"), + Collections.emptyList(), + Collections.singletonList("dt"), + options); + + insertInto( + table, + "('US Dollar', 114, '2022-01-01')", + "('Yen', 1, '2022-01-01')", + "('Euro', 114, '2022-01-01')", + "('Euro', 119, '2022-01-02')"); + + checkExternalFileStorePath(Arrays.asList("dt=2022-01-01", "dt=2022-01-02"), externalPath); + + testBatchRead(buildSimpleQuery(table), initialRecords); + + insertOverwritePartition( + table, "PARTITION (dt = '2022-01-02')", "('Euro', 100)", "('Yen', 1)"); + + // batch read to check partition refresh + testBatchRead( + buildQuery(table, "*", "WHERE dt IN ('2022-01-02')"), + Arrays.asList( + // part = 2022-01-02 + changelogRow("+I", "Euro", 100L, "2022-01-02"), + changelogRow("+I", "Yen", 1L, "2022-01-02"))); + + // test partition filter + List expectedPartitionRecords = + Arrays.asList( + changelogRow("+I", "Yen", 1L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("+I", "US Dollar", 114L, "2022-01-01")); + + testBatchRead(buildQuery(table, "*", "WHERE dt <> '2022-01-02'"), expectedPartitionRecords); + + testBatchRead( + buildQuery(table, "*", "WHERE dt IN ('2022-01-01')"), expectedPartitionRecords); + + // test field filter + testBatchRead( + buildQuery(table, "*", "WHERE rate >= 100"), + Arrays.asList( + changelogRow("+I", "US Dollar", 114L, "2022-01-01"), + changelogRow("+I", "Euro", 114L, "2022-01-01"), + changelogRow("+I", "Euro", 100L, "2022-01-02"))); + + // test partition and field filter + testBatchRead( + buildQuery(table, "*", "WHERE dt = '2022-01-02' AND rate >= 100"), + Collections.singletonList(changelogRow("+I", "Euro", 100L, "2022-01-02"))); + + // test projection + testBatchRead( + buildQuery(table, "dt", ""), + Arrays.asList( + changelogRow("+I", "2022-01-01"), + changelogRow("+I", "2022-01-01"), + changelogRow("+I", "2022-01-01"), + changelogRow("+I", "2022-01-02"), + changelogRow("+I", "2022-01-02"))); + + testBatchRead( + buildQuery(table, "dt, currency, rate", ""), + Arrays.asList( + changelogRow("+I", "2022-01-01", "US Dollar", 114L), + changelogRow("+I", "2022-01-01", "Yen", 1L), + changelogRow("+I", "2022-01-01", "Euro", 114L), + changelogRow("+I", "2022-01-02", "Euro", 100L), + changelogRow("+I", "2022-01-02", "Yen", 1L))); + + // test projection and filter + testBatchRead( + buildQuery(table, "currency, dt", "WHERE rate = 114"), + Arrays.asList( + changelogRow("+I", "US Dollar", "2022-01-01"), + changelogRow("+I", "Euro", "2022-01-01"))); + } + @Test public void testNaNType() throws Exception { bEnv.executeSql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index d2bb9eb98274..a9107a78fcf4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -111,6 +111,7 @@ public TestChangelogDataReadWrite(String root) { CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null); this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root)); this.commitUser = UUID.randomUUID().toString(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java index 0eac2ed2936e..1383e3f4edff 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java @@ -273,6 +273,22 @@ public static void checkFileStorePath(String table, List partitionSpec) }); } + public static void checkExternalFileStorePath(List partitionSpec, String externalPath) { + // check data file path + if (partitionSpec.isEmpty()) { + partitionSpec = Collections.singletonList(""); + } + partitionSpec.stream() + .map(str -> str.replaceAll(",", "/")) + .map(str -> str.replaceAll("null", "__DEFAULT_PARTITION__")) + .forEach( + partition -> { + assertThat(Paths.get(externalPath, partition)).exists(); + // at least exists bucket-0 + assertThat(Paths.get(externalPath, partition, "bucket-0")).exists(); + }); + } + public static void testBatchRead(String query, List expected) throws Exception { CloseableIterator resultItr = bEnv.executeSql(query).collect(); try (BlockingIterator iterator = BlockingIterator.of(resultItr)) { diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java index 99e95cf40e5a..cb35fa507016 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java @@ -161,6 +161,7 @@ protected void foreachIndexReader(Consumer consumer) CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), CoreOptions.FILE_COMPRESSION.defaultValue(), + null, null); Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));