diff --git a/core/src/main/java/org/apache/druid/annotations/SuppressFBWarnings.java b/core/src/main/java/org/apache/druid/annotations/SuppressFBWarnings.java
new file mode 100644
index 000000000000..fb1d5b1a126d
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/annotations/SuppressFBWarnings.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.annotations;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Annotation for suppressing spotbugs checks when necessary.
+ */
+@Retention(RetentionPolicy.CLASS)
+public @interface SuppressFBWarnings
+{
+ /**
+ * The set of FindBugs warnings that are to be suppressed in
+ * annotated element. The value can be a bug category, kind or pattern.
+ *
+ */
+ String[] value() default {};
+
+ /**
+ * Optional documentation of the reason why the warning is suppressed
+ */
+ String justification() default "";
+}
diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py
index 9d55ee54bcae..0d3f83b0255d 100755
--- a/distribution/bin/check-licenses.py
+++ b/distribution/bin/check-licenses.py
@@ -277,6 +277,7 @@ def build_compatible_license_names():
compatible_licenses['The MIT License'] = 'MIT License'
compatible_licenses['MIT License'] = 'MIT License'
compatible_licenses['The MIT License (MIT)'] = 'MIT License'
+ compatible_licenses['Bouncy Castle Licence'] = 'MIT License'
compatible_licenses['-'] = '-'
return compatible_licenses
diff --git a/distribution/pom.xml b/distribution/pom.xml
index d5c7a6ed274c..75d5f11c71a3 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -243,6 +243,8 @@
org.apache.druid.extensions:druid-pac4j
-c
org.apache.druid.extensions:druid-ranger-security
+ -c
+ org.apache.druid.extensions:druid-kubernetes-extensions
${druid.distribution.pulldeps.opts}
diff --git a/docs/development/extensions-core/kubernetes.md b/docs/development/extensions-core/kubernetes.md
new file mode 100644
index 000000000000..513a98d9c655
--- /dev/null
+++ b/docs/development/extensions-core/kubernetes.md
@@ -0,0 +1,59 @@
+---
+id: druid-kubernetes
+title: "Kubernetes"
+---
+
+
+
+Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it has not been tested yet on a wide variety of long running Druid clusters.
+
+Apache Druid Extension to enable using Kubernetes API Server for node discovery and leader election. This extension allows Druid cluster deployment on Kubernetes without Zookeeper. It allows running multiple Druid clusters within same Kubernetes Cluster, See `clusterIdentifier` config below.
+
+
+## Configuration
+
+To use this extension please make sure to [include](../../development/extensions.md#loading-extensions) `druid-kubernetes-extensions` as an extension.
+
+This extension works together with HTTP based segment and task management in Druid. Consequently, following configurations must be set on all Druid nodes.
+
+`druid.zk.service.enabled=false`
+`druid.serverview.type=http`
+`druid.coordinator.loadqueuepeon.type=http`
+`druid.indexer.runner.type=httpRemote`
+`druid.discovery.type=k8s`
+
+For Node Discovery, Each Druid process running inside a pod "announces" itself by adding few "labels" and "annotations" in the pod spec. So, to add those...
+- Druid process needs to be aware of pod name and namespace which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed, see configuration below. But in the end, each pod needs to have pod name and namespace added as environment variables.
+- Label/Annotation path in the pod spec must exist, which is easily satisfied if there is at least one label/annotation in the pod spec already. This limitation may be removed in future.
+
+Additionally, this extension has following configuration.
+
+### Properties
+|Property|Possible Values|Description|Default|required|
+|--------|---------------|-----------|-------|--------|
+|`druid.discovery.k8s.clusterIdentifier`|`string that matches [a-z0-9][a-z0-9-]*[a-z0-9]`|Unique identifier for this Druid cluster in Kubernetes e.g. us-west-prod-druid.|None|Yes|
+|`druid.discovery.k8s.podNameEnvKey`|`Pod Env Variable`|Pod Env variable whose value is that pod's name.|POD_NAME|No|
+|`druid.discovery.k8s.podNamespaceEnvKey`|`Pod Env Variable`|Pod Env variable whose value is that pod's kubernetes namespace.|POD_NAMESPACE|No|
+|`druid.discovery.k8s.coordinatorLeaderElectionConfigMapNamespace`|`k8s namespace`|Leader election algorithm requires creating a ConfigMap resource in a namespace. This MUST only be provided if different coordinator pods run in different namespaces, such setup is discouraged however.|coordinator pod's namespace|No|
+|`druid.discovery.k8s.overlordLeaderElectionConfigMapNamespace`|`k8s namespace`|Leader election algorithm requires creating a ConfigMap resource in a namespace. This MUST only be provided if different overlord pods run in different namespaces, such setup is discouraged however.|overlord pod's namespace|No|
+|`druid.discovery.k8s.leaseDuration`|`Duration`|Lease duration used by Leader Election algorithm. Candidates wait for this time before taking over previous Leader.|PT60S|No|
+|`druid.discovery.k8s.renewDeadline`|`Duration`|Lease renewal period used by Leader.|PT17S|No|
+|`druid.discovery.k8s.retryPeriod`|`Duration`|Retry wait used by Leader Election algorithm on failed operations.|PT5S|No|
+
diff --git a/docs/operations/kubernetes.md b/docs/operations/kubernetes.md
index cba2bf044562..7298dfa9fbf8 100644
--- a/docs/operations/kubernetes.md
+++ b/docs/operations/kubernetes.md
@@ -30,3 +30,5 @@ $docker pull apache/druid:0.16.0-incubating
```
[druid-operator](https://github.com/druid-io/druid-operator) can be used to manage a Druid cluster on [Kubernetes](https://kubernetes.io/) .
+
+Druid clusters deployed on Kubernetes can function without Zookeeper using [druid–kubernetes-extensions](../development/extensions-core/kubernetes.md) .
diff --git a/extensions-core/kubernetes-extensions/pom.xml b/extensions-core/kubernetes-extensions/pom.xml
new file mode 100644
index 000000000000..39368a68105c
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/pom.xml
@@ -0,0 +1,152 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.druid.extensions
+ druid-kubernetes-extensions
+ druid-kubernetes-extensions
+ druid-kubernetes-extensions
+
+
+ org.apache.druid
+ druid
+ 0.21.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+ 10.0.0
+
+
+
+
+ org.apache.druid
+ druid-server
+ ${project.parent.version}
+ provided
+
+
+ org.apache.druid
+ druid-core
+ ${project.parent.version}
+ provided
+
+
+ org.apache.druid
+ druid-processing
+ ${project.parent.version}
+ provided
+
+
+
+ io.kubernetes
+ client-java
+ ${kubernetes.client.version}
+
+
+ io.kubernetes
+ client-java-extended
+ ${kubernetes.client.version}
+
+
+ io.kubernetes
+ client-java-api
+ ${kubernetes.client.version}
+
+
+
+
+ junit
+ junit
+ test
+
+
+ org.easymock
+ easymock
+ test
+
+
+
+
+ com.google.code.findbugs
+ jsr305
+ provided
+
+
+ com.google.inject
+ guice
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ provided
+
+
+ com.google.inject.extensions
+ guice-multibindings
+ provided
+
+
+ joda-time
+ joda-time
+ provided
+
+
+ com.google.guava
+ guava
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.5
+
+
+
+ org/apache/druid/k8s/discovery/K8sDiscoveryModule*
+
+
+ org/apache/druid/k8s/discovery/DefaultK8sApiClient*
+ org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory*
+
+
+
+
+
+
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
new file mode 100644
index 000000000000..32ad62316006
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.util.Watch;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Concrete {@link K8sApiClient} impl using k8s-client java lib.
+ */
+public class DefaultK8sApiClient implements K8sApiClient
+{
+ private static final Logger LOGGER = new Logger(DefaultK8sApiClient.class);
+
+ private final ApiClient realK8sClient;
+ private final CoreV1Api coreV1Api;
+ private final ObjectMapper jsonMapper;
+
+ @Inject
+ public DefaultK8sApiClient(ApiClient realK8sClient, @Json ObjectMapper jsonMapper)
+ {
+ this.realK8sClient = realK8sClient;
+ this.coreV1Api = new CoreV1Api(realK8sClient);
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+ public void patchPod(String podName, String podNamespace, String jsonPatchStr)
+ {
+ try {
+ coreV1Api.patchNamespacedPod(podName, podNamespace, new V1Patch(jsonPatchStr), "true", null, null, null);
+ }
+ catch (ApiException ex) {
+ throw new RE(ex, "Failed to patch pod[%s/%s], code[%d], error[%s].", podNamespace, podName, ex.getCode(), ex.getResponseBody());
+ }
+ }
+
+ @Override
+ public DiscoveryDruidNodeList listPods(
+ String podNamespace,
+ String labelSelector,
+ NodeRole nodeRole
+ )
+ {
+ try {
+ V1PodList podList = coreV1Api.listNamespacedPod(podNamespace, null, null, null, null, labelSelector, 0, null, null, null);
+ Preconditions.checkState(podList != null, "WTH: NULL podList");
+
+ Map allNodes = new HashMap();
+ for (V1Pod podDef : podList.getItems()) {
+ DiscoveryDruidNode node = getDiscoveryDruidNodeFromPodDef(nodeRole, podDef);
+ allNodes.put(node.getDruidNode().getHostAndPortToUse(), node);
+ }
+ return new DiscoveryDruidNodeList(podList.getMetadata().getResourceVersion(), allNodes);
+ }
+ catch (ApiException ex) {
+ throw new RE(ex, "Expection in listing pods, code[%d] and error[%s].", ex.getCode(), ex.getResponseBody());
+ }
+ }
+
+ private DiscoveryDruidNode getDiscoveryDruidNodeFromPodDef(NodeRole nodeRole, V1Pod podDef)
+ {
+ String jsonStr = podDef.getMetadata().getAnnotations().get(K8sDruidNodeAnnouncer.getInfoAnnotation(nodeRole));
+ try {
+ return jsonMapper.readValue(jsonStr, DiscoveryDruidNode.class);
+ }
+ catch (JsonProcessingException ex) {
+ throw new RE(ex, "Failed to deserialize DiscoveryDruidNode[%s]", jsonStr);
+ }
+ }
+
+ @Override
+ public WatchResult watchPods(String namespace, String labelSelector, String lastKnownResourceVersion, NodeRole nodeRole)
+ {
+ try {
+ Watch watch =
+ Watch.createWatch(
+ realK8sClient,
+ coreV1Api.listNamespacedPodCall(namespace, null, true, null, null,
+ labelSelector, null, lastKnownResourceVersion, 0, true, null
+ ),
+ new TypeReference>()
+ {
+ }.getType()
+ );
+
+ return new WatchResult()
+ {
+ private Watch.Response obj;
+
+ @Override
+ public boolean hasNext() throws SocketTimeoutException
+ {
+ try {
+ while (watch.hasNext()) {
+ Watch.Response item = watch.next();
+ if (item != null && item.type != null) {
+ obj = new Watch.Response(
+ item.type,
+ new DiscoveryDruidNodeAndResourceVersion(
+ item.object.getMetadata().getResourceVersion(),
+ getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
+ )
+ );
+ return true;
+ } else {
+ LOGGER.error("WTH! item or item.type is NULL");
+ }
+ }
+ }
+ catch (RuntimeException ex) {
+ if (ex.getCause() instanceof SocketTimeoutException) {
+ throw (SocketTimeoutException) ex.getCause();
+ } else {
+ throw ex;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public Watch.Response next()
+ {
+ return obj;
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ watch.close();
+ }
+ catch (IOException ex) {
+ throw new RE(ex, "Exception while closing watch.");
+ }
+ }
+ };
+ }
+ catch (ApiException ex) {
+ if (ex.getCode() == 410) {
+ // k8s no longer has history that we need
+ return null;
+ }
+
+ throw new RE(ex, "Expection in watching pods, code[%d] and error[%s].", ex.getCode(), ex.getResponseBody());
+ }
+ }
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory.java
new file mode 100644
index 000000000000..7ac6b5e54ba4
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import com.google.inject.Inject;
+import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
+import io.kubernetes.client.extended.leaderelection.LeaderElector;
+import io.kubernetes.client.extended.leaderelection.Lock;
+import io.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import org.apache.druid.java.util.common.RE;
+
+import java.time.Duration;
+
+/**
+ * Concrete {@link K8sLeaderElectorFactory} impl using k8s-client java lib.
+ */
+public class DefaultK8sLeaderElectorFactory implements K8sLeaderElectorFactory
+{
+ private final ApiClient realK8sClient;
+ private final K8sDiscoveryConfig discoveryConfig;
+
+ @Inject
+ public DefaultK8sLeaderElectorFactory(ApiClient realK8sClient, K8sDiscoveryConfig discoveryConfig)
+ {
+ this.realK8sClient = realK8sClient;
+ this.discoveryConfig = discoveryConfig;
+ }
+
+ @Override
+ public K8sLeaderElector create(String candidateId, String namespace, String lockResourceName)
+ {
+ Lock lock = createLock(candidateId, namespace, lockResourceName, realK8sClient);
+ LeaderElectionConfig leaderElectionConfig =
+ new LeaderElectionConfig(
+ lock,
+ Duration.ofMillis(discoveryConfig.getLeaseDuration().getMillis()),
+ Duration.ofMillis(discoveryConfig.getRenewDeadline().getMillis()),
+ Duration.ofMillis(discoveryConfig.getRetryPeriod().getMillis())
+ );
+ LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
+
+ return new K8sLeaderElector()
+ {
+ @Override
+ public String getCurrentLeader()
+ {
+ try {
+ return lock.get().getHolderIdentity();
+ }
+ catch (ApiException ex) {
+ throw new RE(ex, "Failed to get current leader for [%s]", lockResourceName);
+ }
+ }
+
+ @Override
+ public void run(Runnable startLeadingHook, Runnable stopLeadingHook)
+ {
+ leaderElector.run(startLeadingHook, stopLeadingHook);
+ }
+ };
+ }
+
+ private Lock createLock(String candidateId, String namespace, String lockResourceName, ApiClient k8sApiClient)
+ {
+ return new ConfigMapLock(
+ namespace,
+ lockResourceName,
+ candidateId,
+ k8sApiClient
+ );
+ }
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeAndResourceVersion.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeAndResourceVersion.java
new file mode 100644
index 000000000000..6b634c7317cc
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeAndResourceVersion.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import org.apache.druid.discovery.DiscoveryDruidNode;
+
+public class DiscoveryDruidNodeAndResourceVersion
+{
+ private final String resourceVersion;
+ private final DiscoveryDruidNode node;
+
+ public DiscoveryDruidNodeAndResourceVersion(String resourceVersion, DiscoveryDruidNode node)
+ {
+ this.resourceVersion = resourceVersion;
+ this.node = node;
+ }
+
+ public String getResourceVersion()
+ {
+ return resourceVersion;
+ }
+
+ public DiscoveryDruidNode getNode()
+ {
+ return node;
+ }
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeList.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeList.java
new file mode 100644
index 000000000000..cdc82ebdff68
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeList.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Map;
+
+public class DiscoveryDruidNodeList
+{
+ private final String resourceVersion;
+ private final Map druidNodes;
+
+ public DiscoveryDruidNodeList(
+ String resourceVersion,
+ @Nullable Map druidNodes
+ )
+ {
+ this.resourceVersion = Preconditions.checkNotNull(resourceVersion, "NULL resource version!");
+ this.druidNodes = druidNodes == null ? Collections.emptyMap() : druidNodes;
+ }
+
+ public String getResourceVersion()
+ {
+ return resourceVersion;
+ }
+
+ public Map getDruidNodes()
+ {
+ return druidNodes;
+ }
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sApiClient.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sApiClient.java
new file mode 100644
index 000000000000..1e61677420c8
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sApiClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import org.apache.druid.discovery.NodeRole;
+
+/**
+ * Interface to abstract pod read/update with K8S API Server to allow unit tests with mock impl.
+ */
+public interface K8sApiClient
+{
+ void patchPod(String podName, String namespace, String jsonPatchStr);
+
+ DiscoveryDruidNodeList listPods(String namespace, String labelSelector, NodeRole nodeRole);
+
+ /**
+ * @return NULL if history not available or else return the {@link WatchResult} object
+ */
+ WatchResult watchPods(String namespace, String labelSelector, String lastKnownResourceVersion, NodeRole nodeRole);
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java
new file mode 100644
index 000000000000..998b8641c83a
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.Duration;
+
+import javax.annotation.Nonnull;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+public class K8sDiscoveryConfig
+{
+ private static final Logger LOGGER = new Logger(K8sDiscoveryConfig.class);
+
+ public static final Pattern K8S_RESOURCE_NAME_REGEX = Pattern.compile("[a-z0-9][a-z0-9-]*[a-z0-9]");
+
+ @JsonProperty
+ @Nonnull
+ private final String clusterIdentifier;
+
+ @JsonProperty
+ private final String podNameEnvKey;
+
+ @JsonProperty
+ private final String podNamespaceEnvKey;
+
+ @JsonProperty
+ private final String coordinatorLeaderElectionConfigMapNamespace;
+
+ @JsonProperty
+ private final String overlordLeaderElectionConfigMapNamespace;
+
+ @JsonProperty
+ private final Duration leaseDuration;
+
+ @JsonProperty
+ private final Duration renewDeadline;
+
+ @JsonProperty
+ private final Duration retryPeriod;
+
+ @JsonCreator
+ public K8sDiscoveryConfig(
+ @JsonProperty("clusterIdentifier") String clusterIdentifier,
+ @JsonProperty("podNameEnvKey") String podNameEnvKey,
+ @JsonProperty("podNamespaceEnvKey") String podNamespaceEnvKey,
+ @JsonProperty("coordinatorLeaderElectionConfigMapNamespace") String coordinatorLeaderElectionConfigMapNamespace,
+ @JsonProperty("overlordLeaderElectionConfigMapNamespace") String overlordLeaderElectionConfigMapNamespace,
+ @JsonProperty("leaseDuration") Duration leaseDuration,
+ @JsonProperty("renewDeadline") Duration renewDeadline,
+ @JsonProperty("retryPeriod") Duration retryPeriod
+ )
+ {
+ Preconditions.checkArgument(clusterIdentifier != null && !clusterIdentifier.isEmpty(), "null/empty clusterIdentifier");
+ Preconditions.checkArgument(
+ K8S_RESOURCE_NAME_REGEX.matcher(clusterIdentifier).matches(),
+ "clusterIdentifier[%s] is used in k8s resource name and must match regex[%s]",
+ clusterIdentifier,
+ K8S_RESOURCE_NAME_REGEX.pattern()
+ );
+ this.clusterIdentifier = clusterIdentifier;
+
+ this.podNameEnvKey = podNameEnvKey == null ? "POD_NAME" : podNameEnvKey;
+ this.podNamespaceEnvKey = podNamespaceEnvKey == null ? "POD_NAMESPACE" : podNamespaceEnvKey;
+
+ if (coordinatorLeaderElectionConfigMapNamespace == null) {
+ LOGGER.warn("IF coordinator pods run in multiple namespaces, then you MUST provide coordinatorLeaderElectionConfigMapNamespace");
+ }
+ this.coordinatorLeaderElectionConfigMapNamespace = coordinatorLeaderElectionConfigMapNamespace;
+
+ if (overlordLeaderElectionConfigMapNamespace == null) {
+ LOGGER.warn("IF overlord pods run in multiple namespaces, then you MUST provide overlordLeaderElectionConfigMapNamespace");
+ }
+ this.overlordLeaderElectionConfigMapNamespace = overlordLeaderElectionConfigMapNamespace;
+
+ this.leaseDuration = leaseDuration == null ? Duration.millis(60000) : leaseDuration;
+ this.renewDeadline = renewDeadline == null ? Duration.millis(17000) : renewDeadline;
+ this.retryPeriod = retryPeriod == null ? Duration.millis(5000) : retryPeriod;
+ }
+
+ @JsonProperty
+ public String getClusterIdentifier()
+ {
+ return clusterIdentifier;
+ }
+
+ @JsonProperty
+ public String getPodNameEnvKey()
+ {
+ return podNameEnvKey;
+ }
+
+ @JsonProperty
+ public String getPodNamespaceEnvKey()
+ {
+ return podNamespaceEnvKey;
+ }
+
+ @JsonProperty
+ public String getCoordinatorLeaderElectionConfigMapNamespace()
+ {
+ return coordinatorLeaderElectionConfigMapNamespace;
+ }
+
+ @JsonProperty
+ public String getOverlordLeaderElectionConfigMapNamespace()
+ {
+ return overlordLeaderElectionConfigMapNamespace;
+ }
+
+ @JsonProperty
+ public Duration getLeaseDuration()
+ {
+ return leaseDuration;
+ }
+
+ @JsonProperty
+ public Duration getRenewDeadline()
+ {
+ return renewDeadline;
+ }
+
+ @JsonProperty
+ public Duration getRetryPeriod()
+ {
+ return retryPeriod;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "K8sDiscoveryConfig{" +
+ "clusterIdentifier='" + clusterIdentifier + '\'' +
+ ", podNameEnvKey='" + podNameEnvKey + '\'' +
+ ", podNamespaceEnvKey='" + podNamespaceEnvKey + '\'' +
+ ", coordinatorLeaderElectionConfigMapNamespace='" + coordinatorLeaderElectionConfigMapNamespace + '\'' +
+ ", overlordLeaderElectionConfigMapNamespace='" + overlordLeaderElectionConfigMapNamespace + '\'' +
+ ", leaseDuration=" + leaseDuration +
+ ", renewDeadline=" + renewDeadline +
+ ", retryPeriod=" + retryPeriod +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ K8sDiscoveryConfig that = (K8sDiscoveryConfig) o;
+ return clusterIdentifier.equals(that.clusterIdentifier) &&
+ Objects.equals(podNameEnvKey, that.podNameEnvKey) &&
+ Objects.equals(podNamespaceEnvKey, that.podNamespaceEnvKey) &&
+ Objects.equals(
+ coordinatorLeaderElectionConfigMapNamespace,
+ that.coordinatorLeaderElectionConfigMapNamespace
+ ) &&
+ Objects.equals(
+ overlordLeaderElectionConfigMapNamespace,
+ that.overlordLeaderElectionConfigMapNamespace
+ ) &&
+ Objects.equals(leaseDuration, that.leaseDuration) &&
+ Objects.equals(renewDeadline, that.renewDeadline) &&
+ Objects.equals(retryPeriod, that.retryPeriod);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ clusterIdentifier,
+ podNameEnvKey,
+ podNamespaceEnvKey,
+ coordinatorLeaderElectionConfigMapNamespace,
+ overlordLeaderElectionConfigMapNamespace,
+ leaseDuration,
+ renewDeadline,
+ retryPeriod
+ );
+ }
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryModule.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryModule.java
new file mode 100644
index 000000000000..6da6819ff4b5
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryModule.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import com.fasterxml.jackson.databind.Module;
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.google.inject.Provider;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.util.Config;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidNodeAnnouncer;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.PolyBind;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.server.DruidNode;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class K8sDiscoveryModule implements DruidModule
+{
+ private static final String K8S_KEY = "k8s";
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ JsonConfigProvider.bind(binder, "druid.discovery.k8s", K8sDiscoveryConfig.class);
+
+ binder.bind(ApiClient.class)
+ .toProvider(
+ () -> {
+ try {
+ // Note: we can probably improve things here about figuring out how to find the K8S API server,
+ // HTTP client timeouts etc.
+ return Config.defaultClient();
+ }
+ catch (IOException ex) {
+ throw new RuntimeException("Failed to create K8s ApiClient instance", ex);
+ }
+ }
+ )
+ .in(LazySingleton.class);
+
+ binder.bind(K8sApiClient.class).to(DefaultK8sApiClient.class).in(LazySingleton.class);
+ binder.bind(K8sLeaderElectorFactory.class).to(DefaultK8sLeaderElectorFactory.class).in(LazySingleton.class);
+
+ PolyBind.optionBinder(binder, Key.get(DruidNodeDiscoveryProvider.class))
+ .addBinding(K8S_KEY)
+ .to(K8sDruidNodeDiscoveryProvider.class)
+ .in(LazySingleton.class);
+
+ PolyBind.optionBinder(binder, Key.get(DruidNodeAnnouncer.class))
+ .addBinding(K8S_KEY)
+ .to(K8sDruidNodeAnnouncer.class)
+ .in(LazySingleton.class);
+
+ PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, Coordinator.class))
+ .addBinding(K8S_KEY)
+ .toProvider(
+ new DruidLeaderSelectorProvider(true)
+ )
+ .in(LazySingleton.class);
+
+ PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class))
+ .addBinding(K8S_KEY)
+ .toProvider(
+ new DruidLeaderSelectorProvider(false)
+ )
+ .in(LazySingleton.class);
+ }
+
+ private static class DruidLeaderSelectorProvider implements Provider
+ {
+ @Inject
+ @Self
+ private DruidNode druidNode;
+
+ @Inject
+ private PodInfo podInfo;
+
+ @Inject
+ private K8sDiscoveryConfig discoveryConfig;
+
+ @Inject
+ private Provider k8sApiClientProvider;
+
+ private boolean isCoordinator;
+
+ DruidLeaderSelectorProvider(boolean isCoordinator)
+ {
+ this.isCoordinator = isCoordinator;
+ }
+
+ @Override
+ public DruidLeaderSelector get()
+ {
+ // Note: these can not be setup in the constructor because injected K8sDiscoveryConfig and PodInfo
+ // are not available at that time.
+ String lockResourceName;
+ String lockResourceNamespace;
+
+ if (isCoordinator) {
+ lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-coordinator";
+ lockResourceNamespace = discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace() == null ?
+ podInfo.getPodNamespace() : discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace();
+ } else {
+ lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-overlord";
+ lockResourceNamespace = discoveryConfig.getOverlordLeaderElectionConfigMapNamespace() == null ?
+ podInfo.getPodNamespace() : discoveryConfig.getOverlordLeaderElectionConfigMapNamespace();
+ }
+
+ return new K8sDruidLeaderSelector(
+ druidNode,
+ lockResourceName,
+ lockResourceNamespace,
+ discoveryConfig,
+ new DefaultK8sLeaderElectorFactory(k8sApiClientProvider.get(), discoveryConfig)
+ );
+ }
+ }
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java
new file mode 100644
index 000000000000..2cf5a8d581b2
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidLeaderSelector.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.annotations.SuppressFBWarnings;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.CloseQuietly;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.DruidNode;
+
+import javax.annotation.Nullable;
+
+public class K8sDruidLeaderSelector implements DruidLeaderSelector
+{
+ private static final EmittingLogger LOGGER = new EmittingLogger(K8sDruidLeaderSelector.class);
+
+ private final LifecycleLock lifecycleLock = new LifecycleLock();
+
+ private DruidLeaderSelector.Listener listener = null;
+ private final LeaderElectorAsyncWrapper leaderLatch;
+
+ private volatile boolean leader = false;
+
+ @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "incremented but in single thread")
+ private volatile int term = 0;
+
+ public K8sDruidLeaderSelector(@Self DruidNode self, String lockResourceName, String lockResourceNamespace, K8sDiscoveryConfig discoveryConfig, K8sLeaderElectorFactory k8sLeaderElectorFactory)
+ {
+ this.leaderLatch = new LeaderElectorAsyncWrapper(
+ self.getServiceScheme() + "://" + self.getHostAndPortToUse(),
+ lockResourceName,
+ lockResourceNamespace,
+ discoveryConfig,
+ k8sLeaderElectorFactory
+ );
+ }
+
+ private void startLeaderElector(LeaderElectorAsyncWrapper leaderElector)
+ {
+ leaderElector.run(
+ () -> {
+ try {
+ if (leader) {
+ LOGGER.warn("I'm being asked to become leader. But I am already the leader. Ignored event.");
+ return;
+ }
+
+ leader = true;
+ term++;
+ listener.becomeLeader();
+ }
+ catch (Throwable ex) {
+ LOGGER.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();
+
+ CloseQuietly.close(leaderLatch);
+ leader = false;
+ //Exit and Kubernetes would simply create a new replacement pod.
+ System.exit(1);
+ }
+ },
+ () -> {
+ try {
+ if (!leader) {
+ LOGGER.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event.");
+ return;
+ }
+
+ leader = false;
+ listener.stopBeingLeader();
+ }
+ catch (Throwable ex) {
+ LOGGER.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit();
+ }
+ }
+ );
+ }
+
+ @Nullable
+ @Override
+ public String getCurrentLeader()
+ {
+ try {
+ return leaderLatch.getCurrentLeader();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean isLeader()
+ {
+ return leader;
+ }
+
+ @Override
+ public int localTerm()
+ {
+ return term;
+ }
+
+ @Override
+ public void registerListener(DruidLeaderSelector.Listener listener)
+ {
+ Preconditions.checkArgument(listener != null, "listener is null.");
+
+ if (!lifecycleLock.canStart()) {
+ throw new ISE("can't start.");
+ }
+ try {
+ this.listener = listener;
+ startLeaderElector(leaderLatch);
+ lifecycleLock.started();
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ finally {
+ lifecycleLock.exitStart();
+ }
+ }
+
+ @Override
+ public void unregisterListener()
+ {
+ if (!lifecycleLock.canStop()) {
+ throw new ISE("can't stop.");
+ }
+ CloseQuietly.close(leaderLatch);
+ }
+}
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeAnnouncer.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeAnnouncer.java
new file mode 100644
index 000000000000..29c06f443a3e
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeAnnouncer.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeAnnouncer;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.DruidNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Announcement creates following in the pod def...
+ *
+ * Labels -
+ * druidDiscoveryAnnouncement- = true
+ * druidDiscoveryAnnouncement-id = encodeHostPort(host:port)
+ * druidDiscoveryAnnouncement-cluster-identifier =
+ *
+ * Annotation -
+ * druidNodeInfo- = json_serialize(DiscoveryDruidNode)
+ *
+ * Note that, a node can have multiple roles e.g. coordinator can take up overlord's role as well.
+ */
+public class K8sDruidNodeAnnouncer implements DruidNodeAnnouncer
+{
+ private static final Logger LOGGER = new Logger(K8sDruidNodeAnnouncer.class);
+
+ private static String POD_LABELS_PATH_PREFIX = "/metadata/labels";
+ private static String POD_ANNOTATIONS_PATH_PREFIX = "/metadata/annotations";
+
+ private static final String OP_ADD = "add";
+ private static final String OP_REMOVE = "remove";
+
+ public static final String ANNOUNCEMENT_DONE = "true";
+
+ private final ObjectMapper jsonMapper;
+ private final K8sDiscoveryConfig discoveryConfig;
+ private final PodInfo podInfo;
+ private final K8sApiClient k8sApiClient;
+
+ @Inject
+ public K8sDruidNodeAnnouncer(
+ PodInfo podInfo,
+ K8sDiscoveryConfig discoveryConfig,
+ K8sApiClient k8sApiClient,
+ @Json ObjectMapper jsonMapper
+ )
+ {
+ this.discoveryConfig = discoveryConfig;
+ this.podInfo = podInfo;
+ this.k8sApiClient = k8sApiClient;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+ public void announce(DiscoveryDruidNode discoveryDruidNode)
+ {
+ LOGGER.info("Announcing DiscoveryDruidNode[%s]", discoveryDruidNode);
+
+ String roleAnnouncementLabel = getRoleAnnouncementLabel(discoveryDruidNode.getNodeRole());
+ String idAnnouncementLabel = getIdAnnouncementLabel();
+ String clusterIdentifierAnnouncementLabel = getClusterIdentifierAnnouncementLabel();
+ String infoAnnotation = getInfoAnnotation(discoveryDruidNode.getNodeRole());
+
+ try {
+ List