diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index 829a59222c78..fa20e41f4677 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -20,20 +20,14 @@ package io.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Throwables; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.discovery.DruidLeaderClient; import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; import io.druid.java.util.common.IOE; import io.druid.java.util.common.jackson.JacksonUtils; -import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -41,35 +35,30 @@ import javax.ws.rs.core.MediaType; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.util.Map; import java.util.Random; public class RemoteTaskActionClient implements TaskActionClient { private final Task task; - private final HttpClient httpClient; - private final ServerDiscoverySelector selector; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; + private final DruidLeaderClient druidLeaderClient; private final Random random = new Random(); private static final Logger log = new Logger(RemoteTaskActionClient.class); public RemoteTaskActionClient( Task task, - HttpClient httpClient, - ServerDiscoverySelector selector, + DruidLeaderClient druidLeaderClient, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { this.task = task; - this.httpClient = httpClient; - this.selector = selector; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; + this.druidLeaderClient = druidLeaderClient; } @Override @@ -83,27 +72,16 @@ public RetType submit(TaskAction taskAction) throws IOExcepti while (true) { try { - final Server server; - final URI serviceUri; - try { - server = getServiceInstance(); - serviceUri = makeServiceUri(server); - } - catch (Exception e) { - // Want to retry, so throw an IOException. - throw new IOException("Failed to locate service uri", e); - } - final StatusResponseHolder response; + final FullResponseHolder fullResponseHolder; - log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction); + log.info("Submitting action for task[%s] to overlord: [%s].", task.getId(), taskAction); try { - response = httpClient.go( - new Request(HttpMethod.POST, serviceUri.toURL()) - .setContent(MediaType.APPLICATION_JSON, dataToSend), - new StatusResponseHandler(Charsets.UTF_8) - ).get(); + fullResponseHolder = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action") + .setContent(MediaType.APPLICATION_JSON, dataToSend) + ); } catch (Exception e) { Throwables.propagateIfInstanceOf(e.getCause(), IOException.class); @@ -111,18 +89,17 @@ public RetType submit(TaskAction taskAction) throws IOExcepti throw Throwables.propagate(e); } - if (response.getStatus().getCode() / 100 == 2) { + if (fullResponseHolder.getStatus().getCode() / 100 == 2) { final Map responseDict = jsonMapper.readValue( - response.getContent(), + fullResponseHolder.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference()); } else { // Want to retry, so throw an IOException. throw new IOE( - "Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.", - response.getStatus(), - server.getHost() + "Scary HTTP status returned: %s. Check your overlord logs for exceptions.", + fullResponseHolder.getStatus() ); } } @@ -152,19 +129,4 @@ private long jitter(long input) long retval = input + (long) jitter; return retval < 0 ? 0 : retval; } - - private URI makeServiceUri(final Server instance) throws URISyntaxException - { - return new URI(instance.getScheme(), null, instance.getAddress(), instance.getPort(), "/druid/indexer/v1/action", null, null); - } - - private Server getServiceInstance() - { - final Server instance = selector.pick(); - if (instance == null) { - throw new ISE("Cannot find instance of indexer to talk to!"); - } else { - return instance; - } - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java index 2556621db1f0..efb5f6189319 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClientFactory.java @@ -21,10 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; import io.druid.client.indexing.IndexingService; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.annotations.Global; +import io.druid.discovery.DruidLeaderClient; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; @@ -32,21 +30,18 @@ */ public class RemoteTaskActionClientFactory implements TaskActionClientFactory { - private final HttpClient httpClient; - private final ServerDiscoverySelector selector; + private final DruidLeaderClient druidLeaderClient; private final RetryPolicyFactory retryPolicyFactory; private final ObjectMapper jsonMapper; @Inject public RemoteTaskActionClientFactory( - @Global HttpClient httpClient, - @IndexingService ServerDiscoverySelector selector, + @IndexingService DruidLeaderClient leaderHttpClient, RetryPolicyFactory retryPolicyFactory, ObjectMapper jsonMapper ) { - this.httpClient = httpClient; - this.selector = selector; + this.druidLeaderClient = leaderHttpClient; this.retryPolicyFactory = retryPolicyFactory; this.jsonMapper = jsonMapper; } @@ -54,6 +49,6 @@ public RemoteTaskActionClientFactory( @Override public TaskActionClient create(Task task) { - return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper); + return new RemoteTaskActionClient(task, druidLeaderClient, retryPolicyFactory, jsonMapper); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java index c07d444dde90..e5276cbed255 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java @@ -19,15 +19,10 @@ package io.druid.indexing.common.actions; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.Futures; -import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.discovery.DruidLeaderClient; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskLock; @@ -36,64 +31,33 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Intervals; import org.easymock.EasyMock; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.net.URL; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; public class RemoteTaskActionClientTest { - - private HttpClient httpClient; - private ServerDiscoverySelector selector; - private Server server; + private DruidLeaderClient druidLeaderClient; List result = null; private ObjectMapper objectMapper = new DefaultObjectMapper(); @Before public void setUp() { - httpClient = createMock(HttpClient.class); - selector = createMock(ServerDiscoverySelector.class); - - server = new Server() - { - - @Override - public String getScheme() - { - return "http"; - } - - @Override - public int getPort() - { - return 8080; - } - - @Override - public String getHost() - { - return "localhost"; - } - - @Override - public String getAddress() - { - return "localhost"; - } - }; + druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); long now = System.currentTimeMillis(); @@ -106,29 +70,30 @@ public String getAddress() } @Test - public void testSubmitSimple() throws JsonProcessingException + public void testSubmitSimple() throws Exception { + Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx")); + expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action")) + .andReturn(request); + // return status code 200 and a list with size equals 1 Map responseBody = new HashMap(); responseBody.put("result", result); String strResult = objectMapper.writeValueAsString(responseBody); - StatusResponseHolder responseHolder = new StatusResponseHolder( + FullResponseHolder responseHolder = new FullResponseHolder( HttpResponseStatus.OK, + EasyMock.createNiceMock(HttpResponse.class), new StringBuilder().append(strResult) ); // set up mocks - expect(selector.pick()).andReturn(server); - replay(selector); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); - expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn( - Futures.immediateFuture(responseHolder) - ); - replay(httpClient); Task task = new NoopTask("id", 0, 0, null, null, null); RemoteTaskActionClient client = new RemoteTaskActionClient( - task, httpClient, selector, new RetryPolicyFactory( + task, druidLeaderClient, new RetryPolicyFactory( new RetryPolicyConfig() ), objectMapper ); @@ -140,33 +105,35 @@ task, httpClient, selector, new RetryPolicyFactory( } Assert.assertEquals(1, result.size()); - EasyMock.verify(selector, httpClient); + EasyMock.verify(druidLeaderClient); } @Test(expected = IOException.class) - public void testSubmitWithIllegalStatusCode() throws IOException + public void testSubmitWithIllegalStatusCode() throws Exception { // return status code 400 and a list with size equals 1 + Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx")); + expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action")) + .andReturn(request); + + // return status code 200 and a list with size equals 1 Map responseBody = new HashMap(); responseBody.put("result", result); String strResult = objectMapper.writeValueAsString(responseBody); - StatusResponseHolder responseHolder = new StatusResponseHolder( + FullResponseHolder responseHolder = new FullResponseHolder( HttpResponseStatus.BAD_REQUEST, + EasyMock.createNiceMock(HttpResponse.class), new StringBuilder().append(strResult) ); // set up mocks - expect(selector.pick()).andReturn(server); - replay(selector); + expect(druidLeaderClient.go(request)).andReturn(responseHolder); + replay(druidLeaderClient); - expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn( - Futures.immediateFuture(responseHolder) - ); - replay(httpClient); Task task = new NoopTask("id", 0, 0, null, null, null); RemoteTaskActionClient client = new RemoteTaskActionClient( - task, httpClient, selector, new RetryPolicyFactory( + task, druidLeaderClient, new RetryPolicyFactory( objectMapper.readValue("{\"maxRetryCount\":0}", RetryPolicyConfig.class) ), objectMapper ); diff --git a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java index b70da3e5d75a..20b65f065bef 100644 --- a/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/io/druid/client/coordinator/CoordinatorClient.java @@ -21,67 +21,48 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; +import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.ImmutableSegmentLoadInfo; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.annotations.Global; +import io.druid.discovery.DruidLeaderClient; import io.druid.java.util.common.ISE; - import io.druid.java.util.common.StringUtils; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Interval; -import java.net.URI; -import java.net.URL; import java.util.List; public class CoordinatorClient { - private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); - - private final HttpClient client; + private final DruidLeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; - private final ServerDiscoverySelector selector; @Inject public CoordinatorClient( - @Global HttpClient client, ObjectMapper jsonMapper, - @Coordinator ServerDiscoverySelector selector + @Coordinator DruidLeaderClient druidLeaderClient ) { - this.client = client; this.jsonMapper = jsonMapper; - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; } public List fetchServerView(String dataSource, Interval interval, boolean incompleteOk) { try { - StatusResponseHolder response = client.go( - new Request( - HttpMethod.GET, - new URL( - StringUtils.format( - "%s/datasources/%s/intervals/%s/serverview?partial=%s", - baseUrl(), - dataSource, - interval.toString().replace("/", "_"), - incompleteOk - ) - ) - ), - RESPONSE_HANDLER - ).get(); + FullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.GET, + StringUtils.format( + "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s", + dataSource, + interval.toString().replace("/", "_"), + incompleteOk + )) + ); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while fetching serverView status[%s] content[%s]", @@ -100,28 +81,4 @@ public List fetchServerView(String dataSource, Interva throw Throwables.propagate(e); } } - - - private String baseUrl() - { - try { - final Server instance = selector.pick(); - if (instance == null) { - throw new ISE("Cannot find instance of coordinator.. Did you set `druid.selectors.coordinator.serviceName`?"); - } - - return new URI( - instance.getScheme(), - null, - instance.getAddress(), - instance.getPort(), - "/druid/coordinator/v1", - null, - null - ).toString(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } } diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 96b5d9bb7d9c..71a03a5a41df 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -22,44 +22,29 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.InputStreamResponseHandler; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.annotations.Global; +import io.druid.discovery.DruidLeaderClient; import io.druid.java.util.common.IAE; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.StringUtils; import io.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; import javax.ws.rs.core.MediaType; -import java.io.InputStream; -import java.net.URI; -import java.net.URL; import java.util.Iterator; import java.util.List; public class IndexingServiceClient { - private static final InputStreamResponseHandler RESPONSE_HANDLER = new InputStreamResponseHandler(); - - private final HttpClient client; + private final DruidLeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; - private final ServerDiscoverySelector selector; @Inject public IndexingServiceClient( - @Global HttpClient client, ObjectMapper jsonMapper, - @IndexingService ServerDiscoverySelector selector + @IndexingService DruidLeaderClient druidLeaderClient ) { - this.client = client; this.jsonMapper = jsonMapper; - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; } public void mergeSegments(List segments) @@ -95,39 +80,15 @@ public void upgradeSegments(String dataSource, Interval interval) runQuery(new ClientConversionQuery(dataSource, interval)); } - private InputStream runQuery(Object queryObject) + private void runQuery(Object queryObject) { try { - return client.go( - new Request( + druidLeaderClient.go( + druidLeaderClient.makeRequest( HttpMethod.POST, - new URL(StringUtils.format("%s/task", baseUrl())) - ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)), - RESPONSE_HANDLER - ).get(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - private String baseUrl() - { - try { - final Server instance = selector.pick(); - if (instance == null) { - throw new ISE("Cannot find instance of indexingService"); - } - - return new URI( - instance.getScheme(), - null, - instance.getAddress(), - instance.getPort(), - "/druid/indexer/v1", - null, - null - ).toString(); + "/druid/indexer/v1/task" + ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)) + ); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index b4e4827760ad..ce8437235f58 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -168,7 +169,7 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery private final Object lock = new Object(); - private boolean cacheInitialized = false; + private CountDownLatch cacheInitialized = new CountDownLatch(1); NodeTypeWatcher( ExecutorService listenerExecutor, @@ -197,6 +198,9 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery @Override public Collection getAllNodes() { + if (!isCacheInitialized(30, TimeUnit.SECONDS)) { + log.info("cache is not initialized yet. getAllNodes() might not return full information."); + } return Collections.unmodifiableCollection(nodes.values()); } @@ -204,7 +208,7 @@ public Collection getAllNodes() public void registerListener(DruidNodeDiscovery.Listener listener) { synchronized (lock) { - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { ImmutableList currNodes = ImmutableList.copyOf(nodes.values()); safeSchedule( () -> { @@ -278,15 +282,13 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve break; } case INITIALIZED: { - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { log.warn("cache is already initialized. ignoring [%s] event, nodeType [%s].", event.getType(), nodeType); return; } log.info("Received INITIALIZED in node watcher for type [%s].", nodeType); - cacheInitialized = true; - ImmutableList currNodes = ImmutableList.copyOf(nodes.values()); for (Listener l : nodeListeners) { safeSchedule( @@ -297,6 +299,7 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve ); } + cacheInitialized.countDown(); break; } default: { @@ -310,6 +313,17 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve } } + private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit) + { + try { + return cacheInitialized.await(waitFor, timeUnit); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return false; + } + } + private void safeSchedule( Runnable runnable, String errMsgFormat, Object... args @@ -329,7 +343,7 @@ private void addNode(DiscoveryDruidNode druidNode) { DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode); if (prev == null) { - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { List newNode = ImmutableList.of(druidNode); for (Listener l : nodeListeners) { safeSchedule( @@ -354,7 +368,7 @@ private void removeNode(DiscoveryDruidNode druidNode) return; } - if (cacheInitialized) { + if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) { List nodeRemoved = ImmutableList.of(druidNode); for (Listener l : nodeListeners) { safeSchedule( diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java new file mode 100644 index 000000000000..5835405949d5 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java @@ -0,0 +1,266 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.FullResponseHandler; +import com.metamx.http.client.response.FullResponseHolder; +import io.druid.client.selector.Server; +import io.druid.concurrent.LifecycleLock; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.common.logger.Logger; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Iterator; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected + * via Guice with annotations @Coordinator or @IndexingService . + * Usage: + * Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath) + * request.setXXX(..) + * FullResponseHolder responseHolder = druidLeaderClient.go(request) + */ +public class DruidLeaderClient +{ + private final Logger log = new Logger(DruidLeaderClient.class); + + private static final int MAX_RETRIES = 3; + + private final HttpClient httpClient; + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final String nodeTypeToWatch; + + private final String leaderRequestPath; + + //Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future. + private final ServerDiscoverySelector serverDiscoverySelector; + + private LifecycleLock lifecycleLock = new LifecycleLock(); + private DruidNodeDiscovery druidNodeDiscovery; + private AtomicReference currentKnownLeader = new AtomicReference<>(); + + public DruidLeaderClient( + HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + String nodeTypeToWatch, + String leaderRequestPath, + ServerDiscoverySelector serverDiscoverySelector + ) + { + this.httpClient = httpClient; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.nodeTypeToWatch = nodeTypeToWatch; + this.leaderRequestPath = leaderRequestPath; + this.serverDiscoverySelector = serverDiscoverySelector; + } + + @LifecycleStart + public void start() + { + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + + try { + druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch); + lifecycleLock.started(); + log.info("Started."); + } + finally { + lifecycleLock.exitStart(); + } + } + + @LifecycleStop + public void stop() + { + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + + log.info("Stopped."); + } + + public Request makeRequest(HttpMethod httpMethod, String urlPath) throws MalformedURLException + { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath))); + } + + public FullResponseHolder go(Request request) throws InterruptedException + { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + for (int counter = 0; counter < MAX_RETRIES; counter++) { + + final FullResponseHolder fullResponseHolder; + + try { + fullResponseHolder = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get(); + } + catch (ExecutionException ex) { + // can happen if the node is stopped. + log.info("Request[%s] failed with msg [%s].", request.getUrl(), ex.getMessage()); + log.debug(ex, "Request[%s] failed.", request.getUrl()); + + try { + if (request.getUrl().getQuery() == null) { + request = withUrl( + request, + new URL(StringUtils.format("%s%s", getCurrentKnownLeader(false), request.getUrl().getPath())) + ); + } else { + request = withUrl( + request, + new URL(StringUtils.format("%s%s?%s", + getCurrentKnownLeader(false), + request.getUrl().getPath(), + request.getUrl().getQuery() + )) + ); + } + continue; + } + catch (MalformedURLException e) { + throw new ISE( + e, + "failed to build url with path[%] and query string [%s].", + request.getUrl().getPath(), + request.getUrl().getQuery() + ); + } + } + + if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(fullResponseHolder.getResponse().getStatus())) { + String redirectUrlStr = fullResponseHolder.getResponse().headers().get("Location"); + if (redirectUrlStr == null) { + throw new ISE("No redirect location is found in response from url[%s].", request.getUrl()); + } + + log.info("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr); + + final URL redirectUrl; + try { + redirectUrl = new URL(redirectUrlStr); + } + catch (MalformedURLException ex) { + throw new ISE( + ex, + "Malformed redirect location is found in response from url[%s], new location[%s].", + request.getUrl(), + redirectUrlStr + ); + } + + //update known leader location + currentKnownLeader.set(StringUtils.format( + "%s://%s:%s", + redirectUrl.getProtocol(), + redirectUrl.getHost(), + redirectUrl.getPort() + )); + + request = withUrl(request, redirectUrl); + } else { + return fullResponseHolder; + } + } + + throw new ISE("Retries exhausted, couldn't fulfill request to [%s].", request.getUrl()); + } + + public String findCurrentLeader() + { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + final FullResponseHolder responseHolder; + try { + responseHolder = go(makeRequest(HttpMethod.GET, leaderRequestPath)); + } + catch (Exception ex) { + throw new ISE(ex, "Couldn't find leader."); + } + + if (responseHolder.getStatus().getCode() == 200) { + return responseHolder.getContent(); + } else { + throw new ISE( + "Couldn't find leader, failed response status is [%s] and content [%s].", + responseHolder.getStatus().getCode(), + responseHolder.getContent() + ); + } + } + + private String getCurrentKnownLeader(final boolean cached) + { + return currentKnownLeader.accumulateAndGet( + null, + (current, given) -> current == null || !cached ? pickOneHost() : current + ); + } + + private String pickOneHost() + { + Server server = serverDiscoverySelector.pick(); + if (server != null) { + return StringUtils.format( + "%s://%s:%s", + server.getScheme(), + server.getAddress(), + server.getPort() + ); + } + + Iterator iter = druidNodeDiscovery.getAllNodes().iterator(); + if (iter.hasNext()) { + DiscoveryDruidNode node = iter.next(); + return StringUtils.format( + "%s://%s", + node.getDruidNode().getServiceScheme(), + node.getDruidNode().getHostAndPortToUse() + ); + } + + throw new ISE("Couldn't find any servers."); + } + + private Request withUrl(Request old, URL url) + { + Request req = new Request(old.getMethod(), url); + req.addHeaderValues(old.getHeaders()); + if (old.hasContent()) { + req.setContent(old.getContent()); + } + return req; + } +} diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java index 908ce87f3787..3aa9a91c0333 100644 --- a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -22,10 +22,14 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.metamx.http.client.HttpClient; import io.druid.client.coordinator.Coordinator; import io.druid.client.coordinator.CoordinatorSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.guice.annotations.Global; /** */ @@ -47,4 +51,22 @@ public ServerDiscoverySelector getServiceProvider( { return serverDiscoveryFactory.createSelector(config.getServiceName()); } + + @Provides + @Coordinator + @ManageLifecycle + public DruidLeaderClient getLeaderHttpClient( + @Global HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + @Coordinator ServerDiscoverySelector serverDiscoverySelector + ) + { + return new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + "/druid/coordinator/v1/leader", + serverDiscoverySelector + ); + } } diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index 167e003f2365..32737351cafd 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -22,10 +22,14 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.metamx.http.client.HttpClient; import io.druid.client.indexing.IndexingService; import io.druid.client.indexing.IndexingServiceSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.guice.annotations.Global; /** */ @@ -47,4 +51,22 @@ public ServerDiscoverySelector getServiceProvider( { return serverDiscoveryFactory.createSelector(config.getServiceName()); } + + @Provides + @IndexingService + @ManageLifecycle + public DruidLeaderClient getLeaderHttpClient( + @Global HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + @IndexingService ServerDiscoverySelector serverDiscoverySelector + ) + { + return new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + "/druid/indexer/v1/leader", + serverDiscoverySelector + ); + } } diff --git a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java index a23c5702eda1..844edf2dbff6 100644 --- a/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java +++ b/server/src/main/java/io/druid/server/http/OverlordProxyServlet.java @@ -21,12 +21,9 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; - import io.druid.client.indexing.IndexingService; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import io.druid.java.util.common.ISE; - import org.eclipse.jetty.proxy.ProxyServlet; import javax.servlet.http.HttpServletRequest; @@ -38,27 +35,28 @@ */ public class OverlordProxyServlet extends ProxyServlet { - private final ServerDiscoverySelector selector; + private final DruidLeaderClient druidLeaderClient; @Inject OverlordProxyServlet( - @IndexingService ServerDiscoverySelector selector + @IndexingService DruidLeaderClient druidLeaderClient ) { - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; } @Override protected String rewriteTarget(HttpServletRequest request) { try { - final Server indexer = selector.pick(); - if (indexer == null) { - throw new ISE("Can't find indexingService, did you configure druid.selectors.indexing.serviceName same as druid.service at overlord?"); + final String overlordLeader = druidLeaderClient.findCurrentLeader(); + if (overlordLeader == null) { + throw new ISE("Can't find Overlord leader."); } + return new URI( request.getScheme(), - indexer.getHost(), + overlordLeader, request.getRequestURI(), request.getQueryString(), null diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index 66bcdb0d2e56..70a0148c7aa5 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -21,20 +21,15 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.inject.Inject; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.FullResponseHandler; import com.metamx.http.client.response.FullResponseHolder; -import io.druid.client.selector.Server; import io.druid.concurrent.Execs; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import io.druid.guice.ManageLifecycle; -import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; @@ -44,9 +39,6 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Duration; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -60,14 +52,13 @@ public class CoordinatorRuleManager { private static final Logger log = new Logger(CoordinatorRuleManager.class); - private final HttpClient httpClient; private final ObjectMapper jsonMapper; private final Supplier config; - private final ServerDiscoverySelector selector; - private final FullResponseHandler responseHandler; private final AtomicReference>> rules; + private final DruidLeaderClient druidLeaderClient; + private volatile ScheduledExecutorService exec; private final Object lock = new Object(); @@ -76,18 +67,15 @@ public class CoordinatorRuleManager @Inject public CoordinatorRuleManager( - @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, Supplier config, - ServerDiscoverySelector selector + DruidLeaderClient druidLeaderClient ) { - this.httpClient = httpClient; this.jsonMapper = jsonMapper; this.config = config; - this.selector = selector; + this.druidLeaderClient = druidLeaderClient; - this.responseHandler = new FullResponseHandler(Charsets.UTF_8); this.rules = new AtomicReference<>( new ConcurrentHashMap>() ); @@ -146,29 +134,16 @@ public boolean isStarted() public void poll() { try { - String url = getRuleURL(); - if (url == null) { - return; - } + FullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.GET, config.get().getRulesEndpoint()) + ); - FullResponseHolder response = httpClient.go( - new Request( - HttpMethod.GET, - new URL(url) - ), - responseHandler - ).get(); - - if (response.getStatus().equals(HttpResponseStatus.FOUND)) { - url = response.getResponse().headers().get("Location"); - log.info("Redirecting rule request to [%s]", url); - response = httpClient.go( - new Request( - HttpMethod.GET, - new URL(url) - ), - responseHandler - ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while polling rules, status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); } ConcurrentHashMap> newRules = new ConcurrentHashMap<>( @@ -200,24 +175,4 @@ public List getRulesWithDefault(final String dataSource) } return retVal; } - - private String getRuleURL() throws URISyntaxException - { - Server server = selector.pick(); - - if (server == null) { - log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName()); - return null; - } - - return new URI( - server.getScheme(), - null, - server.getAddress(), - server.getPort(), - config.get().getRulesEndpoint(), - null, - null - ).toString(); - } } diff --git a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java new file mode 100644 index 000000000000..c29c6451ef01 --- /dev/null +++ b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.name.Named; +import com.google.inject.name.Names; +import com.google.inject.servlet.GuiceFilter; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.Self; +import io.druid.initialization.Initialization; +import io.druid.java.util.common.StringUtils; +import io.druid.server.DruidNode; +import io.druid.server.initialization.BaseJettyTest; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.initialization.jetty.JettyServerInitializer; +import org.easymock.EasyMock; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.junit.Assert; +import org.junit.Test; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; + +/** + */ +public class DruidLeaderClientTest extends BaseJettyTest +{ + private DiscoveryDruidNode discoveryDruidNode; + private HttpClient httpClient; + + @Override + protected Injector setupInjector() + { + final DruidNode node = new DruidNode("test", "localhost", null, null, new ServerConfig()); + discoveryDruidNode = new DiscoveryDruidNode(node, "test", ImmutableMap.of()); + + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, + Key.get(DruidNode.class, Self.class), + node + ); + binder.bind(Integer.class).annotatedWith(Names.named("port")).toInstance(node.getPlaintextPort()); + binder.bind(JettyServerInitializer.class).to(TestJettyServerInitializer.class).in(LazySingleton.class); + Jerseys.addResource(binder, SimpleResource.class); + LifecycleModule.register(binder, Server.class); + } + } + ) + ); + + httpClient = injector.getInstance(ClientHolder.class).getClient(); + return injector; + } + + @Test + public void testSimple() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + druidLeaderClient.start(); + + Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); + request.setContent("hello".getBytes("UTF-8")); + Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); + } + + @Test + public void testRedirection() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + druidLeaderClient.start(); + + Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); + request.setContent("hello".getBytes("UTF-8")); + Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); + } + + @Test + public void testServerFailureAndRedirect() throws Exception + { + ServerDiscoverySelector serverDiscoverySelector = EasyMock.createMock(ServerDiscoverySelector.class); + EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes(); + + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(new DiscoveryDruidNode( + new DruidNode("test", "dummyhost", 64231, null, new ServerConfig()), + "test", + ImmutableMap.of() + )) + ); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes(); + + EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + serverDiscoverySelector + ); + druidLeaderClient.start(); + + Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); + request.setContent("hello".getBytes("UTF-8")); + Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); + } + + @Test + public void testFindCurrentLeader() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + druidLeaderClient.start(); + + Assert.assertEquals("http://localhost:1234/", druidLeaderClient.findCurrentLeader()); + } + + private static class TestJettyServerInitializer implements JettyServerInitializer + { + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GuiceFilter.class, "/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{root}); + server.setHandler(handlerList); + } + } + + @Path("/simple") + public static class SimpleResource + { + private final int port; + + @Inject + public SimpleResource(@Named("port") int port) + { + this.port = port; + } + + @POST + @Path("/direct") + @Produces(MediaType.APPLICATION_JSON) + public Response direct(String input) + { + if ("hello".equals(input)) { + return Response.ok("hello").build(); + } else { + return Response.serverError().build(); + } + } + + @POST + @Path("/redirect") + @Produces(MediaType.APPLICATION_JSON) + public Response redirecting() throws Exception + { + return Response.temporaryRedirect(new URI(StringUtils.format("http://localhost:%s/simple/direct", port))).build(); + } + + @GET + @Path("/leader") + @Produces(MediaType.APPLICATION_JSON) + public Response leader() + { + return Response.ok("http://localhost:1234/").build(); + } + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java index eec7a87d88d6..686d7399d57e 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java @@ -455,7 +455,7 @@ private static List> merge(final Collection segme EasyMock.replay(configManager); final List> retVal = Lists.newArrayList(); - final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null) + final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null) { @Override public void mergeSegments(List segmentsToMerge) diff --git a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java index 20ad1f17dd5f..120a529ddb20 100644 --- a/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java +++ b/server/src/test/java/io/druid/server/http/OverlordProxyServletTest.java @@ -19,8 +19,7 @@ package io.druid.server.http; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -33,17 +32,17 @@ public class OverlordProxyServletTest @Test public void testRewriteURI() { - ServerDiscoverySelector selector = EasyMock.createMock(ServerDiscoverySelector.class); - Server server = EasyMock.createMock(Server.class); - EasyMock.expect(server.getHost()).andReturn("overlord:port"); - EasyMock.expect(selector.pick()).andReturn(server).anyTimes(); + DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); + EasyMock.expect(druidLeaderClient.findCurrentLeader()).andReturn("overlord:port"); + HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class); EasyMock.expect(request.getScheme()).andReturn("https").anyTimes(); EasyMock.expect(request.getQueryString()).andReturn("param1=test¶m2=test2").anyTimes(); EasyMock.expect(request.getRequestURI()).andReturn("/druid/overlord/worker").anyTimes(); - EasyMock.replay(server, selector, request); - URI uri = URI.create(new OverlordProxyServlet(selector).rewriteTarget(request)); + EasyMock.replay(druidLeaderClient, request); + + URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient).rewriteTarget(request)); Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test¶m2=test2", uri.toString()); } diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index a2c253dd4052..7cc0a9c5fd8e 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -27,13 +27,11 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.metamx.http.client.HttpClient; import io.druid.client.DruidServer; import io.druid.client.selector.Server; import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscovery; import io.druid.discovery.DruidNodeDiscoveryProvider; -import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; @@ -115,7 +113,7 @@ public void registerListener(Listener listener) EasyMock.replay(druidNodeDiscoveryProvider); brokerSelector = new TieredBrokerHostSelector( - new TestRuleManager(null, null, null), + new TestRuleManager(null, null), new TieredBrokerConfig() { @Override @@ -320,12 +318,11 @@ public List apply(@Nullable List servers) private static class TestRuleManager extends CoordinatorRuleManager { public TestRuleManager( - @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, Supplier config ) { - super(httpClient, jsonMapper, config, null); + super(jsonMapper, config, null); } @Override diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index aa28951c836d..faebceb9b4d6 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -26,10 +26,12 @@ import com.google.inject.Provides; import com.google.inject.TypeLiteral; import com.google.inject.name.Names; +import com.metamx.http.client.HttpClient; import io.airlift.airline.Command; import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidLeaderClient; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -39,6 +41,7 @@ import io.druid.guice.QueryRunnerFactoryModule; import io.druid.guice.QueryableModule; import io.druid.guice.RouterProcessingModule; +import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Self; import io.druid.guice.http.JettyHttpClientModule; import io.druid.java.util.common.logger.Logger; @@ -131,6 +134,23 @@ public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( { return factory.createSelector(config.getCoordinatorServiceName()); } + + @Provides + @ManageLifecycle + public DruidLeaderClient getLeaderHttpClient( + @Global HttpClient httpClient, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + ServerDiscoverySelector serverDiscoverySelector + ) + { + return new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + "/druid/coordinator/v1/leader", + serverDiscoverySelector + ); + } }, new LookupModule() );