diff --git a/docs/configuration/index.md b/docs/configuration/index.md index aaac62110d96..08d58e82d7fa 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -983,6 +983,37 @@ useful if you want work evenly distributed across your MiddleManagers. |`type`|`equalDistribution`.|required; must be `equalDistribution`| |`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)| +###### Equal Distribution With Category Spec + +This strategy is a variant of `Equal Distribution`, which support `workerCategorySpec` field rather than `affinityConfig`. By specifying `workerCategorySpec`, you can assign tasks to run on different categories of MiddleManagers based on the tasks' **taskType** and **dataSource name**. This strategy can't work with `AutoScaler` since the behavior is undefined. + +|Property|Description|Default| +|--------|-----------|-------| +|`type`|`equalDistributionWithCategorySpec`.|required; must be `equalDistributionWithCategorySpec`| +|`workerCategorySpec`|[Worker Category Spec](#workercategoryspec) object|null (no worker category spec)| + +Example: specify tasks default to run on **c1** whose task +type is "index_kafka", while dataSource "ds1" run on **c2**. + +```json +{ + "selectStrategy": { + "type": "equalDistributionWithCategorySpec", + "workerCategorySpec": { + "strong": false, + "categoryMap": { + "index_kafka": { + "defaultCategory": "c1", + "categoryAffinity": { + "ds1": "c2" + } + } + } + } + } +} +``` + ###### Fill Capacity Tasks are assigned to the worker with the most currently-running tasks at the time the task begins running. This is @@ -997,6 +1028,17 @@ MiddleManagers up to capacity simultaneously, rather than a single MiddleManager |`type`|`fillCapacity`.|required; must be `fillCapacity`| |`affinityConfig`|[Affinity config](#affinity) object|null (no affinity)| +###### Fill Capacity With Category Spec + +This strategy is a variant of `Fill Capacity`, which support `workerCategorySpec` field rather than `affinityConfig`. The usage is the same with _equalDistributionWithCategorySpec_ strategy. This strategy can't work with `AutoScaler` since the behavior is undefined. + +|Property|Description|Default| +|--------|-----------|-------| +|`type`|`fillCapacityWithCategorySpec`.|required; must be `fillCapacityWithCategorySpec`| +|`workerCategorySpec`|[Worker Category Spec](#workercategoryspec) object|null (no worker category spec)| + +> Before using the _equalDistributionWithCategorySpec_ and _fillCapacityWithCategorySpec_ strategies, you must upgrade overlord and all MiddleManagers to the version that support this feature. + ###### JavaScript @@ -1033,6 +1075,23 @@ field. If not provided, the default is to not use affinity at all. |`affinity`|JSON object mapping a datasource String name to a list of indexing service MiddleManager host:port String values. Druid doesn't perform DNS resolution, so the 'host' value must match what is configured on the MiddleManager and what the MiddleManager announces itself as (examine the Overlord logs to see what your MiddleManager announces itself as).|{}| |`strong`|With weak affinity (the default), tasks for a dataSource may be assigned to other MiddleManagers if their affinity-mapped MiddleManagers are not able to run all pending tasks in the queue for that dataSource. With strong affinity, tasks for a dataSource will only ever be assigned to their affinity-mapped MiddleManagers, and will wait in the pending queue if necessary.|false| +###### WorkerCategorySpec + +WorkerCategorySpec can be provided to the _equalDistributionWithCategorySpec_ and _fillCapacityWithCategorySpec_ strategies using the "workerCategorySpec" +field. If not provided, the default is to not use it at all. + +|Property|Description|Default| +|--------|-----------|-------| +|`categoryMap`|A JSON map object mapping a task type String name to a [CategoryConfig](#categoryconfig) object, by which you can specify category config for different task type.|{}| +|`strong`|With weak workerCategorySpec (the default), tasks for a dataSource may be assigned to other MiddleManagers if the MiddleManagers specified in `categoryMap` are not able to run all pending tasks in the queue for that dataSource. With strong workerCategorySpec, tasks for a dataSource will only ever be assigned to their specified MiddleManagers, and will wait in the pending queue if necessary.|false| + +###### CategoryConfig + +|Property|Description|Default| +|--------|-----------|-------| +|`defaultCategory`|Specify default category for a task type.|null| +|`categoryAffinity`|A JSON map object mapping a datasource String name to a category String name of the MiddleManager. If category isn't specified for a datasource, then using the `defaultCategory`. If no specified category and the `defaultCategory` is also null, then tasks can run on any available MiddleManagers.|null| + ##### Autoscaler Amazon's EC2 is currently the only supported autoscaler. @@ -1084,6 +1143,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM |`druid.worker.ip`|The IP of the worker.|localhost| |`druid.worker.version`|Version identifier for the MiddleManager.|0| |`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of available processors - 1| +|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`__default_worker_category`| #### Peon Processing diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index c908d282a30c..52624f41e4e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -477,7 +478,7 @@ private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableW private static ImmutableWorkerInfo createDummyWorker(String scheme, String host, int capacity, String version) { return new ImmutableWorkerInfo( - new Worker(scheme, host, "-2", capacity, version), + new Worker(scheme, host, "-2", capacity, version, WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index b6222d269c0f..afbdbd75b410 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -485,7 +485,8 @@ private Worker toWorker(DiscoveryDruidNode node) node.getDruidNode().getHostAndPortToUse(), ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getIp(), ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCapacity(), - ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion() + ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion(), + ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCategory() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index ee8421b5c440..cfb618066bb0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -183,7 +183,8 @@ public ImmutableWorkerInfo toImmutable() worker.getHost(), worker.getIp(), worker.getCapacity(), - "" + "", + worker.getCategory() ); } w = disabledWorker; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java new file mode 100644 index 000000000000..ec65693ac3dd --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; + +import javax.annotation.Nullable; +import java.util.Objects; + +public class EqualDistributionWithCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy +{ + private final WorkerCategorySpec workerCategorySpec; + + @JsonCreator + public EqualDistributionWithCategorySpecWorkerSelectStrategy( + @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec + ) + { + this.workerCategorySpec = workerCategorySpec; + } + + @JsonProperty + public WorkerCategorySpec getWorkerCategorySpec() + { + return workerCategorySpec; + } + + @Nullable + @Override + public ImmutableWorkerInfo findWorkerForTask( + final WorkerTaskRunnerConfig config, + final ImmutableMap zkWorkers, + final Task task + ) + { + return WorkerSelectUtils.selectWorker( + task, + zkWorkers, + config, + workerCategorySpec, + EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final EqualDistributionWithCategorySpecWorkerSelectStrategy that = (EqualDistributionWithCategorySpecWorkerSelectStrategy) o; + return Objects.equals(workerCategorySpec, that.workerCategorySpec); + } + + @Override + public int hashCode() + { + return Objects.hash(workerCategorySpec); + } + + @Override + public String toString() + { + return "EqualDistributionWithCategorySpecWorkerSelectStrategy{" + + "workerCategorySpec=" + workerCategorySpec + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java index eb9a3c57909c..afb020e9f7ab 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java @@ -66,7 +66,7 @@ public ImmutableWorkerInfo findWorkerForTask( ); } - private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map eligibleWorkers) + static ImmutableWorkerInfo selectFromEligibleWorkers(final Map eligibleWorkers) { return eligibleWorkers.values().stream().max( Comparator.comparing(ImmutableWorkerInfo::getAvailableCapacity) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java new file mode 100644 index 000000000000..3dcdfe9a5401 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; + +import javax.annotation.Nullable; +import java.util.Objects; + +public class FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerSelectStrategy +{ + private final WorkerCategorySpec workerCategorySpec; + + @JsonCreator + public FillCapacityWithCategorySpecWorkerSelectStrategy( + @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec + ) + { + this.workerCategorySpec = workerCategorySpec; + } + + @JsonProperty + public WorkerCategorySpec getWorkerCategorySpec() + { + return workerCategorySpec; + } + + @Nullable + @Override + public ImmutableWorkerInfo findWorkerForTask( + final WorkerTaskRunnerConfig config, + final ImmutableMap zkWorkers, + final Task task + ) + { + return WorkerSelectUtils.selectWorker( + task, + zkWorkers, + config, + workerCategorySpec, + FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final FillCapacityWithCategorySpecWorkerSelectStrategy that = (FillCapacityWithCategorySpecWorkerSelectStrategy) o; + return Objects.equals(workerCategorySpec, that.workerCategorySpec); + } + + @Override + public int hashCode() + { + return Objects.hash(workerCategorySpec); + } + + @Override + public String toString() + { + return "FillCapacityWithCategorySpecWorkerSelectStrategy{" + + "workerCategorySpec=" + workerCategorySpec + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index 2c82215abff0..47ac65b8271d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -64,7 +64,7 @@ public ImmutableWorkerInfo findWorkerForTask( ); } - private static ImmutableWorkerInfo selectFromEligibleWorkers(final Map eligibleWorkers) + static ImmutableWorkerInfo selectFromEligibleWorkers(final Map eligibleWorkers) { return eligibleWorkers.values().stream().max( Comparator.comparing(ImmutableWorkerInfo::getCurrCapacityUsed) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java new file mode 100644 index 000000000000..57340bc8f48b --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +public class WorkerCategorySpec +{ + // key: taskType, value: categoryConfig + private final Map categoryMap; + private final boolean strong; + + @JsonCreator + public WorkerCategorySpec( + @JsonProperty("categoryMap") Map categoryMap, + @JsonProperty("strong") boolean strong + ) + { + this.categoryMap = categoryMap == null ? Collections.EMPTY_MAP : categoryMap; + this.strong = strong; + } + + @JsonProperty + public Map getCategoryMap() + { + return categoryMap; + } + + @JsonProperty + public boolean isStrong() + { + return strong; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final WorkerCategorySpec that = (WorkerCategorySpec) o; + return strong == that.strong && + Objects.equals(categoryMap, that.categoryMap); + } + + @Override + public int hashCode() + { + return Objects.hash(categoryMap, strong); + } + + @Override + public String toString() + { + return "WorkerCategorySpec{" + + "categoryMap=" + categoryMap + + ", strong=" + strong + + '}'; + } + + public static class CategoryConfig + { + private final String defaultCategory; + // key: datasource, value: category + private final Map categoryAffinity; + + @JsonCreator + public CategoryConfig( + @JsonProperty("defaultCategory") String defaultCategory, + @JsonProperty("categoryAffinity") Map categoryAffinity + ) + { + this.defaultCategory = defaultCategory; + this.categoryAffinity = categoryAffinity == null ? Collections.EMPTY_MAP : categoryAffinity; + } + + @JsonProperty + public String getDefaultCategory() + { + return defaultCategory; + } + + @JsonProperty + public Map getCategoryAffinity() + { + return categoryAffinity; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final CategoryConfig that = (CategoryConfig) o; + return Objects.equals(defaultCategory, that.defaultCategory) && + Objects.equals(categoryAffinity, that.categoryAffinity); + } + + @Override + public int hashCode() + { + return Objects.hash(defaultCategory, categoryAffinity); + } + + @Override + public String toString() + { + return "CategoryConfig{" + + "defaultCategory=" + defaultCategory + + ", categoryAffinity=" + categoryAffinity + + '}'; + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java index fbb8638907d6..a3443ee73583 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -38,7 +38,9 @@ @JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class), - @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class) + @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class), + @JsonSubTypes.Type(name = "fillCapacityWithCategorySpec", value = FillCapacityWithCategorySpecWorkerSelectStrategy.class), + @JsonSubTypes.Type(name = "equalDistributionWithCategorySpec", value = EqualDistributionWithCategorySpecWorkerSelectStrategy.class) }) @PublicApi public interface WorkerSelectStrategy diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java index c5ccd15bebe7..24721e85d068 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java @@ -57,13 +57,7 @@ public static ImmutableWorkerInfo selectWorker( final Function, ImmutableWorkerInfo> workerSelector ) { - // Workers that could potentially run this task, ignoring affinityConfig. - final Map runnableWorkers = allWorkers - .values() - .stream() - .filter(worker -> worker.canRunTask(task) - && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion())) - .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity())); + final Map runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig); if (affinityConfig == null) { // All runnable workers are valid. @@ -95,6 +89,90 @@ public static ImmutableWorkerInfo selectWorker( } } + /** + * Helper for {@link WorkerSelectStrategy} implementations. + * + * @param allWorkers map of all workers, in the style provided to {@link WorkerSelectStrategy} + * @param workerCategorySpec worker category spec, or null + * @param workerSelector function that receives a list of eligible workers: version is high enough, worker can run + * the task, and worker satisfies the worker category spec. may return null. + * + * @return selected worker from "allWorkers", or null. + */ + @Nullable + public static ImmutableWorkerInfo selectWorker( + final Task task, + final Map allWorkers, + final WorkerTaskRunnerConfig workerTaskRunnerConfig, + @Nullable final WorkerCategorySpec workerCategorySpec, + final Function, ImmutableWorkerInfo> workerSelector + ) + { + final Map runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig); + + // select worker according to worker category spec + if (workerCategorySpec != null) { + final WorkerCategorySpec.CategoryConfig categoryConfig = workerCategorySpec.getCategoryMap().get(task.getType()); + + if (categoryConfig != null) { + final String defaultCategory = categoryConfig.getDefaultCategory(); + final Map categoryAffinity = categoryConfig.getCategoryAffinity(); + + String preferredCategory = categoryAffinity.get(task.getDataSource()); + // If there is no preferred category for the datasource, then using the defaultCategory. However, the defaultCategory + // may be null too, so we need to do one more null check (see below). + preferredCategory = preferredCategory == null ? defaultCategory : preferredCategory; + + if (preferredCategory != null) { + // select worker from preferred category + final ImmutableMap categoryWorkers = getCategoryWorkers(preferredCategory, runnableWorkers); + final ImmutableWorkerInfo selected = workerSelector.apply(categoryWorkers); + + if (selected != null) { + return selected; + } else if (workerCategorySpec.isStrong()) { + return null; + } + } + } + } + + // select worker from all runnable workers by default + return workerSelector.apply(ImmutableMap.copyOf(runnableWorkers)); + } + + // Get workers that could potentially run this task, ignoring affinityConfig/workerCategorySpec. + private static Map getRunnableWorkers( + final Task task, + final Map allWorkers, + final WorkerTaskRunnerConfig workerTaskRunnerConfig + ) + { + return allWorkers.values() + .stream() + .filter(worker -> worker.canRunTask(task) + && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion())) + .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity())); + } + + /** + * Return workers belong to this category. + * + * @param category worker category name + * @param workerMap map of worker hostname to worker info + * + * @return map of worker hostname to worker info + */ + private static ImmutableMap getCategoryWorkers( + final String category, + final Map workerMap + ) + { + return ImmutableMap.copyOf( + Maps.filterValues(workerMap, workerInfo -> workerInfo.getWorker().getCategory().equals(category)) + ); + } + /** * Return workers not assigned to any affinity pool at all. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java index fd141daf2668..2394a464e5c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java @@ -34,6 +34,7 @@ public class Worker private final String ip; private final int capacity; private final String version; + private final String category; @JsonCreator public Worker( @@ -41,7 +42,8 @@ public Worker( @JsonProperty("host") String host, @JsonProperty("ip") String ip, @JsonProperty("capacity") int capacity, - @JsonProperty("version") String version + @JsonProperty("version") String version, + @JsonProperty("category") String category ) { this.scheme = scheme == null ? "http" : scheme; // needed for backwards compatibility with older workers (pre-#4270) @@ -49,6 +51,7 @@ public Worker( this.ip = ip; this.capacity = capacity; this.version = version; + this.category = category; } @JsonProperty @@ -81,6 +84,12 @@ public String getVersion() return version; } + @JsonProperty + public String getCategory() + { + return category; + } + @Override public boolean equals(Object o) { @@ -105,7 +114,10 @@ public boolean equals(Object o) if (!ip.equals(worker.ip)) { return false; } - return version.equals(worker.version); + if (!version.equals(worker.version)) { + return false; + } + return category.equals(worker.category); } @Override @@ -116,6 +128,7 @@ public int hashCode() result = 31 * result + ip.hashCode(); result = 31 * result + capacity; result = 31 * result + version.hashCode(); + result = 31 * result + category.hashCode(); return result; } @@ -128,6 +141,7 @@ public String toString() ", ip='" + ip + '\'' + ", capacity=" + capacity + ", version='" + version + '\'' + + ", category='" + category + '\'' + '}'; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java index 7bbf10bb4244..cafabc9e525f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java @@ -91,7 +91,8 @@ public Response doDisable() enabledWorker.getHost(), enabledWorker.getIp(), enabledWorker.getCapacity(), - DISABLED_VERSION + DISABLED_VERSION, + enabledWorker.getCategory() ); curatorCoordinator.updateWorkerAnnouncement(disabledWorker); workerTaskManager.workerDisabled(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java index 3c1530d3f6d0..785698e68f0e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.junit.Assert; @@ -35,7 +36,7 @@ public void testSerde() throws Exception { ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo( new Worker( - "http", "testWorker", "192.0.0.1", 10, "v1" + "http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -56,7 +57,7 @@ public void testEqualsAndSerde() // Everything equal assertEqualsAndHashCode(new ImmutableWorkerInfo( new Worker( - "http", "testWorker", "192.0.0.1", 10, "v1" + "http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -64,7 +65,7 @@ public void testEqualsAndSerde() DateTimes.of("2015-01-01T01:01:01Z") ), new ImmutableWorkerInfo( new Worker( - "http", "testWorker", "192.0.0.1", 10, "v1" + "http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -72,10 +73,29 @@ public void testEqualsAndSerde() DateTimes.of("2015-01-01T01:01:01Z") ), true); + // same worker different category + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "http", "testWorker", "192.0.0.1", 10, "v1", "c1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + DateTimes.of("2015-01-01T01:01:01Z") + ), new ImmutableWorkerInfo( + new Worker( + "http", "testWorker", "192.0.0.1", 10, "v1", "c2" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + DateTimes.of("2015-01-01T01:01:01Z") + ), false); + // different worker same tasks assertEqualsAndHashCode(new ImmutableWorkerInfo( new Worker( - "http", "testWorker1", "192.0.0.1", 10, "v1" + "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -83,7 +103,7 @@ public void testEqualsAndSerde() DateTimes.of("2015-01-01T01:01:01Z") ), new ImmutableWorkerInfo( new Worker( - "http", "testWorker2", "192.0.0.1", 10, "v1" + "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -94,7 +114,7 @@ public void testEqualsAndSerde() // same worker different task groups assertEqualsAndHashCode(new ImmutableWorkerInfo( new Worker( - "http", "testWorker", "192.0.0.1", 10, "v1" + "http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp3", "grp2"), @@ -102,7 +122,7 @@ public void testEqualsAndSerde() DateTimes.of("2015-01-01T01:01:01Z") ), new ImmutableWorkerInfo( new Worker( - "http", "testWorker", "192.0.0.1", 10, "v1" + "http", "testWorker", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -113,7 +133,7 @@ public void testEqualsAndSerde() // same worker different tasks assertEqualsAndHashCode(new ImmutableWorkerInfo( new Worker( - "http", "testWorker1", "192.0.0.1", 10, "v1" + "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -121,7 +141,7 @@ public void testEqualsAndSerde() DateTimes.of("2015-01-01T01:01:01Z") ), new ImmutableWorkerInfo( new Worker( - "http", "testWorker2", "192.0.0.1", 10, "v1" + "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -132,7 +152,7 @@ public void testEqualsAndSerde() // same worker different capacity assertEqualsAndHashCode(new ImmutableWorkerInfo( new Worker( - "http", "testWorker1", "192.0.0.1", 10, "v1" + "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 3, ImmutableSet.of("grp1", "grp2"), @@ -140,7 +160,7 @@ public void testEqualsAndSerde() DateTimes.of("2015-01-01T01:01:01Z") ), new ImmutableWorkerInfo( new Worker( - "http", "testWorker2", "192.0.0.1", 10, "v1" + "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -151,7 +171,7 @@ public void testEqualsAndSerde() // same worker different lastCompletedTaskTime assertEqualsAndHashCode(new ImmutableWorkerInfo( new Worker( - "http", "testWorker1", "192.0.0.1", 10, "v1" + "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 3, ImmutableSet.of("grp1", "grp2"), @@ -159,7 +179,7 @@ public void testEqualsAndSerde() DateTimes.of("2015-01-01T01:01:01Z") ), new ImmutableWorkerInfo( new Worker( - "http", "testWorker2", "192.0.0.1", 10, "v1" + "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -170,7 +190,7 @@ public void testEqualsAndSerde() // same worker different blacklistedUntil assertEqualsAndHashCode(new ImmutableWorkerInfo( new Worker( - "http", "testWorker1", "192.0.0.1", 10, "v1" + "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 3, ImmutableSet.of("grp1", "grp2"), @@ -179,7 +199,7 @@ public void testEqualsAndSerde() DateTimes.of("2017-07-30") ), new ImmutableWorkerInfo( new Worker( - "http", "testWorker2", "192.0.0.1", 10, "v1" + "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index ed0713dd23bd..46239e3d76fc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -42,6 +42,7 @@ import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.server.initialization.IndexerZkConfig; @@ -146,7 +147,8 @@ Worker makeWorker(final String workerId, final int capacity) throws Exception workerId, workerId, capacity, - "0" + "0", + WorkerConfig.DEFAULT_CATEGORY ); cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( @@ -162,7 +164,14 @@ void disableWorker(Worker worker) throws Exception { cf.setData().forPath( JOINER.join(ANNOUNCEMENTS_PATH, worker.getHost()), - jsonMapper.writeValueAsBytes(new Worker(worker.getScheme(), worker.getHost(), worker.getIp(), worker.getCapacity(), "")) + jsonMapper.writeValueAsBytes(new Worker( + worker.getScheme(), + worker.getHost(), + worker.getIp(), + worker.getCapacity(), + "", + worker.getCategory() + )) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java index b21a98fda4ce..194fb9c86514 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.junit.Assert; import org.junit.Test; @@ -31,7 +32,7 @@ public class TaskRunnerUtilsTest public void testMakeWorkerURL() { final URL url = TaskRunnerUtils.makeWorkerURL( - new Worker("https", "1.2.3.4:8290", "1.2.3.4", 1, "0"), + new Worker("https", "1.2.3.4:8290", "1.2.3.4", 1, "0", WorkerConfig.DEFAULT_CATEGORY), "/druid/worker/v1/task/%s/log", "foo bar&" ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/WorkerTaskRunnerQueryAdpaterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/WorkerTaskRunnerQueryAdpaterTest.java index 330ff2c8b7e8..4bb1c9b24d8c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/WorkerTaskRunnerQueryAdpaterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/WorkerTaskRunnerQueryAdpaterTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.http.client.HttpClient; @@ -66,7 +67,7 @@ public void setup() ImmutableList.of( new ImmutableWorkerInfo( new Worker( - "http", "worker-host1", "192.0.0.1", 10, "v1" + "http", "worker-host1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, ImmutableSet.of("grp1", "grp2"), @@ -75,7 +76,7 @@ public void setup() ), new ImmutableWorkerInfo( new Worker( - "https", "worker-host2", "192.0.0.2", 4, "v1" + "https", "worker-host2", "192.0.0.2", 4, "v1", WorkerConfig.DEFAULT_CATEGORY ), 1, ImmutableSet.of("grp1"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index 2fb0949c96fc..45207940668e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -36,6 +36,7 @@ import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.Execs; @@ -577,7 +578,7 @@ public TestZkWorker( int capacity ) { - super(new Worker(scheme, host, ip, capacity, version), null, new DefaultObjectMapper()); + super(new Worker(scheme, host, ip, capacity, version, WorkerConfig.DEFAULT_CATEGORY), null, new DefaultObjectMapper()); this.testTask = testTask; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java index 7796dfcb4178..7f9739ad55ff 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.Execs; @@ -507,7 +508,7 @@ public TestZkWorker( String version ) { - super(new Worker(scheme, host, ip, 3, version), null, new DefaultObjectMapper()); + super(new Worker(scheme, host, ip, 3, version, WorkerConfig.DEFAULT_CATEGORY), null, new DefaultObjectMapper()); this.testTask = testTask; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 4f73e49200f2..dc87de699bdb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -46,6 +46,7 @@ import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; @@ -143,7 +144,7 @@ protected WorkerHolder createWorkerHolder( new DruidNode("service", "host1", false, 8080, null, true, false), NodeType.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -151,7 +152,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") new DruidNode("service", "host2", false, 8080, null, true, false), NodeType.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -240,7 +241,7 @@ protected WorkerHolder createWorkerHolder( new DruidNode("service", "host1", false, 8080, null, true, false), NodeType.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -248,7 +249,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") new DruidNode("service", "host2", false, 8080, null, true, false), NodeType.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -344,7 +345,7 @@ protected WorkerHolder createWorkerHolder( new DruidNode("service", "host", false, 1234, null, true, false), NodeType.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -489,7 +490,7 @@ protected WorkerHolder createWorkerHolder( new DruidNode("service", "host", false, 1234, null, true, false), NodeType.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -663,7 +664,7 @@ protected WorkerHolder createWorkerHolder( DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", false, 1234, null, true, false), NodeType.MIDDLE_MANAGER, - ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")) + ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)) ); workerHolders.put( @@ -844,7 +845,7 @@ protected WorkerHolder createWorkerHolder( new DruidNode("service", "host1", false, 8080, null, true, false), NodeType.MIDDLE_MANAGER, ImmutableMap.of( - WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0") + WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", WorkerConfig.DEFAULT_CATEGORY) ) ); @@ -889,7 +890,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0") DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", false, 8080, null, true, false), NodeType.MIDDLE_MANAGER, - ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0")) + ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY)) ); workerHolders.put( @@ -920,7 +921,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0") DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", false, 8080, null, true, false), NodeType.MIDDLE_MANAGER, - ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0")) + ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY)) ); workerHolders.put( @@ -966,7 +967,7 @@ public void testTaskAddedOrUpdated1() throws Exception ); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); - EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1")).anyTimes(); + EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes(); workerHolder.setLastCompletedTaskTime(EasyMock.anyObject()); workerHolder.resetContinuouslyFailedTasksCount(); EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0); @@ -1002,7 +1003,7 @@ public void testTaskAddedOrUpdated1() throws Exception // Another "rogue-worker" reports running it, and gets asked to shutdown the task WorkerHolder rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(rogueWorkerHolder.getWorker()) - .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")) + .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY)) .anyTimes(); rogueWorkerHolder.shutdownTask(task.getId()); EasyMock.replay(rogueWorkerHolder); @@ -1017,7 +1018,7 @@ public void testTaskAddedOrUpdated1() throws Exception // "rogue-worker" reports FAILURE for the task, ignored rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(rogueWorkerHolder.getWorker()) - .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")) + .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY)) .anyTimes(); EasyMock.replay(rogueWorkerHolder); taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( @@ -1040,7 +1041,7 @@ public void testTaskAddedOrUpdated1() throws Exception // "rogue-worker" reports running it, and gets asked to shutdown the task rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(rogueWorkerHolder.getWorker()) - .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")) + .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY)) .anyTimes(); rogueWorkerHolder.shutdownTask(task.getId()); EasyMock.replay(rogueWorkerHolder); @@ -1055,7 +1056,7 @@ public void testTaskAddedOrUpdated1() throws Exception // "rogue-worker" reports FAILURE for the tasks, ignored rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(rogueWorkerHolder.getWorker()) - .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")) + .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY)) .anyTimes(); EasyMock.replay(rogueWorkerHolder); taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( @@ -1094,7 +1095,7 @@ public void testTaskAddedOrUpdated2() throws Exception listenerNotificationsAccumulator ); - Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1"); + Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes(); @@ -1153,7 +1154,7 @@ public void testTaskAddedOrUpdated3() HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated(taskStorage, listenerNotificationsAccumulator); - Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1"); + Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java index bc07e51f0cb9..3319d8125713 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerHistoryItem; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordination.ChangeRequestHttpSyncer; @@ -59,7 +60,7 @@ public void testSyncListener() new HttpRemoteTaskRunnerConfig(), EasyMock.createNiceMock(ScheduledExecutorService.class), (taskAnnouncement, holder) -> updates.add(taskAnnouncement), - new Worker("http", "localhost", "127.0.0.1", 5, "v0"), + new Worker("http", "localhost", "127.0.0.1", 5, "v0", WorkerConfig.DEFAULT_CATEGORY), ImmutableList.of( TaskAnnouncement.create( task0, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java index 3c0bddbce950..05d1bd0ed76f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java @@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestHelper; import org.junit.Assert; @@ -55,31 +56,31 @@ public String getDataSource() ImmutableMap.of( "localhost0", new ImmutableWorkerInfo( - new Worker("http", "localhost0", "localhost0", 2, "v1"), 0, + new Worker("http", "localhost0", "localhost0", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost1", new ImmutableWorkerInfo( - new Worker("http", "localhost1", "localhost1", 2, "v1"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() + new Worker("http", "localhost1", "localhost1", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() ), "localhost2", new ImmutableWorkerInfo( - new Worker("http", "localhost2", "localhost2", 2, "v1"), 1, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() + new Worker("http", "localhost2", "localhost2", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() ), "localhost3", new ImmutableWorkerInfo( - new Worker("http", "localhost3", "localhost3", 2, "v1"), 1, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() + new Worker("http", "localhost3", "localhost3", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() ) ), noopTask @@ -99,17 +100,17 @@ public void testFindWorkerForTaskWithNulls() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "lhost", "lhost", 1, "v1"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() + new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() ), "localhost", new ImmutableWorkerInfo( - new Worker("http", "localhost", "localhost", 1, "v1"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() + new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() ) ), new NoopTask(null, null, null, 1, 0, null, null, null) @@ -129,10 +130,10 @@ public void testIsolation() ImmutableMap.of( "localhost", new ImmutableWorkerInfo( - new Worker("http", "localhost", "localhost", 1, "v1"), 0, - new HashSet<>(), - new HashSet<>(), - DateTimes.nowUtc() + new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() ) ), new NoopTask(null, null, null, 1, 0, null, null, null) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java new file mode 100644 index 000000000000..80366ea03ad6 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.java.util.common.DateTimes; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; + +public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest +{ + private static final ImmutableMap WORKERS_FOR_TIER_TESTS = + ImmutableMap.of( + "localhost0", + new ImmutableWorkerInfo( + new Worker("http", "localhost0", "localhost0", 1, "v1", "c1"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost1", + new ImmutableWorkerInfo( + new Worker("http", "localhost1", "localhost1", 2, "v1", "c1"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost2", + new ImmutableWorkerInfo( + new Worker("http", "localhost2", "localhost2", 3, "v1", "c2"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost3", + new ImmutableWorkerInfo( + new Worker("http", "localhost3", "localhost3", 4, "v1", "c2"), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ) + ); + + @Test + public void testFindWorkerForTaskWithNullWorkerTierSpec() + { + ImmutableWorkerInfo worker = selectWorker(null); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testFindWorkerForTaskWithPreferredTier() + { + // test defaultTier != null and tierAffinity is not empty + final WorkerCategorySpec workerCategorySpec1 = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c2", + ImmutableMap.of("ds1", "c2") + ) + ), + false + ); + + ImmutableWorkerInfo worker1 = selectWorker(workerCategorySpec1); + Assert.assertEquals("localhost3", worker1.getWorker().getHost()); + + // test defaultTier == null and tierAffinity is not empty + final WorkerCategorySpec workerCategorySpec2 = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + null, + ImmutableMap.of("ds1", "c2") + ) + ), + false + ); + + ImmutableWorkerInfo worker2 = selectWorker(workerCategorySpec2); + Assert.assertEquals("localhost3", worker2.getWorker().getHost()); + + // test defaultTier != null and tierAffinity is empty + final WorkerCategorySpec workerCategorySpec3 = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c2", + null + ) + ), + false + ); + + ImmutableWorkerInfo worker3 = selectWorker(workerCategorySpec3); + Assert.assertEquals("localhost3", worker3.getWorker().getHost()); + } + + @Test + public void testFindWorkerForTaskWithNullPreferredTier() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + null, + null + ) + ), + false + ); + + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testWeakTierSpec() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c3") + ) + ), + false + ); + + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testStrongTierSpec() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c3") + ) + ), + true + ); + + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec); + Assert.assertNull(worker); + } + + private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) + { + final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = new EqualDistributionWithCategorySpecWorkerSelectStrategy( + workerCategorySpec); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + new NoopTask(null, null, "ds1", 1, 0, null, null, null) + ); + + return worker; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java index 3602d5b57045..20259c3a52ed 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.junit.Assert; import org.junit.Test; @@ -37,28 +38,28 @@ public class EqualDistributionWorkerSelectStrategyTest ImmutableMap.of( "localhost0", new ImmutableWorkerInfo( - new Worker("http", "localhost0", "localhost0", 2, "v1"), 0, + new Worker("http", "localhost0", "localhost0", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost1", new ImmutableWorkerInfo( - new Worker("http", "localhost1", "localhost1", 2, "v1"), 0, + new Worker("http", "localhost1", "localhost1", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost2", new ImmutableWorkerInfo( - new Worker("http", "localhost2", "localhost2", 2, "v1"), 1, + new Worker("http", "localhost2", "localhost2", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost3", new ImmutableWorkerInfo( - new Worker("http", "localhost3", "localhost3", 2, "v1"), 1, + new Worker("http", "localhost3", "localhost3", 2, "v1", WorkerConfig.DEFAULT_CATEGORY), 1, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() @@ -75,14 +76,14 @@ public void testFindWorkerForTask() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "lhost", "lhost", 1, "v1"), 0, + new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost", new ImmutableWorkerInfo( - new Worker("http", "localhost", "localhost", 1, "v1"), 1, + new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 1, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() @@ -110,14 +111,14 @@ public void testFindWorkerForTaskWhenSameCurrCapacityUsed() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "lhost", "lhost", 5, "v1"), 5, + new Worker("http", "lhost", "lhost", 5, "v1", WorkerConfig.DEFAULT_CATEGORY), 5, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost", new ImmutableWorkerInfo( - new Worker("http", "localhost", "localhost", 10, "v1"), 5, + new Worker("http", "localhost", "localhost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY), 5, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() @@ -146,14 +147,14 @@ public void testOneDisableWorkerDifferentUsedCapacity() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "disableHost", "disableHost", 10, disabledVersion), 2, + new Worker("http", "disableHost", "disableHost", 10, disabledVersion, WorkerConfig.DEFAULT_CATEGORY), 2, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost", new ImmutableWorkerInfo( - new Worker("http", "enableHost", "enableHost", 10, "v1"), 5, + new Worker("http", "enableHost", "enableHost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY), 5, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() @@ -182,14 +183,14 @@ public void testOneDisableWorkerSameUsedCapacity() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "disableHost", "disableHost", 10, disabledVersion), 5, + new Worker("http", "disableHost", "disableHost", 10, disabledVersion, WorkerConfig.DEFAULT_CATEGORY), 5, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost", new ImmutableWorkerInfo( - new Worker("http", "enableHost", "enableHost", 10, "v1"), 5, + new Worker("http", "enableHost", "enableHost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY), 5, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java index dc2cba48ad95..efdcb8235565 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.junit.Assert; import org.junit.Test; @@ -45,14 +46,14 @@ public void testFindWorkerForTask() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "lhost", "lhost", 1, "v1"), 0, + new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost", new ImmutableWorkerInfo( - new Worker("http", "localhost", "localhost", 1, "v1"), 0, + new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() @@ -82,14 +83,14 @@ public void testFindWorkerForTaskWithNulls() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "lhost", "lhost", 1, "v1"), 0, + new Worker("http", "lhost", "lhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() ), "localhost", new ImmutableWorkerInfo( - new Worker("http", "localhost", "localhost", 1, "v1"), 0, + new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() @@ -112,7 +113,7 @@ public void testIsolation() ImmutableMap.of( "localhost", new ImmutableWorkerInfo( - new Worker("http", "localhost", "localhost", 1, "v1"), 0, + new Worker("http", "localhost", "localhost", 1, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java new file mode 100644 index 000000000000..8968a7538555 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.java.util.common.DateTimes; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; + +public class FillCapacityWithCategorySpecWorkerSelectStrategyTest +{ + private static final ImmutableMap WORKERS_FOR_TIER_TESTS = + ImmutableMap.of( + "localhost0", + new ImmutableWorkerInfo( + new Worker("http", "localhost0", "localhost0", 5, "v1", "c1"), 1, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost1", + new ImmutableWorkerInfo( + new Worker("http", "localhost1", "localhost1", 5, "v1", "c1"), 2, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost2", + new ImmutableWorkerInfo( + new Worker("http", "localhost2", "localhost2", 5, "v1", "c2"), 3, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + "localhost3", + new ImmutableWorkerInfo( + new Worker("http", "localhost3", "localhost3", 5, "v1", "c2"), 4, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ) + ); + + @Test + public void testFindWorkerForTaskWithNullWorkerTierSpec() + { + ImmutableWorkerInfo worker = selectWorker(null); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testFindWorkerForTaskWithPreferredTier() + { + // test defaultTier != null and tierAffinity is not empty + final WorkerCategorySpec workerCategorySpec1 = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c1") + ) + ), + false + ); + + ImmutableWorkerInfo worker1 = selectWorker(workerCategorySpec1); + Assert.assertEquals("localhost1", worker1.getWorker().getHost()); + + // test defaultTier == null and tierAffinity is not empty + final WorkerCategorySpec workerCategorySpec2 = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + null, + ImmutableMap.of("ds1", "c1") + ) + ), + false + ); + + ImmutableWorkerInfo worker2 = selectWorker(workerCategorySpec2); + Assert.assertEquals("localhost1", worker2.getWorker().getHost()); + + // test defaultTier != null and tierAffinity is empty + final WorkerCategorySpec workerCategorySpec3 = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + null + ) + ), + false + ); + + ImmutableWorkerInfo worker3 = selectWorker(workerCategorySpec3); + Assert.assertEquals("localhost1", worker3.getWorker().getHost()); + } + + @Test + public void testFindWorkerForTaskWithNullPreferredTier() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + null, + null + ) + ), + false + ); + + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testWeakTierSpec() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c3") + ) + ), + false + ); + + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec); + Assert.assertEquals("localhost3", worker.getWorker().getHost()); + } + + @Test + public void testStrongTierSpec() + { + final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec( + ImmutableMap.of( + "noop", + new WorkerCategorySpec.CategoryConfig( + "c1", + ImmutableMap.of("ds1", "c3") + ) + ), + true + ); + + ImmutableWorkerInfo worker = selectWorker(workerCategorySpec); + Assert.assertNull(worker); + } + + private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec) + { + final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = new FillCapacityWithCategorySpecWorkerSelectStrategy( + workerCategorySpec); + + ImmutableWorkerInfo worker = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + WORKERS_FOR_TIER_TESTS, + new NoopTask(null, null, "ds1", 1, 0, null, null, null) + ); + + return worker; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java new file mode 100644 index 000000000000..4277984fc10a --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class WorkerCategorySpecTest +{ + private ObjectMapper mapper; + + @Before + public void setUp() + { + mapper = new DefaultObjectMapper(); + } + + @Test + public void testSerde() throws Exception + { + String jsonStr = "{\n" + + " \"strong\": true,\n" + + " \"categoryMap\": {\n" + + " \"index_kafka\": {\"defaultCategory\": \"c1\", \"categoryAffinity\": {\"ds1\": \"c2\"}}\n" + + " }\n" + + "}"; + + WorkerCategorySpec workerCategorySpec = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + WorkerCategorySpec.class + ) + ), WorkerCategorySpec.class + ); + + Assert.assertTrue(workerCategorySpec.isStrong()); + Assert.assertEquals(ImmutableMap.of( + "index_kafka", + new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", "c2")) + ), workerCategorySpec.getCategoryMap()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index a5bddbea4a45..93379f843326 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -43,6 +43,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner; import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; @@ -112,7 +113,8 @@ public void setUp() throws Exception "worker", "localhost", 3, - "0" + "0", + WorkerConfig.DEFAULT_CATEGORY ); workerCuratorCoordinator = new WorkerCuratorCoordinator( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java index 3e79405223f9..3667ef74e27a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; import org.apache.druid.indexing.worker.WorkerTaskMonitor; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.initialization.IndexerZkConfig; @@ -77,7 +78,8 @@ public void setUp() throws Exception "host", "ip", 3, - "v1" + "v1", + WorkerConfig.DEFAULT_CATEGORY ); curatorCoordinator = new WorkerCuratorCoordinator( diff --git a/server/src/main/java/org/apache/druid/discovery/WorkerNodeService.java b/server/src/main/java/org/apache/druid/discovery/WorkerNodeService.java index 5ccfec964693..71ad41aacbea 100644 --- a/server/src/main/java/org/apache/druid/discovery/WorkerNodeService.java +++ b/server/src/main/java/org/apache/druid/discovery/WorkerNodeService.java @@ -33,16 +33,19 @@ public class WorkerNodeService extends DruidService private final String ip; private final int capacity; private final String version; + private final String category; public WorkerNodeService( @JsonProperty("ip") String ip, @JsonProperty("capacity") int capacity, - @JsonProperty("version") String version + @JsonProperty("version") String version, + @JsonProperty("category") String category ) { this.ip = ip; this.capacity = capacity; this.version = version; + this.category = category; } @Override @@ -69,6 +72,12 @@ public String getVersion() return version; } + @JsonProperty + public String getCategory() + { + return category; + } + @Override public boolean equals(Object o) { @@ -81,13 +90,14 @@ public boolean equals(Object o) WorkerNodeService that = (WorkerNodeService) o; return capacity == that.capacity && Objects.equals(ip, that.ip) && - Objects.equals(version, that.version); + Objects.equals(version, that.version) && + Objects.equals(category, that.category); } @Override public int hashCode() { - return Objects.hash(ip, capacity, version); + return Objects.hash(ip, capacity, version, category); } @Override @@ -97,6 +107,7 @@ public String toString() "ip='" + ip + '\'' + ", capacity=" + capacity + ", version='" + version + '\'' + + ", category='" + category + '\'' + '}'; } } diff --git a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java index 0b6bfc6ac9bb..49e3c2a7b9b7 100644 --- a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java +++ b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java @@ -25,11 +25,14 @@ import org.joda.time.Period; import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; /** */ public class WorkerConfig { + public static final String DEFAULT_CATEGORY = "_default_worker_category"; + @JsonProperty private String ip = DruidNode.getDefaultHost(); @@ -41,6 +44,9 @@ public class WorkerConfig private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1); @JsonProperty + @NotNull + private String category = DEFAULT_CATEGORY; + private long intermediaryPartitionDiscoveryPeriodSec = 60L; @JsonProperty @@ -70,6 +76,11 @@ public int getCapacity() return capacity; } + public String getCategory() + { + return category; + } + public long getIntermediaryPartitionDiscoveryPeriodSec() { return intermediaryPartitionDiscoveryPeriodSec; diff --git a/server/src/test/java/org/apache/druid/discovery/WorkerNodeServiceTest.java b/server/src/test/java/org/apache/druid/discovery/WorkerNodeServiceTest.java index 873b189d0a9a..b4a08e942b99 100644 --- a/server/src/test/java/org/apache/druid/discovery/WorkerNodeServiceTest.java +++ b/server/src/test/java/org/apache/druid/discovery/WorkerNodeServiceTest.java @@ -34,7 +34,8 @@ public void testSerde() throws Exception DruidService expected = new WorkerNodeService( "1.1.1.1", 100, - "v1" + "v1", + "c1" ); ObjectMapper mapper = TestHelper.makeJsonMapper(); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 97afa5b413bd..42d24ac2661b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -161,7 +161,8 @@ public Worker getWorker(@Self DruidNode node, WorkerConfig config) node.getHostAndPortToUse(), config.getIp(), config.getCapacity(), - config.getVersion() + config.getVersion(), + WorkerConfig.DEFAULT_CATEGORY ); } @@ -172,7 +173,8 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) return new WorkerNodeService( workerConfig.getIp(), workerConfig.getCapacity(), - workerConfig.getVersion() + workerConfig.getVersion(), + WorkerConfig.DEFAULT_CATEGORY ); } }, diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index d3e30968f28b..8efeb3cecb67 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -154,7 +154,8 @@ public Worker getWorker(@Self DruidNode node, WorkerConfig config) node.getHostAndPortToUse(), config.getIp(), config.getCapacity(), - config.getVersion() + config.getVersion(), + config.getCategory() ); } @@ -165,7 +166,8 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) return new WorkerNodeService( workerConfig.getIp(), workerConfig.getCapacity(), - workerConfig.getVersion() + workerConfig.getVersion(), + workerConfig.getCategory() ); } }, diff --git a/website/.spelling b/website/.spelling index 3167f85d1f3f..b4efcdfc23a9 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1600,6 +1600,16 @@ v0.12.0 versionReplacementString workerId yyyy-MM-dd +taskType +index_kafka +c1 +c2 +ds1 +equalDistributionWithCategorySpec +fillCapacityWithCategorySpec +WorkerCategorySpec +workerCategorySpec +CategoryConfig - ../docs/design/index.md logsearch - ../docs/ingestion/index.md