From d9b1b4ce5c08570c4c4be937630aad24112484da Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Tue, 21 Feb 2017 18:55:29 -0800 Subject: [PATCH 1/3] Avoid triggering startReplProducer on newAddProducer as it may flips replicator state wrongly --- .../pulsar/broker/service/persistent/PersistentReplicator.java | 2 +- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index 9c7f6c26b4fd5..be8d00922fccc 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -193,7 +193,7 @@ public synchronized void startProducer() { // BackOff before retrying brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); } else { - log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: ", topicName, + log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, localCluster, remoteCluster, STATE_UPDATER.get(this), ex); } return null; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 6fc13a9b3aa88..4e750e86eb139 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -237,9 +237,6 @@ public void addProducer(Producer producer) throws BrokerServiceException { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this)); } - - // Start replication producers if not already - startReplProducers(); } finally { lock.readLock().unlock(); } From 9717f115e3a5c8e44c11a10e1bb9aa6412fb015b Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Tue, 21 Feb 2017 23:26:35 -0800 Subject: [PATCH 2/3] Signal replicator is stopping if porducer is not created yet --- .../pulsar/broker/service/persistent/PersistentReplicator.java | 2 +- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index be8d00922fccc..d996380db0a58 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -617,7 +617,7 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return closeProducerAsync(); } else { // If there's already a reconnection happening, signal to close it whenever it's ready - STATE_UPDATER.set(this, State.Stopped); + STATE_UPDATER.set(this, State.Stopping); } return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 4e750e86eb139..6fc13a9b3aa88 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -237,6 +237,9 @@ public void addProducer(Producer producer) throws BrokerServiceException { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this)); } + + // Start replication producers if not already + startReplProducers(); } finally { lock.readLock().unlock(); } From a8fa6ccb66992a43cedf78af3a9a49448c091eb9 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 22 Feb 2017 00:38:35 -0800 Subject: [PATCH 3/3] read repl-cluster from policies to avoid restart of closing-replicator --- .../service/persistent/PersistentTopic.java | 22 +++++- .../broker/service/PersistentTopicTest.java | 67 +++++++++++++++++++ 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 6fc13a9b3aa88..06659b9b2f710 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -267,8 +267,26 @@ private boolean hasRemoteProducers() { return foundRemote.get(); } - private void startReplProducers() { - replicators.forEach((region, replicator) -> replicator.startProducer()); + public void startReplProducers() { + // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close + try { + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path("policies", DestinationName.get(topic).getNamespace())) + .orElseThrow(() -> new KeeperException.NoNodeException()); + if (policies.replication_clusters != null) { + Set configuredClusters = Sets.newTreeSet(policies.replication_clusters); + replicators.forEach((region, replicator) -> { + if (configuredClusters.contains(region)) { + replicator.startProducer(); + } + }); + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[{}] Error getting policies while starting repl-producers {}", topic, e.getMessage()); + } + replicators.forEach((region, replicator) -> replicator.startProducer()); + } } public CompletableFuture stopReplProducers() { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java index d3e7517af32de..e8347fba9a4b7 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -32,7 +33,9 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.net.URL; import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -57,8 +60,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.zookeeper.ZooKeeper; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -69,12 +75,15 @@ import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.ServiceConfiguration; +import com.yahoo.pulsar.broker.admin.AdminResource; import com.yahoo.pulsar.broker.cache.ConfigurationCacheService; import com.yahoo.pulsar.broker.namespace.NamespaceService; import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; +import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator; import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; +import com.yahoo.pulsar.client.api.PulsarClient; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -828,4 +837,62 @@ public void testFailoverSubscription() throws Exception { assertNull(topic2.getPersistentSubscription(successSubName)); } + + /** + * {@link PersistentReplicator.removeReplicator} doesn't remove replicator in atomic way and does in multiple step: + * 1. disconnect replicator producer + *

+ * 2. close cursor + *

+ * 3. remove from replicator-list. + *

+ * + * If we try to startReplicationProducer before step-c finish then it should not avoid restarting repl-producer. + * + * @throws Exception + */ + @Test + public void testAtomicReplicationRemoval() throws Exception { + final String globalTopicName = "persistent://prop/global/ns-abc/successTopic"; + String localCluster = "local"; + String remoteCluster = "remote"; + final ManagedLedger ledgerMock = mock(ManagedLedger.class); + doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject()); + doReturn(new ArrayList()).when(ledgerMock).getCursors(); + + PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); + String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster; + ConcurrentOpenHashMap replicatorMap = topic.getReplicators(); + ; + final URL brokerUrl = new URL( + "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); + PulsarClient client = PulsarClient.create(brokerUrl.toString()); + ManagedCursor cursor = mock(ManagedCursorImpl.class); + doReturn(remoteCluster).when(cursor).getName(); + brokerService.getReplicationClients().put(remoteCluster, client); + PersistentReplicator replicator = spy( + new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService)); + replicatorMap.put(remoteReplicatorName, replicator); + + // step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed + Method removeMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class); + removeMethod.setAccessible(true); + removeMethod.invoke(topic, remoteReplicatorName); + + // step-2 now, policies doesn't have removed replication cluster so, it should not invoke "startProducer" of the + // replicator + when(pulsar.getConfigurationCache().policiesCache() + .get(AdminResource.path("policies", DestinationName.get(globalTopicName).getNamespace()))) + .thenReturn(Optional.of(new Policies())); + // try to start replicator again + topic.startReplProducers(); + // verify: replicator.startProducer is not invoked + verify(replicator, Mockito.times(0)).startProducer(); + + // step-3 : complete the callback to remove replicator from the list + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteCursorCallback.class); + Mockito.verify(ledgerMock).asyncDeleteCursor(anyObject(), captor.capture(), anyObject()); + DeleteCursorCallback callback = captor.getValue(); + callback.deleteCursorComplete(null); + } }