diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 18e2e2d16c37d..a142c80400172 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -44,6 +45,7 @@ public abstract class AbstractReplicator { protected final String remoteCluster; protected final PulsarClientImpl replicationClient; protected final PulsarClientImpl client; + protected final Topic localTopic; protected volatile ProducerImpl producer; public static final String REPL_PRODUCER_NAME_DELIMITER = "-->"; @@ -64,11 +66,12 @@ protected enum State { Stopped, Starting, Started, Stopping } - public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, + public AbstractReplicator(Topic localTopic, String replicatorPrefix, String localCluster, String remoteCluster, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { this.brokerService = brokerService; - this.topicName = topicName; + this.localTopic = localTopic; + this.topicName = localTopic.getName(); this.replicatorPrefix = replicatorPrefix; this.localCluster = localCluster.intern(); this.remoteCluster = remoteCluster.intern(); @@ -111,7 +114,8 @@ public synchronized void startProducer() { topicName, localCluster, remoteCluster, waitTimeMs / 1000.0); } // BackOff before retrying - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, + 
waitTimeMs, TimeUnit.MILLISECONDS); return; } State state = STATE_UPDATER.get(this); @@ -139,7 +143,8 @@ public synchronized void startProducer() { localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, + waitTimeMs, TimeUnit.MILLISECONDS); } else { log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, localCluster, remoteCluster, STATE_UPDATER.get(this), ex); @@ -149,6 +154,32 @@ public synchronized void startProducer() { } + protected void checkTopicActiveAndRetryStartProducer() { + isLocalTopicActive().thenAccept(isTopicActive -> { + if (isTopicActive) { + startProducer(); + } + }).exceptionally(ex -> { + log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", + String.format("%s%s%s", getReplicatorName(replicatorPrefix, localCluster), + REPL_PRODUCER_NAME_DELIMITER, remoteCluster), STATE_UPDATER.get(this), ex); + return null; + }); + } + + protected CompletableFuture<Boolean> isLocalTopicActive() { + CompletableFuture<Optional<Topic>> topicFuture = brokerService.getTopics().get(topicName); + if (topicFuture == null){ + return CompletableFuture.completedFuture(false); + } + return topicFuture.thenApplyAsync(optional -> { + if (optional.isEmpty()) { + return false; + } + return optional.get() == localTopic; + }, brokerService.executor()); + } + protected synchronized CompletableFuture closeProducerAsync() { if (producer == null) { STATE_UPDATER.set(this, State.Stopped); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index b863e9eb3c2cd..40d19e8176da0 100644 ---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -50,7 +50,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { - super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, + super(topic, topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, replicationClient); producerBuilder.blockIfQueueFull(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 0471f12f3c93e..75ea294329efc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -110,7 +110,7 @@ public class PersistentReplicator extends AbstractReplicator public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { - super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, + super(topic, topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, replicationClient); this.topic = topic; this.cursor = cursor; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java new file mode 100644 index 
0000000000000..30be3c55c596d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.service; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import io.netty.channel.DefaultEventLoop; +import io.netty.util.internal.DefaultPriorityQueue; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Producer; +import 
org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.Assert; +import org.testng.annotations.Test; + + +@Test(groups = "broker") +public class AbstractReplicatorTest { + + @Test + public void testRetryStartProducerStoppedByTopicRemove() throws Exception { + final String localCluster = "localCluster"; + final String remoteCluster = "remoteCluster"; + final String topicName = "remoteTopicName"; + final String replicatorPrefix = "pulsar.repl"; + final DefaultEventLoop eventLoopGroup = new DefaultEventLoop(); + // Mock services. + final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class); + final PulsarService pulsar = mock(PulsarService.class); + final BrokerService broker = mock(BrokerService.class); + final Topic localTopic = mock(Topic.class); + final PulsarClientImpl localClient = mock(PulsarClientImpl.class); + final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class); + final ProducerBuilder producerBuilder = mock(ProducerBuilder.class); + final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>(); + when(broker.executor()).thenReturn(eventLoopGroup); + when(broker.getTopics()).thenReturn(topics); + when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder); + when(broker.pulsar()).thenReturn(pulsar); + when(pulsar.getClient()).thenReturn(localClient); + when(pulsar.getConfiguration()).thenReturn(pulsarConfig); + when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100); + when(localTopic.getName()).thenReturn(topicName); + when(producerBuilder.topic(any())).thenReturn(producerBuilder); + when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder); +
when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder); + when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder); + when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder); + when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder); + // Mock producer creation failure. + when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex")); + when(producerBuilder.createAsync()) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("mocked ex"))); + // Create a race condition between "retry start producer" and "close replicator". + final ReplicatorInTest replicator = new ReplicatorInTest(localTopic, remoteCluster, topicName, + replicatorPrefix, broker, remoteClient); + replicator.startProducer(); + replicator.disconnect(); + + // Verify that no retry task is left queued or scheduled. + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + AtomicInteger taskCounter = new AtomicInteger(); + CountDownLatch checkTaskFinished = new CountDownLatch(1); + eventLoopGroup.execute(() -> { + synchronized (replicator) { + LinkedBlockingQueue taskQueue = WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue"); + DefaultPriorityQueue scheduledTaskQueue = + WhiteboxImpl.getInternalState(eventLoopGroup, "scheduledTaskQueue"); + taskCounter.set(taskQueue.size() + scheduledTaskQueue.size()); + checkTaskFinished.countDown(); + } + }); + checkTaskFinished.await(); + Assert.assertEquals(taskCounter.get(), 0); + }); + } + + private static class ReplicatorInTest extends AbstractReplicator { + + public ReplicatorInTest(Topic localTopic, String remoteCluster, String remoteTopicName, + String replicatorPrefix, BrokerService brokerService, + PulsarClientImpl replicationClient) throws PulsarServerException { + super(localTopic, remoteCluster, remoteTopicName, replicatorPrefix, brokerService, + replicationClient); + } + + protected String getProducerName() { + return "pulsar.repl.producer"; + } + + @Override +
protected void readEntries(Producer producer) { + + } + + @Override + protected Position getReplicatorReadPosition() { + return PositionImpl.EARLIEST; + } + + @Override + protected long getNumberOfEntriesInBacklog() { + return 0; + } + + @Override + protected void disableReplicatorRead() { + + } + } +}