diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java index 032069786a416..7819f051e7581 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.asynchttpclient.AsyncHttpClient; @@ -119,6 +120,15 @@ public void initialize(PulsarClient client) { .addHeader("Accept", "application/json"); headers.forEach(requestBuilder::addHeader); + // Initialize currentControlledConfiguration from client's current configuration + // to avoid unnecessary reconnection on first scheduled check when the configuration hasn't changed + ClientConfigurationData conf = pulsarClient.getConfiguration(); + this.currentControlledConfiguration = new ControlledConfiguration(); + this.currentControlledConfiguration.setServiceUrl(currentPulsarServiceUrl); + this.currentControlledConfiguration.setTlsTrustCertsFilePath(conf.getTlsTrustCertsFilePath()); + this.currentControlledConfiguration.setAuthPluginClassName(conf.getAuthPluginClassName()); + this.currentControlledConfiguration.setAuthParamsString(conf.getAuthParams()); + // start to check service url every 30 seconds this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> { ControlledConfiguration controlledConfiguration = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java index ca4aca6c329ed..86b2fa7cb4f9b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java @@ -26,6 +26,7 @@ import lombok.Cleanup; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.asynchttpclient.Request; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -57,7 +58,9 @@ public void testBuildControlledClusterFailoverInstance() throws Exception { PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); ConnectionPool connectionPool = mock(ConnectionPool.class); + ClientConfigurationData clientConf = new ClientConfigurationData(); when(pulsarClient.getCnxPool()).thenReturn(connectionPool); + when(pulsarClient.getConfiguration()).thenReturn(clientConf); controlledClusterFailover.initialize(pulsarClient); Request request = controlledClusterFailover.getRequestBuilder().build(); @@ -97,7 +100,9 @@ public void testControlledClusterFailoverSwitch() throws Exception { ControlledClusterFailover controlledClusterFailover = Mockito.spy((ControlledClusterFailover) provider); PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); ConnectionPool connectionPool = mock(ConnectionPool.class); + ClientConfigurationData clientConf = new ClientConfigurationData(); when(pulsarClient.getCnxPool()).thenReturn(connectionPool); + when(pulsarClient.getConfiguration()).thenReturn(clientConf); controlledClusterFailover.initialize(pulsarClient);