diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java index 9f58d62d05a9..410aad78a912 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -86,17 +88,19 @@ public String toString() public static class CategoryConfig { private final String defaultCategory; - // key: datasource, value: category private final Map categoryAffinity; + private final Map supervisorIdCategoryAffinity; @JsonCreator public CategoryConfig( @JsonProperty("defaultCategory") String defaultCategory, - @JsonProperty("categoryAffinity") Map categoryAffinity + @JsonProperty("categoryAffinity") Map categoryAffinity, + @JsonProperty("supervisorIdCategoryAffinity") @Nullable Map supervisorIdCategoryAffinity ) { this.defaultCategory = defaultCategory; this.categoryAffinity = categoryAffinity == null ? Collections.emptyMap() : categoryAffinity; + this.supervisorIdCategoryAffinity = Configs.valueOrDefault(supervisorIdCategoryAffinity, Map.of()); } @JsonProperty @@ -105,12 +109,25 @@ public String getDefaultCategory() return defaultCategory; } + /** + * Returns a map from datasource name to the worker category name to be used for tasks of that datasource. + */ @JsonProperty public Map getCategoryAffinity() { return categoryAffinity; } + /** + * Returns a map from supervisor ID to worker category name to be used for tasks of that supervisor. + * This takes precedence over {@link #getCategoryAffinity()} when both are configured. + */ + @JsonProperty + public Map getSupervisorIdCategoryAffinity() + { + return supervisorIdCategoryAffinity; + } + @Override public boolean equals(final Object o) { @@ -122,13 +139,14 @@ public boolean equals(final Object o) } final CategoryConfig that = (CategoryConfig) o; return Objects.equals(defaultCategory, that.defaultCategory) && - Objects.equals(categoryAffinity, that.categoryAffinity); + Objects.equals(categoryAffinity, that.categoryAffinity) && + Objects.equals(supervisorIdCategoryAffinity, that.supervisorIdCategoryAffinity); } @Override public int hashCode() { - return Objects.hash(defaultCategory, categoryAffinity); + return Objects.hash(defaultCategory, categoryAffinity, supervisorIdCategoryAffinity); } @Override @@ -137,6 +155,7 @@ public String toString() return "CategoryConfig{" + "defaultCategory=" + defaultCategory + ", categoryAffinity=" + categoryAffinity + + ", supervisorIdCategoryAffinity=" + supervisorIdCategoryAffinity + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java index be4e8426008b..e8c75a814bd4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import javax.annotation.Nullable; import java.util.Collections; @@ -120,10 +121,25 @@ public static ImmutableWorkerInfo selectWorker( if (categoryConfig != null) { final String defaultCategory = categoryConfig.getDefaultCategory(); final Map categoryAffinity = categoryConfig.getCategoryAffinity(); - - String preferredCategory = categoryAffinity.get(task.getDataSource()); - // If there is no preferred category for the datasource, then using the defaultCategory. However, the defaultCategory - // may be null too, so we need to do one more null check (see below). + final Map supervisorIdCategoryAffinity = categoryConfig.getSupervisorIdCategoryAffinity(); + + String preferredCategory = null; + + // First, check if this task has a supervisorId and if there's a category affinity for it + if (task instanceof SeekableStreamIndexTask) { + final String supervisorId = ((SeekableStreamIndexTask) task).getSupervisorId(); + if (supervisorId != null) { + preferredCategory = supervisorIdCategoryAffinity.get(supervisorId); + } + } + + // If no supervisor-based category is found, fall back to datasource-based category affinity + if (preferredCategory == null) { + preferredCategory = categoryAffinity.get(task.getDataSource()); + } + + // If there is no preferred category for the supervisorId or datasource, then use the defaultCategory. + // However, the defaultCategory may be null too, so we need to do one more null check (see below). preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory; if (preferredCategory != null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java index 6ac04a8920fc..7ac8b2b746ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -20,14 +20,26 @@ package org.apache.druid.indexing.overlord.setup; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.segment.indexing.DataSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashSet; public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest @@ -80,7 +92,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c2", - ImmutableMap.of("ds1", "c2") + ImmutableMap.of("ds1", "c2"), + null ) ), false @@ -95,7 +108,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( null, - ImmutableMap.of("ds1", "c2") + ImmutableMap.of("ds1", "c2"), + null ) ), false @@ -110,6 +124,7 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c2", + null, null ) ), @@ -127,6 +142,7 @@ public void testFindWorkerForTaskWithNullPreferredTier() ImmutableMap.of( "noop", new WorkerCategorySpec.CategoryConfig( + null, null, null ) @@ -146,7 +162,8 @@ public void testWeakTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), false @@ -164,7 +181,8 @@ public void testStrongTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), true @@ -174,6 +192,94 @@ public void testStrongTierSpec() Assert.assertNull(worker); } + @Test + public void testSupervisorIdCategoryAffinity() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor1", "c2") + ) + ), + false + ); + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = + new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDatasource() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor2", "c2") + ) + ), + false + ); + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = + new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + Assert.assertNotNull(worker); + Assert.assertEquals("c1", worker.getWorker().getCategory()); + Assert.assertEquals("localhost1", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDefault() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", + ImmutableMap.of("ds2", "c1"), + ImmutableMap.of("supervisor2", "c1") + ) + ), + false + ); + + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = + new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) { final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = @@ -187,4 +293,27 @@ private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) return worker; } + + /** + * Helper method to create a test task with supervisor ID for testing + */ + @SuppressWarnings("unchecked") + private static Task createTestTask(String id, @Nullable String supervisorId, String datasource) + { + return new TestSeekableStreamIndexTask( + id, + supervisorId, + null, + DataSchema.builder() + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), + Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(SeekableStreamIndexTaskIOConfig.class), + null, + null + ); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java index 880ef743dca6..ea4ffb16af5b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java @@ -20,14 +20,26 @@ package org.apache.druid.indexing.overlord.setup; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.segment.indexing.DataSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashSet; public class FillCapacityWithCategorySpecWorkerSelectStrategyTest @@ -80,7 +92,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c1") + ImmutableMap.of("ds1", "c1"), + null ) ), false @@ -95,7 +108,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( null, - ImmutableMap.of("ds1", "c1") + ImmutableMap.of("ds1", "c1"), + null ) ), false @@ -110,6 +124,7 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c1", + null, null ) ), @@ -127,6 +142,7 @@ public void testFindWorkerForTaskWithNullPreferredTier() ImmutableMap.of( "noop", new WorkerCategorySpec.CategoryConfig( + null, null, null ) @@ -146,7 +162,8 @@ public void testWeakTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), false @@ -164,7 +181,8 @@ public void testStrongTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), true @@ -174,6 +192,99 @@ public void testStrongTierSpec() Assert.assertNull(worker); } + @Test + public void testSupervisorIdCategoryAffinity() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor1", "c2") + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = + new FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDatasource() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor2", "c2") + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" (not in supervisorIdCategoryAffinity map) + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = + new FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + Assert.assertNotNull(worker); + Assert.assertEquals("c1", worker.getWorker().getCategory()); + Assert.assertEquals("localhost1", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDefault() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", + ImmutableMap.of("ds2", "c1"), + ImmutableMap.of("supervisor2", "c1") + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" and datasource "ds1" + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = + new FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) { final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = @@ -187,4 +298,27 @@ private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) return worker; } + + /** + * Helper method to create a test task with supervisor ID for testing + */ + @SuppressWarnings("unchecked") + private static Task createTestTask(String id, @Nullable String supervisorId, String datasource) + { + return new TestSeekableStreamIndexTask( + id, + supervisorId, + null, + DataSchema.builder() + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), + Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(SeekableStreamIndexTaskIOConfig.class), + null, + null + ); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java index 4277984fc10a..9576aa0ffbbd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java @@ -58,7 +58,7 @@ public void testSerde() throws Exception Assert.assertTrue(workerCategorySpec.isStrong()); Assert.assertEquals(ImmutableMap.of( "index_kafka", - new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", "c2")) + new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", "c2"), null) ), workerCategorySpec.getCategoryMap()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java new file mode 100644 index 000000000000..985aa7da706a --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.segment.indexing.DataSchema; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * Test implementation of SeekableStreamIndexTask for use in unit tests. + */ +public class TestSeekableStreamIndexTask extends SeekableStreamIndexTask +{ + private final SeekableStreamIndexTaskRunner streamingTaskRunner; + private final RecordSupplier recordSupplier; + + public TestSeekableStreamIndexTask( + String id, + @Nullable String supervisorId, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId + ) + { + this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId, null, null); + } + + public TestSeekableStreamIndexTask( + String id, + @Nullable String supervisorId, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId, + @Nullable SeekableStreamIndexTaskRunner streamingTaskRunner, + @Nullable RecordSupplier recordSupplier + ) + { + super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); + this.streamingTaskRunner = streamingTaskRunner; + this.recordSupplier = recordSupplier; + } + + @Nullable + @Override + protected SeekableStreamIndexTaskRunner createTaskRunner() + { + return streamingTaskRunner; + } + + @Override + protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) + { + return recordSupplier; + } + + @Override + public String getType() + { + return "test_seekable_stream"; + } +} + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index e204a67cae84..5df9edd184d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -39,10 +39,8 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.granularity.UniformGranularitySpec; -import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; @@ -65,6 +63,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; @@ -1151,7 +1150,9 @@ public Duration getEmissionDuration() taskTuningConfig, taskIoConfig, context, - "0" + "0", + null, + recordSupplier ); TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask( @@ -1162,7 +1163,9 @@ public Duration getEmissionDuration() taskTuningConfig, taskIoConfig, context, - "0" + "0", + null, + recordSupplier ); TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask( @@ -1173,7 +1176,9 @@ public Duration getEmissionDuration() taskTuningConfig, taskIoConfig, context, - "0" + "0", + null, + recordSupplier ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -1364,7 +1369,9 @@ public Duration getEmissionDuration() ioConfig ), context, - "0" + "0", + null, + recordSupplier ); TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask( @@ -1384,7 +1391,9 @@ public Duration getEmissionDuration() ioConfig ), context, - "1" + "1", + null, + recordSupplier ); TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask( @@ -1404,7 +1413,9 @@ public Duration getEmissionDuration() ioConfig ), context, - "2" + "2", + null, + recordSupplier ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -1596,7 +1607,8 @@ public Duration getEmissionDuration() ), context, "0", - streamingTaskRunner + streamingTaskRunner, + recordSupplier ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -2907,78 +2919,6 @@ public String toString() }; } - private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask - { - private final SeekableStreamIndexTaskRunner streamingTaskRunner; - - public TestSeekableStreamIndexTask( - String id, - @Nullable String supervisorId, - @Nullable TaskResource taskResource, - DataSchema dataSchema, - SeekableStreamIndexTaskTuningConfig tuningConfig, - SeekableStreamIndexTaskIOConfig ioConfig, - @Nullable Map context, - @Nullable String groupId - ) - { - this( - id, - supervisorId, - taskResource, - dataSchema, - tuningConfig, - ioConfig, - context, - groupId, - null - ); - } - - public TestSeekableStreamIndexTask( - String id, - @Nullable String supervisorId, - @Nullable TaskResource taskResource, - DataSchema dataSchema, - SeekableStreamIndexTaskTuningConfig tuningConfig, - SeekableStreamIndexTaskIOConfig ioConfig, - @Nullable Map context, - @Nullable String groupId, - @Nullable SeekableStreamIndexTaskRunner streamingTaskRunner - ) - { - super( - id, - supervisorId, - taskResource, - dataSchema, - tuningConfig, - ioConfig, - context, - groupId - ); - this.streamingTaskRunner = streamingTaskRunner; - } - - @Nullable - @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() - { - return streamingTaskRunner; - } - - @Override - protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) - { - return recordSupplier; - } - - @Override - public String getType() - { - return "test"; - } - } private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor { @@ -3069,7 +3009,9 @@ protected List> createIndexT taskTuningConfig, taskIoConfig, null, - null + null, + null, + recordSupplier )); }