Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,29 +80,49 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad
store = (MetadataStore) storeProperty;
} else {
String url;
String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri"));
if (StringUtils.isNotBlank(metadataServiceUri)) {
// First get from the Pulsar broker side
String metadataStoreUrl = ConfigurationStringUtil.castToString(conf.getProperty("metadataStoreUrl"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is used for bk auto-recovery, and it doesn't make sense to get it from the Pulsar broker side. If we use it with this logic, we need to configure metadataStoreUrl in the conf/bookkeeper.conf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current BookieRackAffinityMapping policy is implemented on the pulsar side and its configuration is configured to the pulsar's zk (metadatastore).
It would be best to migrate the Metadata BookieRackAffinityMapping strategy to the bookkeeper project so that bookkeeper-only businesses can have this capability as well.

if (StringUtils.isNotBlank(metadataStoreUrl)) {
try {
url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "")
url = metadataStoreUrl.replaceFirst(METADATA_STORE_SCHEME + ":", "")
.replace(";", ",");
} catch (Exception e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
} else {
String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers"));
if (StringUtils.isBlank(zkServers)) {
String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor "
+ "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE);
throw new RuntimeException(errorMsg);
url = ConfigurationStringUtil.castToString(conf.getProperty("zookeeperServers"));
}

// Then downgrade to the bookkeeper side, compatible with previous versions.
if (StringUtils.isBlank(url)) {
String metadataServiceUri =
ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri"));
if (StringUtils.isNotBlank(metadataServiceUri)) {
try {
url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "")
.replace(";", ",");
} catch (Exception e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
} else {
String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers"));
if (StringUtils.isBlank(zkServers)) {
String errorMsg =
String.format("Neither %s configuration set in the BK client configuration nor "
+ "metadataServiceUri/zkServers set in bk server configuration",
METADATA_STORE_INSTANCE);
throw new RuntimeException(errorMsg);
}
url = zkServers;
}
url = zkServers;
}

try {
int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
int sessionTimeoutMillis = conf.getInt("metadataStoreSessionTimeoutMillis", 30000);
store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.sessionTimeoutMillis(zkTimeout)
.sessionTimeoutMillis(sessionTimeoutMillis)
.build());
} catch (MetadataStoreException e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
Expand Down