Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -860,20 +860,17 @@ private void testNoChangelogProducerRandom(
// Deletion vectors mode not support concurrent write
numProducers = 1;
}
List<TableResult> results =
testRandom(
tEnv,
numProducers,
enableFailure,
"'bucket' = '4',"
+ String.format(
"'deletion-vectors.enabled' = '%s'",
enableDeletionVectors));

for (TableResult result : results) {
result.await();
}
checkBatchResult(numProducers);

testRandom(
tEnv,
numProducers,
enableFailure,
"'bucket' = '4',"
+ String.format(
"'deletion-vectors.enabled' = '%s'", enableDeletionVectors));

// changelog is produced by Flink normalize operator
checkChangelogTestResult(numProducers);
}

private void testFullCompactionChangelogProducerRandom(
Expand Down Expand Up @@ -1059,7 +1056,7 @@ private void checkChangelogTestResult(int numProducers) throws Exception {
* <p>All jobs will modify the same set of partitions to emulate conflicting writes. Each job
* will write its own set of keys for easy result checking.
*/
private List<TableResult> testRandom(
private void testRandom(
TableEnvironment tEnv, int numProducers, boolean enableFailure, String tableProperties)
throws Exception {
// producers will very quickly produce snapshots,
Expand Down Expand Up @@ -1130,8 +1127,6 @@ private List<TableResult> testRandom(
+ ")")
.await();

List<TableResult> results = new ArrayList<>();

if (enableFailure) {
FailingFileIO.reset(failingName, 2, 10000);
}
Expand All @@ -1157,18 +1152,14 @@ private List<TableResult> testRandom(

// run test SQL
int idx = i;
TableResult result =
FailingFileIO.retryArtificialException(
() ->
tEnv.executeSql(
"INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView"
+ idx
+ " WHERE i < "
+ usefulNumRows));
results.add(result);
FailingFileIO.retryArtificialException(
() ->
tEnv.executeSql(
"INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT pt, k, v1, v2 FROM myView"
+ idx
+ " WHERE i < "
+ usefulNumRows));
}

return results;
}

private void checkBatchResult(int numProducers) throws Exception {
Expand Down
Loading