Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
47dce70
app code working but needs cleanup and testing
capistrant Oct 3, 2025
e4fbdf5
caching side cleaner. need to add back direct client
capistrant Oct 3, 2025
8911e1f
Implementation ready for deeper UT and ET writing
capistrant Oct 5, 2025
0a10243
Merge branch 'master' into k8s-overlord-api-redux
capistrant Oct 5, 2025
ce7493a
checkstyle cleanup
capistrant Oct 5, 2025
8f13cda
Remove the busy waiting. Overhaul caching client testing
capistrant Oct 8, 2025
3778883
some config and instantion cleanup along with basic docs
capistrant Oct 8, 2025
5dcc64d
extnd the K8s task runner docker test to run with both direct and cac…
capistrant Oct 8, 2025
a1b1381
Merge branch 'master' into k8s-overlord-api-redux
capistrant Oct 8, 2025
af306e1
fix spelling and add resync to dictionary
capistrant Oct 8, 2025
08764c5
fix strict compile issues
capistrant Oct 8, 2025
a90b2d2
fixup checkstyle
capistrant Oct 8, 2025
0d9672c
fix k8s overlord module setup
capistrant Oct 8, 2025
2f24623
Merge branch 'master' into k8s-overlord-api-redux
capistrant Oct 9, 2025
1fe0f87
few small fixups
capistrant Oct 9, 2025
e9b3947
fix checkstyle
capistrant Oct 9, 2025
d62bbc7
fix up some issues with wait for job completion
capistrant Oct 10, 2025
5f6920b
cleanup and fix some tests
capistrant Oct 10, 2025
5b6c8f3
Make DruidKubernetesClient defend against invalid use if caching is off
capistrant Oct 10, 2025
11db5ad
cleanup checkstyle
capistrant Oct 10, 2025
144dd49
dont use deprecated method
capistrant Oct 10, 2025
7fcbcec
doc update
capistrant Oct 13, 2025
264879a
Merge branch 'master' into k8s-overlord-api-redux
capistrant Oct 13, 2025
2301dde
fix spelling
capistrant Oct 14, 2025
3455b0d
Merge branch 'master' into k8s-overlord-api-redux
capistrant Oct 15, 2025
8983477
fix checkstyle after merge with master
capistrant Oct 15, 2025
06ed528
Improve reliability of the Caching K8s Peon Client code and associate…
capistrant Oct 16, 2025
3451026
fix checkstyle
capistrant Oct 16, 2025
d963d55
Modifications to try and reduce caching client api impact even more
capistrant Oct 22, 2025
09ab11f
Fixup tests now that we have refactored log fetching
capistrant Oct 23, 2025
7dd0fe8
remove some whitespace from the diff. Can be corrected in a future fo…
capistrant Oct 24, 2025
baf5015
one more whitespace cleanup
capistrant Oct 24, 2025
4109b6c
more diff cleanup
capistrant Oct 24, 2025
d96cabe
Make another api usage optimization for the caching client. Clean up …
capistrant Oct 25, 2025
a7104a7
diff cleanup
capistrant Oct 25, 2025
e129bd0
Some better class javadocs for the k8s clients
capistrant Oct 25, 2025
c5cf208
logging and comment cleanup
capistrant Oct 25, 2025
9bf9c4a
DruidKubernetesClient tidy up
capistrant Oct 25, 2025
6b51c9b
javadoc link add
capistrant Oct 25, 2025
ab1cd89
Use background propagation policy when deleting jobs to lessen load o…
capistrant Oct 25, 2025
9495e3c
fix an npe and add a test to caching client
capistrant Oct 28, 2025
70d46a5
Merge branch 'master' into k8s-overlord-api-redux
capistrant Nov 20, 2025
b664b12
Merge branch 'master' into k8s-overlord-api-redux
capistrant Dec 4, 2025
b04e779
Remove AbstractK8sClient, rename DirectClient
kfaraz Dec 5, 2025
e1c1d13
Remove formatting changes in KubernetesPeonClient
kfaraz Dec 5, 2025
6ea97f4
Remove more formatting changes
kfaraz Dec 5, 2025
ceb10bc
Address the more minor review comments
capistrant Dec 5, 2025
f447b7a
re-add log watch refactors to KubernetesPeonClient, they reduce API t…
capistrant Dec 5, 2025
5653132
migrate timers to stopwatch in caching k8s client per review comments
capistrant Dec 5, 2025
aa8e12e
Remove unused code
capistrant Dec 5, 2025
8df9dfe
style fix
capistrant Dec 5, 2025
ae29963
remove unneeded code
capistrant Dec 5, 2025
66f3cda
Extract Caching client code from DruidKubernetesClient per review
capistrant Dec 5, 2025
f99673c
Make name for cache read methods more logical
capistrant Dec 5, 2025
8ec81cf
Stop exposing the EventNotifier in DruidKubernetesCachingClient
capistrant Dec 5, 2025
78e1c82
Improve informer executor name per review
capistrant Dec 5, 2025
ed671f9
Simplify informer setup for caching client
capistrant Dec 5, 2025
6b58a29
cleanup caching client tests and add a lifecycle stop to the informers
capistrant Dec 5, 2025
62ddcc3
Improve thread safety of KubernetesResourceEventNotifier
capistrant Dec 5, 2025
e58be26
Simply the peon waiting code for the caching client
capistrant Dec 8, 2025
94bfa2a
Merge branch 'master' into k8s-overlord-api-redux
capistrant Dec 8, 2025
4b7a408
Fix the k8s overlord module for the caching client
capistrant Dec 8, 2025
ef93c44
fix configs for docker embedded test
capistrant Dec 8, 2025
53d0976
fix broken embedded tests
capistrant Dec 8, 2025
b2982c3
use the indexer not informer for cache reads
capistrant Dec 8, 2025
4f735a4
Cleanup after another review round
capistrant Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,31 @@ Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it ha

The K8s extension builds a pod spec for each task using the specified pod adapter. All jobs are natively restorable, they are decoupled from the Druid deployment, thus restarting pods or doing upgrades has no effect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again.

## Kubernetes Client Mode

### "Direct" K8s API Interaction per task *(Default)*

Task lifecycle code in Druid talks directly to the Kubernetes API server for all operations that require interaction with the Kubernetes cluster.

### `SharedInformer` "Caching" *(Experimental)*

Enabled by setting `druid.indexer.runner.useK8sSharedInformers=true`, this mode uses `Fabric8` `SharedInformer` objects for monitoring state changes in the remote K8s cluster, reducing the number of direct API calls to the Kubernetes API server. This can greatly reduce load on the API server, especially in environments with a high volume of tasks.

This mode is experimental and should be used with caution in production until it has been vetted more thoroughly by the community.

The core idea is to use two `SharedInformers`, one for jobs and one for pods, to watch for changes in the remote K8s cluster. These informers maintain a local cache of jobs and pods that tasks can query. The informers can also notify listeners when changes occur, allowing tasks to react to state changes without polling the API server or creating per-task watches on the K8s cluster.

#### Architecture: Direct vs. Caching Mode

**Key Differences:**

- `DirectKubernetesPeonClient` (Default): Every read operation makes a direct HTTP call to the K8s API server. With 100 concurrent tasks, this results in 100+ active API connections with continuous polling.

- `CachingKubernetesPeonClient` (Experimental): All read operations query an in-memory cache maintained by `SharedInformers`. With 100 concurrent tasks, only 2 persistent watch connections are used (one for Jobs, one for Pods), achieving a large reduction in API calls.

**Shared Operations**:

Both implementations share the same write (job creation, deletion) and log read operations code, which always use direct API calls.

## Configuration

