From 337d70e0a9cdc198195172deba4a743db19cc997 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 15 Apr 2020 15:24:36 -0700 Subject: [PATCH 1/8] remove DruidLeaderClient.goAsync(..) that does not follow redirect. Replace its usage by DruidLeaadereClient.go(..) with InputStreamFullResponseHandler --- .../response/BytesFullResponseHolder.java | 1 - .../client/response/FullResponseHolder.java | 7 +- .../InputStreamFullResponseHandler.java | 74 ++++++++++++++++ .../InputStreamFullResponseHolder.java | 62 +++++++++++++ .../response/StringFullResponseHolder.java | 1 - .../druid/discovery/DruidLeaderClient.java | 27 +----- .../calcite/schema/MetadataSegmentView.java | 41 +++++---- .../sql/calcite/schema/SystemSchema.java | 87 +++++++++++-------- .../sql/calcite/schema/SystemSchemaTest.java | 52 +++++------ .../druid/sql/calcite/util/CalciteTests.java | 2 - 10 files changed, 239 insertions(+), 115 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java create mode 100644 core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java index e1d31424e43a..8f6f233ba9a7 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java @@ -36,7 +36,6 @@ public BytesFullResponseHolder(HttpResponseStatus status, HttpResponse response) this.chunks = new ArrayList<>(); } - @Override public BytesFullResponseHolder addChunk(byte[] chunk) { chunks.add(chunk); diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java index 001684d78190..fbbab874ffc9 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java @@ -49,12 +49,7 @@ public HttpResponse getResponse() } /** - * Append a new chunk of data. - */ - public abstract FullResponseHolder addChunk(T chunk); - - /** - * Get the accumulated data via {@link #addChunk}. + * Get the data. */ public abstract T getContent(); } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java new file mode 100644 index 000000000000..01a69a80f6cc --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client.response; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.codec.http.HttpChunk; +import org.jboss.netty.handler.codec.http.HttpResponse; + +/** + * This is a clone of {@link InputStreamResponseHandler} except that it retains HTTP status/response object in the + * response holder result. + */ +public class InputStreamFullResponseHandler implements HttpResponseHandler +{ + @Override + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) + { + InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response); + holder.addChunk(getContentBytes(response.getContent())); + return ClientResponse.finished(holder); + } + + @Override + public ClientResponse handleChunk( + ClientResponse clientResponse, + HttpChunk chunk, + long chunkNum + ) + { + clientResponse.getObj().addChunk(getContentBytes(chunk.getContent())); + return clientResponse; + } + + @Override + public ClientResponse done(ClientResponse clientResponse) + { + InputStreamFullResponseHolder holder = clientResponse.getObj(); + holder.done(); + return ClientResponse.finished(holder); + } + + @Override + public void exceptionCaught( + ClientResponse clientResponse, + Throwable e + ) + { + clientResponse.getObj().exceptionCaught(e); + } + + private byte[] getContentBytes(ChannelBuffer content) + { + byte[] contentBytes = new byte[content.readableBytes()]; + content.readBytes(contentBytes); + return contentBytes; + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java new file mode 100644 index 000000000000..fbabe63a754d --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client.response; + +import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.InputStream; + +public class InputStreamFullResponseHolder extends FullResponseHolder +{ + private final AppendableByteArrayInputStream is; + + public InputStreamFullResponseHolder( + HttpResponseStatus status, + HttpResponse response + ) + { + super(status, response); + is = new AppendableByteArrayInputStream(); + } + + public InputStreamFullResponseHolder addChunk(byte[] chunk) + { + is.add(chunk); + return this; + } + + @Override + public InputStream getContent() + { + return is; + } + + public void done() + { + is.done(); + } + + public void exceptionCaught(Throwable t) + { + is.exceptionCaught(t); + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java index ac7b8aafa8f9..3fe2e081b5ea 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java @@ -38,7 +38,6 @@ public StringFullResponseHolder( this.builder = new StringBuilder(response.getContent().toString(charset)); } - @Override public StringFullResponseHolder addChunk(String chunk) { builder.append(chunk); diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java index 95b4fa6a32c0..2b107bbac3a7 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.selector.DiscoverySelector; import org.apache.druid.client.selector.Server; import org.apache.druid.concurrent.LifecycleLock; @@ -121,23 +120,13 @@ public void stop() log.debug("Stopped."); } - /** - * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located. - * - * @param cached Uses cached leader if true, else uses the current leader - */ - public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException - { - Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); - return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath))); - } - /** * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located. */ public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException { - return makeRequest(httpMethod, urlPath, true); + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath))); } public StringFullResponseHolder go(Request request) throws IOException, InterruptedException @@ -145,18 +134,6 @@ public StringFullResponseHolder go(Request request) throws IOException, Interrup return go(request, new StringFullResponseHandler(StandardCharsets.UTF_8)); } - /** - * Executes the request object aimed at the leader and process the response with given handler - * Note: this method doesn't do retrying on errors or handle leader changes occurred during communication - */ - public ListenableFuture goAsync( - final Request request, - final HttpResponseHandler handler - ) - { - return httpClient.go(request, handler); - } - /** * Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located. */ diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 05cda79fd13d..af46cc41ace2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.Inject; import org.apache.druid.client.BrokerSegmentWatcherConfig; @@ -35,22 +35,24 @@ import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler; +import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder; import org.apache.druid.metadata.SegmentsMetadataManager; -import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.jboss.netty.handler.codec.http.HttpMethod; +import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.io.InputStream; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -74,7 +76,6 @@ public class MetadataSegmentView private final DruidLeaderClient coordinatorDruidLeaderClient; private final ObjectMapper jsonMapper; - private final BytesAccumulatingResponseHandler responseHandler; private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final boolean isCacheEnabled; @@ -96,7 +97,6 @@ public class MetadataSegmentView public MetadataSegmentView( final @Coordinator DruidLeaderClient druidLeaderClient, final ObjectMapper jsonMapper, - final BytesAccumulatingResponseHandler responseHandler, final BrokerSegmentWatcherConfig segmentWatcherConfig, final PlannerConfig plannerConfig ) @@ -104,7 +104,6 @@ public MetadataSegmentView( Preconditions.checkNotNull(plannerConfig, "plannerConfig"); this.coordinatorDruidLeaderClient = druidLeaderClient; this.jsonMapper = jsonMapper; - this.responseHandler = responseHandler; this.segmentWatcherConfig = segmentWatcherConfig; this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable(); this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod(); @@ -148,7 +147,6 @@ private void poll() final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, - responseHandler, segmentWatcherConfig.getWatchedDataSources() ); @@ -175,7 +173,6 @@ Iterator getPublishedSegments() return getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, - responseHandler, segmentWatcherConfig.getWatchedDataSources() ); } @@ -185,7 +182,6 @@ Iterator getPublishedSegments() private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler, Set watchedDataSources ) { @@ -201,32 +197,41 @@ private JsonParserIterator getMetadataSegments( query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb; } Request request; + InputStreamFullResponseHolder responseHolder; try { request = coordinatorClient.makeRequest( HttpMethod.GET, - StringUtils.format(query), - false + StringUtils.format(query) ); + + responseHolder = coordinatorClient.go( + request, + new InputStreamFullResponseHandler() + ); + if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) { + throw new RE( + "Failed to talk to coordinator leader at [%s]. Error code[%d], description[%s].", + query, + responseHolder.getStatus().getCode(), + responseHolder.getStatus().getReasonPhrase() + ); + } } - catch (IOException e) { + catch (IOException | InterruptedException e) { throw new RuntimeException(e); } - ListenableFuture future = coordinatorClient.goAsync( - request, - responseHandler - ); final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() { }); return new JsonParserIterator<>( typeRef, - future, + Futures.immediateFuture(responseHolder.getContent()), request.getUrl().toString(), null, request.getUrl().getHost(), jsonMapper, - responseHandler + null ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index c6bff43ac075..8bf3b96d6c3d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -30,7 +30,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Futures; import com.google.inject.Inject; import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.DefaultEnumerable; @@ -60,10 +60,11 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler; +import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.server.DruidNode; -import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthenticationResult; @@ -80,8 +81,8 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; +import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -210,7 +211,6 @@ public SystemSchema( ) { Preconditions.checkNotNull(serverView, "serverView"); - BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); final SegmentsTable segmentsTable = new SegmentsTable( druidSchema, metadataView, @@ -221,8 +221,8 @@ public SystemSchema( SEGMENTS_TABLE, segmentsTable, SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), - TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper), - SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) + TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper), + SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper) ); } @@ -594,19 +594,16 @@ static class TasksTable extends AbstractTable implements ScannableTable { private final DruidLeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; - private final BytesAccumulatingResponseHandler responseHandler; private final AuthorizerMapper authorizerMapper; public TasksTable( DruidLeaderClient druidLeaderClient, ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler, AuthorizerMapper authorizerMapper ) { this.druidLeaderClient = druidLeaderClient; this.jsonMapper = jsonMapper; - this.responseHandler = responseHandler; this.authorizerMapper = authorizerMapper; } @@ -707,7 +704,7 @@ public void close() } } - return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler)); + return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper)); } private CloseableIterator getAuthorizedTasks( @@ -736,37 +733,48 @@ private CloseableIterator getAuthorizedTasks( //Note that overlord must be up to get tasks private static JsonParserIterator getTasks( DruidLeaderClient indexingServiceClient, - ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler + ObjectMapper jsonMapper ) { Request request; + InputStreamFullResponseHolder responseHolder; try { + String query = "/druid/indexer/v1/tasks"; + request = indexingServiceClient.makeRequest( HttpMethod.GET, - "/druid/indexer/v1/tasks", - false + query + ); + + responseHolder = indexingServiceClient.go( + request, + new InputStreamFullResponseHandler() ); + + if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) { + throw new RE( + "Failed to talk to overlord leader at [%s]. Error code[%d], description[%s].", + query, + responseHolder.getStatus().getCode(), + responseHolder.getStatus().getReasonPhrase() + ); + } } - catch (IOException e) { + catch (IOException | InterruptedException e) { throw new RuntimeException(e); } - ListenableFuture future = indexingServiceClient.goAsync( - request, - responseHandler - ); final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() { }); return new JsonParserIterator<>( typeRef, - future, + Futures.immediateFuture(responseHolder.getContent()), request.getUrl().toString(), null, request.getUrl().getHost(), jsonMapper, - responseHandler + null ); } @@ -777,19 +785,16 @@ static class SupervisorsTable extends AbstractTable implements ScannableTable { private final DruidLeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; - private final BytesAccumulatingResponseHandler responseHandler; private final AuthorizerMapper authorizerMapper; public SupervisorsTable( DruidLeaderClient druidLeaderClient, ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler, AuthorizerMapper authorizerMapper ) { this.druidLeaderClient = druidLeaderClient; this.jsonMapper = jsonMapper; - this.responseHandler = responseHandler; this.authorizerMapper = authorizerMapper; } @@ -871,7 +876,7 @@ public void close() } } - return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper, responseHandler)); + return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper)); } private CloseableIterator getAuthorizedSupervisors( @@ -900,37 +905,47 @@ private CloseableIterator getAuthorizedSupervisors( // will fail with internal server error (HTTP 500) private static JsonParserIterator getSupervisors( DruidLeaderClient indexingServiceClient, - ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler + ObjectMapper jsonMapper ) { Request request; + InputStreamFullResponseHolder responseHolder; try { + String query = "/druid/indexer/v1/supervisor?system"; request = indexingServiceClient.makeRequest( HttpMethod.GET, - "/druid/indexer/v1/supervisor?system", - false + query ); + + responseHolder = indexingServiceClient.go( + request, + new InputStreamFullResponseHandler() + ); + + if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) { + throw new RE( + "Failed to talk to overlord leader at [%s]. Error code[%d], description[%s].", + query, + responseHolder.getStatus().getCode(), + responseHolder.getStatus().getReasonPhrase() + ); + } } - catch (IOException e) { + catch (IOException | InterruptedException e) { throw new RuntimeException(e); } - ListenableFuture future = indexingServiceClient.goAsync( - request, - responseHandler - ); final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() { }); return new JsonParserIterator<>( typeRef, - future, + Futures.immediateFuture(responseHolder.getContent()), request.getUrl().toString(), null, request.getUrl().getHost(), jsonMapper, - responseHandler + null ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 80bf83030f35..93347c6c8312 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.SettableFuture; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; @@ -53,8 +52,9 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler; +import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -89,8 +89,11 @@ import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -99,10 +102,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.servlet.http.HttpServletResponse; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -1010,17 +1011,18 @@ public void testTasksTable() throws Exception { SystemSchema.TasksTable tasksTable = EasyMock.createMockBuilder(SystemSchema.TasksTable.class) - .withConstructor(client, mapper, responseHandler, authMapper) + .withConstructor(client, mapper, authMapper) .createMock(); + EasyMock.replay(tasksTable); - EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks", false)).andReturn(request).anyTimes(); - SettableFuture future = SettableFuture.create(); - EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once(); - final int ok = HttpServletResponse.SC_OK; - EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes(); - EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes(); + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes(); + - AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); + HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp); + + EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once(); + EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes(); String json = "[{\n" + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" @@ -1056,9 +1058,8 @@ public void testTasksTable() throws Exception + "\t\"errorMsg\": null\n" + "}]"; byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); - in.add(bytesToWrite); - in.done(); - future.set(in); + responseHolder.addChunk(bytesToWrite); + responseHolder.done(); EasyMock.replay(client, request, responseHandler); DataContext dataContext = new DataContext() @@ -1133,24 +1134,24 @@ public void testSupervisorTable() throws Exception .withConstructor( client, mapper, - responseHandler, authMapper ) .createMock(); EasyMock.replay(supervisorTable); - EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system", false)) + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system")) .andReturn(request) .anyTimes(); - SettableFuture future = SettableFuture.create(); - EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once(); - final int ok = HttpServletResponse.SC_OK; - EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes(); + + HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp); + + EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once(); + + EasyMock.expect(responseHandler.getStatus()).andReturn(httpResp.getStatus().getCode()).anyTimes(); EasyMock.expect(request.getUrl()) .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system")) .anyTimes(); - AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); - String json = "[{\n" + "\t\"id\": \"wikipedia\",\n" + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n" @@ -1164,9 +1165,8 @@ public void testSupervisorTable() throws Exception + "}]"; byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); - in.add(bytesToWrite); - in.done(); - future.set(in); + responseHolder.addChunk(bytesToWrite); + responseHolder.done(); EasyMock.replay(client, request, responseHandler); DataContext dataContext = new DataContext() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 08742812f89b..93a3486ae924 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -78,7 +78,6 @@ import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; -import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; @@ -784,7 +783,6 @@ NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDI new MetadataSegmentView( druidLeaderClient, getJsonMapper(), - new BytesAccumulatingResponseHandler(), new BrokerSegmentWatcherConfig(), plannerConfig ), From 6a7cba2cb5387a344a87b7c6fe60c0dce75ac2ed Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 29 May 2020 17:17:21 -0700 Subject: [PATCH 2/8] remove ByteArrayResponseHolder dependency from JsonParserIterator --- .../apache/druid/client/DirectDruidClient.java | 3 +-- .../druid/client/JsonParserIterator.java | 18 +----------------- .../calcite/schema/MetadataSegmentView.java | 3 +-- .../druid/sql/calcite/schema/SystemSchema.java | 6 ++---- 4 files changed, 5 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 7c674f0e3ab7..bbfdcd9456ce 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -493,8 +493,7 @@ public JsonParserIterator make() url, query, host, - toolChest.decorateObjectMapper(objectMapper, query), - null + toolChest.decorateObjectMapper(objectMapper, query) ); } diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java index 9478f246764f..c12e64a81202 100644 --- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java +++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java @@ -25,17 +25,14 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.ResourceLimitExceededException; -import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import javax.annotation.Nullable; -import javax.servlet.http.HttpServletResponse; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -57,7 +54,6 @@ public class JsonParserIterator implements Iterator, Closeable private final String url; private final String host; private final ObjectMapper objectMapper; - private final BytesAccumulatingResponseHandler responseHandler; private final boolean hasTimeout; private final long timeoutAt; private final String queryId; @@ -68,8 +64,7 @@ public JsonParserIterator( String url, @Nullable Query query, String host, - ObjectMapper objectMapper, - BytesAccumulatingResponseHandler responseHandler + ObjectMapper objectMapper ) { this.typeRef = typeRef; @@ -85,7 +80,6 @@ public JsonParserIterator( this.jp = null; this.host = host; this.objectMapper = objectMapper; - this.responseHandler = responseHandler; this.hasTimeout = timeoutAt > -1; } @@ -137,16 +131,6 @@ private void init() InputStream is = hasTimeout ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) : future.get(); - if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) { - interruptQuery( - new RE( - "Unexpected response status [%s] description [%s] from request url[%s]", - responseHandler.getStatus(), - responseHandler.getDescription(), - url - ) - ); - } if (is != null) { jp = objectMapper.getFactory().createParser(is); } else { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index af46cc41ace2..2b2cfb177df2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -230,8 +230,7 @@ private JsonParserIterator getMetadataSegments( request.getUrl().toString(), null, request.getUrl().getHost(), - jsonMapper, - null + jsonMapper ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 8bf3b96d6c3d..8e3bee21d186 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -773,8 +773,7 @@ private static JsonParserIterator getTasks( request.getUrl().toString(), null, request.getUrl().getHost(), - jsonMapper, - null + jsonMapper ); } @@ -944,8 +943,7 @@ private static JsonParserIterator getSupervisors( request.getUrl().toString(), null, request.getUrl().getHost(), - jsonMapper, - null + jsonMapper ); } From 330aba3dd98ce15a13cd6ca607824bc07036ee81 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sun, 26 Jul 2020 10:57:31 -0700 Subject: [PATCH 3/8] add UT to cover lines in InputStreamFullResponseHandler --- .../discovery/DruidLeaderClientTest.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java index 7c376ea20b68..d36db6a7fcdb 100644 --- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java @@ -29,6 +29,7 @@ import com.google.inject.name.Named; import com.google.inject.name.Names; import com.google.inject.servlet.GuiceFilter; +import org.apache.commons.io.IOUtils; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.Jerseys; @@ -40,6 +41,8 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler; +import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.BaseJettyTest; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; @@ -107,7 +110,7 @@ public void configure(Binder binder) } @Test - public void testSimple() throws Exception + public void testSimpleWithStringFullResponseHandler() throws Exception { DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( @@ -133,6 +136,35 @@ public void testSimple() throws Exception Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } + @Test + public void testSimpleWithInputStreamFullResponseHandler() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( + ImmutableList.of(discoveryDruidNode) + ); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + NodeRole.PEON, + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + druidLeaderClient.start(); + + Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); + request.setContent("hello".getBytes(StandardCharsets.UTF_8)); + + InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler(); + Assert.assertEquals("hello", IOUtils.toString(druidLeaderClient.go(request, responseHandler).getContent(), StandardCharsets.UTF_8)); + } + @Test public void testNoLeaderFound() throws Exception { From 18d1a910bd78cc33cc9729e21269aaf3a929bb5e Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sun, 26 Jul 2020 10:57:44 -0700 Subject: [PATCH 4/8] refactor SystemSchema to reduce branches --- .../sql/calcite/schema/SystemSchema.java | 67 +++++++------------ 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 4dfe85124a6f..fec990188127 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -820,43 +820,12 @@ private static JsonParserIterator getTasks( ObjectMapper jsonMapper ) { - Request request; - InputStreamFullResponseHolder responseHolder; - try { - String query = "/druid/indexer/v1/tasks"; - - request = indexingServiceClient.makeRequest( - HttpMethod.GET, - query - ); - - responseHolder = indexingServiceClient.go( - request, - new InputStreamFullResponseHandler() - ); - - if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) { - throw new RE( - "Failed to talk to overlord leader at [%s]. Error code[%d], description[%s].", - query, - responseHolder.getStatus().getCode(), - responseHolder.getStatus().getReasonPhrase() - ); - } - } - catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - - final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() - { - }); - return new JsonParserIterator<>( - typeRef, - Futures.immediateFuture(responseHolder.getContent()), - request.getUrl().toString(), - null, - request.getUrl().getHost(), + return getThingsFromLeaderNode( + "/druid/indexer/v1/tasks", + new TypeReference() + { + }, + indexingServiceClient, jsonMapper ); } @@ -992,11 +961,27 @@ private static JsonParserIterator getSupervisors( DruidLeaderClient indexingServiceClient, ObjectMapper jsonMapper ) + { + return getThingsFromLeaderNode( + "/druid/indexer/v1/supervisor?system", + new TypeReference() + { + }, + indexingServiceClient, + jsonMapper + ); + } + + private static JsonParserIterator getThingsFromLeaderNode( + String query, + TypeReference typeRef, + DruidLeaderClient indexingServiceClient, + ObjectMapper jsonMapper + ) { Request request; InputStreamFullResponseHolder responseHolder; try { - String query = "/druid/indexer/v1/supervisor?system"; request = indexingServiceClient.makeRequest( HttpMethod.GET, query @@ -1020,11 +1005,9 @@ private static JsonParserIterator getSupervisors( throw new RuntimeException(e); } - final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() - { - }); + final JavaType javaType = jsonMapper.getTypeFactory().constructType(typeRef); return new JsonParserIterator<>( - typeRef, + javaType, Futures.immediateFuture(responseHolder.getContent()), request.getUrl().toString(), null, From ec993a986fe5a4d17c5d808d33d1a1589074042b Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sun, 26 Jul 2020 16:33:40 -0700 Subject: [PATCH 5/8] further reduce branches --- .../calcite/schema/MetadataSegmentView.java | 40 ++++--------------- .../sql/calcite/schema/SystemSchema.java | 10 ++--- 2 files changed, 12 insertions(+), 38 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 2b2cfb177df2..ca14d54412af 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -34,6 +34,7 @@ import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; @@ -196,40 +197,13 @@ private JsonParserIterator getMetadataSegments( sb.setLength(sb.length() - 1); query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb; } - Request request; - InputStreamFullResponseHolder responseHolder; - try { - request = coordinatorClient.makeRequest( - HttpMethod.GET, - StringUtils.format(query) - ); - - responseHolder = coordinatorClient.go( - request, - new InputStreamFullResponseHandler() - ); - if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) { - throw new RE( - "Failed to talk to coordinator leader at [%s]. Error code[%d], description[%s].", - query, - responseHolder.getStatus().getCode(), - responseHolder.getStatus().getReasonPhrase() - ); - } - } - catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() - { - }); - return new JsonParserIterator<>( - typeRef, - Futures.immediateFuture(responseHolder.getContent()), - request.getUrl().toString(), - null, - request.getUrl().getHost(), + return SystemSchema.getThingsFromLeaderNode( + query, + new TypeReference() + { + }, + coordinatorClient, jsonMapper ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index fec990188127..0237b86512c7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -972,29 +972,29 @@ private static JsonParserIterator getSupervisors( ); } - private static JsonParserIterator getThingsFromLeaderNode( + public static JsonParserIterator getThingsFromLeaderNode( String query, TypeReference typeRef, - DruidLeaderClient indexingServiceClient, + DruidLeaderClient leaderClient, ObjectMapper jsonMapper ) { Request request; InputStreamFullResponseHolder responseHolder; try { - request = indexingServiceClient.makeRequest( + request = leaderClient.makeRequest( HttpMethod.GET, query ); - responseHolder = indexingServiceClient.go( + responseHolder = leaderClient.go( request, new InputStreamFullResponseHandler() ); if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) { throw new RE( - "Failed to talk to overlord leader at [%s]. Error code[%d], description[%s].", + "Failed to talk to leader node at [%s]. Error code[%d], description[%s].", query, responseHolder.getStatus().getCode(), responseHolder.getStatus().getReasonPhrase() From ee0285687413a2d1d7d3f2c9156d0671d6a6c088 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sun, 26 Jul 2020 17:05:07 -0700 Subject: [PATCH 6/8] Revert "add UT to cover lines in InputStreamFullResponseHandler" This reverts commit 330aba3dd98ce15a13cd6ca607824bc07036ee81. --- .../discovery/DruidLeaderClientTest.java | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java index d36db6a7fcdb..7c376ea20b68 100644 --- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java @@ -29,7 +29,6 @@ import com.google.inject.name.Named; import com.google.inject.name.Names; import com.google.inject.servlet.GuiceFilter; -import org.apache.commons.io.IOUtils; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.Jerseys; @@ -41,8 +40,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler; -import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.BaseJettyTest; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; @@ -110,7 +107,7 @@ public void configure(Binder binder) } @Test - public void testSimpleWithStringFullResponseHandler() throws Exception + public void testSimple() throws Exception { DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( @@ -136,35 +133,6 @@ public void testSimpleWithStringFullResponseHandler() throws Exception Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } - @Test - public void testSimpleWithInputStreamFullResponseHandler() throws Exception - { - DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( - ImmutableList.of(discoveryDruidNode) - ); - - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); - - DruidLeaderClient druidLeaderClient = new DruidLeaderClient( - httpClient, - druidNodeDiscoveryProvider, - NodeRole.PEON, - "/simple/leader", - EasyMock.createNiceMock(ServerDiscoverySelector.class) - ); - druidLeaderClient.start(); - - Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); - request.setContent("hello".getBytes(StandardCharsets.UTF_8)); - - InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler(); - Assert.assertEquals("hello", IOUtils.toString(druidLeaderClient.go(request, responseHandler).getContent(), StandardCharsets.UTF_8)); - } - @Test public void testNoLeaderFound() throws Exception { From 0b139b8e8ca9f544a99af2bf025652459700a54a Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sun, 26 Jul 2020 17:05:34 -0700 Subject: [PATCH 7/8] UTs for InputStreamFullResponseHandler --- .../InputStreamFullResponseHandlerTest.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 core/src/test/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandlerTest.java diff --git a/core/src/test/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandlerTest.java b/core/src/test/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandlerTest.java new file mode 100644 index 000000000000..9c4004711035 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandlerTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client.response; + +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; +import org.jboss.netty.handler.codec.http.DefaultHttpChunk; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; + +public class InputStreamFullResponseHandlerTest +{ + @Test + public void testSimple() throws Exception + { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.setChunked(false); + response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING))); + + InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler(); + ClientResponse clientResp = responseHandler.handleResponse(response, null); + + DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING))); + clientResp = responseHandler.handleChunk(clientResp, chunk, 0); + + clientResp = responseHandler.done(clientResp); + + Assert.assertTrue(clientResp.isFinished()); + Assert.assertEquals("abcdefg", IOUtils.toString(clientResp.getObj().getContent(), StandardCharsets.UTF_8)); + } + + @Test + public void testException() throws Exception + { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.setChunked(false); + response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING))); + + InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler(); + ClientResponse clientResp = responseHandler.handleResponse(response, null); + + Exception ex = new RuntimeException("dummy!"); + responseHandler.exceptionCaught(clientResp, ex); + + Assert.assertTrue(clientResp.isFinished()); + } +} From e0d64850b29413ba920414ffd8e5feaad206ee46 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sun, 26 Jul 2020 18:45:52 -0700 Subject: [PATCH 8/8] remove unused imports --- .../druid/sql/calcite/schema/MetadataSegmentView.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index ca14d54412af..1cc5dc746e8a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -20,11 +20,9 @@ package org.apache.druid.sql.calcite.schema; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.Inject; import org.apache.druid.client.BrokerSegmentWatcherConfig; @@ -34,26 +32,17 @@ import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.RE; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler; -import org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder; import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; -import org.jboss.netty.handler.codec.http.HttpMethod; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CountDownLatch;