From 6a8369227dcd4a0bae6d428193affb761edb4fa6 Mon Sep 17 00:00:00 2001
From: cecemei
Date: Fri, 13 Mar 2026 16:30:01 -0700
Subject: [PATCH 1/6] test-flaky

---
 .../compact/CompactionSupervisorTest.java     | 58 +++++++++----------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 2c68c0eeab6d..7421e442dfba 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -240,8 +240,7 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
         .get()
         .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null))
                                         .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
-        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
-        .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+        .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()));

     // Set up first topic and supervisor
     final String topic1 = IdUtils.getRandomId();
@@ -249,11 +248,15 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     final KafkaSupervisorSpec supervisor1 = kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
     cluster.callApi().postSupervisor(supervisor1);

-    final int totalRowCount = publish1kRecords(topic1, true) + publish1kRecords(topic1, false);
-    waitUntilPublishedRecordsAreIngested(totalRowCount);
+    int totalRows = publish1kRecords(topic1);
+    waitUntilPublishedRecordsAreIngested(totalRows);

-    // Before compaction
-    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+    totalRows += publish1kRecords(topic1);
+    waitUntilPublishedRecordsAreIngested(totalRows);
+
+    // Before compaction, ingestion generates 2 segments, sometimes 3 if ingestion crosses an hourly boundary
+    Assertions.assertTrue(getNumSegmentsWith(Granularities.HOUR) >= 2);
+    Assertions.assertEquals(totalRows, getTotalRowCount());

     // Create a compaction config with DAY granularity
     InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
@@ -278,18 +281,16 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     pauseCompaction(dayGranularityConfig);
     Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
     Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(2000, getTotalRowCount());
+    Assertions.assertEquals(totalRows, getTotalRowCount());

     verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);

-    // published another 1k
-    final int appendedRowCount = publish1kRecords(topic1, true);
-    indexer.latchableEmitter().flush();
-    waitUntilPublishedRecordsAreIngested(appendedRowCount);
+    // published another 2k
+    totalRows += publish1kRecords(topic1);
+    waitUntilPublishedRecordsAreIngested(totalRows);

-    // Tear down both topics and supervisors
-    kafkaServer.deleteTopic(topic1);
-    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+    totalRows += publish1kRecords(topic1);
+    waitUntilPublishedRecordsAreIngested(totalRows);

     long totalUsed = overlord.latchableEmitter().getMetricValues(
         "segment/metadataCache/used/count",
@@ -297,9 +298,9 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     ).stream().reduce((first, second) -> second).orElse(0).longValue();

     Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
-    // 1 compacted segment + 2 appended segment
-    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
+    // 1 compacted segment + 2 appended segments (sometimes 3 appended segments if ingestion crosses an hourly boundary)
+    Assertions.assertTrue(getNumSegmentsWith(Granularities.DAY) >= 3);
+    Assertions.assertEquals(totalRows, getTotalRowCount());

     runCompactionWithSpec(dayGranularityConfig);
     waitForAllCompactionTasksToFinish();
@@ -310,9 +311,13 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
         .hasDimension(DruidMetrics.DATASOURCE, dataSource)
         .hasValueMatching(Matchers.greaterThan(totalUsed)));

-    // performed minor compaction: 1 previously compacted segment + 1 incrementally compacted segment
+    // performed minor compaction: 1 previously compacted segment + 1 newly compacted segment
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
+    Assertions.assertEquals(totalRows, getTotalRowCount());
+
+    // Tear down kafka
+    kafkaServer.deleteTopic(topic1);
+    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
   }

   protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
@@ -332,22 +337,17 @@ protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
     Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
   }

-  protected int publish1kRecords(String topic, boolean useTransactions)
+  protected int publish1kRecords(String topic)
   {
     final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 100, 100);
-    List<byte[]> records = streamGenerator.generateEvents(10);
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
+    List<byte[]> records = streamGenerator.generateEvents(2);
     ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new ArrayList<>();
     for (byte[] record : records) {
       producerRecords.add(new ProducerRecord<>(topic, record));
     }
-
-    if (useTransactions) {
-      kafkaServer.produceRecordsToTopic(producerRecords);
-    } else {
-      kafkaServer.produceRecordsWithoutTransaction(producerRecords);
-    }
+    kafkaServer.produceRecordsToTopic(producerRecords);
     return producerRecords.size();
   }

@@ -859,7 +859,7 @@ private void runIngestionAtGranularity(
   public static List<PartitionsSpec> getPartitionsSpec()
   {
     return List.of(
-        new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false),
+        new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false),
         new DynamicPartitionsSpec(null, null)
     );
   }

From e8d7a0eae509b433c03061a1585869c5fe53a1ac Mon Sep 17 00:00:00 2001
From: cecemei
Date: Sun, 15 Mar 2026 14:42:12 -0700
Subject: [PATCH 2/6] processed

---
 .../embedded/compact/CompactionSupervisorTest.java | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 7421e442dfba..9086208ad301 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -322,19 +322,13 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex

   protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
   {
+    // Sometimes the number of processed events can be greater than expectedRowCount,
+    // because some events can be processed twice but are only persisted once.
     indexer.latchableEmitter().waitForEventAggregate(
         event -> event.hasMetricName("ingest/events/processed")
                       .hasDimension(DruidMetrics.DATASOURCE, dataSource),
         agg -> agg.hasSumAtLeast(expectedRowCount)
     );
-
-    final int totalEventsProcessed = indexer
-        .latchableEmitter()
-        .getMetricValues("ingest/events/processed", Map.of(DruidMetrics.DATASOURCE, dataSource))
-        .stream()
-        .mapToInt(Number::intValue)
-        .sum();
-    Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
   }

   protected int publish1kRecords(String topic)

From cec4cc229e47200a00f6842ab9acc25946759e2b Mon Sep 17 00:00:00 2001
From: cecemei
Date: Mon, 16 Mar 2026 16:06:18 -0700
Subject: [PATCH 3/6] batch-ingest

---
 .../compact/CompactionSupervisorTest.java | 131 +++++++++---------
 1 file changed, 64 insertions(+), 67 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 9086208ad301..825e5da5e048 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -23,6 +23,8 @@
 import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.CompactionEngine;
@@ -32,12 +34,12 @@
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.TaskBuilder;
+import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
 import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
-import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
-import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -55,6 +57,7 @@
 import org.apache.druid.rpc.UpdateResponse;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
 import org.apache.druid.segment.metadata.IndexingStateCache;
 import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
@@ -76,6 +79,7 @@
 import org.apache.druid.server.metrics.LatchableEmitter;
 import org.apache.druid.server.metrics.StorageMonitor;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
 import org.apache.druid.testing.embedded.EmbeddedHistorical;
@@ -89,7 +93,6 @@
 import org.apache.druid.testing.tools.StreamGenerator;
 import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -101,6 +104,7 @@
 import org.junit.jupiter.params.provider.MethodSource;

 import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -112,7 +116,6 @@
  */
 public class CompactionSupervisorTest extends EmbeddedClusterTestBase
 {
-  private final KafkaResource kafkaServer = new KafkaResource();
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer()
       .setServerMemory(2_000_000_000L)
@@ -138,12 +141,7 @@ public EmbeddedDruidCluster createCluster()
             "[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
         )
         .addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
-        .addExtensions(
-            CatalogClientModule.class,
-            CatalogCoordinatorModule.class,
-            KafkaIndexTaskModule.class
-        )
-        .addResource(kafkaServer)
+        .addExtensions(CatalogClientModule.class, CatalogCoordinatorModule.class)
         .addServer(coordinator)
         .addServer(overlord)
         .addServer(indexer)
@@ -156,7 +154,14 @@ public EmbeddedDruidCluster createCluster()
   private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
   {
     final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
-        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, policy, true, compactionEngine, true))
+        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
+            1.0,
+            100,
+            policy,
+            true,
+            compactionEngine,
+            true
+        ))
     );
     Assertions.assertTrue(updateResponse.isSuccess());
   }
@@ -236,27 +241,14 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
         CompactionEngine.MSQ,
         new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null)
     );
-    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = MoreResources.Supervisor.KAFKA_JSON
-        .get()
-        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null))
-                                        .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
-        .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()));
-
-    // Set up first topic and supervisor
-    final String topic1 = IdUtils.getRandomId();
-    kafkaServer.createTopicWithPartitions(topic1, 1);
-    final KafkaSupervisorSpec supervisor1 = kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
-    cluster.callApi().postSupervisor(supervisor1);

-    int totalRows = publish1kRecords(topic1);
-    waitUntilPublishedRecordsAreIngested(totalRows);
+    ingest1kRecords();
+    ingest1kRecords();

-    totalRows += publish1kRecords(topic1);
-    waitUntilPublishedRecordsAreIngested(totalRows);
-
-    // Before compaction, ingestion generates 2 segments, sometimes 3 if ingestion crosses an hourly boundary
-    Assertions.assertTrue(getNumSegmentsWith(Granularities.HOUR) >= 2);
-    Assertions.assertEquals(totalRows, getTotalRowCount());
+    broker.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals(2000, getTotalRowCount());

     // Create a compaction config with DAY granularity
     InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
@@ -279,29 +271,28 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     waitForAllCompactionTasksToFinish();

     pauseCompaction(dayGranularityConfig);
-    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+
+    broker.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
     Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(totalRows, getTotalRowCount());
