From 582f158a65d1309bdf1fb8015f81a563daf3346d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 27 Jul 2022 23:45:03 -0700 Subject: [PATCH 1/2] [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484) (cherry picked from commit 32fe228854464504d18de240f719b583cf262042) --- .../pulsar/broker/service/BrokerService.java | 210 ++++++++++-------- .../service/persistent/PersistentTopic.java | 109 ++++----- 2 files changed, 172 insertions(+), 147 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 29d1001e4bdd9..40fa540b9c02f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1190,119 +1190,139 @@ public CompletableFuture getManagedLedgerConfig(TopicName t Optional policies = Optional.empty(); Optional localPolicies = Optional.empty(); - PersistencePolicies persistencePolicies = null; - RetentionPolicies retentionPolicies = null; - OffloadPolicies topicLevelOffloadPolicies = null; + PersistencePolicies tmpPersistencePolicies = null; + RetentionPolicies tmpRetentionPolicies = null; + OffloadPolicies tmpTopicLevelOffloadPolicies = null; if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) { try { TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName); if (topicPolicies != null) { - persistencePolicies = topicPolicies.getPersistence(); - retentionPolicies = topicPolicies.getRetentionPolicies(); - topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); + tmpPersistencePolicies = topicPolicies.getPersistence(); + tmpRetentionPolicies = topicPolicies.getRetentionPolicies(); + tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); } } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { log.debug("Topic {} policies have not been initialized yet.", topicName); } } - try { - policies = pulsar - .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES, - namespace.toString())); - String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString()); - localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path); - } catch (Throwable t) { - // Ignoring since if we don't have policies, we fallback on the default - log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t); - future.completeExceptionally(t); - return; - } - - if (persistencePolicies == null) { - persistencePolicies = policies.map(p -> p.persistence).orElseGet( - () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), - serviceConfig.getManagedLedgerDefaultWriteQuorum(), - serviceConfig.getManagedLedgerDefaultAckQuorum(), - serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); - } + final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies; + final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies; + final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies; + + + CompletableFuture> policiesFuture = pulsar + .getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES, + namespace.toString())); + String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString()); + CompletableFuture> localPoliciesFuture = + pulsar().getLocalZkCacheService().policiesCache().getAsync(path); + + policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> { + PersistencePolicies persistencePolicies = finalPersistencePolicies; + RetentionPolicies retentionPolicies = finalRetentionPolicies; + OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies; + + if (persistencePolicies == null) { + persistencePolicies = policies.map(p -> p.persistence).orElseGet( + () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), + serviceConfig.getManagedLedgerDefaultWriteQuorum(), + serviceConfig.getManagedLedgerDefaultAckQuorum(), + serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); + } - if (retentionPolicies == null) { - retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( - () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), - serviceConfig.getDefaultRetentionSizeInMB()) - ); - } + if (retentionPolicies == null) { + retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( + () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), + serviceConfig.getDefaultRetentionSizeInMB()) + ); + } - ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); - managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); - managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); - managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); - if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); + managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); + managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); + if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { + managedLedgerConfig + .setBookKeeperEnsemblePlacementPolicyClassName( + ZkIsolatedBookieEnsemblePlacementPolicy.class); + Map properties = Maps.newHashMap(); + properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary); + properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } + managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); + managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); + managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); + + managedLedgerConfig.setMaxUnackedRangesToPersist( + serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); + managedLedgerConfig.setMaxUnackedRangesToPersistInZk( + serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper()); + managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); + managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), + TimeUnit.MINUTES); + managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), + TimeUnit.MINUTES); + managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes()); + + managedLedgerConfig.setMetadataOperationsTimeoutSeconds( + serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); + managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); + managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); + managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); + managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( + serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); + managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); + managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); managedLedgerConfig - .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class); - Map properties = Maps.newHashMap(); - properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary); - properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary); - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); - } - managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); - managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); - managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); - - managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); - managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper()); - managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); - managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), - TimeUnit.MINUTES); - managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), - TimeUnit.MINUTES); - managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes()); - - managedLedgerConfig.setMetadataOperationsTimeoutSeconds( - serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); - managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); - managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); - managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); - managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( - serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); - managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); - managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); - managedLedgerConfig - .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); - - managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); - managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); - managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); - managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData()); - managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); - - OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null); - OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration( - topicLevelOffloadPolicies, - OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), - getPulsar().getConfig().getProperties()); - if (topicLevelOffloadPolicies != null) { - try { - LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies); - managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); - } catch (PulsarServerException e) { - future.completeExceptionally(e); - return; + .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); + + managedLedgerConfig.setLedgerRolloverTimeout( + serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); + managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); + managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); + managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData()); + managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); + + OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null); + OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration( + topicLevelOffloadPolicies, + OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), + getPulsar().getConfig().getProperties()); + if (topicLevelOffloadPolicies != null) { + try { + LedgerOffloader topicLevelLedgerOffLoader = + pulsar().createManagedLedgerOffloader(offloadPolicies); + managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); + } catch (PulsarServerException e) { + future.completeExceptionally(e); + return null; + } + } else { + //If the topic level policy is null, use the namespace level + managedLedgerConfig.setLedgerOffloader( + pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } - } else { - //If the topic level policy is null, use the namespace level - managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); - } - managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); - managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); + managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( + serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); + managedLedgerConfig.setNewEntriesCheckDelayInMillis( + serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); + + + future.complete(managedLedgerConfig); + return null; + }).exceptionally(ex -> { + log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex); + future.completeExceptionally(ex); + return null; + }); - future.complete(managedLedgerConfig); }, (exception) -> future.completeExceptionally(exception))); return future; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a5613d439a5b6..48a3714ff9d8c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -157,6 +157,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @SuppressWarnings("unused") private volatile long usageCount = 0; + static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5; @@ -1155,68 +1156,72 @@ public CompletableFuture checkReplication() { log.debug("[{}] Checking replication status", name); } - Policies policies = null; - try { - policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, name.getNamespace())) - .orElseThrow(() -> new KeeperException.NoNodeException()); - } catch (Exception e) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new ServerMetadataException(e)); - return future; - } - //Ignore current broker's config for messageTTL for replication. - final int newMessageTTLinSeconds; - try { - newMessageTTLinSeconds = getMessageTTL(); - } catch (Exception e) { - return FutureUtil.failedFuture(new ServerMetadataException(e)); - } + return brokerService.pulsar().getConfigurationCache().policiesCache() + .getAsync(AdminResource.path(POLICIES, name.getNamespace())) + .thenCompose(optPolicies -> { + if (!optPolicies.isPresent()) { + return FutureUtil.failedFuture( + new ServerMetadataException("Namespace not found: " + name.getNamespace())); + } - Set configuredClusters; - if (policies.replication_clusters != null) { - configuredClusters = Sets.newTreeSet(policies.replication_clusters); - } else { - configuredClusters = Collections.emptySet(); - } + Policies policies = optPolicies.get(); - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + //Ignore current broker's config for messageTTL for replication. + final int newMessageTTLinSeconds; + try { + newMessageTTLinSeconds = getMessageTTL(); + } catch (Exception e) { + return FutureUtil.failedFuture(new ServerMetadataException(e)); + } - // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar - // doesn't serve global topic without local repl-cluster configured. - if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) { - log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}", - topic, configuredClusters); - return deleteForcefully(); - } + Set configuredClusters; + if (policies.replication_clusters != null) { + configuredClusters = Sets.newTreeSet(policies.replication_clusters); + } else { + configuredClusters = Collections.emptySet(); + } - List> futures = Lists.newArrayList(); + String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + + // if local cluster is removed from global namespace cluster-list : then delete topic forcefully + // because pulsar + // doesn't serve global topic without local repl-cluster configured. + if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) { + log.info( + "Deleting topic [{}] because local cluster is not part of global namespace repl list " + + "{}", + topic, configuredClusters); + return deleteForcefully(); + } - // Check for missing replicators - for (String cluster : configuredClusters) { - if (cluster.equals(localCluster)) { - continue; - } + List> futures = Lists.newArrayList(); - if (!replicators.containsKey(cluster)) { - futures.add(startReplicator(cluster)); - } - } + // Check for missing replicators + for (String cluster : configuredClusters) { + if (cluster.equals(localCluster)) { + continue; + } - // Check for replicators to be stopped - replicators.forEach((cluster, replicator) -> { - // Update message TTL - ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds); + if (!replicators.containsKey(cluster)) { + futures.add(startReplicator(cluster)); + } + } - if (!cluster.equals(localCluster)) { - if (!configuredClusters.contains(cluster)) { - futures.add(removeReplicator(cluster)); - } - } + // Check for replicators to be stopped + replicators.forEach((cluster, replicator) -> { + // Update message TTL + ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds); - }); + if (!cluster.equals(localCluster)) { + if (!configuredClusters.contains(cluster)) { + futures.add(removeReplicator(cluster)); + } + } - return FutureUtil.waitForAll(futures); + }); + + return FutureUtil.waitForAll(futures); + }); } @Override From a7ddc8ae967a498b97097d364aa9396caf220cc1 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Sun, 31 Jul 2022 16:05:03 +0800 Subject: [PATCH 2/2] fix bug in BrokerService --- .../org/apache/pulsar/broker/service/BrokerService.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 40fa540b9c02f..215835e10b0ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1186,10 +1186,6 @@ public CompletableFuture getManagedLedgerConfig(TopicName t NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); - // Get persistence policy for this topic - Optional policies = Optional.empty(); - Optional localPolicies = Optional.empty(); - PersistencePolicies tmpPersistencePolicies = null; RetentionPolicies tmpRetentionPolicies = null; OffloadPolicies tmpTopicLevelOffloadPolicies = null; @@ -1218,8 +1214,7 @@ public CompletableFuture getManagedLedgerConfig(TopicName t String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString()); CompletableFuture> localPoliciesFuture = pulsar().getLocalZkCacheService().policiesCache().getAsync(path); - - policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> { + policiesFuture.thenCombine(localPoliciesFuture, (policies, localPolicies) -> { PersistencePolicies persistencePolicies = finalPersistencePolicies; RetentionPolicies retentionPolicies = finalRetentionPolicies; OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;