Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c5abb31
Add ITHighAvailabilityTest for zookeeper-less Druid build
himanshug Dec 20, 2020
4f4b583
prepping for PR
himanshug Jan 13, 2021
8af598a
revert k8s java lib to 10.0.1
himanshug Jan 13, 2021
f982072
fix travis.yml
himanshug May 5, 2021
8e23239
upgrate k8s client dependency version to 12.0.0
himanshug May 5, 2021
80401e9
fix compilation error
himanshug May 5, 2021
c8f57b0
add license header to postgres.yaml
himanshug May 6, 2021
710e840
add Override annotation in DockerConfigProvider.java
himanshug May 6, 2021
4b2f99d
license.yaml updates
himanshug May 7, 2021
2b99837
Merge remote-tracking branch 'origin/master' into k8s_build
himanshug May 8, 2021
3621810
update travis.yml
himanshug May 8, 2021
195c9cc
it config files need to be top level so that they are found by java code
himanshug May 8, 2021
22b6891
add postgres to k8s when doing druid-on-k8s
himanshug May 8, 2021
5a6909e
fix druid-k8s-high-availability-it-config.json
himanshug May 9, 2021
cdb009c
remove sudo from minikube setup so that non-root user points to corre…
himanshug May 9, 2021
0bdc6c6
seems minikube needs sudo for none driver
himanshug May 9, 2021
901eb8b
update minikube and kubernetes cluster version
himanshug May 9, 2021
822d667
watch api does not expect a resourceVersionMatch param
himanshug May 12, 2021
a3d2017
temporarily get logs of coordinator/overlord for debugging
himanshug May 13, 2021
d48b8b9
revert the sleep for druid cluster getting setup
himanshug May 13, 2021
3d42fd9
uncomment inadvertantly commented code
himanshug May 13, 2021
6ac95ca
print postgres pod logs too
himanshug May 14, 2021
eab5cca
more debug logs
himanshug May 14, 2021
ec4504a
Merge remote-tracking branch 'origin/master' into k8s_build
himanshug Jul 2, 2021
ff94ebc
some more postgres debugging
himanshug Jul 2, 2021
ab500c8
try making postgress service non-headless
himanshug Jul 3, 2021
520f4f2
lazily get DruidDiscoveryNode from watch events as they are not prese…
himanshug Jul 4, 2021
4d78413
add retries to delete pod call in K8sDruidClusterAdminClient, saw a B…
himanshug Jul 4, 2021
2b2436d
change k8s version to 1.19.8 in minikube
himanshug Jul 5, 2021
445335a
try setting http1.1 protocol when talking to k8s to fix Broken Pipe e…
himanshug Jul 6, 2021
d48ac5f
fix indentation to make checkstyle happy
himanshug Jul 6, 2021
12fc7ef
Merge remote-tracking branch 'origin/master' into k8s_build
himanshug Oct 2, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -713,14 +713,29 @@ jobs:
jdk: openjdk8
services: &integration_test_services_k8s
- docker
env: CONFIG_FILE='k8s_run_config_file.json' IT_TEST='-Dit.test=ITNestedQueryPushDownTest' POD_NAME=int-test POD_NAMESPACE=default BUILD_DRUID_CLSUTER=true
env: CONFIG_FILE='druid-k8s-sanity-it-config.json' IT_TEST='-Dit.test=ITNestedQueryPushDownTest' POD_NAME=int-test POD_NAMESPACE=default BUILD_DRUID_CLSUTER=true DRUID_K8S_CLUSTER_SPEC='integration-tests/k8s/druid-sanity-it-cluster.yaml'
script: &run_integration_test_k8s
- ${MVN} verify -pl integration-tests -P int-tests-config-file ${IT_TEST} ${MAVEN_SKIP} -Dpod.name=${POD_NAME} -Dpod.namespace=${POD_NAMESPACE} -Dbuild.druid.cluster=${BUILD_DRUID_CLSUTER}
after_failure: &integration_test_diags_k8s
- for v in broker middlemanager router coordinator historical ; do
echo "------------------------druid-tiny-cluster-"$v"s-0-------------------------";
sudo /usr/local/bin/kubectl logs --tail 1000 druid-tiny-cluster-"$v"s-0;
done

