Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -86,17 +88,19 @@ public String toString()
public static class CategoryConfig
{
private final String defaultCategory;
// key: datasource, value: category
private final Map<String, String> categoryAffinity;
private final Map<String, String> supervisorIdCategoryAffinity;

@JsonCreator
public CategoryConfig(
@JsonProperty("defaultCategory") String defaultCategory,
@JsonProperty("categoryAffinity") Map<String, String> categoryAffinity
@JsonProperty("categoryAffinity") Map<String, String> categoryAffinity,
@JsonProperty("supervisorIdCategoryAffinity") @Nullable Map<String, String> supervisorIdCategoryAffinity
)
{
this.defaultCategory = defaultCategory;
this.categoryAffinity = categoryAffinity == null ? Collections.emptyMap() : categoryAffinity;
this.supervisorIdCategoryAffinity = Configs.valueOrDefault(supervisorIdCategoryAffinity, Map.of());
}

@JsonProperty
Expand All @@ -105,12 +109,25 @@ public String getDefaultCategory()
return defaultCategory;
}

/**
* Returns a map from datasource name to the worker category name to be used for tasks of that datasource.
*/
@JsonProperty
public Map<String, String> getCategoryAffinity()
{
return categoryAffinity;
}

/**
* Returns a map from supervisor ID to worker category name to be used for tasks of that supervisor.
* This takes precedence over {@link #getCategoryAffinity()} when both are configured.
*/
@JsonProperty
public Map<String, String> getSupervisorIdCategoryAffinity()
{
return supervisorIdCategoryAffinity;
}

@Override
public boolean equals(final Object o)
{
Expand All @@ -122,13 +139,14 @@ public boolean equals(final Object o)
}
final CategoryConfig that = (CategoryConfig) o;
return Objects.equals(defaultCategory, that.defaultCategory) &&
Objects.equals(categoryAffinity, that.categoryAffinity);
Objects.equals(categoryAffinity, that.categoryAffinity) &&
Objects.equals(supervisorIdCategoryAffinity, that.supervisorIdCategoryAffinity);
}

@Override
public int hashCode()
{
return Objects.hash(defaultCategory, categoryAffinity);
return Objects.hash(defaultCategory, categoryAffinity, supervisorIdCategoryAffinity);
}

@Override
Expand All @@ -137,6 +155,7 @@ public String toString()
return "CategoryConfig{" +
"defaultCategory=" + defaultCategory +
", categoryAffinity=" + categoryAffinity +
", supervisorIdCategoryAffinity=" + supervisorIdCategoryAffinity +
'}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;

