Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.discovery.BrokerClient;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.ReadableConcatFrameChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.discovery.BrokerClient;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -238,13 +235,12 @@ private void updateStatus(State state, DateTime startTime)
*/
private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception
{
Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/");
SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString),
ResultFormat.OBJECTLINES,
false, false, false, null, null
ClientSqlQuery clientSqlQuery = new ClientSqlQuery(
StringUtils.format(LOAD_QUERY, datasource, versionsConditionString),
ResultFormat.OBJECTLINES.contentType(),
false, false, false, null, null
);
request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery));
String response = brokerClient.sendQuery(request);
final String response = FutureUtils.get(brokerClient.submitSqlQuery(clientSqlQuery), true);

if (response == null) {
// Unable to query broker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.apache.druid.msq.exec;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.discovery.BrokerClient;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
Expand All @@ -34,17 +36,18 @@
import java.util.stream.IntStream;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SegmentLoadStatusFetcherTest
{
private static final String TEST_DATASOURCE = "testDatasource";

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private SegmentLoadStatusFetcher segmentLoadWaiter;

private BrokerClient brokerClient;
Expand All @@ -57,13 +60,14 @@ public void testSingleVersionWaitsForLoadCorrectly() throws Exception
{
brokerClient = mock(BrokerClient.class);

doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());
doAnswer(new Answer<String>()
String dummyString = "";
when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString));
doAnswer(new Answer<ListenableFuture<String>>()
{
int timesInvoked = 0;

@Override
public String answer(InvocationOnMock invocation) throws Throwable
public ListenableFuture<String> answer(InvocationOnMock invocation) throws Throwable
{
timesInvoked += 1;
SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus(
Expand All @@ -73,34 +77,36 @@ public String answer(InvocationOnMock invocation) throws Throwable
5 - timesInvoked,
0
);
return new ObjectMapper().writeValueAsString(loadStatus);
String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus);
return Futures.immediateFuture(jsonResponse);
}
}).when(brokerClient).sendQuery(any());
}).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class));
segmentLoadWaiter = new SegmentLoadStatusFetcher(
brokerClient,
new ObjectMapper(),
OBJECT_MAPPER,
"id",
TEST_DATASOURCE,
IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()),
false
);
segmentLoadWaiter.waitForSegmentsToLoad();

verify(brokerClient, times(5)).sendQuery(any());
verify(brokerClient, times(5)).submitSqlQuery(any(ClientSqlQuery.class));
}

@Test
public void testMultipleVersionWaitsForLoadCorrectly() throws Exception
{
brokerClient = mock(BrokerClient.class);

doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());
doAnswer(new Answer<String>()
String dummyString = "";
when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString));
when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenAnswer(new Answer<ListenableFuture<String>>()
{
int timesInvoked = 0;

@Override
public String answer(InvocationOnMock invocation) throws Throwable
public ListenableFuture<String> answer(InvocationOnMock invocation) throws Throwable
{
timesInvoked += 1;
SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus(
Expand All @@ -110,35 +116,39 @@ public String answer(InvocationOnMock invocation) throws Throwable
5 - timesInvoked,
0
);
return new ObjectMapper().writeValueAsString(loadStatus);
String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus);
return Futures.immediateFuture(jsonResponse);
}
}).when(brokerClient).sendQuery(any());
});
segmentLoadWaiter = new SegmentLoadStatusFetcher(
brokerClient,
new ObjectMapper(),
OBJECT_MAPPER,
"id",
TEST_DATASOURCE,
IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()),
false
);
segmentLoadWaiter.waitForSegmentsToLoad();

verify(brokerClient, times(5)).sendQuery(any());
verify(brokerClient, times(5)).submitSqlQuery(any(ClientSqlQuery.class));
}

@Test
public void triggerCancellationFromAnotherThread() throws Exception
{
brokerClient = mock(BrokerClient.class);
doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());
doAnswer(new Answer<String>()

String dummyString = "";
when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString));

