overwritePartition,
+ @Nullable LogSinkProvider logSinkProvider)
+ throws Exception {
int[] partitions = partitioned ? new int[] {1} : new int[0];
int[] keys = new int[] {2};
StoreSink<?, ?> sink =
new StoreSink<>(
- null, fileStore, partitions, keys, NUM_BUCKET, null, overwritePartition);
+ null,
+ fileStore,
+ partitions,
+ keys,
+ NUM_BUCKET,
+ null,
+ overwritePartition,
+ logSinkProvider);
input = input.keyBy(row -> row.getInt(2)); // key by
GlobalCommittingSinkTranslator.translate(input, sink);
input.getExecutionEnvironment().execute();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
new file mode 100644
index 000000000000..0e60ceacb073
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A stream source that: 1) emits a list of elements without allowing checkpoints, 2) then waits for
+ * two more checkpoints to complete, 3) then re-emits the same elements before 4) waiting for
+ * another two checkpoints and 5) exiting.
+ *
+ * <p>The reason this class is rewritten is to support {@link CheckpointedFunction}.
+ */
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+public class FiniteTestSource<T>
+ implements SourceFunction<T>, CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 1L;
+
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private final Iterable<T> elements;
+
+ private volatile boolean running = true;
+
+ private transient int numCheckpointsComplete;
+
+ private transient ListState<Integer> checkpointedState;
+
+ private volatile int numTimesEmitted;
+
+ public FiniteTestSource(Iterable<T> elements) {
+ this.elements = elements;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.checkpointedState =
+ context.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>("emit-times", IntSerializer.INSTANCE));
+
+ if (context.isRestored()) {
+ List<Integer> retrievedStates = new ArrayList<>();
+ for (Integer entry : this.checkpointedState.get()) {
+ retrievedStates.add(entry);
+ }
+
+ // given that the parallelism of the function is 1, we can only have 1 state
+ Preconditions.checkArgument(
+ retrievedStates.size() == 1,
+ getClass().getSimpleName() + " retrieved invalid state.");
+
+ this.numTimesEmitted = retrievedStates.get(0);
+ Preconditions.checkArgument(
+ numTimesEmitted <= 2,
+ getClass().getSimpleName()
+ + " retrieved invalid numTimesEmitted: "
+ + numTimesEmitted);
+ } else {
+ this.numTimesEmitted = 0;
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> ctx) throws Exception {
+ switch (numTimesEmitted) {
+ case 0:
+ emitElementsAndWaitForCheckpoints(ctx);
+ emitElementsAndWaitForCheckpoints(ctx);
+ break;
+ case 1:
+ emitElementsAndWaitForCheckpoints(ctx);
+ break;
+ case 2:
+ // The last notifyCheckpointComplete may have been missed, so wait for the next completed checkpoints
+ final Object lock = ctx.getCheckpointLock();
+ synchronized (lock) {
+ int checkpointToAwait = numCheckpointsComplete + 2;
+ while (running && numCheckpointsComplete < checkpointToAwait) {
+ lock.wait(1);
+ }
+ }
+ break;
+ }
+ }
+
+ private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx)
+ throws InterruptedException {
+ final Object lock = ctx.getCheckpointLock();
+
+ final int checkpointToAwait;
+ synchronized (lock) {
+ checkpointToAwait = numCheckpointsComplete + 2;
+ for (T t : elements) {
+ ctx.collect(t);
+ }
+ numTimesEmitted++;
+ }
+
+ synchronized (lock) {
+ while (running && numCheckpointsComplete < checkpointToAwait) {
+ lock.wait(1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numCheckpointsComplete++;
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) {}
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ Preconditions.checkState(
+ this.checkpointedState != null,
+ "The " + getClass().getSimpleName() + " has not been properly initialized.");
+
+ this.checkpointedState.clear();
+ this.checkpointedState.add(this.numTimesEmitted);
+ }
+}
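For orientation, a minimal sketch of how a test could drive this source (illustrative only, not part of the patch; the element values, the Types.INT hint, and the checkpoint interval are assumptions):

    import java.util.Arrays;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FiniteTestSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1); // the source assumes a single parallel instance for its emit counter
            env.enableCheckpointing(100); // completed checkpoints drive the emit -> wait -> re-emit -> wait cycle
            // Each element ends up emitted twice, with two completed checkpoints between and after the
            // passes; LogStoreSinkITCase below relies on this when it expects the log to hold the data twice.
            env.addSource(new FiniteTestSource<>(Arrays.asList(1, 2, 3)), Types.INT).print();
            env.execute();
        }
    }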
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 2d59440df27e..17733f7f6c7d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -18,21 +18,76 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+
+import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement;
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link CommittableSerializer}. */
public class CommittableSerializerTest {
+ private final FileCommittableSerializer fileSerializer =
+ new FileCommittableSerializer(
+ RowType.of(new IntType()),
+ RowType.of(new IntType()),
+ RowType.of(new IntType()));
+
+ private final CommittableSerializer serializer =
+ new CommittableSerializer(
+ fileSerializer,
+ (SimpleVersionedSerializer) TestSink.StringCommittableSerializer.INSTANCE);
+
+ @Test
+ public void testFile() throws IOException {
+ Increment increment = randomIncrement();
+ FileCommittable committable = new FileCommittable(row(0), 1, increment);
+ FileCommittable newCommittable =
+ (FileCommittable)
+ serializer
+ .deserialize(
+ 1,
+ serializer.serialize(
+ new Committable(
+ Committable.Kind.FILE, committable)))
+ .wrappedCommittable();
+ assertThat(newCommittable).isEqualTo(committable);
+ }
+
+ @Test
+ public void testLogOffset() throws IOException {
+ LogOffsetCommittable committable = new LogOffsetCommittable(2, 3);
+ LogOffsetCommittable newCommittable =
+ (LogOffsetCommittable)
+ serializer
+ .deserialize(
+ 1,
+ serializer.serialize(
+ new Committable(
+ Committable.Kind.LOG_OFFSET, committable)))
+ .wrappedCommittable();
+ assertThat(newCommittable).isEqualTo(committable);
+ }
+
@Test
- public void test() {
- byte[] bytes = new byte[] {4, 5, 1};
- Committable committable = new Committable(Committable.Kind.LOG, bytes, 9);
- byte[] serialize = CommittableSerializer.INSTANCE.serialize(committable);
- Committable deser = CommittableSerializer.INSTANCE.deserialize(1, serialize);
- assertThat(deser.kind()).isEqualTo(Committable.Kind.LOG);
- assertThat(deser.serializerVersion()).isEqualTo(9);
- assertThat(deser.wrappedCommittable()).isEqualTo(bytes);
+ public void testLog() throws IOException {
+ String log = "random_string";
+ String newCommittable =
+ (String)
+ serializer
+ .deserialize(
+ 1,
+ serializer.serialize(
+ new Committable(Committable.Kind.LOG, log)))
+ .wrappedCommittable();
+ assertThat(newCommittable).isEqualTo(log);
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
deleted file mode 100644
index 31753c3761ca..000000000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link GlobalCommittableSerializer}. */
-public class GlobalCommittableSerializerTest {
-
- @Test
- public void test() throws IOException {
- List<String> logs = Arrays.asList("1", "2");
- GlobalCommittable<String> committable =
- new GlobalCommittable<>(logs, ManifestCommittableSerializerTest.create());
- GlobalCommittableSerializer<String> serializer =
- new GlobalCommittableSerializer<>(
- new TestStringSerializer(), ManifestCommittableSerializerTest.serializer());
- byte[] serialized = serializer.serialize(committable);
- GlobalCommittable<String> deser = serializer.deserialize(1, serialized);
- assertThat(deser.logCommittables()).isEqualTo(committable.logCommittables());
- assertThat(deser.fileCommittable()).isEqualTo(committable.fileCommittable());
- }
-
- private static final class TestStringSerializer implements SimpleVersionedSerializer<String> {
-
- private static final int VERSION = 1073741823;
-
- private TestStringSerializer() {}
-
- public int getVersion() {
- return VERSION;
- }
-
- public byte[] serialize(String str) {
- return str.getBytes(StandardCharsets.UTF_8);
- }
-
- public String deserialize(int version, byte[] serialized) {
- assertThat(version).isEqualTo(VERSION);
- return new String(serialized, StandardCharsets.UTF_8);
- }
- }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
new file mode 100644
index 000000000000..023b4b626e8d
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.FileStoreITCase;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
+import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
+import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
+import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildBatchEnv;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildConfiguration;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildFileStore;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for table store with Kafka. */
+public class LogStoreSinkITCase extends KafkaTableTestBase {
+
+ @Test
+ public void testStreamingPartitioned() throws Exception {
+ innerTest("testStreamingPartitioned", false, true, true);
+ }
+
+ @Test
+ public void testStreamingNonPartitioned() throws Exception {
+ innerTest("testStreamingNonPartitioned", false, false, true);
+ }
+
+ @Test
+ public void testBatchPartitioned() throws Exception {
+ innerTest("testBatchPartitioned", true, true, true);
+ }
+
+ @Test
+ public void testStreamingEventual() throws Exception {
+ innerTest("testStreamingEventual", false, true, false);
+ }
+
+ private void innerTest(String name, boolean isBatch, boolean partitioned, boolean transaction)
+ throws Exception {
+ StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
+
+ // in eventual mode, failure will result in duplicate data
+ FileStore fileStore =
+ buildFileStore(
+ buildConfiguration(isBatch || !transaction, TEMPORARY_FOLDER.newFolder()),
+ partitioned);
+
+ // prepare log
+ DynamicTableFactory.Context context =
+ KafkaLogTestUtils.testContext(
+ name,
+ getBootstrapServers(),
+ LogOptions.LogChangelogMode.AUTO,
+ transaction
+ ? LogOptions.LogConsistency.TRANSACTIONAL
+ : LogOptions.LogConsistency.EVENTUAL,
+ FileStoreITCase.VALUE_TYPE,
+ new int[] {2});
+
+ KafkaLogStoreFactory factory = discoverKafkaLogFactory();
+ KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
+ KafkaLogSourceProvider sourceProvider =
+ factory.createSourceProvider(context, SOURCE_CONTEXT);
+
+ factory.onCreateTable(context, 3, true);
+
+ try {
+ // write
+ DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
+ FileStoreITCase.write(finiteSource, fileStore, partitioned, null, sinkProvider);
+
+ // read
+ List<Row> results = FileStoreITCase.read(env, fileStore);
+
+ Row[] expected =
+ partitioned
+ ? new Row[] {
+ Row.of(5, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p1", 1),
+ Row.of(0, "p1", 2)
+ }
+ : new Row[] {
+ Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)
+ };
+ assertThat(results).containsExactlyInAnyOrder(expected);
+
+ results =
+ buildStreamEnv()
+ .fromSource(
+ sourceProvider.createSource(null),
+ WatermarkStrategy.noWatermarks(),
+ "source")
+ .executeAndCollect(isBatch ? 6 : 12).stream()
+ .map(FileStoreITCase.CONVERTER::toExternal)
+ .collect(Collectors.toList());
+
+ if (isBatch) {
+ expected =
+ new Row[] {
+ Row.of(0, "p1", 1),
+ Row.of(0, "p1", 2),
+ Row.of(5, "p1", 1),
+ Row.of(6, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p2", 1)
+ };
+ } else {
+ // read log
+ // expect the original data twice (FiniteTestSource emits every element two times)
+ expected =
+ new Row[] {
+ Row.of(0, "p1", 1),
+ Row.of(0, "p1", 2),
+ Row.of(5, "p1", 1),
+ Row.of(6, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p2", 1),
+ Row.of(0, "p1", 1),
+ Row.of(0, "p1", 2),
+ Row.of(5, "p1", 1),
+ Row.of(6, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p2", 1)
+ };
+ }
+ assertThat(results).containsExactlyInAnyOrder(expected);
+ } finally {
+ factory.onDropTable(context, true);
+ }
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index fd3d97fa9a22..e2ef3e76b4c1 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -24,6 +24,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
@@ -123,7 +124,8 @@ public void testNoKeyChangelogs() throws Exception {
primaryKeys,
2,
() -> lock,
- new HashMap<>());
+ new HashMap<>(),
+ null);
writeAndCommit(
sink,
GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
@@ -225,7 +227,7 @@ private List<Committable> write(StoreSink<?, ?> sink, RowData... rows) throws Ex
private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
StoreGlobalCommitter committer = sink.createGlobalCommitter();
- GlobalCommittable> committable = committer.combine(0, fileCommittables);
+ ManifestCommittable committable = committer.combine(0, fileCommittables);
fileStore.expired = false;
lock.locked = false;
@@ -246,7 +248,14 @@ private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) th
private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
return new StoreSink<>(
- identifier, fileStore, partitions, primaryKeys, 2, () -> lock, overwritePartition);
+ identifier,
+ fileStore,
+ partitions,
+ primaryKeys,
+ 2,
+ () -> lock,
+ overwritePartition,
+ null);
}
private class TestLock implements CatalogLock {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
index 3bbead5c65c5..17cbba1fe67d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
@@ -49,7 +49,7 @@ public class GlobalCommitterOperatorTest {
@Test
public void closeCommitter() throws Exception {
final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
testHarness.initializeEmptyState();
testHarness.open();
@@ -69,7 +69,7 @@ public void restoredFromMergedState() throws Exception {
buildSubtaskState(createTestHarness(), input2);
final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
final OperatorSubtaskState mergedOperatorSubtaskState =
@@ -108,7 +108,7 @@ public void commitMultipleStagesTogether() throws Exception {
expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input3));
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
testHarness.initializeEmptyState();
testHarness.open();
@@ -138,7 +138,7 @@ public void filterRecoveredCommittables() throws Exception {
final DefaultGlobalCommitter globalCommitter =
new DefaultGlobalCommitter(successCommittedCommittable);
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
// all data from previous checkpoint are expected to be committed,
@@ -155,7 +155,7 @@ public void filterRecoveredCommittables() throws Exception {
public void endOfInput() throws Exception {
final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
- final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
testHarness.initializeEmptyState();
testHarness.open();
@@ -166,12 +166,12 @@ public void endOfInput() throws Exception {
assertThat(globalCommitter.getCommittedData()).contains("elder+patience+silent");
}
- private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness()
+ private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness()
throws Exception {
return createTestHarness(new DefaultGlobalCommitter());
}
- private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness(
+ private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness(
GlobalCommitter<String, String> globalCommitter) throws Exception {
return new OneInputStreamOperatorTestHarness<>(
new GlobalCommitterOperator<>(
@@ -183,7 +183,7 @@ private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> crea
}
public static OperatorSubtaskState buildSubtaskState(
- OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness,
+ OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness,
List<String> input)
throws Exception {
testHarness.initializeEmptyState();
@@ -199,7 +199,7 @@ public static OperatorSubtaskState buildSubtaskState(
return operatorSubtaskState;
}
- private static List<StreamRecord<CommittableMessage<String>>> committableRecords(
+ static List<StreamRecord<CommittableMessage<String>>> committableRecords(
Collection<String> elements) {
return elements.stream()
.map(GlobalCommitterOperatorTest::toCommittableMessage)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
new file mode 100644
index 000000000000..528f5dc57eee
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.buildSubtaskState;
+import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.committableRecords;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
+
+/** Test the {@link LocalCommitterOperator}. */
+public class LocalCommitterOperatorTest {
+
+ @Test
+ public void supportRetry() throws Exception {
+ final List<String> input = Arrays.asList("lazy", "leaf");
+ final RetryOnceCommitter committer = new RetryOnceCommitter();
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+
+ testHarness.initializeEmptyState();
+ testHarness.open();
+ testHarness.processElements(committableRecords(input));
+ testHarness.prepareSnapshotPreBarrier(1);
+ testHarness.snapshot(1L, 1L);
+ testHarness.notifyOfCompletedCheckpoint(1L);
+ testHarness.snapshot(2L, 2L);
+ testHarness.notifyOfCompletedCheckpoint(2L);
+
+ testHarness.close();
+
+ assertThat(committer.getCommittedData()).contains("lazy", "leaf");
+ }
+
+ @Test
+ public void closeCommitter() throws Exception {
+ final DefaultCommitter committer = new DefaultCommitter();
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+ testHarness.initializeEmptyState();
+ testHarness.open();
+ testHarness.close();
+ assertThat(committer.isClosed()).isTrue();
+ }
+
+ @Test
+ public void restoredFromMergedState() throws Exception {
+ final List<String> input1 = Arrays.asList("today", "whom");
+ final OperatorSubtaskState operatorSubtaskState1 =
+ buildSubtaskState(createTestHarness(), input1);
+
+ final List<String> input2 = Arrays.asList("future", "evil", "how");
+ final OperatorSubtaskState operatorSubtaskState2 =
+ buildSubtaskState(createTestHarness(), input2);
+
+ final DefaultCommitter committer = new DefaultCommitter();
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+
+ final OperatorSubtaskState mergedOperatorSubtaskState =
+ OneInputStreamOperatorTestHarness.repackageState(
+ operatorSubtaskState1, operatorSubtaskState2);
+
+ testHarness.initializeState(
+ OneInputStreamOperatorTestHarness.repartitionOperatorState(
+ mergedOperatorSubtaskState, 2, 2, 1, 0));
+ testHarness.open();
+
+ final List<String> expectedOutput = new ArrayList<>();
+ expectedOutput.addAll(input1);
+ expectedOutput.addAll(input2);
+
+ testHarness.prepareSnapshotPreBarrier(1L);
+ testHarness.snapshot(1L, 1L);
+ testHarness.notifyOfCompletedCheckpoint(1);
+
+ testHarness.close();
+
+ assertThat(committer.getCommittedData())
+ .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
+ }
+
+ @Test
+ public void commitMultipleStagesTogether() throws Exception {
+
+ final DefaultCommitter committer = new DefaultCommitter();
+
+ final List<String> input1 = Arrays.asList("cautious", "nature");
+ final List<String> input2 = Arrays.asList("count", "over");
+ final List<String> input3 = Arrays.asList("lawyer", "grammar");
+
+ final List<String> expectedOutput = new ArrayList<>();
+
+ expectedOutput.addAll(input1);
+ expectedOutput.addAll(input2);
+ expectedOutput.addAll(input3);
+
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+ testHarness.initializeEmptyState();
+ testHarness.open();
+
+ testHarness.processElements(committableRecords(input1));
+ testHarness.prepareSnapshotPreBarrier(1L);
+ testHarness.snapshot(1L, 1L);
+ testHarness.processElements(committableRecords(input2));
+ testHarness.prepareSnapshotPreBarrier(2L);
+ testHarness.snapshot(2L, 2L);
+ testHarness.processElements(committableRecords(input3));
+ testHarness.prepareSnapshotPreBarrier(3L);
+ testHarness.snapshot(3L, 3L);
+
+ testHarness.notifyOfCompletedCheckpoint(1);
+ testHarness.notifyOfCompletedCheckpoint(3);
+
+ testHarness.close();
+
+ assertThat(fromRecords(testHarness.getRecordOutput())).isEqualTo(expectedOutput);
+
+ assertThat(committer.getCommittedData()).isEqualTo(expectedOutput);
+ }
+
+ private static List<String> fromRecords(
+ Collection<StreamRecord<CommittableMessage<String>>> elements) {
+ return elements.stream()
+ .map(StreamRecord::getValue)
+ .filter(message -> message instanceof CommittableWithLineage)
+ .map(message -> ((CommittableWithLineage<String>) message).getCommittable())
+ .collect(Collectors.toList());
+ }
+
+ private OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ createTestHarness() throws Exception {
+ return createTestHarness(new DefaultCommitter());
+ }
+
+ private OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ createTestHarness(Committer<String> committer) throws Exception {
+ return new OneInputStreamOperatorTestHarness<>(
+ new LocalCommitterOperator<>(
+ () -> committer, () -> StringCommittableSerializer.INSTANCE));
+ }
+
+ /** Base class for testing {@link Committer}. */
+ private static class DefaultCommitter implements Committer<String>, Serializable {
+
+ @Nullable protected Queue<String> committedData;
+
+ private boolean isClosed;
+
+ @Nullable private final Supplier<Queue<String>> queueSupplier;
+
+ public DefaultCommitter() {
+ this.committedData = new ConcurrentLinkedQueue<>();
+ this.isClosed = false;
+ this.queueSupplier = null;
+ }
+
+ public List<String> getCommittedData() {
+ if (committedData != null) {
+ return new ArrayList<>(committedData);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<String>> requests) {
+ if (committedData == null) {
+ assertNotNull(queueSupplier);
+ committedData = queueSupplier.get();
+ }
+ committedData.addAll(
+ requests.stream()
+ .map(CommitRequest::getCommittable)
+ .collect(Collectors.toList()));
+ }
+ }
+
+ /** A {@link Committer} that always re-commits the committables data it received. */
+ private static class RetryOnceCommitter extends DefaultCommitter implements Committer<String> {
+
+ private final Set<String> seen = new LinkedHashSet<>();
+
+ @Override
+ public void commit(Collection<CommitRequest<String>> requests) {
+ requests.forEach(
+ c -> {
+ if (seen.remove(c.getCommittable())) {
+ checkNotNull(committedData);
+ committedData.add(c.getCommittable());
+ } else {
+ seen.add(c.getCommittable());
+ c.retryLater();
+ }
+ });
+ }
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
new file mode 100644
index 000000000000..03c62d6713ec
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.table.store.log.LogSinkProvider.WriteCallback;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAccumulator;
+
+/** A {@link WriteCallback} implementation that tracks the maximum acknowledged offset per bucket. */
+public class LogWriteCallback implements WriteCallback {
+
+ private final ConcurrentHashMap<Integer, LongAccumulator> offsetMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void onCompletion(int bucket, long offset) {
+ LongAccumulator acc = offsetMap.get(bucket);
+ if (acc == null) {
+ // computeIfAbsent will lock on the key
+ acc = offsetMap.computeIfAbsent(bucket, k -> new LongAccumulator(Long::max, 0));
+ } // else lock free
+ acc.accumulate(offset);
+ }
+
+ public Map<Integer, Long> offsets() {
+ Map<Integer, Long> offsets = new HashMap<>();
+ offsetMap.forEach((k, v) -> offsets.put(k, v.longValue()));
+ return offsets;
+ }
+}
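A small usage sketch of the callback above (illustrative only, not part of the patch): every acknowledged record reports its bucket and offset, and offsets() later yields the per-bucket maxima, which the sink appears to turn into log offset committables.

    import java.util.Map;
    import org.apache.flink.table.store.log.LogWriteCallback;

    class LogWriteCallbackExample {
        public static void main(String[] args) {
            LogWriteCallback callback = new LogWriteCallback();
            // Acknowledgements typically arrive on producer callback threads, possibly out of order.
            callback.onCompletion(0, 5L);
            callback.onCompletion(0, 3L); // stale ack; the maximum for bucket 0 stays 5
            callback.onCompletion(1, 7L);
            Map<Integer, Long> offsets = callback.offsets(); // {0=5, 1=7}
            System.out.println(offsets);
        }
    }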
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 0fad212afbe9..b4b2efc3f1c1 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -63,7 +63,7 @@
/** Utils for the test of {@link KafkaLogStoreFactory}. */
public class KafkaLogTestUtils {
- static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
+ public static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
new LogStoreTableFactory.SourceContext() {
@Override
public TypeInformation createTypeInformation(DataType producedDataType) {
@@ -85,7 +85,7 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
}
};
- static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
+ public static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
new LogStoreTableFactory.SinkContext() {
@Override
@@ -113,7 +113,7 @@ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
}
};
- static KafkaLogStoreFactory discoverKafkaLogFactory() {
+ public static KafkaLogStoreFactory discoverKafkaLogFactory() {
return (KafkaLogStoreFactory)
LogStoreTableFactory.discoverLogStoreFactory(
Thread.currentThread().getContextClassLoader(),
@@ -169,15 +169,27 @@ static DynamicTableFactory.Context testContext(
LogChangelogMode changelogMode,
LogConsistency consistency,
boolean keyed) {
+ return testContext(
+ name,
+ servers,
+ changelogMode,
+ consistency,
+ RowType.of(new IntType(), new IntType()),
+ keyed ? new int[] {0} : new int[0]);
+ }
+
+ public static DynamicTableFactory.Context testContext(
+ String name,
+ String servers,
+ LogChangelogMode changelogMode,
+ LogConsistency consistency,
+ RowType type,
+ int[] keys) {
Map<String, String> options = new HashMap<>();
options.put(CHANGELOG_MODE.key(), changelogMode.toString());
options.put(CONSISTENCY.key(), consistency.toString());
options.put(BOOTSTRAP_SERVERS.key(), servers);
- return createContext(
- name,
- RowType.of(new IntType(), new IntType()),
- keyed ? new int[] {0} : new int[0],
- options);
+ return createContext(name, type, keys, options);
}
static SinkRecord testRecord(boolean keyed, int bucket, int key, int value, RowKind rowKind) {