@@ -29,10 +29,10 @@
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.FileSystemKind;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Locale;

/** Flink {@link FileIO} to use {@link FileSystem}. */
public class FlinkFileIO implements FileIO {
@@ -48,7 +48,27 @@ public FlinkFileIO(Path path) {
@Override
public boolean isObjectStore() {
try {
return path.getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM;
FileSystem fs = path.getFileSystem();
String scheme = fs.getUri().getScheme().toLowerCase(Locale.US);

if (scheme.startsWith("s3")
|| scheme.startsWith("emr")
|| scheme.startsWith("oss")
|| scheme.startsWith("wasb")
|| scheme.startsWith("gs")) {
// the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
// or Google Cloud Storage
return true;
} else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
// file servers instead of file systems
// they might actually be consistent, but we have no hard guarantees
// currently to rely on that
return true;
} else {
// the remainder should include hdfs, kosmos, ceph, ...
// this also includes federated HDFS (viewfs).
return false;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
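The new check above classifies the store purely by its URI scheme; a minimal standalone sketch of that classification follows (class and method names are invented for illustration and are not part of this change):

```java
import java.util.Locale;

public class ObjectStoreSchemeCheck {

    /** Mirrors the classification above: object stores and file servers return true. */
    static boolean isObjectStoreScheme(String rawScheme) {
        String scheme = rawScheme.toLowerCase(Locale.US);
        if (scheme.startsWith("s3")
                || scheme.startsWith("emr")
                || scheme.startsWith("oss")
                || scheme.startsWith("wasb")
                || scheme.startsWith("gs")) {
            // Amazon S3 (and EMRFS), Aliyun OSS, Azure Blob Storage, Google Cloud Storage
            return true;
        } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
            // file servers rather than file systems; no hard consistency guarantees to rely on
            return true;
        } else {
            // hdfs, viewfs, kosmos, ceph, local file systems, ...
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isObjectStoreScheme("s3"));     // true
        System.out.println(isObjectStoreScheme("oss"));    // true
        System.out.println(isObjectStoreScheme("hdfs"));   // false
        System.out.println(isObjectStoreScheme("viewfs")); // false
    }
}
```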
@@ -25,9 +25,7 @@

import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
import org.apache.flink.table.data.RowData;

@@ -64,27 +62,6 @@ public FileStoreSourceReader(
this.ioManager = ioManager;
}

public FileStoreSourceReader(
SourceReaderContext readerContext,
TableRead tableRead,
FileStoreSourceReaderMetrics metrics,
IOManager ioManager,
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
elementsQueue) {
super(
elementsQueue,
() ->
new FileStoreSourceSplitReader(
tableRead, RecordLimiter.create(limit), metrics),
(element, output, state) ->
FlinkRecordsWithSplitIds.emitRecord(
readerContext, element, output, state, metrics),
readerContext.getConfiguration(),
readerContext);
this.ioManager = ioManager;
}

@Override
public void start() {
// we request a split only if we did not get splits during the checkpoint restore
@@ -58,7 +58,7 @@ public AlignedSourceReader(
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
elementsQueue) {
super(readerContext, tableRead, metrics, ioManager, limit, elementsQueue);
super(readerContext, tableRead, metrics, ioManager, limit);
this.elementsQueue = elementsQueue;
this.nextCheckpointId = null;
}
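With the queue-accepting FileStoreSourceReader constructor removed above, AlignedSourceReader simply retains the queue passed to its own constructor instead of forwarding it through super(). A generic, hedged sketch of that shape with invented stand-in classes (not the real Flink/Paimon types):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Stand-in for the base reader whose queue-accepting constructor was removed. */
class BaseReader {
    BaseReader(String context) {
        // the base class no longer sees the queue
    }
}

/** Stand-in for the aligned reader: it keeps the queue for its own bookkeeping. */
class AlignedReader extends BaseReader {

    private final BlockingQueue<String> elementsQueue;

    AlignedReader(String context, BlockingQueue<String> elementsQueue) {
        super(context); // the queue is no longer forwarded to the superclass
        this.elementsQueue = elementsQueue; // retained locally, e.g. to check emptiness on alignment
    }

    boolean isQueueEmpty() {
        return elementsQueue.isEmpty();
    }

    public static void main(String[] args) {
        AlignedReader reader = new AlignedReader("ctx", new ArrayBlockingQueue<>(16));
        System.out.println(reader.isQueueEmpty()); // true
    }
}
```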
@@ -251,17 +251,20 @@ public void testSchemasTable() {
sql("ALTER TABLE T SET ('snapshot.num-retained.min' = '18')");
sql("ALTER TABLE T SET ('manifest.format' = 'avro')");

assertThat(sql("SHOW CREATE TABLE T$schemas").toString())
.isEqualTo(
"[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
+ " `schema_id` BIGINT NOT NULL,\n"
+ " `fields` VARCHAR(2147483647) NOT NULL,\n"
+ " `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
+ " `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
+ " `options` VARCHAR(2147483647) NOT NULL,\n"
+ " `comment` VARCHAR(2147483647),\n"
+ " `update_time` TIMESTAMP(3) NOT NULL\n"
+ ") ]]");
String actualResult = sql("SHOW CREATE TABLE T$schemas").toString();
String expectedResult =
"[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
+ " `schema_id` BIGINT NOT NULL,\n"
+ " `fields` VARCHAR(2147483647) NOT NULL,\n"
+ " `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
+ " `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
+ " `options` VARCHAR(2147483647) NOT NULL,\n"
+ " `comment` VARCHAR(2147483647),\n"
+ " `update_time` TIMESTAMP(3) NOT NULL\n"
+ ") ]]";
actualResult = actualResult.replace(" ", "").replace("\n", "");
expectedResult = expectedResult.replace(" ", "").replace("\n", "");
assertThat(actualResult).isEqualTo(expectedResult);

List<Row> result =
sql(
@@ -18,10 +18,10 @@

package org.apache.paimon.flink.source;

import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -67,7 +67,7 @@ public void testParallelSourceExecution() throws Exception {
"iterator source");

final List<RowData> result =
DataStreamUtils.collectBoundedStream(stream, "Iterator Source Test");
IteratorUtils.toList(stream.executeAndCollect("Iterator Source Test"));

verifySequence(result, 1L, 1_000L);
}
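The replacement swaps the removed DataStreamUtils.collectBoundedStream helper for DataStream#executeAndCollect drained through commons-collections IteratorUtils. A self-contained sketch of the same pattern on a simple bounded stream (the sequence source and job name are illustrative, not taken from this test):

```java
import java.util.List;

import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteAndCollectSketch {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.fromSequence(1L, 1_000L);

        // Run the bounded job and drain its result iterator into a list,
        // taking the place of the removed DataStreamUtils.collectBoundedStream helper.
        List<Long> result = IteratorUtils.toList(stream.executeAndCollect("Iterator Source Test"));

        System.out.println(result.size()); // 1000
    }
}
```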
@@ -29,7 +29,6 @@
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.util.TestEnvironment;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -167,17 +166,12 @@ private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtensi
.getOptional(CoreOptions.DEFAULT_PARALLELISM)
.orElse(internalMiniClusterExtension.getNumberSlots());

TestEnvironment executionEnvironment =
new TestEnvironment(
internalMiniClusterExtension.getMiniCluster(), defaultParallelism, false);
executionEnvironment.setAsContext();
TestStreamEnvironment.setAsContext(
internalMiniClusterExtension.getMiniCluster(), defaultParallelism);
}

private void unregisterEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
TestStreamEnvironment.unsetAsContext();
TestEnvironment.unsetAsContext();
}

private MiniClusterClient createMiniClusterClient(
@@ -23,7 +23,6 @@
import org.apache.paimon.utils.BlockingIterator;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -37,6 +36,7 @@
import javax.annotation.Nullable;

import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -53,7 +53,7 @@
/** Test util for {@link ReadWriteTableITCase}. */
public class ReadWriteTableTestUtil {

private static final Time TIME_OUT = Time.seconds(10);
private static final Duration TIME_OUT = Duration.ofSeconds(10);

public static final int DEFAULT_PARALLELISM = 2;

@@ -278,7 +278,7 @@ public static void testBatchRead(String query, List<Row> expected) throws Except
try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(resultItr)) {
if (!expected.isEmpty()) {
List<Row> result =
iterator.collect(expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit());
iterator.collect(expected.size(), TIME_OUT.getSeconds(), TimeUnit.SECONDS);
assertThat(toInsertOnlyRows(result))
.containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected));
}
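The timeout migration replaces Flink's deprecated org.apache.flink.api.common.time.Time with java.time.Duration, reading the value back as getSeconds() plus an explicit TimeUnit. A tiny illustration of the same call shape (the blocking queue stands in for BlockingIterator and is not from this PR):

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class DurationTimeoutSketch {

    private static final Duration TIME_OUT = Duration.ofSeconds(10);

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> results = new ArrayBlockingQueue<>(1);
        results.put("+I[1, hello]");

        // Same shape as iterator.collect(n, TIME_OUT.getSeconds(), TimeUnit.SECONDS):
        // the Duration supplies the number of seconds, the unit is passed explicitly.
        String row = results.poll(TIME_OUT.getSeconds(), TimeUnit.SECONDS);
        System.out.println(row);
    }
}
```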