From 0231b52863af3e7c11e547f5abbb1ca4921195e4 Mon Sep 17 00:00:00 2001 From: capistrant Date: Tue, 25 Mar 2025 11:15:01 -0500 Subject: [PATCH 1/6] Fix flakiness inSegmentBootstrapperTest Make TestSegmentCacheManager thread safe by moving from ArrayList to CopyOnWriteArrayList Modify assertions to disregard list ordering since order of list modifications is not always deterministic --- .../coordination/SegmentBootstrapperTest.java | 42 ++++++++++++++----- .../coordination/TestSegmentCacheManager.java | 16 ++++--- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index fe1424e27005..fb60a2ea96e6 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -150,11 +150,18 @@ public void testStartStop() throws Exception Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments()); - final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + + // The following verbose assert - seen throughout this test class - confirms list item equality irrespective of item order. + Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && + expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && + segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && + cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && + cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments()); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache()); @@ -211,11 +218,16 @@ public void testLoadCachedSegments() throws Exception Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments()); - final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && + expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && + segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && + cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && + cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments()); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache()); @@ -270,10 +282,18 @@ public void testLoadBootstrapSegments() throws Exception final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && + expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && + segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); + + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && + cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); + + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && + cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java index 2cd5e8e61feb..b7cce457f136 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java +++ b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java @@ -30,9 +30,9 @@ import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; -import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** @@ -59,11 +59,15 @@ class TestSegmentCacheManager extends NoopSegmentCacheManager TestSegmentCacheManager(final Set segmentsToCache) { this.cachedSegments = ImmutableList.copyOf(segmentsToCache); - this.observedBootstrapSegments = new ArrayList<>(); - this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>(); - this.observedSegments = new ArrayList<>(); - this.observedSegmentsLoadedIntoPageCache = new ArrayList<>(); - this.observedSegmentsRemovedFromCache = new ArrayList<>(); + + // While inneficient, these CopyOnWriteArrayList objects greatly simplify meeting the thread + // safety mandate from SegmentCacheManager. For testing, this should be ok. + this.observedBootstrapSegments = new CopyOnWriteArrayList<>(); + this.observedBootstrapSegmentsLoadedIntoPageCache = new CopyOnWriteArrayList<>(); + this.observedSegments = new CopyOnWriteArrayList<>(); + this.observedSegmentsLoadedIntoPageCache = new CopyOnWriteArrayList<>(); + this.observedSegmentsRemovedFromCache = new CopyOnWriteArrayList<>(); + this.observedShutdownBootstrapCount = new AtomicInteger(0); } From 896cc22f98f873daa0ebef5ecbd0d697ab4315d1 Mon Sep 17 00:00:00 2001 From: capistrant Date: Tue, 25 Mar 2025 11:17:38 -0500 Subject: [PATCH 2/6] Fix flaky KinesisIndexTask tests. Experienced local flakiness due to a race in checking for task status. Test tasks could have already moved to publishing before the wait has a chance to confirm task had entered reading state --- .../kinesis/KinesisIndexTaskTest.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index eb82040a8133..690e08fb0d72 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -793,7 +793,7 @@ public void testRunWithMinimumMessageTime() throws Exception final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReading); + waitUntil(task, this::isTaskReadingOrPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -856,7 +856,7 @@ public void testRunWithMaximumMessageTime() throws Exception final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReading); + waitUntil(task, this::isTaskReadingOrPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -915,7 +915,7 @@ public void testRunWithTransformSpec() throws Exception ); final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReading); + waitUntil(task, this::isTaskReadingOrPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -2461,6 +2461,20 @@ private boolean isTaskReading(KinesisIndexTask task) return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING; } + /** + * Return true if specified task is in READING OR PUBLISHING state + * This helper method allows UTs to wait until the task associated with the test has started before continuing on with + * the test. Checking for PUBLISHING state in addition to READING prevents a race condition where a task could go + * publishing before the wait for READING state had started. + * @param task {@link KinesisIndexTask} having its state checked + * @return true if task is in READING or PUBLISHING state, otherwise false + */ + private boolean isTaskReadingOrPublishing(KinesisIndexTask task) + { + return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING || + task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.PUBLISHING; + } + private static KinesisRecordEntity kjb( String timestamp, String dim1, From 07c24d0e1ff78ea483c5adb7a1d52790f1f97195 Mon Sep 17 00:00:00 2001 From: capistrant Date: Thu, 27 Mar 2025 11:47:30 -0500 Subject: [PATCH 3/6] Fix one additional flake from SegmentBootstrapperTest found after additional test runs --- .../coordination/SegmentBootstrapperTest.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index fb60a2ea96e6..520c6d196966 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -413,10 +413,18 @@ public void testLoadOnlyRequiredBootstrapSegments() throws Exception final ImmutableList expectedBootstrapSegments = ImmutableList.of(ds1Segment2, ds1Segment1); - Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && + expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && + segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); + + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && + cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); + + Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && + expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && + cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); From ab9162ff605b9671e89a1cc20a444b0a996b1bf8 Mon Sep 17 00:00:00 2001 From: capistrant Date: Thu, 27 Mar 2025 17:00:07 -0500 Subject: [PATCH 4/6] refactor asserts in SegmentBootstrapperTest after code review suggestions --- .../coordination/SegmentBootstrapperTest.java | 68 ++++++++----------- 1 file changed, 27 insertions(+), 41 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index 520c6d196966..135df9b6c9b3 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -152,16 +152,10 @@ public void testStartStop() throws Exception final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - // The following verbose assert - seen throughout this test class - confirms list item equality irrespective of item order. - Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && - expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && - segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && - cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && - cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments()); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache()); @@ -219,15 +213,11 @@ public void testLoadCachedSegments() throws Exception } final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && - expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && - segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && - cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && - cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); + + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments()); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache()); @@ -282,17 +272,9 @@ public void testLoadBootstrapSegments() throws Exception final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && - expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && - segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); - - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && - cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); - - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && - cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); @@ -413,17 +395,9 @@ public void testLoadOnlyRequiredBootstrapSegments() throws Exception final ImmutableList expectedBootstrapSegments = ImmutableList.of(ds1Segment2, ds1Segment1); - Assert.assertTrue(expectedBootstrapSegments.size() == segmentAnnouncer.getObservedSegments().size() && - expectedBootstrapSegments.containsAll(segmentAnnouncer.getObservedSegments()) && - segmentAnnouncer.getObservedSegments().containsAll(expectedBootstrapSegments)); - - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegments().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegments()) && - cacheManager.getObservedBootstrapSegments().containsAll(expectedBootstrapSegments)); - - Assert.assertTrue(expectedBootstrapSegments.size() == cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().size() && - expectedBootstrapSegments.containsAll(cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()) && - cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache().containsAll(expectedBootstrapSegments)); + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); @@ -468,4 +442,16 @@ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception bootstrapper.stop(); } + + /** + * Given two lists, assert they are equivalent and contain the same set of entries irrespecive of entry ordering + * @param expected The expected result list + * @param actual The actual result list + * @param + */ + private static void assertUnsortedListsAreEqual(List expected, List actual) + { + Assert.assertEquals(expected.size(), actual.size()); + Assert.assertEquals(Set.copyOf(expected), Set.copyOf(actual)); + } } From 45ee39a4e9ffecf972e208458548550a00c09b07 Mon Sep 17 00:00:00 2001 From: capistrant Date: Thu, 27 Mar 2025 17:50:17 -0500 Subject: [PATCH 5/6] Simplify the KinesisIndexTaskTests who wait on given task state Waiting for publishing is functionally equivalent and less verbose/confusing --- .../indexing/kinesis/KinesisIndexTaskTest.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 690e08fb0d72..fe1977d9971f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -793,7 +793,7 @@ public void testRunWithMinimumMessageTime() throws Exception final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReadingOrPublishing); + waitUntil(task, this::isTaskPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -856,7 +856,7 @@ public void testRunWithMaximumMessageTime() throws Exception final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReadingOrPublishing); + waitUntil(task, this::isTaskPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -915,7 +915,7 @@ public void testRunWithTransformSpec() throws Exception ); final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReadingOrPublishing); + waitUntil(task, this::isTaskPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -2462,17 +2462,13 @@ private boolean isTaskReading(KinesisIndexTask task) } /** - * Return true if specified task is in READING OR PUBLISHING state - * This helper method allows UTs to wait until the task associated with the test has started before continuing on with - * the test. Checking for PUBLISHING state in addition to READING prevents a race condition where a task could go - * publishing before the wait for READING state had started. + * Return true if specified task is in PUBLISHING state * @param task {@link KinesisIndexTask} having its state checked - * @return true if task is in READING or PUBLISHING state, otherwise false + * @return true if task is in PUBLISHING state, otherwise false */ - private boolean isTaskReadingOrPublishing(KinesisIndexTask task) + private boolean isTaskPublishing(KinesisIndexTask task) { - return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING || - task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.PUBLISHING; + return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.PUBLISHING; } private static KinesisRecordEntity kjb( From 7a1a6b6291993df43e05bd34008c84d8fd3ef500 Mon Sep 17 00:00:00 2001 From: capistrant Date: Thu, 27 Mar 2025 17:57:22 -0500 Subject: [PATCH 6/6] Complete javadoc for new method --- .../druid/server/coordination/SegmentBootstrapperTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index 135df9b6c9b3..ed30e1aa3af1 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -447,7 +447,7 @@ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception * Given two lists, assert they are equivalent and contain the same set of entries irrespecive of entry ordering * @param expected The expected result list * @param actual The actual result list - * @param + * @param Object type stored in the list parameters */ private static void assertUnsortedListsAreEqual(List expected, List actual) {