From 0f4e91c70946df222b9181db3607b020687aa442 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 29 Dec 2014 16:28:21 -0800 Subject: [PATCH] Add graceful shutdown and a shutdown hook in Application. The graceful shutdown period, during which new requests are rejected but outstanding requests are allowed to finish, is configurable. A shutdown hook in Application also allows additional resources not associated with any single request to be cleaned up. In order to ensure the process does not exit before the hook runs, lifecycle methods are added to Application that wrap and augment the ones in Jetty's Server class. --- core/pom.xml | 11 ++ .../java/io/confluent/rest/Application.java | 57 ++++++- .../java/io/confluent/rest/RestConfig.java | 12 +- .../java/io/confluent/rest/ShutdownTest.java | 149 ++++++++++++++++++ .../helloworld/HelloWorldApplication.java | 6 +- 5 files changed, 228 insertions(+), 7 deletions(-) create mode 100644 core/src/test/java/io/confluent/rest/ShutdownTest.java diff --git a/core/pom.xml b/core/pom.xml index b6a14299af..6ed89326ba 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -15,6 +15,10 @@ jar rest-utils + + 4.11 + + @@ -83,5 +87,12 @@ jackson-annotations ${jackson.version} + + + junit + junit + ${junit.version} + test + diff --git a/core/src/main/java/io/confluent/rest/Application.java b/core/src/main/java/io/confluent/rest/Application.java index f9d5ef1383..4f7e8c25a9 100644 --- a/core/src/main/java/io/confluent/rest/Application.java +++ b/core/src/main/java/io/confluent/rest/Application.java @@ -25,6 +25,8 @@ import org.glassfish.jersey.server.validation.ValidationFeature; import org.glassfish.jersey.servlet.ServletContainer; +import java.util.concurrent.CountDownLatch; + import javax.ws.rs.core.Configurable; import io.confluent.rest.exceptions.ConstraintViolationExceptionMapper; @@ -39,6 +41,8 @@ */ public abstract class Application { protected T config; + protected Server server = null; + protected CountDownLatch shutdownLatch = new CountDownLatch(1); public Application() {} @@ -82,11 +86,25 @@ public Server createServer() throws RestConfigException { // Configure the servlet container ServletContainer servletContainer = new ServletContainer(resourceConfig); ServletHolder servletHolder = new ServletHolder(servletContainer); - Server server = new Server(getConfiguration().getInt(RestConfig.PORT_CONFIG)); + server = new Server(getConfiguration().getInt(RestConfig.PORT_CONFIG)) { + @Override + protected void doStop() throws Exception { + super.doStop(); + Application.this.onShutdown(); + Application.this.shutdownLatch.countDown(); + } + }; ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath("/"); context.addServlet(servletHolder, "/*"); server.setHandler(context); + + int gracefulShutdownMs = getConfiguration().getInt(RestConfig.SHUTDOWN_GRACEFUL_MS_CONFIG); + if (gracefulShutdownMs > 0) { + server.setGracefulShutdown(gracefulShutdownMs); + } + server.setStopAtShutdown(true); + return server; } @@ -112,5 +130,42 @@ public void configureBaseApplication(Configurable config) { public T getConfiguration() { return this.config; } + + /** + * Start the server (creating it if necessary). + * @throws Exception + */ + public void start() throws Exception { + if (server == null) { + createServer(); + } + server.start(); + } + + /** + * Wait for the server to exit, allowing existing requests to complete if graceful shutdown is + * enabled and invoking the shutdown hook before returning. + * @throws InterruptedException + */ + public void join() throws InterruptedException { + server.join(); + shutdownLatch.await(); + } + + /** + * Request that the server shutdown. + * @throws Exception + */ + public void stop() throws Exception { + server.stop(); + } + + /** + * Shutdown hook that is invoked after the Jetty server has processed the shutdown request, + * stopped accepting new connections, and tried to gracefully finish existing requests. At this + * point it should be safe to clean up any resources used while processing requests. + */ + public void onShutdown() { + } } diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java index 43a17621a8..0630b750dd 100644 --- a/core/src/main/java/io/confluent/rest/RestConfig.java +++ b/core/src/main/java/io/confluent/rest/RestConfig.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.TreeMap; -public abstract class RestConfig extends AbstractConfig { +public class RestConfig extends AbstractConfig { protected static final ConfigDef config; public static final String DEBUG_CONFIG = "debug"; @@ -48,6 +48,11 @@ public abstract class RestConfig extends AbstractConfig { "an Accept header."; protected static final String RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DEFAULT = "application/json"; + public static final String SHUTDOWN_GRACEFUL_MS_CONFIG = "shutdown.graceful.ms"; + protected static final String SHUTDOWN_GRACEFUL_MS_DOC = + "Amount of time to wait after a shutdown request for outstanding requests to complete."; + protected static final String SHUTDOWN_GRACEFUL_MS_DEFAULT = "1000"; + static { config = new ConfigDef() .define(DEBUG_CONFIG, Type.BOOLEAN, @@ -59,7 +64,10 @@ public abstract class RestConfig extends AbstractConfig { RESPONSE_MEDIATYPE_PREFERRED_CONFIG_DOC) .define(RESPONSE_MEDIATYPE_DEFAULT_CONFIG, Type.STRING, RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DEFAULT, Importance.HIGH, - RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC); + RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC) + .define(SHUTDOWN_GRACEFUL_MS_CONFIG, Type.INT, + SHUTDOWN_GRACEFUL_MS_DEFAULT, Importance.LOW, + SHUTDOWN_GRACEFUL_MS_DOC); } public RestConfig() { diff --git a/core/src/test/java/io/confluent/rest/ShutdownTest.java b/core/src/test/java/io/confluent/rest/ShutdownTest.java new file mode 100644 index 0000000000..1d15a771d3 --- /dev/null +++ b/core/src/test/java/io/confluent/rest/ShutdownTest.java @@ -0,0 +1,149 @@ +/** + * Copyright 2014 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package io.confluent.rest; + +import org.junit.Test; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Configurable; + +import static org.junit.Assert.*; + +public class ShutdownTest { + @Test + public void testShutdownHook() throws Exception { + Properties props = new Properties(); + props.put("shutdown.graceful.ms", "50"); + ShutdownApplication app = new ShutdownApplication(new RestConfig(props)); + app.start(); + + StopThread stop = new StopThread(app); + stop.start(); + + app.join(); + assertTrue(app.shutdown.get()); + } + + @Test + public void testGracefulShutdown() throws Exception { + Properties props = new Properties(); + props.put("shutdown.graceful.ms", "50"); + final RestConfig config = new RestConfig(props); + ShutdownApplication app = new ShutdownApplication(config); + app.start(); + + RequestThread req = new RequestThread(config); + req.start(); + app.resource.requestProcessingStarted.await(); + + StopThread stop = new StopThread(app); + stop.start(); + + app.join(); + + assertTrue(req.finished); + assertEquals("done", req.response); + } + + + private static class ShutdownApplication extends Application { + public AtomicBoolean shutdown = new AtomicBoolean(false); + public SlowResource resource = new SlowResource(); + + ShutdownApplication(RestConfig props) { + super(props); + } + + @Override + public void onShutdown() { + shutdown.set(true); + } + + @Override + public void setupResources(Configurable config, RestConfig appConfig) { + config.register(resource); + } + } + + @Path("/") + public static class SlowResource { + public CountDownLatch requestProcessingStarted = new CountDownLatch(1); + + @GET + public String test() throws InterruptedException { + requestProcessingStarted.countDown(); + Thread.sleep(25); + return "done"; + } + } + + private static class StopThread extends Thread { + ShutdownApplication app; + + StopThread(ShutdownApplication app) { + this.app = app; + } + + @Override + public void run() { + try { + app.stop(); + } catch (Exception e) { + } + } + }; + + private static class RequestThread extends Thread { + RestConfig config; + boolean finished = false; + String response = null; + + RequestThread(RestConfig config) { + this.config = config; + } + @Override + public void run() { + // It seems that the server isn't necessarily listening when start() returns, which makes it + // difficult to know when it is safe to make this request. Just retry until we're able to make + // the request. + while(true) { + try { + Client client = ClientBuilder.newClient(); + response = client + .target("http://localhost:" + config.getInt(RestConfig.PORT_CONFIG)) + .path("/") + .request() + .get(String.class); + finished = true; + return; + } catch (javax.ws.rs.ProcessingException e) { + // ignore and retry + } + } + } + }; + +} \ No newline at end of file diff --git a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java index f02a57f74b..c93d1686ce 100644 --- a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java +++ b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java @@ -15,7 +15,6 @@ */ package io.confluent.rest.examples.helloworld; -import org.eclipse.jetty.server.Server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +59,9 @@ public static void main(String[] args) { } HelloWorldRestConfig config = new HelloWorldRestConfig(settings); HelloWorldApplication app = new HelloWorldApplication(config); - Server server = app.createServer(); - server.start(); + app.start(); log.info("Server started, listening for requests..."); - server.join(); + app.join(); } catch (RestConfigException e) { log.error("Server configuration failed: " + e.getMessage()); System.exit(1);