diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index a22e98a3090ec..87576517c3c05 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -63,6 +63,12 @@ public ConcurrentFindCursorPositionException(String msg) { } } + public static class CursorAlreadyClosedException extends ManagedLedgerException { + public CursorAlreadyClosedException(String msg) { + super(msg); + } + } + public static class TooManyRequestsException extends ManagedLedgerException { public TooManyRequestsException(String msg) { super(msg); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 799bf1251bf8a..7e11d68a538b5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -24,7 +24,10 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -47,6 +50,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; @@ -451,7 +455,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx) { checkArgument(numberOfEntriesToRead > 0); if (STATE_UPDATER.get(this) == State.Closed) { - callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx); + callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx); return; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index 9c7f6c26b4fd5..1a0c174517faf 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -30,21 +30,22 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.util.Rate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.yahoo.pulsar.common.policies.data.ReplicatorStats; import com.yahoo.pulsar.broker.service.BrokerService; import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException; import com.yahoo.pulsar.client.api.MessageId; import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.impl.Backoff; -import com.yahoo.pulsar.client.impl.PulsarClientImpl; import com.yahoo.pulsar.client.impl.MessageImpl; import com.yahoo.pulsar.client.impl.ProducerImpl; +import com.yahoo.pulsar.client.impl.PulsarClientImpl; import com.yahoo.pulsar.client.impl.SendCallback; +import com.yahoo.pulsar.common.policies.data.ReplicatorStats; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; @@ -488,7 +489,13 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { long waitTimeMillis = readFailureBackoff.next(); - if (!(exception instanceof TooManyRequestsException)) { + if(exception instanceof CursorAlreadyClosedException) { + log.error("[{}][{} -> {}] Error reading entries because replicator is already deleted and cursor is already closed {}, ({})", topic, localCluster, + remoteCluster, ctx, exception.getMessage(), exception); + // replicator is already deleted and cursor is already closed so, producer should also be stopped + closeProducerAsync(); + return; + }else if (!(exception instanceof TooManyRequestsException)) { log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})", topic, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception); } else { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java index 061a6bc6bccdd..43a40f47f5b1c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java @@ -35,8 +35,11 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.mockito.Mockito; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.slf4j.Logger; @@ -741,6 +744,67 @@ public void testResumptionAfterBacklogRelaxed() throws Exception { } } + /** + * It verifies that PersistentReplicator considers CursorAlreadyClosedException as non-retriable-read exception and + * it should closed the producer as cursor is already closed because replicator is already deleted. + * + * @throws Exception + */ + @Test(timeOut = 5000) + public void testCloseReplicatorStartProducer() throws Exception { + + DestinationName dest = DestinationName.get("persistent://pulsar/global/ns1/closeCursor"); + // Producer on r1 + MessageProducer producer1 = new MessageProducer(url1, dest); + // Consumer on r1 + MessageConsumer consumer1 = new MessageConsumer(url1, dest); + // Consumer on r2 + MessageConsumer consumer2 = new MessageConsumer(url2, dest); + + // Replicator for r1 -> r2 + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()); + PersistentReplicator replicator = topic.getPersistentReplicator("r2"); + + // close the cursor + Field cursorField = PersistentReplicator.class.getDeclaredField("cursor"); + cursorField.setAccessible(true); + ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator); + cursor.close(); + // try to read entries + CountDownLatch latch = new CountDownLatch(1); + producer1.produce(10); + cursor.asyncReadEntriesOrWait(10, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + latch.countDown(); + fail("it should have been failed"); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + assertTrue(exception instanceof CursorAlreadyClosedException); + } + }, null); + + // replicator-readException: cursorAlreadyClosed + replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null); + + // wait replicator producer to be closed + Thread.sleep(1000); + + // Replicator producer must be closed + Field producerField = PersistentReplicator.class.getDeclaredField("producer"); + producerField.setAccessible(true); + ProducerImpl replicatorProducer = (ProducerImpl) producerField.get(replicator); + assertEquals(replicatorProducer, null); + + producer1.close(); + consumer1.close(); + consumer2.close(); + + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); }