From bbba172b91e664028d49789f801791bda1014e93 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 30 Aug 2017 12:32:50 -0500 Subject: [PATCH 1/5] Use internal-discovery and http for talking to overlord/coordinator leaders --- .../actions/RemoteTaskActionClient.java | 68 +---- .../RemoteTaskActionClientFactory.java | 15 +- .../actions/RemoteTaskActionClientTest.java | 91 ++---- .../client/coordinator/CoordinatorClient.java | 73 +---- .../indexing/IndexingServiceClient.java | 59 +--- .../io/druid/discovery/DruidLeaderClient.java | 231 +++++++++++++++ .../guice/CoordinatorDiscoveryModule.java | 22 ++ .../guice/IndexingServiceDiscoveryModule.java | 22 ++ .../server/http/OverlordProxyServlet.java | 20 +- .../server/router/CoordinatorRuleManager.java | 75 +---- .../discovery/DruidLeaderClientTest.java | 270 ++++++++++++++++++ .../DruidCoordinatorSegmentMergerTest.java | 2 +- .../server/http/OverlordProxyServletTest.java | 15 +- .../router/TieredBrokerHostSelectorTest.java | 7 +- .../src/main/java/io/druid/cli/CliRouter.java | 20 ++ 15 files changed, 673 insertions(+), 317 deletions(-) create mode 100644 server/src/main/java/io/druid/discovery/DruidLeaderClient.java create mode 100644 server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index 829a59222c78..fa20e41f4677 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -20,20 +20,14 @@ package io.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Throwables; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.discovery.DruidLeaderClient; import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; import io.druid.java.util.common.IOE; import io.druid.java.util.common.jackson.JacksonUtils; -import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -41,35 +35,30 @@ import javax.ws.rs.core.MediaType; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.util.Map; import java.util.Random; public class RemoteTaskActionClient implements TaskActionClient { private final Task task; - private final HttpClient httpClient; - private final ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; + private final DruidLeaderClient druidLeaderClient; private final Random random = new Random(); private static final Logger log = new Logger(RemoteTaskActionClient.class); public RemoteTaskActionClient( Task task, - HttpClient httpClient, - ServerDiscoverySelector selector, + DruidLeaderClient druidLeaderClient, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.task = task; - this.httpClient = httpClient; - this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; + this.druidLeaderClient = druidLeaderClient; } @Override @@ -83,27 +72,16 @@ public RetType submit(TaskAction taskAction) throws IOExcepti while (true) { try { - final Server server; - final URI serviceUri; - try { - server = getServiceInstance(); - serviceUri = makeServiceUri(server); - } - catch (Exception e) { - // Want to retry, so throw an IOException. - throw new IOException("Failed to locate service uri", e); - } - final StatusResponseHolder response; + final FullResponseHolder fullResponseHolder; - log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction); + log.info("Submitting action for task[%s] to overlord: [%s].", task.getId(), taskAction); try { - response = httpClient.go( - new Request(HttpMethod.POST, serviceUri.toURL()) - .setContent(MediaType.APPLICATION_JSON, dataToSend), - new StatusResponseHandler(Charsets.UTF_8) - ).get(); + fullResponseHolder = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action") + .setContent(MediaType.APPLICATION_JSON, dataToSend) + ); } catch (Exception e) { Throwables.propagateIfInstanceOf(e.getCause(), IOException.class); @@ -111,18 +89,17 @@ public RetType submit(TaskAction taskAction) throws IOExcepti throw Throwables.propagate(e); } - if (response.getStatus().getCode() / 100 == 2) { + if (fullResponseHolder.getStatus().getCode() / 100 == 2) { final Map responseDict = jsonMapper.readValue( - response.getContent(), + fullResponseHolder.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference()); } else { // Want to retry, so throw an IOException. throw new IOE( - "Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.", - response.getStatus(), - server.getHost() + "Scary HTTP status returned: %s. Check your overlord logs for exceptions.", + fullResponseHolder.getStatus() ); } } @@ -152,19 +129,4 @@ private long jitter(long input) long retval = input + (long) jitter; return retval < 0 ? 0 : retval; } - - private URI makeServiceUri(final Server instance) throws URISyntaxException - { - return new URI(instance.getScheme(), null, instance.getAddress(), instance.getPort(), "/druid/indexer/v1/action", null, null); - } - - private Server getServiceInstance() - { - final Server instance = selector.pick(); - if (instance == null) { - throw new ISE("Cannot find instance of indexer to talk to!"); - } else { - return instance; - } - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index 2556621db1f0..efb5f6189319 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -21,10 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; import io.druid.client.indexing.IndexingService; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.annotations.Global; +import io.druid.discovery.DruidLeaderClient; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -32,21 +30,18 @@ */ public class RemoteTaskActionClientFactory implements TaskActionClientFactory { - private final HttpClient httpClient; - private final ServerDiscoverySelector selector; + private final DruidLeaderClient druidLeaderClient; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @Inject public RemoteTaskActionClientFactory( - @Global HttpClient httpClient, - @IndexingService ServerDiscoverySelector selector, + @IndexingService DruidLeaderClient leaderHttpClient, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { - this.httpClient = httpClient; - this.selector = selector; + this.druidLeaderClient = leaderHttpClient; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -54,6 +49,6 @@ public RemoteTaskActionClientFactory( @Override public TaskActionClient create(Task task) { - return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper); + return new RemoteTaskActionClient(task, druidLeaderClient, retryPolicyFactory, jsonMapper); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java index c07d444dde90..e5276cbed255 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java @@ -19,15 +19,10 @@ package io.druid.indexing.common.actions; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.Futures; -import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.discovery.DruidLeaderClient; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskLock; @@ -36,64 +31,33 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Intervals; import org.easymock.EasyMock; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.net.URL; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; public class RemoteTaskActionClientTest { - - private HttpClient httpClient; - private ServerDiscoverySelector selector; - private Server server; + private DruidLeaderClient druidLeaderClient; List result = null; private ObjectMapper objectMapper = new DefaultObjectMapper(); @Before public void setUp() { - httpClient = createMock(HttpClient.class); - selector = createMock(ServerDiscoverySelector.class); - - server = new Server() - { - - @Override - public String getScheme() - { - return "http"; - } - - @Override - public int getPort() - { - return 8080; - } - - @Override - public String getHost() - { - return "localhost"; - } - - @Override - public String getAddress() - { - return "localhost"; - } - }; + druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); long now = System.currentTimeMillis(); @@ -106,29 +70,30 @@ public String getAddress() } @Test - public void testSubmitSimple() throws JsonProcessingException + public void testSubmitSimple() throws Exception { + Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx")); + expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action")) + .andReturn(request); + // return status code 200 and a list with size equals 1 Map responseBody = new HashMap(); responseBody.put("result", result); String strResult = objectMapper.writeValueAsString(responseBody); - StatusResponseHolder responseHolder = new StatusResponseHolder( + FullResponseHolder responseHolder = new FullResponseHolder( HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), new StringBuilder().append(strResult) ); // set up mocks - expect(selector.pick()).andReturn(server); - replay(selector); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); - expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn( - Futures.immediateFuture(responseHolder) - ); - replay(httpClient); Task task = new NoopTask("id", 0, 0, null, null, null); RemoteTaskActionClient client = new RemoteTaskActionClient( - task, httpClient, selector, new RetryPolicyFactory( + task, druidLeaderClient, new RetryPolicyFactory( new RetryPolicyConfig() ), objectMapper ); @@ -140,33 +105,35 @@ task, httpClient, selector, new RetryPolicyFactory( } Assert.assertEquals(1, result.size()); - EasyMock.verify(selector, httpClient); + EasyMock.verify(druidLeaderClient); } @Test(expected = IOException.class) - public void testSubmitWithIllegalStatusCode() throws IOException + public void testSubmitWithIllegalStatusCode() throws Exception { // return status code 400 and a list with size equals 1 + Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx")); + expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action")) + .andReturn(request); + + // return status code 200 and a list with size equals 1 Map responseBody = new HashMap(); responseBody.put("result", result); String strResult = objectMapper.writeValueAsString(responseBody); - StatusResponseHolder responseHolder = new StatusResponseHolder( + FullResponseHolder responseHolder = new FullResponseHolder( HttpResponseStatus.BAD_REQUEST, + EasyMock.createNiceMock(HttpResponse.class), new StringBuilder().append(strResult) ); // set up mocks - expect(selector.pick()).andReturn(server); - replay(selector); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); - expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn( - Futures.immediateFuture(responseHolder) - ); - replay(httpClient); Task task = new NoopTask("id", 0, 0, null, null, null); RemoteTaskActionClient client = new RemoteTaskActionClient( - task, httpClient, selector, new RetryPolicyFactory( + task, druidLeaderClient, new RetryPolicyFactory( objectMapper.readValue("{\"maxRetryCount\":0}", RetryPolicyConfig.class) ), objectMapper ); diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java index b70da3e5d75a..20b65f065bef 100644 --- a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java @@ -21,67 +21,48 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; +import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.ImmutableSegmentLoadInfo; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.annotations.Global; +import io.druid.discovery.DruidLeaderClient; import io.druid.java.util.common.ISE; - import io.druid.java.util.common.StringUtils; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Interval; -import java.net.URI; -import java.net.URL; import java.util.List; public class CoordinatorClient { - private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); - - private final HttpClient client; + private final DruidLeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; - private final ServerDiscoverySelector selector; @Inject public CoordinatorClient( - @Global HttpClient client, ObjectMapper jsonMapper, - @Coordinator ServerDiscoverySelector selector + @Coordinator DruidLeaderClient druidLeaderClient ) { - this.client = client; this.jsonMapper = jsonMapper; - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; } public List fetchServerView(String dataSource, Interval interval, boolean incompleteOk) { try { - StatusResponseHolder response = client.go( - new Request( - HttpMethod.GET, - new URL( - StringUtils.format( - "%s/datasources/%s/intervals/%s/serverview?partial=%s", - baseUrl(), - dataSource, - interval.toString().replace("/", "_"), - incompleteOk - ) - ) - ), - RESPONSE_HANDLER - ).get(); + FullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.GET, + StringUtils.format( + "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s", + dataSource, + interval.toString().replace("/", "_"), + incompleteOk + )) + ); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while fetching serverView status[%s] content[%s]", @@ -100,28 +81,4 @@ public List fetchServerView(String dataSource, Interva throw Throwables.propagate(e); } } - - - private String baseUrl() - { - try { - final Server instance = selector.pick(); - if (instance == null) { - throw new ISE("Cannot find instance of coordinator.. Did you set `druid.selectors.coordinator.serviceName`?"); - } - - return new URI( - instance.getScheme(), - null, - instance.getAddress(), - instance.getPort(), - "/druid/coordinator/v1", - null, - null - ).toString(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } } diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 96b5d9bb7d9c..71a03a5a41df 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -22,44 +22,29 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.InputStreamResponseHandler; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.annotations.Global; +import io.druid.discovery.DruidLeaderClient; import io.druid.java.util.common.IAE; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.StringUtils; import io.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; import javax.ws.rs.core.MediaType; -import java.io.InputStream; -import java.net.URI; -import java.net.URL; import java.util.Iterator; import java.util.List; public class IndexingServiceClient { - private static final InputStreamResponseHandler RESPONSE_HANDLER = new InputStreamResponseHandler(); - - private final HttpClient client; + private final DruidLeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; - private final ServerDiscoverySelector selector; @Inject public IndexingServiceClient( - @Global HttpClient client, ObjectMapper jsonMapper, - @IndexingService ServerDiscoverySelector selector + @IndexingService DruidLeaderClient druidLeaderClient ) { - this.client = client; this.jsonMapper = jsonMapper; - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; } public void mergeSegments(List segments) @@ -95,39 +80,15 @@ public void upgradeSegments(String dataSource, Interval interval) runQuery(new ClientConversionQuery(dataSource, interval)); } - private InputStream runQuery(Object queryObject) + private void runQuery(Object queryObject) { try { - return client.go( - new Request( + druidLeaderClient.go( + druidLeaderClient.makeRequest( HttpMethod.POST, - new URL(StringUtils.format("%s/task", baseUrl())) - ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)), - RESPONSE_HANDLER - ).get(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - private String baseUrl() - { - try { - final Server instance = selector.pick(); - if (instance == null) { - throw new ISE("Cannot find instance of indexingService"); - } - - return new URI( - instance.getScheme(), - null, - instance.getAddress(), - instance.getPort(), - "/druid/indexer/v1", - null, - null - ).toString(); + "/druid/indexer/v1/task" + ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)) + ); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java new file mode 100644 index 000000000000..bb41619ea1a2 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java @@ -0,0 +1,231 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.google.common.base.Charsets; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.FullResponseHandler; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.logger.Logger; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Iterator; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected + * via Guice with annotations @Coordinator or @IndexingService . + * Usage: + * Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath) + * request.setXXX(..) + * FullResponseHolder responseHolder = druidLeaderClient.go(request) + */ +public class DruidLeaderClient +{ + private final Logger log = new Logger(DruidLeaderClient.class); + + private static final int MAX_RETRIES = 3; + + private final HttpClient httpClient; + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final String nodeTypeToWatch; + + private final String leaderRequestPath; + + //Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future. + private final ServerDiscoverySelector serverDiscoverySelector; + + private AtomicReference currentKnownLeader = new AtomicReference<>(); + + public DruidLeaderClient( + HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + String nodeTypeToWatch, + String leaderRequestPath, + ServerDiscoverySelector serverDiscoverySelector + ) + { + this.httpClient = httpClient; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.nodeTypeToWatch = nodeTypeToWatch; + this.leaderRequestPath = leaderRequestPath; + this.serverDiscoverySelector = serverDiscoverySelector; + } + + public Request makeRequest(HttpMethod httpMethod, String urlPath) throws MalformedURLException + { + return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath))); + } + + public FullResponseHolder go(Request request) throws InterruptedException + { + for (int counter = 0; counter < MAX_RETRIES; counter++) { + + final FullResponseHolder fullResponseHolder; + + try { + fullResponseHolder = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get(); + } + catch (ExecutionException ex) { + // can happen if the node is stopped. + log.info("Request[%s] failed with msg [%s].", request.getUrl(), ex.getMessage()); + log.debug(ex, "Request[%s] failed.", request.getUrl()); + + try { + if (request.getUrl().getQuery() == null) { + request = withUrl( + request, + new URL(StringUtils.format("%s%s", getCurrentKnownLeader(false), request.getUrl().getPath())) + ); + } else { + request = withUrl( + request, + new URL(StringUtils.format("%s%s?%s", + getCurrentKnownLeader(false), + request.getUrl().getPath(), + request.getUrl().getQuery() + )) + ); + } + continue; + } + catch (MalformedURLException e) { + throw new ISE( + e, + "failed to build url with path[%] and query string [%s].", + request.getUrl().getPath(), + request.getUrl().getQuery() + ); + } + } + + if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(fullResponseHolder.getResponse().getStatus())) { + String redirectUrlStr = fullResponseHolder.getResponse().headers().get("Location"); + if (redirectUrlStr == null) { + throw new ISE("No redirect location is found in response from url[%s].", request.getUrl()); + } + + log.info("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr); + + final URL redirectUrl; + try { + redirectUrl = new URL(redirectUrlStr); + } + catch (MalformedURLException ex) { + throw new ISE( + ex, + "Malformed redirect location is found in response from url[%s], new location[%s].", + request.getUrl(), + redirectUrlStr + ); + } + + //update known leader location + currentKnownLeader.set(StringUtils.format( + "%s://%s:%s", + redirectUrl.getProtocol(), + redirectUrl.getHost(), + redirectUrl.getPort() + )); + + request = withUrl(request, redirectUrl); + } else { + return fullResponseHolder; + } + } + + throw new ISE("Retries exhausted, couldn't fulfill request to [%s].", request.getUrl()); + } + + public String findCurrentLeader() + { + final FullResponseHolder responseHolder; + try { + responseHolder = go(makeRequest(HttpMethod.GET, leaderRequestPath)); + } + catch (Exception ex) { + throw new ISE(ex, "Couldn't find leader."); + } + + if (responseHolder.getStatus().getCode() == 200) { + return responseHolder.getContent(); + } else { + throw new ISE( + "Couldn't find leader, failed response status is [%s] and content [%s].", + responseHolder.getStatus().getCode(), + responseHolder.getContent() + ); + } + } + + private String getCurrentKnownLeader(final boolean cached) + { + return currentKnownLeader.accumulateAndGet( + null, + (current, given) -> current == null || !cached ? pickOneHost() : current + ); + } + + private String pickOneHost() + { + Server server = serverDiscoverySelector.pick(); + if (server != null) { + return StringUtils.format( + "%s://%s:%s", + server.getScheme(), + server.getAddress(), + server.getPort() + ); + } + + Iterator iter = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch) + .getAllNodes() + .iterator(); + if (iter.hasNext()) { + DiscoveryDruidNode node = iter.next(); + return StringUtils.format( + "%s://%s", + node.getDruidNode().getServiceScheme(), + node.getDruidNode().getHostAndPortToUse() + ); + } + + throw new ISE("Couldn't find any servers."); + } + + private Request withUrl(Request old, URL url) + { + Request req = new Request(old.getMethod(), url); + req.addHeaderValues(old.getHeaders()); + if (old.hasContent()) { + req.setContent(old.getContent()); + } + return req; + } +} diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java index 908ce87f3787..3c147a425f5f 100644 --- a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -22,10 +22,14 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.metamx.http.client.HttpClient; import io.druid.client.coordinator.Coordinator; import io.druid.client.coordinator.CoordinatorSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.guice.annotations.Global; /** */ @@ -47,4 +51,22 @@ public ServerDiscoverySelector getServiceProvider( { return serverDiscoveryFactory.createSelector(config.getServiceName()); } + + @Provides + @Coordinator + @LazySingleton + public DruidLeaderClient getLeaderHttpClient( + @Global HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + @Coordinator ServerDiscoverySelector serverDiscoverySelector + ) + { + return new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + "/druid/coordinator/v1/leader", + serverDiscoverySelector + ); + } } diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index 167e003f2365..bfdf95e84af8 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -22,10 +22,14 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.metamx.http.client.HttpClient; import io.druid.client.indexing.IndexingService; import io.druid.client.indexing.IndexingServiceSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.guice.annotations.Global; /** */ @@ -47,4 +51,22 @@ public ServerDiscoverySelector getServiceProvider( { return serverDiscoveryFactory.createSelector(config.getServiceName()); } + + @Provides + @IndexingService + @LazySingleton + public DruidLeaderClient getLeaderHttpClient( + @Global HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + @IndexingService ServerDiscoverySelector serverDiscoverySelector + ) + { + return new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + "/druid/indexer/v1/leader", + serverDiscoverySelector + ); + } } diff --git a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java index a23c5702eda1..844edf2dbff6 100644 --- a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java +++ b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java @@ -21,12 +21,9 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; - import io.druid.client.indexing.IndexingService; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import io.druid.java.util.common.ISE; - import org.eclipse.jetty.proxy.ProxyServlet; import javax.servlet.http.HttpServletRequest; @@ -38,27 +35,28 @@ */ public class OverlordProxyServlet extends ProxyServlet { - private final ServerDiscoverySelector selector; + private final DruidLeaderClient druidLeaderClient; @Inject OverlordProxyServlet( - @IndexingService ServerDiscoverySelector selector + @IndexingService DruidLeaderClient druidLeaderClient ) { - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; } @Override protected String rewriteTarget(HttpServletRequest request) { try { - final Server indexer = selector.pick(); - if (indexer == null) { - throw new ISE("Can't find indexingService, did you configure druid.selectors.indexing.serviceName same as druid.service at overlord?"); + final String overlordLeader = druidLeaderClient.findCurrentLeader(); + if (overlordLeader == null) { + throw new ISE("Can't find Overlord leader."); } + return new URI( request.getScheme(), - indexer.getHost(), + overlordLeader, request.getRequestURI(), request.getQueryString(), null diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index 66bcdb0d2e56..70a0148c7aa5 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -21,20 +21,15 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.FullResponseHandler; import com.metamx.http.client.response.FullResponseHolder; -import io.druid.client.selector.Server; import io.druid.concurrent.Execs; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import io.druid.guice.ManageLifecycle; -import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; @@ -44,9 +39,6 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Duration; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -60,14 +52,13 @@ public class CoordinatorRuleManager { private static final Logger log = new Logger(CoordinatorRuleManager.class); - private final HttpClient httpClient; private final ObjectMapper jsonMapper; private final Supplier config; - private final ServerDiscoverySelector selector; - private final FullResponseHandler responseHandler; private final AtomicReference>> rules; + private final DruidLeaderClient druidLeaderClient; + private volatile ScheduledExecutorService exec; private final Object lock = new Object(); @@ -76,18 +67,15 @@ public class CoordinatorRuleManager @Inject public CoordinatorRuleManager( - @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, Supplier config, - ServerDiscoverySelector selector + DruidLeaderClient druidLeaderClient ) { - this.httpClient = httpClient; this.jsonMapper = jsonMapper; this.config = config; - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; - this.responseHandler = new FullResponseHandler(Charsets.UTF_8); this.rules = new AtomicReference<>( new ConcurrentHashMap>() ); @@ -146,29 +134,16 @@ public boolean isStarted() public void poll() { try { - String url = getRuleURL(); - if (url == null) { - return; - } + FullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.GET, config.get().getRulesEndpoint()) + ); - FullResponseHolder response = httpClient.go( - new Request( - HttpMethod.GET, - new URL(url) - ), - responseHandler - ).get(); - - if (response.getStatus().equals(HttpResponseStatus.FOUND)) { - url = response.getResponse().headers().get("Location"); - log.info("Redirecting rule request to [%s]", url); - response = httpClient.go( - new Request( - HttpMethod.GET, - new URL(url) - ), - responseHandler - ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while polling rules, status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); } ConcurrentHashMap> newRules = new ConcurrentHashMap<>( @@ -200,24 +175,4 @@ public List getRulesWithDefault(final String dataSource) } return retVal; } - - private String getRuleURL() throws URISyntaxException - { - Server server = selector.pick(); - - if (server == null) { - log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName()); - return null; - } - - return new URI( - server.getScheme(), - null, - server.getAddress(), - server.getPort(), - config.get().getRulesEndpoint(), - null, - null - ).toString(); - } } diff --git a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java new file mode 100644 index 000000000000..630412a5c27c --- /dev/null +++ b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.name.Named; +import com.google.inject.name.Names; +import com.google.inject.servlet.GuiceFilter; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.Self; +import io.druid.initialization.Initialization; +import io.druid.java.util.common.StringUtils; +import io.druid.server.DruidNode; +import io.druid.server.initialization.BaseJettyTest; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.initialization.jetty.JettyServerInitializer; +import org.easymock.EasyMock; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.junit.Assert; +import org.junit.Test; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; + +/** + */ +public class DruidLeaderClientTest extends BaseJettyTest +{ + private DiscoveryDruidNode discoveryDruidNode; + private HttpClient httpClient; + + @Override + protected Injector setupInjector() + { + final DruidNode node = new DruidNode("test", "localhost", null, null, new ServerConfig()); + discoveryDruidNode = new DiscoveryDruidNode(node, "test", ImmutableMap.of()); + + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, + Key.get(DruidNode.class, Self.class), + node + ); + binder.bind(Integer.class).annotatedWith(Names.named("port")).toInstance(node.getPlaintextPort()); + binder.bind(JettyServerInitializer.class).to(TestJettyServerInitializer.class).in(LazySingleton.class); + Jerseys.addResource(binder, SimpleResource.class); + LifecycleModule.register(binder, Server.class); + } + } + ) + ); + + httpClient = injector.getInstance(ClientHolder.class).getClient(); + return injector; + } + + @Test + public void testSimple() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + + Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); + request.setContent("hello".getBytes("UTF-8")); + Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); + } + + @Test + public void testRedirection() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + + Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); + request.setContent("hello".getBytes("UTF-8")); + Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); + } + + @Test + public void testServerFailureAndRedirect() throws Exception + { + ServerDiscoverySelector serverDiscoverySelector = EasyMock.createMock(ServerDiscoverySelector.class); + EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes(); + + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(new DiscoveryDruidNode( + new DruidNode("test", "dummyhost", 64231, null, new ServerConfig()), + "test", + ImmutableMap.of() + )) + ); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes(); + + EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + serverDiscoverySelector + ); + + Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); + request.setContent("hello".getBytes("UTF-8")); + Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); + } + + @Test + public void testFindCurrentLeader() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + + Assert.assertEquals("http://localhost:1234/", druidLeaderClient.findCurrentLeader()); + } + + private static class TestJettyServerInitializer implements JettyServerInitializer + { + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GuiceFilter.class, "/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{root}); + server.setHandler(handlerList); + } + } + + @Path("/simple") + public static class SimpleResource + { + private final int port; + + @Inject + public SimpleResource(@Named("port") int port) + { + this.port = port; + } + + @POST + @Path("/direct") + @Produces(MediaType.APPLICATION_JSON) + public Response direct(String input) + { + if ("hello".equals(input)) { + return Response.ok("hello").build(); + } else { + return Response.serverError().build(); + } + } + + @POST + @Path("/redirect") + @Produces(MediaType.APPLICATION_JSON) + public Response redirecting() throws Exception + { + return Response.temporaryRedirect(new URI(StringUtils.format("http://localhost:%s/simple/direct", port))).build(); + } + + @GET + @Path("/leader") + @Produces(MediaType.APPLICATION_JSON) + public Response leader() + { + return Response.ok("http://localhost:1234/").build(); + } + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java index eec7a87d88d6..686d7399d57e 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java @@ -455,7 +455,7 @@ private static List> merge(final Collection segme EasyMock.replay(configManager); final List> retVal = Lists.newArrayList(); - final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null) + final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null) { @Override public void mergeSegments(List segmentsToMerge) diff --git a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java index 20ad1f17dd5f..120a529ddb20 100644 --- a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java +++ b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java @@ -19,8 +19,7 @@ package io.druid.server.http; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -33,17 +32,17 @@ public class OverlordProxyServletTest @Test public void testRewriteURI() { - ServerDiscoverySelector selector = EasyMock.createMock(ServerDiscoverySelector.class); - Server server = EasyMock.createMock(Server.class); - EasyMock.expect(server.getHost()).andReturn("overlord:port"); - EasyMock.expect(selector.pick()).andReturn(server).anyTimes(); + DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); + EasyMock.expect(druidLeaderClient.findCurrentLeader()).andReturn("overlord:port"); + HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class); EasyMock.expect(request.getScheme()).andReturn("https").anyTimes(); EasyMock.expect(request.getQueryString()).andReturn("param1=test¶m2=test2").anyTimes(); EasyMock.expect(request.getRequestURI()).andReturn("/druid/overlord/worker").anyTimes(); - EasyMock.replay(server, selector, request); - URI uri = URI.create(new OverlordProxyServlet(selector).rewriteTarget(request)); + EasyMock.replay(druidLeaderClient, request); + + URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient).rewriteTarget(request)); Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test¶m2=test2", uri.toString()); } diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index a2c253dd4052..7cc0a9c5fd8e 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -27,13 +27,11 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.metamx.http.client.HttpClient; import io.druid.client.DruidServer; import io.druid.client.selector.Server; import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscovery; import io.druid.discovery.DruidNodeDiscoveryProvider; -import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; @@ -115,7 +113,7 @@ public void registerListener(Listener listener) EasyMock.replay(druidNodeDiscoveryProvider); brokerSelector = new TieredBrokerHostSelector( - new TestRuleManager(null, null, null), + new TestRuleManager(null, null), new TieredBrokerConfig() { @Override @@ -320,12 +318,11 @@ public List apply(@Nullable List servers) private static class TestRuleManager extends CoordinatorRuleManager { public TestRuleManager( - @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, Supplier config ) { - super(httpClient, jsonMapper, config, null); + super(jsonMapper, config, null); } @Override diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index aa28951c836d..c990e080f3ba 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -26,10 +26,12 @@ import com.google.inject.Provides; import com.google.inject.TypeLiteral; import com.google.inject.name.Names; +import com.metamx.http.client.HttpClient; import io.airlift.airline.Command; import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -39,6 +41,7 @@ import io.druid.guice.QueryRunnerFactoryModule; import io.druid.guice.QueryableModule; import io.druid.guice.RouterProcessingModule; +import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Self; import io.druid.guice.http.JettyHttpClientModule; import io.druid.java.util.common.logger.Logger; @@ -131,6 +134,23 @@ public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( { return factory.createSelector(config.getCoordinatorServiceName()); } + + @Provides + @LazySingleton + public DruidLeaderClient getLeaderHttpClient( + @Global HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + ServerDiscoverySelector serverDiscoverySelector + ) + { + return new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + "/druid/coordinator/v1/leader", + serverDiscoverySelector + ); + } }, new LookupModule() ); From 7d4fd6d5ca8fd89729ae4beca48aed4427037dde Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 5 Sep 2017 09:50:59 -0500 Subject: [PATCH 2/5] CuratorDruidNodeDiscovery.getAllNodes() best effort 30 sec wait for cache initialization --- .../CuratorDruidNodeDiscoveryProvider.java | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index b4e4827760ad..ce8437235f58 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -168,7 +169,7 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery private final Object lock = new Object(); - private boolean cacheInitialized = false; + private CountDownLatch cacheInitialized = new CountDownLatch(1); NodeTypeWatcher( ExecutorService listenerExecutor, @@ -197,6 +198,9 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery @Override public Collection getAllNodes() { + if (!isCacheInitialized(30, TimeUnit.SECONDS)) { + log.info("cache is not initialized yet. getAllNodes() might not return full information."); + } return Collections.unmodifiableCollection(nodes.values()); } @@ -204,7 +208,7 @@ public Collection getAllNodes() public void registerListener(DruidNodeDiscovery.Listener listener) { synchronized (lock) { - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { ImmutableList currNodes = ImmutableList.copyOf(nodes.values()); safeSchedule( () -> { @@ -278,15 +282,13 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve break; } case INITIALIZED: { - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { log.warn("cache is already initialized. ignoring [%s] event, nodeType [%s].", event.getType(), nodeType); return; } log.info("Received INITIALIZED in node watcher for type [%s].", nodeType); - cacheInitialized = true; - ImmutableList currNodes = ImmutableList.copyOf(nodes.values()); for (Listener l : nodeListeners) { safeSchedule( @@ -297,6 +299,7 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve ); } + cacheInitialized.countDown(); break; } default: { @@ -310,6 +313,17 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve } } + private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit) + { + try { + return cacheInitialized.await(waitFor, timeUnit); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return false; + } + } + private void safeSchedule( Runnable runnable, String errMsgFormat, Object... args @@ -329,7 +343,7 @@ private void addNode(DiscoveryDruidNode druidNode) { DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode); if (prev == null) { - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { List newNode = ImmutableList.of(druidNode); for (Listener l : nodeListeners) { safeSchedule( @@ -354,7 +368,7 @@ private void removeNode(DiscoveryDruidNode druidNode) return; } - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { List nodeRemoved = ImmutableList.of(druidNode); for (Listener l : nodeListeners) { safeSchedule( From 623d81db65ded94a6929c54ca6420c74502c8f6d Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 5 Sep 2017 10:05:16 -0500 Subject: [PATCH 3/5] DruidLeaderClientProvider to eagerly instantiate DruidNodeDiscovery when needed so that DruidNodeDiscovery impl cache gets initialized well in time --- .../RemoteTaskActionClientFactory.java | 5 +- .../client/coordinator/CoordinatorClient.java | 5 +- .../indexing/IndexingServiceClient.java | 5 +- .../io/druid/discovery/DruidLeaderClient.java | 19 ++--- .../discovery/DruidLeaderClientProvider.java | 74 +++++++++++++++++++ .../guice/CoordinatorDiscoveryModule.java | 6 +- .../guice/IndexingServiceDiscoveryModule.java | 6 +- .../server/http/OverlordProxyServlet.java | 5 +- .../server/router/CoordinatorRuleManager.java | 5 +- .../discovery/DruidLeaderClientTest.java | 32 ++------ .../server/http/OverlordProxyServletTest.java | 7 +- .../src/main/java/io/druid/cli/CliRouter.java | 6 +- 12 files changed, 120 insertions(+), 55 deletions(-) create mode 100644 server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index efb5f6189319..c81e73da20ea 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -23,6 +23,7 @@ import com.google.inject.Inject; import io.druid.client.indexing.IndexingService; import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -36,12 +37,12 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory @Inject public RemoteTaskActionClientFactory( - @IndexingService DruidLeaderClient leaderHttpClient, + @IndexingService DruidLeaderClientProvider druidLeaderClientProvider, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { - this.druidLeaderClient = leaderHttpClient; + this.druidLeaderClient = druidLeaderClientProvider.get(); this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java index 20b65f065bef..d3c8612d2475 100644 --- a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java @@ -26,6 +26,7 @@ import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.ImmutableSegmentLoadInfo; import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -42,11 +43,11 @@ public class CoordinatorClient @Inject public CoordinatorClient( ObjectMapper jsonMapper, - @Coordinator DruidLeaderClient druidLeaderClient + @Coordinator DruidLeaderClientProvider druidLeaderClientProvider ) { this.jsonMapper = jsonMapper; - this.druidLeaderClient = druidLeaderClient; + this.druidLeaderClient = druidLeaderClientProvider.get(); } diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 71a03a5a41df..2e17106c5e77 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -23,6 +23,7 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.java.util.common.IAE; import io.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -40,11 +41,11 @@ public class IndexingServiceClient @Inject public IndexingServiceClient( ObjectMapper jsonMapper, - @IndexingService DruidLeaderClient druidLeaderClient + @IndexingService DruidLeaderClientProvider druidLeaderClientProvider ) { this.jsonMapper = jsonMapper; - this.druidLeaderClient = druidLeaderClient; + this.druidLeaderClient = druidLeaderClientProvider.get(); } public void mergeSegments(List segments) diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java index bb41619ea1a2..c3467606437c 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java @@ -39,8 +39,10 @@ import java.util.concurrent.atomic.AtomicReference; /** - * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected - * via Guice with annotations @Coordinator or @IndexingService . + * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class obtained by + * getting DruidLeaderClientProvider injected (via Guice with annotations @Coordinator or @IndexingService) and + * calling get() on it. + * * Usage: * Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath) * request.setXXX(..) @@ -53,8 +55,7 @@ public class DruidLeaderClient private static final int MAX_RETRIES = 3; private final HttpClient httpClient; - private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; - private final String nodeTypeToWatch; + private final DruidNodeDiscovery druidNodeDiscovery; private final String leaderRequestPath; @@ -65,15 +66,13 @@ public class DruidLeaderClient public DruidLeaderClient( HttpClient httpClient, - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - String nodeTypeToWatch, + DruidNodeDiscovery druidNodeDiscovery, String leaderRequestPath, ServerDiscoverySelector serverDiscoverySelector ) { this.httpClient = httpClient; - this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; - this.nodeTypeToWatch = nodeTypeToWatch; + this.druidNodeDiscovery = druidNodeDiscovery; this.leaderRequestPath = leaderRequestPath; this.serverDiscoverySelector = serverDiscoverySelector; } @@ -204,9 +203,7 @@ private String pickOneHost() ); } - Iterator iter = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch) - .getAllNodes() - .iterator(); + Iterator iter = druidNodeDiscovery.getAllNodes().iterator(); if (iter.hasNext()) { DiscoveryDruidNode node = iter.next(); return StringUtils.format( diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java b/server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java new file mode 100644 index 000000000000..e32bd54403d6 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java @@ -0,0 +1,74 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.metamx.http.client.HttpClient; +import io.druid.curator.discovery.ServerDiscoverySelector; + +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class DruidLeaderClientProvider +{ + private final AtomicReference instanceRef = new AtomicReference<>(); + + private final HttpClient httpClient; + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final String nodeTypeToWatch; + + private final String leaderRequestPath; + + private final ServerDiscoverySelector serverDiscoverySelector; + + public DruidLeaderClientProvider( + HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + String nodeTypeToWatch, + String leaderRequestPath, + ServerDiscoverySelector serverDiscoverySelector + ) + { + this.httpClient = httpClient; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.nodeTypeToWatch = nodeTypeToWatch; + this.leaderRequestPath = leaderRequestPath; + this.serverDiscoverySelector = serverDiscoverySelector; + } + + public DruidLeaderClient get() + { + return instanceRef.accumulateAndGet( + null, + (current, given) -> { + if (current != null) { + return current; + } + + return new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch), + leaderRequestPath, + serverDiscoverySelector + ); + } + ); + } +} diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java index 3c147a425f5f..9d255c5d10b4 100644 --- a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -27,7 +27,7 @@ import io.druid.client.coordinator.CoordinatorSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Global; @@ -55,13 +55,13 @@ public ServerDiscoverySelector getServiceProvider( @Provides @Coordinator @LazySingleton - public DruidLeaderClient getLeaderHttpClient( + public DruidLeaderClientProvider getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, @Coordinator ServerDiscoverySelector serverDiscoverySelector ) { - return new DruidLeaderClient( + return new DruidLeaderClientProvider( httpClient, druidNodeDiscoveryProvider, DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index bfdf95e84af8..b56013c77bbc 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -27,7 +27,7 @@ import io.druid.client.indexing.IndexingServiceSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Global; @@ -55,13 +55,13 @@ public ServerDiscoverySelector getServiceProvider( @Provides @IndexingService @LazySingleton - public DruidLeaderClient getLeaderHttpClient( + public DruidLeaderClientProvider getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, @IndexingService ServerDiscoverySelector serverDiscoverySelector ) { - return new DruidLeaderClient( + return new DruidLeaderClientProvider( httpClient, druidNodeDiscoveryProvider, DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, diff --git a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java index 844edf2dbff6..dab95f01a7c0 100644 --- a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java +++ b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java @@ -23,6 +23,7 @@ import com.google.inject.Inject; import io.druid.client.indexing.IndexingService; import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.java.util.common.ISE; import org.eclipse.jetty.proxy.ProxyServlet; @@ -39,10 +40,10 @@ public class OverlordProxyServlet extends ProxyServlet @Inject OverlordProxyServlet( - @IndexingService DruidLeaderClient druidLeaderClient + @IndexingService DruidLeaderClientProvider druidLeaderClientProvider ) { - this.druidLeaderClient = druidLeaderClient; + this.druidLeaderClient = druidLeaderClientProvider.get(); } @Override diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index 70a0148c7aa5..d363bfd2c60e 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -27,6 +27,7 @@ import com.metamx.http.client.response.FullResponseHolder; import io.druid.concurrent.Execs; import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Json; import io.druid.java.util.common.ISE; @@ -69,12 +70,12 @@ public class CoordinatorRuleManager public CoordinatorRuleManager( @Json ObjectMapper jsonMapper, Supplier config, - DruidLeaderClient druidLeaderClient + DruidLeaderClientProvider druidLeaderClientProvider ) { this.jsonMapper = jsonMapper; this.config = config; - this.druidLeaderClient = druidLeaderClient; + this.druidLeaderClient = druidLeaderClientProvider.get(); this.rules = new AtomicReference<>( new ConcurrentHashMap>() diff --git a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java index 630412a5c27c..ef01cdabeb87 100644 --- a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java @@ -109,15 +109,11 @@ public void testSimple() throws Exception ImmutableList.of(discoveryDruidNode) ); - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + EasyMock.replay(druidNodeDiscovery); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscoveryProvider, - "nodetype", + druidNodeDiscovery, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); @@ -135,15 +131,11 @@ public void testRedirection() throws Exception ImmutableList.of(discoveryDruidNode) ); - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + EasyMock.replay(druidNodeDiscovery); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscoveryProvider, - "nodetype", + druidNodeDiscovery, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); @@ -171,15 +163,11 @@ public void testServerFailureAndRedirect() throws Exception ImmutableList.of(discoveryDruidNode) ); - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes(); - - EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider); + EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscoveryProvider, - "nodetype", + druidNodeDiscovery, "/simple/leader", serverDiscoverySelector ); @@ -197,15 +185,11 @@ public void testFindCurrentLeader() throws Exception ImmutableList.of(discoveryDruidNode) ); - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + EasyMock.replay(druidNodeDiscovery); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscoveryProvider, - "nodetype", + druidNodeDiscovery, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); diff --git a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java index 120a529ddb20..e0e11dd650d5 100644 --- a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java +++ b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java @@ -20,6 +20,7 @@ package io.druid.server.http; import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -33,6 +34,10 @@ public class OverlordProxyServletTest public void testRewriteURI() { DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); + DruidLeaderClientProvider druidLeaderClientProvider = EasyMock.createMock(DruidLeaderClientProvider.class); + EasyMock.expect(druidLeaderClientProvider.get()).andReturn(druidLeaderClient); + EasyMock.replay(druidLeaderClientProvider); + EasyMock.expect(druidLeaderClient.findCurrentLeader()).andReturn("overlord:port"); HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class); @@ -42,7 +47,7 @@ public void testRewriteURI() EasyMock.replay(druidLeaderClient, request); - URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient).rewriteTarget(request)); + URI uri = URI.create(new OverlordProxyServlet(druidLeaderClientProvider).rewriteTarget(request)); Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test¶m2=test2", uri.toString()); } diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index c990e080f3ba..3d21278cda0d 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -31,7 +31,7 @@ import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidLeaderClientProvider; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -137,13 +137,13 @@ public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( @Provides @LazySingleton - public DruidLeaderClient getLeaderHttpClient( + public DruidLeaderClientProvider getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, ServerDiscoverySelector serverDiscoverySelector ) { - return new DruidLeaderClient( + return new DruidLeaderClientProvider( httpClient, druidNodeDiscoveryProvider, DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, From 37b3db45dc2d81e63dcbc8074ac364a12b5ade83 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 5 Sep 2017 10:23:29 -0500 Subject: [PATCH 4/5] Revert "DruidLeaderClientProvider to eagerly instantiate DruidNodeDiscovery when needed so that DruidNodeDiscovery impl cache gets initialized well in time" This reverts commit f1a2432614ba56ddc2d55fe47e990d17fcfd6129. --- .../RemoteTaskActionClientFactory.java | 5 +- .../client/coordinator/CoordinatorClient.java | 5 +- .../indexing/IndexingServiceClient.java | 5 +- .../io/druid/discovery/DruidLeaderClient.java | 19 +++-- .../discovery/DruidLeaderClientProvider.java | 74 ------------------- .../guice/CoordinatorDiscoveryModule.java | 6 +- .../guice/IndexingServiceDiscoveryModule.java | 6 +- .../server/http/OverlordProxyServlet.java | 5 +- .../server/router/CoordinatorRuleManager.java | 5 +- .../discovery/DruidLeaderClientTest.java | 32 ++++++-- .../server/http/OverlordProxyServletTest.java | 7 +- .../src/main/java/io/druid/cli/CliRouter.java | 6 +- 12 files changed, 55 insertions(+), 120 deletions(-) delete mode 100644 server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index c81e73da20ea..efb5f6189319 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -23,7 +23,6 @@ import com.google.inject.Inject; import io.druid.client.indexing.IndexingService; import io.druid.discovery.DruidLeaderClient; -import io.druid.discovery.DruidLeaderClientProvider; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -37,12 +36,12 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory @Inject public RemoteTaskActionClientFactory( - @IndexingService DruidLeaderClientProvider druidLeaderClientProvider, + @IndexingService DruidLeaderClient leaderHttpClient, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { - this.druidLeaderClient = druidLeaderClientProvider.get(); + this.druidLeaderClient = leaderHttpClient; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java index d3c8612d2475..20b65f065bef 100644 --- a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java @@ -26,7 +26,6 @@ import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.ImmutableSegmentLoadInfo; import io.druid.discovery.DruidLeaderClient; -import io.druid.discovery.DruidLeaderClientProvider; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -43,11 +42,11 @@ public class CoordinatorClient @Inject public CoordinatorClient( ObjectMapper jsonMapper, - @Coordinator DruidLeaderClientProvider druidLeaderClientProvider + @Coordinator DruidLeaderClient druidLeaderClient ) { this.jsonMapper = jsonMapper; - this.druidLeaderClient = druidLeaderClientProvider.get(); + this.druidLeaderClient = druidLeaderClient; } diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 2e17106c5e77..71a03a5a41df 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -23,7 +23,6 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; import io.druid.discovery.DruidLeaderClient; -import io.druid.discovery.DruidLeaderClientProvider; import io.druid.java.util.common.IAE; import io.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -41,11 +40,11 @@ public class IndexingServiceClient @Inject public IndexingServiceClient( ObjectMapper jsonMapper, - @IndexingService DruidLeaderClientProvider druidLeaderClientProvider + @IndexingService DruidLeaderClient druidLeaderClient ) { this.jsonMapper = jsonMapper; - this.druidLeaderClient = druidLeaderClientProvider.get(); + this.druidLeaderClient = druidLeaderClient; } public void mergeSegments(List segments) diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java index c3467606437c..bb41619ea1a2 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java @@ -39,10 +39,8 @@ import java.util.concurrent.atomic.AtomicReference; /** - * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class obtained by - * getting DruidLeaderClientProvider injected (via Guice with annotations @Coordinator or @IndexingService) and - * calling get() on it. - * + * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected + * via Guice with annotations @Coordinator or @IndexingService . * Usage: * Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath) * request.setXXX(..) @@ -55,7 +53,8 @@ public class DruidLeaderClient private static final int MAX_RETRIES = 3; private final HttpClient httpClient; - private final DruidNodeDiscovery druidNodeDiscovery; + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final String nodeTypeToWatch; private final String leaderRequestPath; @@ -66,13 +65,15 @@ public class DruidLeaderClient public DruidLeaderClient( HttpClient httpClient, - DruidNodeDiscovery druidNodeDiscovery, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + String nodeTypeToWatch, String leaderRequestPath, ServerDiscoverySelector serverDiscoverySelector ) { this.httpClient = httpClient; - this.druidNodeDiscovery = druidNodeDiscovery; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.nodeTypeToWatch = nodeTypeToWatch; this.leaderRequestPath = leaderRequestPath; this.serverDiscoverySelector = serverDiscoverySelector; } @@ -203,7 +204,9 @@ private String pickOneHost() ); } - Iterator iter = druidNodeDiscovery.getAllNodes().iterator(); + Iterator iter = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch) + .getAllNodes() + .iterator(); if (iter.hasNext()) { DiscoveryDruidNode node = iter.next(); return StringUtils.format( diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java b/server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java deleted file mode 100644 index e32bd54403d6..000000000000 --- a/server/src/main/java/io/druid/discovery/DruidLeaderClientProvider.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.discovery; - -import com.metamx.http.client.HttpClient; -import io.druid.curator.discovery.ServerDiscoverySelector; - -import java.util.concurrent.atomic.AtomicReference; - -/** - */ -public class DruidLeaderClientProvider -{ - private final AtomicReference instanceRef = new AtomicReference<>(); - - private final HttpClient httpClient; - private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; - private final String nodeTypeToWatch; - - private final String leaderRequestPath; - - private final ServerDiscoverySelector serverDiscoverySelector; - - public DruidLeaderClientProvider( - HttpClient httpClient, - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - String nodeTypeToWatch, - String leaderRequestPath, - ServerDiscoverySelector serverDiscoverySelector - ) - { - this.httpClient = httpClient; - this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; - this.nodeTypeToWatch = nodeTypeToWatch; - this.leaderRequestPath = leaderRequestPath; - this.serverDiscoverySelector = serverDiscoverySelector; - } - - public DruidLeaderClient get() - { - return instanceRef.accumulateAndGet( - null, - (current, given) -> { - if (current != null) { - return current; - } - - return new DruidLeaderClient( - httpClient, - druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch), - leaderRequestPath, - serverDiscoverySelector - ); - } - ); - } -} diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java index 9d255c5d10b4..3c147a425f5f 100644 --- a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -27,7 +27,7 @@ import io.druid.client.coordinator.CoordinatorSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.discovery.DruidLeaderClientProvider; +import io.druid.discovery.DruidLeaderClient; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Global; @@ -55,13 +55,13 @@ public ServerDiscoverySelector getServiceProvider( @Provides @Coordinator @LazySingleton - public DruidLeaderClientProvider getLeaderHttpClient( + public DruidLeaderClient getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, @Coordinator ServerDiscoverySelector serverDiscoverySelector ) { - return new DruidLeaderClientProvider( + return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index b56013c77bbc..bfdf95e84af8 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -27,7 +27,7 @@ import io.druid.client.indexing.IndexingServiceSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.discovery.DruidLeaderClientProvider; +import io.druid.discovery.DruidLeaderClient; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Global; @@ -55,13 +55,13 @@ public ServerDiscoverySelector getServiceProvider( @Provides @IndexingService @LazySingleton - public DruidLeaderClientProvider getLeaderHttpClient( + public DruidLeaderClient getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, @IndexingService ServerDiscoverySelector serverDiscoverySelector ) { - return new DruidLeaderClientProvider( + return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, diff --git a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java index dab95f01a7c0..844edf2dbff6 100644 --- a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java +++ b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java @@ -23,7 +23,6 @@ import com.google.inject.Inject; import io.druid.client.indexing.IndexingService; import io.druid.discovery.DruidLeaderClient; -import io.druid.discovery.DruidLeaderClientProvider; import io.druid.java.util.common.ISE; import org.eclipse.jetty.proxy.ProxyServlet; @@ -40,10 +39,10 @@ public class OverlordProxyServlet extends ProxyServlet @Inject OverlordProxyServlet( - @IndexingService DruidLeaderClientProvider druidLeaderClientProvider + @IndexingService DruidLeaderClient druidLeaderClient ) { - this.druidLeaderClient = druidLeaderClientProvider.get(); + this.druidLeaderClient = druidLeaderClient; } @Override diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index d363bfd2c60e..70a0148c7aa5 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -27,7 +27,6 @@ import com.metamx.http.client.response.FullResponseHolder; import io.druid.concurrent.Execs; import io.druid.discovery.DruidLeaderClient; -import io.druid.discovery.DruidLeaderClientProvider; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Json; import io.druid.java.util.common.ISE; @@ -70,12 +69,12 @@ public class CoordinatorRuleManager public CoordinatorRuleManager( @Json ObjectMapper jsonMapper, Supplier config, - DruidLeaderClientProvider druidLeaderClientProvider + DruidLeaderClient druidLeaderClient ) { this.jsonMapper = jsonMapper; this.config = config; - this.druidLeaderClient = druidLeaderClientProvider.get(); + this.druidLeaderClient = druidLeaderClient; this.rules = new AtomicReference<>( new ConcurrentHashMap>() diff --git a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java index ef01cdabeb87..630412a5c27c 100644 --- a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java @@ -109,11 +109,15 @@ public void testSimple() throws Exception ImmutableList.of(discoveryDruidNode) ); - EasyMock.replay(druidNodeDiscovery); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscovery, + druidNodeDiscoveryProvider, + "nodetype", "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); @@ -131,11 +135,15 @@ public void testRedirection() throws Exception ImmutableList.of(discoveryDruidNode) ); - EasyMock.replay(druidNodeDiscovery); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscovery, + druidNodeDiscoveryProvider, + "nodetype", "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); @@ -163,11 +171,15 @@ public void testServerFailureAndRedirect() throws Exception ImmutableList.of(discoveryDruidNode) ); - EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes(); + + EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscovery, + druidNodeDiscoveryProvider, + "nodetype", "/simple/leader", serverDiscoverySelector ); @@ -185,11 +197,15 @@ public void testFindCurrentLeader() throws Exception ImmutableList.of(discoveryDruidNode) ); - EasyMock.replay(druidNodeDiscovery); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, - druidNodeDiscovery, + druidNodeDiscoveryProvider, + "nodetype", "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); diff --git a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java index e0e11dd650d5..120a529ddb20 100644 --- a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java +++ b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java @@ -20,7 +20,6 @@ package io.druid.server.http; import io.druid.discovery.DruidLeaderClient; -import io.druid.discovery.DruidLeaderClientProvider; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -34,10 +33,6 @@ public class OverlordProxyServletTest public void testRewriteURI() { DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); - DruidLeaderClientProvider druidLeaderClientProvider = EasyMock.createMock(DruidLeaderClientProvider.class); - EasyMock.expect(druidLeaderClientProvider.get()).andReturn(druidLeaderClient); - EasyMock.replay(druidLeaderClientProvider); - EasyMock.expect(druidLeaderClient.findCurrentLeader()).andReturn("overlord:port"); HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class); @@ -47,7 +42,7 @@ public void testRewriteURI() EasyMock.replay(druidLeaderClient, request); - URI uri = URI.create(new OverlordProxyServlet(druidLeaderClientProvider).rewriteTarget(request)); + URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient).rewriteTarget(request)); Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test¶m2=test2", uri.toString()); } diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index 3d21278cda0d..c990e080f3ba 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -31,7 +31,7 @@ import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.discovery.DruidLeaderClientProvider; +import io.druid.discovery.DruidLeaderClient; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -137,13 +137,13 @@ public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( @Provides @LazySingleton - public DruidLeaderClientProvider getLeaderHttpClient( + public DruidLeaderClient getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, ServerDiscoverySelector serverDiscoverySelector ) { - return new DruidLeaderClientProvider( + return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, From 6f1b70259edfb91a639e63fab78048bd08ac3c96 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 5 Sep 2017 10:36:17 -0500 Subject: [PATCH 5/5] add lifecycle to DruidLeaderClient to early initialize DruidNodeDiscovery so that it has its cache update well in time --- .../io/druid/discovery/DruidLeaderClient.java | 41 +++++++++++++++++-- .../guice/CoordinatorDiscoveryModule.java | 2 +- .../guice/IndexingServiceDiscoveryModule.java | 2 +- .../discovery/DruidLeaderClientTest.java | 4 ++ .../src/main/java/io/druid/cli/CliRouter.java | 2 +- 5 files changed, 45 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java index bb41619ea1a2..5835405949d5 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java @@ -20,14 +20,18 @@ package io.druid.discovery; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.response.FullResponseHandler; import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.selector.Server; +import io.druid.concurrent.LifecycleLock; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.logger.Logger; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; @@ -36,6 +40,7 @@ import java.net.URL; import java.util.Iterator; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -61,6 +66,8 @@ public class DruidLeaderClient //Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future. private final ServerDiscoverySelector serverDiscoverySelector; + private LifecycleLock lifecycleLock = new LifecycleLock(); + private DruidNodeDiscovery druidNodeDiscovery; private AtomicReference currentKnownLeader = new AtomicReference<>(); public DruidLeaderClient( @@ -78,13 +85,42 @@ public DruidLeaderClient( this.serverDiscoverySelector = serverDiscoverySelector; } + @LifecycleStart + public void start() + { + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + + try { + druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch); + lifecycleLock.started(); + log.info("Started."); + } + finally { + lifecycleLock.exitStart(); + } + } + + @LifecycleStop + public void stop() + { + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + + log.info("Stopped."); + } + public Request makeRequest(HttpMethod httpMethod, String urlPath) throws MalformedURLException { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath))); } public FullResponseHolder go(Request request) throws InterruptedException { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); for (int counter = 0; counter < MAX_RETRIES; counter++) { final FullResponseHolder fullResponseHolder; @@ -165,6 +201,7 @@ public FullResponseHolder go(Request request) throws InterruptedException public String findCurrentLeader() { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); final FullResponseHolder responseHolder; try { responseHolder = go(makeRequest(HttpMethod.GET, leaderRequestPath)); @@ -204,9 +241,7 @@ private String pickOneHost() ); } - Iterator iter = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch) - .getAllNodes() - .iterator(); + Iterator iter = druidNodeDiscovery.getAllNodes().iterator(); if (iter.hasNext()) { DiscoveryDruidNode node = iter.next(); return StringUtils.format( diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java index 3c147a425f5f..3aa9a91c0333 100644 --- a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -54,7 +54,7 @@ public ServerDiscoverySelector getServiceProvider( @Provides @Coordinator - @LazySingleton + @ManageLifecycle public DruidLeaderClient getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index bfdf95e84af8..32737351cafd 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -54,7 +54,7 @@ public ServerDiscoverySelector getServiceProvider( @Provides @IndexingService - @LazySingleton + @ManageLifecycle public DruidLeaderClient getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, diff --git a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java index 630412a5c27c..c29c6451ef01 100644 --- a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java @@ -121,6 +121,7 @@ public void testSimple() throws Exception "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); + druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); request.setContent("hello".getBytes("UTF-8")); @@ -147,6 +148,7 @@ public void testRedirection() throws Exception "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); + druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); request.setContent("hello".getBytes("UTF-8")); @@ -183,6 +185,7 @@ public void testServerFailureAndRedirect() throws Exception "/simple/leader", serverDiscoverySelector ); + druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); request.setContent("hello".getBytes("UTF-8")); @@ -209,6 +212,7 @@ public void testFindCurrentLeader() throws Exception "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); + druidLeaderClient.start(); Assert.assertEquals("http://localhost:1234/", druidLeaderClient.findCurrentLeader()); } diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index c990e080f3ba..faebceb9b4d6 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -136,7 +136,7 @@ public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( } @Provides - @LazySingleton + @ManageLifecycle public DruidLeaderClient getLeaderHttpClient( @Global HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,