Skip to content
Merged
14 changes: 14 additions & 0 deletions paimon-flink/paimon-flink-1.20/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -131,7 +130,7 @@ public void build() throws Exception {

protected void beforeBuildingSourceSink() throws Exception {}

protected Object buildSource() {
protected Source<CdcSourceRecord, ?, ?> buildSource() {
return syncJobHandler.provideSource();
}

Expand All @@ -147,41 +146,32 @@ protected void validateRuntimeExecutionMode() {
"It's only support STREAMING mode for flink-cdc sync table action.");
}

private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object source) {
if (source instanceof Source) {
boolean isAutomaticWatermarkCreationEnabled =
tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
&& Objects.equals(
tableConfig.get(CoreOptions.TAG_AUTOMATIC_CREATION.key()),
WATERMARK.toString());

Options options = Options.fromMap(tableConfig);
Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
WatermarkStrategy<CdcSourceRecord> watermarkStrategy =
isAutomaticWatermarkCreationEnabled
? watermarkAlignGroup != null
? new CdcWatermarkStrategy(createCdcTimestampExtractor())
.withWatermarkAlignment(
watermarkAlignGroup,
options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
options.get(
SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL))
: new CdcWatermarkStrategy(createCdcTimestampExtractor())
: WatermarkStrategy.noWatermarks();
if (idleTimeout != null) {
watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
}
return env.fromSource(
(Source<CdcSourceRecord, ?, ?>) source,
watermarkStrategy,
syncJobHandler.provideSourceName());
private DataStreamSource<CdcSourceRecord> buildDataStreamSource(
Source<CdcSourceRecord, ?, ?> source) {
boolean isAutomaticWatermarkCreationEnabled =
tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
&& Objects.equals(
tableConfig.get(CoreOptions.TAG_AUTOMATIC_CREATION.key()),
WATERMARK.toString());

Options options = Options.fromMap(tableConfig);
Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
String watermarkAlignGroup = options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
WatermarkStrategy<CdcSourceRecord> watermarkStrategy =
isAutomaticWatermarkCreationEnabled
? watermarkAlignGroup != null
? new CdcWatermarkStrategy(createCdcTimestampExtractor())
.withWatermarkAlignment(
watermarkAlignGroup,
options.get(SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
options.get(
SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL))
: new CdcWatermarkStrategy(createCdcTimestampExtractor())
: WatermarkStrategy.noWatermarks();
if (idleTimeout != null) {
watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
}
if (source instanceof SourceFunction) {
return env.addSource(
(SourceFunction<CdcSourceRecord>) source, syncJobHandler.provideSourceName());
}
throw new UnsupportedOperationException("Unrecognized source type");
return env.fromSource(source, watermarkStrategy, syncJobHandler.provideSourceName());
}

protected abstract FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.junit.jupiter.api.Test;
Expand All @@ -45,14 +44,7 @@ public void testTransformationParallelism() {
env.setParallelism(8);
int inputParallelism = ThreadLocalRandom.current().nextInt(8) + 1;
DataStreamSource<CdcMultiplexRecord> input =
env.addSource(
new ParallelSourceFunction<CdcMultiplexRecord>() {
@Override
public void run(SourceContext<CdcMultiplexRecord> ctx) {}

@Override
public void cancel() {}
})
env.fromData(CdcMultiplexRecord.class, new CdcMultiplexRecord("", "", null))
.setParallelism(inputParallelism);

FlinkCdcMultiTableSink sink =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -154,8 +155,9 @@ private void innerTestRandomCdcEvents(Supplier<Integer> bucket, boolean unawareB
.allowRestart(enableFailure)
.build();

TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(events);
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
TestCdcSource testCdcSource = new TestCdcSource(events);
DataStreamSource<TestCdcEvent> source =
env.fromSource(testCdcSource, WatermarkStrategy.noWatermarks(), "TestCdcSource");
source.setParallelism(2);

Options catalogOptions = new Options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -151,8 +152,9 @@ private void innerTestRandomCdcEvents(
.allowRestart(enableFailure)
.build();

TestCdcSourceFunction sourceFunction = new TestCdcSourceFunction(testTable.events());
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
TestCdcSource testCdcSource = new TestCdcSource(testTable.events());
DataStreamSource<TestCdcEvent> source =
env.fromSource(testCdcSource, WatermarkStrategy.noWatermarks(), "TestCdcSource");
source.setParallelism(2);

Options catalogOptions = new Options();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Testing parallel {@link org.apache.flink.api.connector.source.Source} to produce {@link
* TestCdcEvent}. {@link TestCdcEvent}s with the same key will be produced by the same parallelism.
*/
public class TestCdcSource extends AbstractNonCoordinatedSource<TestCdcEvent> {

private static final long serialVersionUID = 1L;
private final LinkedList<TestCdcEvent> events;

public TestCdcSource(Collection<TestCdcEvent> events) {
this.events = new LinkedList<>(events);
}

@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public SourceReader<TestCdcEvent, SimpleSourceSplit> createReader(SourceReaderContext context) {
return new Reader(
context.getIndexOfSubtask(),
context.currentParallelism(),
new LinkedList<>(events));
}

private static class Reader extends AbstractNonCoordinatedSourceReader<TestCdcEvent> {
private final int subtaskId;
private final int totalSubtasks;

private final LinkedList<TestCdcEvent> events;
private final SplitListState<Integer> remainingEventsCount =
new SplitListState<>("events", x -> Integer.toString(x), Integer::parseInt);

private final int numRecordsPerCheckpoint;
private final AtomicInteger recordsThisCheckpoint;

private Reader(int subtaskId, int totalSubtasks, LinkedList<TestCdcEvent> events) {
this.subtaskId = subtaskId;
this.totalSubtasks = totalSubtasks;
this.events = events;
numRecordsPerCheckpoint =
events.size() / ThreadLocalRandom.current().nextInt(10, 20) + 1;
recordsThisCheckpoint = new AtomicInteger(0);
}

@Override
public InputStatus pollNext(ReaderOutput<TestCdcEvent> readerOutput) throws Exception {
if (events.isEmpty()) {
return InputStatus.END_OF_INPUT;
}

if (recordsThisCheckpoint.get() >= numRecordsPerCheckpoint) {
Thread.sleep(10);
return InputStatus.MORE_AVAILABLE;
}

TestCdcEvent event = events.poll();
if (event.records() != null) {
if (Math.abs(event.hashCode()) % totalSubtasks != subtaskId) {
return InputStatus.MORE_AVAILABLE;
}
}
readerOutput.collect(event);
recordsThisCheckpoint.incrementAndGet();
return InputStatus.MORE_AVAILABLE;
}

@Override
public List<SimpleSourceSplit> snapshotState(long l) {
recordsThisCheckpoint.set(0);
remainingEventsCount.clear();
remainingEventsCount.add(events.size());
return remainingEventsCount.snapshotState();
}

@Override
public void addSplits(List<SimpleSourceSplit> list) {
remainingEventsCount.restoreState(list);
int count = 0;
for (int c : remainingEventsCount.get()) {
count += c;
}
while (events.size() > count) {
events.poll();
}
}
}
}
Loading
Loading