@@ -70,6 +70,7 @@
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 
 public class SimpleDataUtil {
@@ -277,21 +278,17 @@ public static void assertRecordsEqual(List<Record> expected, List<Record> actual
   }
 
   /**
-   * Assert table contains the expected list of records after waiting up to {@code maxCheckCount}
-   * with {@code checkInterval}
+   * Assert table contains the expected list of records after waiting up to the configured {@code
+   * timeout}
    */
-  public static void assertTableRecords(
-      Table table, List<Record> expected, Duration checkInterval, int maxCheckCount)
-      throws IOException, InterruptedException {
-    for (int i = 0; i < maxCheckCount; ++i) {
-      if (equalsRecords(expected, tableRecords(table), table.schema())) {
-        break;
-      } else {
-        Thread.sleep(checkInterval.toMillis());
-      }
-    }
-    // success or failure, assert on the latest table state
-    assertRecordsEqual(expected, tableRecords(table), table.schema());
+  public static void assertTableRecords(Table table, List<Record> expected, Duration timeout) {
+    Awaitility.await("expected list of records should be produced")
+        .atMost(timeout)
+        .untilAsserted(
+            () -> {
+              equalsRecords(expected, tableRecords(table), table.schema());
+              assertRecordsEqual(expected, tableRecords(table), table.schema());
+            });
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
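
Note on the new helper above: Awaitility retries the untilAsserted lambda until it stops throwing AssertionError or the timeout elapses, polling every 100 ms by default. A minimal usage sketch, assuming a Table handle and an expected-records list are already in scope (the 30-second timeout is illustrative, not from this change):

    // Polls the table until its records match `expected`; if 30 seconds pass
    // first, the last AssertionError from assertRecordsEqual is rethrown.
    SimpleDataUtil.assertTableRecords(table, expected, Duration.ofSeconds(30));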
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +49,7 @@
 import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -401,10 +404,11 @@ public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
     return results;
   }
 
-  public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
-    while (getRunningJobs(client).isEmpty()) {
-      Thread.sleep(10);
-    }
+  public static void waitUntilJobIsRunning(ClusterClient<?> client) {
+    Awaitility.await("job should be running")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofMillis(10))
+        .untilAsserted(() -> assertThat(getRunningJobs(client)).isNotEmpty());
   }
 
   public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
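
For comparison, the same wait could be written with Awaitility's until(Callable<Boolean>) instead of untilAsserted; both forms poll every 10 ms and give up after 30 seconds. A sketch only, not part of this change:

    // Condition-style form: Awaitility evaluates the callable every 10 ms and
    // throws ConditionTimeoutException if it is still false after 30 seconds.
    Awaitility.await("job should be running")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofMillis(10))
        .until(() -> !getRunningJobs(client).isEmpty());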
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.flink.SimpleDataUtil.assertTableRecords;
+
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -39,13 +41,11 @@
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.HadoopTableResource;
-import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
@@ -98,11 +98,6 @@ protected List<Record> generateRecords(int numRecords, long seed) {
     return RandomGenericData.generate(schema(), numRecords, seed);
   }
 
-  protected void assertRecords(
-      Table table, List<Record> expectedRecords, Duration interval, int maxCount) throws Exception {
-    SimpleDataUtil.assertTableRecords(table, expectedRecords, interval, maxCount);
-  }
-
   @Test
   public void testBoundedWithTaskManagerFailover() throws Exception {
     testBoundedIcebergSource(FailoverType.TM);
@@ -156,7 +151,7 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exception {
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -219,7 +214,7 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Exception {
 
     // wait longer for continuous source to reduce flakiness
     // because CI servers tend to be overloaded.
-    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+    assertTableRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
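
Timeout equivalence: the removed assertRecords wrapper polled up to maxCount = 12000 times at a 10 ms interval, a worst-case wait of 12000 × 10 ms = 120 s, so Duration.ofSeconds(120) preserves the previous bound while letting Awaitility return as soon as the assertion passes.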
@@ -18,12 +18,13 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -47,6 +48,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -111,14 +113,11 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
     TestSourceContext sourceContext = new TestSourceContext(latch);
     runSourceFunctionInTask(sourceContext, function);
 
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
+    awaitExpectedSplits(sourceContext);
 
     // Stop the stream task.
     function.close();
 
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
     TestHelpers.assertRecords(
         sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
   }
@@ -148,14 +147,11 @@ public void testConsumeFromStartSnapshotId() throws Exception {
     TestSourceContext sourceContext = new TestSourceContext(latch);
     runSourceFunctionInTask(sourceContext, function);
 
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
+    awaitExpectedSplits(sourceContext);
 
     // Stop the stream task.
     function.close();
 
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
     TestHelpers.assertRecords(
         sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
   }
@@ -184,14 +180,10 @@ public void testConsumeFromStartTag() throws Exception {
     TestSourceContext sourceContext = new TestSourceContext(latch);
     runSourceFunctionInTask(sourceContext, function);
 
-    Assert.assertTrue(
-        "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-    Thread.sleep(1000L);
-
+    awaitExpectedSplits(sourceContext);
     // Stop the stream task.
     function.close();
 
-    Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
     TestHelpers.assertRecords(
         sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
   }
@@ -212,16 +204,13 @@ public void testCheckpointRestore() throws Exception {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, func);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       state = harness.snapshot(1, 1);
 
       // Stop the stream task.
       func.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
     }
@@ -238,19 +227,26 @@
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, newFunc);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      awaitExpectedSplits(sourceContext);
 
       // Stop the stream task.
       newFunc.close();
 
-      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
       TestHelpers.assertRecords(
           sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA);
     }
   }
 
+  private void awaitExpectedSplits(TestSourceContext sourceContext) {
+    Awaitility.await("expected splits should be produced")
+        .atMost(Duration.ofMillis(WAIT_TIME_MILLIS))
+        .untilAsserted(
+            () -> {
+              assertThat(sourceContext.latch.getCount()).isEqualTo(0);
+              assertThat(sourceContext.splits).as("Should produce the expected splits").hasSize(1);
+            });
+  }
+
   @Test
   public void testInvalidMaxPlanningSnapshotCount() {
     ScanContext scanContext1 =
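
The extracted awaitExpectedSplits helper replaces the latch.await(...) plus fixed Thread.sleep(1000L) pattern: rather than sleeping a full second after the latch fires, it re-runs both checks until they hold or WAIT_TIME_MILLIS elapses. For intuition, a rough sketch of the retry semantics untilAsserted provides (illustrative only, not Awaitility's actual implementation; InterruptedException handling omitted):

    // Re-run the assertions until they pass or the deadline is reached,
    // rethrowing the last AssertionError on timeout.
    long deadline = System.currentTimeMillis() + WAIT_TIME_MILLIS;
    while (true) {
      try {
        assertThat(sourceContext.latch.getCount()).isEqualTo(0);
        assertThat(sourceContext.splits).hasSize(1);
        break;
      } catch (AssertionError e) {
        if (System.currentTimeMillis() > deadline) {
          throw e;
        }
        Thread.sleep(100); // Awaitility's default poll interval
      }
    }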