From e21478011a64bcb7b501a219432202c0e3ce323a Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Fri, 26 Oct 2018 14:32:24 +0200 Subject: [PATCH 1/2] Add connection timeout client configuration option Allows the client to specify how long to wait for brokers to respond. --- .../pulsar/client/api/ClientBuilder.java | 9 +++ .../client/api/ClientConfiguration.java | 21 +++++++ .../pulsar/client/impl/ClientBuilderImpl.java | 6 ++ .../pulsar/client/impl/ConnectionPool.java | 2 +- .../impl/conf/ClientConfigurationData.java | 3 +- .../client/impl/ConnectionTimeoutTest.java | 62 +++++++++++++++++++ 6 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 2831c0989ac01..9fa2f91f6749b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -309,4 +309,13 @@ ClientBuilder authentication(String authPluginClassName, Map aut * @param unit time unit for {@code statsInterval} */ ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit unit); + + /** + * Set the duration of time to wait for a connection to a broker to be established. If the duration + * passes without a response from the broker, the connection attempt is dropped. + * + * @param duration the duration to wait + * @param unit the time unit in which the duration is defined + */ + ClientBuilder connectionTimeout(int duration, TimeUnit unit); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java index a2213b3bd1d29..e5bac914b7865 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java @@ -362,6 +362,27 @@ public ClientConfiguration setServiceUrl(String serviceUrl) { return this; } + /** + * Set the duration of time to wait for a connection to a broker to be established. If the duration + * passes without a response from the broker, the connection attempt is dropped. + * + * @param duration the duration to wait + * @param unit the time unit in which the duration is defined + */ + public void setConnectionTimeout(int duration, TimeUnit unit) { + confData.setConnectionTimeoutMs((int)unit.toMillis(duration)); + } + + /** + * Get the duration of time for which the client will wait for a connection to a broker to be + * established before giving up. + * + * @return the duration, in milliseconds + */ + public long getConnectionTimeoutMs() { + return confData.getConnectionTimeoutMs(); + } + public ClientConfigurationData getConfigurationData() { return confData; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 3e87bc93520eb..d5545cee3df1f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -189,6 +189,12 @@ public ClientBuilder keepAliveInterval(int keepAliveIntervalSeconds, TimeUnit un return this; } + @Override + public ClientBuilder connectionTimeout(int duration, TimeUnit unit) { + conf.setConnectionTimeoutMs((int)unit.toMillis(duration)); + return this; + } + public ClientConfigurationData getClientConfigurationData() { return conf; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 0c7751bd9abeb..25f4b14a7fd3a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -82,7 +82,7 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou bootstrap.group(eventLoopGroup); bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup)); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs()); bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay()); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.handler(new ChannelInitializer() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index f007782f71350..f7bec63c5a800 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -58,7 +58,8 @@ public class ClientConfigurationData implements Serializable, Cloneable { private int maxLookupRequest = 50000; private int maxNumberOfRejectedRequestPerConnection = 50; private int keepAliveIntervalSeconds = 30; - + private int connectionTimeoutMs = 10000; + public ClientConfigurationData clone() { try { return (ClientConfigurationData) super.clone(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java new file mode 100644 index 0000000000000..ffff6bb170f08 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.channel.ConnectTimeoutException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ConnectionTimeoutTest { + + // 192.0.2.0/24 is assigned for documentation, should be a deadend + final static String blackholeBroker = "pulsar://192.0.2.1:1234"; + + @Test + public void testLowTimeout() throws Exception { + long startNanos = System.nanoTime(); + try (PulsarClient client = PulsarClient.builder().serviceUrl(blackholeBroker) + .connectionTimeout(1, TimeUnit.MILLISECONDS).build()) { + client.newProducer().topic("foo").create(); + Assert.fail("Shouldn't be able to connect to anything"); + } catch (PulsarClientException pse) { + Assert.assertEquals(pse.getCause().getCause().getClass(), ConnectTimeoutException.class); + Assert.assertTrue((System.nanoTime() - startNanos) < TimeUnit.SECONDS.toNanos(3)); + } + } + + @Test + public void testHighTimeout() throws Exception { + try (PulsarClient client = PulsarClient.builder().serviceUrl(blackholeBroker) + .connectionTimeout(24, TimeUnit.HOURS).build()) { + CompletableFuture> f = client.newProducer().topic("foo").createAsync(); + + Thread.sleep(12000); // sleep for 12 seconds (default timeout is 10) + + Assert.assertFalse(f.isDone()); + } + } +} From 11db0dcc1514bcaf1657eabeb10974c15fe9de98 Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Tue, 30 Oct 2018 15:30:59 +0100 Subject: [PATCH 2/2] Add since and remove long timeout test --- .../pulsar/client/api/ClientBuilder.java | 1 + .../client/impl/ConnectionTimeoutTest.java | 33 +++++++++---------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 9fa2f91f6749b..416fc6cfa23d7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -314,6 +314,7 @@ ClientBuilder authentication(String authPluginClassName, Map aut * Set the duration of time to wait for a connection to a broker to be established. If the duration * passes without a response from the broker, the connection attempt is dropped. * + * @since 2.3.0 * @param duration the duration to wait * @param unit the time unit in which the duration is defined */ diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java index ffff6bb170f08..f81ada0bc3c58 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java @@ -38,25 +38,22 @@ public class ConnectionTimeoutTest { @Test public void testLowTimeout() throws Exception { long startNanos = System.nanoTime(); - try (PulsarClient client = PulsarClient.builder().serviceUrl(blackholeBroker) - .connectionTimeout(1, TimeUnit.MILLISECONDS).build()) { - client.newProducer().topic("foo").create(); - Assert.fail("Shouldn't be able to connect to anything"); - } catch (PulsarClientException pse) { - Assert.assertEquals(pse.getCause().getCause().getClass(), ConnectTimeoutException.class); - Assert.assertTrue((System.nanoTime() - startNanos) < TimeUnit.SECONDS.toNanos(3)); - } - } - - @Test - public void testHighTimeout() throws Exception { - try (PulsarClient client = PulsarClient.builder().serviceUrl(blackholeBroker) - .connectionTimeout(24, TimeUnit.HOURS).build()) { - CompletableFuture> f = client.newProducer().topic("foo").createAsync(); - - Thread.sleep(12000); // sleep for 12 seconds (default timeout is 10) - Assert.assertFalse(f.isDone()); + try (PulsarClient clientLow = PulsarClient.builder().serviceUrl(blackholeBroker) + .connectionTimeout(1, TimeUnit.MILLISECONDS).build(); + PulsarClient clientDefault = PulsarClient.builder().serviceUrl(blackholeBroker).build()) { + CompletableFuture lowFuture = clientLow.newProducer().topic("foo").createAsync(); + CompletableFuture defaultFuture = clientDefault.newProducer().topic("foo").createAsync(); + + try { + lowFuture.get(); + Assert.fail("Shouldn't be able to connect to anything"); + } catch (Exception e) { + Assert.assertFalse(defaultFuture.isDone()); + Assert.assertEquals(e.getCause().getCause().getCause().getClass(), + ConnectTimeoutException.class); + Assert.assertTrue((System.nanoTime() - startNanos) < TimeUnit.SECONDS.toNanos(3)); + } } } }