Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -251,10 +252,12 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
}

protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
Expand Down Expand Up @@ -319,21 +322,21 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
}

this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
this);

this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
NonPersistentAcknowledgmentGroupingTracker.of();
NonPersistentAcknowledgmentGroupingTracker.of();
}

if (conf.getDeadLetterPolicy() != null) {
Expand Down Expand Up @@ -410,16 +413,16 @@ public CompletableFuture<Void> unsubscribeAsync() {
log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
topicName.toString(), subscription)));
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
topicName.toString(), subscription)));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(
new PulsarClientException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
new PulsarClientException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
}
return unsubscribeFuture;
}
Expand Down Expand Up @@ -1400,7 +1403,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
}

private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
MessageIdData messageId, ClientCnx cnx) {
MessageIdData messageId, ClientCnx cnx) {

// Lazy task scheduling to expire incomplete chunk message
if (expireTimeOfIncompleteChunkedMessageMillis > 0 && expireChunkMessageTaskScheduled.compareAndSet(false,
Expand Down Expand Up @@ -1445,7 +1448,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() > (msgMetadata.getPublishTime()
+ expireTimeOfIncompleteChunkedMessageMillis)) {
+ expireTimeOfIncompleteChunkedMessageMillis)) {
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
} else {
trackMessage(msgId);
Expand Down Expand Up @@ -1636,7 +1639,7 @@ protected void trackMessage(Message<?> msg) {
}

protected void trackMessage(MessageId messageId) {
trackMessage(messageId, 0);
trackMessage(messageId, 0);
}

protected void trackMessage(MessageId messageId, int redeliveryCount) {
Expand Down Expand Up @@ -1777,7 +1780,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCo
}

private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
ClientCnx currentCnx, boolean checkMaxMessageSize) {
ClientCnx currentCnx, boolean checkMaxMessageSize) {
CompressionType compressionType = msgMetadata.getCompression();
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
Expand Down Expand Up @@ -1830,7 +1833,7 @@ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentC
}

private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx,
ValidationError validationError) {
ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
discardMessage(messageId, currentCnx, validationError);
Expand Down Expand Up @@ -2022,8 +2025,8 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
String originTopicNameStr = getOriginTopicNameStr(message);
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
Expand Down Expand Up @@ -2052,7 +2055,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
}
result.complete(false);
return null;
});
});
}
}, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex);
Expand Down Expand Up @@ -2151,9 +2154,15 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();

if (!duringSeek.compareAndSet(false, true)) {
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
seekFuture.cancel(true);
Comment thread
tisonkun marked this conversation as resolved.
return seekFuture;
}

MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
duringSeek.set(true);
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);

cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
Expand All @@ -2171,9 +2180,9 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
return null;
});
return seekFuture;
Expand All @@ -2185,7 +2194,7 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
MessageId.earliest, seekBy);
MessageId.earliest, seekBy);
});
}

Expand Down Expand Up @@ -2351,10 +2360,11 @@ public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when the subscription %s of the topic %s "
+ "getting the last message id", consumerName, subscription, topicName.toString())));
}
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when the subscription %s of the topic %s "
+ "getting the last message id", consumerName, subscription,
topicName.toString())));
}

AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
Expand All @@ -2376,11 +2386,12 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
if (isConnected() && cnx != null) {
if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
future.completeExceptionally(
new PulsarClientException.NotSupportedException(
String.format("The command `GetLastMessageId` is not supported for the protocol version %d. "
+ "The consumer is %s, topic %s, subscription %s",
cnx.getRemoteEndpointProtocolVersion(),
consumerName, topicName.toString(), subscription)));
new PulsarClientException.NotSupportedException(
String.format(
"The command `GetLastMessageId` is not supported for the protocol version %d. "
+ "The consumer is %s, topic %s, subscription %s",
cnx.getRemoteEndpointProtocolVersion(),
consumerName, topicName.toString(), subscription)));
return;
}

Expand All @@ -2399,31 +2410,31 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Successfully getLastMessageId {}:{}",
topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId());
topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId());
}

MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0
? new MessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(), lastMessageId.getPartition())
lastMessageId.getEntryId(), lastMessageId.getPartition())
: new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(),
lastMessageId.getPartition(), lastMessageId.getBatchIndex());
lastMessageId.getPartition(), lastMessageId.getBatchIndex());

future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
future.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("The subscription %s of the topic %s gets the last message id was failed",
subscription, topicName.toString())));
PulsarClientException.wrap(e.getCause(),
String.format("The subscription %s of the topic %s gets the last message id was failed",
subscription, topicName.toString())));
return null;
});
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not get the last message id "
+ "withing configured timeout", subscription, topicName.toString())));
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not get the last message id "
+ "withing configured timeout", subscription, topicName.toString())));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
package org.apache.pulsar.client.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -259,4 +262,26 @@ public void testTopicPriorityLevel() {

assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}

@Test(invocationTimeOut = 1000)
public void testSeekAsyncInternal() {
// given
ClientCnx cnx = mock(ClientCnx.class);
CompletableFuture<ProducerResponse> clientReq = new CompletableFuture<>();
when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq);

consumer.setClientCnx(cnx);
consumer.setState(HandlerState.State.Ready);

// when
CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
CompletableFuture<Void> secondResult = consumer.seekAsync(1L);

clientReq.complete(null);

// then
assertTrue(firstResult.isDone());
assertTrue(secondResult.isCancelled());
verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
}
}