Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advance configuration that allows to protect in case broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used.|Long.MAX_VALUE|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|`PT0s` (do not wait)|
|`druid.server.http.unannouncePropogationDelay`|How long to wait for zookeeper unannouncements to propgate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0s` (do not wait)|
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
Expand Down
2 changes: 2 additions & 0 deletions docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|`PT0s` (do not wait)|
|`druid.server.http.unannouncePropogationDelay`|How long to wait for zookeeper unannouncements to propgate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0s` (do not wait)|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
|`druid.server.http.maxRequestHeaderSize`|Maximum size of a request header in bytes. Larger headers consume more memory and can make a server more vulnerable to denial of service attacks.|8 * 1024|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ public class ServerConfig
@JsonProperty
private int maxRequestHeaderSize = 8 * 1024;

@JsonProperty
@NotNull
private Period gracefulShutdownTimeout = Period.ZERO;

@JsonProperty
@NotNull
private Period unannouncePropogationDelay = Period.ZERO;

public int getNumThreads()
{
return numThreads;
Expand Down Expand Up @@ -100,6 +108,16 @@ public int getMaxRequestHeaderSize()
return maxRequestHeaderSize;
}

public Period getGracefulShutdownTimeout()
{
return gracefulShutdownTimeout;
}

public Period getUnannouncePropogationDelay()
{
return unannouncePropogationDelay;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -115,21 +133,45 @@ public boolean equals(Object o)
enableRequestLimit == that.enableRequestLimit &&
defaultQueryTimeout == that.defaultQueryTimeout &&
maxScatterGatherBytes == that.maxScatterGatherBytes &&
maxQueryTimeout == that.maxQueryTimeout &&
maxRequestHeaderSize == that.maxRequestHeaderSize &&
Objects.equals(maxIdleTime, that.maxIdleTime) &&
maxQueryTimeout == that.maxQueryTimeout;
Objects.equals(gracefulShutdownTimeout, that.gracefulShutdownTimeout) &&
Objects.equals(unannouncePropogationDelay, that.unannouncePropogationDelay);
}

@Override
public int hashCode()
{

return Objects.hash(
numThreads,
queueSize,
enableRequestLimit,
maxIdleTime,
defaultQueryTimeout,
maxScatterGatherBytes,
maxQueryTimeout
maxQueryTimeout,
maxRequestHeaderSize,
gracefulShutdownTimeout,
unannouncePropogationDelay
);
}

@Override
public String toString()
{
return "ServerConfig{" +
"numThreads=" + numThreads +
", queueSize=" + queueSize +
", enableRequestLimit=" + enableRequestLimit +
", maxIdleTime=" + maxIdleTime +
", defaultQueryTimeout=" + defaultQueryTimeout +
", maxScatterGatherBytes=" + maxScatterGatherBytes +
", maxQueryTimeout=" + maxQueryTimeout +
", maxRequestHeaderSize=" + maxRequestHeaderSize +
", gracefulShutdownTimeout=" + gracefulShutdownTimeout +
", unannouncePropogationDelay=" + unannouncePropogationDelay +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
Expand Down Expand Up @@ -294,6 +295,42 @@ static Server makeAndInitializeServer(
}

server.setConnectors(connectors);
final long gracefulStop = config.getGracefulShutdownTimeout().toStandardDuration().getMillis();
if (gracefulStop > 0) {
server.setStopTimeout(gracefulStop);
}
server.addLifeCycleListener(new LifeCycle.Listener()
{
@Override
public void lifeCycleStarting(LifeCycle event)
{
log.debug("Jetty lifecycle starting [%s]", event.getClass());
}

@Override
public void lifeCycleStarted(LifeCycle event)
{
log.debug("Jetty lifeycle started [%s]", event.getClass());
}

@Override
public void lifeCycleFailure(LifeCycle event, Throwable cause)
{
log.error(cause, "Jetty lifecycle event failed [%s]", event.getClass());
}

@Override
public void lifeCycleStopping(LifeCycle event)
{
log.debug("Jetty lifecycle stopping [%s]", event.getClass());
}

@Override
public void lifeCycleStopped(LifeCycle event)
{
log.debug("Jetty lifecycle stopped [%s]", event.getClass());
}
});

// initialize server
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
Expand Down Expand Up @@ -339,9 +376,20 @@ public void start() throws Exception
public void stop()
{
try {
final long unannounceDelay = config.getUnannouncePropogationDelay().toStandardDuration().getMillis();
if (unannounceDelay > 0) {
log.info("Waiting %s ms for unannouncement to propogate.", unannounceDelay);
Thread.sleep(unannounceDelay);
} else {
log.debug("Skipping unannounce wait.");
}
log.info("Stopping Jetty Server...");
server.stop();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RE(e, "Interrupted waiting for jetty shutdown.");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if an InterrupedException occurs which means this thread wakes up before unannounceDelay, server won't be stopped. Is this desirable? Maybe better to stop server anyway?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that is desired here. The reason that stop() would be called is because the lifecycle is shutting down, aka there is already an interrupt requested.

If there's ANOTHER interrupt coming in, I would think it essentially means "You are going to get a kill -9 if you don't exit right now" in which case continuing on with the lifecycle shutdowns in a best effort stopping seems like the thing to do

}
catch (Exception e) {
log.warn(e, "Unable to stop Jetty server.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
Expand Down Expand Up @@ -111,13 +112,24 @@ public void initialize(Server server, Injector injector)
root.addFilter(GuiceFilter.class, "/*", null);

final HandlerList handlerList = new HandlerList();
final Handler[] handlers = new Handler[extensionHandlers.size() + 2];
handlers[0] = JettyServerInitUtils.getJettyRequestLogHandler();
handlers[handlers.length - 1] = JettyServerInitUtils.wrapWithDefaultGzipHandler(root);
for (int i = 0; i < extensionHandlers.size(); i++) {
handlers[i + 1] = extensionHandlers.get(i);
// Do not change the order of the handlers that have already been added
for (Handler handler : server.getHandlers()) {
handlerList.addHandler(handler);
}
handlerList.setHandlers(handlers);
server.setHandler(handlerList);

handlerList.addHandler(JettyServerInitUtils.getJettyRequestLogHandler());

// Add all extension handlers
for (Handler handler : extensionHandlers) {
handlerList.addHandler(handler);
}

// Add Gzip handler at the very end
handlerList.addHandler(JettyServerInitUtils.wrapWithDefaultGzipHandler(root));

final StatisticsHandler statisticsHandler = new StatisticsHandler();
statisticsHandler.setHandler(handlerList);

server.setHandler(statisticsHandler);
}
}