+    Assertions.assertEquals(2000, getTotalRowCount());

     verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);

-    // published another 2k
-    totalRows += publish1kRecords(topic1);
-    waitUntilPublishedRecordsAreIngested(totalRows);
+    // ingest another 2k
+    ingest1kRecords();
+    ingest1kRecords();

-    totalRows += publish1kRecords(topic1);
-    waitUntilPublishedRecordsAreIngested(totalRows);
+    broker.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals(4000, getTotalRowCount());

     long totalUsed = overlord.latchableEmitter().getMetricValues(
         "segment/metadataCache/used/count",
         Map.of(DruidMetrics.DATASOURCE, dataSource)
     ).stream().reduce((first, second) -> second).orElse(0).longValue();

-    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
-    // 1 compacted segment + 2 appended segments (sometimes 3 appended segments if ingestion crosses an hourly boundary)
-    Assertions.assertTrue(getNumSegmentsWith(Granularities.DAY) >= 3);
-    Assertions.assertEquals(totalRows, getTotalRowCount());
-
     runCompactionWithSpec(dayGranularityConfig);
     waitForAllCompactionTasksToFinish();

@@ -313,36 +304,42 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex

     // performed minor compaction: 1 previously compacted segment + 1 newly compacted segment
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(totalRows, getTotalRowCount());
-
-    // Tear down kafka
-    kafkaServer.deleteTopic(topic1);
-    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+    Assertions.assertEquals(4000, getTotalRowCount());
   }

-  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
-  {
-    // Sometimes the number of processed events can be greater than expectedRowCount,
-    // because some events can be processed twice but are only persisted once.
-    indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/events/processed")
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
-        agg -> agg.hasSumAtLeast(expectedRowCount)
-    );
-  }
-
-  protected int publish1kRecords(String topic)
+  protected void ingest1kRecords()
   {
     final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
     final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
     List<byte[]> records = streamGenerator.generateEvents(2);
-    ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new ArrayList<>();
-    for (byte[] record : records) {
-      producerRecords.add(new ProducerRecord<>(topic, record));
-    }
-    kafkaServer.produceRecordsToTopic(producerRecords);
-    return producerRecords.size();
+    final InlineInputSource input = new InlineInputSource(
+        records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
+    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        input,
+        new JsonInputFormat(null, null, null, null, null),
+        true,
+        null
+    );
+    final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
+        DataSchema.builder()
+                  .withDataSource(dataSource)
+                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
+                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+                  .build(),
+        ioConfig,
+        TuningConfigBuilder.forParallelIndexTask().build()
+    );
+    final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
+    final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        taskId,
+        null,
+        null,
+        indexIngestionSpec,
+        null
+    );
+    cluster.callApi().submitTask(task);
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
   }

   @MethodSource("getEngine")

From 1f1ca3d7d46b36a84c16c5b8b81af31fefc9be81 Mon Sep 17 00:00:00 2001
From: cecemei
Date: Mon, 16 Mar 2026 19:56:06 -0700
Subject: [PATCH 4/6] flaky

---
 .../compact/CompactionSupervisorTest.java | 32 +++++++++++++++++--
 1 file changed, 29 insertions(+), 3 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 825e5da5e048..dce531ff1598 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -89,10 +89,12 @@
 import org.apache.druid.testing.embedded.indexing.MoreResources;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.druid.testing.tools.EventSerializer;
+import org.apache.druid.testing.tools.ITRetryUtil;
 import org.apache.druid.testing.tools.JsonEventSerializer;
 import org.apache.druid.testing.tools.StreamGenerator;
 import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -245,8 +247,8 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     ingest1kRecords();
     ingest1kRecords();

-    broker.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
     overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    waitSegmentsAvailableInBroker();
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(2000, getTotalRowCount());

@@ -272,8 +274,8 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex

     pauseCompaction(dayGranularityConfig);

-    broker.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
     overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    waitSegmentsAvailableInBroker();
     Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(2000, getTotalRowCount());

@@ -283,8 +285,8 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     ingest1kRecords();
     ingest1kRecords();

-    broker.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
     overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    waitSegmentsAvailableInBroker();
     Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(4000, getTotalRowCount());

@@ -833,6 +835,30 @@ private int getNumSegmentsWith(Granularity granularity)
         .count();
   }

