Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.awaitility.Awaitility;

@RunWith(Parameterized.class)
public class TestStreamingMonitorFunction extends TableTestBase {
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);
Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this goes back to my earlier comment here. We don't want to blindly just change Thread.sleep usage to Awaitility

Copy link
Author

@shreyanshR7 shreyanshR7 Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am new to open source , will it be ok if i replace this with simply Awaitility.await()
.atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> latch.getCount() == 0); @nastra or suggest changes please.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shreyanshR7 , I also tried to make a fix for this here . You can still continue on flink module.


// Stop the stream task.
function.close();
Expand Down Expand Up @@ -150,7 +151,7 @@ public void testConsumeFromStartSnapshotId() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);
Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);

// Stop the stream task.
function.close();
Expand Down Expand Up @@ -186,7 +187,7 @@ public void testConsumeFromStartTag() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);
Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);

// Stop the stream task.
function.close();
Expand Down Expand Up @@ -214,7 +215,7 @@ public void testCheckpointRestore() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);
Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);

state = harness.snapshot(1, 1);

Expand All @@ -240,7 +241,7 @@ public void testCheckpointRestore() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);
Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);

// Stop the stream task.
newFunc.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.awaitility.Awaitility;

public class TestIcebergSourceContinuous {

Expand Down Expand Up @@ -237,7 +238,7 @@ public void testLatestSnapshot() throws Exception {
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
// we want to make sure job is running first so that enumerator can
// start from the latest snapshot before inserting the next batch2 below.
waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient());
Awaitility.await().pollDelay(10, TimeUnit.MILLISECONDS).until(() -> !getRunningJobs(MINI_CLUSTER_RESOURCE.getClusterClient()).isEmpty());

// inclusive behavior for starting snapshot
List<Row> result1 = waitForResult(iter, 2);
Expand Down Expand Up @@ -325,8 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception {
long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis();

// sleep for 2 ms to make sure snapshot1 has a higher timestamp value
Thread.sleep(2);

waitUntil1ms();
// snapshot1
List<Record> batch1 =
RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
Expand Down Expand Up @@ -400,11 +400,13 @@ public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
}
return results;
}

public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
while (getRunningJobs(client).isEmpty()) {
Thread.sleep(10);
}
public static long waitUntil1ms() {
long previous = System.currentTimeMillis();
long current = System.currentTimeMillis();
while (current <= previous) {
current = System.currentTimeMillis();
}
return current;
}

public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.awaitility.Awaitility;

@RunWith(Parameterized.class)
public class TestStreamingMonitorFunction extends TableTestBase {
Expand Down Expand Up @@ -113,8 +114,7 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
function.close();

Expand Down Expand Up @@ -150,8 +150,7 @@ public void testConsumeFromStartSnapshotId() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
function.close();

Expand Down Expand Up @@ -186,8 +185,7 @@ public void testConsumeFromStartTag() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
function.close();

Expand All @@ -214,8 +212,8 @@ public void testCheckpointRestore() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
state = harness.snapshot(1, 1);

// Stop the stream task.
Expand All @@ -240,8 +238,7 @@ public void testCheckpointRestore() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
newFunc.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.awaitility.Awaitility;

public class TestIcebergSourceContinuous {

Expand Down Expand Up @@ -237,7 +238,7 @@ public void testLatestSnapshot() throws Exception {
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
// we want to make sure job is running first so that enumerator can
// start from the latest snapshot before inserting the next batch2 below.
waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient());
Awaitility.await().pollDelay(10, TimeUnit.MILLISECONDS).until(() -> !getRunningJobs(MINI_CLUSTER_RESOURCE.getClusterClient()).isEmpty());

// inclusive behavior for starting snapshot
List<Row> result1 = waitForResult(iter, 2);
Expand Down Expand Up @@ -325,7 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception {
long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis();

// sleep for 2 ms to make sure snapshot1 has a higher timestamp value
Thread.sleep(2);
waitUntil1ms();

// snapshot1
List<Record> batch1 =
Expand Down Expand Up @@ -400,11 +401,13 @@ public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
}
return results;
}

public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
while (getRunningJobs(client).isEmpty()) {
Thread.sleep(10);
}
public static long waitUntil1ms() {
long previous = System.currentTimeMillis();
long current = System.currentTimeMillis();
while (current <= previous) {
current = System.currentTimeMillis();
}
return current;
}

public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.awaitility.Awaitility;

@RunWith(Parameterized.class)
public class TestStreamingMonitorFunction extends TableTestBase {
Expand Down Expand Up @@ -113,8 +114,8 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
function.close();

Expand Down Expand Up @@ -147,11 +148,10 @@ public void testConsumeFromStartSnapshotId() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
TestSourceContext sourceContext = new TestSourceContext(latch);
runSourceFunctionInTask(sourceContext, function);

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
function.close();

Expand Down Expand Up @@ -186,8 +186,8 @@ public void testConsumeFromStartTag() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
function.close();

Expand All @@ -214,8 +214,8 @@ public void testCheckpointRestore() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
state = harness.snapshot(1, 1);

// Stop the stream task.
Expand All @@ -240,8 +240,8 @@ public void testCheckpointRestore() throws Exception {

Assert.assertTrue(
"Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
Thread.sleep(1000L);

Awaitility.await().atLeast(1, TimeUnit.SECONDS).until(() -> true);
// Stop the stream task.
newFunc.close();

Expand Down