diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index c302a0162d836..ec4b7da250e0d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.bookie.rackawareness; +import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +32,8 @@ import java.util.concurrent.ExecutionException; import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RackChangeNotifier; +import org.apache.bookkeeper.meta.exceptions.Code; +import org.apache.bookkeeper.meta.exceptions.MetadataException; import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieNode; @@ -42,7 +45,10 @@ import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,24 +69,54 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration(); private volatile Map bookieInfoMap = new HashMap<>(); - @Override - public void setConf(Configuration conf) { - super.setConf(conf); + public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException { + MetadataStore store; Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE); - if (storeProperty == null) { - throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client " - + "configuration"); - } - - if (!(storeProperty instanceof MetadataStore)) { - throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore"); + if (storeProperty != null) { + if (!(storeProperty instanceof MetadataStore)) { + throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore"); + } + store = (MetadataStore) storeProperty; + } else { + String url; + String metadataServiceUri = (String) conf.getProperty("metadataServiceUri"); + if (StringUtils.isNotBlank(metadataServiceUri)) { + try { + url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "") + .replace(";", ","); + } catch (Exception e) { + throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); + } + } else { + String zkServers = (String) conf.getProperty("zkServers"); + if (StringUtils.isBlank(zkServers)) { + String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor " + + "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE); + throw new RuntimeException(errorMsg); + } + url = zkServers; + } + try { + int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout")); + store = MetadataStoreExtended.create(url, + MetadataStoreConfig.builder() + .sessionTimeoutMillis(zkTimeout) + .build()); + } catch (MetadataStoreException e) { + throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); + } } + return store; + } - MetadataStore store = (MetadataStore) storeProperty; - - bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join(); + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + MetadataStore store; try { + store = createMetadataStore(conf); + bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); + bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join(); for (Map bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() .map(Map::values).orElse(Collections.emptyList())) { for (String address : bookieMapping.keySet()) { @@ -91,7 +127,7 @@ public void setConf(Configuration conf) { bookieAddressListLastTime); } } - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException | MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); } store.registerListener(this::handleUpdates); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index fed8943e7e9fb..c086e2c4e5e46 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.bookie.rackawareness; +import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE; import io.netty.util.HashedWheelTimer; import java.util.ArrayList; import java.util.Arrays; @@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.meta.exceptions.MetadataException; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.proto.BookieAddressResolver; @@ -68,20 +70,12 @@ public IsolatedBookieEnsemblePlacementPolicy() { public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, Optional optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { - - Object storeProperty = conf.getProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE); - if (storeProperty == null) { - throw new RuntimeException(BookieRackAffinityMapping.METADATA_STORE_INSTANCE - + " configuration was not set in the BK client configuration"); - } - - if (!(storeProperty instanceof MetadataStore)) { - throw new RuntimeException( - BookieRackAffinityMapping.METADATA_STORE_INSTANCE + " is not an instance of MetadataStore"); + MetadataStore store; + try { + store = BookieRackAffinityMapping.createMetadataStore(conf); + } catch (MetadataException e) { + throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized"); } - - MetadataStore store = (MetadataStore) storeProperty; - Set primaryIsolationGroups = new HashSet<>(); Set secondaryIsolationGroups = new HashSet<>(); if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java index dceeb21e8b72e..1a088e976c8f2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java @@ -37,7 +37,7 @@ public abstract class AbstractMetadataDriver implements Closeable { - protected static final String METADATA_STORE_SCHEME = "metadata-store"; + public static final String METADATA_STORE_SCHEME = "metadata-store"; public static final String METADATA_STORE_INSTANCE = "metadata-store-instance";