KafkaTopicConsumerManager.java
@@ -17,11 +17,13 @@
 import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate;
 import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -33,7 +35,6 @@
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 
 /**
  * KafkaTopicConsumerManager manages a topic and its related offset cursor.
@@ -44,32 +45,27 @@ public class KafkaTopicConsumerManager implements Closeable {
     private final PersistentTopic topic;
     private final KafkaRequestHandler requestHandler;
 
-    // the lock for closed status change.
-    // once closed, should not add new cursor back, since consumers are cleared.
-    private final ReentrantReadWriteLock rwLock;
-    private boolean closed;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     // key is the offset, value is the future of (cursor, offset), whose offset is the last offset in pair.
     @Getter
-    private final ConcurrentLongHashMap<CompletableFuture<Pair<ManagedCursor, Long>>> cursors;
+    private final Map<Long, CompletableFuture<Pair<ManagedCursor, Long>>> cursors;
 
     // used to track all created cursor, since above consumers may be remove and in fly,
     // use this map will not leak cursor when close.
    @Getter
-    private final ConcurrentMap<String, ManagedCursor> createdCursors;
+    private final Map<String, ManagedCursor> createdCursors;
 
     // track last access time(millis) for offsets <offset, time>
     @Getter
-    private final ConcurrentLongHashMap<Long> lastAccessTimes;
+    private final Map<Long, Long> lastAccessTimes;
 
     KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) {
         this.topic = topic;
-        this.cursors = new ConcurrentLongHashMap<>();
+        this.cursors = new ConcurrentHashMap<>();
         this.createdCursors = new ConcurrentHashMap<>();
-        this.lastAccessTimes = new ConcurrentLongHashMap<>();
+        this.lastAccessTimes = new ConcurrentHashMap<>();
         this.requestHandler = requestHandler;
-        this.rwLock = new ReentrantReadWriteLock();
-        this.closed = false;
     }
 
     // delete expired cursors, so backlog can be cleared.
@@ -82,20 +78,13 @@ void deleteExpiredCursor(long current, long expirePeriodMillis) {
     }
 
     void deleteOneExpiredCursor(long offset) {
-        CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture;
-
-        // need not do anything, since this tcm already in closing state. and close() will delete every thing.
-        rwLock.readLock().lock();
-        try {
-            if (closed) {
-                return;
-            }
-            cursorFuture = cursors.remove(offset);
-            lastAccessTimes.remove(offset);
-        } finally {
-            rwLock.readLock().unlock();
+        if (closed.get()) {
+            return;
         }
 
+        final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = cursors.remove(offset);
+        lastAccessTimes.remove(offset);
+
         if (cursorFuture != null) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}",
@@ -115,6 +104,9 @@ void deleteOneExpiredCursor(long offset) {
 
     // delete passed in cursor.
     void deleteOneCursorAsync(ManagedCursor cursor, String reason) {
+        if (closed.get()) {
+            return;
+        }
         if (cursor != null) {
             topic.getManagedLedger().asyncDeleteCursor(cursor.getName(), new DeleteCursorCallback() {
                 @Override
@@ -139,56 +131,42 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
     // remove from cache, so another same offset read could happen.
     // each success remove should have a following add.
     public CompletableFuture<Pair<ManagedCursor, Long>> removeCursorFuture(long offset) {
-        rwLock.readLock().lock();
-        try {
-            if (closed) {
-                return null;
-            }
-            lastAccessTimes.remove(offset);
-            final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = cursors.remove(offset);
-            if (cursorFuture == null) {
-                return asyncCreateCursorIfNotExists(offset);
-            }
+        if (closed.get()) {
+            return null;
+        }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
-                    requestHandler.ctx.channel(), offset, cursors.size());
-            }
-            return cursorFuture;
-        } finally {
-            rwLock.readLock().unlock();
+        lastAccessTimes.remove(offset);
+        final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = cursors.remove(offset);
+        if (cursorFuture == null) {
+            return asyncCreateCursorIfNotExists(offset);
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
+                requestHandler.ctx.channel(), offset, cursors.size());
+        }
+        return cursorFuture;
     }
 
     private CompletableFuture<Pair<ManagedCursor, Long>> asyncCreateCursorIfNotExists(long offset) {
-        rwLock.readLock().lock();
-        try {
-            if (closed) {
-                return null;
-            }
-            cursors.putIfAbsent(offset, asyncGetCursorByOffset(offset));
-
-            // notice: above would add a <offset, null-Pair>
-            lastAccessTimes.remove(offset);
-            return cursors.remove(offset);
-        } finally {
-            rwLock.readLock().unlock();
+        if (closed.get()) {
+            return null;
         }
+        cursors.putIfAbsent(offset, asyncGetCursorByOffset(offset));
+
+        // notice: above would add a <offset, null-Pair>
+        lastAccessTimes.remove(offset);
+        return cursors.remove(offset);
     }
 
     public void add(long offset, Pair<ManagedCursor, Long> pair) {
         checkArgument(offset == pair.getRight(),
             "offset not equal. key: " + offset + " value: " + pair.getRight());
 
-        rwLock.readLock().lock();
-        try {
-            if (closed) {
-                ManagedCursor managedCursor = pair.getLeft();
-                deleteOneCursorAsync(managedCursor, "A race - add cursor back but tcm already closed");
-                return;
-            }
-        } finally {
-            rwLock.readLock().unlock();
+        if (closed.get()) {
+            ManagedCursor managedCursor = pair.getLeft();
+            deleteOneCursorAsync(managedCursor, "A race - add cursor back but tcm already closed");
+            return;
         }
 
         final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = CompletableFuture.completedFuture(pair);
@@ -206,49 +184,43 @@ public void add(long offset, Pair<ManagedCursor, Long> pair) {
     // called when channel closed.
     @Override
     public void close() {
-        final ConcurrentLongHashMap<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose =
-            new ConcurrentLongHashMap<>();
-        ConcurrentMap<String, ManagedCursor> cursorsToClose;
-        rwLock.writeLock().lock();
-        try {
-            if (closed) {
-                return;
-            }
-            closed = true;
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Close TCM for topic {}.",
-                    requestHandler.ctx.channel(), topic.getName());
-            }
-            cursors.forEach(cursorFuturesToClose::put);
-            cursors.clear();
-            lastAccessTimes.clear();
-            cursorsToClose = new ConcurrentHashMap<>();
-            createdCursors.forEach(cursorsToClose::put);
-            createdCursors.clear();
-        } finally {
-            rwLock.writeLock().unlock();
+        if (!closed.compareAndSet(false, true)) {
+            return;
         }
 
-        cursorFuturesToClose.values().forEach(cursorFuture -> {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Close TCM for topic {}.",
+                requestHandler.ctx.channel(), topic.getName());
+        }
+        final List<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose = new ArrayList<>();
+        cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture));
+        cursors.clear();
+        lastAccessTimes.clear();
+        final List<ManagedCursor> cursorsToClose = new ArrayList<>();
+        createdCursors.forEach((ignored, cursor) -> cursorsToClose.add(cursor));
+        createdCursors.clear();
 
+        cursorFuturesToClose.forEach(cursorFuture -> {
             cursorFuture.whenComplete((pair, e) -> {
                 if (e != null || pair == null) {
                     return;
                 }
                 ManagedCursor cursor = pair.getLeft();
                 deleteOneCursorAsync(cursor, "TopicConsumerManager close");
-                if (cursor != null) {
-                    cursorsToClose.remove(cursor.getName());
-                }
             });
         });
         cursorFuturesToClose.clear();
 
         // delete dangling createdCursors
-        cursorsToClose.values().forEach(cursor ->
+        cursorsToClose.forEach(cursor ->
             deleteOneCursorAsync(cursor, "TopicConsumerManager close but cursor is still outstanding"));
         cursorsToClose.clear();
     }
 
     private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long offset) {
+        if (closed.get()) {
+            // return a null completed future instead of null because the returned value will be put into a Map
+            return CompletableFuture.completedFuture(null);
+        }
         final ManagedLedger ledger = topic.getManagedLedger();
         return ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).thenApply(position -> {
             final String cursorName = "kop-consumer-cursor-" + topic.getName()
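Taken together, the KafkaTopicConsumerManager changes swap the read-write lock for a single atomic flag: every mutator bails out early once closed is set, and close() uses compareAndSet so the teardown runs at most once even when several channel-close events race. Below is a minimal, self-contained sketch of that idiom, assuming a hypothetical Resource class with a handles map (illustrative names only, not KoP code):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the lock-free close idiom from the diff above: an AtomicBoolean
// gates all mutators, and compareAndSet makes close() run exactly once.
public class Resource implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Map<Long, String> handles = new ConcurrentHashMap<>();

    public void add(long id, String handle) {
        if (closed.get()) {
            // Racing with close(): drop the entry instead of leaking it
            // into a map that will never be cleared again.
            return;
        }
        handles.put(id, handle);
    }

    @Override
    public void close() {
        if (!closed.compareAndSet(false, true)) {
            return; // another thread already performed the teardown
        }
        handles.clear();
    }
}

Note that the check in add() is not atomic with the put, so an entry can still slip in between the check and the clear; the diff above narrows the same window by deleting the cursor explicitly when add() observes the closed flag.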
KafkaTopicManager.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -67,7 +68,7 @@ public class KafkaTopicManager {
     // remove expired cursors, so backlog can be cleared.
     private static final long checkPeriodMillis = 1 * 60 * 1000;
     private static final long expirePeriodMillis = 2 * 60 * 1000;
-    private final ScheduledFuture<?> cursorExpireTask;
+    private static volatile ScheduledFuture<?> cursorExpireTask = null;
 
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -83,19 +84,25 @@ public class KafkaTopicManager {
         this.brokerService = pulsarService.getBrokerService();
         this.internalServerCnx = new InternalServerCnx(requestHandler);
 
-        // check expired cursor every 1 min.
-        this.cursorExpireTask = brokerService.executor().scheduleWithFixedDelay(() -> {
-            long current = System.currentTimeMillis();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Schedule a check of expired cursor",
-                    requestHandler.ctx.channel());
-            }
-            consumerTopicManagers.values().forEach(future -> {
-                if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
-                    future.join().deleteExpiredCursor(current, expirePeriodMillis);
-                }
-            });
-        }, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
+        initializeCursorExpireTask(brokerService.executor());
     }
 
+    private static void initializeCursorExpireTask(final ScheduledExecutorService executor) {
+        if (cursorExpireTask == null) {
+            synchronized (KafkaTopicManager.class) {
+                if (cursorExpireTask == null) {
+                    // check expired cursor every 1 min.
+                    cursorExpireTask = executor.scheduleWithFixedDelay(() -> {
+                        long current = System.currentTimeMillis();
+                        consumerTopicManagers.values().forEach(future -> {
+                            if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
+                                future.join().deleteExpiredCursor(current, expirePeriodMillis);
+                            }
+                        });
+                    }, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
+                }
+            }
+        }
+    }
 
     // update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler.
@@ -296,8 +303,6 @@ public void close() {
         }
 
         try {
-            this.cursorExpireTask.cancel(true);
-
             closeKafkaTopicConsumerManagers();
 
             topics.keySet().forEach(topicName -> {
@@ -369,6 +374,12 @@ public static void removeKafkaTopicConsumerManager(String topicName) {
     }
 
     public static void closeKafkaTopicConsumerManagers() {
+        synchronized (KafkaTopicManager.class) {
+            if (cursorExpireTask != null) {
+                cursorExpireTask.cancel(true);
+                cursorExpireTask = null;
+            }
+        }
         consumerTopicManagers.forEach((topic, tcmFuture) -> {
             try {
                 Optional.ofNullable(tcmFuture.get(300, TimeUnit.SECONDS))
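The KafkaTopicManager changes make the cursor-expiry timer process-wide instead of per-connection: a static volatile ScheduledFuture is created once through double-checked locking and shared by every connection, and closeKafkaTopicConsumerManagers() cancels it under the same class lock and nulls the field so a later connection can recreate it. A self-contained sketch of that lazily initialized shared-task idiom, with hypothetical SharedTimer/start/stop names (not KoP code):

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch of double-checked lazy initialization for a shared scheduled task.
// The field must be volatile so the unsynchronized fast-path null check
// observes a fully published ScheduledFuture; creation happens at most once.
final class SharedTimer {
    private static volatile ScheduledFuture<?> task = null;

    static void start(ScheduledExecutorService executor, Runnable job, long periodMillis) {
        if (task == null) {                 // fast path: no lock once started
            synchronized (SharedTimer.class) {
                if (task == null) {         // re-check while holding the lock
                    task = executor.scheduleWithFixedDelay(
                            job, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    static void stop() {
        synchronized (SharedTimer.class) {
            if (task != null) {
                task.cancel(true);
                task = null;                // permit a later restart
            }
        }
    }
}

stop() takes the class lock unconditionally, mirroring the diff, so a concurrent start() cannot observe a cancelled but still non-null task.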