doAnswer(new Answer<ListenableFuture<String>>()
{
int timesInvoked = 0;

@Override
public String answer(InvocationOnMock invocation) throws Throwable
public ListenableFuture<String> answer(InvocationOnMock invocation) throws Throwable
{
// sleeping broker call to simulate a long running query
// sleeping broker call to simulate a long-running query
Thread.sleep(1000);
timesInvoked++;
SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus(
Expand All @@ -148,12 +158,13 @@ public String answer(InvocationOnMock invocation) throws Throwable
5 - timesInvoked,
0
);
return new ObjectMapper().writeValueAsString(loadStatus);
String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus);
return Futures.immediateFuture(jsonResponse);
}
}).when(brokerClient).sendQuery(any());
}).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class));
segmentLoadWaiter = new SegmentLoadStatusFetcher(
brokerClient,
new ObjectMapper(),
OBJECT_MAPPER,
"id",
TEST_DATASOURCE,
IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.discovery.BrokerClient;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.frame.processor.Bouncer;
Expand Down Expand Up @@ -75,7 +74,6 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterSnapshots;
Expand Down Expand Up @@ -247,7 +245,7 @@
import static org.mockito.Mockito.mock;

/**
* Base test runner for running MSQ unit tests. It sets up multi stage query execution environment
* Base test runner for running MSQ unit tests. It sets up multi-stage query execution environment
* and populates data for the datasources. The runner does not go via the HTTP layer for communication between the
* various MSQ processes.
* <p>
Expand Down Expand Up @@ -437,7 +435,6 @@ public void setUp2() throws Exception

segmentManager = new MSQTestSegmentManager(segmentCacheManager);

BrokerClient brokerClient = mock(BrokerClient.class);
List<Module> modules = ImmutableList.of(
binder -> {
DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
Expand Down Expand Up @@ -537,7 +534,6 @@ public String getFormatString()
new LookylooModule(),
new SegmentWranglerModule(),
new HllSketchModule(),
binder -> binder.bind(BrokerClient.class).toInstance(brokerClient),
binder -> binder.bind(Bouncer.class).toInstance(new Bouncer(1))
);
// adding node role injection to the modules, since CliPeon would also do that through run method
Expand All @@ -551,8 +547,6 @@ public String getFormatString()
objectMapper.registerModules(sqlModule.getJacksonModules());
objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());

doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());

testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper, injector));
indexingServiceClient = new MSQTestOverlordServiceClient(
objectMapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
*/
public interface BrokerClient
{
/**
* Submit the given {@code sqlQuery} to the Broker's SQL query endpoint.
*
* @param sqlQuery the SQL query to submit to the Broker
* @return a future that resolves to the raw response body returned by the Broker, as a string
*/
ListenableFuture<String> submitSqlQuery(ClientSqlQuery sqlQuery);

/**
* Submit the given {@code sqlQuery} to the Broker's SQL task endpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.query.explain.ExplainPlan;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.query.http.SqlTaskStatus;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.jboss.netty.handler.codec.http.HttpMethod;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class BrokerClientImpl implements BrokerClient
Expand All @@ -46,6 +49,19 @@ public BrokerClientImpl(final ServiceClient client, final ObjectMapper jsonMappe
this.jsonMapper = jsonMapper;
}

/**
 * Submits {@code sqlQuery} to the Broker's SQL endpoint ({@code /druid/v2/sql/}) as a POST with a
 * JSON-serialized body, and returns a future that resolves to the raw response body decoded as
 * UTF-8 text. The caller is responsible for parsing the returned string (e.g. as result lines).
 */
@Override
public ListenableFuture<String> submitSqlQuery(final ClientSqlQuery sqlQuery)
{
return FutureUtils.transform(
client.asyncRequest(
// POST the query as JSON to the Broker's SQL endpoint.
new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/")
.jsonContent(jsonMapper, sqlQuery),
// Read the full response body as a UTF-8 string.
new StringFullResponseHandler(StandardCharsets.UTF_8)
),
// Unwrap the holder so callers receive just the response content.
FullResponseHolder::getContent
);
}

@Override
public ListenableFuture<SqlTaskStatus> submitSqlTask(final ClientSqlQuery sqlQuery)
{
Expand Down
Loading
Loading