From bb4cb94ab7ba30fa8a5b613db13c2729dd6d7a24 Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Tue, 14 Oct 2025 13:33:42 +0530 Subject: [PATCH 1/8] minor change --- .../druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java index 0cc5f6f5f232..a6d0caa75124 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -231,7 +231,7 @@ public void test_maxSegmentsKilledInAnInterval_is_1k() ); } - @Test(timeout = 20_000L) + @Test(timeout = 30_000L) public void test_maxIntervalsKilledInADatasource_is_10k() { leaderSelector.becomeLeader(); From 593b232f894f39ef52adcbd2b907291d9cdd4073 Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Mon, 13 Oct 2025 16:11:32 +0530 Subject: [PATCH 2/8] Add new task distribution strategy --- ...visorCategorySpecWorkerSelectStrategy.java | 108 ++++++++ .../overlord/setup/WorkerSelectStrategy.java | 3 +- .../overlord/setup/WorkerSelectUtils.java | 59 +++++ ...rCategorySpecWorkerSelectStrategyTest.java | 245 ++++++++++++++++++ 4 files changed, 414 insertions(+), 1 deletion(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java new file mode 100644 index 000000000000..d034ee3afc6b --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.Configs; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; + +import javax.annotation.Nullable; +import java.util.Objects; + +public class EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy +{ + private final WorkerCategorySpec workerCategorySpec; + private final TaskLimits taskLimits; + + @JsonCreator + public EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy( + @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec, + @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits + ) + { + this.workerCategorySpec = workerCategorySpec; + this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY); + } + + @JsonProperty + public WorkerCategorySpec getWorkerCategorySpec() + { + return workerCategorySpec; + } + + @JsonProperty + public TaskLimits getTaskLimits() + { + return taskLimits; + } + + @Nullable + @Override + public ImmutableWorkerInfo findWorkerForTask( + final WorkerTaskRunnerConfig config, + final ImmutableMap zkWorkers, + final Task task + ) + { + return WorkerSelectUtils.selectWorkerBySupervisorCategorySpec( + task, + zkWorkers, + config, + workerCategorySpec, + EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers, + taskLimits + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy that = (EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy) o; + return Objects.equals(workerCategorySpec, that.workerCategorySpec) + && Objects.equals(taskLimits, that.taskLimits); + } + + @Override + public int hashCode() + { + return Objects.hash(workerCategorySpec, taskLimits); + } + + @Override + public String toString() + { + return "EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy{" + + "workerCategorySpec=" + workerCategorySpec + + ", taskLimits=" + taskLimits + + '}'; + } +} + + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java index a3443ee73583..d0482d239a42 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -40,7 +40,8 @@ @JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "fillCapacityWithCategorySpec", value = FillCapacityWithCategorySpecWorkerSelectStrategy.class), - @JsonSubTypes.Type(name = "equalDistributionWithCategorySpec", value = EqualDistributionWithCategorySpecWorkerSelectStrategy.class) + @JsonSubTypes.Type(name = "equalDistributionWithCategorySpec", value = EqualDistributionWithCategorySpecWorkerSelectStrategy.class), + @JsonSubTypes.Type(name = "equalDistributionWithSupervisorCategorySpec", value = EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.class) }) @PublicApi public interface WorkerSelectStrategy diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java index be4e8426008b..5a41a82489a1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import javax.annotation.Nullable; import java.util.Collections; @@ -144,6 +145,64 @@ public static ImmutableWorkerInfo selectWorker( return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers)); } + /** + * Helper for {@link WorkerSelectStrategy} implementations. + * + * @param allWorkers map of all workers, in the style provided to {@link WorkerSelectStrategy} + * @param workerCategorySpec worker category spec, or null + * @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run + * the task, and worker satisfies the worker category spec. may return null. + * + * @return selected worker from "allWorkers", or null. + */ + @Nullable + public static ImmutableWorkerInfo selectWorkerBySupervisorCategorySpec( + final Task task, + final Map allWorkers, + final WorkerTaskRunnerConfig workerTaskRunnerConfig, + @Nullable final WorkerCategorySpec workerCategorySpec, + final Function, ImmutableWorkerInfo> workerSelector, + final TaskLimits taskLimits + ) + { + final Map runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig, taskLimits); + + // select worker according to worker category spec, keyed by supervisorId + if (workerCategorySpec != null) { + final WorkerCategorySpec.CategoryConfig categoryConfig = workerCategorySpec.getCategoryMap().get(task.getType()); + + if (categoryConfig != null) { + final String defaultCategory = categoryConfig.getDefaultCategory(); + final Map categoryAffinity = categoryConfig.getCategoryAffinity(); + + String keyForAffinity; + if (task instanceof SeekableStreamIndexTask) { + keyForAffinity = ((SeekableStreamIndexTask) task).getSupervisorId(); + } else { + // fall back to datasource affinity if no supervisorId + keyForAffinity = task.getDataSource(); + } + String preferredCategory = categoryAffinity.get(keyForAffinity); + preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory; + + if (preferredCategory != null) { + // select worker from preferred category + final ImmutableMap categoryWorkers = getCategoryWorkers(preferredCategory, runnableWorkers); + final ImmutableWorkerInfo selected = workerSelector.apply(categoryWorkers); + + if (selected != null) { + return selected; + } else if (workerCategorySpec.isStrong()) { + return null; + } + } + } + } + + // select worker from all runnable workers by default + return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers)); + } + // Get workers that could potentially run this task, ignoring affinityConfig/workerCategorySpec. private static Map getRunnableWorkers( final Task task, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java new file mode 100644 index 000000000000..d47bcd215ac0 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.segment.indexing.DataSchema; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; + + +public class EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest +{ + private static final ImmutableMap WORKERS_FOR_TIER_TESTS = + ImmutableMap.of( + "localhost0", + new ImmutableWorkerInfo( + new Worker("http", "localhost0", "localhost0", 1, "v1", "c1"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost1", + new ImmutableWorkerInfo( + new Worker("http", "localhost1", "localhost1", 2, "v1", "c1"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost2", + new ImmutableWorkerInfo( + new Worker("http", "localhost2", "localhost2", 3, "v1", "c2"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost3", + new ImmutableWorkerInfo( + new Worker("http", "localhost3", "localhost3", 4, "v1", "c2"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ) + ); + + @Test + public void testFindWorkerForTaskWithNullWorkerTierSpec() + { + ImmutableWorkerInfo worker = selectWorker(null, new TestStreamingTask("id1", "sup-1", "ds1", null, null)); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testFindWorkerForTaskWithPreferredTierBySupervisor() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("sup-2", "c2") + ) + ), + false + ); + + // supervisor sup-2 prefers c2, which should pick highest-capacity in c2 -> localhost3 + ImmutableWorkerInfo worker1 = selectWorker(workerCategorySpec, new TestStreamingTask("id2", "sup-2", "ds1", null, null)); + Assert.assertEquals("localhost3", worker1.getWorker().getHost()); + + // not specified, defaultCategory c1 -> pick highest in c1 -> localhost1 + ImmutableWorkerInfo worker2 = selectWorker(workerCategorySpec, new TestStreamingTask("id3", "sup-1", "ds1", null, null)); + Assert.assertEquals("localhost1", worker2.getWorker().getHost()); + } + + @Test + public void testWeakTierSpecFallsBack() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("sup-x", "c3") + ) + ), + false + ); + + // preferred category c3 doesn't exist; weak spec -> choose from all runnable (highest capacity overall -> localhost3) + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec, new TestStreamingTask("id4", "sup-x", "ds1", null, null)); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testStrongTierSpecReturnsNullIfUnavailable() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("sup-x", "c3") + ) + ), + true + ); + + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec, new TestStreamingTask("id5", "sup-x", "ds1", null, null)); + Assert.assertNull(worker); + } + + @Test + public void testNonSeekableTaskFallsBackToDatasource() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c2") + ) + ), + false + ); + + // No supervisorId available -> uses datasource mapping to c2 -> highest in c2 -> localhost3 + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec, NoopTask.forDatasource("ds1")); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec, AbstractTask task) + { + final EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy strategy = + new EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + return strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + task + ); + } + + private static class TestStreamingTask extends SeekableStreamIndexTask + { + TestStreamingTask( + String id, + @Nullable String supervisorId, + String datasource, + @Nullable Map context, + @Nullable String groupId + ) + { + this( + id, + supervisorId, + null, + DataSchema.builder() + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), + Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(SeekableStreamIndexTaskIOConfig.class), + context, + groupId + ); + } + + private TestStreamingTask( + String id, + @Nullable String supervisorId, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId + ) + { + super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); + } + + @Override + protected SeekableStreamIndexTaskRunner createTaskRunner() + { + return null; + } + + @Override + protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) + { + return null; + } + + @Override + public String getType() + { + return "noop"; + } + } +} + + From 1467da9b56338e2964c35f92306ac5d74715cccf Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Wed, 15 Oct 2025 10:48:06 +0530 Subject: [PATCH 3/8] revert --- .../druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java index a6d0caa75124..0cc5f6f5f232 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -231,7 +231,7 @@ public void test_maxSegmentsKilledInAnInterval_is_1k() ); } - @Test(timeout = 30_000L) + @Test(timeout = 20_000L) public void test_maxIntervalsKilledInADatasource_is_10k() { leaderSelector.becomeLeader(); From 44a27de85aa7d6920eb3679588f2b1488164cfb6 Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Sun, 19 Oct 2025 00:14:43 +0530 Subject: [PATCH 4/8] Adding map for supervisorCategoryIdAffinity --- ...visorCategorySpecWorkerSelectStrategy.java | 108 -------- .../overlord/setup/WorkerCategorySpec.java | 18 +- .../overlord/setup/WorkerSelectStrategy.java | 3 +- .../overlord/setup/WorkerSelectUtils.java | 73 ++---- ...hCategorySpecWorkerSelectStrategyTest.java | 195 +++++++++++++- ...rCategorySpecWorkerSelectStrategyTest.java | 245 ------------------ ...hCategorySpecWorkerSelectStrategyTest.java | 14 +- .../setup/WorkerCategorySpecTest.java | 2 +- 8 files changed, 233 insertions(+), 425 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java deleted file mode 100644 index d034ee3afc6b..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.overlord.setup; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableMap; -import org.apache.druid.common.config.Configs; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; -import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; - -import javax.annotation.Nullable; -import java.util.Objects; - -public class EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy -{ - private final WorkerCategorySpec workerCategorySpec; - private final TaskLimits taskLimits; - - @JsonCreator - public EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy( - @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec, - @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits - ) - { - this.workerCategorySpec = workerCategorySpec; - this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY); - } - - @JsonProperty - public WorkerCategorySpec getWorkerCategorySpec() - { - return workerCategorySpec; - } - - @JsonProperty - public TaskLimits getTaskLimits() - { - return taskLimits; - } - - @Nullable - @Override - public ImmutableWorkerInfo findWorkerForTask( - final WorkerTaskRunnerConfig config, - final ImmutableMap zkWorkers, - final Task task - ) - { - return WorkerSelectUtils.selectWorkerBySupervisorCategorySpec( - task, - zkWorkers, - config, - workerCategorySpec, - EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers, - taskLimits - ); - } - - @Override - public boolean equals(final Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy that = (EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy) o; - return Objects.equals(workerCategorySpec, that.workerCategorySpec) - && Objects.equals(taskLimits, that.taskLimits); - } - - @Override - public int hashCode() - { - return Objects.hash(workerCategorySpec, taskLimits); - } - - @Override - public String toString() - { - return "EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy{" + - "workerCategorySpec=" + workerCategorySpec + - ", taskLimits=" + taskLimits + - '}'; - } -} - - diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java index 9f58d62d05a9..3a23242e609a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java @@ -88,15 +88,19 @@ public static class CategoryConfig private final String defaultCategory; // key: datasource, value: category private final Map categoryAffinity; + // key: supervisorId, value: category + private final Map supervisorIdCategoryAffinity; @JsonCreator public CategoryConfig( @JsonProperty("defaultCategory") String defaultCategory, - @JsonProperty("categoryAffinity") Map categoryAffinity + @JsonProperty("categoryAffinity") Map categoryAffinity, + @JsonProperty("supervisorIdCategoryAffinity") Map supervisorIdCategoryAffinity ) { this.defaultCategory = defaultCategory; this.categoryAffinity = categoryAffinity == null ? Collections.emptyMap() : categoryAffinity; + this.supervisorIdCategoryAffinity = supervisorIdCategoryAffinity == null ? Collections.emptyMap() : supervisorIdCategoryAffinity; } @JsonProperty @@ -111,6 +115,12 @@ public Map getCategoryAffinity() return categoryAffinity; } + @JsonProperty + public Map getSupervisorIdCategoryAffinity() + { + return supervisorIdCategoryAffinity; + } + @Override public boolean equals(final Object o) { @@ -122,13 +132,14 @@ public boolean equals(final Object o) } final CategoryConfig that = (CategoryConfig) o; return Objects.equals(defaultCategory, that.defaultCategory) && - Objects.equals(categoryAffinity, that.categoryAffinity); + Objects.equals(categoryAffinity, that.categoryAffinity) && + Objects.equals(supervisorIdCategoryAffinity, that.supervisorIdCategoryAffinity); } @Override public int hashCode() { - return Objects.hash(defaultCategory, categoryAffinity); + return Objects.hash(defaultCategory, categoryAffinity, supervisorIdCategoryAffinity); } @Override @@ -137,6 +148,7 @@ public String toString() return "CategoryConfig{" + "defaultCategory=" + defaultCategory + ", categoryAffinity=" + categoryAffinity + + ", supervisorIdCategoryAffinity=" + supervisorIdCategoryAffinity + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java index d0482d239a42..a3443ee73583 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -40,8 +40,7 @@ @JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "fillCapacityWithCategorySpec", value = FillCapacityWithCategorySpecWorkerSelectStrategy.class), - @JsonSubTypes.Type(name = "equalDistributionWithCategorySpec", value = EqualDistributionWithCategorySpecWorkerSelectStrategy.class), - @JsonSubTypes.Type(name = "equalDistributionWithSupervisorCategorySpec", value = EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy.class) + @JsonSubTypes.Type(name = "equalDistributionWithCategorySpec", value = EqualDistributionWithCategorySpecWorkerSelectStrategy.class) }) @PublicApi public interface WorkerSelectStrategy diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java index 5a41a82489a1..45945659e31e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java @@ -121,68 +121,25 @@ public static ImmutableWorkerInfo selectWorker( if (categoryConfig != null) { final String defaultCategory = categoryConfig.getDefaultCategory(); final Map categoryAffinity = categoryConfig.getCategoryAffinity(); + final Map supervisorIdCategoryAffinity = categoryConfig.getSupervisorIdCategoryAffinity(); - String preferredCategory = categoryAffinity.get(task.getDataSource()); - // If there is no preferred category for the datasource, then using the defaultCategory. However, the defaultCategory - // may be null too, so we need to do one more null check (see below). - preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory; - - if (preferredCategory != null) { - // select worker from preferred category - final ImmutableMap categoryWorkers = getCategoryWorkers(preferredCategory, runnableWorkers); - final ImmutableWorkerInfo selected = workerSelector.apply(categoryWorkers); - - if (selected != null) { - return selected; - } else if (workerCategorySpec.isStrong()) { - return null; + String preferredCategory = null; + + // First, check if this task has a supervisorId and if there's a category affinity for it + if (task instanceof SeekableStreamIndexTask) { + final String supervisorId = ((SeekableStreamIndexTask) task).getSupervisorId(); + if (supervisorId != null) { + preferredCategory = supervisorIdCategoryAffinity.get(supervisorId); } } - } - } - - // select worker from all runnable workers by default - return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers)); - } - - /** - * Helper for {@link WorkerSelectStrategy} implementations. - * - * @param allWorkers map of all workers, in the style provided to {@link WorkerSelectStrategy} - * @param workerCategorySpec worker category spec, or null - * @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run - * the task, and worker satisfies the worker category spec. may return null. - * - * @return selected worker from "allWorkers", or null. - */ - @Nullable - public static ImmutableWorkerInfo selectWorkerBySupervisorCategorySpec( - final Task task, - final Map allWorkers, - final WorkerTaskRunnerConfig workerTaskRunnerConfig, - @Nullable final WorkerCategorySpec workerCategorySpec, - final Function, ImmutableWorkerInfo> workerSelector, - final TaskLimits taskLimits - ) - { - final Map runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig, taskLimits); - - // select worker according to worker category spec, keyed by supervisorId - if (workerCategorySpec != null) { - final WorkerCategorySpec.CategoryConfig categoryConfig = workerCategorySpec.getCategoryMap().get(task.getType()); - - if (categoryConfig != null) { - final String defaultCategory = categoryConfig.getDefaultCategory(); - final Map categoryAffinity = categoryConfig.getCategoryAffinity(); - - String keyForAffinity; - if (task instanceof SeekableStreamIndexTask) { - keyForAffinity = ((SeekableStreamIndexTask) task).getSupervisorId(); - } else { - // fall back to datasource affinity if no supervisorId - keyForAffinity = task.getDataSource(); + + // If no supervisor-based category is found, fall back to datasource-based category affinity + if (preferredCategory == null) { + preferredCategory = categoryAffinity.get(task.getDataSource()); } - String preferredCategory = categoryAffinity.get(keyForAffinity); + + // If there is no preferred category for the supervisorId or datasource, then using the defaultCategory. + // However, the defaultCategory may be null too, so we need to do one more null check (see below). preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory; if (preferredCategory != null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java index 6ac04a8920fc..49fab05c7138 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -20,15 +20,33 @@ package org.apache.druid.indexing.overlord.setup; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.segment.indexing.DataSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest { @@ -80,7 +98,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c2", - ImmutableMap.of("ds1", "c2") + ImmutableMap.of("ds1", "c2"), + null ) ), false @@ -95,7 +114,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( null, - ImmutableMap.of("ds1", "c2") + ImmutableMap.of("ds1", "c2"), + null ) ), false @@ -110,6 +130,7 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c2", + null, null ) ), @@ -127,6 +148,7 @@ public void testFindWorkerForTaskWithNullPreferredTier() ImmutableMap.of( "noop", new WorkerCategorySpec.CategoryConfig( + null, null, null ) @@ -146,7 +168,8 @@ public void testWeakTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), false @@ -164,7 +187,8 @@ public void testStrongTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), true @@ -174,6 +198,108 @@ public void testStrongTierSpec() Assert.assertNull(worker); } + @Test + public void testSupervisorIdCategoryAffinity() + { + // Test that supervisor ID affinity takes precedence over datasource affinity + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c1", // default category + ImmutableMap.of("ds1", "c1"), // datasource affinity + ImmutableMap.of("supervisor1", "c2") // supervisor ID affinity + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" + final Task taskWithSupervisor = new TestSeekableStreamIndexTask("task1", "supervisor1", "ds1"); + + final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = + new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + + // Should select c2 worker (localhost3) because supervisor ID affinity takes precedence + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDatasource() + { + // Test that it falls back to datasource affinity when supervisor ID affinity is not found + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", // default category + ImmutableMap.of("ds1", "c1"), // datasource affinity + ImmutableMap.of("supervisor2", "c2") // supervisor ID affinity (different supervisor) + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" (not in supervisorIdCategoryAffinity map) + final Task taskWithSupervisor = new TestSeekableStreamIndexTask("task1", "supervisor1", "ds1"); + + final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = + new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + + // Should fall back to datasource affinity and select c1 worker + Assert.assertNotNull(worker); + Assert.assertEquals("c1", worker.getWorker().getCategory()); + Assert.assertEquals("localhost1", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDefault() + { + // Test that it falls back to default category when neither supervisor ID nor datasource affinity is found + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", // default category + ImmutableMap.of("ds2", "c1"), // datasource affinity (different datasource) + ImmutableMap.of("supervisor2", "c1") // supervisor ID affinity (different supervisor) + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" and datasource "ds1" + final Task taskWithSupervisor = new TestSeekableStreamIndexTask("task1", "supervisor1", "ds1"); + + final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = + new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + + // Should fall back to default category c2 + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) { final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = @@ -187,4 +313,65 @@ private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) return worker; } + + /** + * Test implementation of SeekableStreamIndexTask for testing supervisor ID affinity + */ + private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask + { + TestSeekableStreamIndexTask( + String id, + @Nullable String supervisorId, + String datasource + ) + { + this( + id, + supervisorId, + null, + DataSchema.builder() + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), + Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(SeekableStreamIndexTaskIOConfig.class), + null, + null + ); + } + + private TestSeekableStreamIndexTask( + String id, + @Nullable String supervisorId, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId + ) + { + super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); + } + + @Override + protected SeekableStreamIndexTaskRunner createTaskRunner() + { + return null; + } + + @Override + protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) + { + return null; + } + + @Override + public String getType() + { + return "test_seekable_stream"; + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java deleted file mode 100644 index d47bcd215ac0..000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.overlord.setup; - -import com.google.common.collect.ImmutableMap; -import org.apache.druid.data.input.impl.ByteEntity; -import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.TimestampSpec; -import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; -import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.task.AbstractTask; -import org.apache.druid.indexing.common.task.NoopTask; -import org.apache.druid.indexing.common.task.TaskResource; -import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; -import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; -import org.apache.druid.indexing.seekablestream.common.RecordSupplier; -import org.apache.druid.indexing.worker.Worker; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.granularity.AllGranularity; -import org.apache.druid.segment.indexing.DataSchema; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import javax.annotation.Nullable; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; - - -public class EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategyTest -{ - private static final ImmutableMap WORKERS_FOR_TIER_TESTS = - ImmutableMap.of( - "localhost0", - new ImmutableWorkerInfo( - new Worker("http", "localhost0", "localhost0", 1, "v1", "c1"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() - ), - "localhost1", - new ImmutableWorkerInfo( - new Worker("http", "localhost1", "localhost1", 2, "v1", "c1"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() - ), - "localhost2", - new ImmutableWorkerInfo( - new Worker("http", "localhost2", "localhost2", 3, "v1", "c2"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() - ), - "localhost3", - new ImmutableWorkerInfo( - new Worker("http", "localhost3", "localhost3", 4, "v1", "c2"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() - ) - ); - - @Test - public void testFindWorkerForTaskWithNullWorkerTierSpec() - { - ImmutableWorkerInfo worker = selectWorker(null, new TestStreamingTask("id1", "sup-1", "ds1", null, null)); - Assert.assertEquals("localhost3", worker.getWorker().getHost()); - } - - @Test - public void testFindWorkerForTaskWithPreferredTierBySupervisor() - { - final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( - ImmutableMap.of( - "noop", - new WorkerCategorySpec.CategoryConfig( - "c1", - ImmutableMap.of("sup-2", "c2") - ) - ), - false - ); - - // supervisor sup-2 prefers c2, which should pick highest-capacity in c2 -> localhost3 - ImmutableWorkerInfo worker1 = selectWorker(workerCategorySpec, new TestStreamingTask("id2", "sup-2", "ds1", null, null)); - Assert.assertEquals("localhost3", worker1.getWorker().getHost()); - - // not specified, defaultCategory c1 -> pick highest in c1 -> localhost1 - ImmutableWorkerInfo worker2 = selectWorker(workerCategorySpec, new TestStreamingTask("id3", "sup-1", "ds1", null, null)); - Assert.assertEquals("localhost1", worker2.getWorker().getHost()); - } - - @Test - public void testWeakTierSpecFallsBack() - { - final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( - ImmutableMap.of( - "noop", - new WorkerCategorySpec.CategoryConfig( - "c1", - ImmutableMap.of("sup-x", "c3") - ) - ), - false - ); - - // preferred category c3 doesn't exist; weak spec -> choose from all runnable (highest capacity overall -> localhost3) - ImmutableWorkerInfo worker = selectWorker(workerCategorySpec, new TestStreamingTask("id4", "sup-x", "ds1", null, null)); - Assert.assertEquals("localhost3", worker.getWorker().getHost()); - } - - @Test - public void testStrongTierSpecReturnsNullIfUnavailable() - { - final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( - ImmutableMap.of( - "noop", - new WorkerCategorySpec.CategoryConfig( - "c1", - ImmutableMap.of("sup-x", "c3") - ) - ), - true - ); - - ImmutableWorkerInfo worker = selectWorker(workerCategorySpec, new TestStreamingTask("id5", "sup-x", "ds1", null, null)); - Assert.assertNull(worker); - } - - @Test - public void testNonSeekableTaskFallsBackToDatasource() - { - final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( - ImmutableMap.of( - "noop", - new WorkerCategorySpec.CategoryConfig( - "c1", - ImmutableMap.of("ds1", "c2") - ) - ), - false - ); - - // No supervisorId available -> uses datasource mapping to c2 -> highest in c2 -> localhost3 - ImmutableWorkerInfo worker = selectWorker(workerCategorySpec, NoopTask.forDatasource("ds1")); - Assert.assertEquals("localhost3", worker.getWorker().getHost()); - } - - private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec, AbstractTask task) - { - final EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy strategy = - new EqualDistributionWithSupervisorCategorySpecWorkerSelectStrategy(workerCategorySpec, null); - - return strategy.findWorkerForTask( - new RemoteTaskRunnerConfig(), - WORKERS_FOR_TIER_TESTS, - task - ); - } - - private static class TestStreamingTask extends SeekableStreamIndexTask - { - TestStreamingTask( - String id, - @Nullable String supervisorId, - String datasource, - @Nullable Map context, - @Nullable String groupId - ) - { - this( - id, - supervisorId, - null, - DataSchema.builder() - .withDataSource(datasource) - .withTimestamp(new TimestampSpec(null, null, null)) - .withDimensions(new DimensionsSpec(Collections.emptyList())) - .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) - .build(), - Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), - Mockito.mock(SeekableStreamIndexTaskIOConfig.class), - context, - groupId - ); - } - - private TestStreamingTask( - String id, - @Nullable String supervisorId, - @Nullable TaskResource taskResource, - DataSchema dataSchema, - SeekableStreamIndexTaskTuningConfig tuningConfig, - SeekableStreamIndexTaskIOConfig ioConfig, - @Nullable Map context, - @Nullable String groupId - ) - { - super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); - } - - @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() - { - return null; - } - - @Override - protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) - { - return null; - } - - @Override - public String getType() - { - return "noop"; - } - } -} - - diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java index 880ef743dca6..c8ed9e139303 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java @@ -80,7 +80,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c1") + ImmutableMap.of("ds1", "c1"), + null ) ), false @@ -95,7 +96,8 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( null, - ImmutableMap.of("ds1", "c1") + ImmutableMap.of("ds1", "c1"), + null ) ), false @@ -110,6 +112,7 @@ public void testFindWorkerForTaskWithPreferredTier() "noop", new WorkerCategorySpec.CategoryConfig( "c1", + null, null ) ), @@ -127,6 +130,7 @@ public void testFindWorkerForTaskWithNullPreferredTier() ImmutableMap.of( "noop", new WorkerCategorySpec.CategoryConfig( + null, null, null ) @@ -146,7 +150,8 @@ public void testWeakTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), false @@ -164,7 +169,8 @@ public void testStrongTierSpec() "noop", new WorkerCategorySpec.CategoryConfig( "c1", - ImmutableMap.of("ds1", "c3") + ImmutableMap.of("ds1", "c3"), + null ) ), true diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java index 4277984fc10a..9576aa0ffbbd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java @@ -58,7 +58,7 @@ public void testSerde() throws Exception Assert.assertTrue(workerCategorySpec.isStrong()); Assert.assertEquals(ImmutableMap.of( "index_kafka", - new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", "c2")) + new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", "c2"), null) ), workerCategorySpec.getCategoryMap()); } } From f477482cc92b67ae77f2931ad9c47438aaa6a73f Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Fri, 24 Oct 2025 09:58:36 +0530 Subject: [PATCH 5/8] addressign review comments --- ...hCategorySpecWorkerSelectStrategyTest.java | 93 ++++-------- ...hCategorySpecWorkerSelectStrategyTest.java | 138 ++++++++++++++++++ .../TestSeekableStreamIndexTask.java | 93 ++++++++++++ .../SeekableStreamSupervisorStateTest.java | 112 ++++---------- 4 files changed, 288 insertions(+), 148 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java index 49fab05c7138..a390248b5d42 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -20,21 +20,14 @@ package org.apache.druid.indexing.overlord.setup; import com.google.common.collect.ImmutableMap; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; -import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; -import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.granularity.AllGranularity; @@ -46,7 +39,6 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.HashSet; -import java.util.Map; public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest { @@ -215,7 +207,7 @@ public void testSupervisorIdCategoryAffinity() ); // Create a test task with supervisor ID "supervisor1" - final Task taskWithSupervisor = new TestSeekableStreamIndexTask("task1", "supervisor1", "ds1"); + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); @@ -249,7 +241,7 @@ public void testSupervisorIdCategoryAffinityFallbackToDatasource() ); // Create a test task with supervisor ID "supervisor1" (not in supervisorIdCategoryAffinity map) - final Task taskWithSupervisor = new TestSeekableStreamIndexTask("task1", "supervisor1", "ds1"); + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); @@ -283,7 +275,7 @@ public void testSupervisorIdCategoryAffinityFallbackToDefault() ); // Create a test task with supervisor ID "supervisor1" and datasource "ds1" - final Task taskWithSupervisor = new TestSeekableStreamIndexTask("task1", "supervisor1", "ds1"); + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); @@ -315,63 +307,28 @@ private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) } /** - * Test implementation of SeekableStreamIndexTask for testing supervisor ID affinity + * Helper method to create a test task with supervisor ID for testing */ - private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask + @SuppressWarnings("unchecked") + private static Task createTestTask(String id, @Nullable String supervisorId, String datasource) { - TestSeekableStreamIndexTask( - String id, - @Nullable String supervisorId, - String datasource - ) - { - this( - id, - supervisorId, - null, - DataSchema.builder() - .withDataSource(datasource) - .withTimestamp(new TimestampSpec(null, null, null)) - .withDimensions(new DimensionsSpec(Collections.emptyList())) - .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) - .build(), - Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), - Mockito.mock(SeekableStreamIndexTaskIOConfig.class), - null, - null - ); - } - - private TestSeekableStreamIndexTask( - String id, - @Nullable String supervisorId, - @Nullable TaskResource taskResource, - DataSchema dataSchema, - SeekableStreamIndexTaskTuningConfig tuningConfig, - SeekableStreamIndexTaskIOConfig ioConfig, - @Nullable Map context, - @Nullable String groupId - ) - { - super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); - } - - @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() - { - return null; - } - - @Override - protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) - { - return null; - } - - @Override - public String getType() - { - return "test_seekable_stream"; - } + return new TestSeekableStreamIndexTask( + id, + supervisorId, + null, + DataSchema.builder() + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), + Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig.class), + null, + null, + null, + null, + "test_seekable_stream" + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java index c8ed9e139303..505a3639e157 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java @@ -20,14 +20,24 @@ package org.apache.druid.indexing.overlord.setup; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.segment.indexing.DataSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashSet; public class FillCapacityWithCategorySpecWorkerSelectStrategyTest @@ -180,6 +190,108 @@ public void testStrongTierSpec() Assert.assertNull(worker); } + @Test + public void testSupervisorIdCategoryAffinity() + { + // Test that supervisor ID affinity takes precedence over datasource affinity + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c1", // default category + ImmutableMap.of("ds1", "c1"), // datasource affinity + ImmutableMap.of("supervisor1", "c2") // supervisor ID affinity + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = + new FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + + // Should select c2 worker (localhost3) because supervisor ID affinity takes precedence + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDatasource() + { + // Test that it falls back to datasource affinity when supervisor ID affinity is not found + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", // default category + ImmutableMap.of("ds1", "c1"), // datasource affinity + ImmutableMap.of("supervisor2", "c2") // supervisor ID affinity (different supervisor) + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" (not in supervisorIdCategoryAffinity map) + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = + new FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + + // Should fall back to datasource affinity and select c1 worker + Assert.assertNotNull(worker); + Assert.assertEquals("c1", worker.getWorker().getCategory()); + Assert.assertEquals("localhost1", worker.getWorker().getHost()); + } + + @Test + public void testSupervisorIdCategoryAffinityFallbackToDefault() + { + // Test that it falls back to default category when neither supervisor ID nor datasource affinity is found + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "test_seekable_stream", + new WorkerCategorySpec.CategoryConfig( + "c2", // default category + ImmutableMap.of("ds2", "c1"), // datasource affinity (different datasource) + ImmutableMap.of("supervisor2", "c1") // supervisor ID affinity (different supervisor) + ) + ), + false + ); + + // Create a test task with supervisor ID "supervisor1" and datasource "ds1" + final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); + + final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = + new FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + taskWithSupervisor + ); + + // Should fall back to default category c2 + Assert.assertNotNull(worker); + Assert.assertEquals("c2", worker.getWorker().getCategory()); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) { final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = @@ -193,4 +305,30 @@ private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) return worker; } + + /** + * Helper method to create a test task with supervisor ID for testing + */ + @SuppressWarnings("unchecked") + private static Task createTestTask(String id, @Nullable String supervisorId, String datasource) + { + return new TestSeekableStreamIndexTask( + id, + supervisorId, + null, + DataSchema.builder() + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), + Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig.class), + null, + null, + null, + null, + "test_seekable_stream" + ); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java new file mode 100644 index 000000000000..2e64528501fe --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.segment.indexing.DataSchema; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * Test implementation of SeekableStreamIndexTask for use in unit tests. + */ +public class TestSeekableStreamIndexTask extends SeekableStreamIndexTask +{ + private final SeekableStreamIndexTaskRunner streamingTaskRunner; + private final RecordSupplier recordSupplier; + private final String taskType; + + public TestSeekableStreamIndexTask( + String id, + @Nullable String supervisorId, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId + ) + { + this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId, null, null, "test"); + } + + public TestSeekableStreamIndexTask( + String id, + @Nullable String supervisorId, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId, + @Nullable SeekableStreamIndexTaskRunner streamingTaskRunner, + @Nullable RecordSupplier recordSupplier, + String taskType + ) + { + super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); + this.streamingTaskRunner = streamingTaskRunner; + this.recordSupplier = recordSupplier; + this.taskType = taskType; + } + + @Nullable + @Override + protected SeekableStreamIndexTaskRunner createTaskRunner() + { + return streamingTaskRunner; + } + + @Override + protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) + { + return recordSupplier; + } + + @Override + public String getType() + { + return taskType; + } +} + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index e204a67cae84..705a52143b98 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -63,6 +63,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; @@ -1151,7 +1152,10 @@ public Duration getEmissionDuration() taskTuningConfig, taskIoConfig, context, - "0" + "0", + null, + recordSupplier, + "test" ); TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask( @@ -1162,7 +1166,10 @@ public Duration getEmissionDuration() taskTuningConfig, taskIoConfig, context, - "0" + "0", + null, + recordSupplier, + "test" ); TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask( @@ -1173,7 +1180,10 @@ public Duration getEmissionDuration() taskTuningConfig, taskIoConfig, context, - "0" + "0", + null, + recordSupplier, + "test" ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -1364,7 +1374,10 @@ public Duration getEmissionDuration() ioConfig ), context, - "0" + "0", + null, + recordSupplier, + "test" ); TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask( @@ -1384,7 +1397,10 @@ public Duration getEmissionDuration() ioConfig ), context, - "1" + "1", + null, + recordSupplier, + "test" ); TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask( @@ -1404,7 +1420,10 @@ public Duration getEmissionDuration() ioConfig ), context, - "2" + "2", + null, + recordSupplier, + "test" ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -1596,7 +1615,9 @@ public Duration getEmissionDuration() ), context, "0", - streamingTaskRunner + streamingTaskRunner, + recordSupplier, + "test" ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -2907,78 +2928,6 @@ public String toString() }; } - private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask - { - private final SeekableStreamIndexTaskRunner streamingTaskRunner; - - public TestSeekableStreamIndexTask( - String id, - @Nullable String supervisorId, - @Nullable TaskResource taskResource, - DataSchema dataSchema, - SeekableStreamIndexTaskTuningConfig tuningConfig, - SeekableStreamIndexTaskIOConfig ioConfig, - @Nullable Map context, - @Nullable String groupId - ) - { - this( - id, - supervisorId, - taskResource, - dataSchema, - tuningConfig, - ioConfig, - context, - groupId, - null - ); - } - - public TestSeekableStreamIndexTask( - String id, - @Nullable String supervisorId, - @Nullable TaskResource taskResource, - DataSchema dataSchema, - SeekableStreamIndexTaskTuningConfig tuningConfig, - SeekableStreamIndexTaskIOConfig ioConfig, - @Nullable Map context, - @Nullable String groupId, - @Nullable SeekableStreamIndexTaskRunner streamingTaskRunner - ) - { - super( - id, - supervisorId, - taskResource, - dataSchema, - tuningConfig, - ioConfig, - context, - groupId - ); - this.streamingTaskRunner = streamingTaskRunner; - } - - @Nullable - @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() - { - return streamingTaskRunner; - } - - @Override - protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) - { - return recordSupplier; - } - - @Override - public String getType() - { - return "test"; - } - } private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor { @@ -3069,7 +3018,10 @@ protected List> createIndexT taskTuningConfig, taskIoConfig, null, - null + null, + null, + recordSupplier, + "test" )); } From 41f2280985e39b9620cf19b2ed497b0a6548ca39 Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Fri, 24 Oct 2025 10:27:20 +0530 Subject: [PATCH 6/8] Address review comments --- ...hCategorySpecWorkerSelectStrategyTest.java | 11 ++++---- ...hCategorySpecWorkerSelectStrategyTest.java | 11 ++++---- .../TestSeekableStreamIndexTask.java | 9 ++---- .../SeekableStreamSupervisorStateTest.java | 28 ++++++------------- 4 files changed, 22 insertions(+), 37 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java index a390248b5d42..994ea3abb9e8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -27,6 +27,8 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; @@ -322,13 +324,10 @@ private static Task createTestTask(String id, @Nullable String supervisorId, Str .withDimensions(new DimensionsSpec(Collections.emptyList())) .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) .build(), - Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig.class), - Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig.class), + Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(SeekableStreamIndexTaskIOConfig.class), null, - null, - null, - null, - "test_seekable_stream" + null ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java index 505a3639e157..0731070577a6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java @@ -27,6 +27,8 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; @@ -322,13 +324,10 @@ private static Task createTestTask(String id, @Nullable String supervisorId, Str .withDimensions(new DimensionsSpec(Collections.emptyList())) .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) .build(), - Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig.class), - Mockito.mock(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig.class), + Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), + Mockito.mock(SeekableStreamIndexTaskIOConfig.class), null, - null, - null, - null, - "test_seekable_stream" + null ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java index 2e64528501fe..985aa7da706a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java @@ -35,7 +35,6 @@ public class TestSeekableStreamIndexTask extends SeekableStreamIndexTask streamingTaskRunner; private final RecordSupplier recordSupplier; - private final String taskType; public TestSeekableStreamIndexTask( String id, @@ -48,7 +47,7 @@ public TestSeekableStreamIndexTask( @Nullable String groupId ) { - this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId, null, null, "test"); + this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId, null, null); } public TestSeekableStreamIndexTask( @@ -61,14 +60,12 @@ public TestSeekableStreamIndexTask( @Nullable Map context, @Nullable String groupId, @Nullable SeekableStreamIndexTaskRunner streamingTaskRunner, - @Nullable RecordSupplier recordSupplier, - String taskType + @Nullable RecordSupplier recordSupplier ) { super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); this.streamingTaskRunner = streamingTaskRunner; this.recordSupplier = recordSupplier; - this.taskType = taskType; } @Nullable @@ -87,7 +84,7 @@ protected RecordSupplier newTaskRecordSupplier(final @Override public String getType() { - return taskType; + return "test_seekable_stream"; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 705a52143b98..5df9edd184d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -39,10 +39,8 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.granularity.UniformGranularitySpec; -import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; @@ -63,9 +61,9 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; -import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; @@ -1154,8 +1152,7 @@ public Duration getEmissionDuration() context, "0", null, - recordSupplier, - "test" + recordSupplier ); TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask( @@ -1168,8 +1165,7 @@ public Duration getEmissionDuration() context, "0", null, - recordSupplier, - "test" + recordSupplier ); TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask( @@ -1182,8 +1178,7 @@ public Duration getEmissionDuration() context, "0", null, - recordSupplier, - "test" + recordSupplier ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -1376,8 +1371,7 @@ public Duration getEmissionDuration() context, "0", null, - recordSupplier, - "test" + recordSupplier ); TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask( @@ -1399,8 +1393,7 @@ public Duration getEmissionDuration() context, "1", null, - recordSupplier, - "test" + recordSupplier ); TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask( @@ -1422,8 +1415,7 @@ public Duration getEmissionDuration() context, "2", null, - recordSupplier, - "test" + recordSupplier ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -1616,8 +1608,7 @@ public Duration getEmissionDuration() context, "0", streamingTaskRunner, - recordSupplier, - "test" + recordSupplier ); final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); @@ -3020,8 +3011,7 @@ protected List> createIndexT null, null, null, - recordSupplier, - "test" + recordSupplier )); } From 66c2e12e4c8a74fa2256e7738e57bda34675397b Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Fri, 24 Oct 2025 10:43:49 +0530 Subject: [PATCH 7/8] minor changes --- .../overlord/setup/WorkerCategorySpec.java | 21 +++++++++--- ...hCategorySpecWorkerSelectStrategyTest.java | 32 ++++++------------- ...hCategorySpecWorkerSelectStrategyTest.java | 27 ++++++---------- 3 files changed, 35 insertions(+), 45 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java index 3a23242e609a..823a08e5a141 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -86,21 +88,19 @@ public String toString() public static class CategoryConfig { private final String defaultCategory; - // key: datasource, value: category private final Map categoryAffinity; - // key: supervisorId, value: category private final Map supervisorIdCategoryAffinity; @JsonCreator public CategoryConfig( @JsonProperty("defaultCategory") String defaultCategory, @JsonProperty("categoryAffinity") Map categoryAffinity, - @JsonProperty("supervisorIdCategoryAffinity") Map supervisorIdCategoryAffinity + @JsonProperty("supervisorIdCategoryAffinity") @Nullable Map supervisorIdCategoryAffinity ) { this.defaultCategory = defaultCategory; this.categoryAffinity = categoryAffinity == null ? Collections.emptyMap() : categoryAffinity; - this.supervisorIdCategoryAffinity = supervisorIdCategoryAffinity == null ? Collections.emptyMap() : supervisorIdCategoryAffinity; + this.supervisorIdCategoryAffinity = Configs.valueOrDefault(supervisorIdCategoryAffinity, Map.of()); } @JsonProperty @@ -109,12 +109,25 @@ public String getDefaultCategory() return defaultCategory; } + /** + * Returns a map of datasource names to worker category names. + * Used to assign tasks to specific worker categories based on their datasource. + * + * @return map where key is datasource name and value is worker category name + */ @JsonProperty public Map getCategoryAffinity() { return categoryAffinity; } + /** + * Returns a map of supervisor IDs to worker category names. + * Used to assign tasks to specific worker categories based on their supervisor ID. + * This takes precedence over {@link #getCategoryAffinity()} when both are configured. + * + * @return map where key is supervisor ID and value is worker category name + */ @JsonProperty public Map getSupervisorIdCategoryAffinity() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java index 994ea3abb9e8..042516f72826 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -195,20 +195,17 @@ public void testStrongTierSpec() @Test public void testSupervisorIdCategoryAffinity() { - // Test that supervisor ID affinity takes precedence over datasource affinity final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( ImmutableMap.of( "test_seekable_stream", new WorkerCategorySpec.CategoryConfig( - "c1", // default category - ImmutableMap.of("ds1", "c1"), // datasource affinity - ImmutableMap.of("supervisor1", "c2") // supervisor ID affinity + "c1", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor1", "c2") ) ), false ); - - // Create a test task with supervisor ID "supervisor1" final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = @@ -219,8 +216,6 @@ public void testSupervisorIdCategoryAffinity() WORKERS_FOR_TIER_TESTS, taskWithSupervisor ); - - // Should select c2 worker (localhost3) because supervisor ID affinity takes precedence Assert.assertNotNull(worker); Assert.assertEquals("c2", worker.getWorker().getCategory()); Assert.assertEquals("localhost3", worker.getWorker().getHost()); @@ -229,20 +224,17 @@ public void testSupervisorIdCategoryAffinity() @Test public void testSupervisorIdCategoryAffinityFallbackToDatasource() { - // Test that it falls back to datasource affinity when supervisor ID affinity is not found final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( ImmutableMap.of( "test_seekable_stream", new WorkerCategorySpec.CategoryConfig( - "c2", // default category - ImmutableMap.of("ds1", "c1"), // datasource affinity - ImmutableMap.of("supervisor2", "c2") // supervisor ID affinity (different supervisor) + "c2", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor2", "c2") ) ), false ); - - // Create a test task with supervisor ID "supervisor1" (not in supervisorIdCategoryAffinity map) final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = @@ -253,8 +245,6 @@ public void testSupervisorIdCategoryAffinityFallbackToDatasource() WORKERS_FOR_TIER_TESTS, taskWithSupervisor ); - - // Should fall back to datasource affinity and select c1 worker Assert.assertNotNull(worker); Assert.assertEquals("c1", worker.getWorker().getCategory()); Assert.assertEquals("localhost1", worker.getWorker().getHost()); @@ -263,20 +253,18 @@ public void testSupervisorIdCategoryAffinityFallbackToDatasource() @Test public void testSupervisorIdCategoryAffinityFallbackToDefault() { - // Test that it falls back to default category when neither supervisor ID nor datasource affinity is found final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( ImmutableMap.of( "test_seekable_stream", new WorkerCategorySpec.CategoryConfig( - "c2", // default category - ImmutableMap.of("ds2", "c1"), // datasource affinity (different datasource) - ImmutableMap.of("supervisor2", "c1") // supervisor ID affinity (different supervisor) + "c2", + ImmutableMap.of("ds2", "c1"), + ImmutableMap.of("supervisor2", "c1") ) ), false ); - // Create a test task with supervisor ID "supervisor1" and datasource "ds1" final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1"); final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = @@ -287,8 +275,6 @@ public void testSupervisorIdCategoryAffinityFallbackToDefault() WORKERS_FOR_TIER_TESTS, taskWithSupervisor ); - - // Should fall back to default category c2 Assert.assertNotNull(worker); Assert.assertEquals("c2", worker.getWorker().getCategory()); Assert.assertEquals("localhost3", worker.getWorker().getHost()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java index 0731070577a6..ea4ffb16af5b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java @@ -195,14 +195,13 @@ public void testStrongTierSpec() @Test public void testSupervisorIdCategoryAffinity() { - // Test that supervisor ID affinity takes precedence over datasource affinity final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( ImmutableMap.of( "test_seekable_stream", new WorkerCategorySpec.CategoryConfig( - "c1", // default category - ImmutableMap.of("ds1", "c1"), // datasource affinity - ImmutableMap.of("supervisor1", "c2") // supervisor ID affinity + "c1", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor1", "c2") ) ), false @@ -219,8 +218,6 @@ public void testSupervisorIdCategoryAffinity() WORKERS_FOR_TIER_TESTS, taskWithSupervisor ); - - // Should select c2 worker (localhost3) because supervisor ID affinity takes precedence Assert.assertNotNull(worker); Assert.assertEquals("c2", worker.getWorker().getCategory()); Assert.assertEquals("localhost3", worker.getWorker().getHost()); @@ -229,14 +226,13 @@ public void testSupervisorIdCategoryAffinity() @Test public void testSupervisorIdCategoryAffinityFallbackToDatasource() { - // Test that it falls back to datasource affinity when supervisor ID affinity is not found final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( ImmutableMap.of( "test_seekable_stream", new WorkerCategorySpec.CategoryConfig( - "c2", // default category - ImmutableMap.of("ds1", "c1"), // datasource affinity - ImmutableMap.of("supervisor2", "c2") // supervisor ID affinity (different supervisor) + "c2", + ImmutableMap.of("ds1", "c1"), + ImmutableMap.of("supervisor2", "c2") ) ), false @@ -253,8 +249,6 @@ public void testSupervisorIdCategoryAffinityFallbackToDatasource() WORKERS_FOR_TIER_TESTS, taskWithSupervisor ); - - // Should fall back to datasource affinity and select c1 worker Assert.assertNotNull(worker); Assert.assertEquals("c1", worker.getWorker().getCategory()); Assert.assertEquals("localhost1", worker.getWorker().getHost()); @@ -263,14 +257,13 @@ public void testSupervisorIdCategoryAffinityFallbackToDatasource() @Test public void testSupervisorIdCategoryAffinityFallbackToDefault() { - // Test that it falls back to default category when neither supervisor ID nor datasource affinity is found final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( ImmutableMap.of( "test_seekable_stream", new WorkerCategorySpec.CategoryConfig( - "c2", // default category - ImmutableMap.of("ds2", "c1"), // datasource affinity (different datasource) - ImmutableMap.of("supervisor2", "c1") // supervisor ID affinity (different supervisor) + "c2", + ImmutableMap.of("ds2", "c1"), + ImmutableMap.of("supervisor2", "c1") ) ), false @@ -287,8 +280,6 @@ public void testSupervisorIdCategoryAffinityFallbackToDefault() WORKERS_FOR_TIER_TESTS, taskWithSupervisor ); - - // Should fall back to default category c2 Assert.assertNotNull(worker); Assert.assertEquals("c2", worker.getWorker().getCategory()); Assert.assertEquals("localhost3", worker.getWorker().getHost()); From e1d84b8aab8c754f35ec1ec10a1e722c600921cc Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Mon, 27 Oct 2025 11:50:26 +0530 Subject: [PATCH 8/8] minor changes --- .../indexing/overlord/setup/WorkerCategorySpec.java | 10 ++-------- .../indexing/overlord/setup/WorkerSelectUtils.java | 2 +- ...butionWithCategorySpecWorkerSelectStrategyTest.java | 10 +++++----- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java index 823a08e5a141..410aad78a912 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java @@ -110,10 +110,7 @@ public String getDefaultCategory() } /** - * Returns a map of datasource names to worker category names. - * Used to assign tasks to specific worker categories based on their datasource. - * - * @return map where key is datasource name and value is worker category name + * Returns a map from datasource name to the worker category name to be used for tasks of that datasource. */ @JsonProperty public Map getCategoryAffinity() @@ -122,11 +119,8 @@ public Map getCategoryAffinity() } /** - * Returns a map of supervisor IDs to worker category names. - * Used to assign tasks to specific worker categories based on their supervisor ID. + * Returns a map from supervisor ID to worker category name to be used for tasks of that supervisor. * This takes precedence over {@link #getCategoryAffinity()} when both are configured. - * - * @return map where key is supervisor ID and value is worker category name */ @JsonProperty public Map getSupervisorIdCategoryAffinity() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java index 45945659e31e..e8c75a814bd4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java @@ -138,7 +138,7 @@ public static ImmutableWorkerInfo selectWorker( preferredCategory = categoryAffinity.get(task.getDataSource()); } - // If there is no preferred category for the supervisorId or datasource, then using the defaultCategory. + // If there is no preferred category for the supervisorId or datasource, then use the defaultCategory. // However, the defaultCategory may be null too, so we need to do one more null check (see below). preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java index 042516f72826..7ac8b2b746ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -305,11 +305,11 @@ private static Task createTestTask(String id, @Nullable String supervisorId, Str supervisorId, null, DataSchema.builder() - .withDataSource(datasource) - .withTimestamp(new TimestampSpec(null, null, null)) - .withDimensions(new DimensionsSpec(Collections.emptyList())) - .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) - .build(), + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), Mockito.mock(SeekableStreamIndexTaskTuningConfig.class), Mockito.mock(SeekableStreamIndexTaskIOConfig.class), null,