diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index a161645979b7..c259b4ebad34 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.awaitility.Awaitility; @RunWith(Parameterized.class) public class TestStreamingMonitorFunction extends TableTestBase { @@ -113,7 +114,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -150,7 +151,7 @@ public void testConsumeFromStartSnapshotId() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -186,7 +187,7 @@ public void testConsumeFromStartTag() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -214,7 +215,7 @@ public void testCheckpointRestore() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); state = harness.snapshot(1, 1); @@ -240,7 +241,7 @@ public void testCheckpointRestore() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. newFunc.close(); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 6d26f933b334..5d876f67cf4d 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -52,6 +52,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.awaitility.Awaitility; public class TestIcebergSourceContinuous { @@ -237,7 +238,7 @@ public void testLatestSnapshot() throws Exception { createStream(scanContext).executeAndCollect(getClass().getSimpleName())) { // we want to make sure job is running first so that enumerator can // start from the latest snapshot before inserting the next batch2 below. - waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient()); + Awaitility.await().pollDelay(10, TimeUnit.MILLISECONDS).until(() -> !getRunningJobs(MINI_CLUSTER_RESOURCE.getClusterClient()).isEmpty()); // inclusive behavior for starting snapshot List result1 = waitForResult(iter, 2); @@ -325,8 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception { long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis(); // sleep for 2 ms to make sure snapshot1 has a higher timestamp value - Thread.sleep(2); - + waitUntil1ms(); // snapshot1 List batch1 = RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); @@ -400,11 +400,13 @@ public static List waitForResult(CloseableIterator iter, int limit) { } return results; } - - public static void waitUntilJobIsRunning(ClusterClient client) throws Exception { - while (getRunningJobs(client).isEmpty()) { - Thread.sleep(10); - } + public static long waitUntil1ms() { + long previous = System.currentTimeMillis(); + long current = System.currentTimeMillis(); + while (current <= previous) { + current = System.currentTimeMillis(); + } + return current; } public static List getRunningJobs(ClusterClient client) throws Exception { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index a161645979b7..c90c401e7d78 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.awaitility.Awaitility; @RunWith(Parameterized.class) public class TestStreamingMonitorFunction extends TableTestBase { @@ -113,8 +114,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -150,8 +150,7 @@ public void testConsumeFromStartSnapshotId() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -186,8 +185,7 @@ public void testConsumeFromStartTag() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -214,8 +212,8 @@ public void testCheckpointRestore() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + state = harness.snapshot(1, 1); // Stop the stream task. @@ -240,8 +238,7 @@ public void testCheckpointRestore() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. newFunc.close(); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 6d26f933b334..d2ff30e3de0a 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -52,6 +52,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.awaitility.Awaitility; public class TestIcebergSourceContinuous { @@ -237,7 +238,7 @@ public void testLatestSnapshot() throws Exception { createStream(scanContext).executeAndCollect(getClass().getSimpleName())) { // we want to make sure job is running first so that enumerator can // start from the latest snapshot before inserting the next batch2 below. - waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient()); + Awaitility.await().pollDelay(10, TimeUnit.MILLISECONDS).until(() -> !getRunningJobs(MINI_CLUSTER_RESOURCE.getClusterClient()).isEmpty()); // inclusive behavior for starting snapshot List result1 = waitForResult(iter, 2); @@ -325,7 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception { long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis(); // sleep for 2 ms to make sure snapshot1 has a higher timestamp value - Thread.sleep(2); + waitUntil1ms(); // snapshot1 List batch1 = @@ -400,11 +401,13 @@ public static List waitForResult(CloseableIterator iter, int limit) { } return results; } - - public static void waitUntilJobIsRunning(ClusterClient client) throws Exception { - while (getRunningJobs(client).isEmpty()) { - Thread.sleep(10); - } + public static long waitUntil1ms() { + long previous = System.currentTimeMillis(); + long current = System.currentTimeMillis(); + while (current <= previous) { + current = System.currentTimeMillis(); + } + return current; } public static List getRunningJobs(ClusterClient client) throws Exception { diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 1b9049f1bbfc..6e222c784269 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.awaitility.Awaitility; @RunWith(Parameterized.class) public class TestStreamingMonitorFunction extends TableTestBase { @@ -113,8 +114,8 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + // Stop the stream task. function.close(); @@ -147,11 +148,10 @@ public void testConsumeFromStartSnapshotId() throws Exception { CountDownLatch latch = new CountDownLatch(1); TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - + Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -186,8 +186,8 @@ public void testConsumeFromStartTag() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + // Stop the stream task. function.close(); @@ -214,8 +214,8 @@ public void testCheckpointRestore() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + state = harness.snapshot(1, 1); // Stop the stream task. @@ -240,8 +240,8 @@ public void testCheckpointRestore() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + // Stop the stream task. newFunc.close();