diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index e9e350800b44e..d021ab5a785f3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -80,29 +80,49 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad store = (MetadataStore) storeProperty; } else { String url; - String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri")); - if (StringUtils.isNotBlank(metadataServiceUri)) { + // First get from the Pulsar broker side + String metadataStoreUrl = ConfigurationStringUtil.castToString(conf.getProperty("metadataStoreUrl")); + if (StringUtils.isNotBlank(metadataStoreUrl)) { try { - url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "") + url = metadataStoreUrl.replaceFirst(METADATA_STORE_SCHEME + ":", "") .replace(";", ","); } catch (Exception e) { throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); } } else { - String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers")); - if (StringUtils.isBlank(zkServers)) { - String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor " - + "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE); - throw new RuntimeException(errorMsg); + url = ConfigurationStringUtil.castToString(conf.getProperty("zookeeperServers")); + } + + // Then downgrade to the bookkeeper side, compatible with previous versions. + if (StringUtils.isBlank(url)) { + String metadataServiceUri = + ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri")); + if (StringUtils.isNotBlank(metadataServiceUri)) { + try { + url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "") + .replace(";", ","); + } catch (Exception e) { + throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); + } + } else { + String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers")); + if (StringUtils.isBlank(zkServers)) { + String errorMsg = + String.format("Neither %s configuration set in the BK client configuration nor " + + "metadataServiceUri/zkServers set in bk server configuration", + METADATA_STORE_INSTANCE); + throw new RuntimeException(errorMsg); + } + url = zkServers; } - url = zkServers; } + try { - int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout")); + int sessionTimeoutMillis = conf.getInt("metadataStoreSessionTimeoutMillis", 30000); store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder() .metadataStoreName(MetadataStoreConfig.METADATA_STORE) - .sessionTimeoutMillis(zkTimeout) + .sessionTimeoutMillis(sessionTimeoutMillis) .build()); } catch (MetadataStoreException e) { throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);