Skip to content
Merged
53 changes: 53 additions & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,59 @@ data:
druid.peon.mode=remote
druid.indexer.task.encapsulatedTask=true
```
#### Dynamic Pod Template Selection Config
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to self: doc should be re-written. remove use of new feature, more flexible, etc.

What is the right point to talk about this config

The Dynamic Pod Template Selection feature enhances the K8s extension by enabling more flexible and dynamic selection of pod templates based on task properties. This process is governed by the `PodTemplateSelectStrategy`. Below are the two strategies implemented:

|Property|Description|Default|
|--------|-----------|-------|
|`TaskTypePodTemplateSelectStrategy`| This strategy selects pod templates based on task type for execution purposes, implementing the behavior that maps templates to specific task types. | true |
|`SelectorBasedPodTemplateSelectStrategy`| This strategy evaluates a series of selectors, known as `selectors`, which are aligned with potential task properties. | false |

`SelectorBasedPodTemplateSelectStrategy`, the strategy implementing this new feature, is based on conditional `selectors` that match against top-level keys from the task payload. Currently, it supports matching based on task context tags, task type, and dataSource. These selectors are ordered in the dynamic configuration, with the first selector given the highest priority during the evaluation process. This means that the selection process uses these ordered conditions to determine a task’s Pod template. The first matching condition immediately determines the Pod template, thereby prioritizing certain configurations over others. If no selector matches, it will fall back to an optional `defaultKey` if configured; if there is still no match, it will use the `base` template.

Example Configuration:

We define two template keys in the configuration—`low-throughput` and `medium-throughput`—each associated with specific task conditions and arranged in a priority order.

- Low Throughput Template: This is the first template evaluated and has the highest priority. Tasks that have a context tag `billingCategory=streaming_ingestion` and a datasource of `wikipedia` will be classified under the `low-throughput` template. This classification directs such tasks to utilize a predefined pod template optimized for low throughput requirements.

- Medium Throughput Template: If a task does not meet the low-throughput criteria, the system will then evaluate it against the next selector in order. In this example, if the task type is index_kafka, it will fall into the `medium-throughput` template.
```
{
"type": "default",
"podTemplateSelectStrategy":
{
"type": "selectorBased",
"selectors": [
{
"selectionKey": "low-throughput",
Comment thread
YongGang marked this conversation as resolved.
"context.tags":
{
"billingCategory": ["streaming_ingestion"]
},
"dataSource": ["wikipedia"]
},
{
"selectionKey": "medium-throughput",
"type": ["index_kafka"]
}
],
"defaultKey"" "base"
}
}
```
Task specific pod templates can be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{template}: /path/to/taskSpecificPodSpec.yaml` where {template} is the matched `selectionKey` of the `podTemplateSelectStrategy` i.e low-throughput.

Similar to Overlord dynamic configuration, the following API endpoints are defined to retrieve and manage dynamic configurations of Pod Template Selection config:

- Get dynamic configuration:
`POST` `/druid/indexer/v1/k8s/taskRunner/executionConfig`

- Update dynamic configuration:
`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig`

- Get dynamic configuration history:
`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig/history`

### Properties
|Property| Possible Values | Description |Default|required|
Expand Down
20 changes: 20 additions & 0 deletions extensions-contrib/kubernetes-overlord-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,26 @@
<version>6.7.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.JacksonConfigProvider;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
Expand All @@ -49,6 +51,8 @@
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskExecutionConfigResource;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
Expand All @@ -75,6 +79,7 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JacksonConfigProvider.bind(binder, KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class, null);
Comment thread
suneet-s marked this conversation as resolved.
PolyBind.createChoice(
binder,
"druid.indexer.runner.type",
Expand All @@ -98,6 +103,8 @@ public void configure(Binder binder)
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
configureTaskLogs(binder);

Jerseys.addResource(binder, KubernetesTaskExecutionConfigResource.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
Expand All @@ -32,6 +33,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
Expand All @@ -56,6 +58,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final Properties properties;
private final DruidKubernetesClient druidKubernetesClient;
private final ServiceEmitter emitter;
private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
private KubernetesTaskRunner runner;

@Inject
Expand All @@ -69,7 +72,8 @@ public KubernetesTaskRunnerFactory(
TaskConfig taskConfig,
Properties properties,
DruidKubernetesClient druidKubernetesClient,
ServiceEmitter emitter
ServiceEmitter emitter,
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
)
{
this.smileMapper = smileMapper;
Expand All @@ -82,6 +86,7 @@ public KubernetesTaskRunnerFactory(
this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
this.emitter = emitter;
this.dynamicConfigRef = dynamicConfigRef;
}

@Override
Expand Down Expand Up @@ -146,7 +151,8 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
druidNode,
smileMapper,
properties,
taskLogs
taskLogs,
dynamicConfigRef
);
} else {
return new SingleContainerTaskAdapter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.execution;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

import java.util.Objects;

public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskRunnerDynamicConfig
{
private final PodTemplateSelectStrategy podTemplateSelectStrategy;

@JsonCreator
public DefaultKubernetesTaskRunnerDynamicConfig(
@JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy
)
{
Preconditions.checkNotNull(podTemplateSelectStrategy);
this.podTemplateSelectStrategy = podTemplateSelectStrategy;
}

@Override
@JsonProperty
public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
{
return podTemplateSelectStrategy;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultKubernetesTaskRunnerDynamicConfig that = (DefaultKubernetesTaskRunnerDynamicConfig) o;
return Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
}

@Override
public int hashCode()
{
return Objects.hashCode(podTemplateSelectStrategy);
}

@Override
public String toString()
{
return "DefaultKubernetesTaskRunnerDynamicConfig{" +
"podTemplateSelectStrategy=" + podTemplateSelectStrategy +
'}';
}
}
Loading