diff --git a/.travis.yml b/.travis.yml index dc995cd55d11..ad9b6b99edb7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -713,7 +713,7 @@ jobs: jdk: openjdk8 services: &integration_test_services_k8s - docker - env: CONFIG_FILE='k8s_run_config_file.json' IT_TEST='-Dit.test=ITNestedQueryPushDownTest' POD_NAME=int-test POD_NAMESPACE=default BUILD_DRUID_CLSUTER=true + env: CONFIG_FILE='druid-k8s-sanity-it-config.json' IT_TEST='-Dit.test=ITNestedQueryPushDownTest' POD_NAME=int-test POD_NAMESPACE=default BUILD_DRUID_CLSUTER=true DRUID_K8S_CLUSTER_SPEC='integration-tests/k8s/druid-sanity-it-cluster.yaml' script: &run_integration_test_k8s - ${MVN} verify -pl integration-tests -P int-tests-config-file ${IT_TEST} ${MAVEN_SKIP} -Dpod.name=${POD_NAME} -Dpod.namespace=${POD_NAMESPACE} -Dbuild.druid.cluster=${BUILD_DRUID_CLSUTER} after_failure: &integration_test_diags_k8s @@ -721,6 +721,21 @@ jobs: echo "------------------------druid-tiny-cluster-"$v"s-0-------------------------"; sudo /usr/local/bin/kubectl logs --tail 1000 druid-tiny-cluster-"$v"s-0; done + + - &integration_high_availability_k8s + name: "(Compile=openjdk8, Run=openjdk8, Cluster Build On K8s) ITHighAvailabilityTest integration test" + jdk: openjdk8 + services: &integration_test_services_k8s + - docker + env: CONFIG_FILE='druid-k8s-high-availability-it-config.json' IT_TEST='-Dit.test=ITHighAvailabilityTest' POD_NAME=int-test POD_NAMESPACE=default BUILD_DRUID_CLSUTER=true DRUID_K8S_CLUSTER_SPEC='integration-tests/k8s/druid-high-availability-it-cluster.yaml' + script: &run_integration_test_k8s + - ${MVN} verify -pl integration-tests -P int-tests-config-file ${IT_TEST} ${MAVEN_SKIP} -Dpod.name=${POD_NAME} -Dpod.namespace=${POD_NAMESPACE} -Dbuild.druid.cluster=${BUILD_DRUID_CLSUTER} + after_failure: &integration_test_diags_k8s + - for v in broker middlemanager router coordinator historical ; do + echo "------------------------druid-tiny-cluster-"$v"s-0-------------------------"; + sudo /usr/local/bin/kubectl logs 
--tail 1000 druid-tiny-cluster-"$v"s-0; + done + - name: "security vulnerabilities" stage: cron install: skip diff --git a/extensions-core/kubernetes-extensions/pom.xml b/extensions-core/kubernetes-extensions/pom.xml index c1bd477c182a..bd7c4c10e598 100644 --- a/extensions-core/kubernetes-extensions/pom.xml +++ b/extensions-core/kubernetes-extensions/pom.xml @@ -34,10 +34,6 @@ ../../pom.xml - - 10.0.1 - - org.apache.druid diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java index 32ad62316006..de2b2752a441 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java @@ -80,7 +80,7 @@ public DiscoveryDruidNodeList listPods( ) { try { - V1PodList podList = coreV1Api.listNamespacedPod(podNamespace, null, null, null, null, labelSelector, 0, null, null, null); + V1PodList podList = coreV1Api.listNamespacedPod(podNamespace, null, null, null, null, labelSelector, null, null, null, null, null); Preconditions.checkState(podList != null, "WTH: NULL podList"); Map allNodes = new HashMap(); @@ -114,7 +114,7 @@ public WatchResult watchPods(String namespace, String labelSelector, String last Watch.createWatch( realK8sClient, coreV1Api.listNamespacedPodCall(namespace, null, true, null, null, - labelSelector, null, lastKnownResourceVersion, 0, true, null + labelSelector, null, lastKnownResourceVersion, null, 0, true, null ), new TypeReference>() { @@ -136,7 +136,7 @@ public boolean hasNext() throws SocketTimeoutException item.type, new DiscoveryDruidNodeAndResourceVersion( item.object.getMetadata().getResourceVersion(), - getDiscoveryDruidNodeFromPodDef(nodeRole, item.object) + () -> getDiscoveryDruidNodeFromPodDef(nodeRole, 
item.object) ) ); return true; @@ -179,7 +179,7 @@ public void close() // k8s no longer has history that we need return null; } - + throw new RE(ex, "Exception in watching pods, code[%d] and error[%s].", ex.getCode(), ex.getResponseBody()); } } diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory.java index 7ac6b5e54ba4..47b4efebd42c 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory.java @@ -76,6 +76,12 @@ public void run(Runnable startLeadingHook, Runnable stopLeadingHook) { leaderElector.run(startLeadingHook, stopLeadingHook); } + + @Override + public void close() + { + leaderElector.close(); + } }; } diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeAndResourceVersion.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeAndResourceVersion.java index 6b634c7317cc..8b70b1ba034c 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeAndResourceVersion.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DiscoveryDruidNodeAndResourceVersion.java @@ -21,12 +21,20 @@ import org.apache.druid.discovery.DiscoveryDruidNode; +import java.util.function.Supplier; + public class DiscoveryDruidNodeAndResourceVersion { private final String resourceVersion; - private final DiscoveryDruidNode node; + private final Supplier node; public DiscoveryDruidNodeAndResourceVersion(String resourceVersion, DiscoveryDruidNode node) + { + this.resourceVersion = resourceVersion; + 
this.node = () -> node; + } + + public DiscoveryDruidNodeAndResourceVersion(String resourceVersion, Supplier node) { this.resourceVersion = resourceVersion; this.node = node; @@ -39,6 +47,6 @@ public String getResourceVersion() public DiscoveryDruidNode getNode() { - return node; + return node.get(); } } diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java index 998b8641c83a..59fff738a8ea 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryConfig.java @@ -84,18 +84,11 @@ public K8sDiscoveryConfig( this.podNameEnvKey = podNameEnvKey == null ? "POD_NAME" : podNameEnvKey; this.podNamespaceEnvKey = podNamespaceEnvKey == null ? "POD_NAMESPACE" : podNamespaceEnvKey; - if (coordinatorLeaderElectionConfigMapNamespace == null) { - LOGGER.warn("IF coordinator pods run in multiple namespaces, then you MUST provide coordinatorLeaderElectionConfigMapNamespace"); - } this.coordinatorLeaderElectionConfigMapNamespace = coordinatorLeaderElectionConfigMapNamespace; - - if (overlordLeaderElectionConfigMapNamespace == null) { - LOGGER.warn("IF overlord pods run in multiple namespaces, then you MUST provide overlordLeaderElectionConfigMapNamespace"); - } this.overlordLeaderElectionConfigMapNamespace = overlordLeaderElectionConfigMapNamespace; this.leaseDuration = leaseDuration == null ? Duration.millis(60000) : leaseDuration; - this.renewDeadline = renewDeadline == null ? Duration.millis(17000) : renewDeadline; + this.renewDeadline = renewDeadline == null ? Duration.millis(47000) : renewDeadline; this.retryPeriod = retryPeriod == null ? 
Duration.millis(5000) : retryPeriod; } diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryModule.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryModule.java index 6da6819ff4b5..42fca39df173 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryModule.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDiscoveryModule.java @@ -26,6 +26,7 @@ import com.google.inject.Provider; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.util.Config; +import okhttp3.Protocol; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.discovery.DruidLeaderSelector; @@ -63,7 +64,15 @@ public void configure(Binder binder) try { // Note: we can probably improve things here about figuring out how to find the K8S API server, // HTTP client timeouts etc. 
- return Config.defaultClient(); + ApiClient k8sClient = Config.defaultClient(); + k8sClient.setHttpClient( + k8sClient + .getHttpClient() + .newBuilder() + .protocols(Collections.singletonList((Protocol.HTTP_1_1))) + .build() + ); + return k8sClient; } catch (IOException ex) { throw new RuntimeException("Failed to create K8s ApiClient instance", ex); diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java index 08dfafd1e510..38ec5567ddf2 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java @@ -276,6 +276,8 @@ private void keepWatching(String namespace, String labelSelector, String resourc baseNodeRoleWatcher.childRemoved(item.object.getNode()); break; default: + // It is expected to receive additional events of type BOOKMARK and MODIFIED + // that are used only to update the last known resourceVersion } // This should be updated after the action has been dealt with successfully diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sLeaderElectorFactory.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sLeaderElectorFactory.java index 10bab39f65c1..a31c0cff480e 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sLeaderElectorFactory.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sLeaderElectorFactory.java @@ -38,4 +38,9 @@ interface K8sLeaderElector { String getCurrentLeader(); void run(Runnable startLeadingHook, Runnable stopLeadingHook); + + default void close() + { + + } } diff --git 
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/LeaderElectorAsyncWrapper.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/LeaderElectorAsyncWrapper.java index f7e1b481d044..7042014030b2 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/LeaderElectorAsyncWrapper.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/LeaderElectorAsyncWrapper.java @@ -76,6 +76,11 @@ public void run(Runnable startLeadingHook, Runnable stopLeadingHook) executor = Execs.singleThreaded(this.getClass().getSimpleName()); futureRef.set(executor.submit( () -> { + if (!lifecycleLock.awaitStarted()) { + LOGGER.error("Lifecycle not started, LeaderElection will not run."); + return; + } + while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { try { k8sLeaderElector.run(startLeadingHook, stopLeadingHook); @@ -101,9 +106,11 @@ public void close() } try { + LOGGER.info("Stopping k8s LeaderElector..."); + k8sLeaderElector.close(); futureRef.get().cancel(true); executor.shutdownNow(); - if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { LOGGER.warn("Failed to terminate [%s] executor.", this.getClass().getSimpleName()); } } diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java index ab13b356382b..37ee785b4ab7 100644 --- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java +++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java @@ -50,7 +50,7 @@ public class K8sAnnouncerAndDiscoveryIntTest private final ObjectMapper jsonMapper = new 
DefaultObjectMapper(); - private final PodInfo podInfo = new PodInfo("busybox", "default"); + private final PodInfo podInfo = new PodInfo("postgres-0", "default"); private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null); diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java index 168c0625cda3..f0ff05817602 100644 --- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java +++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidLeaderElectionIntTest.java @@ -20,17 +20,20 @@ package org.apache.druid.k8s.discovery; import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.util.Config; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.DruidNode; -import org.joda.time.Duration; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import java.net.HttpURLConnection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,6 +44,8 @@ @Ignore("Needs K8S API Server") public class K8sDruidLeaderElectionIntTest { + private static final Logger LOGGER = new Logger(K8sDruidLeaderElectionIntTest.class); + private final DiscoveryDruidNode testNode1 = new DiscoveryDruidNode( new DruidNode("druid/router", "test-host1", true, 80, null, true, false), NodeRole.ROUTER, @@ -54,7 +59,7 @@ 
public class K8sDruidLeaderElectionIntTest ); private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, "default", "default", - Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000)); + null, null, null); private final ApiClient k8sApiClient; @@ -91,6 +96,7 @@ public void becomeLeader() public void stopBeingLeader() { try { + // wait to make sure start-being-leader notification came first becomeLeaderLatch.await(); stopBeingLeaderLatch.countDown(); } @@ -100,54 +106,76 @@ public void stopBeingLeader() } }); + LOGGER.info("Waiting for leadership notification..."); becomeLeaderLatch.await(); + + LOGGER.info("Waiting for stop-being-leader notification..."); stopBeingLeaderLatch.await(); + Assert.assertFalse(failed.get()); } @Test(timeout = 60000L) public void test_leaderCandidate_stopped() throws Exception { - K8sDruidLeaderSelector leaderSelector = new K8sDruidLeaderSelector(testNode1.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig)); + // delete the lock resource if it exists, or else first leader candidate would need to wait for a whole + // leaseDuration configured + try { + CoreV1Api coreV1Api = new CoreV1Api(k8sApiClient); + coreV1Api.deleteNamespacedConfigMap( + lockResourceName, + discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), + null, + null, + null, + null, + null, + null + ); + LOGGER.info("Deleted existing lock resource [%s]", lockResourceName); + } + catch (ApiException ex) { + if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw ex; + } + } - CountDownLatch becomeLeaderLatch = new CountDownLatch(1); - CountDownLatch stopBeingLeaderLatch = new CountDownLatch(1); + K8sDruidLeaderSelector leaderSelector1 = new K8sDruidLeaderSelector(testNode1.getDruidNode(), lockResourceName, 
discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig)); - AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch becomeLeaderLatch1 = new CountDownLatch(1); + CountDownLatch stopBeingLeaderLatch1 = new CountDownLatch(1); - leaderSelector.registerListener(new DruidLeaderSelector.Listener() + AtomicBoolean failed1 = new AtomicBoolean(false); + + leaderSelector1.registerListener(new DruidLeaderSelector.Listener() { @Override public void becomeLeader() { - becomeLeaderLatch.countDown(); + becomeLeaderLatch1.countDown(); } @Override public void stopBeingLeader() { try { - becomeLeaderLatch.await(); - stopBeingLeaderLatch.countDown(); + // wait to make sure start-being-leader notification came first + becomeLeaderLatch1.await(); + stopBeingLeaderLatch1.countDown(); } catch (InterruptedException ex) { - failed.set(true); + failed1.set(true); } } }); - becomeLeaderLatch.await(); - - leaderSelector.unregisterListener(); - - stopBeingLeaderLatch.await(); - Assert.assertFalse(failed.get()); - - leaderSelector = new K8sDruidLeaderSelector(testNode2.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig)); + LOGGER.info("Waiting for candidate#1 start-being-leader notification..."); + becomeLeaderLatch1.await(); + LOGGER.info("Candidate#1 start-being-leader notification arrived."); + K8sDruidLeaderSelector leaderSelector2 = new K8sDruidLeaderSelector(testNode2.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig)); CountDownLatch becomeLeaderLatch2 = new CountDownLatch(1); - - leaderSelector.registerListener(new DruidLeaderSelector.Listener() + leaderSelector2.registerListener(new DruidLeaderSelector.Listener() { @Override 
public void becomeLeader() @@ -161,6 +189,18 @@ public void stopBeingLeader() } }); + LOGGER.info("Waiting for candidate#1 start-being-leader notification..."); + becomeLeaderLatch1.await(); + LOGGER.info("Candidate#1 start-being-leader notification arrived."); + + LOGGER.info("Candidate#1 stopping leader election..."); + leaderSelector1.unregisterListener(); + + LOGGER.info("Waiting for candidate#1 to receive stop-being-leader notification."); + stopBeingLeaderLatch1.await(); + Assert.assertFalse(failed1.get()); + + LOGGER.info("Waiting for candidate#2 start-being-leader notification..."); becomeLeaderLatch2.await(); } } diff --git a/integration-tests/druid-k8s-high-availability-it-config.json b/integration-tests/druid-k8s-high-availability-it-config.json new file mode 100644 index 000000000000..dbc09d7d5567 --- /dev/null +++ b/integration-tests/druid-k8s-high-availability-it-config.json @@ -0,0 +1,28 @@ +{ + "broker_host" : "localhost", + "broker_port" : "30100", + "broker_internal_host": "druid-tiny-cluster-brokers-0.broker-druid-tiny-cluster-brokers-service.default.svc.cluster.local", + "broker_tls_url" : "http://localhost:30100", + "coordinator_host" : "localhost", + "coordinator_port" : "30200", + "coordinator_internal_host": "druid-tiny-cluster-coordinator1-0.coordinator1-druid-tiny-cluster-coordinator1-service.default.svc.cluster.local", + "coordinator_two_host" : "localhost", + "coordinator_two_port" : "30201", + "coordinator_two_internal_host": "druid-tiny-cluster-coordinator2-0.coordinator2-druid-tiny-cluster-coordinator2-service.default.svc.cluster.local", + "router_host" : "localhost", + "router_port" : "30400", + "router_internal_host": "druid-tiny-cluster-routers-0.router-druid-tiny-cluster-routers-service.default.svc.cluster.local", + "router_tls_url" : "http://localhost:30400", + "historical_host" : "localhost", + "historical_port" : "30300", + "indexer_host" : "localhost", + "indexer_port" : "30210", + "overlord_internal_host": 
"druid-tiny-cluster-overlord1-0.overlord1-druid-tiny-cluster-overlord1-service.default.svc.cluster.local", + "overlord_two_host" : "localhost", + "overlord_two_port" : "30211", + "overlord_two_internal_host": "druid-tiny-cluster-overlord2-0.overlord2-druid-tiny-cluster-overlord2-service.default.svc.cluster.local", + "middlemanager_host": "localhost", + "zookeeper_hosts": "localhost:30600", + "druid_deployment_env_type": "K8S", + "druid_cluster_host": "localhost" +} \ No newline at end of file diff --git a/integration-tests/k8s_run_config_file.json b/integration-tests/druid-k8s-sanity-it-config.json similarity index 100% rename from integration-tests/k8s_run_config_file.json rename to integration-tests/druid-k8s-sanity-it-config.json diff --git a/integration-tests/k8s/druid-high-availability-it-cluster.yaml b/integration-tests/k8s/druid-high-availability-it-cluster.yaml new file mode 100644 index 000000000000..51f8f66e3236 --- /dev/null +++ b/integration-tests/k8s/druid-high-availability-it-cluster.yaml @@ -0,0 +1,407 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# For all pods, make sure that service port matches internal port. 
ITHighAvailabilityTest config assumes that the +# service is reachable at localhost:DiscoveryDruidNode.getDruidNode().getPlaintextPort() +# +apiVersion: "druid.apache.org/v1alpha1" +kind: "Druid" +metadata: + name: tiny-cluster +spec: + image: druid/cluster:v1 + # Optionally specify image for all nodes. Can be specify on nodes also + # imagePullSecrets: + # - name: tutu + startScript: /druid.sh + podLabels: + environment: stage + release: alpha + podAnnotations: + dummy: k8s_extn_needs_atleast_one_annotation + securityContext: + fsGroup: 0 + runAsUser: 0 + runAsGroup: 0 + containerSecurityContext: + privileged: true + services: + - spec: + type: ClusterIP + clusterIP: None + commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common" + jvm.options: |- + -server + -XX:MaxDirectMemorySize=10240g + -Duser.timezone=UTC + -Dfile.encoding=UTF-8 + -Dlog4j.debug + -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager + log4j.config: |- + + + + + + + + + + + + + + common.runtime.properties: | + + # + # Zookeeper-less Druid Cluster + # + druid.zk.service.enabled=false + druid.discovery.type=k8s + druid.discovery.k8s.clusterIdentifier=druid-it + druid.discovery.k8s.retryPeriod=PT1S + druid.serverview.type=http + druid.coordinator.loadqueuepeon.type=http + druid.indexer.runner.type=httpRemote + + # Metadata Store + druid.metadata.storage.type=postgresql + druid.metadata.storage.connector.connectURI=jdbc:postgresql://postgres.default.svc.cluster.local:5432/druid + druid.metadata.storage.connector.user=druid + druid.metadata.storage.connector.password=druid + druid.metadata.storage.connector.createTables=true + + # Deep Storage + druid.storage.type=local + druid.storage.storageDirectory=/druid/data/deepstorage + + # + # Extensions + # + druid.extensions.loadList=["druid-avro-extensions","druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches", "postgresql-metadata-storage", "druid-kubernetes-extensions"] + + # + # Service discovery + # + 
druid.selectors.indexing.serviceName=druid/overlord + druid.selectors.coordinator.serviceName=druid/coordinator + + druid.indexer.logs.type=file + druid.indexer.logs.directory=/druid/data/task-logs + druid.indexer.task.baseDir=/druid/data/task-base + + druid.lookup.enableLookupSyncOnStartup=false + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: DRUID_SET_HOST + value: "0" + + + nodes: + brokers: + # Optionally specify for running broker as Deployment + # kind: Deployment + nodeType: "broker" + # Optionally specify for broker nodes + # imagePullSecrets: + # - name: tutu + druid.port: 30100 + readinessProbe: + httpGet: + path: /status/health + port: 30100 + services: + - spec: + type: NodePort + ports: + - name: broker-service-port + nodePort: 30100 + port: 30100 + protocol: TCP + targetPort: 30100 + selector: + nodeSpecUniqueStr: druid-tiny-cluster-brokers + metadata: + name: broker-%s-service + - spec: + type: ClusterIP + clusterIP: None + nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker" + replicas: 1 + runtime.properties: | + druid.service=druid/broker + + # HTTP server threads + druid.broker.http.numConnections=5 + druid.server.http.numThreads=40 + + # Processing threads and buffers + druid.processing.buffer.sizeBytes=25000000 + druid.processing.numThreads=1 + druid.sql.enable=true + extra.jvm.options: |- + -Xmx512m + -Xms512m + volumeMounts: + - mountPath: /druid/data + name: data-volume + volumes: + - name: data-volume + hostPath: + path: REPLACE_VOLUMES/tmp + resources: + requests: + memory: "800Mi" + limits: + memory: "800Mi" + + coordinator1: + # Optionally specify for running coordinator as Deployment + # kind: Deployment + nodeType: "coordinator" + druid.port: 30200 + readinessProbe: + httpGet: + path: /status/health + port: 30200 + services: + - spec: + type: NodePort + ports: + - name: coordinator-service-port + 
nodePort: 30200 + port: 30200 + protocol: TCP + targetPort: 30200 + selector: + nodeSpecUniqueStr: druid-tiny-cluster-coordinators + metadata: + name: coordinator1-%s-service + - spec: + type: ClusterIP + clusterIP: None + nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord" + replicas: 1 + runtime.properties: | + druid.service=druid/coordinator + + # HTTP server threads + druid.coordinator.startDelay=PT30S + druid.coordinator.period=PT30S + + extra.jvm.options: |- + -Xmx800m + -Xms800m + volumeMounts: + - mountPath: /druid/data + name: data-volume + volumes: + - name: data-volume + hostPath: + path: REPLACE_VOLUMES/tmp + resources: + requests: + memory: "1G" + limits: + memory: "1G" + + coordinator2: + # Optionally specify for running coordinator as Deployment + # kind: Deployment + nodeType: "coordinator" + druid.port: 30201 + readinessProbe: + httpGet: + path: /status/health + port: 30201 + services: + - spec: + type: NodePort + ports: + - name: coordinator-service-port + nodePort: 30201 + port: 30201 + protocol: TCP + targetPort: 30201 + selector: + nodeSpecUniqueStr: druid-tiny-cluster-coordinators + metadata: + name: coordinator2-%s-service + - spec: + type: ClusterIP + clusterIP: None + nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord" + replicas: 1 + runtime.properties: | + druid.service=druid/coordinator + + # HTTP server threads + druid.coordinator.startDelay=PT30S + druid.coordinator.period=PT30S + + extra.jvm.options: |- + -Xmx800m + -Xms800m + volumeMounts: + - mountPath: /druid/data + name: data-volume + volumes: + - name: data-volume + hostPath: + path: REPLACE_VOLUMES/tmp + resources: + requests: + memory: "1G" + limits: + memory: "1G" + + overlord1: + # Optionally specify for running coordinator as Deployment + # kind: Deployment + nodeType: "overlord" + druid.port: 30210 + readinessProbe: + httpGet: + path: /status/health + port: 30210 + services: + - spec: + type: NodePort + ports: + - 
name: overlord-service-port + nodePort: 30210 + port: 30210 + protocol: TCP + targetPort: 30210 + selector: + nodeSpecUniqueStr: druid-tiny-cluster-overlords + metadata: + name: overlord1-%s-service + - spec: + type: ClusterIP + clusterIP: None + nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord" + replicas: 1 + runtime.properties: | + druid.service=druid/overlord + druid.indexer.queue.startDelay=PT30S + extra.jvm.options: |- + -Xmx800m + -Xms800m + volumeMounts: + - mountPath: /druid/data + name: data-volume + volumes: + - name: data-volume + hostPath: + path: REPLACE_VOLUMES/tmp + resources: + requests: + memory: "1G" + limits: + memory: "1G" + overlord2: + # Optionally specify for running coordinator as Deployment + # kind: Deployment + nodeType: "overlord" + druid.port: 30211 + readinessProbe: + httpGet: + path: /status/health + port: 30211 + services: + - spec: + type: NodePort + ports: + - name: overlord-service-port + nodePort: 30211 + port: 30211 + protocol: TCP + targetPort: 30211 + selector: + nodeSpecUniqueStr: druid-tiny-cluster-overlords + metadata: + name: overlord2-%s-service + - spec: + type: ClusterIP + clusterIP: None + nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord" + replicas: 1 + runtime.properties: | + druid.service=druid/overlord + druid.indexer.queue.startDelay=PT30S + extra.jvm.options: |- + -Xmx800m + -Xms800m + volumeMounts: + - mountPath: /druid/data + name: data-volume + volumes: + - name: data-volume + hostPath: + path: REPLACE_VOLUMES/tmp + resources: + requests: + memory: "1G" + limits: + memory: "1G" + routers: + nodeType: "router" + druid.port: 30400 + readinessProbe: + httpGet: + path: /status/health + port: 30400 + services: + - spec: + type: NodePort + ports: + - name: router-service-port + nodePort: 30400 + port: 30400 + protocol: TCP + targetPort: 30400 + selector: + nodeSpecUniqueStr: druid-tiny-cluster-routers + metadata: + name: router-%s-service + - spec: + 
type: ClusterIP + clusterIP: None + nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/router" + replicas: 1 + runtime.properties: | + druid.service=druid/router + + # HTTP proxy + druid.router.http.numConnections=50 + druid.router.http.readTimeout=PT5M + druid.router.http.numMaxThreads=100 + druid.server.http.numThreads=100 + + # Service discovery + druid.router.defaultBrokerServiceName=druid/broker + druid.router.coordinatorServiceName=druid/coordinator + + # Management proxy to coordinator / overlord: required for unified web console. + druid.router.managementProxy.enabled=true diff --git a/integration-tests/k8s/tiny-cluster.yaml b/integration-tests/k8s/druid-sanity-it-cluster.yaml similarity index 100% rename from integration-tests/k8s/tiny-cluster.yaml rename to integration-tests/k8s/druid-sanity-it-cluster.yaml diff --git a/integration-tests/k8s/postgres.yaml b/integration-tests/k8s/postgres.yaml new file mode 100644 index 000000000000..cba5c379f23d --- /dev/null +++ b/integration-tests/k8s/postgres.yaml @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: v1 +kind: Service +metadata: + name: postgres + labels: + app: postgres +spec: + ports: + - port: 5432 + name: postgres + selector: + app: postgres +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: postgres +spec: + selector: + matchLabels: + app: postgres + serviceName: postgres + replicas: 1 + template: + metadata: + annotations: + app: postgres + labels: + app: postgres + spec: + volumes: + - name: data-volume + emptyDir: {} + containers: + - image: postgres:latest + name: postgres + env: + - name: POSTGRES_DB + value: druid + - name: POSTGRES_USER + value: druid + - name: POSTGRES_PASSWORD + value: druid + volumeMounts: + - mountPath: /druid/data + name: data-volume diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index ed6b90f5fb40..324c678eb6ff 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -382,6 +382,16 @@ + + io.kubernetes + client-java + ${kubernetes.client.version} + + + io.kubernetes + client-java-api + ${kubernetes.client.version} + io.confluent kafka-protobuf-provider diff --git a/integration-tests/script/build_run_k8s_cluster.sh b/integration-tests/script/build_run_k8s_cluster.sh index c2ac8e5fea62..13ad55b83e93 100755 --- a/integration-tests/script/build_run_k8s_cluster.sh +++ b/integration-tests/script/build_run_k8s_cluster.sh @@ -35,6 +35,6 @@ if ($BUILD_DRUID_CLSUTER); then bash $DRUID_HOME/integration-tests/script/setup_druid_operator_on_k8s.sh echo "Start to setup druid on k8s" - bash $DRUID_HOME/integration-tests/script/setup_druid_on_k8s.sh + bash $DRUID_HOME/integration-tests/script/setup_druid_on_k8s.sh $DRUID_K8S_CLUSTER_SPEC fi diff --git a/integration-tests/script/setup_druid_on_k8s.sh b/integration-tests/script/setup_druid_on_k8s.sh index f21f5613b46d..42328e6c1fe6 100755 --- a/integration-tests/script/setup_druid_on_k8s.sh +++ b/integration-tests/script/setup_druid_on_k8s.sh @@ -16,7 +16,11 @@ set -e -export KUBECTL="sudo /usr/local/bin/kubectl" +export 
KUBECTL="/usr/local/bin/kubectl" + +DRUID_CLUSTER_SPEC_YAML=$1 + +echo "Druid Cluster Spec ${DRUID_CLUSTER_SPEC_YAML}" # setup client keystore cd integration-tests @@ -40,9 +44,13 @@ sudo rm -rf tmp mkdir tmp chmod 777 tmp +# Technically postgres isn't needed in all tests i.e. where derby is used, but ok. +$KUBECTL apply -f integration-tests/k8s/postgres.yaml + +# spec name needs to come from argument for high availability test $KUBECTL apply -f integration-tests/k8s/role-and-binding.yaml -sed -i "s|REPLACE_VOLUMES|`pwd`|g" integration-tests/k8s/tiny-cluster.yaml -$KUBECTL apply -f integration-tests/k8s/tiny-cluster.yaml +sed -i.bak "s|REPLACE_VOLUMES|`pwd`|g" ${DRUID_CLUSTER_SPEC_YAML} +$KUBECTL apply -f ${DRUID_CLUSTER_SPEC_YAML} # Wait a bit sleep 180 @@ -51,3 +59,14 @@ sleep 180 $KUBECTL get pod $KUBECTL get svc + +# Temporary debug info +$KUBECTL describe po postgres-0 +$KUBECTL describe svc postgres +$KUBECTL get endpoints postgres + +for v in postgres-0 druid-tiny-cluster-coordinator1-0 druid-tiny-cluster-coordinator2-0 druid-tiny-cluster-overlord1-0 druid-tiny-cluster-overlord2-0 ; do + echo "\n\n####### Printing Logs for $v #########" + $KUBECTL logs --tail 1000 $v + echo "####### Printed Logs for $v #########\n\n" +done \ No newline at end of file diff --git a/integration-tests/script/setup_druid_operator_on_k8s.sh b/integration-tests/script/setup_druid_operator_on_k8s.sh index d99ec6a2bf65..6e19ca45b63e 100755 --- a/integration-tests/script/setup_druid_operator_on_k8s.sh +++ b/integration-tests/script/setup_druid_operator_on_k8s.sh @@ -17,7 +17,7 @@ set -e export DRUID_OPERATOR_VERSION=0.0.3 -export KUBECTL="sudo /usr/local/bin/kubectl" +export KUBECTL="/usr/local/bin/kubectl" # Prepare For Druid-Operator diff --git a/integration-tests/script/setup_k8s_cluster.sh b/integration-tests/script/setup_k8s_cluster.sh index 3313def4bca3..faff9d0f8bdb 100755 --- a/integration-tests/script/setup_k8s_cluster.sh +++ b/integration-tests/script/setup_k8s_cluster.sh @@ 
-26,9 +26,9 @@ export KUBECONFIG=$HOME/.kube/config sudo apt install -y conntrack # Lacunch K8S cluster -curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.18.1/bin/linux/amd64/kubectl && chmod +x kubectl && sudo mv kubectl /usr/local/bin/ -curl -Lo minikube https://storage.googleapis.com/minikube/releases/v1.8.1/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/ -sudo /usr/local/bin/minikube start --profile=minikube --vm-driver=none --kubernetes-version=v1.18.1 -sudo /usr/local/bin/minikube update-context +curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.18.1/bin/linux/amd64/kubectl && chmod 755 kubectl && sudo mv kubectl /usr/local/bin/ +curl -Lo minikube https://storage.googleapis.com/minikube/releases/v1.20.0/minikube-linux-amd64 && chmod 755 minikube && sudo mv minikube /usr/local/bin/ +sudo /usr/local/bin/minikube start --profile=minikube --vm-driver=none --kubernetes-version=v1.19.8 +/usr/local/bin/minikube update-context echo "Setup K8S Cluster Done!" 
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java index e39c630cf287..058a5727a576 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.guice.DruidTestModule; import java.io.File; import java.io.IOException; @@ -34,12 +35,18 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide { private static final Logger LOG = new Logger(ConfigFileConfigProvider.class); private String routerHost; + private String routerInternalHost; private String brokerHost; + private String brokerInternalHost; private String historicalHost; private String coordinatorHost; + private String coordinatorInternalHost; private String coordinatorTwoHost; + private String coordinatorTwoInternalHost; private String overlordHost; + private String overlordInternalHost; private String overlordTwoHost; + private String overlordTwoInternalHost; private String routerUrl; private String brokerUrl; private String historicalUrl; @@ -76,6 +83,8 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide private String hadoopGcsCredentialsPath; private String azureKey; private String streamEndpoint; + private DruidTestModule.DruidDeploymentEnvType druidDeploymentEnvType; + private String druidClusterHost; @JsonCreator ConfigFileConfigProvider(@JsonProperty("configFile") String configFile) @@ -108,6 +117,12 @@ private void loadProperties(String configFile) routerTLSUrl = StringUtils.format("https://%s:%s", routerHost, props.get("router_tls_port")); } } + + 
routerInternalHost = props.get("router_internal_host"); + if (routerInternalHost == null) { + routerInternalHost = routerHost; + } + permissiveRouterUrl = props.get("router_permissive_url"); if (permissiveRouterUrl == null) { String permissiveRouterHost = props.get("router_permissive_host"); @@ -163,6 +178,11 @@ private void loadProperties(String configFile) } } + brokerInternalHost = props.get("broker_internal_host"); + if (brokerInternalHost == null) { + brokerInternalHost = brokerHost; + } + historicalHost = props.get("historical_host"); historicalUrl = props.get("historical_url"); if (historicalUrl == null) { @@ -187,17 +207,11 @@ private void loadProperties(String configFile) } } - overlordHost = props.get("indexer_host"); - overlordUrl = props.get("indexer_url"); - if (overlordUrl == null) { - overlordUrl = StringUtils.format("http://%s:%s", overlordHost, props.get("indexer_port")); - } - overlordTLSUrl = props.get("indexer_tls_url"); - if (overlordTLSUrl == null) { - if (null != overlordHost) { - overlordTLSUrl = StringUtils.format("https://%s:%s", overlordHost, props.get("indexer_tls_port")); - } + coordinatorInternalHost = props.get("coordinator_internal_host"); + if (coordinatorInternalHost == null) { + coordinatorInternalHost = coordinatorHost; } + coordinatorTwoHost = props.get("coordinator_two_host"); coordinatorTwoUrl = props.get("coordinator_two_url"); if (coordinatorTwoUrl == null) { @@ -210,6 +224,28 @@ private void loadProperties(String configFile) } } + coordinatorTwoInternalHost = props.get("coordinator_two_internal_host"); + if (coordinatorTwoInternalHost == null) { + coordinatorTwoInternalHost = coordinatorTwoHost; + } + + overlordHost = props.get("indexer_host"); + overlordUrl = props.get("indexer_url"); + if (overlordUrl == null) { + overlordUrl = StringUtils.format("http://%s:%s", overlordHost, props.get("indexer_port")); + } + overlordTLSUrl = props.get("indexer_tls_url"); + if (overlordTLSUrl == null) { + if (null != overlordHost) { + 
overlordTLSUrl = StringUtils.format("https://%s:%s", overlordHost, props.get("indexer_tls_port")); + } + } + + overlordInternalHost = props.get("overlord_internal_host"); + if (overlordInternalHost == null) { + overlordInternalHost = overlordHost; + } + overlordTwoHost = props.get("overlord_two_host"); overlordTwoUrl = props.get("overlord_two_url"); if (overlordTwoUrl == null) { @@ -221,9 +257,23 @@ private void loadProperties(String configFile) overlordTwoTLSUrl = StringUtils.format("https://%s:%s", overlordTwoHost, props.get("overlord_two_tls_port")); } } - + + overlordTwoInternalHost = props.get("overlord_two_internal_host"); + if (overlordTwoInternalHost == null) { + overlordTwoInternalHost = overlordTwoHost; + } + middleManagerHost = props.get("middlemanager_host"); + String druidDeploymentEnvTypeStr = props.get("druid_deployment_env_type"); + if (druidDeploymentEnvTypeStr != null) { + druidDeploymentEnvType = DruidTestModule.DruidDeploymentEnvType.valueOf(druidDeploymentEnvTypeStr); + } else { + druidDeploymentEnvType = DruidTestModule.DruidDeploymentEnvType.UNKNOWN; + } + + druidClusterHost = props.get("druid_cluster_host"); + zookeeperHosts = props.get("zookeeper_hosts"); kafkaHost = props.get("kafka_host") + ":" + props.get("kafka_port"); schemaRegistryHost = props.get("schema_registry_host") + ":" + props.get("schema_registry_port"); @@ -424,36 +474,78 @@ public String getBrokerHost() return brokerHost; } + @Override + public String getBrokerInternalHost() + { + return brokerInternalHost; + } + @Override public String getRouterHost() { return routerHost; } + @Override + public String getRouterInternalHost() + { + return routerInternalHost; + } + @Override public String getCoordinatorHost() { return coordinatorHost; } + @Override + public String getCoordinatorInternalHost() + { + return coordinatorInternalHost; + } + @Override public String getCoordinatorTwoHost() { return coordinatorTwoHost; } + @Override + public String getCoordinatorTwoInternalHost() + 
{ + return coordinatorTwoInternalHost; + } + @Override public String getOverlordHost() { return overlordHost; } + @Override + public String getOverlordInternalHost() + { + return overlordInternalHost; + } + @Override public String getOverlordTwoHost() { return overlordTwoHost; } + @Override + public String getOverlordTwoInternalHost() + { + return overlordTwoInternalHost; + } + + @Override + public DruidTestModule.DruidDeploymentEnvType getDruidDeploymentEnvType() + { + return druidDeploymentEnvType; + } + @Override public String getProperty(String keyword) { @@ -551,9 +643,9 @@ public String getExtraDatasourceNameSuffix() } @Override - public boolean isDocker() + public String getDruidClusterHost() { - return false; + return druidClusterHost; } }; } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java index 0e3a55d25060..73e370e3ae39 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.druid.testing.guice.DruidTestModule; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; @@ -345,6 +346,12 @@ public String getSchemaRegistryInternalHost() return "schema-registry:8085"; } + @Override + public DruidTestModule.DruidDeploymentEnvType getDruidDeploymentEnvType() + { + return DruidTestModule.DruidDeploymentEnvType.DOCKER; + } + @Override public String getProperty(String prop) { @@ -435,15 +442,9 @@ public String getStreamEndpoint() return streamEndpoint; } - @Override - public boolean isDocker() - { - return true; - } - @Override @Nullable - public String getDockerHost() 
+ public String getDruidClusterHost() { return dockerIp; } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java index 25d958018ec4..a04f654c37cd 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java @@ -19,6 +19,8 @@ package org.apache.druid.testing; +import org.apache.druid.testing.guice.DruidTestModule; + import javax.annotation.Nullable; import java.util.Map; @@ -96,6 +98,11 @@ default String getHistoricalInternalHost() return getHistoricalHost(); } + default DruidTestModule.DruidDeploymentEnvType getDruidDeploymentEnvType() + { + return DruidTestModule.DruidDeploymentEnvType.UNKNOWN; + } + String getCoordinatorUrl(); String getCoordinatorTLSUrl(); @@ -177,11 +184,6 @@ default String getSchemaRegistryInternalHost() return getSchemaRegistryHost(); } - boolean isDocker(); - @Nullable - default String getDockerHost() - { - return null; - } + String getDruidClusterHost(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java index 1e239ae57dbe..2965c46bd339 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java @@ -40,11 +40,21 @@ import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.IntegrationTestingConfigProvider; import org.apache.druid.testing.IntegrationTestingCuratorConfig; +import org.apache.druid.testing.utils.AbstractDruidClusterAdminClient; +import org.apache.druid.testing.utils.DockerDruidClusterAdminClient; +import org.apache.druid.testing.utils.K8sDruidClusterAdminClient; /** */ public 
class DruidTestModule implements Module { + public enum DruidDeploymentEnvType + { + K8S, + DOCKER, + UNKNOWN + } + @Override public void configure(Binder binder) { @@ -82,4 +92,19 @@ public ServiceEmitter getServiceEmitter(Supplier config, O { return new ServiceEmitter("", "", new LoggingEmitter(config.get(), jsonMapper)); } + + @Provides + public AbstractDruidClusterAdminClient getDruidClusterAdminClient( + ObjectMapper jsonMapper, + @TestClient HttpClient httpClient, + IntegrationTestingConfig config + ) + { + DruidDeploymentEnvType envType = config.getDruidDeploymentEnvType(); + if (envType == DruidDeploymentEnvType.K8S) { + return new K8sDruidClusterAdminClient(jsonMapper, httpClient, config); + } else { + return new DockerDruidClusterAdminClient(jsonMapper, httpClient, config); + } + } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractDruidClusterAdminClient.java similarity index 91% rename from integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java rename to integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractDruidClusterAdminClient.java index d0f2bbe9ed55..2d30465f5810 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractDruidClusterAdminClient.java @@ -50,8 +50,10 @@ import java.util.List; import java.util.Optional; -public class DruidClusterAdminClient +public abstract class AbstractDruidClusterAdminClient { + private static final Logger LOG = new Logger(AbstractDruidClusterAdminClient.class); + public static final String COORDINATOR_DOCKER_CONTAINER_NAME = "/druid-coordinator"; public static final String COORDINATOR_TWO_DOCKER_CONTAINER_NAME = "/druid-coordinator-two"; public static final String HISTORICAL_DOCKER_CONTAINER_NAME = 
"/druid-historical"; @@ -61,14 +63,12 @@ public class DruidClusterAdminClient public static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router"; public static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager"; - private static final Logger LOG = new Logger(DruidClusterAdminClient.class); - private final ObjectMapper jsonMapper; private final HttpClient httpClient; private IntegrationTestingConfig config; @Inject - DruidClusterAdminClient( + AbstractDruidClusterAdminClient( ObjectMapper jsonMapper, @TestClient HttpClient httpClient, IntegrationTestingConfig config @@ -79,45 +79,21 @@ public class DruidClusterAdminClient this.config = config; } - public void restartCoordinatorContainer() - { - restartDockerContainer(COORDINATOR_DOCKER_CONTAINER_NAME); - } + public abstract void restartCoordinatorContainer(); - public void restartCoordinatorTwoContainer() - { - restartDockerContainer(COORDINATOR_TWO_DOCKER_CONTAINER_NAME); - } + public abstract void restartCoordinatorTwoContainer(); - public void restartHistoricalContainer() - { - restartDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME); - } + public abstract void restartHistoricalContainer(); - public void restartOverlordContainer() - { - restartDockerContainer(OVERLORD_DOCKER_CONTAINER_NAME); - } + public abstract void restartOverlordContainer(); - public void restartOverlordTwoContainer() - { - restartDockerContainer(OVERLORD_TWO_DOCKER_CONTAINER_NAME); - } + public abstract void restartOverlordTwoContainer(); - public void restartBrokerContainer() - { - restartDockerContainer(BROKER_DOCKER_CONTAINER_NAME); - } + public abstract void restartBrokerContainer(); - public void restartRouterContainer() - { - restartDockerContainer(ROUTER_DOCKER_CONTAINER_NAME); - } + public abstract void restartRouterContainer(); - public void restartMiddleManagerContainer() - { - restartDockerContainer(MIDDLEMANAGER_DOCKER_CONTAINER_NAME); - } + public abstract void restartMiddleManagerContainer(); 
public void waitUntilCoordinatorReady() { @@ -256,7 +232,7 @@ private String findDockerContainer(DockerClient dockerClient, String serviceName } return containerName.get(); } - + private void waitUntilInstanceReady(final String host) { ITRetryUtil.retryUntilTrue( diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java index fce45a8cb7ce..27cff5b891c3 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java @@ -110,7 +110,7 @@ public void testQueriesFromString(String str) throws Exception public void testQueriesFromFile(String url, String filePath) throws Exception { - LOG.info("Starting query tests for [%s]", filePath); + LOG.debug("Starting query tests for [%s]", filePath); List queries = jsonMapper.readValue( TestQueryHelper.class.getResourceAsStream(filePath), @@ -124,7 +124,7 @@ public void testQueriesFromFile(String url, String filePath) throws Exception public void testQueriesFromString(String url, String str) throws Exception { - LOG.info("Starting query tests using\n%s", str); + LOG.debug("Starting query tests using\n%s", str); List queries = jsonMapper.readValue( str, @@ -137,11 +137,11 @@ public void testQueriesFromString(String url, String str) throws Exception private void testQueries(String url, List queries) throws Exception { - LOG.info("Running queries, url [%s]", url); + LOG.debug("Running queries, url [%s]", url); boolean failed = false; for (QueryResultType queryWithResult : queries) { - LOG.info("Running Query %s", queryWithResult.getQuery()); + LOG.debug("Running Query %s", queryWithResult.getQuery()); List> result = queryClient.query(url, queryWithResult.getQuery()); if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults(), 
queryWithResult.getFieldsToTest() @@ -154,7 +154,7 @@ private void testQueries(String url, List queries) throws Excep ); failed = true; } else { - LOG.info("Results Verified for Query %s", queryWithResult.getQuery()); + LOG.debug("Results Verified for Query %s", queryWithResult.getQuery()); } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DockerDruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DockerDruidClusterAdminClient.java new file mode 100644 index 000000000000..6b0e5f525d4f --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DockerDruidClusterAdminClient.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.model.Container; +import com.github.dockerjava.core.DockerClientBuilder; +import com.github.dockerjava.netty.NettyDockerCmdExecFactory; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.guice.TestClient; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +public class DockerDruidClusterAdminClient extends AbstractDruidClusterAdminClient +{ + private static final Logger LOG = new Logger(DockerDruidClusterAdminClient.class); + private static final String COORDINATOR_DOCKER_CONTAINER_NAME = "/druid-coordinator"; + private static final String COORDINATOR_TWO_DOCKER_CONTAINER_NAME = "/druid-coordinator-two"; + private static final String HISTORICAL_DOCKER_CONTAINER_NAME = "/druid-historical"; + private static final String OVERLORD_DOCKER_CONTAINER_NAME = "/druid-overlord"; + private static final String OVERLORD_TWO_DOCKER_CONTAINER_NAME = "/druid-overlord-two"; + private static final String BROKER_DOCKER_CONTAINER_NAME = "/druid-broker"; + private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router"; + private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager"; + + @Inject + public DockerDruidClusterAdminClient( + ObjectMapper jsonMapper, + @TestClient HttpClient httpClient, + IntegrationTestingConfig config + ) + { + super(jsonMapper, httpClient, config); + } + + @Override + public void restartCoordinatorContainer() + { + restartDockerContainer(COORDINATOR_DOCKER_CONTAINER_NAME); + } + + @Override + public void restartCoordinatorTwoContainer() + { + 
restartDockerContainer(COORDINATOR_TWO_DOCKER_CONTAINER_NAME); + } + + @Override + public void restartHistoricalContainer() + { + restartDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME); + } + + @Override + public void restartOverlordContainer() + { + restartDockerContainer(OVERLORD_DOCKER_CONTAINER_NAME); + } + + @Override + public void restartOverlordTwoContainer() + { + restartDockerContainer(OVERLORD_TWO_DOCKER_CONTAINER_NAME); + } + + @Override + public void restartBrokerContainer() + { + restartDockerContainer(BROKER_DOCKER_CONTAINER_NAME); + } + + @Override + public void restartRouterContainer() + { + restartDockerContainer(ROUTER_DOCKER_CONTAINER_NAME); + } + + @Override + public void restartMiddleManagerContainer() + { + restartDockerContainer(MIDDLEMANAGER_DOCKER_CONTAINER_NAME); + } + + private void restartDockerContainer(String serviceName) + { + DockerClient dockerClient = DockerClientBuilder.getInstance() + .withDockerCmdExecFactory((new NettyDockerCmdExecFactory()) + .withConnectTimeout(10 * 1000)) + .build(); + List containers = dockerClient.listContainersCmd().exec(); + Optional containerName = containers.stream() + .filter(container -> Arrays.asList(container.getNames()).contains(serviceName)) + .findFirst() + .map(container -> container.getId()); + + if (!containerName.isPresent()) { + LOG.error("Cannot find docker container for " + serviceName); + throw new ISE("Cannot find docker container for " + serviceName); + } + dockerClient.restartContainerCmd(containerName.get()).exec(); + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/K8sDruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/K8sDruidClusterAdminClient.java new file mode 100644 index 000000000000..6b6ea16148e1 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/K8sDruidClusterAdminClient.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.util.Config; +import okhttp3.Protocol; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.guice.TestClient; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.util.Collections; + +public class K8sDruidClusterAdminClient extends AbstractDruidClusterAdminClient +{ + private static final Logger LOG = new Logger(K8sDruidClusterAdminClient.class); + + private static final String NAMESPACE = "default"; + private static final String COORDINATOR_POD_NAME = "druid-tiny-cluster-coordinator1-0"; + private static final String COORDINATOR_TWO_POD_NAME = "druid-tiny-cluster-coordinator2-0"; + private static 
final String HISTORICAL_POD_NAME = "druid-tiny-cluster-historicals-0"; + private static final String OVERLORD_POD_NAME = "druid-tiny-cluster-overlord1-0"; + private static final String OVERLORD_TWO_POD_NAME = "druid-tiny-cluster-overlord2-0"; + private static final String BROKER_POD_NAME = "druid-tiny-cluster-brokers-0"; + private static final String ROUTER_POD_NAME = "druid-tiny-cluster-routers-0"; + private static final String MIDDLEMANAGER_POD_NAME = "druid-tiny-cluster-middlemanagers-0"; + + private final CoreV1Api k8sClient; + + @Inject + public K8sDruidClusterAdminClient( + ObjectMapper jsonMapper, + @TestClient HttpClient httpClient, + IntegrationTestingConfig config + ) + { + super(jsonMapper, httpClient, config); + + try { + ApiClient k8sApiClient = Config.defaultClient(); + k8sApiClient.setHttpClient( + k8sApiClient + .getHttpClient() + .newBuilder() + .protocols(Collections.singletonList((Protocol.HTTP_1_1))) + .build() + ); + this.k8sClient = new CoreV1Api(k8sApiClient); + } + catch (IOException ex) { + throw new RE(ex, "Failed to create K8s ApiClient instance"); + } + } + + @Override + public void restartCoordinatorContainer() + { + restartPod(COORDINATOR_POD_NAME); + } + + @Override + public void restartCoordinatorTwoContainer() + { + restartPod(COORDINATOR_TWO_POD_NAME); + } + + @Override + public void restartHistoricalContainer() + { + restartPod(HISTORICAL_POD_NAME); + } + + @Override + public void restartOverlordContainer() + { + restartPod(OVERLORD_POD_NAME); + } + + @Override + public void restartOverlordTwoContainer() + { + restartPod(OVERLORD_TWO_POD_NAME); + } + + @Override + public void restartBrokerContainer() + { + restartPod(BROKER_POD_NAME); + } + + @Override + public void restartRouterContainer() + { + restartPod(ROUTER_POD_NAME); + } + + @Override + public void restartMiddleManagerContainer() + { + restartPod(MIDDLEMANAGER_POD_NAME); + } + + private void restartPod(String podName) + { + try { + // We only need to delete the pod, k8s 
StatefulSet controller will automatically recreate it. + V1Pod prevPod = RetryUtils.retry( + () -> k8sClient.deleteNamespacedPod( + podName, + NAMESPACE, + null, + null, + null, + null, + null, + null + ), + (Throwable th) -> true, + 3 + ); + + // Wait for previous pod to terminate and new pod to come up + RetryUtils.retry( + () -> { + V1Pod newPod = getPod(podName); + return newPod != null && + !newPod.getMetadata().getResourceVersion().equals(prevPod.getMetadata().getResourceVersion()); + }, + (Throwable th) -> true, + 10 + ); + + LOG.info("Restarted Pod [%s].", podName); + } + catch (Exception ex) { + throw new RE(ex, "Failed to delete pod [%s]", podName); + } + } + + private V1Pod getPod(String podName) + { + try { + return k8sClient.readNamespacedPod( + podName, + NAMESPACE, + null, + null, + null + ); + } + catch (ApiException ex) { + if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) { + throw new RE(ex, "Failed to get pod [%s]", podName); + } else { + return null; + } + } + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java index 6259156b03b2..5979f6af422a 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java @@ -36,7 +36,7 @@ public void onStart(ISuite suite) { Injector injector = DruidTestModuleFactory.getInjector(); IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class); - DruidClusterAdminClient druidClusterAdminClient = injector.getInstance(DruidClusterAdminClient.class); + AbstractDruidClusterAdminClient druidClusterAdminClient = injector.getInstance(AbstractDruidClusterAdminClient.class); druidClusterAdminClient.waitUntilCoordinatorReady(); druidClusterAdminClient.waitUntilIndexerReady(); diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index c118069a8fe9..a1c8d9b35ed0 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -33,7 +33,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.TaskResponseObject; -import org.apache.druid.testing.utils.DruidClusterAdminClient; +import org.apache.druid.testing.utils.AbstractDruidClusterAdminClient; import org.apache.druid.testing.utils.EventSerializer; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.JsonEventSerializer; @@ -96,7 +96,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json"); @Inject - private DruidClusterAdminClient druidClusterAdminClient; + private AbstractDruidClusterAdminClient druidClusterAdminClient; private StreamAdminClient streamAdminClient; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java b/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java index 78e071b90a74..65d2e5a4b555 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java @@ -19,28 +19,27 @@ package org.apache.druid.tests.leadership; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; import 
com.google.inject.Inject; import org.apache.druid.cli.CliCustomNodeRole; import org.apache.druid.common.config.NullHandling; -import org.apache.druid.curator.discovery.ServerDiscoveryFactory; -import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscovery; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.server.http.ClusterResource; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.guice.DruidTestModule; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.guice.TestClient; -import org.apache.druid.testing.utils.DruidClusterAdminClient; +import org.apache.druid.testing.utils.AbstractDruidClusterAdminClient; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.SqlTestQueryHelper; import org.apache.druid.tests.TestNGGroup; @@ -53,8 +52,10 @@ import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -64,6 +65,7 @@ public class ITHighAvailabilityTest { private static final Logger LOG = new Logger(ITHighAvailabilityTest.class); private static final String SYSTEM_QUERIES_RESOURCE = 
"/queries/high_availability_sys.json"; + private static final String K8S_SYSTEM_QUERIES_RESOURCE = "/queries/k8s_high_availability_sys.json"; private static final int NUM_LEADERSHIP_SWAPS = 3; private static final int NUM_RETRIES = 120; @@ -73,13 +75,7 @@ public class ITHighAvailabilityTest private IntegrationTestingConfig config; @Inject - private DruidClusterAdminClient druidClusterAdminClient; - - @Inject - ServerDiscoveryFactory factory; - - @Inject - DruidNodeDiscoveryProvider druidNodeDiscovery; + private AbstractDruidClusterAdminClient druidClusterAdminClient; @Inject CoordinatorResourceTestClient coordinatorClient; @@ -102,8 +98,13 @@ public void testLeadershipChanges() throws Exception String previousOverlordLeader = null; // fetch current leaders, make sure queries work, then swap leaders and do it again do { + LOG.info("%dth round of leader testing.", runCount); + String coordinatorLeader = getLeader("coordinator"); + LOG.info("Coordinator Leader previous[%s], current[%s]", previousCoordinatorLeader, coordinatorLeader); + String overlordLeader = getLeader("indexer"); + LOG.info("Overlord Leader previous[%s], current[%s]", previousOverlordLeader, overlordLeader); // we expect leadership swap to happen Assert.assertNotEquals(previousCoordinatorLeader, coordinatorLeader); @@ -114,13 +115,25 @@ public void testLeadershipChanges() throws Exception String queries = fillTemplate( config, - AbstractIndexerTest.getResourceAsString(SYSTEM_QUERIES_RESOURCE), + AbstractIndexerTest.getResourceAsString( + config.getDruidDeploymentEnvType() == DruidTestModule.DruidDeploymentEnvType.K8S ? 
+ K8S_SYSTEM_QUERIES_RESOURCE : SYSTEM_QUERIES_RESOURCE + ), overlordLeader, coordinatorLeader ); - queryHelper.testQueriesFromString(queries); + + RetryUtils.retry( + () -> { + queryHelper.testQueriesFromString(queries); + return true; + }, + (Throwable th) -> true, + 10 + ); swapLeadersAndWait(coordinatorLeader, overlordLeader); + LOG.info("Leaders swapped."); } while (runCount++ < NUM_LEADERSHIP_SWAPS); } @@ -130,22 +143,18 @@ public void testDiscoveryAndSelfDiscovery() ITRetryUtil.retryUntil( () -> { try { - List disco = ImmutableList.of( - druidNodeDiscovery.getForNodeRole(NodeRole.COORDINATOR), - druidNodeDiscovery.getForNodeRole(NodeRole.OVERLORD), - druidNodeDiscovery.getForNodeRole(NodeRole.HISTORICAL), - druidNodeDiscovery.getForNodeRole(NodeRole.MIDDLE_MANAGER), - druidNodeDiscovery.getForNodeRole(NodeRole.INDEXER), - druidNodeDiscovery.getForNodeRole(NodeRole.BROKER), - druidNodeDiscovery.getForNodeRole(NodeRole.ROUTER) - ); - - int servicesDiscovered = 0; - for (DruidNodeDiscovery nodeRole : disco) { - Collection nodes = nodeRole.getAllNodes(); - servicesDiscovered += testSelfDiscovery(nodes); + Map> clusterNodes = getClusterNodes(); + if (clusterNodes.get(NodeRole.COORDINATOR.getJsonName()).size() < 2 || + clusterNodes.get(NodeRole.OVERLORD.getJsonName()).size() < 2 || + clusterNodes.get(NodeRole.BROKER.getJsonName()).size() < 1 || + clusterNodes.get(NodeRole.ROUTER.getJsonName()).size() < 1) { + return false; } - return servicesDiscovered > 5; + + List allNodes = new ArrayList<>(); + clusterNodes.values().forEach((nodes) -> allNodes.addAll(nodes)); + + return allNodes.size() == testSelfDiscovery(allNodes); } catch (Throwable t) { return false; @@ -161,12 +170,15 @@ public void testDiscoveryAndSelfDiscovery() @Test public void testCustomDiscovery() { + if (config.getDruidDeploymentEnvType() == DruidTestModule.DruidDeploymentEnvType.K8S) { + // Custom NodeRole is not deployed in k8s environment just yet + return; + } + ITRetryUtil.retryUntil( () -> { 
try { - DruidNodeDiscovery customDisco = - druidNodeDiscovery.getForNodeRole(new NodeRole(CliCustomNodeRole.SERVICE_NAME)); - int count = testSelfDiscovery(customDisco.getAllNodes()); + int count = testSelfDiscovery(getClusterNodes(CliCustomNodeRole.SERVICE_NAME)); return count > 0; } catch (Throwable t) { @@ -180,16 +192,19 @@ public void testCustomDiscovery() ); } - private int testSelfDiscovery(Collection nodes) + private int testSelfDiscovery(Collection nodes) throws MalformedURLException, ExecutionException, InterruptedException { int count = 0; - for (DiscoveryDruidNode node : nodes) { + for (ClusterResource.Node node : nodes) { + String host = config.getDruidDeploymentEnvType() == DruidTestModule.DruidDeploymentEnvType.UNKNOWN ? + node.getHost() : config.getDruidClusterHost(); + final String location = StringUtils.format( "http://%s:%s/status/selfDiscovered", - config.isDocker() ? config.getDockerHost() : node.getDruidNode().getHost(), - node.getDruidNode().getPlaintextPort() + host, + node.getPlaintextPort() ); LOG.info("testing self discovery %s", location); StatusResponseHolder response = httpClient.go( @@ -227,6 +242,23 @@ private void swapLeadersAndWait(String coordinatorLeader, String overlordLeader) } private String getLeader(String service) + { + try { + return RetryUtils.retry( + () -> tryGetLeader(service), + (Throwable t) -> true, + 5 + ); + } + catch (RuntimeException e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private String tryGetLeader(String service) { try { StatusResponseHolder response = httpClient.go( @@ -256,6 +288,77 @@ private String getLeader(String service) } } + private Map> getClusterNodes() + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(StringUtils.format( + "%s/druid/coordinator/v1/cluster", + config.getRouterUrl() + )) + ), + StatusResponseHandler.getInstance() + ).get(); + + if 
(!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching cluster nodes from[%s] status[%s] content[%s]", + config.getRouterUrl(), + response.getStatus(), + response.getContent() + ); + } + + return jsonMapper.readValue( + response.getContent(), + new TypeReference>>() + { + } + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private List getClusterNodes(String nodeRole) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(StringUtils.format( + "%s/druid/coordinator/v1/cluster/%s", + config.getRouterUrl(), + nodeRole + )) + ), + StatusResponseHandler.getInstance() + ).get(); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching cluster nodes from[%s] status[%s] content[%s]", + config.getRouterUrl(), + response.getStatus(), + response.getContent() + ); + } + + return jsonMapper.readValue( + response.getContent(), + new TypeReference>() + { + } + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + private static String fillTemplate(IntegrationTestingConfig config, String template, String overlordLeader, String coordinatorLeader) { /* diff --git a/integration-tests/src/test/resources/queries/k8s_high_availability_sys.json b/integration-tests/src/test/resources/queries/k8s_high_availability_sys.json new file mode 100644 index 000000000000..f2943ce98f18 --- /dev/null +++ b/integration-tests/src/test/resources/queries/k8s_high_availability_sys.json @@ -0,0 +1,30 @@ +[ + { + "description": "query sys.servers to make sure all expected servers are available", + "query": { + "query": "SELECT host, server_type, is_leader FROM sys.servers ORDER BY host" + }, + "expectedResults": [ + {"host":"%%BROKER%%","server_type":"broker", "is_leader": %%NON_LEADER%%}, + {"host":"%%COORDINATOR_ONE%%","server_type":"coordinator", "is_leader": %%COORDINATOR_ONE_LEADER%%}, + 
{"host":"%%COORDINATOR_TWO%%","server_type":"coordinator", "is_leader": %%COORDINATOR_TWO_LEADER%%}, + {"host":"%%OVERLORD_ONE%%","server_type":"overlord", "is_leader": %%OVERLORD_ONE_LEADER%%}, + {"host":"%%OVERLORD_TWO%%","server_type":"overlord", "is_leader": %%OVERLORD_TWO_LEADER%%}, + {"host":"%%ROUTER%%","server_type":"router", "is_leader": %%NON_LEADER%%} + ] + }, + { + "description": "query sys.segments which is fed via coordinator data", + "query": { + "query": "SELECT datasource, count(*) FROM sys.segments WHERE datasource='wikipedia_editstream' OR datasource='twitterstream' GROUP BY 1 " + }, + "expectedResults": [] + }, + { + "description": "query sys.tasks which is fed via overlord", + "query": { + "query": "SELECT datasource, count(*) FROM sys.tasks WHERE datasource='wikipedia_editstream' OR datasource='twitterstream' GROUP BY 1 " + }, + "expectedResults": [] + } +] \ No newline at end of file diff --git a/licenses.yaml b/licenses.yaml index edf3b9cf51dd..b911a655c56c 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -850,7 +850,7 @@ name: kubernetes official java client license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 10.0.1 +version: 12.0.0 libraries: - io.kubernetes: client-java @@ -860,7 +860,7 @@ name: kubernetes official java client api license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 10.0.1 +version: 12.0.0 libraries: - io.kubernetes: client-java-api @@ -870,7 +870,7 @@ name: kubernetes official java client extended license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 10.0.1 +version: 12.0.0 libraries: - io.kubernetes: client-java-extended @@ -880,7 +880,7 @@ name: io.prometheus simpleclient_common license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 0.9.0 
+version: 0.10.0 libraries: - io.prometheus: simpleclient_common @@ -920,7 +920,7 @@ name: io.gsonfire gson-fire license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 1.8.4 +version: 1.8.5 libraries: - io.gsonfire: gson-fire @@ -940,7 +940,7 @@ name: io.prometheus simpleclient_httpserver license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 0.9.0 +version: 0.10.0 libraries: - io.prometheus: simpleclient_httpserver @@ -950,7 +950,7 @@ name: org.bitbucket.b_c jose4j license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 0.7.2 +version: 0.7.6 libraries: - org.bitbucket.b_c: jose4j @@ -980,7 +980,7 @@ name: io.prometheus simpleclient license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 0.9.0 +version: 0.10.0 libraries: - io.prometheus: simpleclient @@ -990,7 +990,7 @@ name: io.kubernetes client-java-proto license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 10.0.1 +version: 12.0.0 libraries: - io.kubernetes: client-java-proto @@ -1000,7 +1000,7 @@ name: org.yaml snakeyaml license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 1.27 +version: 1.28 libraries: - org.yaml: snakeyaml @@ -1022,7 +1022,7 @@ module: extensions/druid-kubernetes-extensions license_name: MIT License version: 1.68 libraries: - - org.bouncycastle: bcprov-jdk15on + - org.bouncycastle: bcpkix-jdk15on --- @@ -1090,7 +1090,7 @@ name: com.github.vladimir-bukhtoyarov bucket4j-core license_category: binary module: extensions/druid-kubernetes-extensions license_name: Apache License version 2.0 -version: 4.10.0 +version: 6.0.1 libraries: - com.github.vladimir-bukhtoyarov: bucket4j-core diff --git 
a/pom.xml b/pom.xml index b978e063cf7d..a1e7ec4f3aff 100644 --- a/pom.xml +++ b/pom.xml @@ -114,6 +114,7 @@ 0.8.7 3.5.9 + 12.0.0 2.5.7 1.26.0 v1-rev20190607-${com.google.apis.client.version} diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index edb95c00002f..779f4de4613d 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -166,6 +166,7 @@ public void nodesRemoved(Collection nodes) public void nodeViewInitialized() { if (!initialized.getAndSet(true)) { + log.info("nodeViewInitialized!"); executor.execute(HttpServerInventoryView.this::serverInventoryInitialized); } } @@ -324,6 +325,7 @@ public void run() //segmentViewInitialized on all registered segment callbacks. private void serverInventoryInitialized() { + log.info("Check if we can call SegmentCallback.segmentViewInitialized() for all callbacks"); long start = System.currentTimeMillis(); long serverSyncWaitTimeout = config.getServerTimeout() + 2 * ChangeRequestHttpSyncer.HTTP_TIMEOUT_EXTRA_MS; diff --git a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java index eabf51ec115e..cd443c56884b 100644 --- a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java +++ b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java @@ -117,7 +117,7 @@ private Collection getNodes(NodeRole nodeRole, boolean full) } @JsonInclude(JsonInclude.Include.NON_NULL) - private static class Node + public static class Node { private final String host; private final String service; @@ -125,7 +125,12 @@ private static class Node private final Integer tlsPort; @JsonCreator - public Node(String host, String service, Integer plaintextPort, Integer tlsPort) + public Node( + @JsonProperty("host") 
String host, + @JsonProperty("service") String service, + @JsonProperty("plaintextPort") Integer plaintextPort, + @JsonProperty("tlsPort") Integer tlsPort + ) { this.host = host; this.service = service;