Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/content/operations/http-compression.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
layout: doc_page
---
# HTTP Compression

Druid supports http request decompression and response compression, to use this, http request header `Content-Encoding:gzip` and `Accept-Encoding:gzip` is needed to be set.

# General Configuration

|Property|Description|Default|
|--------|-----------|-------|
|`druid.server.http.compressionLevel`|The compression level. Value should be between [-1,9], -1 for default level, 0 for no compression.|-1 (default compression level)|
|`druid.server.http.inflateBufferSize`|The buffer size used by gzip decoder. Set to 0 to disable request decompression.|4096|
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<fastutil.version>8.1.0</fastutil.version>
<guava.version>16.0.1</guava.version>
<guice.version>4.1.0</guice.version>
<jetty.version>9.3.19.v20170502</jetty.version>
<jetty.version>9.4.10.v20180503</jetty.version>
<jersey.version>1.19.3</jersey.version>
<!-- jackson 2.7.x causes injection error and 2.8.x can't be used because avatica is using 2.6.3 -->
<jackson.version>2.6.7</jackson.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.proxy.AsyncProxyServlet;

Expand Down Expand Up @@ -300,7 +301,9 @@ protected void sendProxyRequest(
if (query != null) {
final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE);
try {
proxyRequest.content(new BytesContentProvider(objectMapper.writeValueAsBytes(query)));
byte[] bytes = objectMapper.writeValueAsBytes(query);
proxyRequest.content(new BytesContentProvider(bytes));
proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, String.valueOf(bytes.length));
}
catch (JsonProcessingException e) {
Throwables.propagate(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.Objects;
import java.util.zip.Deflater;

/**
*/
public class ServerConfig
{

public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096;

@JsonProperty
@Min(1)
private int numThreads = Math.max(10, (Runtime.getRuntime().availableProcessors() * 17) / 16 + 2) + 30;
Expand Down Expand Up @@ -68,6 +73,15 @@ public class ServerConfig
@NotNull
private Period unannouncePropogationDelay = Period.ZERO;

@JsonProperty
@Min(0)
private int inflateBufferSize = DEFAULT_GZIP_INFLATE_BUFFER_SIZE;

@JsonProperty
@Min(-1)
@Max(9)
private int compressionLevel = Deflater.DEFAULT_COMPRESSION;

public int getNumThreads()
{
return numThreads;
Expand Down Expand Up @@ -118,6 +132,17 @@ public Period getUnannouncePropogationDelay()
return unannouncePropogationDelay;
}

public int getInflateBufferSize()
{
return inflateBufferSize;
}

public int getCompressionLevel()
{
return compressionLevel;
}


@Override
public boolean equals(Object o)
{
Expand All @@ -135,6 +160,8 @@ public boolean equals(Object o)
maxScatterGatherBytes == that.maxScatterGatherBytes &&
maxQueryTimeout == that.maxQueryTimeout &&
maxRequestHeaderSize == that.maxRequestHeaderSize &&
inflateBufferSize == that.inflateBufferSize &&
compressionLevel == that.compressionLevel &&
Objects.equals(maxIdleTime, that.maxIdleTime) &&
Objects.equals(gracefulShutdownTimeout, that.gracefulShutdownTimeout) &&
Objects.equals(unannouncePropogationDelay, that.unannouncePropogationDelay);
Expand All @@ -154,7 +181,9 @@ public int hashCode()
maxQueryTimeout,
maxRequestHeaderSize,
gracefulShutdownTimeout,
unannouncePropogationDelay
unannouncePropogationDelay,
inflateBufferSize,
compressionLevel
);
}

Expand All @@ -172,6 +201,8 @@ public String toString()
", maxRequestHeaderSize=" + maxRequestHeaderSize +
", gracefulShutdownTimeout=" + gracefulShutdownTimeout +
", unannouncePropogationDelay=" + unannouncePropogationDelay +
", inflateBufferSize=" + inflateBufferSize +
", compressionLevel=" + compressionLevel +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;

import io.druid.java.util.common.ISE;

import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
Expand All @@ -38,11 +36,13 @@ public class JettyServerInitUtils
{
private static final String[] GZIP_METHODS = new String[]{HttpMethod.GET, HttpMethod.POST};

public static GzipHandler wrapWithDefaultGzipHandler(final Handler handler)
public static GzipHandler wrapWithDefaultGzipHandler(final Handler handler, int inflateBufferSize, int compressionLevel)
{
GzipHandler gzipHandler = new GzipHandler();
gzipHandler.setMinGzipSize(0);
gzipHandler.setIncludedMethods(GZIP_METHODS);
gzipHandler.setInflateBufferSize(inflateBufferSize);
gzipHandler.setCompressionLevel(compressionLevel);

// We don't actually have any precomputed .gz resources, and checking for them inside jars is expensive.
gzipHandler.setCheckGzExists(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ public String getCurrentLeader()
JettyServerInitUtils.addExtensionFilters(root, injector);

final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root)});
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root, 4096, -1)});
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have used ServerConfig.DEFAULT_GZIP_INFLATE_BUFFER_SIZE and Deflater.DEFAULT_COMPRESSION instead of magic constants

server.setHandler(handlerList);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ protected URI rewriteURI(HttpServletRequest request, String scheme, String host)
root.addFilter(GuiceFilter.class, "/exception/*", null);

final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root)});
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root, 4096, -1)});
server.setHandler(handlerList);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand All @@ -66,6 +67,8 @@

