diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index cc73ea42c1a4..f890a9e9ac0f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -34,10 +34,10 @@ import it.unimi.dsi.fastutil.ints.IntArraySet; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.StringTuple; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.discovery.BrokerClient; import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.channel.ReadableConcatFrameChannel; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index d4eaef600125..814f4d8a63db 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -26,23 +26,20 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.discovery.BrokerClient; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.sql.http.ResultFormat; -import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.timeline.DataSegment; -import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import javax.ws.rs.core.MediaType; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -238,13 +235,12 @@ private void updateStatus(State state, DateTime startTime) */ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception { - Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/"); - SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), - ResultFormat.OBJECTLINES, - false, false, false, null, null + ClientSqlQuery clientSqlQuery = new ClientSqlQuery( + StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), + ResultFormat.OBJECTLINES.contentType(), + false, false, false, null, null ); - request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery)); - String response = brokerClient.sendQuery(request); + final String response = FutureUtils.get(brokerClient.submitSqlQuery(clientSqlQuery), true); if (response == null) { // Unable to query broker diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java index 548a7ac473e9..fdebcc7a655f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java @@ -20,9 +20,11 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.discovery.BrokerClient; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.junit.Assert; @@ -34,17 +36,18 @@ import java.util.stream.IntStream; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class SegmentLoadStatusFetcherTest { private static final String TEST_DATASOURCE = "testDatasource"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private SegmentLoadStatusFetcher segmentLoadWaiter; private BrokerClient brokerClient; @@ -57,13 +60,14 @@ public void testSingleVersionWaitsForLoadCorrectly() throws Exception { brokerClient = mock(BrokerClient.class); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); - doAnswer(new Answer() + String dummyString = ""; + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString)); + doAnswer(new Answer>() { int timesInvoked = 0; @Override - public String answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { timesInvoked += 1; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -73,12 +77,13 @@ public String answer(InvocationOnMock invocation) throws Throwable 5 - timesInvoked, 0 ); - return new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus); + return Futures.immediateFuture(jsonResponse); } - }).when(brokerClient).sendQuery(any()); + }).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, - new ObjectMapper(), + OBJECT_MAPPER, "id", TEST_DATASOURCE, IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()), @@ -86,7 +91,7 @@ public String answer(InvocationOnMock invocation) throws Throwable ); segmentLoadWaiter.waitForSegmentsToLoad(); - verify(brokerClient, times(5)).sendQuery(any()); + verify(brokerClient, times(5)).submitSqlQuery(any(ClientSqlQuery.class)); } @Test @@ -94,13 +99,14 @@ public void testMultipleVersionWaitsForLoadCorrectly() throws Exception { brokerClient = mock(BrokerClient.class); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); - doAnswer(new Answer() + String dummyString = ""; + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString)); + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenAnswer(new Answer>() { int timesInvoked = 0; @Override - public String answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { timesInvoked += 1; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -110,12 +116,13 @@ public String answer(InvocationOnMock invocation) throws Throwable 5 - timesInvoked, 0 ); - return new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus); + return Futures.immediateFuture(jsonResponse); } - }).when(brokerClient).sendQuery(any()); + }); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, - new ObjectMapper(), + OBJECT_MAPPER, "id", TEST_DATASOURCE, IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()), @@ -123,22 +130,25 @@ public String answer(InvocationOnMock invocation) throws Throwable ); segmentLoadWaiter.waitForSegmentsToLoad(); - verify(brokerClient, times(5)).sendQuery(any()); + verify(brokerClient, times(5)).submitSqlQuery(any(ClientSqlQuery.class)); } @Test public void triggerCancellationFromAnotherThread() throws Exception { brokerClient = mock(BrokerClient.class); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); - doAnswer(new Answer() + + String dummyString = ""; + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString)); + + doAnswer(new Answer>() { int timesInvoked = 0; @Override - public String answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { - // sleeping broker call to simulate a long running query + // sleeping broker call to simulate a long-running query Thread.sleep(1000); timesInvoked++; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -148,12 +158,13 @@ public String answer(InvocationOnMock invocation) throws Throwable 5 - timesInvoked, 0 ); - return new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus); + return Futures.immediateFuture(jsonResponse); } - }).when(brokerClient).sendQuery(any()); + }).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, - new ObjectMapper(), + OBJECT_MAPPER, "id", TEST_DATASOURCE, IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()), diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 951e08d35a3d..08efce8e9106 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -43,7 +43,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; -import org.apache.druid.discovery.BrokerClient; import org.apache.druid.discovery.NodeRole; import org.apache.druid.frame.channel.FrameChannelSequence; import org.apache.druid.frame.processor.Bouncer; @@ -75,7 +74,6 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.http.client.Request; import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.msq.counters.CounterNames; import org.apache.druid.msq.counters.CounterSnapshots; @@ -247,7 +245,7 @@ import static org.mockito.Mockito.mock; /** - * Base test runner for running MSQ unit tests. It sets up multi stage query execution environment + * Base test runner for running MSQ unit tests. It sets up multi-stage query execution environment * and populates data for the datasources. The runner does not go via the HTTP layer for communication between the * various MSQ processes. *

@@ -437,7 +435,6 @@ public void setUp2() throws Exception segmentManager = new MSQTestSegmentManager(segmentCacheManager); - BrokerClient brokerClient = mock(BrokerClient.class); List modules = ImmutableList.of( binder -> { DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig() @@ -537,7 +534,6 @@ public String getFormatString() new LookylooModule(), new SegmentWranglerModule(), new HllSketchModule(), - binder -> binder.bind(BrokerClient.class).toInstance(brokerClient), binder -> binder.bind(Bouncer.class).toInstance(new Bouncer(1)) ); // adding node role injection to the modules, since CliPeon would also do that through run method @@ -551,8 +547,6 @@ public String getFormatString() objectMapper.registerModules(sqlModule.getJacksonModules()); objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList()); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); - testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper, injector)); indexingServiceClient = new MSQTestOverlordServiceClient( objectMapper, diff --git a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java index 611e6399ee69..ea370572c447 100644 --- a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java +++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java @@ -37,6 +37,11 @@ */ public interface BrokerClient { + /** + * Submit the given {@code sqlQuery} to the Broker's SQL query endpoint. + */ + ListenableFuture submitSqlQuery(ClientSqlQuery sqlQuery); + /** * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint. */ diff --git a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java index 728a67401b2e..5ad609147429 100644 --- a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java @@ -26,6 +26,8 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; import org.apache.druid.query.explain.ExplainPlan; import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.query.http.SqlTaskStatus; @@ -33,6 +35,7 @@ import org.apache.druid.rpc.ServiceClient; import org.jboss.netty.handler.codec.http.HttpMethod; +import java.nio.charset.StandardCharsets; import java.util.List; public class BrokerClientImpl implements BrokerClient @@ -46,6 +49,19 @@ public BrokerClientImpl(final ServiceClient client, final ObjectMapper jsonMappe this.jsonMapper = jsonMapper; } + @Override + public ListenableFuture submitSqlQuery(final ClientSqlQuery sqlQuery) + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/") + .jsonContent(jsonMapper, sqlQuery), + new StringFullResponseHandler(StandardCharsets.UTF_8) + ), + FullResponseHolder::getContent + ); + } + @Override public ListenableFuture submitSqlTask(final ClientSqlQuery sqlQuery) { diff --git a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java deleted file mode 100644 index a0ddbf42bed8..000000000000 --- a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.discovery; - -import com.google.inject.Inject; -import org.apache.druid.error.DruidException; -import org.apache.druid.guice.annotations.EscalatedGlobal; -import org.apache.druid.java.util.common.IOE; -import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; -import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; -import org.apache.druid.rpc.ServiceClient; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.ExecutionException; - -/** - * This class facilitates interaction with Broker. - * Note that this should be removed and reconciled with org.apache.druid.sql.client.BrokerClient, which has the - * built-in functionality of {@link ServiceClient}, and proper Guice and service discovery wired in. - */ -@Deprecated -public class BrokerClient -{ - private static final int MAX_RETRIES = 5; - - private final HttpClient brokerHttpClient; - private final DruidNodeDiscovery druidNodeDiscovery; - - @Inject - public BrokerClient( - @EscalatedGlobal HttpClient brokerHttpClient, - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider - ) - { - this.brokerHttpClient = brokerHttpClient; - this.druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER); - } - - /** - * Creates and returns a {@link Request} after choosing a broker. - */ - public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException - { - String host = ClientUtils.pickOneHost(druidNodeDiscovery); - - if (host == null) { - throw DruidException.forPersona(DruidException.Persona.ADMIN) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("A leader node could not be found for [%s] service. Check the logs to validate that service is healthy.", NodeRole.BROKER); - } - return new Request(httpMethod, new URL(StringUtils.format("%s%s", host, urlPath))); - } - - public String sendQuery(final Request request) throws Exception - { - return RetryUtils.retry( - () -> { - Request newRequestUrl = getNewRequestUrl(request); - final StringFullResponseHolder fullResponseHolder = brokerHttpClient.go(newRequestUrl, new StringFullResponseHandler(StandardCharsets.UTF_8)).get(); - - HttpResponseStatus responseStatus = fullResponseHolder.getResponse().getStatus(); - if (HttpResponseStatus.SERVICE_UNAVAILABLE.equals(responseStatus) - || HttpResponseStatus.GATEWAY_TIMEOUT.equals(responseStatus)) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Request to broker failed due to failed response status: [%s]", responseStatus); - } - return fullResponseHolder.getContent(); - }, - (throwable) -> { - if (throwable instanceof ExecutionException) { - return throwable.getCause() instanceof IOException || throwable.getCause() instanceof ChannelException; - } - if (throwable instanceof DruidException) { - return ((DruidException) throwable).getCategory() == DruidException.Category.RUNTIME_FAILURE; - } - return throwable instanceof IOE; - }, - MAX_RETRIES - ); - } - - private Request getNewRequestUrl(Request oldRequest) - { - try { - return ClientUtils.withUrl( - oldRequest, - new URL(StringUtils.format("%s%s", ClientUtils.pickOneHost(druidNodeDiscovery), oldRequest.getUrl().getPath())) - ); - } - catch (MalformedURLException e) { - // Not an IOException; this is our own fault. - throw DruidException.defensive( - "Failed to build url with path[%s] and query string [%s].", - oldRequest.getUrl().getPath(), - oldRequest.getUrl().getQuery() - ); - } - } -} diff --git a/server/src/test/java/org/apache/druid/discovery/BrokerClientTest.java b/server/src/test/java/org/apache/druid/discovery/BrokerClientTest.java deleted file mode 100644 index de03877a9b0c..000000000000 --- a/server/src/test/java/org/apache/druid/discovery/BrokerClientTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.discovery; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.name.Names; -import org.apache.druid.guice.GuiceInjectors; -import org.apache.druid.guice.Jerseys; -import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; -import org.apache.druid.guice.LifecycleModule; -import org.apache.druid.guice.annotations.Self; -import org.apache.druid.initialization.Initialization; -import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.server.DruidNode; -import org.apache.druid.server.initialization.BaseJettyTest; -import org.apache.druid.server.initialization.jetty.JettyServerInitializer; -import org.easymock.EasyMock; -import org.eclipse.jetty.server.Server; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.junit.Assert; -import org.junit.Test; - -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.nio.charset.StandardCharsets; - -public class BrokerClientTest extends BaseJettyTest -{ - private DiscoveryDruidNode discoveryDruidNode; - private HttpClient httpClient; - - @Override - protected Injector setupInjector() - { - final DruidNode node = new DruidNode("test", "localhost", false, null, null, true, false); - discoveryDruidNode = new DiscoveryDruidNode(node, NodeRole.BROKER, ImmutableMap.of()); - - Injector injector = Initialization.makeInjectorWithModules( - GuiceInjectors.makeStartupInjector(), ImmutableList.of( - binder -> { - JsonConfigProvider.bindInstance( - binder, - Key.get(DruidNode.class, Self.class), - node - ); - binder.bind(Integer.class).annotatedWith(Names.named("port")).toInstance(node.getPlaintextPort()); - binder.bind(JettyServerInitializer.class).to(DruidLeaderClientTest.TestJettyServerInitializer.class).in(LazySingleton.class); - Jerseys.addResource(binder, SimpleResource.class); - LifecycleModule.register(binder, Server.class); - } - ) - ); - httpClient = injector.getInstance(BaseJettyTest.ClientHolder.class).getClient(); - return injector; - } - - @Test - public void testSimple() throws Exception - { - DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)).anyTimes(); - - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); - - BrokerClient brokerClient = new BrokerClient( - httpClient, - druidNodeDiscoveryProvider - ); - - Request request = brokerClient.makeRequest(HttpMethod.POST, "/simple/direct"); - request.setContent("hello".getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals("hello", brokerClient.sendQuery(request)); - } - - @Test - public void testRetryableError() throws Exception - { - DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)).anyTimes(); - - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); - - BrokerClient brokerClient = new BrokerClient( - httpClient, - druidNodeDiscoveryProvider - ); - - Request request = brokerClient.makeRequest(HttpMethod.POST, "/simple/flakey"); - request.setContent("hello".getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals("hello", brokerClient.sendQuery(request)); - } - - @Test - public void testNonRetryableError() throws Exception - { - DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)).anyTimes(); - - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); - - BrokerClient brokerClient = new BrokerClient( - httpClient, - druidNodeDiscoveryProvider - ); - - Request request = brokerClient.makeRequest(HttpMethod.POST, "/simple/error"); - Assert.assertEquals("", brokerClient.sendQuery(request)); - } - - @Path("/simple") - public static class SimpleResource - { - private static int attempt = 0; - - @POST - @Path("/direct") - @Produces(MediaType.APPLICATION_JSON) - public Response direct(String input) - { - if ("hello".equals(input)) { - return Response.ok("hello").build(); - } else { - return Response.serverError().build(); - } - } - - @POST - @Path("/flakey") - @Produces(MediaType.APPLICATION_JSON) - public Response redirecting() - { - if (attempt > 2) { - return Response.ok("hello").build(); - } else { - attempt += 1; - return Response.status(504).build(); - } - } - - @POST - @Path("/error") - @Produces(MediaType.APPLICATION_JSON) - public Response error() - { - return Response.status(404).build(); - } - } -}