- &integration_high_availability_k8s
name: "(Compile=openjdk8, Run=openjdk8, Cluster Build On K8s) ITHighAvailabilityTest integration test"
jdk: openjdk8
services: &integration_test_services_k8s
- docker
env: CONFIG_FILE='druid-k8s-high-availability-it-config.json' IT_TEST='-Dit.test=ITHighAvailabilityTest' POD_NAME=int-test POD_NAMESPACE=default BUILD_DRUID_CLSUTER=true DRUID_K8S_CLUSTER_SPEC='integration-tests/k8s/druid-high-availability-it-cluster.yaml'
script: &run_integration_test_k8s
- ${MVN} verify -pl integration-tests -P int-tests-config-file ${IT_TEST} ${MAVEN_SKIP} -Dpod.name=${POD_NAME} -Dpod.namespace=${POD_NAMESPACE} -Dbuild.druid.cluster=${BUILD_DRUID_CLSUTER}
after_failure: &integration_test_diags_k8s
- for v in broker middlemanager router coordinator historical ; do
echo "------------------------druid-tiny-cluster-"$v"s-0-------------------------";
sudo /usr/local/bin/kubectl logs --tail 1000 druid-tiny-cluster-"$v"s-0;
done

- name: "security vulnerabilities"
stage: cron
install: skip
Expand Down
4 changes: 0 additions & 4 deletions extensions-core/kubernetes-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<properties>
<kubernetes.client.version>10.0.1</kubernetes.client.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public DiscoveryDruidNodeList listPods(
)
{
try {
V1PodList podList = coreV1Api.listNamespacedPod(podNamespace, null, null, null, null, labelSelector, 0, null, null, null);
V1PodList podList = coreV1Api.listNamespacedPod(podNamespace, null, null, null, null, labelSelector, null, null, null, null, null);
Preconditions.checkState(podList != null, "WTH: NULL podList");

Map<String, DiscoveryDruidNode> allNodes = new HashMap();
Expand Down Expand Up @@ -114,7 +114,7 @@ public WatchResult watchPods(String namespace, String labelSelector, String last
Watch.createWatch(
realK8sClient,
coreV1Api.listNamespacedPodCall(namespace, null, true, null, null,
labelSelector, null, lastKnownResourceVersion, 0, true, null
labelSelector, null, lastKnownResourceVersion, null, 0, true, null
),
new TypeReference<Watch.Response<V1Pod>>()
{
Expand All @@ -136,7 +136,7 @@ public boolean hasNext() throws SocketTimeoutException
item.type,
new DiscoveryDruidNodeAndResourceVersion(
item.object.getMetadata().getResourceVersion(),
getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
() -> getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
)
);
return true;
Expand Down Expand Up @@ -179,7 +179,7 @@ public void close()
// k8s no longer has history that we need
return null;
}

throw new RE(ex, "Expection in watching pods, code[%d] and error[%s].", ex.getCode(), ex.getResponseBody());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public void run(Runnable startLeadingHook, Runnable stopLeadingHook)
{
leaderElector.run(startLeadingHook, stopLeadingHook);
}

@Override
public void close()
{
leaderElector.close();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@

import org.apache.druid.discovery.DiscoveryDruidNode;

import java.util.function.Supplier;

public class DiscoveryDruidNodeAndResourceVersion
{
private final String resourceVersion;
private final DiscoveryDruidNode node;
private final Supplier<DiscoveryDruidNode> node;

public DiscoveryDruidNodeAndResourceVersion(String resourceVersion, DiscoveryDruidNode node)
{
this.resourceVersion = resourceVersion;
this.node = () -> node;
}

public DiscoveryDruidNodeAndResourceVersion(String resourceVersion, Supplier<DiscoveryDruidNode> node)
{
this.resourceVersion = resourceVersion;
this.node = node;
Expand All @@ -39,6 +47,6 @@ public String getResourceVersion()

public DiscoveryDruidNode getNode()
{
return node;
return node.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,11 @@ public K8sDiscoveryConfig(
this.podNameEnvKey = podNameEnvKey == null ? "POD_NAME" : podNameEnvKey;
this.podNamespaceEnvKey = podNamespaceEnvKey == null ? "POD_NAMESPACE" : podNamespaceEnvKey;

if (coordinatorLeaderElectionConfigMapNamespace == null) {
LOGGER.warn("IF coordinator pods run in multiple namespaces, then you MUST provide coordinatorLeaderElectionConfigMapNamespace");
}
this.coordinatorLeaderElectionConfigMapNamespace = coordinatorLeaderElectionConfigMapNamespace;

if (overlordLeaderElectionConfigMapNamespace == null) {
LOGGER.warn("IF overlord pods run in multiple namespaces, then you MUST provide overlordLeaderElectionConfigMapNamespace");
}
this.overlordLeaderElectionConfigMapNamespace = overlordLeaderElectionConfigMapNamespace;

this.leaseDuration = leaseDuration == null ? Duration.millis(60000) : leaseDuration;
this.renewDeadline = renewDeadline == null ? Duration.millis(17000) : renewDeadline;
this.renewDeadline = renewDeadline == null ? Duration.millis(47000) : renewDeadline;
this.retryPeriod = retryPeriod == null ? Duration.millis(5000) : retryPeriod;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.inject.Provider;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import okhttp3.Protocol;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderSelector;
Expand Down Expand Up @@ -63,7 +64,15 @@ public void configure(Binder binder)
try {
// Note: we can probably improve things here about figuring out how to find the K8S API server,
// HTTP client timeouts etc.
return Config.defaultClient();
ApiClient k8sClient = Config.defaultClient();
k8sClient.setHttpClient(
k8sClient
.getHttpClient()
.newBuilder()
.protocols(Collections.singletonList((Protocol.HTTP_1_1)))
.build()
);
return k8sClient;
}
catch (IOException ex) {
throw new RuntimeException("Failed to create K8s ApiClient instance", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ private void keepWatching(String namespace, String labelSelector, String resourc
baseNodeRoleWatcher.childRemoved(item.object.getNode());
break;
default:
// It is expected to receive additional events of type BOOKMARK and MODIFIED
// that are used only to update the last known resourceVersion
}

// This should be updated after the action has been dealt with successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ interface K8sLeaderElector
{
String getCurrentLeader();
void run(Runnable startLeadingHook, Runnable stopLeadingHook);

default void close()
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void run(Runnable startLeadingHook, Runnable stopLeadingHook)
executor = Execs.singleThreaded(this.getClass().getSimpleName());
futureRef.set(executor.submit(
() -> {
if (!lifecycleLock.awaitStarted()) {
LOGGER.error("Lifecycle not started, LeaderElection will not run.");
return;
}

while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
k8sLeaderElector.run(startLeadingHook, stopLeadingHook);
Expand All @@ -101,9 +106,11 @@ public void close()
}

try {
LOGGER.info("Stoppig k8s LeaderElector...");
k8sLeaderElector.close();
futureRef.get().cancel(true);
executor.shutdownNow();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
LOGGER.warn("Failed to terminate [%s] executor.", this.getClass().getSimpleName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class K8sAnnouncerAndDiscoveryIntTest

private final ObjectMapper jsonMapper = new DefaultObjectMapper();

private final PodInfo podInfo = new PodInfo("busybox", "default");
private final PodInfo podInfo = new PodInfo("postgres-0", "default");

private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
package org.apache.druid.k8s.discovery;

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.util.Config;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.DruidNode;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.net.HttpURLConnection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -41,6 +44,8 @@
@Ignore("Needs K8S API Server")
public class K8sDruidLeaderElectionIntTest
{
private static final Logger LOGGER = new Logger(K8sDruidLeaderElectionIntTest.class);

private final DiscoveryDruidNode testNode1 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host1", true, 80, null, true, false),
NodeRole.ROUTER,
Expand All @@ -54,7 +59,7 @@ public class K8sDruidLeaderElectionIntTest
);

private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, "default", "default",
Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000));
null, null, null);

private final ApiClient k8sApiClient;

Expand Down Expand Up @@ -91,6 +96,7 @@ public void becomeLeader()
public void stopBeingLeader()
{
try {
// wait to make sure start-being-leader notification came first
becomeLeaderLatch.await();
stopBeingLeaderLatch.countDown();
}
Expand All @@ -100,54 +106,76 @@ public void stopBeingLeader()
}
});

LOGGER.info("Waiting for leadership notification...");
becomeLeaderLatch.await();

LOGGER.info("Waiting for stop-being-leader notification...");
stopBeingLeaderLatch.await();

Assert.assertFalse(failed.get());
}

@Test(timeout = 60000L)
public void test_leaderCandidate_stopped() throws Exception
{
K8sDruidLeaderSelector leaderSelector = new K8sDruidLeaderSelector(testNode1.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig));
// delete the lock resource if it exists, or else first leader candidate would need to wait for a whole
// leaseDuration configured
try {
CoreV1Api coreV1Api = new CoreV1Api(k8sApiClient);
coreV1Api.deleteNamespacedConfigMap(
lockResourceName,
discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(),
null,
null,
null,
null,
null,
null
);
LOGGER.info("Deleted existing lock resource [%s]", lockResourceName);
}
catch (ApiException ex) {
if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw ex;
}
}

CountDownLatch becomeLeaderLatch = new CountDownLatch(1);
CountDownLatch stopBeingLeaderLatch = new CountDownLatch(1);
K8sDruidLeaderSelector leaderSelector1 = new K8sDruidLeaderSelector(testNode1.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig));

AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch becomeLeaderLatch1 = new CountDownLatch(1);
CountDownLatch stopBeingLeaderLatch1 = new CountDownLatch(1);

leaderSelector.registerListener(new DruidLeaderSelector.Listener()
AtomicBoolean failed1 = new AtomicBoolean(false);

leaderSelector1.registerListener(new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
becomeLeaderLatch.countDown();
becomeLeaderLatch1.countDown();
}

@Override
public void stopBeingLeader()
{
try {
becomeLeaderLatch.await();
stopBeingLeaderLatch.countDown();
// wait to make sure start-being-leader notification came first
becomeLeaderLatch1.await();
stopBeingLeaderLatch1.countDown();
}
catch (InterruptedException ex) {
failed.set(true);
failed1.set(true);
}
}
});

becomeLeaderLatch.await();

leaderSelector.unregisterListener();

stopBeingLeaderLatch.await();
Assert.assertFalse(failed.get());

leaderSelector = new K8sDruidLeaderSelector(testNode2.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig));
LOGGER.info("Waiting for candidate#1 start-being-leader notification...");
becomeLeaderLatch1.await();
LOGGER.info("Candidate#1 start-being-leader notification arrived.");

K8sDruidLeaderSelector leaderSelector2 = new K8sDruidLeaderSelector(testNode2.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig));
CountDownLatch becomeLeaderLatch2 = new CountDownLatch(1);

leaderSelector.registerListener(new DruidLeaderSelector.Listener()
leaderSelector2.registerListener(new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
Expand All @@ -161,6 +189,18 @@ public void stopBeingLeader()
}
});

LOGGER.info("Waiting for candidate#1 start-being-leader notification...");
becomeLeaderLatch1.await();
LOGGER.info("Candidate#1 start-being-leader notification arrived.");

LOGGER.info("Candidate#1 stopping leader election...");
leaderSelector1.unregisterListener();

LOGGER.info("Waiting for candidate#1 to receive stop-being-leader notification.");
stopBeingLeaderLatch1.await();
Assert.assertFalse(failed1.get());

LOGGER.info("Waiting for candidate#2 start-being-leader notification...");
becomeLeaderLatch2.await();
}
}
Loading