public abstract class BaseJettyTest
{
protected static final String DEFAULT_RESPONSE_CONTENT = "hello";

protected Lifecycle lifecycle;
protected HttpClient client;
protected Server server;
Expand Down Expand Up @@ -142,7 +145,7 @@ public void initialize(Server server, Injector injector)
root.addFilter(GuiceFilter.class, "/*", null);

final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root)});
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.wrapWithDefaultGzipHandler(root, 4096, -1)});
server.setHandler(handlerList);
}

Expand All @@ -165,33 +168,46 @@ public Response hello()
catch (InterruptedException e) {
//
}
return Response.ok("hello").build();
return Response.ok(DEFAULT_RESPONSE_CONTENT).build();
}
}

@Path("/default")
public static class DefaultResource
{

@DELETE
@Path("{resource}")
@Produces(MediaType.APPLICATION_JSON)
public Response delete()
{
return Response.ok("hello").build();
return Response.ok(DEFAULT_RESPONSE_CONTENT).build();
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public Response get()
{
return Response.ok("hello").build();
return Response.ok(DEFAULT_RESPONSE_CONTENT).build();
}

@POST
@Produces(MediaType.APPLICATION_JSON)
public Response post()
{
return Response.ok("hello").build();
return Response.ok(DEFAULT_RESPONSE_CONTENT).build();
}
}

@Path("/return")
public static class DirectlyReturnResource
{
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response postText(String text)
{
return Response.ok(text).build();
}
}

Expand Down
37 changes: 32 additions & 5 deletions server/src/test/java/io/druid/server/initialization/JettyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.response.InputStreamResponseHandler;
import io.druid.java.util.http.client.response.StatusResponseHandler;
import io.druid.java.util.http.client.response.StatusResponseHolder;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.Self;
import io.druid.initialization.Initialization;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.response.InputStreamResponseHandler;
import io.druid.java.util.http.client.response.StatusResponseHandler;
import io.druid.java.util.http.client.response.StatusResponseHolder;
import io.druid.server.DruidNode;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.initialization.jetty.ServletFilterHolder;
Expand All @@ -53,12 +53,15 @@
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Map;
Expand All @@ -67,6 +70,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class JettyTest extends BaseJettyTest
{
Expand Down Expand Up @@ -129,6 +134,7 @@ public EnumSet<DispatcherType> getDispatcherType()
Jerseys.addResource(binder, SlowResource.class);
Jerseys.addResource(binder, ExceptionResource.class);
Jerseys.addResource(binder, DefaultResource.class);
Jerseys.addResource(binder, DirectlyReturnResource.class);
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
LifecycleModule.register(binder, Server.class);
}
Expand Down Expand Up @@ -196,24 +202,28 @@ public void run()
}

@Test
public void testGzipCompression() throws Exception
public void testGzipResponseCompression() throws Exception
{
final URL url = new URL("http://localhost:" + port + "/default");
final HttpURLConnection get = (HttpURLConnection) url.openConnection();
get.setRequestProperty("Accept-Encoding", "gzip");
Assert.assertEquals("gzip", get.getContentEncoding());
Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(get.getInputStream()), StandardCharsets.UTF_8));

final HttpURLConnection post = (HttpURLConnection) url.openConnection();
post.setRequestProperty("Accept-Encoding", "gzip");
post.setRequestMethod("POST");
Assert.assertEquals("gzip", post.getContentEncoding());
Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(post.getInputStream()), StandardCharsets.UTF_8));

final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection();
Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding());
Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(getNoGzip.getInputStream(), StandardCharsets.UTF_8));

final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection();
postNoGzip.setRequestMethod("POST");
Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding());
Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(postNoGzip.getInputStream(), StandardCharsets.UTF_8));
}

// Tests that threads are not stuck when partial chunk is not finalized
Expand Down Expand Up @@ -283,4 +293,21 @@ public void testExtensionAuthFilter() throws Exception
get.setRequestProperty(DummyAuthFilter.AUTH_HDR, "hacker");
Assert.assertEquals(HttpServletResponse.SC_UNAUTHORIZED, get.getResponseCode());
}

@Test
public void testGzipRequestDecompression() throws Exception
{
String text = "hello";
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
gzipOutputStream.write(text.getBytes(Charset.defaultCharset()));
}
Request request = new Request(HttpMethod.POST, new URL("http://localhost:" + port + "/return"));
request.setHeader("Content-Encoding", "gzip");
request.setContent(MediaType.TEXT_PLAIN, out.toByteArray());
Assert.assertEquals(text, new String(IOUtils.toByteArray(client.go(
request,
new InputStreamResponseHandler()
).get()), Charset.defaultCharset()));
}
}
11 changes: 9 additions & 2 deletions services/src/main/java/io/druid/cli/CliOverlord.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.security.AuthConfig;
Expand Down Expand Up @@ -307,11 +308,13 @@ private void configureOverlordHelpers(Binder binder)
private static class OverlordJettyServerInitializer implements JettyServerInitializer
{
private final AuthConfig authConfig;
private final ServerConfig serverConfig;

@Inject
OverlordJettyServerInitializer(AuthConfig authConfig)
OverlordJettyServerInitializer(AuthConfig authConfig, ServerConfig serverConfig)
{
this.authConfig = authConfig;
this.serverConfig = serverConfig;
}

@Override
Expand Down Expand Up @@ -373,7 +376,11 @@ public void initialize(Server server, Injector injector)
handlerList.setHandlers(
new Handler[]{
JettyServerInitUtils.getJettyRequestLogHandler(),
JettyServerInitUtils.wrapWithDefaultGzipHandler(root)
JettyServerInitUtils.wrapWithDefaultGzipHandler(
root,
serverConfig.getInflateBufferSize(),
serverConfig.getCompressionLevel()
)
}
);

Expand Down
Loading