dynamicConfigRef
)
{
this.smileMapper = smileMapper;
@@ -82,6 +86,7 @@ public KubernetesTaskRunnerFactory(
this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
this.emitter = emitter;
+ this.dynamicConfigRef = dynamicConfigRef;
}
@Override
@@ -146,7 +151,8 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
druidNode,
smileMapper,
properties,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
} else {
return new SingleContainerTaskAdapter(
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
new file mode 100644
index 000000000000..eddd5e4a1ee1
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskRunnerDynamicConfig
+{
+ private final PodTemplateSelectStrategy podTemplateSelectStrategy;
+
+ @JsonCreator
+ public DefaultKubernetesTaskRunnerDynamicConfig(
+ @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy
+ )
+ {
+ Preconditions.checkNotNull(podTemplateSelectStrategy);
+ this.podTemplateSelectStrategy = podTemplateSelectStrategy;
+ }
+
+ @Override
+ @JsonProperty
+ public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
+ {
+ return podTemplateSelectStrategy;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultKubernetesTaskRunnerDynamicConfig that = (DefaultKubernetesTaskRunnerDynamicConfig) o;
+ return Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(podTemplateSelectStrategy);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DefaultKubernetesTaskRunnerDynamicConfig{" +
+ "podTemplateSelectStrategy=" + podTemplateSelectStrategy +
+ '}';
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
new file mode 100644
index 000000000000..ec03b045f503
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.google.common.collect.ImmutableMap;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.audit.AuditEntry;
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigManager;
+import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.http.security.ConfigResourceFilter;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.joda.time.Interval;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Resource that manages Kubernetes-specific execution configurations for running tasks.
+ *
+ * This class handles the CRUD operations for execution configurations and provides
+ * endpoints to update, retrieve, and manage the history of these configurations.
+ */
+@Path("/druid/indexer/v1/k8s/taskrunner/executionconfig")
+public class KubernetesTaskExecutionConfigResource
+{
+ private static final Logger log = new Logger(KubernetesTaskExecutionConfigResource.class);
+ private final JacksonConfigManager configManager;
+ private final AuditManager auditManager;
+ private AtomicReference dynamicConfigRef = null;
+
+ @Inject
+ public KubernetesTaskExecutionConfigResource(
+ final JacksonConfigManager configManager,
+ final AuditManager auditManager
+ )
+ {
+ this.configManager = configManager;
+ this.auditManager = auditManager;
+ }
+
+ /**
+ * Updates the Kubernetes execution configuration.
+ *
+ * @param dynamicConfig the new execution configuration to set
+ * @param req the HTTP servlet request providing context for audit information
+ * @return a response indicating the success or failure of the update operation
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response setExecutionConfig(
+ final KubernetesTaskRunnerDynamicConfig dynamicConfig,
+ @Context final HttpServletRequest req
+ )
+ {
+ final ConfigManager.SetResult setResult = configManager.set(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ dynamicConfig,
+ AuthorizationUtils.buildAuditInfo(req)
+ );
+ if (setResult.isOk()) {
+ log.info("Updating K8s execution configs: %s", dynamicConfig);
+
+ return Response.ok().build();
+ } else {
+ return Response.status(Response.Status.BAD_REQUEST).build();
+ }
+ }
+
+ /**
+ * Retrieves the history of changes to the Kubernetes execution configuration.
+ *
+ * @param interval the time interval for fetching historical data (optional)
+ * @param count the maximum number of historical entries to fetch (optional)
+ * @return a response containing a list of audit entries or an error message
+ */
+ @GET
+ @Path("/history")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response getExecutionConfigHistory(
+ @QueryParam("interval") final String interval,
+ @QueryParam("count") final Integer count
+ )
+ {
+ Interval theInterval = interval == null ? null : Intervals.of(interval);
+ if (theInterval == null && count != null) {
+ try {
+ List executionEntryList = auditManager.fetchAuditHistory(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ count
+ );
+ return Response.ok(executionEntryList).build();
+ }
+ catch (IllegalArgumentException e) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+ }
+ List executionEntryList = auditManager.fetchAuditHistory(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ theInterval
+ );
+ return Response.ok(executionEntryList).build();
+ }
+
+ /**
+ * Retrieves the current execution configuration for tasks running in Kubernetes.
+ *
+ * @return a Response object containing the current execution configuration in JSON format.
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response getExecutionConfig()
+ {
+ if (dynamicConfigRef == null) {
+ dynamicConfigRef = configManager.watch(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class);
+ }
+
+ return Response.ok(dynamicConfigRef.get()).build();
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
new file mode 100644
index 000000000000..4f6d4b07c41d
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Represents the configuration for task execution within a Kubernetes environment.
+ * This interface allows for dynamic configuration of task execution strategies based
+ * on specified behavior strategies.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultKubernetesTaskRunnerDynamicConfig.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "default", value = DefaultKubernetesTaskRunnerDynamicConfig.class)
+})
+public interface KubernetesTaskRunnerDynamicConfig
+{
+ String CONFIG_KEY = "k8s.taskrunner.config";
+ PodTemplateSelectStrategy DEFAULT_STRATEGY = new TaskTypePodTemplateSelectStrategy();
+
+ /**
+ * Retrieves the execution behavior strategy associated with this configuration.
+ * @return the execution behavior strategy
+ */
+ PodTemplateSelectStrategy getPodTemplateSelectStrategy();
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/PodTemplateSelectStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/PodTemplateSelectStrategy.java
new file mode 100644
index 000000000000..1b8d57419d28
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/PodTemplateSelectStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Map;
+
+/**
+ * Defines a strategy for selecting the Pod template of tasks based on specific conditions.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = TaskTypePodTemplateSelectStrategy.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "default", value = TaskTypePodTemplateSelectStrategy.class),
+ @JsonSubTypes.Type(name = "selectorBased", value = SelectorBasedPodTemplateSelectStrategy.class),
+})
+public interface PodTemplateSelectStrategy
+{
+ /**
+ * Determines the appropriate Pod template for a task by evaluating its properties. This selection
+ * allows for customized resource allocation and management tailored to the task's specific requirements.
+ *
+ * @param task The task for which the Pod template is determined.
+ * @return The selected Pod template. If no matching template is found,
+ * the method falls back to a base template.
+ */
+ PodTemplate getPodTemplateForTask(Task task, Map templates);
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/Selector.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/Selector.java
new file mode 100644
index 000000000000..a314a69b3811
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/Selector.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents a condition-based selector that evaluates whether a given task meets specified criteria.
+ * The selector uses conditions defined on context tags and task fields to determine if a task matches.
+ */
+public class Selector
+{
+ private final String selectionKey;
+ private final Map> cxtTagsConditions;
+ private final Set taskTypeCondition;
+ private final Set dataSourceCondition;
+
+ /**
+ * Creates a selector with specified conditions for context tags and task fields.
+ *
+ * @param selectionKey the identifier representing the outcome when a task matches the conditions
+ * @param cxtTagsConditions conditions on context tags
+ * @param taskTypeCondition conditions on task type
+ * @param dataSourceCondition conditions on task dataSource
+ */
+ @JsonCreator
+ public Selector(
+ @JsonProperty("selectionKey") String selectionKey,
+ @JsonProperty("context.tags") Map> cxtTagsConditions,
+ @JsonProperty("type") Set taskTypeCondition,
+ @JsonProperty("dataSource") Set dataSourceCondition
+ )
+ {
+ this.selectionKey = selectionKey;
+ this.cxtTagsConditions = cxtTagsConditions;
+ this.taskTypeCondition = taskTypeCondition;
+ this.dataSourceCondition = dataSourceCondition;
+ }
+
+ /**
+ * Evaluates this selector against a given task.
+ *
+ * @param task the task to evaluate
+ * @return true if the task meets all the conditions specified by this selector, otherwise false
+ */
+ public boolean evaluate(Task task)
+ {
+ boolean isMatch = true;
+ if (cxtTagsConditions != null) {
+ isMatch = cxtTagsConditions.entrySet().stream().allMatch(entry -> {
+ String tagKey = entry.getKey();
+ Set tagValues = entry.getValue();
+ Map tags = task.getContextValue(DruidMetrics.TAGS);
+ if (tags == null || tags.isEmpty()) {
+ return false;
+ }
+ Object tagValue = tags.get(tagKey);
+
+ return tagValue == null ? false : tagValues.contains((String) tagValue);
+ });
+ }
+
+ if (isMatch && taskTypeCondition != null) {
+ isMatch = taskTypeCondition.contains(task.getType());
+ }
+
+ if (isMatch && dataSourceCondition != null) {
+ isMatch = dataSourceCondition.contains(task.getDataSource());
+ }
+
+ return isMatch;
+ }
+
+ @JsonProperty
+ public String getSelectionKey()
+ {
+ return selectionKey;
+ }
+
+ @JsonProperty("context.tags")
+ public Map> getCxtTagsConditions()
+ {
+ return cxtTagsConditions;
+ }
+
+ @JsonProperty("type")
+ public Set getTaskTypeCondition()
+ {
+ return taskTypeCondition;
+ }
+
+ @JsonProperty("dataSource")
+ public Set getDataSourceCondition()
+ {
+ return dataSourceCondition;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Selector selector = (Selector) o;
+ return Objects.equals(selectionKey, selector.selectionKey) && Objects.equals(
+ cxtTagsConditions,
+ selector.cxtTagsConditions
+ ) && Objects.equals(taskTypeCondition, selector.taskTypeCondition) && Objects.equals(
+ dataSourceCondition,
+ selector.dataSourceCondition
+ );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(selectionKey, cxtTagsConditions, taskTypeCondition, dataSourceCondition);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Selector{" +
+ "selectionKey=" + selectionKey +
+ ", context.tags=" + cxtTagsConditions +
+ ", type=" + taskTypeCondition +
+ ", dataSource=" + dataSourceCondition +
+ '}';
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategy.java
new file mode 100644
index 000000000000..938ed04e6a60
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import org.apache.druid.indexing.common.task.Task;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Implements {@link PodTemplateSelectStrategy} by dynamically evaluating a series of selectors.
+ * Each selector corresponds to a potential task template key.
+ */
+public class SelectorBasedPodTemplateSelectStrategy implements PodTemplateSelectStrategy
+{
+ @Nullable
+ private String defaultKey;
+ private List selectors;
+
+ @JsonCreator
+ public SelectorBasedPodTemplateSelectStrategy(
+ @JsonProperty("selectors") List selectors,
+ @JsonProperty("defaultKey") @Nullable String defaultKey
+ )
+ {
+ Preconditions.checkNotNull(selectors, "selectors");
+ this.selectors = selectors;
+ this.defaultKey = defaultKey;
+ }
+
+ /**
+ * Evaluates the provided task against the set selectors to determine its template.
+ *
+ * @param task the task to be checked
+ * @return the template if a selector matches, otherwise fallback to base template
+ */
+ @Override
+ public PodTemplate getPodTemplateForTask(Task task, Map templates)
+ {
+ String templateKey = selectors.stream()
+ .filter(selector -> selector.evaluate(task))
+ .findFirst()
+ .map(Selector::getSelectionKey)
+ .orElse(defaultKey);
+
+ return templates.getOrDefault(templateKey, templates.get("base"));
+ }
+
+ @JsonProperty
+ public List getSelectors()
+ {
+ return selectors;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getDefaultKey()
+ {
+ return defaultKey;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SelectorBasedPodTemplateSelectStrategy that = (SelectorBasedPodTemplateSelectStrategy) o;
+ return Objects.equals(defaultKey, that.defaultKey) && Objects.equals(selectors, that.selectors);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(defaultKey, selectors);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SelectorBasedPodTemplateSelectStrategy{" +
+ "selectors=" + selectors +
+ ", defaultKey=" + defaultKey +
+ '}';
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/TaskTypePodTemplateSelectStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/TaskTypePodTemplateSelectStrategy.java
new file mode 100644
index 000000000000..b374e0b6ff40
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/TaskTypePodTemplateSelectStrategy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Map;
+
+/**
+ * This strategy defines how task template is selected based on their type for execution purposes.
+ *
+ * This implementation selects pod template by looking at the type of the task,
+ * making it a straightforward, type-based template selection strategy.
+ */
+public class TaskTypePodTemplateSelectStrategy implements PodTemplateSelectStrategy
+{
+
+ @JsonCreator
+ public TaskTypePodTemplateSelectStrategy()
+ {
+ }
+
+ @Override
+ public PodTemplate getPodTemplateForTask(Task task, Map templates)
+ {
+ return templates.getOrDefault(task.getType(), templates.get("base"));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return o instanceof TaskTypePodTemplateSelectStrategy;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return 1; // Any constant will work here
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TaskTypePodTemplateSelectStrategy{" +
+ '}';
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index c22fa5869d8c..8e3788e31e15 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord.taskadapter;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.EnvVar;
@@ -46,6 +47,8 @@
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesOverlordUtils;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogs;
@@ -83,7 +86,6 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private static final Logger log = new Logger(PodTemplateTaskAdapter.class);
-
private static final String TASK_PROPERTY = IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8s.podTemplate.";
private final KubernetesTaskRunnerConfig taskRunnerConfig;
@@ -92,6 +94,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private final ObjectMapper mapper;
private final HashMap templates;
private final TaskLogs taskLogs;
+ private final Supplier dynamicConfigRef;
public PodTemplateTaskAdapter(
KubernetesTaskRunnerConfig taskRunnerConfig,
@@ -99,7 +102,8 @@ public PodTemplateTaskAdapter(
DruidNode node,
ObjectMapper mapper,
Properties properties,
- TaskLogs taskLogs
+ TaskLogs taskLogs,
+ Supplier dynamicConfigRef
)
{
this.taskRunnerConfig = taskRunnerConfig;
@@ -108,6 +112,7 @@ public PodTemplateTaskAdapter(
this.mapper = mapper;
this.templates = initializePodTemplates(properties);
this.taskLogs = taskLogs;
+ this.dynamicConfigRef = dynamicConfigRef;
}
/**
@@ -126,7 +131,16 @@ public PodTemplateTaskAdapter(
@Override
public Job fromTask(Task task) throws IOException
{
- PodTemplate podTemplate = templates.getOrDefault(task.getType(), templates.get("base"));
+ PodTemplateSelectStrategy podTemplateSelectStrategy;
+ KubernetesTaskRunnerDynamicConfig dynamicConfig = dynamicConfigRef.get();
+ if (dynamicConfig == null || dynamicConfig.getPodTemplateSelectStrategy() == null) {
+ podTemplateSelectStrategy = KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY;
+ } else {
+ podTemplateSelectStrategy = dynamicConfig.getPodTemplateSelectStrategy();
+ }
+
+ PodTemplate podTemplate = podTemplateSelectStrategy.getPodTemplateForTask(task, templates);
+
if (podTemplate == null) {
throw new ISE("Pod template spec not found for task type [%s]", task.getType());
}
@@ -152,7 +166,9 @@ public Job fromTask(Task task) throws IOException
.endTemplate()
.withActiveDeadlineSeconds(taskRunnerConfig.getTaskTimeout().toStandardDuration().getStandardSeconds())
.withBackoffLimit(0) // druid does not support an external system retrying failed tasks
- .withTtlSecondsAfterFinished((int) taskRunnerConfig.getTaskCleanupDelay().toStandardDuration().getStandardSeconds())
+ .withTtlSecondsAfterFinished((int) taskRunnerConfig.getTaskCleanupDelay()
+ .toStandardDuration()
+ .getStandardSeconds())
.endSpec()
.build();
}
@@ -320,12 +336,12 @@ private Map getJobLabels(KubernetesTaskRunnerConfig config, Task
private Map getJobAnnotations(KubernetesTaskRunnerConfig config, Task task)
{
return ImmutableMap.builder()
- .putAll(config.getAnnotations())
- .put(DruidK8sConstants.TASK_ID, task.getId())
- .put(DruidK8sConstants.TASK_TYPE, task.getType())
- .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
- .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
- .build();
+ .putAll(config.getAnnotations())
+ .put(DruidK8sConstants.TASK_ID, task.getId())
+ .put(DruidK8sConstants.TASK_TYPE, task.getType())
+ .put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
+ .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
+ .build();
}
private String getDruidLabel(String baseLabel)
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index b83ec562bda7..369f29726a40 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -19,10 +19,15 @@
package org.apache.druid.k8s.overlord;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.ProvisionException;
+import com.google.inject.TypeLiteral;
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigManagerConfig;
import org.apache.druid.guice.ConfigModule;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.annotations.EscalatedGlobal;
@@ -33,6 +38,8 @@
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.DruidNode;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
@@ -55,6 +62,14 @@ public class KubernetesOverlordModuleTest
private RemoteTaskRunnerFactory remoteTaskRunnerFactory;
@Mock
private HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
+ @Mock
+ private ConfigManagerConfig configManagerConfig;
+ @Mock
+ private MetadataStorageTablesConfig metadataStorageTablesConfig;
+ @Mock
+ private AuditManager auditManager;
+ @Mock
+ private MetadataStorageConnector metadataStorageConnector;
private Injector injector;
@Test
@@ -111,6 +126,16 @@ private Injector makeInjectorWithProperties(
if (isWorkerTypeHttpRemote) {
binder.bind(HttpRemoteTaskRunnerFactory.class).toInstance(httpRemoteTaskRunnerFactory);
}
+ binder.bind(
+ new TypeLiteral>()
+ {
+ }).toInstance(Suppliers.ofInstance(configManagerConfig));
+ binder.bind(
+ new TypeLiteral>()
+ {
+ }).toInstance(Suppliers.ofInstance(metadataStorageTablesConfig));
+ binder.bind(AuditManager.class).toInstance(auditManager);
+ binder.bind(MetadataStorageConnector.class).toInstance(metadataStorageConnector);
},
new ConfigModule(),
new KubernetesOverlordModule()
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index ba9d2accf170..ec347a5714d3 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -20,12 +20,14 @@
package org.apache.druid.k8s.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
@@ -53,6 +55,7 @@ public class KubernetesTaskRunnerFactoryTest
private DruidKubernetesClient druidKubernetesClient;
@Mock private ServiceEmitter emitter;
+ @Mock private Supplier dynamicConfigRef;
@Before
public void setup()
@@ -90,7 +93,8 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild()
taskConfig,
properties,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
KubernetesTaskRunner expectedRunner = factory.build();
@@ -112,7 +116,8 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing
taskConfig,
properties,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@@ -139,7 +144,8 @@ public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiCo
taskConfig,
properties,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@@ -164,7 +170,8 @@ public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunne
taskConfig,
props,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@@ -194,7 +201,8 @@ public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsIAE
taskConfig,
props,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
Assert.assertThrows(
@@ -225,7 +233,8 @@ public void test_build_withMultiContainerAdapterType_returnsKubernetesTaskRunner
taskConfig,
props,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@@ -250,7 +259,8 @@ public void test_build_withMultiContainerAdapterTypeAndSidecarSupport_returnsKub
taskConfig,
props,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
@@ -278,7 +288,8 @@ public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWit
taskConfig,
props,
druidKubernetesClient,
- emitter
+ emitter,
+ dynamicConfigRef
);
KubernetesTaskRunner runner = factory.build();
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
new file mode 100644
index 000000000000..de8919e329de
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DefaultKubernetesTaskRunnerDynamicConfigTest
+{
+
+ @Test
+ public void getPodTemplateSelectStrategyTest()
+ {
+ PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
+ DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
+
+ Assert.assertEquals(strategy, config.getPodTemplateSelectStrategy());
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy();
+ DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy);
+
+ DefaultKubernetesTaskRunnerDynamicConfig config2 = objectMapper.readValue(
+ objectMapper.writeValueAsBytes(config),
+ DefaultKubernetesTaskRunnerDynamicConfig.class
+ );
+ Assert.assertEquals(config, config2);
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
new file mode 100644
index 000000000000..b76b7eaf0cfe
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigManager;
+import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class KubernetesTaskExecutionConfigResourceTest
+{
+ private JacksonConfigManager configManager;
+ private AuditManager auditManager;
+ private HttpServletRequest req;
+ private KubernetesTaskRunnerDynamicConfig dynamicConfig;
+
+ @Before
+ public void setUp()
+ {
+ configManager = EasyMock.createMock(JacksonConfigManager.class);
+ auditManager = EasyMock.createMock(AuditManager.class);
+ req = EasyMock.createMock(HttpServletRequest.class);
+ dynamicConfig = EasyMock.createMock(KubernetesTaskRunnerDynamicConfig.class);
+ }
+
+ @Test
+ public void setExecutionConfigSuccessfulUpdate()
+ {
+ KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+ configManager,
+ auditManager
+ );
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+ EasyMock.replay(req);
+ EasyMock.expect(configManager.set(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ dynamicConfig,
+ AuthorizationUtils.buildAuditInfo(req)
+ )).andReturn(ConfigManager.SetResult.ok());
+ EasyMock.replay(configManager, auditManager, dynamicConfig);
+
+ Response result = testedResource.setExecutionConfig(dynamicConfig, req);
+ assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
+ }
+
+ @Test
+ public void setExecutionConfigFailedUpdate()
+ {
+ KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource(
+ configManager,
+ auditManager
+ );
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes();
+ EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes();
+ EasyMock.replay(req);
+ EasyMock.expect(configManager.set(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ dynamicConfig,
+ AuthorizationUtils.buildAuditInfo(req)
+ )).andReturn(ConfigManager.SetResult.failure(new RuntimeException()));
+ EasyMock.replay(configManager, auditManager, dynamicConfig);
+
+ Response result = testedResource.setExecutionConfig(dynamicConfig, req);
+ assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), result.getStatus());
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
new file mode 100644
index 000000000000..6236794d3661
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KubernetesTaskRunnerDynamicConfigTest
+{
+ private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+ @Test
+ public void testSerde() throws JsonProcessingException
+ {
+ String json = "{\n"
+ + " \"type\": \"default\",\n"
+ + " \"podTemplateSelectStrategy\": {\n"
+ + " \"type\": \"default\"\n"
+ + " }\n"
+ + "}";
+
+ KubernetesTaskRunnerDynamicConfig deserialized = jsonMapper.readValue(
+ json,
+ KubernetesTaskRunnerDynamicConfig.class
+ );
+ PodTemplateSelectStrategy selectStrategy = deserialized.getPodTemplateSelectStrategy();
+ Assert.assertTrue(selectStrategy instanceof TaskTypePodTemplateSelectStrategy);
+
+ json = "{\n"
+ + " \"type\": \"default\",\n"
+ + " \"podTemplateSelectStrategy\":\n"
+ + " {\n"
+ + " \"type\": \"selectorBased\",\n"
+ + " \"selectors\": [\n"
+ + " {\n"
+ + " \"selectionKey\": \"low-throughput\",\n"
+ + " \"context.tags\":\n"
+ + " {\n"
+ + " \"billingCategory\": [\"streaming_ingestion\"]\n"
+ + " },\n"
+ + " \"dataSource\": [\"wikipedia\"]\n"
+ + " },\n"
+ + " {\n"
+ + " \"selectionKey\": \"medium-throughput\",\n"
+ + " \"type\": [\"index_kafka\"]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"defaultKey\": \"base\"\n"
+ + " }\n"
+ + "}";
+
+ deserialized = jsonMapper.readValue(json, KubernetesTaskRunnerDynamicConfig.class);
+ selectStrategy = deserialized.getPodTemplateSelectStrategy();
+ Assert.assertTrue(selectStrategy instanceof SelectorBasedPodTemplateSelectStrategy);
+ Assert.assertEquals(2, ((SelectorBasedPodTemplateSelectStrategy) selectStrategy).getSelectors().size());
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategyTest.java
new file mode 100644
index 000000000000..9aa1376a5157
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorBasedPodTemplateSelectStrategyTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodTemplate;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SelectorBasedPodTemplateSelectStrategyTest
+{
+ private Map templates;
+
+ @Before
+ public void setup()
+ {
+ templates = ImmutableMap.of(
+ "mock",
+ new PodTemplate(null, null, new ObjectMeta()
+ {
+ @Override
+ public String getName()
+ {
+ return "mock";
+ }
+ }, null),
+ "no_match",
+ new PodTemplate(null, null, new ObjectMeta()
+ {
+ @Override
+ public String getName()
+ {
+ return "no_match";
+ }
+ }, null),
+ "match",
+ new PodTemplate(null, null, new ObjectMeta()
+ {
+ @Override
+ public String getName()
+ {
+ return "match";
+ }
+ }, null),
+ "base",
+ new PodTemplate(null, "base", new ObjectMeta()
+ {
+ @Override
+ public String getName()
+ {
+ return "base";
+ }
+ }, null)
+ );
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerExceptionWhenSelectorsAreNull()
+ {
+ new SelectorBasedPodTemplateSelectStrategy(null, null);
+ }
+
+ @Test
+ public void testGetPodTemplate_ForTask_emptySelectorsFallbackToBaseTemplate()
+ {
+ List emptySelectors = Collections.emptyList();
+ SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(emptySelectors, null);
+ Task task = NoopTask.create();
+ Assert.assertEquals("base", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
+ }
+
+ @Test
+ public void testGetPodTemplate_ForTask_noMatchSelectorsFallbackToBaseTemplateIfNullDefaultKey()
+ {
+ Selector noMatchSelector = new MockSelector(false, "mock");
+ List selectors = Collections.singletonList(noMatchSelector);
+ SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, null);
+ Task task = NoopTask.create();
+ Assert.assertEquals("base", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
+ }
+
+ @Test
+ public void testGetPodTemplate_ForTask_noMatchSelectorsFallbackToDefaultKeyTemplate()
+ {
+ Selector noMatchSelector = new MockSelector(false, "mock");
+ List selectors = Collections.singletonList(noMatchSelector);
+ SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, "match");
+ Task task = NoopTask.create();
+ Assert.assertEquals("match", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
+ }
+
+ @Test
+ public void testGetPodTemplate_ForTask_withMatchSelectors()
+ {
+ Selector noMatchSelector = new MockSelector(
+ false,
+ "no_match"
+ );
+ Selector matchSelector = new MockSelector(true, "match");
+ List selectors = Lists.newArrayList(
+ noMatchSelector,
+ matchSelector
+ );
+ SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(selectors, null);
+ Task task = NoopTask.create();
+ Assert.assertEquals("match", strategy.getPodTemplateForTask(task, templates).getMetadata().getName());
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ Map> cxtTagsConditions = new HashMap<>();
+ cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
+
+ Selector selector = new Selector(
+ "TestSelector",
+ cxtTagsConditions,
+ Sets.newHashSet(NoopTask.TYPE),
+ Sets.newHashSet("my_table")
+ );
+
+ SelectorBasedPodTemplateSelectStrategy strategy = new SelectorBasedPodTemplateSelectStrategy(
+ Collections.singletonList(selector), "default");
+
+ SelectorBasedPodTemplateSelectStrategy strategy2 = objectMapper.readValue(
+ objectMapper.writeValueAsBytes(strategy),
+ SelectorBasedPodTemplateSelectStrategy.class
+ );
+ Assert.assertEquals(strategy, strategy2);
+ }
+
+ static class MockSelector extends Selector
+ {
+ private final boolean matches;
+
+ MockSelector(boolean matches, String name)
+ {
+ super(name, null, null, null);
+ this.matches = matches;
+ }
+
+ @Override
+ public boolean evaluate(final Task task)
+ {
+ return matches;
+ }
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorTest.java
new file mode 100644
index 000000000000..0ecff67408e3
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/SelectorTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class SelectorTest
+{
+
+ @Test
+ public void shouldReturnTrueWhenAllTagsAndTasksMatch()
+ {
+ String dataSource = "my_table";
+ Map> cxtTagsConditions = new HashMap<>();
+ cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
+
+ Task task = NoopTask.forDatasource(dataSource);
+ task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
+
+ Selector selector = new Selector(
+ "TestSelector",
+ cxtTagsConditions,
+ null,
+ Sets.newHashSet(dataSource)
+ );
+
+ Assert.assertTrue(selector.evaluate(task));
+ }
+
+ @Test
+ public void shouldReturnFalseWhenTagsDoNotMatch()
+ {
+ String dataSource = "my_table";
+ Map> cxtTagsConditions = new HashMap<>();
+ cxtTagsConditions.put("nonexistentTag", Sets.newHashSet("tag1value"));
+
+ Task task = NoopTask.forDatasource(dataSource);
+
+ Selector selector = new Selector(
+ "TestSelector",
+ cxtTagsConditions,
+ null,
+ Sets.newHashSet(dataSource)
+ );
+
+ Assert.assertFalse(selector.evaluate(task));
+ }
+
+ @Test
+ public void shouldReturnFalseWhenSomeTagsDoNotMatch()
+ {
+ String dataSource = "my_table";
+ Map> cxtTagsConditions = new HashMap<>();
+ cxtTagsConditions.put("nonexistentTag", Sets.newHashSet("nonexistentTagValue"));
+ cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
+
+ Task task = NoopTask.forDatasource(dataSource);
+ task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
+
+ Selector selector = new Selector(
+ "TestSelector",
+ cxtTagsConditions,
+ null,
+ Sets.newHashSet(dataSource)
+ );
+
+ Assert.assertFalse(selector.evaluate(task));
+ }
+
+ @Test
+ public void shouldReturnFalseWhenTaskFieldsDoNotMatch()
+ {
+ Map> cxtTagsConditions = new HashMap<>();
+ cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
+
+ Task task = NoopTask.forDatasource("another_table");
+ task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
+
+ Selector selector = new Selector(
+ "TestSelector",
+ cxtTagsConditions,
+ null,
+ Sets.newHashSet("my_table")
+ );
+
+ Assert.assertFalse(selector.evaluate(task));
+ }
+
+ @Test
+ public void shouldReturnFalseWhenSomeTaskFieldsDoNotMatch()
+ {
+ Map> cxtTagsConditions = new HashMap<>();
+ cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
+
+ Task task = NoopTask.forDatasource("another_table");
+ task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
+
+ Selector selector = new Selector(
+ "TestSelector",
+ cxtTagsConditions,
+ Sets.newHashSet(NoopTask.TYPE),
+ Sets.newHashSet("my_table")
+ );
+
+ Assert.assertFalse(selector.evaluate(task));
+ }
+
+ @Test
+ public void shouldReturnTrueWhenNoConditionsSpecified()
+ {
+ Task task = NoopTask.forDatasource("my_table");
+ task.addToContext(DruidMetrics.TAGS, ImmutableMap.of("tag1", "tag1value"));
+
+ Selector selector = new Selector(
+ "TestSelector",
+ null,
+ null,
+ null
+ );
+
+ Assert.assertTrue(selector.evaluate(task));
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ Map> cxtTagsConditions = new HashMap<>();
+ cxtTagsConditions.put("tag1", Sets.newHashSet("tag1value"));
+
+ Selector selector = new Selector(
+ "TestSelector",
+ cxtTagsConditions,
+ Sets.newHashSet(NoopTask.TYPE),
+ Sets.newHashSet("my_table")
+ );
+
+ Selector selector2 = objectMapper.readValue(
+ objectMapper.writeValueAsBytes(selector),
+ Selector.class
+ );
+ Assert.assertEquals(selector, selector2);
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index 1796bb2b4396..36fc77631a1c 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.PodTemplateBuilder;
@@ -40,16 +41,20 @@
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
+import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import org.apache.druid.k8s.overlord.execution.Selector;
+import org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.EasyMock;
-import org.easymock.Mock;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
+import org.mockito.internal.util.collections.Sets;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -71,7 +76,8 @@ public class PodTemplateTaskAdapterTest
private TaskConfig taskConfig;
private DruidNode node;
private ObjectMapper mapper;
- @Mock private TaskLogs taskLogs;
+ private TaskLogs taskLogs;
+ private Supplier dynamicConfigRef;
@BeforeEach
public void setup()
@@ -89,6 +95,9 @@ public void setup()
);
mapper = new TestUtils().getTestObjectMapper();
podTemplateSpec = K8sTestUtils.fileToResource("basePodTemplate.yaml", PodTemplate.class);
+
+ taskLogs = EasyMock.createMock(TaskLogs.class);
+ dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY);
}
@Test
@@ -103,7 +112,8 @@ public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE()
node,
mapper,
new Properties(),
- taskLogs
+ taskLogs,
+ dynamicConfigRef
));
Assert.assertEquals(exception.getMessage(), "Pod template task adapter requires a base pod template to be specified under druid.indexer.runner.k8s.podTemplate.base");
}
@@ -125,11 +135,11 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_r
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
));
Assert.assertTrue(exception.getMessage().contains("Failed to load pod template file for"));
-
}
@Test
@@ -147,7 +157,8 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites() throws IOExce
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -180,7 +191,8 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites_andTlsEnabled()
),
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -207,7 +219,8 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_r
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
));
}
@@ -227,7 +240,8 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
@@ -253,7 +267,8 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites_dontSetTaskJSON
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Task task = new NoopTask(
@@ -286,7 +301,8 @@ public void test_fromTask_withoutAnnotations_throwsDruidException() throws IOExc
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
@@ -309,7 +325,8 @@ public void test_getTaskId() throws IOException
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Job job = new JobBuilder()
.editSpec().editTemplate().editMetadata()
@@ -333,7 +350,8 @@ public void test_getTaskId_noAnnotations() throws IOException
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Job job = new JobBuilder()
.editSpec().editTemplate().editMetadata()
@@ -357,7 +375,8 @@ public void test_getTaskId_missingTaskIdAnnotation() throws IOException
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Job job = new JobBuilder()
.editSpec().editTemplate().editMetadata()
@@ -383,7 +402,8 @@ public void test_toTask_withoutTaskAnnotation_throwsIOE() throws IOException
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Job baseJob = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
@@ -415,7 +435,8 @@ public void test_toTask() throws IOException
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
@@ -446,7 +467,8 @@ public void test_toTask_useTaskPayloadManager() throws IOException
node,
mapper,
props,
- mockTestLogs
+ mockTestLogs,
+ dynamicConfigRef
);
Job job = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
@@ -470,7 +492,8 @@ public void test_fromTask_withRealIds() throws IOException
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Task task = new NoopTask(
@@ -504,7 +527,8 @@ public void test_fromTask_taskSupportsQueries() throws IOException
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Task task = EasyMock.mock(Task.class);
@@ -552,7 +576,8 @@ public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() throws
node,
mapper,
props,
- taskLogs
+ taskLogs,
+ dynamicConfigRef
);
Task kafkaTask = new NoopTask("id", "id", "datasource", 0, 0, null) {
@@ -571,6 +596,51 @@ public String getType()
Assert.assertEquals(0, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
}
+ @Test
+ public void test_fromTask_matchPodTemplateBasedOnStrategy() throws IOException
+ {
+ String dataSource = "my_table";
+ Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
+
+ Path lowThroughputTemplatePath = Files.createFile(tempDir.resolve("low-throughput.yaml"));
+ PodTemplate lowThroughputPodTemplate = new PodTemplateBuilder(podTemplateSpec)
+ .editTemplate()
+ .editSpec()
+ .setNewVolumeLike(0, new VolumeBuilder().withName("volume").build())
+ .endVolume()
+ .endSpec()
+ .endTemplate()
+ .build();
+ mapper.writeValue(lowThroughputTemplatePath.toFile(), lowThroughputPodTemplate);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString());
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.lowThroughput", lowThroughputTemplatePath.toString());
+ dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(new SelectorBasedPodTemplateSelectStrategy(
+ Collections.singletonList(
+ new Selector("lowThrougput", null, null, Sets.newSet(dataSource)
+ )), null));
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs,
+ dynamicConfigRef
+ );
+
+ Task taskWithMatchedDatasource = new NoopTask("id", "id", dataSource, 0, 0, null);
+ Task noopTask = new NoopTask("id", "id", "datasource", 0, 0, null);
+ Job actual = adapter.fromTask(taskWithMatchedDatasource);
+ Assert.assertEquals(1, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
+
+ actual = adapter.fromTask(noopTask);
+ Assert.assertEquals(0, actual.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
+ }
+
private void assertJobSpecsEqual(Job actual, Job expected) throws IOException
{
Map actualAnnotations = actual.getSpec().getTemplate().getMetadata().getAnnotations();