diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index 74512409bfc8..617d25125f37 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -29,10 +29,10 @@
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.fs.FileSystemKind;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Locale;
 
 /** Flink {@link FileIO} to use {@link FileSystem}. */
 public class FlinkFileIO implements FileIO {
@@ -48,7 +48,27 @@ public FlinkFileIO(Path path) {
     @Override
     public boolean isObjectStore() {
         try {
-            return path.getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM;
+            FileSystem fs = path.getFileSystem();
+            String scheme = fs.getUri().getScheme().toLowerCase(Locale.US);
+
+            if (scheme.startsWith("s3")
+                    || scheme.startsWith("emr")
+                    || scheme.startsWith("oss")
+                    || scheme.startsWith("wasb")
+                    || scheme.startsWith("gs")) {
+                // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
+                // or Google Cloud Storage
+                return true;
+            } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+                // file servers instead of file systems
+                // they might actually be consistent, but we have no hard guarantees
+                // currently to rely on that
+                return true;
+            } else {
+                // the remainder should include hdfs, kosmos, ceph, ...
+                // this also includes federated HDFS (viewfs).
+                return false;
+            }
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 92adf5e04998..8fc78c868ba5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -25,9 +25,7 @@
 
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
 import org.apache.flink.table.data.RowData;
 
@@ -64,27 +62,6 @@ public FileStoreSourceReader(
         this.ioManager = ioManager;
     }
 
-    public FileStoreSourceReader(
-            SourceReaderContext readerContext,
-            TableRead tableRead,
-            FileStoreSourceReaderMetrics metrics,
-            IOManager ioManager,
-            @Nullable Long limit,
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
-                    elementsQueue) {
-        super(
-                elementsQueue,
-                () ->
-                        new FileStoreSourceSplitReader(
-                                tableRead, RecordLimiter.create(limit), metrics),
-                (element, output, state) ->
-                        FlinkRecordsWithSplitIds.emitRecord(
-                                readerContext, element, output, state, metrics),
-                readerContext.getConfiguration(),
-                readerContext);
-        this.ioManager = ioManager;
-    }
-
     @Override
     public void start() {
         // we request a split only if we did not get splits during the checkpoint restore
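Note: the isObjectStore() change above replaces Flink's removed FileSystemKind API with a URI-scheme prefix heuristic. The following self-contained sketch mirrors that classification so it can be eyeballed in isolation; the class name and the sample schemes in main() are illustrative only, not part of the patch.

import java.util.Locale;

public class SchemeCheckDemo {

    // Mirrors FlinkFileIO#isObjectStore: object stores and file servers are
    // treated as object storage; real file systems (hdfs, viewfs, file) are not.
    static boolean isObjectStore(String scheme) {
        String s = scheme.toLowerCase(Locale.US);
        if (s.startsWith("s3") // covers s3, s3a, s3n
                || s.startsWith("emr")
                || s.startsWith("oss")
                || s.startsWith("wasb") // covers wasb and wasbs
                || s.startsWith("gs")) {
            return true;
        } else if (s.startsWith("http") || s.startsWith("ftp")) {
            // file servers: possibly consistent, but no hard guarantee
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isObjectStore("s3a"));    // true
        System.out.println(isObjectStore("wasbs"));  // true
        System.out.println(isObjectStore("hdfs"));   // false
        System.out.println(isObjectStore("viewfs")); // false
    }
}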
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 1f0bbca314b6..a8ffe3de561f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -58,7 +58,7 @@ public AlignedSourceReader(
             @Nullable Long limit,
             FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
                     elementsQueue) {
-        super(readerContext, tableRead, metrics, ioManager, limit, elementsQueue);
+        super(readerContext, tableRead, metrics, ioManager, limit);
         this.elementsQueue = elementsQueue;
         this.nextCheckpointId = null;
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 2a855796d8d4..96334de3f87b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -251,17 +251,20 @@ public void testSchemasTable() {
         sql("ALTER TABLE T SET ('snapshot.num-retained.min' = '18')");
         sql("ALTER TABLE T SET ('manifest.format' = 'avro')");
 
-        assertThat(sql("SHOW CREATE TABLE T$schemas").toString())
-                .isEqualTo(
-                        "[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
-                                + "  `schema_id` BIGINT NOT NULL,\n"
-                                + "  `fields` VARCHAR(2147483647) NOT NULL,\n"
-                                + "  `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
-                                + "  `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
-                                + "  `options` VARCHAR(2147483647) NOT NULL,\n"
-                                + "  `comment` VARCHAR(2147483647),\n"
-                                + "  `update_time` TIMESTAMP(3) NOT NULL\n"
-                                + ") ]]");
+        String actualResult = sql("SHOW CREATE TABLE T$schemas").toString();
+        String expectedResult =
+                "[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
+                        + "  `schema_id` BIGINT NOT NULL,\n"
+                        + "  `fields` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `options` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `comment` VARCHAR(2147483647),\n"
+                        + "  `update_time` TIMESTAMP(3) NOT NULL\n"
+                        + ") ]]";
+        actualResult = actualResult.replace(" ", "").replace("\n", "");
+        expectedResult = expectedResult.replace(" ", "").replace("\n", "");
+        assertThat(actualResult).isEqualTo(expectedResult);
 
         List<Row> result =
                 sql(
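Note: the testSchemasTable() change makes the SHOW CREATE TABLE assertion whitespace-insensitive, so formatting drift in Flink's output across versions no longer fails the test. The comparison pattern, extracted into a hypothetical helper for clarity (this helper does not exist in the patch):

import static org.assertj.core.api.Assertions.assertThat;

class WhitespaceInsensitiveAssert {

    // Strip spaces and newlines from both sides, then compare exactly.
    static void assertEqualIgnoringWhitespace(String actual, String expected) {
        assertThat(actual.replace(" ", "").replace("\n", ""))
                .isEqualTo(expected.replace(" ", "").replace("\n", ""));
    }
}

Stripping all spaces also hides whitespace differences inside quoted identifiers, which is acceptable here because the expected column names contain none.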
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
index 8404d994fa9f..0c5d485af7bc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.commons.collections.IteratorUtils;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -67,7 +67,7 @@ public void testParallelSourceExecution() throws Exception {
                         "iterator source");
 
         final List<RowData> result =
-                DataStreamUtils.collectBoundedStream(stream, "Iterator Source Test");
+                IteratorUtils.toList(stream.executeAndCollect("Iterator Source Test"));
 
         verifySequence(result, 1L, 1_000L);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
index cfc23a0a44d8..39939f78670b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
@@ -29,7 +29,6 @@
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.junit5.InjectClusterClient;
-import org.apache.flink.test.util.TestEnvironment;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -167,17 +166,12 @@ private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtensi
                         .getOptional(CoreOptions.DEFAULT_PARALLELISM)
                         .orElse(internalMiniClusterExtension.getNumberSlots());
 
-        TestEnvironment executionEnvironment =
-                new TestEnvironment(
-                        internalMiniClusterExtension.getMiniCluster(), defaultParallelism, false);
-        executionEnvironment.setAsContext();
         TestStreamEnvironment.setAsContext(
                 internalMiniClusterExtension.getMiniCluster(), defaultParallelism);
     }
 
     private void unregisterEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
         TestStreamEnvironment.unsetAsContext();
-        TestEnvironment.unsetAsContext();
     }
 
     private MiniClusterClient createMiniClusterClient(
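Note: DataStreamUtils.collectBoundedStream and TestEnvironment were removed in newer Flink releases, so the tests now go through the stable DataStream#executeAndCollect API. Below is a sketch of an equivalent helper, assuming the caller also wants the returned iterator closed; IteratorUtils.toList, as used in the patch, drains it but does not close it. The helper class and method names are hypothetical:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.CloseableIterator;

import java.util.ArrayList;
import java.util.List;

class BoundedCollectUtil {

    // Drains a bounded stream via executeAndCollect; try-with-resources closes
    // the iterator, and with it the underlying collect job, even if the caller throws.
    static <T> List<T> collectBounded(DataStream<T> stream, String jobName) throws Exception {
        List<T> result = new ArrayList<>();
        try (CloseableIterator<T> it = stream.executeAndCollect(jobName)) {
            it.forEachRemaining(result::add);
        }
        return result;
    }
}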
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 9c3170f9a96b..0eac2ed2936e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -23,7 +23,6 @@
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -37,6 +36,7 @@
 import javax.annotation.Nullable;
 
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -53,7 +53,7 @@
 /** Test util for {@link ReadWriteTableITCase}. */
 public class ReadWriteTableTestUtil {
 
-    private static final Time TIME_OUT = Time.seconds(10);
+    private static final Duration TIME_OUT = Duration.ofSeconds(10);
 
     public static final int DEFAULT_PARALLELISM = 2;
 
@@ -278,7 +278,7 @@ public static void testBatchRead(String query, List<Row> expected) throws Except
         try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(resultItr)) {
             if (!expected.isEmpty()) {
                 List<Row> result =
-                        iterator.collect(expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit());
+                        iterator.collect(expected.size(), TIME_OUT.getSeconds(), TimeUnit.SECONDS);
                 assertThat(toInsertOnlyRows(result))
                         .containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected));
             }
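Note: Flink's org.apache.flink.api.common.time.Time was dropped, so the timeout constant moves to java.time.Duration, which carries no TimeUnit of its own; the unit is now pinned explicitly at the call site. A small illustrative snippet (not part of the patch) showing the getSeconds()/TimeUnit.SECONDS pairing chosen above and its one caveat:

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class DurationTimeoutDemo {

    private static final Duration TIME_OUT = Duration.ofSeconds(10);

    public static void main(String[] args) {
        // Pairing used in the patch: whole seconds plus an explicit unit.
        System.out.println(TIME_OUT.getSeconds() + " " + TimeUnit.SECONDS); // 10 SECONDS

        // Caveat: getSeconds() truncates sub-second durations, e.g.
        // Duration.ofMillis(500).getSeconds() == 0. If the constant ever
        // gains a fractional part, toMillis() with MILLISECONDS is lossless.
        System.out.println(TIME_OUT.toMillis() + " " + TimeUnit.MILLISECONDS); // 10000 MILLISECONDS
    }
}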