diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java index 3da676eb827d0..2f85d2430dbbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; @@ -38,7 +38,7 @@ public interface PendingAckStore { * @param pendingAckHandle the handle of pending ack * @param executorService the replay executor service */ - void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService); + void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService); /** * Close the transaction pending ack store. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java index d882c80c47863..44c9fbe039b0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; @@ -33,7 +33,7 @@ public class InMemoryPendingAckStore implements PendingAckStore { @Override - public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService scheduledExecutorService) { + public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService scheduledExecutorService) { pendingAckHandle.changeToReadyState(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index 0828340162ec7..dd58fe774a8fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -26,7 +26,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -107,7 +107,7 @@ public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor, } @Override - public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService transactionReplayExecutor) { + public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) { transactionReplayExecutor .execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle))); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 6376634761fbc..f69908827017a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -157,8 +156,7 @@ private void initPendingAckStore() { this.pendingAckStoreFuture = pendingAckStoreProvider.newPendingAckStore(persistentSubscription); this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { - pendingAckStore.replayAsync(this, - (ScheduledExecutorService) internalPinnedExecutor); + pendingAckStore.replayAsync(this, internalPinnedExecutor); }).exceptionally(e -> { acceptQueue.clear(); changeToErrorState(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 2ef88c51b5aa2..2d1e72d45c80e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -40,7 +40,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -129,7 +129,7 @@ public void setup() throws Exception { public CompletableFuture newPendingAckStore(PersistentSubscription subscription) { return CompletableFuture.completedFuture(new PendingAckStore() { @Override - public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) { + public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) { try { Field field = PendingAckHandleState.class.getDeclaredField("state"); field.setAccessible(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index f325091579ce8..14d818ce3bebb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -72,7 +72,7 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws return new PulsarClientImpl(conf) { { ScheduledExecutorService internalExecutorService = - (ScheduledExecutorService) super.getInternalExecutorService(); + (ScheduledExecutorService) super.getScheduledExecutorProvider().getExecutor(); internalExecutorServiceDelegate = mock(ScheduledExecutorService.class, // a spy isn't used since that doesn't work for private classes, instead // the mock delegatesTo an existing instance. A delegate is sufficient for verifying diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index a129091d609cb..0da17def09757 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -79,8 +78,8 @@ public abstract class ConsumerBase extends HandlerState implements Consumer listener; protected final ConsumerEventListener consumerEventListener; protected final ExecutorProvider executorProvider; - protected final ScheduledExecutorService externalPinnedExecutor; - protected final ScheduledExecutorService internalPinnedExecutor; + protected final ExecutorService externalPinnedExecutor; + protected final ExecutorService internalPinnedExecutor; final BlockingQueue> incomingMessages; protected ConcurrentOpenHashMap unAckedChunkedMessageIdSequenceMap; protected final ConcurrentLinkedQueue>> pendingReceives; @@ -128,8 +127,8 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat this.unAckedChunkedMessageIdSequenceMap = ConcurrentOpenHashMap.newBuilder().build(); this.executorProvider = executorProvider; - this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor(); - this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService(); + this.externalPinnedExecutor = executorProvider.getExecutor(); + this.internalPinnedExecutor = client.getInternalExecutorService(); this.pendingReceives = Queues.newConcurrentLinkedQueue(); this.pendingBatchReceives = Queues.newConcurrentLinkedQueue(); this.schema = schema; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ffd2f68d7602c..5fd09b4b3530f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1378,10 +1379,12 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m // Lazy task scheduling to expire incomplete chunk message if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) { - internalPinnedExecutor - .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages), - expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis, - TimeUnit.MILLISECONDS); + ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate( + () -> internalPinnedExecutor + .execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)), + expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis, + TimeUnit.MILLISECONDS + ); expireChunkMessageTaskScheduled = true; } @@ -2387,7 +2390,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, return; } - internalPinnedExecutor.schedule(() -> { + ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> { log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", topic, getHandlerName(), nextDelay); remainingTime.addAndGet(-nextDelay); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6cd02f2698f9d..d40fe0b0e43c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -276,7 +277,8 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { return null; } log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex); - internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS); + ((ScheduledExecutorService) client.getScheduledExecutorProvider()) + .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS); return null; }); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 8a7d50f0d4b87..2460f4c53f5a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -77,6 +77,7 @@ import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl; import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.client.util.ScheduledExecutorProvider; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -108,6 +109,8 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + + private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; private final boolean createdCnxPool; @@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); + this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(), + "pulsar-client-scheduled"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, this.eventLoopGroup); } else { @@ -1020,7 +1025,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, } previousExceptions.add(e); - ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> { + ((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> { log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- " + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -1142,6 +1147,11 @@ protected CompletableFuture> preProcessSchemaBeforeSubscribe(Pulsa public ExecutorService getInternalExecutorService() { return internalExecutorProvider.getExecutor(); } + + public ScheduledExecutorProvider getScheduledExecutorProvider() { + return scheduledExecutorProvider; + } + // // Transaction related API // diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java index b5fb3543b820f..67606af63a770 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java @@ -25,7 +25,6 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; @@ -41,7 +40,7 @@ public class ExecutorProvider { private final String poolName; private volatile boolean isShutdown; - private static class ExtendedThreadFactory extends DefaultThreadFactory { + protected static class ExtendedThreadFactory extends DefaultThreadFactory { @Getter private Thread thread; @@ -56,7 +55,6 @@ public Thread newThread(Runnable r) { } } - public ExecutorProvider(int numThreads, String poolName) { checkArgument(numThreads > 0); this.numThreads = numThreads; @@ -65,13 +63,17 @@ public ExecutorProvider(int numThreads, String poolName) { for (int i = 0; i < numThreads; i++) { ExtendedThreadFactory threadFactory = new ExtendedThreadFactory( poolName, Thread.currentThread().isDaemon()); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory); + ExecutorService executor = createExecutor(threadFactory); executors.add(Pair.of(executor, threadFactory)); } isShutdown = false; this.poolName = poolName; } + protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) { + return Executors.newSingleThreadExecutor(threadFactory); + } + public ExecutorService getExecutor() { return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads).getKey(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java new file mode 100644 index 0000000000000..887ae3bb7fff4 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ScheduledExecutorProvider extends ExecutorProvider { + + public ScheduledExecutorProvider(int numThreads, String poolName) { + super(numThreads, poolName); + } + + @Override + protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) { + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } +}