diff --git a/core/pom.xml b/core/pom.xml
index b6a14299af..6ed89326ba 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -15,6 +15,10 @@
jar
rest-utils
+
+ 4.11
+
+
@@ -83,5 +87,12 @@
jackson-annotations
${jackson.version}
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
diff --git a/core/src/main/java/io/confluent/rest/Application.java b/core/src/main/java/io/confluent/rest/Application.java
index f9d5ef1383..4f7e8c25a9 100644
--- a/core/src/main/java/io/confluent/rest/Application.java
+++ b/core/src/main/java/io/confluent/rest/Application.java
@@ -25,6 +25,8 @@
import org.glassfish.jersey.server.validation.ValidationFeature;
import org.glassfish.jersey.servlet.ServletContainer;
+import java.util.concurrent.CountDownLatch;
+
import javax.ws.rs.core.Configurable;
import io.confluent.rest.exceptions.ConstraintViolationExceptionMapper;
@@ -39,6 +41,8 @@
*/
public abstract class Application {
protected T config;
+ protected Server server = null;
+ protected CountDownLatch shutdownLatch = new CountDownLatch(1);
public Application() {}
@@ -82,11 +86,25 @@ public Server createServer() throws RestConfigException {
// Configure the servlet container
ServletContainer servletContainer = new ServletContainer(resourceConfig);
ServletHolder servletHolder = new ServletHolder(servletContainer);
- Server server = new Server(getConfiguration().getInt(RestConfig.PORT_CONFIG));
+ server = new Server(getConfiguration().getInt(RestConfig.PORT_CONFIG)) {
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ Application.this.onShutdown();
+ Application.this.shutdownLatch.countDown();
+ }
+ };
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
context.addServlet(servletHolder, "/*");
server.setHandler(context);
+
+ int gracefulShutdownMs = getConfiguration().getInt(RestConfig.SHUTDOWN_GRACEFUL_MS_CONFIG);
+ if (gracefulShutdownMs > 0) {
+ server.setGracefulShutdown(gracefulShutdownMs);
+ }
+ server.setStopAtShutdown(true);
+
return server;
}
@@ -112,5 +130,42 @@ public void configureBaseApplication(Configurable> config) {
public T getConfiguration() {
return this.config;
}
+
+ /**
+ * Start the server (creating it if necessary).
+ * @throws Exception
+ */
+ public void start() throws Exception {
+ if (server == null) {
+ createServer();
+ }
+ server.start();
+ }
+
+ /**
+ * Wait for the server to exit, allowing existing requests to complete if graceful shutdown is
+ * enabled and invoking the shutdown hook before returning.
+ * @throws InterruptedException
+ */
+ public void join() throws InterruptedException {
+ server.join();
+ shutdownLatch.await();
+ }
+
+ /**
+ * Request that the server shutdown.
+ * @throws Exception
+ */
+ public void stop() throws Exception {
+ server.stop();
+ }
+
+ /**
+ * Shutdown hook that is invoked after the Jetty server has processed the shutdown request,
+ * stopped accepting new connections, and tried to gracefully finish existing requests. At this
+ * point it should be safe to clean up any resources used while processing requests.
+ */
+ public void onShutdown() {
+ }
}
diff --git a/core/src/main/java/io/confluent/rest/RestConfig.java b/core/src/main/java/io/confluent/rest/RestConfig.java
index 43a17621a8..0630b750dd 100644
--- a/core/src/main/java/io/confluent/rest/RestConfig.java
+++ b/core/src/main/java/io/confluent/rest/RestConfig.java
@@ -23,7 +23,7 @@
import java.util.Map;
import java.util.TreeMap;
-public abstract class RestConfig extends AbstractConfig {
+public class RestConfig extends AbstractConfig {
protected static final ConfigDef config;
public static final String DEBUG_CONFIG = "debug";
@@ -48,6 +48,11 @@ public abstract class RestConfig extends AbstractConfig {
"an Accept header.";
protected static final String RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DEFAULT = "application/json";
+ public static final String SHUTDOWN_GRACEFUL_MS_CONFIG = "shutdown.graceful.ms";
+ protected static final String SHUTDOWN_GRACEFUL_MS_DOC =
+ "Amount of time to wait after a shutdown request for outstanding requests to complete.";
+ protected static final String SHUTDOWN_GRACEFUL_MS_DEFAULT = "1000";
+
static {
config = new ConfigDef()
.define(DEBUG_CONFIG, Type.BOOLEAN,
@@ -59,7 +64,10 @@ public abstract class RestConfig extends AbstractConfig {
RESPONSE_MEDIATYPE_PREFERRED_CONFIG_DOC)
.define(RESPONSE_MEDIATYPE_DEFAULT_CONFIG, Type.STRING,
RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DEFAULT, Importance.HIGH,
- RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC);
+ RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC)
+ .define(SHUTDOWN_GRACEFUL_MS_CONFIG, Type.INT,
+ SHUTDOWN_GRACEFUL_MS_DEFAULT, Importance.LOW,
+ SHUTDOWN_GRACEFUL_MS_DOC);
}
public RestConfig() {
diff --git a/core/src/test/java/io/confluent/rest/ShutdownTest.java b/core/src/test/java/io/confluent/rest/ShutdownTest.java
new file mode 100644
index 0000000000..1d15a771d3
--- /dev/null
+++ b/core/src/test/java/io/confluent/rest/ShutdownTest.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2014 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package io.confluent.rest;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Configurable;
+
+import static org.junit.Assert.*;
+
+public class ShutdownTest {
+ @Test
+ public void testShutdownHook() throws Exception {
+ Properties props = new Properties();
+ props.put("shutdown.graceful.ms", "50");
+ ShutdownApplication app = new ShutdownApplication(new RestConfig(props));
+ app.start();
+
+ StopThread stop = new StopThread(app);
+ stop.start();
+
+ app.join();
+ assertTrue(app.shutdown.get());
+ }
+
+ @Test
+ public void testGracefulShutdown() throws Exception {
+ Properties props = new Properties();
+ props.put("shutdown.graceful.ms", "50");
+ final RestConfig config = new RestConfig(props);
+ ShutdownApplication app = new ShutdownApplication(config);
+ app.start();
+
+ RequestThread req = new RequestThread(config);
+ req.start();
+ app.resource.requestProcessingStarted.await();
+
+ StopThread stop = new StopThread(app);
+ stop.start();
+
+ app.join();
+
+ assertTrue(req.finished);
+ assertEquals("done", req.response);
+ }
+
+
+ private static class ShutdownApplication extends Application {
+ public AtomicBoolean shutdown = new AtomicBoolean(false);
+ public SlowResource resource = new SlowResource();
+
+ ShutdownApplication(RestConfig props) {
+ super(props);
+ }
+
+ @Override
+ public void onShutdown() {
+ shutdown.set(true);
+ }
+
+ @Override
+ public void setupResources(Configurable> config, RestConfig appConfig) {
+ config.register(resource);
+ }
+ }
+
+ @Path("/")
+ public static class SlowResource {
+ public CountDownLatch requestProcessingStarted = new CountDownLatch(1);
+
+ @GET
+ public String test() throws InterruptedException {
+ requestProcessingStarted.countDown();
+ Thread.sleep(25);
+ return "done";
+ }
+ }
+
+ private static class StopThread extends Thread {
+ ShutdownApplication app;
+
+ StopThread(ShutdownApplication app) {
+ this.app = app;
+ }
+
+ @Override
+ public void run() {
+ try {
+ app.stop();
+ } catch (Exception e) {
+ }
+ }
+ };
+
+ private static class RequestThread extends Thread {
+ RestConfig config;
+ boolean finished = false;
+ String response = null;
+
+ RequestThread(RestConfig config) {
+ this.config = config;
+ }
+ @Override
+ public void run() {
+ // It seems that the server isn't necessarily listening when start() returns, which makes it
+ // difficult to know when it is safe to make this request. Just retry until we're able to make
+ // the request.
+ while(true) {
+ try {
+ Client client = ClientBuilder.newClient();
+ response = client
+ .target("http://localhost:" + config.getInt(RestConfig.PORT_CONFIG))
+ .path("/")
+ .request()
+ .get(String.class);
+ finished = true;
+ return;
+ } catch (javax.ws.rs.ProcessingException e) {
+ // ignore and retry
+ }
+ }
+ }
+ };
+
+}
\ No newline at end of file
diff --git a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java
index f02a57f74b..c93d1686ce 100644
--- a/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java
+++ b/examples/src/main/java/io/confluent/rest/examples/helloworld/HelloWorldApplication.java
@@ -15,7 +15,6 @@
*/
package io.confluent.rest.examples.helloworld;
-import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,10 +59,9 @@ public static void main(String[] args) {
}
HelloWorldRestConfig config = new HelloWorldRestConfig(settings);
HelloWorldApplication app = new HelloWorldApplication(config);
- Server server = app.createServer();
- server.start();
+ app.start();
log.info("Server started, listening for requests...");
- server.join();
+ app.join();
} catch (RestConfigException e) {
log.error("Server configuration failed: " + e.getMessage());
System.exit(1);