Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advance configuration that allows to protect in case broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used.|Long.MAX_VALUE|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|PT15s|
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.numThreads`|Number of threads for HTTP requests.|max(10, (Number of cores * 17) / 16 + 2) + 30|
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|PT15s|

#### Processing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public class ServerConfig
@JsonProperty
private boolean tls = false;

@JsonProperty
@NotNull
private Period gracefulShutdownTimeout = new Period("PT15s");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

documentation says it is 5s by default. should probably be PT5s .


public int getNumThreads()
{
return numThreads;
Expand Down Expand Up @@ -82,6 +86,11 @@ public boolean isTls()
return tls;
}

public Period getGracefulShutdownTimeout()
{
return gracefulShutdownTimeout;
}

@Override
public boolean equals(Object o)
{
Expand Down Expand Up @@ -116,6 +125,7 @@ public String toString()
", maxScatterGatherBytes=" + maxScatterGatherBytes +
", plaintext=" + plaintext +
", tls=" + tls +
", gracefulStopTimeout=" + gracefulShutdownTimeout +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
Expand All @@ -85,6 +86,7 @@ public class JettyServerModule extends JerseyServletModule
private static final Logger log = new Logger(JettyServerModule.class);

private static final AtomicInteger activeConnections = new AtomicInteger();
private static final String GRACEFUL_SHUTDOWN_TIMEOUT = "graceful_shutdown_timeout";

@Override
protected void configureServlets()
Expand Down Expand Up @@ -226,7 +228,9 @@ static Server makeJettyServer(DruidNode node, ServerConfig config, TLSServerConf
connector.setConnectionFactories(monitoredConnFactories);
}

server.setHandler(new StatisticsHandler());
server.setConnectors(connectors);
server.setAttribute(GRACEFUL_SHUTDOWN_TIMEOUT, config.getGracefulShutdownTimeout().toStandardDuration().getMillis());

return server;
}
Expand All @@ -250,8 +254,50 @@ public void start() throws Exception
server.start();
}

private Handler getStatisticsHandler()
{
for (Handler handler : server.getHandlers()) {
if (handler.getClass().equals(StatisticsHandler.class)) {
return handler;
}
}

return null;
}

@Override
public void stop()
{
StatisticsHandler statisticsHandler = (StatisticsHandler) getStatisticsHandler();

if (statisticsHandler != null) {
long startTime = System.currentTimeMillis();
long gracefulShutDownTimeout = (long) server.getAttribute(GRACEFUL_SHUTDOWN_TIMEOUT);

if (statisticsHandler.getRequestsActive() > 0) {
log.info("Waiting for upto [%s] milliseconds for active requests to be zero, current active requests=[%s]",
gracefulShutDownTimeout, statisticsHandler.getRequestsActive());
}

while (statisticsHandler.getRequestsActive() > 0 &&
System.currentTimeMillis() - startTime < gracefulShutDownTimeout) {
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
log.error("Sleep has been interrupted, while waiting for active requests to be zero");
stopImmediately();
Thread.currentThread().interrupt();
return;
}
}
log.info("Stopping Jetty Server with active requests=[%s]", statisticsHandler.getRequestsActive());
}

stopImmediately();
}

private void stopImmediately()
{
try {
server.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,21 @@ public void initialize(Server server, Injector injector)
root.addFilter(GuiceFilter.class, "/*", null);

final HandlerList handlerList = new HandlerList();
final Handler[] handlers = new Handler[extensionHandlers.size() + 2];
handlers[0] = JettyServerInitUtils.getJettyRequestLogHandler();
handlers[handlers.length - 1] = JettyServerInitUtils.wrapWithDefaultGzipHandler(root);
for (int i = 0; i < extensionHandlers.size(); i++) {
handlers[i + 1] = extensionHandlers.get(i);
// Do not change the order of the handlers that have already been added
for (Handler handler : server.getHandlers()) {
handlerList.addHandler(handler);
}
handlerList.setHandlers(handlers);

handlerList.addHandler(JettyServerInitUtils.getJettyRequestLogHandler());

// Add all extension handlers
for (Handler handler : extensionHandlers) {
handlerList.addHandler(handler);
}

// Add Gzip handler at the very end
handlerList.addHandler(JettyServerInitUtils.wrapWithDefaultGzipHandler(root));

server.setHandler(handlerList);
}
}