diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index eb82040a8133..fe1977d9971f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -793,7 +793,7 @@ public void testRunWithMinimumMessageTime() throws Exception final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReading); + waitUntil(task, this::isTaskPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -856,7 +856,7 @@ public void testRunWithMaximumMessageTime() throws Exception final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReading); + waitUntil(task, this::isTaskPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -915,7 +915,7 @@ public void testRunWithTransformSpec() throws Exception ); final ListenableFuture future = runTask(task); - waitUntil(task, this::isTaskReading); + waitUntil(task, this::isTaskPublishing); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -2461,6 +2461,16 @@ private boolean isTaskReading(KinesisIndexTask task) return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING; } + /** + * Return true if specified task is in PUBLISHING state + * @param task {@link KinesisIndexTask} having its state checked + * @return true if task is in PUBLISHING state, otherwise false + */ + private boolean isTaskPublishing(KinesisIndexTask task) + { + return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.PUBLISHING; + } + private static KinesisRecordEntity kjb( String timestamp, String dim1, diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java index fe1424e27005..ed30e1aa3af1 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java @@ -150,11 +150,12 @@ public void testStartStop() throws Exception Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments()); - final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments()); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache()); @@ -211,11 +212,12 @@ public void testLoadCachedSegments() throws Exception Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments()); - final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); + Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments()); Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache()); @@ -270,10 +272,10 @@ public void testLoadBootstrapSegments() throws Exception final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments); - Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); @@ -393,10 +395,10 @@ public void testLoadOnlyRequiredBootstrapSegments() throws Exception final ImmutableList expectedBootstrapSegments = ImmutableList.of(ds1Segment2, ds1Segment1); - Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); + assertUnsortedListsAreEqual(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments()); - Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache()); serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size()); serviceEmitter.verifyEmitted("segment/bootstrap/time", 1); @@ -440,4 +442,16 @@ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception bootstrapper.stop(); } + + /** + * Given two lists, assert they are equivalent and contain the same set of entries irrespecive of entry ordering + * @param expected The expected result list + * @param actual The actual result list + * @param Object type stored in the list parameters + */ + private static void assertUnsortedListsAreEqual(List expected, List actual) + { + Assert.assertEquals(expected.size(), actual.size()); + Assert.assertEquals(Set.copyOf(expected), Set.copyOf(actual)); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java index 2cd5e8e61feb..b7cce457f136 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java +++ b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java @@ -30,9 +30,9 @@ import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; -import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** @@ -59,11 +59,15 @@ class TestSegmentCacheManager extends NoopSegmentCacheManager TestSegmentCacheManager(final Set segmentsToCache) { this.cachedSegments = ImmutableList.copyOf(segmentsToCache); - this.observedBootstrapSegments = new ArrayList<>(); - this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>(); - this.observedSegments = new ArrayList<>(); - this.observedSegmentsLoadedIntoPageCache = new ArrayList<>(); - this.observedSegmentsRemovedFromCache = new ArrayList<>(); + + // While inneficient, these CopyOnWriteArrayList objects greatly simplify meeting the thread + // safety mandate from SegmentCacheManager. For testing, this should be ok. + this.observedBootstrapSegments = new CopyOnWriteArrayList<>(); + this.observedBootstrapSegmentsLoadedIntoPageCache = new CopyOnWriteArrayList<>(); + this.observedSegments = new CopyOnWriteArrayList<>(); + this.observedSegmentsLoadedIntoPageCache = new CopyOnWriteArrayList<>(); + this.observedSegmentsRemovedFromCache = new CopyOnWriteArrayList<>(); + this.observedShutdownBootstrapCount = new AtomicInteger(0); }