@@ -28,6 +28,8 @@
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
@@ -37,7 +39,6 @@
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -787,7 +788,6 @@ public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
         testFullCompactionChangelogProducerRandom(bEnv, 1, false);
     }
 
-    @Disabled // TODO: fix this unstable test
     @Test
     @Timeout(TIMEOUT)
     public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
@@ -1088,6 +1088,29 @@ private List<TableResult> testRandom(
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        // We use a large number of rows to mimic unbounded streams because there is a known
+        // consistency issue in bounded streams.
+        //
+        // For bounded streams, if the COMPACT snapshot fails to commit when the stream ends (due
+        // to conflicts or other reasons), we have no chance to modify the compaction result, so
+        // the changelogs produced by compaction will not be committed.
+        //
+        // If this happens in production, users can run another job to compact the table, or run
+        // another job to write more data into the table. The remaining changelogs will then be
+        // produced again.
+        int factor;
+        RuntimeExecutionMode mode =
+                tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+        if (mode == RuntimeExecutionMode.BATCH) {
+            factor = 1;
+        } else if (mode == RuntimeExecutionMode.STREAMING) {
+            factor = 10;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unknown runtime execution mode " + mode.name());
+        }
+        int usefulNumRows = LIMIT + NUM_PARTS * NUM_KEYS;
         tEnv.executeSql(
                 "CREATE TABLE `default_catalog`.`default_database`.`S` ("
                         + " i INT"
@@ -1096,10 +1119,10 @@
                         + " 'fields.i.kind' = 'sequence',"
                         + " 'fields.i.start' = '0',"
                         + " 'fields.i.end' = '"
-                        + (LIMIT + NUM_PARTS * NUM_KEYS - 1)
+                        + (usefulNumRows - 1) * factor
                         + "',"
                         + " 'number-of-rows' = '"
-                        + (LIMIT + NUM_PARTS * NUM_KEYS)
+                        + usefulNumRows * factor
                         + "',"
                         + " 'rows-per-second' = '"
                         + (LIMIT / 20 + ThreadLocalRandom.current().nextInt(LIMIT / 20))
@@ -1129,7 +1152,7 @@ private List<TableResult> testRandom(
         String v2Sql = "CAST(i AS STRING) || '.str' AS v2";
         tEnv.executeSql(
                 String.format(
-                        "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s FROM `default_catalog`.`default_database`.`S`",
+                        "CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s, i FROM `default_catalog`.`default_database`.`S`",
                         i, ptSql, kSql, v1Sql, v2Sql));
 
         // run test SQL
@@ -1138,8 +1161,10 @@ private List<TableResult> testRandom(
                     FailingFileIO.retryArtificialException(
                             () ->
                                     tEnv.executeSql(
-                                            "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM myView"
-                                                    + idx));
+                                            "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView"
+                                                    + idx
+                                                    + " WHERE i < "
+                                                    + usefulNumRows));
             results.add(result);
         }
 
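Below is a minimal, standalone sketch of the runtime-mode check introduced in the hunk above, assuming only Flink's Table API is on the classpath. The class name and the printed output are illustrative and not part of the patch; the config read and the branching mirror what the patched test does.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Illustrative class name; not part of the patch.
public class RuntimeModeFactorSketch {
    public static void main(String[] args) {
        // Build a TableEnvironment in streaming mode.
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // Read the configured runtime mode (defaults to STREAMING), as the patched test does.
        RuntimeExecutionMode mode =
                tEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);

        // Same branching as the patch: 1x rows in BATCH, 10x rows in STREAMING, so a
        // streaming run generates enough extra input to behave like an unbounded source.
        int factor;
        if (mode == RuntimeExecutionMode.BATCH) {
            factor = 1;
        } else if (mode == RuntimeExecutionMode.STREAMING) {
            factor = 10;
        } else {
            throw new UnsupportedOperationException(
                    "Unknown runtime execution mode " + mode.name());
        }

        System.out.println("mode=" + mode + ", factor=" + factor);
    }
}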