From a49b54523c6e233989b255138c497cb2b9e1d0fe Mon Sep 17 00:00:00 2001 From: YANGLiiN Date: Wed, 1 Feb 2023 14:43:35 +0800 Subject: [PATCH 1/3] [fix][broker] fix the autoRecovery aware of the right rack info. --- .../bookie/rackawareness/BookieRackAffinityMapping.java | 8 ++++---- .../rackawareness/BookieRackAffinityMappingTest.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index e9e350800b44e..5556b4c5952c6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -80,7 +80,7 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad store = (MetadataStore) storeProperty; } else { String url; - String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri")); + String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataStoreUrl")); if (StringUtils.isNotBlank(metadataServiceUri)) { try { url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "") @@ -89,7 +89,7 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); } } else { - String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers")); + String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zookeeperServers")); if (StringUtils.isBlank(zkServers)) { String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor " + "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE); @@ -98,11 +98,11 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad url = zkServers; } try { - int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout")); + int sessionTimeoutMillis = conf.getInt("metadataStoreSessionTimeoutMillis", 30000); store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder() .metadataStoreName(MetadataStoreConfig.METADATA_STORE) - .sessionTimeoutMillis(zkTimeout) + .sessionTimeoutMillis(sessionTimeoutMillis) .build()); } catch (MetadataStoreException e) { throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index d7be7dabd0db1..54121859e4494 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -110,7 +110,7 @@ public void testBasic() throws Exception { public void testMultipleMetadataServiceUris() { BookieRackAffinityMapping mapping1 = new BookieRackAffinityMapping(); ClientConfiguration bkClientConf1 = new ClientConfiguration(); - bkClientConf1.setProperty("metadataServiceUri", "memory:local,memory:local"); + bkClientConf1.setProperty("metadataStoreUrl", "memory:local,memory:local"); bkClientConf1.setProperty("zkTimeout", "100000"); mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); From 251923adfc3a683b86847d4fb7506c3fb4f87318 Mon Sep 17 00:00:00 2001 From: YANGLiiN Date: Mon, 6 Feb 2023 15:44:55 +0800 Subject: [PATCH 2/3] [fix][broker] fix the compatible problem. --- .../BookieRackAffinityMapping.java | 38 ++++++++++++++----- .../BookieRackAffinityMappingTest.java | 2 +- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 5556b4c5952c6..bc428bce17b66 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -80,23 +80,43 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad store = (MetadataStore) storeProperty; } else { String url; - String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataStoreUrl")); - if (StringUtils.isNotBlank(metadataServiceUri)) { + + // First get from the Pulsar broker side + String metadataStoreUrl = ConfigurationStringUtil.castToString(conf.getProperty("metadataStoreUrl")); + if (StringUtils.isNotBlank(metadataStoreUrl)) { try { - url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "") + url = metadataStoreUrl.replaceFirst(METADATA_STORE_SCHEME + ":", "") .replace(";", ","); } catch (Exception e) { throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); } } else { - String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zookeeperServers")); - if (StringUtils.isBlank(zkServers)) { - String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor " - + "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE); - throw new RuntimeException(errorMsg); + url = ConfigurationStringUtil.castToString(conf.getProperty("zookeeperServers")); + } + + // Then downgrade to the bookkeeper side, compatible with previous versions. + if (StringUtils.isBlank(url)) { + String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri")); + if (StringUtils.isNotBlank(metadataServiceUri)) { + try { + url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "") + .replace(";", ","); + } catch (Exception e) { + throw new MetadataException(Code.METADATA_SERVICE_ERROR, e); + } + } else { + String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers")); + if (StringUtils.isBlank(zkServers)) { + String errorMsg = + String.format("Neither %s configuration set in the BK client configuration nor " + + "metadataServiceUri/zkServers set in bk server configuration", + METADATA_STORE_INSTANCE); + throw new RuntimeException(errorMsg); + } + url = zkServers; } - url = zkServers; } + try { int sessionTimeoutMillis = conf.getInt("metadataStoreSessionTimeoutMillis", 30000); store = MetadataStoreExtended.create(url, diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index 54121859e4494..d7be7dabd0db1 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -110,7 +110,7 @@ public void testBasic() throws Exception { public void testMultipleMetadataServiceUris() { BookieRackAffinityMapping mapping1 = new BookieRackAffinityMapping(); ClientConfiguration bkClientConf1 = new ClientConfiguration(); - bkClientConf1.setProperty("metadataStoreUrl", "memory:local,memory:local"); + bkClientConf1.setProperty("metadataServiceUri", "memory:local,memory:local"); bkClientConf1.setProperty("zkTimeout", "100000"); mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); From 6347eacb96759cc30b6c6b2ae60803af4bce94bb Mon Sep 17 00:00:00 2001 From: YANGLiiN Date: Mon, 6 Feb 2023 16:09:34 +0800 Subject: [PATCH 3/3] [fix][broker] fix the format problem. --- .../bookie/rackawareness/BookieRackAffinityMapping.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index bc428bce17b66..d021ab5a785f3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -80,7 +80,6 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad store = (MetadataStore) storeProperty; } else { String url; - // First get from the Pulsar broker side String metadataStoreUrl = ConfigurationStringUtil.castToString(conf.getProperty("metadataStoreUrl")); if (StringUtils.isNotBlank(metadataStoreUrl)) { @@ -96,7 +95,8 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad // Then downgrade to the bookkeeper side, compatible with previous versions. if (StringUtils.isBlank(url)) { - String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri")); + String metadataServiceUri = + ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri")); if (StringUtils.isNotBlank(metadataServiceUri)) { try { url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "")