
Create a new KubernetesPeonClient that uses fabric8 informers to reduce load on an underlying k8s API #18599

Merged
capistrant merged 66 commits into apache:master from capistrant:k8s-overlord-api-redux on Dec 8, 2025
Conversation

@capistrant (Contributor) commented Oct 6, 2025

Description

KubernetesPeonClient Extension

CachingKubernetesPeonClient (experimental)

Replaces direct read-only k8s API calls with fabric8 SharedInformers, plus a new event-notifier system that centralizes all state watching for pods and jobs, greatly reducing the API call rate to k8s from the indexing service peon lifecycle.

The goal is to make the k8s task runner much more efficient in its use of the k8s control plane API. The existing client performs many per-pod/job actions that hit the Kubernetes API, which can put undue stress on the k8s cluster in Druid deployments with high task counts or churn.
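
To make the approach concrete, here is a minimal sketch (not this PR's actual code) of how a fabric8 SharedIndexInformer replaces per-pod GET/LIST/WATCH traffic with one shared watch and a local cache; the namespace and resync period shown are assumptions:

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public class PodInformerSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
      // One shared LIST+WATCH and an in-memory cache for all pods in the
      // namespace, instead of per-pod GET/LIST/WATCH calls.
      SharedIndexInformer<Pod> podInformer = client.pods()
          .inNamespace("druid") // assumed namespace
          .inform(new ResourceEventHandler<Pod>()
          {
            @Override
            public void onAdd(Pod pod)
            {
              // notify whoever is waiting for this pod to appear
            }

            @Override
            public void onUpdate(Pod oldPod, Pod newPod)
            {
              // e.g. detect phase transitions such as Pending -> Running
            }

            @Override
            public void onDelete(Pod pod, boolean deletedFinalStateUnknown)
            {
              // e.g. complete futures waiting on pod deletion
            }
          }, 30_000L); // assumed resync period in millis

      // Reads are served from the local cache, not the k8s API server.
      // (The cache fills asynchronously after the initial LIST.)
      Thread.sleep(2_000);
      System.out.println("Pods in cache: " + podInformer.getIndexer().list().size());
    }
  }
}
```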

Diagram

This diagram is now out of date; I will try to update it.

[Architecture diagram image]

Metrics

Using a test cluster with a synthetic workload over a fixed time window, I measured the following approximate reductions in API traffic from Druid to the k8s control plane:

  • GET requests: down 35%
  • LIST requests: down 88%
  • Watches: down ~100% (99.82% was the actual measurement)

Release note

Adds an experimental implementation of KubernetesPeonClient that uses fabric8 SharedInformers to cache k8s metadata, greatly reducing API traffic between the Overlord and the k8s control plane. This feature is experimental and opt-in via configuration.


Key changed/added classes in this PR
  • AbstractKubernetesPeonClient
  • DirectKubernetesPeonClient
  • CachingKubernetesPeonClient
  • DruidKubernetesClient
  • KubernetesResourceEventNotifier
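
Since KubernetesResourceEventNotifier is new in this PR, here is a hypothetical sketch of the fan-out pattern its name suggests (the class shape, method names, and listener types below are assumptions, not the PR's code): a single shared informer feeds events to per-job listeners, so individual peon lifecycles wait on callbacks instead of opening their own WATCH connections.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import io.fabric8.kubernetes.api.model.batch.v1.Job;

/**
 * Hypothetical sketch: a single shared informer calls notifyJobEvent() for
 * every job event, and the notifier fans events out to per-job listeners.
 */
public class EventNotifierSketch
{
  private final Map<String, List<Consumer<Job>>> jobListeners = new ConcurrentHashMap<>();

  public void registerJobListener(String jobName, Consumer<Job> listener)
  {
    jobListeners.computeIfAbsent(jobName, k -> new CopyOnWriteArrayList<>()).add(listener);
  }

  public void unregisterJobListeners(String jobName)
  {
    jobListeners.remove(jobName);
  }

  /** Invoked by the informer's event handler for every job ADD/UPDATE/DELETE. */
  public void notifyJobEvent(Job job)
  {
    List<Consumer<Job>> listeners = jobListeners.get(job.getMetadata().getName());
    if (listeners != null) {
      listeners.forEach(listener -> listener.accept(job));
    }
  }
}
```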

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@capistrant capistrant marked this pull request as draft October 6, 2025 00:07
@capistrant capistrant changed the title [WIP] Create a new KubernetesPeonClient that uses fabric8 informers to reduce load on an underlying k8s API Create a new KubernetesPeonClient that uses fabric8 informers to reduce load on an underlying k8s API Oct 8, 2025
@capistrant capistrant marked this pull request as ready for review October 8, 2025 19:13
@capistrant (Contributor, Author) commented:

@kfaraz thank you for the detailed review. I pushed up the changes I have made so far. I am still trying to solve the issue of leaving dangling futures in exceptional cases, and will hopefully have a proposal for fixing that in the next 24 hours. Thanks again; I appreciate all your insight on how to improve this.

private final KubernetesClient client;

public TestKubernetesClient(KubernetesClient client)
public TestKubernetesClient(KubernetesClient client, String namespace)

Check notice — Code scanning / CodeQL: Useless parameter (Note, test). The parameter 'namespace' is never used.
@kfaraz (Contributor) left a comment:

Thanks for the update, @capistrant !
The changes look good.

The blocking comments are only these:

  • #18599 (comment) (is DruidKubernetesCachingClient.stop() called upon loss of leadership? Should we also have a start() which is called on becoming leader?)
  • #18599 (comment) (when do we call podInformer.run() in DruidKubernetesCachingClient?)

The other comments are all minor suggestions.

*
* @param <T> The Kubernetes resource type (e.g., Pod, Job)
*/
public static class InformerEventHandler<T> implements ResourceEventHandler<T>
Review comment (Contributor):

Nit: this class may be in a file of its own.
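
If it were moved to its own file, the handler might look like the following sketch; the ADD and UPDATE event types and the BiConsumer constructor are assumptions extrapolated from the DELETE case visible in the nearby diff context.

```java
import java.util.function.BiConsumer;

import io.fabric8.kubernetes.client.informers.ResourceEventHandler;

/**
 * Bridges fabric8 informer callbacks to a single event consumer.
 *
 * @param <T> The Kubernetes resource type (e.g., Pod, Job)
 */
public class InformerEventHandler<T> implements ResourceEventHandler<T>
{
  // ADD and UPDATE are assumed values; only DELETE appears in the diff context.
  public enum InformerEventType { ADD, UPDATE, DELETE }

  private final BiConsumer<T, InformerEventType> eventConsumer;

  public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer)
  {
    this.eventConsumer = eventConsumer;
  }

  @Override
  public void onAdd(T resource)
  {
    eventConsumer.accept(resource, InformerEventType.ADD);
  }

  @Override
  public void onUpdate(T oldResource, T newResource)
  {
    eventConsumer.accept(newResource, InformerEventType.UPDATE);
  }

  @Override
  public void onDelete(T resource, boolean deletedFinalStateUnknown)
  {
    eventConsumer.accept(resource, InformerEventType.DELETE);
  }
}
```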

eventConsumer.accept(resource, InformerEventType.DELETE);
}
}
private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class);
Review comment (Contributor):

Nit: please add a newline before this.

Comment on lines +90 to +92
protected final SharedIndexInformer<Pod> podInformer;
protected final SharedIndexInformer<Job> jobInformer;
protected final KubernetesResourceEventNotifier eventNotifier;
Review comment (Contributor):

Do these need to be protected or can they be private too?

this.jobInformer = setupJobInformer(namespace);
}

public void stop()
Review comment (Contributor):

Please add a short javadoc. Should this be called upon loss of leadership too or only service termination?

Should there also be an equivalent start() that is invoked on becoming leader?


public <T> T readPodCache(SharedInformerCacheReadRequestExecutor<T, Pod> executor)
{
if (podInformer == null) {
Review comment (Contributor):

podInformer and jobInformer will never be null.
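
For illustration, a simplified cache read without the null check might look like the following; the shape of SharedInformerCacheReadRequestExecutor here is an assumption based on its name and usage, not the PR's actual definition.

```java
import java.util.List;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;

public class CacheReadSketch
{
  // Assumed shape of the executor: a function over the informer's local index.
  @FunctionalInterface
  public interface SharedInformerCacheReadRequestExecutor<T, R>
  {
    T execute(Indexer<R> indexer);
  }

  private final SharedIndexInformer<Pod> podInformer;

  public CacheReadSketch(SharedIndexInformer<Pod> podInformer)
  {
    this.podInformer = podInformer;
  }

  public <T> T readPodCache(SharedInformerCacheReadRequestExecutor<T, Pod> executor)
  {
    // No null check needed: the informer is created in the constructor
    // and lives for the lifetime of this client.
    return executor.execute(podInformer.getIndexer());
  }

  public List<Pod> listCachedPods()
  {
    // Served entirely from the local cache; no k8s API call is made.
    return readPodCache(Indexer::list);
  }
}
```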

}
catch (Exception e) {
-  log.error(e, "Error watching logs from task: %s", taskId);
+  log.error(e, "Error watching logs from task: %s, pod: %s", taskId, podName);
Review comment (Contributor):

Suggested change:
-  log.error(e, "Error watching logs from task: %s, pod: %s", taskId, podName);
+  log.error(e, "Error watching logs from task[%s], pod[%s].", taskId, podName);

Comment on lines +213 to +217
* Get an InputStream for the logs of the peon pod associated with the given taskId.
* <p>
* Any issues creating the InputStream will be logged and an absent Optional will be returned.
* </p>
*
Review comment (Contributor):

Nit: I think this part can be omitted since the same info is already captured in @return tag too.

* Any issues creating the InputStream will be logged and an absent Optional will be returned.
* </p>
*
* @return an Optional containing the {@link InputStream} if the pod exists and logs could be streamed, or absent otherwise
Review comment (Contributor):

Suggested change:
-  * @return an Optional containing the {@link InputStream} if the pod exists and logs could be streamed, or absent otherwise
+  * @return an Optional containing the {@link InputStream} for the logs of the pod, if it exists and logs could be streamed, or absent otherwise.

return this;
}

public Builder withEnablePeonClientCache(boolean enableKubernetesClientCaching)
Review comment (Contributor):

Please align the method name and arg name with the config i.e. useK8sSharedInformers.

Comment on lines +75 to +77
boolean isUseK8sSharedInformers();

Period getK8sSharedInformerResyncPeriod();
Review comment (Contributor):

Maybe add 1-line javadocs for these.
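
A possible version with the suggested one-line javadocs (the enclosing interface name and the joda-time Period import are assumptions):

```java
import org.joda.time.Period;

public interface KubernetesTaskRunnerConfigSketch
{
  /**
   * Whether the experimental informer-backed (caching) peon client is enabled.
   */
  boolean isUseK8sSharedInformers();

  /**
   * How often the shared informers fully resync their local caches with the k8s API.
   */
  Period getK8sSharedInformerResyncPeriod();
}
```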

@capistrant (Contributor, Author) commented:

> Thanks for the update, @capistrant! The changes look good. The blocking comments are only these: whether DruidKubernetesCachingClient.stop() is called upon loss of leadership (and whether there should be a start() called on becoming leader), and when podInformer.run() is called. The other comments are all minor suggestions.

Thanks @kfaraz for another review round. I think these two questions are actually intertwined.

As everything works now, the informers start automatically when DruidKubernetesCachingClient is created. This happens as part of Overlord startup, regardless of leadership. The start() calls on the informers in the tests were benign, and I removed them.

That leads to your other question on when to call stop(). Right now we only call stop() as part of the JVM lifecycle, not on loss of leadership, because as currently constructed the informers are started once and live for the lifetime of the JVM, so they are active regardless of whether an Overlord is the leader.

I think the key question is whether we want the informer/caching client tied to the JVM lifecycle, as it is now, or to leadership state. The latter would reduce the resources required by the standby in exchange for lengthening failover time. (I'm not sure exactly how much failover would be extended; it would scale with the pod count in k8s, but I don't have a good estimate for a cluster with 500 pods, 5,000 pods, 50,000 pods, etc.)

@kfaraz (Contributor) commented Dec 8, 2025:

Thanks for the clarification, @capistrant !

I think it is better to have the cache pre-warmed on the follower Overlord, so that it is able to quickly take over in case of a leader failure. So let's keep the lifecycle of the cache tied to the Overlord service itself.
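
A minimal sketch of the agreed lifecycle, assuming Druid's @LifecycleStop annotation is the wiring mechanism (the PR's actual mechanism may differ): the informers start at construction so a follower's cache stays pre-warmed, and stop only at service shutdown.

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;

public class CachingClientLifecycleSketch
{
  private final SharedIndexInformer<Pod> podInformer;
  private final SharedIndexInformer<Job> jobInformer;

  public CachingClientLifecycleSketch(
      SharedIndexInformer<Pod> podInformer,
      SharedIndexInformer<Job> jobInformer
  )
  {
    // Started at construction, i.e. Overlord startup, regardless of
    // leadership, so a follower's cache is pre-warmed for fast failover.
    this.podInformer = podInformer;
    this.jobInformer = jobInformer;
  }

  @LifecycleStop
  public void stop()
  {
    // Tied to the service lifecycle, not leadership: called only when the
    // Overlord JVM shuts down.
    podInformer.stop();
    jobInformer.stop();
  }
}
```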

@kfaraz (Contributor) left a review:

Major changes look good. There are minor non-blocking suggestions, which may be addressed here or in a follow-up.

Thanks for adding the feature, @capistrant !

@capistrant capistrant merged commit 69505a3 into apache:master Dec 8, 2025
99 of 100 checks passed
@kgyrtkirk kgyrtkirk added this to the 36.0.0 milestone Jan 19, 2026