diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 2c68c0eeab6d..bc9f72365309 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -23,6 +23,8 @@
 import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.CompactionEngine;
@@ -32,12 +34,12 @@
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.TaskBuilder;
+import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
 import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
-import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
-import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -55,6 +57,7 @@
 import org.apache.druid.rpc.UpdateResponse;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
 import org.apache.druid.segment.metadata.IndexingStateCache;
 import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
@@ -76,6 +79,7 @@
 import org.apache.druid.server.metrics.LatchableEmitter;
 import org.apache.druid.server.metrics.StorageMonitor;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
 import org.apache.druid.testing.embedded.EmbeddedHistorical;
@@ -89,7 +93,6 @@
 import org.apache.druid.testing.tools.StreamGenerator;
 import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -101,6 +104,7 @@
 import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -112,7 +116,6 @@
  */
 public class CompactionSupervisorTest extends EmbeddedClusterTestBase
 {
-  private final KafkaResource kafkaServer = new KafkaResource();
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer()
       .setServerMemory(2_000_000_000L)
@@ -138,12 +141,7 @@ public EmbeddedDruidCluster createCluster()
             "[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
         )
         .addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
-        .addExtensions(
-            CatalogClientModule.class,
-            CatalogCoordinatorModule.class,
-            KafkaIndexTaskModule.class
-        )
-        .addResource(kafkaServer)
+        .addExtensions(CatalogClientModule.class, CatalogCoordinatorModule.class)
         .addServer(coordinator)
         .addServer(overlord)
         .addServer(indexer)
@@ -156,7 +154,14 @@ public EmbeddedDruidCluster createCluster()
   private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
   {
     final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
-        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, policy, true, compactionEngine, true))
+        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
+            1.0,
+            100,
+            policy,
+            true,
+            compactionEngine,
+            true
+        ))
     );
     Assertions.assertTrue(updateResponse.isSuccess());
   }
@@ -236,24 +241,14 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
         CompactionEngine.MSQ,
         new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null)
     );
 
-    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = MoreResources.Supervisor.KAFKA_JSON
-        .get()
-        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null))
-                                        .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
-        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
-        .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
-    // Set up first topic and supervisor
-    final String topic1 = IdUtils.getRandomId();
-    kafkaServer.createTopicWithPartitions(topic1, 1);
-    final KafkaSupervisorSpec supervisor1 = kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
-    cluster.callApi().postSupervisor(supervisor1);
+    ingest1kRecords();
+    ingest1kRecords();
 
-    final int totalRowCount = publish1kRecords(topic1, true) + publish1kRecords(topic1, false);
-    waitUntilPublishedRecordsAreIngested(totalRowCount);
-
-    // Before compaction
-    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+    overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+    Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals(2000, getTotalRowCount());
 
     // Create a compaction config with DAY granularity
     InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
@@ -276,31 +271,28 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
 
     waitForAllCompactionTasksToFinish();
     pauseCompaction(dayGranularityConfig);
-    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+
+    overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
     Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(2000, getTotalRowCount());
 
     verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
 
-    // published another 1k
-    final int appendedRowCount = publish1kRecords(topic1, true);
-    indexer.latchableEmitter().flush();
-    waitUntilPublishedRecordsAreIngested(appendedRowCount);
+    // ingest another 2k
+    ingest1kRecords();
+    ingest1kRecords();
 
-    // Tear down both topics and supervisors
-    kafkaServer.deleteTopic(topic1);
-    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+    overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals(4000, getTotalRowCount());
 
     long totalUsed = overlord.latchableEmitter().getMetricValues(
         "segment/metadataCache/used/count",
         Map.of(DruidMetrics.DATASOURCE, dataSource)
     ).stream().reduce((first, second) -> second).orElse(0).longValue();
 
-    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
-    // 1 compacted segment + 2 appended segment
-    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
-
     runCompactionWithSpec(dayGranularityConfig);
     waitForAllCompactionTasksToFinish();
 
@@ -310,45 +302,44 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
         .hasDimension(DruidMetrics.DATASOURCE, dataSource)
         .hasValueMatching(Matchers.greaterThan(totalUsed)));
 
-    // performed minor compaction: 1 previously compacted segment + 1 incrementally compacted segment
+    // performed minor compaction: 1 previously compacted segment + 1 segment newly created by minor compaction
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
-  }
-
-  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
-  {
-    indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/events/processed")
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
-        agg -> agg.hasSumAtLeast(expectedRowCount)
-    );
-
-    final int totalEventsProcessed = indexer
-        .latchableEmitter()
-        .getMetricValues("ingest/events/processed", Map.of(DruidMetrics.DATASOURCE, dataSource))
-        .stream()
-        .mapToInt(Number::intValue)
-        .sum();
-    Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
+    Assertions.assertEquals(4000, getTotalRowCount());
   }
 
-  protected int publish1kRecords(String topic, boolean useTransactions)
+  protected void ingest1kRecords()
   {
     final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 100, 100);
-    List<byte[]> records = streamGenerator.generateEvents(10);
-
-    ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new ArrayList<>();
-    for (byte[] record : records) {
-      producerRecords.add(new ProducerRecord<>(topic, record));
-    }
-
-    if (useTransactions) {
-      kafkaServer.produceRecordsToTopic(producerRecords);
-    } else {
-      kafkaServer.produceRecordsWithoutTransaction(producerRecords);
-    }
-    return producerRecords.size();
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
+    List<byte[]> records = streamGenerator.generateEvents(2);
+
+    final InlineInputSource input = new InlineInputSource(
+        records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
+    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        input,
+        new JsonInputFormat(null, null, null, null, null),
+        true,
+        null
+    );
+    final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
+        DataSchema.builder()
+                  .withDataSource(dataSource)
+                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
+                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+                  .build(),
+        ioConfig,
+        TuningConfigBuilder.forParallelIndexTask().build()
+    );
+    final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
+    final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        taskId,
+        null,
+        null,
+        indexIngestionSpec,
+        null
+    );
+    cluster.callApi().submitTask(task);
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
   }
 
   @MethodSource("getEngine")
@@ -859,7 +850,7 @@ private void runIngestionAtGranularity(
   public static List<PartitionsSpec> getPartitionsSpec()
   {
     return List.of(
-        new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false),
+        new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false),
         new DynamicPartitionsSpec(null, null)
     );
   }
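
Reviewer note: the heart of this change is swapping the Kafka fixture for Druid's inline batch-ingestion path. The sketch below distills that pattern on its own, using only classes this patch already imports; the class name InlineIngestExample, the buildTask method, and its parameters are illustrative, and submitting the task is left to whatever harness or Overlord client the caller uses. It is a minimal sketch of the pattern, not part of the patch.

import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.segment.indexing.DataSchema;

public class InlineIngestExample
{
  public static ParallelIndexSupervisorTask buildTask(String dataSource, String newlineDelimitedJson)
  {
    // Inline data replaces the Kafka topic: no external broker to provision,
    // no topics or supervisors to tear down at the end of the test.
    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
        new InlineInputSource(newlineDelimitedJson),
        new JsonInputFormat(null, null, null, null, null),
        true,   // appendToExisting: repeated calls add segments instead of replacing them
        null    // dropExisting
    );
    // Same schema shape as the test: ISO "timestamp" column plus schema discovery.
    final ParallelIndexIngestionSpec spec = new ParallelIndexIngestionSpec(
        DataSchema.builder()
                  .withDataSource(dataSource)
                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
                  .build(),
        ioConfig,
        TuningConfigBuilder.forParallelIndexTask().build()
    );
    return new ParallelIndexSupervisorTask(dataSource + "_inline_task", null, null, spec, null);
  }
}

Compared with the Kafka fixture, this makes each ingestion call synchronous and deterministic (a fixed row count per task), which is why the test can assert exact segment and row counts after each waitForAllSegmentsToBeAvailable.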