Expand Down Expand Up @@ -798,7 +823,8 @@ Should you require the needed permissions for interacting across Kubernetes name
| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. Value will be overridden if a dynamic config value has been set. | `2147483647` | No |
| `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No |
| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | NO |

| `druid.indexer.runner.useK8sSharedInformers` | `boolean` | Whether to use shared informers to watch for pod/job changes. This is more efficient on the Kubernetes API server, but may use more memory in the Overlord. | `false` | No |
| `druid.indexer.runner.k8sSharedInformerResyncPeriod` | `Duration` | When using shared informers, controls how frequently the informers resync with the Kubernetes API server. This prevents change events from being missed, keeping the informer cache clean and accurate. | `PT300S` | No |

### Metrics added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@
package org.apache.druid.testing.embedded.indexing;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
Expand All @@ -40,7 +37,6 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.http.SqlTaskStatus;
Expand Down Expand Up @@ -243,7 +239,7 @@ public void test_runIndexParallelTask_andCompactData()
.dynamicPartitionWithMaxRows(5000)
.withId(compactTaskId);
cluster.callApi().onLeaderOverlord(o -> o.runTask(compactTaskId, compactionTask));
cluster.callApi().waitForTaskToSucceed(taskId, eventCollector.latchableEmitter());
cluster.callApi().waitForTaskToSucceed(compactTaskId, eventCollector.latchableEmitter());
Comment thread
kfaraz marked this conversation as resolved.

// Verify the compacted data
final int numCompactedSegments = 5;
Expand Down Expand Up @@ -308,13 +304,10 @@ public void test_runKafkaSupervisor()
Assertions.assertEquals("RUNNING", supervisorStatus.getState());
Assertions.assertEquals(topic, supervisorStatus.getSource());

// Get the task statuses
List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf(
(CloseableIterator<TaskStatusPlus>)
cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, dataSource, 1))
);
Assertions.assertFalse(taskStatuses.isEmpty());
Assertions.assertEquals(TaskState.RUNNING, taskStatuses.get(0).getStatusCode());
// Confirm tasks are being created and running
int runningTasks = cluster.callApi().getTaskCount("running", dataSource);
int completedTasks = cluster.callApi().getTaskCount("complete", dataSource);
Assertions.assertTrue(runningTasks + completedTasks > 0);

// Suspend the supervisor and verify the state
cluster.callApi().onLeaderOverlord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
import org.junit.jupiter.api.BeforeEach;

/**
* Runs some basic ingestion tests against latest image Druid containers running
* on a K3s cluster with druid-operator and using {@code k8s} task runner type.
* Base class for Kubernetes task runner tests. Subclasses configure whether to use
* SharedInformers for caching.
*/
public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest
abstract class BaseKubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest
{
private static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml";
protected static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml";

/**
* Subclasses override to enable/disable SharedInformer caching.
*/
protected abstract boolean useSharedInformers();

@Override
protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
Expand All @@ -45,6 +50,8 @@ protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
.addProperty("druid.indexer.runner.type", "k8s")
.addProperty("druid.indexer.runner.namespace", "druid")
.addProperty("druid.indexer.runner.capacity", "4")
.addProperty("druid.indexer.runner.useK8sSharedInformers", String.valueOf(useSharedInformers()))
.addProperty("druid.indexer.runner.k8sSharedInformerResyncPeriod", "PT1s")
.usingPort(30090);

final K3sClusterResource k3sCluster = new K3sClusterWithOperatorResource()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.k8s;

/**
* Runs ingestion tests using SharedInformer caching mode.
* Uses Fabric8 SharedInformers to maintain a local cache of Jobs and Pods,
* reducing load on the Kubernetes API server.
*/
public class KubernetesTaskRunnerCachingModeDockerTest extends BaseKubernetesTaskRunnerDockerTest
{
@Override
protected boolean useSharedInformers()
{
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.k8s;

/**
* Runs ingestion tests using direct K8s API interaction (default mode).
* Each task makes direct API calls to the Kubernetes API server.
*/
public class KubernetesTaskRunnerDirectModeDockerTest extends BaseKubernetesTaskRunnerDockerTest
{
@Override
protected boolean useSharedInformers()
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory;
import org.apache.druid.k8s.overlord.common.httpclient.jdk.DruidKubernetesJdkHttpClientConfig;
Expand All @@ -73,6 +74,7 @@
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.TaskLogs;

import javax.annotation.Nullable;
import java.util.Locale;
import java.util.Properties;

Expand Down Expand Up @@ -160,6 +162,10 @@ public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig(
return new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigSupplier);
}

/**
* Provides the base Kubernetes client for direct API operations.
* This is always created regardless of caching configuration.
*/
@Provides
@LazySingleton
public DruidKubernetesClient makeKubernetesClient(
Expand All @@ -168,15 +174,16 @@ public DruidKubernetesClient makeKubernetesClient(
Lifecycle lifecycle
)
{
final DruidKubernetesClient client;
final Config config = new ConfigBuilder().build();

if (kubernetesTaskRunnerConfig.isDisableClientProxy()) {
config.setHttpsProxy(null);
config.setHttpProxy(null);
}

client = new DruidKubernetesClient(httpClientFactory, config);
config.setNamespace(kubernetesTaskRunnerConfig.getNamespace());

final DruidKubernetesClient client = new DruidKubernetesClient(httpClientFactory, config);

lifecycle.addHandler(
new Lifecycle.Handler()
Expand All @@ -199,6 +206,58 @@ public void stop()
return client;
}

/**
* Provides the caching Kubernetes client that uses informers for efficient resource watching.
* Only created when caching is enabled via configuration.
*/
@Provides
@LazySingleton
@Nullable
public DruidKubernetesCachingClient makeCachingKubernetesClient(
KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig,
DruidKubernetesClient baseClient,
Lifecycle lifecycle
)
{
if (!kubernetesTaskRunnerConfig.isUseK8sSharedInformers()) {
log.info("Kubernetes shared informers disabled, caching client will not be created");
return null;
}

String namespace = kubernetesTaskRunnerConfig.getNamespace();
long resyncPeriodMillis = kubernetesTaskRunnerConfig
.getK8sSharedInformerResyncPeriod()
.toStandardDuration()
.getMillis();

log.info("Creating Kubernetes caching client with informer resync period: %d ms", resyncPeriodMillis);
final DruidKubernetesCachingClient cachingClient = new DruidKubernetesCachingClient(
baseClient,
namespace,
resyncPeriodMillis
);

lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{

}

@Override
public void stop()
{
log.info("Stopping Kubernetes caching client");
cachingClient.stop();
}
}
);

return cachingClient;
}

/**
* Provides a TaskRunnerFactory instance suitable for environments without Zookeeper.
* In such environments, the standard RemoteTaskRunnerFactory may not be operational.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ public interface KubernetesTaskRunnerConfig

Integer getCapacity();

/**
* Whether to use caching for Kubernetes resources tied to indexing tasks.
* <p>
* Enabling shared informers can significantly reduce the number of API calls made to the Kubernetes API server,
* improving performance and reducing load on the server. However, it also increases memory usage as informers
* maintain local caches of resources.
* </p>
*/
boolean isUseK8sSharedInformers();

/**
* The resync period for the Kubernetes shared informers, if enabled.
* <p>
* Periodic resyncs ensure that the informer's local cache is kept up to date with the remote Kubernetes API server
* state. This helps handle missed events or transient errors.
* </p>
*/
Period getK8sSharedInformerResyncPeriod();
Comment on lines +83 to +92
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add 1-line javadocs for these.


static Builder builder()
{
return new Builder();
Expand Down Expand Up @@ -100,6 +119,8 @@ public static class Builder
private Integer capacity;
private Period taskJoinTimeout;
private Period logSaveTimeout;
private boolean useK8sSharedInformers;
private Period k8sSharedInformerResyncPeriod;

public Builder()
{
Expand Down Expand Up @@ -232,6 +253,18 @@ public Builder withLogSaveTimeout(Period logSaveTimeout)
return this;
}

public Builder withUseK8sSharedInformers(boolean useK8sSharedInformers)
{
this.useK8sSharedInformers = useK8sSharedInformers;
return this;
}

public Builder withK8sSharedInformerResyncPeriod(Period k8sSharedInformerResyncPeriod)
{
this.k8sSharedInformerResyncPeriod = k8sSharedInformerResyncPeriod;
return this;
}

public KubernetesTaskRunnerStaticConfig build()
{
return new KubernetesTaskRunnerStaticConfig(
Expand All @@ -255,7 +288,9 @@ public KubernetesTaskRunnerStaticConfig build()
this.labels,
this.annotations,
this.capacity,
this.taskJoinTimeout
this.taskJoinTimeout,
this.useK8sSharedInformers,
this.k8sSharedInformerResyncPeriod
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ public Integer getCapacity()
return dynamicConfigSupplier.get().getCapacity();
}

@Override
public boolean isUseK8sSharedInformers()
{
return staticConfig.isUseK8sSharedInformers();
}

@Override
public Period getK8sSharedInformerResyncPeriod()
{
return staticConfig.getK8sSharedInformerResyncPeriod();
}

public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
{
if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) {
Expand Down
Loading
Loading