+  private void waitSegmentsAvailableInBroker()
+  {
+    List<String> segments = overlord
+        .bindings()
+        .segmentsMetadataStorage()
+        .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+        .stream()
+        .map(DataSegment::getId)
+        .map(SegmentId::toString)
+        .map(s -> StringUtils.format("'%s'", s))
+        .toList();
+
+    ITRetryUtil.retryUntilEquals(
+        () ->
+            Numbers.parseInt(cluster.callApi()
+                                    .runSql(
+                                        "select count(*) from sys.segments where segment_id in (%s)",
+                                        String.join(", ", segments)
+                                    )),
+        segments.size(),
+        "wait until segments are available in broker"
+    );
+  }
+
   private void runIngestionAtGranularity(
       String granularity,
       String inlineDataCsv

From 789de9db62d5742ebe3c5de63de844adefe0ca13 Mon Sep 17 00:00:00 2001
From: cecemei
Date: Mon, 16 Mar 2026 20:44:07 -0700
Subject: [PATCH 5/6] flaky

---
 .../compact/CompactionSupervisorTest.java | 26 ++++++++++++-------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index dce531ff1598..9f72f1151cbb 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -21,6 +21,8 @@

 import org.apache.druid.catalog.guice.CatalogClientModule;
 import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
+import org.apache.druid.client.BrokerServerView;
+import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
@@ -51,6 +53,7 @@
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.query.filter.NotDimFilter;
@@ -111,6 +114,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;

 /**
@@ -837,24 +841,26 @@ private int getNumSegmentsWith(Granularity granularity)

   private void waitSegmentsAvailableInBroker()
   {
-    List<String> segments = overlord
+    Set<SegmentId> segments = overlord
         .bindings()
         .segmentsMetadataStorage()
         .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
         .stream()
         .map(DataSegment::getId)
-        .map(SegmentId::toString)
-        .map(s -> StringUtils.format("'%s'", s))
-        .toList();
+        .collect(Collectors.toSet());

     ITRetryUtil.retryUntilEquals(
         () ->
-            Numbers.parseInt(cluster.callApi()
-                                    .runSql(
-                                        "select count(*) from sys.segments where segment_id in (%s)",
-                                        String.join(", ", segments)
-                                    )),
-        segments.size(),
+            broker.bindings()
+                  .getInstance(BrokerServerView.class)
+                  .getTimeline(TableDataSource.create(dataSource))
+                  .get()
+                  .iterateAllObjects()
+                  .stream()
+                  .map(ServerSelector::getSegment)
+                  .map(DataSegment::getId)
+                  .collect(Collectors.toSet()).containsAll(segments),
+        true,
         "wait until segments are available in broker"
     );
   }

From 07d7e137ea741d015ff48482ca4a69877387e903 Mon Sep 17 00:00:00 2001
From: cecemei
Date: Tue, 17 Mar 2026 10:53:03 -0700
Subject: [PATCH 6/6] comment

---
 .../compact/CompactionSupervisorTest.java | 38 ++-----------------
 1 file changed, 3 insertions(+), 35 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 9f72f1151cbb..bc9f72365309 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -21,8 +21,6 @@

 import org.apache.druid.catalog.guice.CatalogClientModule;
 import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
-import org.apache.druid.client.BrokerServerView;
-import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
@@ -53,7 +51,6 @@
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.query.filter.NotDimFilter;
@@ -92,12 +89,10 @@
 import org.apache.druid.testing.embedded.indexing.MoreResources;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.druid.testing.tools.EventSerializer;
-import org.apache.druid.testing.tools.ITRetryUtil;
 import org.apache.druid.testing.tools.JsonEventSerializer;
 import org.apache.druid.testing.tools.StreamGenerator;
 import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -114,7 +109,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;

 /**
@@ -252,7 +246,7 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     ingest1kRecords();

     overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
-    waitSegmentsAvailableInBroker();
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(2000, getTotalRowCount());

@@ -279,7 +273,7 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     pauseCompaction(dayGranularityConfig);

     overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
-    waitSegmentsAvailableInBroker();
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
     Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(2000, getTotalRowCount());

@@ -290,7 +284,7 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     ingest1kRecords();

     overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
-    waitSegmentsAvailableInBroker();
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
     Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(4000, getTotalRowCount());

@@ -839,32 +833,6 @@ private int getNumSegmentsWith(Granularity granularity)
         .count();
   }

-  private void waitSegmentsAvailableInBroker()
-  {
-    Set<SegmentId> segments = overlord
-        .bindings()
-        .segmentsMetadataStorage()
-        .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
-        .stream()
-        .map(DataSegment::getId)
-        .collect(Collectors.toSet());
-
-    ITRetryUtil.retryUntilEquals(
-        () ->
-            broker.bindings()
-                  .getInstance(BrokerServerView.class)
-                  .getTimeline(TableDataSource.create(dataSource))
-                  .get()
-                  .iterateAllObjects()
-                  .stream()
-                  .map(ServerSelector::getSegment)
-                  .map(DataSegment::getId)
-                  .collect(Collectors.toSet()).containsAll(segments),
-        true,
-        "wait until segments are available in broker"
-    );
-  }
-
   private void runIngestionAtGranularity(
       String granularity,
       String inlineDataCsv