diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/ResponseHandlerFilter.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/ResponseHandlerFilter.java new file mode 100644 index 0000000000000..909d774a41c4f --- /dev/null +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/ResponseHandlerFilter.java @@ -0,0 +1,73 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.web; + +import java.io.IOException; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Response.Status; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.pulsar.broker.PulsarService; + +/** + * Servlet filter that hooks up to handle outgoing response + */ +public class ResponseHandlerFilter implements Filter { + private static final Logger LOG = LoggerFactory.getLogger(ResponseHandlerFilter.class); + + private final String brokerAddress; + + public ResponseHandlerFilter(PulsarService pulsar) { + this.brokerAddress = pulsar.getAdvertisedAddress(); + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + + chain.doFilter(request, response); + ((HttpServletResponse) response).addHeader("broker-address", brokerAddress); + if (((HttpServletResponse) response).getStatus() == Status.INTERNAL_SERVER_ERROR.getStatusCode()) { + // invalidate current session from servlet-container if it received internal-server-error + try { + ((HttpServletRequest) request).getSession(false).invalidate(); + } catch (Exception ignoreException) { + /* connection is already invalidated */ + } + } + + } + + @Override + public void init(FilterConfig arg) throws ServletException { + // No init necessary. + } + + @Override + public void destroy() { + // No state to clean up. + } +} diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java index f92d96c12ec7c..f476a11d651c8 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java @@ -152,6 +152,9 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require context.addFilter(holder, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); log.info("Enabling ApiVersionFilter"); } + + FilterHolder responseFilter = new FilterHolder(new ResponseHandlerFilter(pulsar)); + context.addFilter(responseFilter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); handlers.add(context); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 46518e98bc24e..ea1d7d7455105 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -15,30 +15,34 @@ */ package com.yahoo.pulsar.client.api; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.yahoo.pulsar.client.admin.PulsarAdmin; -import com.yahoo.pulsar.common.policies.data.ClusterData; -import com.yahoo.pulsar.common.policies.data.PropertyAdmin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; import java.net.URI; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.HashMap; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.*; +import javax.ws.rs.InternalServerErrorException; + +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; -import com.yahoo.pulsar.broker.authentication.*; -import com.yahoo.pulsar.client.impl.auth.*; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.yahoo.pulsar.broker.authentication.AuthenticationProviderTls; +import com.yahoo.pulsar.client.admin.PulsarAdmin; +import com.yahoo.pulsar.client.admin.PulsarAdminException; +import com.yahoo.pulsar.client.impl.auth.AuthenticationTls; +import com.yahoo.pulsar.common.policies.data.ClusterData; +import com.yahoo.pulsar.common.policies.data.PropertyAdmin; public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class); @@ -152,5 +156,35 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep consumer.close(); log.info("-- Exiting {} test --", methodName); } + + /** + * Verifies: on 500 server error, broker invalidates session and client receives 500 correctly. + * + * @throws Exception + */ + @Test + public void testAuthemticationFilterNegative() throws Exception { + log.info("-- Starting {} test --", methodName); + + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + Authentication authTls = new AuthenticationTls(); + authTls.configure(authParams); + internalSetup(authTls); + + final String cluster = "use"; + final ClusterData clusterData = new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS); + // this will cause NPE and it should throw 500 + doReturn(null).when(pulsar).getGlobalZkCache(); + try { + admin.clusters().createCluster(cluster, clusterData); + } catch (PulsarAdminException e) { + Assert.assertTrue(e.getCause() instanceof InternalServerErrorException); + } + + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HttpClient.java index 1152701509708..4850fb2bb60ca 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HttpClient.java @@ -21,15 +21,13 @@ import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; -import java.security.cert.X509Certificate; import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import io.netty.channel.EventLoopGroup; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; import org.asynchttpclient.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +70,9 @@ protected HttpClient(String serviceUrl, Authentication authentication, EventLoop confBuilder.setConnectTimeout(connectTimeoutInSeconds * 1000); confBuilder.setReadTimeout(readTimeoutInSeconds * 1000); confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", getPulsarClientVersion())); + confBuilder.setKeepAliveStrategy((request, httpRequest, httpResponse) -> { + return !httpResponse.getStatus().equals(HttpResponseStatus.INTERNAL_SERVER_ERROR); + }); if ("https".equals(url.getProtocol())) { try {