diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java index a65ed238b0..5a56d9348d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java @@ -17,11 +17,13 @@ import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate; import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -33,7 +35,6 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; /** * KafkaTopicConsumerManager manages a topic and its related offset cursor. @@ -44,32 +45,27 @@ public class KafkaTopicConsumerManager implements Closeable { private final PersistentTopic topic; private final KafkaRequestHandler requestHandler; - // the lock for closed status change. - // once closed, should not add new cursor back, since consumers are cleared. - private final ReentrantReadWriteLock rwLock; - private boolean closed; + private final AtomicBoolean closed = new AtomicBoolean(false); // key is the offset, value is the future of (cursor, offset), whose offset is the last offset in pair. @Getter - private final ConcurrentLongHashMap>> cursors; + private final Map>> cursors; // used to track all created cursor, since above consumers may be remove and in fly, // use this map will not leak cursor when close. @Getter - private final ConcurrentMap createdCursors; + private final Map createdCursors; // track last access time(millis) for offsets @Getter - private final ConcurrentLongHashMap lastAccessTimes; + private final Map lastAccessTimes; KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) { this.topic = topic; - this.cursors = new ConcurrentLongHashMap<>(); + this.cursors = new ConcurrentHashMap<>(); this.createdCursors = new ConcurrentHashMap<>(); - this.lastAccessTimes = new ConcurrentLongHashMap<>(); + this.lastAccessTimes = new ConcurrentHashMap<>(); this.requestHandler = requestHandler; - this.rwLock = new ReentrantReadWriteLock(); - this.closed = false; } // delete expired cursors, so backlog can be cleared. @@ -82,20 +78,13 @@ void deleteExpiredCursor(long current, long expirePeriodMillis) { } void deleteOneExpiredCursor(long offset) { - CompletableFuture> cursorFuture; - - // need not do anything, since this tcm already in closing state. and close() will delete every thing. - rwLock.readLock().lock(); - try { - if (closed) { - return; - } - cursorFuture = cursors.remove(offset); - lastAccessTimes.remove(offset); - } finally { - rwLock.readLock().unlock(); + if (closed.get()) { + return; } + final CompletableFuture> cursorFuture = cursors.remove(offset); + lastAccessTimes.remove(offset); + if (cursorFuture != null) { if (log.isDebugEnabled()) { log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}", @@ -115,6 +104,9 @@ void deleteOneExpiredCursor(long offset) { // delete passed in cursor. void deleteOneCursorAsync(ManagedCursor cursor, String reason) { + if (closed.get()) { + return; + } if (cursor != null) { topic.getManagedLedger().asyncDeleteCursor(cursor.getName(), new DeleteCursorCallback() { @Override @@ -139,56 +131,42 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { // remove from cache, so another same offset read could happen. // each success remove should have a following add. public CompletableFuture> removeCursorFuture(long offset) { - rwLock.readLock().lock(); - try { - if (closed) { - return null; - } - lastAccessTimes.remove(offset); - final CompletableFuture> cursorFuture = cursors.remove(offset); - if (cursorFuture == null) { - return asyncCreateCursorIfNotExists(offset); - } + if (closed.get()) { + return null; + } - if (log.isDebugEnabled()) { - log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}", - requestHandler.ctx.channel(), offset, cursors.size()); - } - return cursorFuture; - } finally { - rwLock.readLock().unlock(); + lastAccessTimes.remove(offset); + final CompletableFuture> cursorFuture = cursors.remove(offset); + if (cursorFuture == null) { + return asyncCreateCursorIfNotExists(offset); + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}", + requestHandler.ctx.channel(), offset, cursors.size()); } + return cursorFuture; } private CompletableFuture> asyncCreateCursorIfNotExists(long offset) { - rwLock.readLock().lock(); - try { - if (closed) { - return null; - } - cursors.putIfAbsent(offset, asyncGetCursorByOffset(offset)); - - // notice: above would add a - lastAccessTimes.remove(offset); - return cursors.remove(offset); - } finally { - rwLock.readLock().unlock(); + if (closed.get()) { + return null; } + cursors.putIfAbsent(offset, asyncGetCursorByOffset(offset)); + + // notice: above would add a + lastAccessTimes.remove(offset); + return cursors.remove(offset); } public void add(long offset, Pair pair) { checkArgument(offset == pair.getRight(), "offset not equal. key: " + offset + " value: " + pair.getRight()); - rwLock.readLock().lock(); - try { - if (closed) { - ManagedCursor managedCursor = pair.getLeft(); - deleteOneCursorAsync(managedCursor, "A race - add cursor back but tcm already closed"); - return; - } - } finally { - rwLock.readLock().unlock(); + if (closed.get()) { + ManagedCursor managedCursor = pair.getLeft(); + deleteOneCursorAsync(managedCursor, "A race - add cursor back but tcm already closed"); + return; } final CompletableFuture> cursorFuture = CompletableFuture.completedFuture(pair); @@ -206,49 +184,43 @@ public void add(long offset, Pair pair) { // called when channel closed. @Override public void close() { - final ConcurrentLongHashMap>> cursorFuturesToClose = - new ConcurrentLongHashMap<>(); - ConcurrentMap cursorsToClose; - rwLock.writeLock().lock(); - try { - if (closed) { - return; - } - closed = true; - if (log.isDebugEnabled()) { - log.debug("[{}] Close TCM for topic {}.", - requestHandler.ctx.channel(), topic.getName()); - } - cursors.forEach(cursorFuturesToClose::put); - cursors.clear(); - lastAccessTimes.clear(); - cursorsToClose = new ConcurrentHashMap<>(); - createdCursors.forEach(cursorsToClose::put); - createdCursors.clear(); - } finally { - rwLock.writeLock().unlock(); + if (!closed.compareAndSet(false, true)) { + return; } - - cursorFuturesToClose.values().forEach(cursorFuture -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Close TCM for topic {}.", + requestHandler.ctx.channel(), topic.getName()); + } + final List>> cursorFuturesToClose = new ArrayList<>(); + cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture)); + cursors.clear(); + lastAccessTimes.clear(); + final List cursorsToClose = new ArrayList<>(); + createdCursors.forEach((ignored, cursor) -> cursorsToClose.add(cursor)); + createdCursors.clear(); + + cursorFuturesToClose.forEach(cursorFuture -> { cursorFuture.whenComplete((pair, e) -> { if (e != null || pair == null) { return; } ManagedCursor cursor = pair.getLeft(); deleteOneCursorAsync(cursor, "TopicConsumerManager close"); - if (cursor != null) { - cursorsToClose.remove(cursor.getName()); - } }); }); + cursorFuturesToClose.clear(); // delete dangling createdCursors - cursorsToClose.values().forEach(cursor -> + cursorsToClose.forEach(cursor -> deleteOneCursorAsync(cursor, "TopicConsumerManager close but cursor is still outstanding")); cursorsToClose.clear(); } private CompletableFuture> asyncGetCursorByOffset(long offset) { + if (closed.get()) { + // return a null completed future instead of null because the returned value will be put into a Map + return CompletableFuture.completedFuture(null); + } final ManagedLedger ledger = topic.getManagedLedger(); return ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).thenApply(position -> { final String cursorName = "kop-consumer-cursor-" + topic.getName() diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 62f12ed980..eb43b8ee1b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -67,7 +68,7 @@ public class KafkaTopicManager { // remove expired cursors, so backlog can be cleared. private static final long checkPeriodMillis = 1 * 60 * 1000; private static final long expirePeriodMillis = 2 * 60 * 1000; - private final ScheduledFuture cursorExpireTask; + private static volatile ScheduledFuture cursorExpireTask = null; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -83,19 +84,25 @@ public class KafkaTopicManager { this.brokerService = pulsarService.getBrokerService(); this.internalServerCnx = new InternalServerCnx(requestHandler); - // check expired cursor every 1 min. - this.cursorExpireTask = brokerService.executor().scheduleWithFixedDelay(() -> { - long current = System.currentTimeMillis(); - if (log.isDebugEnabled()) { - log.debug("[{}] Schedule a check of expired cursor", - requestHandler.ctx.channel()); - } - consumerTopicManagers.values().forEach(future -> { - if (future != null && future.isDone() && !future.isCompletedExceptionally()) { - future.join().deleteExpiredCursor(current, expirePeriodMillis); + initializeCursorExpireTask(brokerService.executor()); + } + + private static void initializeCursorExpireTask(final ScheduledExecutorService executor) { + if (cursorExpireTask == null) { + synchronized (KafkaTopicManager.class) { + if (cursorExpireTask == null) { + // check expired cursor every 1 min. + cursorExpireTask = executor.scheduleWithFixedDelay(() -> { + long current = System.currentTimeMillis(); + consumerTopicManagers.values().forEach(future -> { + if (future != null && future.isDone() && !future.isCompletedExceptionally()) { + future.join().deleteExpiredCursor(current, expirePeriodMillis); + } + }); + }, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS); } - }); - }, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS); + } + } } // update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler. @@ -296,8 +303,6 @@ public void close() { } try { - this.cursorExpireTask.cancel(true); - closeKafkaTopicConsumerManagers(); topics.keySet().forEach(topicName -> { @@ -369,6 +374,12 @@ public static void removeKafkaTopicConsumerManager(String topicName) { } public static void closeKafkaTopicConsumerManagers() { + synchronized (KafkaTopicManager.class) { + if (cursorExpireTask != null) { + cursorExpireTask.cancel(true); + cursorExpireTask = null; + } + } consumerTopicManagers.forEach((topic, tcmFuture) -> { try { Optional.ofNullable(tcmFuture.get(300, TimeUnit.SECONDS))