From fe8425d2135b0fa99102fadbd98066161f05b6b9 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 15 May 2023 10:47:00 +0800 Subject: [PATCH 1/2] Revert "[fix][client] Seek should be thread-safe (#20242)" This reverts commit bc1764f9ef71dd31e8cd61c7571e493442bc6395. --- .../pulsar/client/impl/ConsumerImpl.java | 109 ++++++++---------- .../pulsar/client/impl/ConsumerImplTest.java | 27 +---- 2 files changed, 50 insertions(+), 86 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2693f12d3ea8e..8a06ec122b5e6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -208,7 +208,6 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); - static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -252,12 +251,10 @@ static ConsumerImpl newConsumerImpl(PulsarClientImpl client, } protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, - ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, - boolean parentConsumerHasListener, CompletableFuture> subscribeFuture, - MessageId startMessageId, - long startMessageRollbackDurationInSec, Schema schema, - ConsumerInterceptors interceptors, - boolean createTopicIfDoesNotExist) { + ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, + boolean parentConsumerHasListener, CompletableFuture> subscribeFuture, MessageId startMessageId, + long startMessageRollbackDurationInSec, Schema schema, ConsumerInterceptors interceptors, + boolean createTopicIfDoesNotExist) { super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema, interceptors); this.consumerId = client.newConsumerId(); @@ -329,21 +326,21 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat } this.connectionHandler = new ConnectionHandler(this, - new BackoffBuilder() - .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), - TimeUnit.NANOSECONDS) - .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) - .setMandatoryStop(0, TimeUnit.MILLISECONDS) - .create(), + new BackoffBuilder() + .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), + TimeUnit.NANOSECONDS) + .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create(), this); this.topicName = TopicName.get(topic); if (this.topicName.isPersistent()) { this.acknowledgmentsGroupingTracker = - new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup()); + new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup()); } else { this.acknowledgmentsGroupingTracker = - NonPersistentAcknowledgmentGroupingTracker.of(); + NonPersistentAcknowledgmentGroupingTracker.of(); } if (conf.getDeadLetterPolicy() != null) { @@ -420,16 +417,16 @@ public CompletableFuture unsubscribeAsync() { log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage()); setState(State.Ready); unsubscribeFuture.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("Failed to unsubscribe the subscription %s of topic %s", - topicName.toString(), subscription))); + PulsarClientException.wrap(e.getCause(), + String.format("Failed to unsubscribe the subscription %s of topic %s", + topicName.toString(), subscription))); return null; }); } else { unsubscribeFuture.completeExceptionally( - new PulsarClientException( - String.format("The client is not connected to the broker when unsubscribing the " - + "subscription %s of the topic %s", subscription, topicName.toString()))); + new PulsarClientException( + String.format("The client is not connected to the broker when unsubscribing the " + + "subscription %s of the topic %s", subscription, topicName.toString()))); } return unsubscribeFuture; } @@ -1410,7 +1407,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien } private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId, - MessageIdData messageId, ClientCnx cnx) { + MessageIdData messageId, ClientCnx cnx) { // Lazy task scheduling to expire incomplete chunk message if (expireTimeOfIncompleteChunkedMessageMillis > 0 && expireChunkMessageTaskScheduled.compareAndSet(false, @@ -1455,7 +1452,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m increaseAvailablePermits(cnx); if (expireTimeOfIncompleteChunkedMessageMillis > 0 && System.currentTimeMillis() > (msgMetadata.getPublishTime() - + expireTimeOfIncompleteChunkedMessageMillis)) { + + expireTimeOfIncompleteChunkedMessageMillis)) { doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null); } else { trackMessage(msgId); @@ -1646,7 +1643,7 @@ protected void trackMessage(Message msg) { } protected void trackMessage(MessageId messageId) { - trackMessage(messageId, 0); + trackMessage(messageId, 0); } protected void trackMessage(MessageId messageId, int redeliveryCount) { @@ -1787,7 +1784,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCo } private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, - ClientCnx currentCnx, boolean checkMaxMessageSize) { + ClientCnx currentCnx, boolean checkMaxMessageSize) { CompressionType compressionType = msgMetadata.getCompression(); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); int uncompressedSize = msgMetadata.getUncompressedSize(); @@ -1840,7 +1837,7 @@ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentC } private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, - ValidationError validationError) { + ValidationError validationError) { log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); discardMessage(messageId, currentCnx, validationError); @@ -2032,8 +2029,8 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) String originTopicNameStr = getOriginTopicNameStr(message); TypedMessageBuilder typedMessageBuilderNew = producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(message.getData()) - .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); + .value(message.getData()) + .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); if (message.hasKey()) { typedMessageBuilderNew.key(message.getKey()); } @@ -2062,7 +2059,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) } result.complete(false); return null; - }); + }); } }, internalPinnedExecutor).exceptionally(ex -> { log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex); @@ -2161,15 +2158,9 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, final CompletableFuture seekFuture = new CompletableFuture<>(); ClientCnx cnx = cnx(); - if (!duringSeek.compareAndSet(false, true)) { - log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", - topic, subscription, seekBy); - seekFuture.cancel(true); - return seekFuture; - } - MessageIdAdv originSeekMessageId = seekMessageId; seekMessageId = (MessageIdAdv) seekId; + duringSeek.set(true); log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); cnx.sendRequestWithId(seek, requestId).thenRun(() -> { @@ -2187,9 +2178,9 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); seekFuture.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("Failed to seek the subscription %s of the topic %s to %s", - subscription, topicName.toString(), seekBy))); + PulsarClientException.wrap(e.getCause(), + String.format("Failed to seek the subscription %s of the topic %s to %s", + subscription, topicName.toString(), seekBy))); return null; }); return seekFuture; @@ -2201,7 +2192,7 @@ public CompletableFuture seekAsync(long timestamp) { return seekAsyncCheckState(seekBy).orElseGet(() -> { long requestId = client.newRequestId(); return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp), - MessageId.earliest, seekBy); + MessageId.earliest, seekBy); }); } @@ -2367,11 +2358,10 @@ public CompletableFuture> getLastMessageIdsAsync() { public CompletableFuture internalGetLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil - .failedFuture(new PulsarClientException.AlreadyClosedException( - String.format("The consumer %s was already closed when the subscription %s of the topic %s " - + "getting the last message id", consumerName, subscription, - topicName.toString()))); - } + .failedFuture(new PulsarClientException.AlreadyClosedException( + String.format("The consumer %s was already closed when the subscription %s of the topic %s " + + "getting the last message id", consumerName, subscription, topicName.toString()))); + } AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new BackoffBuilder() @@ -2393,12 +2383,11 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, if (isConnected() && cnx != null) { if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) { future.completeExceptionally( - new PulsarClientException.NotSupportedException( - String.format( - "The command `GetLastMessageId` is not supported for the protocol version %d. " - + "The consumer is %s, topic %s, subscription %s", - cnx.getRemoteEndpointProtocolVersion(), - consumerName, topicName.toString(), subscription))); + new PulsarClientException.NotSupportedException( + String.format("The command `GetLastMessageId` is not supported for the protocol version %d. " + + "The consumer is %s, topic %s, subscription %s", + cnx.getRemoteEndpointProtocolVersion(), + consumerName, topicName.toString(), subscription))); return; } @@ -2417,31 +2406,31 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } if (log.isDebugEnabled()) { log.debug("[{}][{}] Successfully getLastMessageId {}:{}", - topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); } MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 ? new MessageIdImpl(lastMessageId.getLedgerId(), - lastMessageId.getEntryId(), lastMessageId.getPartition()) + lastMessageId.getEntryId(), lastMessageId.getPartition()) : new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartition(), lastMessageId.getBatchIndex()); + lastMessageId.getPartition(), lastMessageId.getBatchIndex()); future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("The subscription %s of the topic %s gets the last message id was failed", - subscription, topicName.toString()))); + PulsarClientException.wrap(e.getCause(), + String.format("The subscription %s of the topic %s gets the last message id was failed", + subscription, topicName.toString()))); return null; }); } else { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { future.completeExceptionally( - new PulsarClientException.TimeoutException( - String.format("The subscription %s of the topic %s could not get the last message id " - + "withing configured timeout", subscription, topicName.toString()))); + new PulsarClientException.TimeoutException( + String.format("The subscription %s of the topic %s could not get the last message id " + + "withing configured timeout", subscription, topicName.toString()))); return; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index ab11675f5434c..29d180f5f9a16 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -27,9 +26,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertTrue; -import io.netty.buffer.ByteBuf; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; @@ -262,26 +259,4 @@ public void testTopicPriorityLevel() { assertThat(consumer.getPriorityLevel()).isEqualTo(1); } - - @Test(invocationTimeOut = 1000) - public void testSeekAsyncInternal() { - // given - ClientCnx cnx = mock(ClientCnx.class); - CompletableFuture clientReq = new CompletableFuture<>(); - when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq); - - consumer.setClientCnx(cnx); - consumer.setState(HandlerState.State.Ready); - - // when - CompletableFuture firstResult = consumer.seekAsync(1L); - CompletableFuture secondResult = consumer.seekAsync(1L); - - clientReq.complete(null); - - // then - assertTrue(firstResult.isDone()); - assertTrue(secondResult.isCancelled()); - verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong()); - } } From 40b766575b1b5fb27c602f83d68d1374bbf6a6f1 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 15 May 2023 15:10:11 +0800 Subject: [PATCH 2/2] [fix][client] thread-safe seek Signed-off-by: tison --- .../pulsar/client/impl/ConsumerImpl.java | 11 +++++++- .../pulsar/client/impl/ConsumerImplTest.java | 27 ++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8a06ec122b5e6..4a84e765065f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2158,9 +2158,18 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, final CompletableFuture seekFuture = new CompletableFuture<>(); ClientCnx cnx = cnx(); + if (!duringSeek.compareAndSet(false, true)) { + final String message = String.format( + "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", + topic, subscription, seekBy); + log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", + topic, subscription, seekBy); + seekFuture.completeExceptionally(new IllegalStateException(message)); + return seekFuture; + } + MessageIdAdv originSeekMessageId = seekMessageId; seekMessageId = (MessageIdAdv) seekId; - duringSeek.set(true); log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); cnx.sendRequestWithId(seek, requestId).thenRun(() -> { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 29d180f5f9a16..5a223d5da15c0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -26,7 +27,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; - +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBuf; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; @@ -259,4 +262,26 @@ public void testTopicPriorityLevel() { assertThat(consumer.getPriorityLevel()).isEqualTo(1); } + + @Test(invocationTimeOut = 1000) + public void testSeekAsyncInternal() { + // given + ClientCnx cnx = mock(ClientCnx.class); + CompletableFuture clientReq = new CompletableFuture<>(); + when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq); + + consumer.setClientCnx(cnx); + consumer.setState(HandlerState.State.Ready); + + // when + CompletableFuture firstResult = consumer.seekAsync(1L); + CompletableFuture secondResult = consumer.seekAsync(1L); + + clientReq.complete(null); + + // then + assertTrue(firstResult.isDone()); + assertTrue(secondResult.isCompletedExceptionally()); + verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong()); + } }