Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl position, final ReadEnt
manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.mbean.addReadEntriesSample(1, returnEntry.getLength());

ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> {
ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
callback.readEntryComplete(returnEntry, obj);
}));
} else {
Expand Down Expand Up @@ -254,7 +254,7 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo

checkNotNull(ml.getName());
checkNotNull(ml.getExecutor());
ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> {
ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
// We got the entries, we need to transform them to a List<> type
long totalSize = 0;
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ boolean hasSpaceInCache() {

// Trigger a single eviction in background. While the eviction is running we stop inserting entries in the cache
if (currentSize > evictionTriggerThreshold && evictionInProgress.compareAndSet(false, true)) {
mlFactory.executor.execute(safeRun(() -> {
mlFactory.scheduledExecutor.execute(safeRun(() -> {
// Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark
// percentage limit
long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermak);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
Expand All @@ -51,6 +52,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -847,7 +849,7 @@ public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback
final PositionImpl newPosition = (PositionImpl) newPos;

// order trim and reset operations on a ledger
ledger.getExecutor().submitOrdered(ledger.getName(), safeRun(() -> {
ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
if (ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest)
|| newPosition.equals(PositionImpl.latest)) {
internalResetCursor(newPosition, callback);
Expand Down Expand Up @@ -1923,7 +1925,7 @@ void createNewMetadataLedger(final VoidCallback callback) {

bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(),
config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
ledger.getExecutor().submit(safeRun(() -> {
ledger.getExecutor().execute(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@

import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand All @@ -63,7 +63,6 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
Expand All @@ -77,9 +76,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final boolean isBookkeeperManaged;
private final ZooKeeper zookeeper;
private final ManagedLedgerFactoryConfig config;
protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(16,
new DefaultThreadFactory("bookkeeper-ml"));
private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(16)
protected final OrderedScheduler scheduledExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(16)
.name("bookkeeper-ml-scheduler").build();
private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(16)
.name("bookkeeper-ml-workers").build();

protected final ManagedLedgerFactoryMBeanImpl mbean;
Expand Down Expand Up @@ -122,7 +121,7 @@ public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, Manag
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
}

public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
Expand All @@ -138,7 +137,7 @@ public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, Mana
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
}

private synchronized void refreshStats() {
Expand Down Expand Up @@ -232,7 +231,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>();
final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, executor,
final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, scheduledExecutor,
orderedExecutor, name);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
Expand Down Expand Up @@ -305,7 +304,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}
}

executor.shutdown();
scheduledExecutor.shutdown();
orderedExecutor.shutdown();

entryCacheManager.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,26 @@
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -45,6 +53,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand Down Expand Up @@ -77,23 +86,13 @@
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;

Expand Down Expand Up @@ -190,8 +189,8 @@ enum PositionBound {
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
private volatile State state = null;

private final ScheduledExecutorService scheduledExecutor;
private final OrderedScheduler executor;
private final OrderedScheduler scheduledExecutor;
private final OrderedExecutor executor;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;

Expand All @@ -204,7 +203,7 @@ enum PositionBound {
// //////////////////////////////////////////////////////////////////////

public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, ScheduledExecutorService scheduledExecutor, OrderedScheduler orderedExecutor,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
final String name) {
this.factory = factory;
this.bookKeeper = bookKeeper;
Expand Down Expand Up @@ -250,7 +249,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
if (ledgers.size() > 0) {
final long id = ledgers.lastKey();
OpenCallback opencb = (rc, lh, ctx1) -> {
executor.submitOrdered(name, safeRun(() -> {
executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: ", name, id, BKException.getMessage(rc));
Expand Down Expand Up @@ -338,7 +337,7 @@ public void operationFailed(MetaStoreException e) {
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> {
executor.submitOrdered(name, safeRun(() -> {
executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
Expand Down Expand Up @@ -1319,7 +1318,7 @@ CompletableFuture<LedgerHandle> getLedgerHandle(long ledgerId) {
mbean.startDataLedgerOpenOp();
bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(),
(int rc, LedgerHandle lh, Object ctx) -> {
executor.submit(safeRun(() -> {
executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (rc != BKException.Code.OK) {
// Remove the ledger future from cache to give chance to reopen it later
Expand Down Expand Up @@ -1484,12 +1483,12 @@ void notifyCursors() {
break;
}

executor.submit(safeRun(() -> waitingCursor.notifyEntriesAvailable()));
executor.execute(safeRun(() -> waitingCursor.notifyEntriesAvailable()));
}
}

private void trimConsumedLedgersInBackground() {
executor.submitOrdered(name, safeRun(() -> {
executor.executeOrdered(name, safeRun(() -> {
internalTrimConsumedLedgers();
}));
}
Expand Down Expand Up @@ -2113,11 +2112,11 @@ public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
return ledgers;
}

ScheduledExecutorService getScheduledExecutor() {
OrderedScheduler getScheduledExecutor() {
return scheduledExecutor;
}

OrderedScheduler getExecutor() {
OrderedExecutor getExecutor() {
return executor;
}

Expand Down
Loading