diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AutoClusterFailoverBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AutoClusterFailoverBuilder.java new file mode 100644 index 0000000000000..f3c1de2227e36 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AutoClusterFailoverBuilder.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * {@link AutoClusterFailoverBuilder} is used to configure and create instance of {@link ServiceUrlProvider}. + * + * @since 2.10.0 + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface AutoClusterFailoverBuilder { + + @SuppressWarnings("checkstyle:javadoctype") + enum FailoverPolicy { + ORDER + } + /** + * Set the primary service url. + * + * @param primary + * @return + */ + AutoClusterFailoverBuilder primary(String primary); + + /** + * Set the secondary service url. + * + * @param secondary + * @return + */ + AutoClusterFailoverBuilder secondary(List secondary); + + /** + * Set secondary choose policy. The default secondary choose policy is `ORDER`. + * @param policy + * @return + */ + AutoClusterFailoverBuilder failoverPolicy(FailoverPolicy policy); + + /** + * Set secondary authentication. + * + * @param authentication + * @return + */ + AutoClusterFailoverBuilder secondaryAuthentication(Map authentication); + + /** + * Set secondary tlsTrustCertsFilePath. + * + * @param tlsTrustCertsFilePath + * @return + */ + AutoClusterFailoverBuilder secondaryTlsTrustCertsFilePath(Map tlsTrustCertsFilePath); + + /** + * Set secondary tlsTrustStorePath. + * + * @param tlsTrustStorePath + * @return + */ + AutoClusterFailoverBuilder secondaryTlsTrustStorePath(Map tlsTrustStorePath); + + /** + * Set secondary tlsTrustStorePassword. + * + * @param tlsTrustStorePassword + * @return + */ + AutoClusterFailoverBuilder secondaryTlsTrustStorePassword(Map tlsTrustStorePassword); + /** + * Set the switch failoverDelay. When one cluster failed longer than failoverDelay, it will trigger cluster switch. + * + * @param failoverDelay + * @param timeUnit + * @return + */ + AutoClusterFailoverBuilder failoverDelay(long failoverDelay, TimeUnit timeUnit); + + /** + * Set the switchBackDelay. When switched to the secondary cluster, and after the primary cluster comes back, + * it will wait for switchBackDelay to switch back to the primary cluster. + * + * @param switchBackDelay + * @param timeUnit + * @return + */ + AutoClusterFailoverBuilder switchBackDelay(long switchBackDelay, TimeUnit timeUnit); + + /** + * Set the checkInterval for probe. + * + * @param interval + * @param timeUnit + * @return + */ + AutoClusterFailoverBuilder checkInterval(long interval, TimeUnit timeUnit); + + /** + * Build the ServiceUrlProvider instance. + * + * @return + */ + ServiceUrlProvider build(); +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ControlledClusterFailoverBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ControlledClusterFailoverBuilder.java new file mode 100644 index 0000000000000..21f745bb8cdb8 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ControlledClusterFailoverBuilder.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * {@link ControlledClusterFailoverBuilder} is used to configure and create instance of {@link ServiceUrlProvider}. + * + * @since 2.10.0 + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ControlledClusterFailoverBuilder { + /** + * Set default service url. + * + * @param serviceUrl + * @return + */ + ControlledClusterFailoverBuilder defaultServiceUrl(String serviceUrl); + + /** + * Set the service url provider. ServiceUrlProvider will fetch serviceUrl from urlProvider periodically. + * + * @param urlProvider + * @return + */ + ControlledClusterFailoverBuilder urlProvider(String urlProvider); + + /** + * Set the service url provider header to authenticate provider service. + * @param header + * @return + */ + ControlledClusterFailoverBuilder urlProviderHeader(Map header); + + /** + * Set the probe check interval. + * @param interval + * @param timeUnit + * @return + */ + ControlledClusterFailoverBuilder checkInterval(long interval, TimeUnit timeUnit); + + /** + * Build the ServiceUrlProvider instance. + * + * @return + * @throws IOException + */ + ServiceUrlProvider build() throws IOException; +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java index d4305ac2c3dd3..3d3d799d2dc5b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java @@ -31,7 +31,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface ServiceUrlProvider { +public interface ServiceUrlProvider extends AutoCloseable { /** * Initialize the service url provider with Pulsar client instance. @@ -51,4 +51,12 @@ public interface ServiceUrlProvider { */ String getServiceUrl(); + /** + * Close the resource that the provider allocated. + * + */ + @Override + default void close() { + // do nothing + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java new file mode 100644 index 0000000000000..726f9b1780768 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.base.Strings; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Data; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AutoClusterFailoverBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.net.ServiceURI; + +@Slf4j +@Data +public class AutoClusterFailover implements ServiceUrlProvider { + private PulsarClientImpl pulsarClient; + private volatile String currentPulsarServiceUrl; + private final String primary; + private final List secondary; + private final AutoClusterFailoverBuilder.FailoverPolicy failoverPolicy; + private Authentication primaryAuthentication; + private final Map secondaryAuthentications; + private String primaryTlsTrustCertsFilePath; + private final Map secondaryTlsTrustCertsFilePaths; + private String primaryTlsTrustStorePath; + private Map secondaryTlsTrustStorePaths; + private String primaryTlsTrustStorePassword; + private Map secondaryTlsTrustStorePasswords; + private final long failoverDelayNs; + private final long switchBackDelayNs; + private final ScheduledExecutorService executor; + private long recoverTimestamp; + private long failedTimestamp; + private final long intervalMs; + private static final int TIMEOUT = 30_000; + + private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) { + this.primary = builder.primary; + this.secondary = builder.secondary; + this.failoverPolicy = builder.failoverPolicy; + this.secondaryAuthentications = builder.secondaryAuthentications; + this.secondaryTlsTrustCertsFilePaths = builder.secondaryTlsTrustCertsFilePaths; + this.secondaryTlsTrustStorePaths = builder.secondaryTlsTrustStorePaths; + this.secondaryTlsTrustStorePasswords = builder.secondaryTlsTrustStorePasswords; + this.failoverDelayNs = builder.failoverDelayNs; + this.switchBackDelayNs = builder.switchBackDelayNs; + this.currentPulsarServiceUrl = builder.primary; + this.recoverTimestamp = -1; + this.failedTimestamp = -1; + this.intervalMs = builder.checkIntervalMs; + this.executor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("pulsar-service-provider")); + } + + @Override + public void initialize(PulsarClient client) { + this.pulsarClient = (PulsarClientImpl) client; + ClientConfigurationData config = pulsarClient.getConfiguration(); + if (config != null) { + this.primaryAuthentication = config.getAuthentication(); + this.primaryTlsTrustCertsFilePath = config.getTlsTrustCertsFilePath(); + this.primaryTlsTrustStorePath = config.getTlsTrustStorePath(); + this.primaryTlsTrustStorePassword = config.getTlsTrustStorePassword(); + } + + // start to probe primary cluster active or not + this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> { + if (currentPulsarServiceUrl.equals(primary)) { + // current service url is primary, probe whether it is down + probeAndUpdateServiceUrl(secondary, secondaryAuthentications, secondaryTlsTrustCertsFilePaths, + secondaryTlsTrustStorePaths, secondaryTlsTrustStorePasswords); + } else { + // current service url is secondary, probe whether it is down + probeAndUpdateServiceUrl(primary, primaryAuthentication, primaryTlsTrustCertsFilePath, + primaryTlsTrustStorePath, primaryTlsTrustStorePassword); + // secondary cluster is up, check whether need to switch back to primary or not + probeAndCheckSwitchBack(primary, primaryAuthentication, primaryTlsTrustCertsFilePath, + primaryTlsTrustStorePath, primaryTlsTrustStorePassword); + } + }), intervalMs, intervalMs, TimeUnit.MILLISECONDS); + + } + + @Override + public String getServiceUrl() { + return this.currentPulsarServiceUrl; + } + + @Override + public void close() { + this.executor.shutdown(); + } + + boolean probeAvailable(String url) { + try { + URI uri = ServiceURI.create(url).getUri(); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), TIMEOUT); + socket.close(); + return true; + } catch (Exception e) { + log.warn("Failed to probe available, url: {}", url, e); + return false; + } + } + + private static long nanosToMillis(long nanos) { + return Math.max(0L, Math.round(nanos / 1_000_000.0d)); + } + + private void updateServiceUrl(String target, + Authentication authentication, + String tlsTrustCertsFilePath, + String tlsTrustStorePath, + String tlsTrustStorePassword) { + try { + if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) { + pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath); + } + + if (authentication != null) { + pulsarClient.updateAuthentication(authentication); + } + + if (!Strings.isNullOrEmpty(tlsTrustStorePath)) { + pulsarClient.updateTlsTrustStorePathAndPassword(tlsTrustStorePath, tlsTrustStorePassword); + } + + pulsarClient.updateServiceUrl(target); + currentPulsarServiceUrl = target; + } catch (IOException e) { + log.error("Current Pulsar service is {}, " + + "failed to switch back to {} ", currentPulsarServiceUrl, target, e); + } + } + + private void probeAndUpdateServiceUrl(List targetServiceUrls, + Map authentications, + Map tlsTrustCertsFilePaths, + Map tlsTrustStorePaths, + Map tlsTrustStorePasswords) { + if (probeAvailable(currentPulsarServiceUrl)) { + failedTimestamp = -1; + return; + } + + long currentTimestamp = System.nanoTime(); + if (failedTimestamp == -1) { + failedTimestamp = currentTimestamp; + } else if (currentTimestamp - failedTimestamp >= failoverDelayNs) { + for (String targetServiceUrl : targetServiceUrls) { + if (probeAvailable(targetServiceUrl)) { + log.info("Current Pulsar service is {}, it has been down for {} ms, " + + "switch to the service {}. The current service down at {}", + currentPulsarServiceUrl, nanosToMillis(currentTimestamp - failedTimestamp), + targetServiceUrl, failedTimestamp); + updateServiceUrl(targetServiceUrl, + authentications != null ? authentications.get(targetServiceUrl) : null, + tlsTrustCertsFilePaths != null ? tlsTrustCertsFilePaths.get(targetServiceUrl) : null, + tlsTrustStorePaths != null ? tlsTrustStorePaths.get(targetServiceUrl) : null, + tlsTrustStorePasswords != null ? tlsTrustStorePasswords.get(targetServiceUrl) : null); + failedTimestamp = -1; + break; + } else { + log.warn("Current Pulsar service is {}, it has been down for {} ms. " + + "Failed to switch to service {}, " + + "because it is not available, continue to probe next pulsar service.", + currentPulsarServiceUrl, nanosToMillis(currentTimestamp - failedTimestamp), targetServiceUrl); + } + } + } + } + + private void probeAndUpdateServiceUrl(String targetServiceUrl, + Authentication authentication, + String tlsTrustCertsFilePath, + String tlsTrustStorePath, + String tlsTrustStorePassword) { + if (probeAvailable(currentPulsarServiceUrl)) { + failedTimestamp = -1; + return; + } + + long currentTimestamp = System.nanoTime(); + if (failedTimestamp == -1) { + failedTimestamp = currentTimestamp; + } else if (currentTimestamp - failedTimestamp >= failoverDelayNs) { + if (probeAvailable(targetServiceUrl)) { + log.info("Current Pulsar service is {}, it has been down for {} ms, " + + "switch to the service {}. The current service down at {}", + currentPulsarServiceUrl, nanosToMillis(currentTimestamp - failedTimestamp), + targetServiceUrl, failedTimestamp); + updateServiceUrl(targetServiceUrl, authentication, tlsTrustCertsFilePath, + tlsTrustStorePath, tlsTrustStorePassword); + failedTimestamp = -1; + } else { + log.error("Current Pulsar service is {}, it has been down for {} ms. " + + "Failed to switch to service {}, " + + "because it is not available", + currentPulsarServiceUrl, nanosToMillis(currentTimestamp - failedTimestamp), + targetServiceUrl); + } + } + } + + private void probeAndCheckSwitchBack(String target, + Authentication authentication, + String tlsTrustCertsFilePath, + String tlsTrustStorePath, + String tlsTrustStorePassword) { + long currentTimestamp = System.nanoTime(); + if (!probeAvailable(target)) { + recoverTimestamp = -1; + return; + } + + if (recoverTimestamp == -1) { + recoverTimestamp = currentTimestamp; + } else if (currentTimestamp - recoverTimestamp >= switchBackDelayNs) { + log.info("Current Pulsar service is secondary: {}, " + + "the primary service: {} has been recover for {} ms, " + + "switch back to the primary service", + currentPulsarServiceUrl, target, nanosToMillis(currentTimestamp - recoverTimestamp)); + updateServiceUrl(target, authentication, tlsTrustCertsFilePath, tlsTrustStorePath, tlsTrustStorePassword); + recoverTimestamp = -1; + } + } + + public static class AutoClusterFailoverBuilderImpl implements AutoClusterFailoverBuilder { + private String primary; + private List secondary; + private Map secondaryAuthentications = null; + private Map secondaryTlsTrustCertsFilePaths = null; + private Map secondaryTlsTrustStorePaths = null; + private Map secondaryTlsTrustStorePasswords = null; + private FailoverPolicy failoverPolicy = FailoverPolicy.ORDER; + private long failoverDelayNs; + private long switchBackDelayNs; + private long checkIntervalMs = 30_000; + + @Override + public AutoClusterFailoverBuilder primary(@NonNull String primary) { + this.primary = primary; + return this; + } + + @Override + public AutoClusterFailoverBuilder secondary(@NonNull List secondary) { + this.secondary = secondary; + return this; + } + + @Override + public AutoClusterFailoverBuilder failoverPolicy(@NonNull FailoverPolicy policy) { + this.failoverPolicy = policy; + return this; + } + + @Override + public AutoClusterFailoverBuilder secondaryAuthentication(Map authentication) { + this.secondaryAuthentications = authentication; + return this; + } + + @Override + public AutoClusterFailoverBuilder secondaryTlsTrustCertsFilePath(Map tlsTrustCertsFilePath) { + this.secondaryTlsTrustCertsFilePaths = tlsTrustCertsFilePath; + return this; + } + + @Override + public AutoClusterFailoverBuilder secondaryTlsTrustStorePath(Map tlsTrustStorePath) { + this.secondaryTlsTrustStorePaths = tlsTrustStorePath; + return this; + } + + @Override + public AutoClusterFailoverBuilder secondaryTlsTrustStorePassword(Map tlsTrustStorePassword) { + this.secondaryTlsTrustStorePasswords = tlsTrustStorePassword; + return this; + } + + @Override + public AutoClusterFailoverBuilder failoverDelay(long failoverDelay, TimeUnit timeUnit) { + this.failoverDelayNs = timeUnit.toNanos(failoverDelay); + return this; + } + + @Override + public AutoClusterFailoverBuilder switchBackDelay(long switchBackDelay, TimeUnit timeUnit) { + this.switchBackDelayNs = timeUnit.toNanos(switchBackDelay); + return this; + } + + @Override + public AutoClusterFailoverBuilder checkInterval(long interval, TimeUnit timeUnit) { + this.checkIntervalMs = timeUnit.toMillis(interval); + return this; + } + + @Override + public ServiceUrlProvider build() { + Objects.requireNonNull(primary, "primary service url shouldn't be null"); + checkArgument(secondary != null && secondary.size() > 0, + "secondary cluster service url shouldn't be null and should set at least one"); + checkArgument(failoverDelayNs > 0, "failoverDelay should > 0"); + checkArgument(switchBackDelayNs > 0, "switchBackDelay should > 0"); + checkArgument(checkIntervalMs > 0, "checkInterval should > 0"); + int secondarySize = secondary.size(); + + checkArgument(secondaryAuthentications == null + || secondaryAuthentications.size() == secondarySize, + "secondaryAuthentication should be null or size equal with secondary url size"); + checkArgument(secondaryTlsTrustCertsFilePaths == null + || secondaryTlsTrustCertsFilePaths.size() == secondarySize, + "secondaryTlsTrustCertsFilePath should be null or size equal with secondary url size"); + checkArgument(secondaryTlsTrustStorePaths == null + || secondaryTlsTrustStorePaths.size() == secondarySize, + "secondaryTlsTrustStorePath should be null or size equal with secondary url size"); + checkArgument(secondaryTlsTrustStorePasswords == null + || secondaryTlsTrustStorePasswords.size() == secondarySize, + "secondaryTlsTrustStorePassword should be null or size equal with secondary url size"); + + return new AutoClusterFailover(this); + } + + public static void checkArgument(boolean expression, @NonNull Object errorMessage) { + if (!expression) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } + } + + public static AutoClusterFailoverBuilder builder() { + return new AutoClusterFailoverBuilderImpl(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java new file mode 100644 index 0000000000000..50060a25217de --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Data; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; +import org.asynchttpclient.channel.DefaultKeepAliveStrategy; +import org.checkerframework.checker.nullness.qual.Nullable; + +@Slf4j +public class ControlledClusterFailover implements ServiceUrlProvider { + private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; + private static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; + private static final int DEFAULT_MAX_REDIRECTS = 20; + + private PulsarClientImpl pulsarClient; + private volatile String currentPulsarServiceUrl; + private volatile ControlledConfiguration currentControlledConfiguration; + private final ScheduledExecutorService executor; + private final long interval; + private ObjectMapper objectMapper = null; + private final AsyncHttpClient httpClient; + private final BoundRequestBuilder requestBuilder; + + private ControlledClusterFailover(ControlledClusterFailoverBuilderImpl builder) throws IOException { + this.currentPulsarServiceUrl = builder.defaultServiceUrl; + this.interval = builder.interval; + this.executor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("pulsar-service-provider")); + + this.httpClient = buildHttpClient(); + this.requestBuilder = httpClient.prepareGet(builder.urlProvider) + .addHeader("Accept", "application/json"); + + if (builder.header != null && !builder.header.isEmpty()) { + builder.header.forEach(requestBuilder::addHeader); + } + } + + private AsyncHttpClient buildHttpClient() { + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + confBuilder.setFollowRedirect(true); + confBuilder.setMaxRedirects(DEFAULT_MAX_REDIRECTS); + confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); + confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() { + @Override + public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, + HttpRequest request, HttpResponse response) { + // Close connection upon a server error or per HTTP spec + return (response.status().code() / 100 != 5) + && super.keepAlive(remoteAddress, ahcRequest, request, response); + } + }); + AsyncHttpClientConfig config = confBuilder.build(); + return new DefaultAsyncHttpClient(config); + } + + @Override + public void initialize(PulsarClient client) { + this.pulsarClient = (PulsarClientImpl) client; + + // start to check service url every 30 seconds + this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> { + ControlledConfiguration controlledConfiguration = null; + try { + controlledConfiguration = fetchControlledConfiguration(); + if (controlledConfiguration != null + && !Strings.isNullOrEmpty(controlledConfiguration.getServiceUrl()) + && !controlledConfiguration.equals(currentControlledConfiguration)) { + log.info("Switch Pulsar service url from {} to {}", + currentControlledConfiguration, controlledConfiguration.toString()); + + Authentication authentication = null; + if (!Strings.isNullOrEmpty(controlledConfiguration.authPluginClassName) + && !Strings.isNullOrEmpty(controlledConfiguration.getAuthParamsString())) { + authentication = AuthenticationFactory.create(controlledConfiguration.getAuthPluginClassName(), + controlledConfiguration.getAuthParamsString()); + } + + String tlsTrustCertsFilePath = controlledConfiguration.getTlsTrustCertsFilePath(); + String serviceUrl = controlledConfiguration.getServiceUrl(); + + if (authentication != null) { + pulsarClient.updateAuthentication(authentication); + } + + if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) { + pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath); + } + + pulsarClient.updateServiceUrl(serviceUrl); + currentPulsarServiceUrl = serviceUrl; + currentControlledConfiguration = controlledConfiguration; + } + } catch (IOException e) { + log.error("Failed to switch new Pulsar url, current: {}, new: {}", + currentControlledConfiguration, controlledConfiguration, e); + } + }), interval, interval, TimeUnit.MILLISECONDS); + } + + public String getCurrentPulsarServiceUrl() { + return this.currentPulsarServiceUrl; + } + + @VisibleForTesting + protected BoundRequestBuilder getRequestBuilder() { + return this.requestBuilder; + } + + protected ControlledConfiguration fetchControlledConfiguration() throws IOException { + // call the service to get service URL + try { + Response response = requestBuilder.execute().get(); + int statusCode = response.getStatusCode(); + if (statusCode == 200) { + String content = response.getResponseBody(StandardCharsets.UTF_8); + return getObjectMapper().readValue(content, ControlledConfiguration.class); + } + log.warn("Failed to fetch controlled configuration, status code: {}", statusCode); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to fetch controlled configuration ", e); + } + + return null; + } + + private ObjectMapper getObjectMapper() { + if (objectMapper == null) { + objectMapper = new ObjectMapper(); + } + return objectMapper; + } + + @Data + protected static class ControlledConfiguration { + private String serviceUrl; + private String tlsTrustCertsFilePath; + + private String authPluginClassName; + private String authParamsString; + + public String toJson() { + ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal(); + try { + return objectMapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + log.warn("Failed to write as json. ", e); + return null; + } + } + } + + @Override + public String getServiceUrl() { + return this.currentPulsarServiceUrl; + } + + @Override + public void close() { + this.executor.shutdown(); + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + log.error("Failed to close http client."); + } + } + } + + public static class ControlledClusterFailoverBuilderImpl implements ControlledClusterFailoverBuilder { + private String defaultServiceUrl; + private String urlProvider; + private Map header = null; + private long interval = 30_000; + + @Override + public ControlledClusterFailoverBuilder defaultServiceUrl(@NonNull String serviceUrl) { + this.defaultServiceUrl = serviceUrl; + return this; + } + + @Override + public ControlledClusterFailoverBuilder urlProvider(@NonNull String urlProvider) { + this.urlProvider = urlProvider; + return this; + } + + @Override + public ControlledClusterFailoverBuilder urlProviderHeader(Map header) { + this.header = header; + return this; + } + + @Override + public ControlledClusterFailoverBuilder checkInterval(long interval, @NonNull TimeUnit timeUnit) { + this.interval = timeUnit.toMillis(interval); + return this; + } + + @Override + public ServiceUrlProvider build() throws IOException { + Objects.requireNonNull(defaultServiceUrl, "default service url shouldn't be null"); + Objects.requireNonNull(urlProvider, "urlProvider shouldn't be null"); + checkArgument(interval > 0, "checkInterval should > 0"); + + return new ControlledClusterFailover(this); + } + + public static void checkArgument(boolean expression, @Nullable Object errorMessage) { + if (!expression) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } + } + + public static ControlledClusterFailoverBuilder builder() { + return new ControlledClusterFailoverBuilderImpl(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index cf0a7509cbc96..99278dd1614a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -27,6 +27,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Collections; @@ -48,6 +49,7 @@ import lombok.Builder; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -751,6 +753,12 @@ public void shutdown() throws PulsarClientException { throwable = t; } } + + // close the service url provider allocated resource. + if (conf != null && conf.getServiceUrlProvider() != null) { + conf.getServiceUrlProvider().close(); + } + try { // Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different // eventLoopGroup. @@ -857,6 +865,26 @@ public synchronized void updateServiceUrl(String serviceUrl) throws PulsarClient cnxPool.closeAllConnections(); } + public void updateAuthentication(Authentication authentication) throws IOException { + log.info("Updating authentication to {}", authentication); + if (conf.getAuthentication() != null) { + conf.getAuthentication().close(); + } + conf.setAuthentication(authentication); + conf.getAuthentication().start(); + } + + public void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath) { + log.info("Updating tlsTrustCertsFilePath to {}", tlsTrustCertsFilePath); + conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath); + } + + public void updateTlsTrustStorePathAndPassword(String tlsTrustStorePath, String tlsTrustStorePassword) { + log.info("Updating tlsTrustStorePath to {}, tlsTrustStorePassword to *****", tlsTrustStorePath); + conf.setTlsTrustStorePath(tlsTrustStorePath); + conf.setTlsTrustStorePassword(tlsTrustStorePassword); + } + public CompletableFuture getConnection(final String topic) { TopicName topicName = TopicName.get(topic); return lookup.getBroker(topicName) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java new file mode 100644 index 0000000000000..573ddd8086da7 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class AutoClusterFailoverTest { + @Test + public void testBuildAutoClusterFailoverInstance() throws PulsarClientException { + String primary = "pulsar://localhost:6650"; + String secondary = "pulsar://localhost:6651"; + long failoverDelay = 30; + long switchBackDelay = 60; + long checkInterval = 1_000; + ServiceUrlProvider provider = AutoClusterFailover.builder() + .primary(primary) + .secondary(Collections.singletonList(secondary)) + .failoverDelay(failoverDelay, TimeUnit.SECONDS) + .switchBackDelay(switchBackDelay, TimeUnit.SECONDS) + .checkInterval(checkInterval, TimeUnit.MILLISECONDS) + .build(); + + AutoClusterFailover autoClusterFailover = (AutoClusterFailover) provider; + Assert.assertTrue(provider instanceof AutoClusterFailover); + Assert.assertEquals(primary, provider.getServiceUrl()); + Assert.assertEquals(primary, autoClusterFailover.getPrimary()); + Assert.assertEquals(secondary, autoClusterFailover.getSecondary().get(0)); + Assert.assertEquals(TimeUnit.SECONDS.toNanos(failoverDelay), autoClusterFailover.getFailoverDelayNs()); + Assert.assertEquals(TimeUnit.SECONDS.toNanos(switchBackDelay), autoClusterFailover.getSwitchBackDelayNs()); + Assert.assertEquals(checkInterval, autoClusterFailover.getIntervalMs()); + Assert.assertNull(autoClusterFailover.getPrimaryTlsTrustCertsFilePath()); + Assert.assertNull(autoClusterFailover.getPrimaryAuthentication()); + Assert.assertNull(autoClusterFailover.getSecondaryAuthentications()); + Assert.assertNull(autoClusterFailover.getSecondaryTlsTrustCertsFilePaths()); + + String primaryTlsTrustCertsFilePath = "primary/path"; + String secondaryTlsTrustCertsFilePath = "primary/path"; + Authentication primaryAuthentication = AuthenticationFactory.create( + "org.apache.pulsar.client.impl.auth.AuthenticationTls", + "tlsCertFile:/path/to/primary-my-role.cert.pem," + + "tlsKeyFile:/path/to/primary-my-role.key-pk8.pem"); + + Authentication secondaryAuthentication = AuthenticationFactory.create( + "org.apache.pulsar.client.impl.auth.AuthenticationTls", + "tlsCertFile:/path/to/secondary-my-role.cert.pem," + + "tlsKeyFile:/path/to/secondary-role.key-pk8.pem"); + Map secondaryTlsTrustCertsFilePaths = new HashMap<>(); + secondaryTlsTrustCertsFilePaths.put(secondary, secondaryTlsTrustCertsFilePath); + + Map secondaryAuthentications = new HashMap<>(); + secondaryAuthentications.put(secondary, secondaryAuthentication); + + ServiceUrlProvider provider1 = AutoClusterFailover.builder() + .primary(primary) + .secondary(Collections.singletonList(secondary)) + .failoverDelay(failoverDelay, TimeUnit.SECONDS) + .switchBackDelay(switchBackDelay, TimeUnit.SECONDS) + .checkInterval(checkInterval, TimeUnit.MILLISECONDS) + .secondaryTlsTrustCertsFilePath(secondaryTlsTrustCertsFilePaths) + .secondaryAuthentication(secondaryAuthentications) + .build(); + + AutoClusterFailover autoClusterFailover1 = (AutoClusterFailover) provider1; + Assert.assertEquals(secondaryTlsTrustCertsFilePath, + autoClusterFailover1.getSecondaryTlsTrustCertsFilePaths().get(secondary)); + Assert.assertEquals(secondaryAuthentication, autoClusterFailover1.getSecondaryAuthentications().get(secondary)); + } + + @Test + public void testAutoClusterFailoverSwitchWithoutAuthentication() { + String primary = "pulsar://localhost:6650"; + String secondary = "pulsar://localhost:6651"; + long failoverDelay = 1; + long switchBackDelay = 1; + long checkInterval = 1_000; + + ClientConfigurationData configurationData = new ClientConfigurationData(); + + ServiceUrlProvider provider = AutoClusterFailover.builder() + .primary(primary) + .secondary(Collections.singletonList(secondary)) + .failoverDelay(failoverDelay, TimeUnit.SECONDS) + .switchBackDelay(switchBackDelay, TimeUnit.SECONDS) + .checkInterval(checkInterval, TimeUnit.MILLISECONDS) + .build(); + + AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider); + PulsarClientImpl pulsarClient = PowerMockito.mock(PulsarClientImpl.class); + Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary); + Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary); + Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration(); + + autoClusterFailover.initialize(pulsarClient); + + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(secondary, autoClusterFailover.getServiceUrl())); + + // primary cluster came back + Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(primary); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(primary, autoClusterFailover.getServiceUrl())); + } + + @Test + public void testAutoClusterFailoverSwitchWithAuthentication() throws IOException { + String primary = "pulsar+ssl://localhost:6651"; + String secondary = "pulsar+ssl://localhost:6661"; + long failoverDelay = 1; + long switchBackDelay = 1; + long checkInterval = 1_000; + String primaryTlsTrustCertsFilePath = "primary/path"; + String secondaryTlsTrustCertsFilePath = "primary/path"; + Authentication primaryAuthentication = AuthenticationFactory.create( + "org.apache.pulsar.client.impl.auth.AuthenticationTls", + "tlsCertFile:/path/to/primary-my-role.cert.pem," + + "tlsKeyFile:/path/to/primary-my-role.key-pk8.pem"); + + Authentication secondaryAuthentication = AuthenticationFactory.create( + "org.apache.pulsar.client.impl.auth.AuthenticationTls", + "tlsCertFile:/path/to/secondary-my-role.cert.pem," + + "tlsKeyFile:/path/to/secondary-role.key-pk8.pem"); + + Map secondaryTlsTrustCertsFilePaths = new HashMap<>(); + secondaryTlsTrustCertsFilePaths.put(secondary, secondaryTlsTrustCertsFilePath); + + Map secondaryAuthentications = new HashMap<>(); + secondaryAuthentications.put(secondary, secondaryAuthentication); + + ClientConfigurationData configurationData = new ClientConfigurationData(); + configurationData.setTlsTrustCertsFilePath(primaryTlsTrustCertsFilePath); + configurationData.setAuthentication(primaryAuthentication); + + ServiceUrlProvider provider = AutoClusterFailover.builder() + .primary(primary) + .secondary(Collections.singletonList(secondary)) + .checkInterval(checkInterval, TimeUnit.MILLISECONDS) + .failoverDelay(failoverDelay, TimeUnit.SECONDS) + .switchBackDelay(switchBackDelay, TimeUnit.SECONDS) + .secondaryTlsTrustCertsFilePath(secondaryTlsTrustCertsFilePaths) + .secondaryAuthentication(secondaryAuthentications) + .build(); + + AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider); + PulsarClientImpl pulsarClient = PowerMockito.mock(PulsarClientImpl.class); + Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary); + Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary); + Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration(); + + autoClusterFailover.initialize(pulsarClient); + + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(secondary, autoClusterFailover.getServiceUrl())); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()).updateTlsTrustCertsFilePath(secondaryTlsTrustCertsFilePath); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()).updateAuthentication(secondaryAuthentication); + + // primary cluster came back + Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(primary); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(primary, autoClusterFailover.getServiceUrl())); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()).updateTlsTrustCertsFilePath(primaryTlsTrustCertsFilePath); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()).updateAuthentication(primaryAuthentication); + + } + + @Test + public void testAutoClusterFailoverSwitchTlsTrustStore() throws IOException { + String primary = "pulsar+ssl://localhost:6651"; + String secondary = "pulsar+ssl://localhost:6661"; + long failoverDelay = 1; + long switchBackDelay = 1; + long checkInterval = 1_000; + String primaryTlsTrustStorePath = "primary/path"; + String secondaryTlsTrustStorePath = "secondary/path"; + String primaryTlsTrustStorePassword = "primaryPassword"; + String secondaryTlsTrustStorePassword = "secondaryPassword"; + + Map secondaryTlsTrustStorePaths = new HashMap<>(); + secondaryTlsTrustStorePaths.put(secondary, secondaryTlsTrustStorePath); + Map secondaryTlsTrustStorePasswords = new HashMap<>(); + secondaryTlsTrustStorePasswords.put(secondary, secondaryTlsTrustStorePassword); + + ClientConfigurationData configurationData = new ClientConfigurationData(); + configurationData.setTlsTrustStorePath(primaryTlsTrustStorePath); + configurationData.setTlsTrustStorePassword(primaryTlsTrustStorePassword); + + ServiceUrlProvider provider = AutoClusterFailover.builder() + .primary(primary) + .secondary(Collections.singletonList(secondary)) + .failoverDelay(failoverDelay, TimeUnit.SECONDS) + .switchBackDelay(switchBackDelay, TimeUnit.SECONDS) + .checkInterval(checkInterval, TimeUnit.MILLISECONDS) + .secondaryTlsTrustStorePath(secondaryTlsTrustStorePaths) + .secondaryTlsTrustStorePassword(secondaryTlsTrustStorePasswords) + .build(); + + AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider); + PulsarClientImpl pulsarClient = PowerMockito.mock(PulsarClientImpl.class); + Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary); + Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary); + Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration(); + + autoClusterFailover.initialize(pulsarClient); + + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(secondary, autoClusterFailover.getServiceUrl())); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()) + .updateTlsTrustStorePathAndPassword(secondaryTlsTrustStorePath, secondaryTlsTrustStorePassword); + + // primary cluster came back + Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(primary); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(primary, autoClusterFailover.getServiceUrl())); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()) + .updateTlsTrustStorePathAndPassword(primaryTlsTrustStorePath, primaryTlsTrustStorePassword); + + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java new file mode 100644 index 0000000000000..1fd4a017ec8d1 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.asynchttpclient.Request; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class ControlledClusterFailoverTest { + @Test + public void testBuildControlledClusterFailoverInstance() throws IOException { + String defaultServiceUrl = "pulsar://localhost:6650"; + String urlProvider = "http://localhost:8080/test"; + String keyA = "key-a"; + String valueA = "value-a"; + String keyB = "key-b"; + String valueB = "value-b"; + + Map header = new HashMap<>(); + header.put(keyA, valueA); + header.put(keyB, valueB); + ServiceUrlProvider provider = ControlledClusterFailover.builder() + .defaultServiceUrl(defaultServiceUrl) + .urlProvider(urlProvider) + .urlProviderHeader(header) + .build(); + + ControlledClusterFailover controlledClusterFailover = (ControlledClusterFailover) provider; + Request request = controlledClusterFailover.getRequestBuilder().build(); + + Assert.assertTrue(provider instanceof ControlledClusterFailover); + Assert.assertEquals(defaultServiceUrl, provider.getServiceUrl()); + Assert.assertEquals(defaultServiceUrl, controlledClusterFailover.getCurrentPulsarServiceUrl()); + Assert.assertEquals(urlProvider, request.getUri().toUrl()); + Assert.assertEquals(request.getHeaders().get(keyA), valueA); + Assert.assertEquals(request.getHeaders().get(keyB), valueB); + } + + @Test + public void testControlledClusterFailoverSwitch() throws IOException { + String defaultServiceUrl = "pulsar+ssl://localhost:6651"; + String backupServiceUrl = "pulsar+ssl://localhost:6661"; + String urlProvider = "http://localhost:8080"; + String tlsTrustCertsFilePath = "backup/path"; + String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; + String authParamsString = "token:xxxaaabbee"; + long interval = 1_000; + + ControlledClusterFailover.ControlledConfiguration controlledConfiguration = + new ControlledClusterFailover.ControlledConfiguration(); + controlledConfiguration.setServiceUrl(backupServiceUrl); + controlledConfiguration.setTlsTrustCertsFilePath(tlsTrustCertsFilePath); + controlledConfiguration.setAuthPluginClassName(authPluginClassName); + controlledConfiguration.setAuthParamsString(authParamsString); + + ServiceUrlProvider provider = ControlledClusterFailover.builder() + .defaultServiceUrl(defaultServiceUrl) + .urlProvider(urlProvider) + .checkInterval(interval, TimeUnit.MILLISECONDS) + .build(); + + ControlledClusterFailover controlledClusterFailover = Mockito.spy((ControlledClusterFailover) provider); + PulsarClientImpl pulsarClient = PowerMockito.mock(PulsarClientImpl.class); + + controlledClusterFailover.initialize(pulsarClient); + + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(defaultServiceUrl, controlledClusterFailover.getServiceUrl())); + + Mockito.doReturn(controlledConfiguration).when(controlledClusterFailover) + .fetchControlledConfiguration(); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(backupServiceUrl, controlledClusterFailover.getServiceUrl())); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()) + .updateServiceUrl(backupServiceUrl); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()) + .updateTlsTrustCertsFilePath(tlsTrustCertsFilePath); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()) + .updateAuthentication(Mockito.any(Authentication.class)); + + // update controlled configuration + String backupServiceUrlV1 = "pulsar+ssl://localhost:6662"; + String tlsTrustCertsFilePathV1 = "backup/pathV1"; + String authPluginClassNameV1 = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; + String authParamsStringV1 = "token:xxxaaabbeev1"; + ControlledClusterFailover.ControlledConfiguration controlledConfiguration1 = + new ControlledClusterFailover.ControlledConfiguration(); + controlledConfiguration1.setServiceUrl(backupServiceUrlV1); + controlledConfiguration1.setTlsTrustCertsFilePath(tlsTrustCertsFilePathV1); + controlledConfiguration1.setAuthPluginClassName(authPluginClassNameV1); + controlledConfiguration1.setAuthParamsString(authParamsStringV1); + Mockito.doReturn(controlledConfiguration1).when(controlledClusterFailover) + .fetchControlledConfiguration(); + + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(backupServiceUrlV1, controlledClusterFailover.getServiceUrl())); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()).updateServiceUrl(backupServiceUrlV1); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()) + .updateTlsTrustCertsFilePath(tlsTrustCertsFilePathV1); + Mockito.verify(pulsarClient, Mockito.atLeastOnce()) + .updateAuthentication(Mockito.any(Authentication.class)); + + } +}