From 7965c31aa4bbc3df5fcfa17b3e34cb2798c59c00 Mon Sep 17 00:00:00 2001 From: shreyanshsingh Date: Thu, 5 Oct 2023 23:06:11 +0530 Subject: [PATCH 01/13] Thread.sleep() method is replaced with Awaitility --- .../source/TestStreamingMonitorFunction.java | 36 +++++++++------- .../source/TestIcebergSourceContinuous.java | 10 +---- .../source/TestStreamingMonitorFunction.java | 40 +++++++++--------- .../source/TestIcebergSourceContinuous.java | 10 +---- .../source/TestStreamingMonitorFunction.java | 41 +++++++++---------- 5 files changed, 65 insertions(+), 72 deletions(-) diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index a161645979b7..ea15678af01d 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.awaitility.Awaitility; @RunWith(Parameterized.class) public class TestStreamingMonitorFunction extends TableTestBase { @@ -111,9 +112,10 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -148,9 +150,10 @@ public void testConsumeFromStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -184,9 +187,10 @@ public void testConsumeFromStartTag() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -212,9 +216,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, func); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); state = harness.snapshot(1, 1); @@ -238,9 +243,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, newFunc); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. newFunc.close(); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 6d26f933b334..64033920ca72 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -237,7 +237,7 @@ public void testLatestSnapshot() throws Exception { createStream(scanContext).executeAndCollect(getClass().getSimpleName())) { // we want to make sure job is running first so that enumerator can // start from the latest snapshot before inserting the next batch2 below. - waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient()); + Awaitility.await().pollDelay(10, TimeUnit.MILLISECONDS).until(() -> !getRunningJobs(MINI_CLUSTER_RESOURCE.getClusterClient()).isEmpty()); // inclusive behavior for starting snapshot List result1 = waitForResult(iter, 2); @@ -325,7 +325,7 @@ public void testSpecificSnapshotTimestamp() throws Exception { long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis(); // sleep for 2 ms to make sure snapshot1 has a higher timestamp value - Thread.sleep(2); + Awaitility.await().pollDelay(2, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis()-snapshot0Timestamp>2); // snapshot1 List batch1 = @@ -401,12 +401,6 @@ public static List waitForResult(CloseableIterator iter, int limit) { return results; } - public static void waitUntilJobIsRunning(ClusterClient client) throws Exception { - while (getRunningJobs(client).isEmpty()) { - Thread.sleep(10); - } - } - public static List getRunningJobs(ClusterClient client) throws Exception { Collection statusMessages = client.listJobs().get(); return statusMessages.stream() diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index a161645979b7..b0a6358894e9 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -111,10 +111,10 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -148,10 +148,10 @@ public void testConsumeFromStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -184,10 +184,10 @@ public void testConsumeFromStartTag() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -212,10 +212,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, func); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); state = harness.snapshot(1, 1); // Stop the stream task. @@ -238,10 +238,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, newFunc); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. newFunc.close(); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 6d26f933b334..64033920ca72 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -237,7 +237,7 @@ public void testLatestSnapshot() throws Exception { createStream(scanContext).executeAndCollect(getClass().getSimpleName())) { // we want to make sure job is running first so that enumerator can // start from the latest snapshot before inserting the next batch2 below. - waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient()); + Awaitility.await().pollDelay(10, TimeUnit.MILLISECONDS).until(() -> !getRunningJobs(MINI_CLUSTER_RESOURCE.getClusterClient()).isEmpty()); // inclusive behavior for starting snapshot List result1 = waitForResult(iter, 2); @@ -325,7 +325,7 @@ public void testSpecificSnapshotTimestamp() throws Exception { long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis(); // sleep for 2 ms to make sure snapshot1 has a higher timestamp value - Thread.sleep(2); + Awaitility.await().pollDelay(2, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis()-snapshot0Timestamp>2); // snapshot1 List batch1 = @@ -401,12 +401,6 @@ public static List waitForResult(CloseableIterator iter, int limit) { return results; } - public static void waitUntilJobIsRunning(ClusterClient client) throws Exception { - while (getRunningJobs(client).isEmpty()) { - Thread.sleep(10); - } - } - public static List getRunningJobs(ClusterClient client) throws Exception { Collection statusMessages = client.listJobs().get(); return statusMessages.stream() diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 1b9049f1bbfc..f49248110138 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -111,10 +111,10 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -147,11 +147,10 @@ public void testConsumeFromStartSnapshotId() throws Exception { CountDownLatch latch = new CountDownLatch(1); TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -184,10 +183,10 @@ public void testConsumeFromStartTag() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. function.close(); @@ -212,10 +211,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, func); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); state = harness.snapshot(1, 1); // Stop the stream task. @@ -238,10 +237,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, newFunc); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); - + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); // Stop the stream task. newFunc.close(); From 4220218139eee246d74f8cf609bf363de949553b Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Thu, 5 Oct 2023 23:32:44 +0530 Subject: [PATCH 02/13] Update TestIcebergSourceContinuous.java --- .../apache/iceberg/flink/source/TestIcebergSourceContinuous.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 64033920ca72..28c7d0aacb63 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -52,6 +52,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.awaitility.Awaitility; public class TestIcebergSourceContinuous { From a9a038cd6e40cde2e0f34d6f69d5b6af756af3b1 Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Thu, 5 Oct 2023 23:33:40 +0530 Subject: [PATCH 03/13] Update TestStreamingMonitorFunction.java --- .../iceberg/flink/source/TestStreamingMonitorFunction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index b0a6358894e9..30c7313a8a33 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.awaitility.Awaitility; @RunWith(Parameterized.class) public class TestStreamingMonitorFunction extends TableTestBase { From a5408b7601e505d2ed009387e00c6b5d0491dd5c Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Thu, 5 Oct 2023 23:34:13 +0530 Subject: [PATCH 04/13] Update TestIcebergSourceContinuous.java --- .../apache/iceberg/flink/source/TestIcebergSourceContinuous.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 64033920ca72..28c7d0aacb63 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -52,6 +52,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.awaitility.Awaitility; public class TestIcebergSourceContinuous { From 187f0f579330acc9e50711aea0fa672ecf58b02d Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Thu, 5 Oct 2023 23:34:43 +0530 Subject: [PATCH 05/13] Update TestStreamingMonitorFunction.java --- .../iceberg/flink/source/TestStreamingMonitorFunction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index f49248110138..76790784a031 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.awaitility.Awaitility; @RunWith(Parameterized.class) public class TestStreamingMonitorFunction extends TableTestBase { From ece224eece11fc825480d21dd18a2d4867a4ef01 Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:42:49 +0530 Subject: [PATCH 06/13] Update TestStreamingMonitorFunction.java --- .../source/TestStreamingMonitorFunction.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index ea15678af01d..ba67afc27009 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -150,10 +150,9 @@ public void testConsumeFromStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -187,10 +186,9 @@ public void testConsumeFromStartTag() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -216,10 +214,9 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, func); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); state = harness.snapshot(1, 1); @@ -243,10 +240,9 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, newFunc); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. newFunc.close(); From 953ccb8e0e20631a9b03e5fa6bd7b9178817c722 Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:48:37 +0530 Subject: [PATCH 07/13] Update TestStreamingMonitorFunction.java --- .../iceberg/flink/source/TestStreamingMonitorFunction.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index ba67afc27009..c259b4ebad34 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -112,10 +112,9 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); From 6119e5ed7f5f11b4aee17abbc3b82af57877b0bf Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:50:16 +0530 Subject: [PATCH 08/13] Update TestStreamingMonitorFunction.java --- .../source/TestStreamingMonitorFunction.java | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 30c7313a8a33..5d4b51713092 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -112,10 +112,9 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -149,10 +148,9 @@ public void testConsumeFromStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -185,10 +183,9 @@ public void testConsumeFromStartTag() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); @@ -213,10 +210,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, func); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + state = harness.snapshot(1, 1); // Stop the stream task. @@ -239,10 +236,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, newFunc); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + // Stop the stream task. newFunc.close(); From 2d5a567e00557deda739c86824678e46aaedf332 Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:51:39 +0530 Subject: [PATCH 09/13] Update TestStreamingMonitorFunction.java --- .../source/TestStreamingMonitorFunction.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 76790784a031..d6b5343f4bac 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -112,10 +112,10 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + // Stop the stream task. function.close(); @@ -184,10 +184,10 @@ public void testConsumeFromStartTag() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + // Stop the stream task. function.close(); @@ -212,10 +212,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, func); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + state = harness.snapshot(1, 1); // Stop the stream task. @@ -238,10 +238,10 @@ public void testCheckpointRestore() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, newFunc); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); + // Stop the stream task. newFunc.close(); From d6eb3fe3f2b075f0cacc245a9552b89e6201eacb Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:55:16 +0530 Subject: [PATCH 10/13] Update TestStreamingMonitorFunction.java --- .../iceberg/flink/source/TestStreamingMonitorFunction.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 5d4b51713092..c90c401e7d78 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -112,7 +112,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( + Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. @@ -239,7 +239,6 @@ public void testCheckpointRestore() throws Exception { Assert.assertTrue( "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); - // Stop the stream task. newFunc.close(); From 53e3f5370387d7d615080072f7fbfe0ac9ece3c8 Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:57:01 +0530 Subject: [PATCH 11/13] Update TestStreamingMonitorFunction.java --- .../flink/source/TestStreamingMonitorFunction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index d6b5343f4bac..6e222c784269 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -148,10 +148,10 @@ public void testConsumeFromStartSnapshotId() throws Exception { CountDownLatch latch = new CountDownLatch(1); TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Awaitility.await() - .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> latch.getCount() == 0); + + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true); // Stop the stream task. function.close(); From 1f704eed7daa359bf7dd471d139ffa61fbf4d7c5 Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 12:00:30 +0530 Subject: [PATCH 12/13] Update TestIcebergSourceContinuous.java --- .../flink/source/TestIcebergSourceContinuous.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 28c7d0aacb63..5d876f67cf4d 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -326,8 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception { long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis(); // sleep for 2 ms to make sure snapshot1 has a higher timestamp value - Awaitility.await().pollDelay(2, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis()-snapshot0Timestamp>2); - + waitUntil1ms(); // snapshot1 List batch1 = RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); @@ -401,6 +400,14 @@ public static List waitForResult(CloseableIterator iter, int limit) { } return results; } + public static long waitUntil1ms() { + long previous = System.currentTimeMillis(); + long current = System.currentTimeMillis(); + while (current <= previous) { + current = System.currentTimeMillis(); + } + return current; + } public static List getRunningJobs(ClusterClient client) throws Exception { Collection statusMessages = client.listJobs().get(); From 67c5978981d74d65a2fb5684081a07e2becdc94d Mon Sep 17 00:00:00 2001 From: shreyanshR7 <134536320+shreyanshR7@users.noreply.github.com> Date: Fri, 6 Oct 2023 12:01:40 +0530 Subject: [PATCH 13/13] Update TestIcebergSourceContinuous.java --- .../flink/source/TestIcebergSourceContinuous.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 28c7d0aacb63..d2ff30e3de0a 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -326,7 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception { long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis(); // sleep for 2 ms to make sure snapshot1 has a higher timestamp value - Awaitility.await().pollDelay(2, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis()-snapshot0Timestamp>2); + waitUntil1ms(); // snapshot1 List batch1 = @@ -401,6 +401,14 @@ public static List waitForResult(CloseableIterator iter, int limit) { } return results; } + public static long waitUntil1ms() { + long previous = System.currentTimeMillis(); + long current = System.currentTimeMillis(); + while (current <= previous) { + current = System.currentTimeMillis(); + } + return current; + } public static List getRunningJobs(ClusterClient client) throws Exception { Collection statusMessages = client.listJobs().get();