import javax.annotation.Nullable;
import java.util.Collections;
Expand Down Expand Up @@ -120,10 +121,25 @@ public static ImmutableWorkerInfo selectWorker(
if (categoryConfig != null) {
final String defaultCategory = categoryConfig.getDefaultCategory();
final Map<String, String> categoryAffinity = categoryConfig.getCategoryAffinity();

String preferredCategory = categoryAffinity.get(task.getDataSource());
// If there is no preferred category for the datasource, then using the defaultCategory. However, the defaultCategory
// may be null too, so we need to do one more null check (see below).
final Map<String, String> supervisorIdCategoryAffinity = categoryConfig.getSupervisorIdCategoryAffinity();

String preferredCategory = null;

// First, check if this task has a supervisorId and if there's a category affinity for it
if (task instanceof SeekableStreamIndexTask) {
final String supervisorId = ((SeekableStreamIndexTask<?, ?, ?>) task).getSupervisorId();
if (supervisorId != null) {
preferredCategory = supervisorIdCategoryAffinity.get(supervisorId);
}
}

// If no supervisor-based category is found, fall back to datasource-based category affinity
if (preferredCategory == null) {
preferredCategory = categoryAffinity.get(task.getDataSource());
}

// If there is no preferred category for the supervisorId or datasource, then use the defaultCategory.
// However, the defaultCategory may be null too, so we need to do one more null check (see below).
preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory;

if (preferredCategory != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,26 @@
package org.apache.druid.indexing.overlord.setup;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.segment.indexing.DataSchema;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashSet;

public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest
Expand Down Expand Up @@ -80,7 +92,8 @@ public void testFindWorkerForTaskWithPreferredTier()
"noop",
new WorkerCategorySpec.CategoryConfig(
"c2",
ImmutableMap.of("ds1", "c2")
ImmutableMap.of("ds1", "c2"),
null
)
),
false
Expand All @@ -95,7 +108,8 @@ public void testFindWorkerForTaskWithPreferredTier()
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
ImmutableMap.of("ds1", "c2")
ImmutableMap.of("ds1", "c2"),
null
)
),
false
Expand All @@ -110,6 +124,7 @@ public void testFindWorkerForTaskWithPreferredTier()
"noop",
new WorkerCategorySpec.CategoryConfig(
"c2",
null,
null
)
),
Expand All @@ -127,6 +142,7 @@ public void testFindWorkerForTaskWithNullPreferredTier()
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
null,
null
)
Expand All @@ -146,7 +162,8 @@ public void testWeakTierSpec()
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c3")
ImmutableMap.of("ds1", "c3"),
null
)
),
false
Expand All @@ -164,7 +181,8 @@ public void testStrongTierSpec()
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c3")
ImmutableMap.of("ds1", "c3"),
null
)
),
true
Expand All @@ -174,6 +192,94 @@ public void testStrongTierSpec()
Assert.assertNull(worker);
}

@Test
public void testSupervisorIdCategoryAffinity()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"test_seekable_stream",
new WorkerCategorySpec.CategoryConfig(
"c1",
ImmutableMap.of("ds1", "c1"),
ImmutableMap.of("supervisor1", "c2")
)
),
false
);
final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1");

final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);

ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_TIER_TESTS,
taskWithSupervisor
);
Assert.assertNotNull(worker);
Assert.assertEquals("c2", worker.getWorker().getCategory());
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}

@Test
public void testSupervisorIdCategoryAffinityFallbackToDatasource()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"test_seekable_stream",
new WorkerCategorySpec.CategoryConfig(
"c2",
ImmutableMap.of("ds1", "c1"),
ImmutableMap.of("supervisor2", "c2")
)
),
false
);
final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1");

final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);

ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_TIER_TESTS,
taskWithSupervisor
);
Assert.assertNotNull(worker);
Assert.assertEquals("c1", worker.getWorker().getCategory());
Assert.assertEquals("localhost1", worker.getWorker().getHost());
}

@Test
public void testSupervisorIdCategoryAffinityFallbackToDefault()
{
final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
ImmutableMap.of(
"test_seekable_stream",
new WorkerCategorySpec.CategoryConfig(
"c2",
ImmutableMap.of("ds2", "c1"),
ImmutableMap.of("supervisor2", "c1")
)
),
false
);

final Task taskWithSupervisor = createTestTask("task1", "supervisor1", "ds1");

final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);

ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_TIER_TESTS,
taskWithSupervisor
);
Assert.assertNotNull(worker);
Assert.assertEquals("c2", worker.getWorker().getCategory());
Assert.assertEquals("localhost3", worker.getWorker().getHost());
}

private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec)
{
final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
Expand All @@ -187,4 +293,27 @@ private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec)

return worker;
}

/**
* Helper method to create a test task with supervisor ID for testing
*/
@SuppressWarnings("unchecked")
private static Task createTestTask(String id, @Nullable String supervisorId, String datasource)
{
return new TestSeekableStreamIndexTask(
id,
supervisorId,
null,
DataSchema.builder()
.withDataSource(datasource)
.withTimestamp(new TimestampSpec(null, null, null))
.withDimensions(new DimensionsSpec(Collections.emptyList()))
.withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList()))
.build(),
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class),
Mockito.mock(SeekableStreamIndexTaskIOConfig.class),
null,
null
);
}
}
Loading