Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public BytesFullResponseHolder(HttpResponseStatus status, HttpResponse response)
this.chunks = new ArrayList<>();
}

@Override
public BytesFullResponseHolder addChunk(byte[] chunk)
{
chunks.add(chunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ public HttpResponse getResponse()
}

/**
* Append a new chunk of data.
*/
public abstract FullResponseHolder addChunk(T chunk);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addChunk seems to be implemented by all the subclasses, then why remove this ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed it because newly introduced InputStreamFullResponseHolder.addChunk(byte[] chunk) whereas getContent() needs to return InputStream . We could do the necessary wrapping so as to have InputStreamFullResponseHolder.addChunk(InputStream chunk) but would have required an extra wrap/unwrap of byte[] into/outof ByteArrayInputStream in `InputStreamResponseHandler .


/**
* Get the accumulated data via {@link #addChunk}.
* Get the data.
*/
public abstract T getContent();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.http.client.response;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;

/**
* This is a clone of {@link InputStreamResponseHandler} except that it retains HTTP status/response object in the
* response holder result.
*/
public class InputStreamFullResponseHandler implements HttpResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder>
{
@Override
public ClientResponse<InputStreamFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response);
holder.addChunk(getContentBytes(response.getContent()));
return ClientResponse.finished(holder);
}

@Override
public ClientResponse<InputStreamFullResponseHolder> handleChunk(
ClientResponse<InputStreamFullResponseHolder> clientResponse,
HttpChunk chunk,
long chunkNum
)
{
clientResponse.getObj().addChunk(getContentBytes(chunk.getContent()));
return clientResponse;
}

@Override
public ClientResponse<InputStreamFullResponseHolder> done(ClientResponse<InputStreamFullResponseHolder> clientResponse)
{
InputStreamFullResponseHolder holder = clientResponse.getObj();
holder.done();
return ClientResponse.finished(holder);
}

@Override
public void exceptionCaught(
ClientResponse<InputStreamFullResponseHolder> clientResponse,
Throwable e
)
{
clientResponse.getObj().exceptionCaught(e);
}

private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.http.client.response;

import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.io.InputStream;

public class InputStreamFullResponseHolder extends FullResponseHolder<InputStream>
{
private final AppendableByteArrayInputStream is;

public InputStreamFullResponseHolder(
HttpResponseStatus status,
HttpResponse response
)
{
super(status, response);
is = new AppendableByteArrayInputStream();
}

public InputStreamFullResponseHolder addChunk(byte[] chunk)
{
is.add(chunk);
return this;
}

@Override
public InputStream getContent()
{
return is;
}

public void done()
{
is.done();
}

public void exceptionCaught(Throwable t)
{
is.exceptionCaught(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public StringFullResponseHolder(
this.builder = new StringBuilder(response.getContent().toString(charset));
}

@Override
public StringFullResponseHolder addChunk(String chunk)
{
builder.append(chunk);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.http.client.response;

import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Test;

import java.nio.charset.StandardCharsets;

public class InputStreamFullResponseHandlerTest
{
@Test
public void testSimple() throws Exception
{
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setChunked(false);
response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));

InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler();
ClientResponse<InputStreamFullResponseHolder> clientResp = responseHandler.handleResponse(response, null);

DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING)));
clientResp = responseHandler.handleChunk(clientResp, chunk, 0);

clientResp = responseHandler.done(clientResp);

Assert.assertTrue(clientResp.isFinished());
Assert.assertEquals("abcdefg", IOUtils.toString(clientResp.getObj().getContent(), StandardCharsets.UTF_8));
}

@Test
public void testException() throws Exception
{
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setChunked(false);
response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));

InputStreamFullResponseHandler responseHandler = new InputStreamFullResponseHandler();
ClientResponse<InputStreamFullResponseHolder> clientResp = responseHandler.handleResponse(response, null);

Exception ex = new RuntimeException("dummy!");
responseHandler.exceptionCaught(clientResp, ex);

Assert.assertTrue(clientResp.isFinished());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,7 @@ public JsonParserIterator<T> make()
url,
query,
host,
toolChest.decorateObjectMapper(objectMapper, query),
null
toolChest.decorateObjectMapper(objectMapper, query)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,14 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -57,7 +54,6 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
private final String url;
private final String host;
private final ObjectMapper objectMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final boolean hasTimeout;
private final long timeoutAt;
private final String queryId;
Expand All @@ -68,8 +64,7 @@ public JsonParserIterator(
String url,
@Nullable Query<T> query,
String host,
ObjectMapper objectMapper,
BytesAccumulatingResponseHandler responseHandler
ObjectMapper objectMapper
)
{
this.typeRef = typeRef;
Expand All @@ -85,7 +80,6 @@ public JsonParserIterator(
this.jp = null;
this.host = host;
this.objectMapper = objectMapper;
this.responseHandler = responseHandler;
this.hasTimeout = timeoutAt > -1;
}

Expand Down Expand Up @@ -137,16 +131,6 @@ private void init()
InputStream is = hasTimeout
? future.get(timeLeftMillis, TimeUnit.MILLISECONDS)
: future.get();
if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
interruptQuery(
new RE(
"Unexpected response status [%s] description [%s] from request url[%s]",
responseHandler.getStatus(),
responseHandler.getDescription(),
url
)
);
}
if (is != null) {
jp = objectMapper.getFactory().createParser(is);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.selector.DiscoverySelector;
import org.apache.druid.client.selector.Server;
import org.apache.druid.concurrent.LifecycleLock;
Expand Down Expand Up @@ -121,42 +120,20 @@ public void stop()
log.debug("Stopped.");
}

/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*
* @param cached Uses cached leader if true, else uses the current leader
*/
public Request makeRequest(HttpMethod httpMethod, String urlPath, boolean cached) throws IOException
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(cached), urlPath)));
}

/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
{
return makeRequest(httpMethod, urlPath, true);
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would always use cached "CurrentKnownLeader", it lead to some issues earlier if the cached leader is not reachable, is that case handled somewhere now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if cached "leader" has disappeared then connection to that host will fail and we will retry with a randomly chosen available server in code path starting line-160 .
if cached "leader" is not leader anymore then it should send a TEMPORARY_REDIRECT and we would retry with code flow starting at line 193

}

public StringFullResponseHolder go(Request request) throws IOException, InterruptedException
{
return go(request, new StringFullResponseHandler(StandardCharsets.UTF_8));
}

/**
* Executes the request object aimed at the leader and process the response with given handler
* Note: this method doesn't do retrying on errors or handle leader changes occurred during communication
*/
public <Intermediate, Final> ListenableFuture<Final> goAsync(
final Request request,
final HttpResponseHandler<Intermediate, Final> handler
)
{
return httpClient.go(request, handler);
}

/**
* Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
Expand Down
Loading