Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;

@Getter
@Setter
private String shadowSourceName;

public boolean isCreateIfMissing() {
return createIfMissing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected final BookKeeper bookKeeper;
protected final String name;
private final Map<String, byte[]> ledgerMetadata;
private final BookKeeper.DigestType digestType;
protected final BookKeeper.DigestType digestType;

protected ManagedLedgerConfig config;
protected Map<String, String> propertiesMap;
Expand All @@ -164,7 +164,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
.concurrencyLevel(1) // number of sections
.build();
protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
private volatile Stat ledgersStat;
protected volatile Stat ledgersStat;

// contains all cursors, where durable cursors are ordered by mark delete position
private final ManagedCursorContainer cursors = new ManagedCursorContainer();
Expand Down Expand Up @@ -215,10 +215,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
private volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
private volatile long lastLedgerCreatedTimestamp = 0;
protected volatile LedgerHandle currentLedger;
protected long currentLedgerEntries = 0;
protected long currentLedgerSize = 0;
protected volatile long lastLedgerCreatedTimestamp = 0;
private volatile long lastLedgerCreationFailureTimestamp = 0;
private long lastLedgerCreationInitiationTimestamp = 0;

Expand All @@ -236,9 +236,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

volatile PositionImpl lastConfirmedEntry;

private ManagedLedgerInterceptor managedLedgerInterceptor;
protected ManagedLedgerInterceptor managedLedgerInterceptor;

private volatile long lastAddEntryTimeMs = 0;
protected volatile long lastAddEntryTimeMs = 0;
private long inactiveLedgerRollOverTimeMs = 0;

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
Expand Down Expand Up @@ -285,7 +285,7 @@ public enum PositionBound {
startIncluded, startExcluded
}

private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
protected static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
private volatile boolean migrated = false;
Expand All @@ -294,7 +294,7 @@ public enum PositionBound {
private final OrderedScheduler scheduledExecutor;

@Getter
private final OrderedExecutor executor;
protected final OrderedExecutor executor;

@Getter
private final ManagedLedgerFactoryImpl factory;
Expand Down Expand Up @@ -465,7 +465,11 @@ public void operationFailed(MetaStoreException e) {
scheduleTimeoutTask();
}

private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
protected boolean isLedgersReadonly() {
return false;
}

protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers);
}
Expand Down Expand Up @@ -550,7 +554,7 @@ public void operationFailed(MetaStoreException e) {
}, ledgerMetadata);
}

private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
protected void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing cursors", name);
}
Expand Down Expand Up @@ -791,7 +795,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
}));
}

private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
Expand Down Expand Up @@ -861,7 +865,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
lastAddEntryTimeMs = System.currentTimeMillis();
}

private boolean beforeAddEntry(OpAddEntry addOperation) {
protected boolean beforeAddEntry(OpAddEntry addOperation) {
// if no interceptor, just return true to make sure addOperation will be initiate()
if (managedLedgerInterceptor == null) {
return true;
Expand Down Expand Up @@ -1609,7 +1613,7 @@ public void operationFailed(MetaStoreException e) {
}
}

private void handleBadVersion(Throwable e) {
protected void handleBadVersion(Throwable e) {
if (e instanceof BadVersionException) {
setFenced();
}
Expand Down Expand Up @@ -1649,7 +1653,7 @@ void createNewOpAddEntryForNewLedger() {
} while (existsOp != null && --pendingSize > 0);
}

private synchronized void updateLedgersIdsComplete() {
protected synchronized void updateLedgersIdsComplete() {
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();

Expand Down Expand Up @@ -2542,8 +2546,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

List<LedgerInfo> ledgersToDelete = new ArrayList();
List<LedgerInfo> offloadedLedgersToDelete = new ArrayList();
List<LedgerInfo> ledgersToDelete = new ArrayList<>();
List<LedgerInfo> offloadedLedgersToDelete = new ArrayList<>();
Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
? config.getLedgerOffloader().getOffloadPolicies()
Expand Down Expand Up @@ -2671,23 +2675,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
for (LedgerInfo ls : ledgersToDelete) {
if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
// this info is relevant because the lastMessageId won't be available anymore
log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+ "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
}
doDeleteLedgers(ledgersToDelete);

invalidateReadHandle(ls.getLedgerId());

ledgers.remove(ls.getLedgerId());
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());

entryCache.invalidateAllEntries(ls.getLedgerId());
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
Expand Down Expand Up @@ -2737,6 +2726,26 @@ public void operationFailed(MetaStoreException e) {
}
}

protected void doDeleteLedgers(List<LedgerInfo> ledgersToDelete) {
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
for (LedgerInfo ls : ledgersToDelete) {
if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
// this info is relevant because the lastMessageId won't be available anymore
log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+ "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
}

invalidateReadHandle(ls.getLedgerId());

ledgers.remove(ls.getLedgerId());
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());

entryCache.invalidateAllEntries(ls.getLedgerId());
}
}

/**
* Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
* This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
Expand Down Expand Up @@ -3696,7 +3705,7 @@ public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
return ledgers;
}

private ManagedLedgerInfo getManagedLedgerInfo() {
protected ManagedLedgerInfo getManagedLedgerInfo() {
return buildManagedLedgerInfo(ledgers);
}

Expand Down Expand Up @@ -4284,7 +4293,7 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
});
}

private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ interface MetaStoreCallback<T> {
void operationFailed(MetaStoreException e);
}

interface UpdateCallback<T> {
void onUpdate(T result, Stat stat);
}

/**
* Get the metadata used by the ManagedLedger.
*
Expand Down Expand Up @@ -71,6 +75,19 @@ default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map<String, String> properties,
MetaStoreCallback<ManagedLedgerInfo> callback);

/**
* Watch the metadata used by the ManagedLedger.
* @param ledgerName
* @param callback
*/
void watchManagedLedgerInfo(String ledgerName, UpdateCallback<ManagedLedgerInfo> callback);

/**
* Unwatch the metadata changes for ledgerName.
* @param ledgerName
*/
void unwatchManagedLedgerInfo(String ledgerName);

/**
*
* @param ledgerName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -47,10 +49,12 @@
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;

@Slf4j
public class MetaStoreImpl implements MetaStore {
public class MetaStoreImpl implements MetaStore, Consumer<Notification> {

private static final String BASE_NODE = "/managed-ledgers";
private static final String PREFIX = BASE_NODE + "/";
Expand All @@ -62,11 +66,17 @@ public class MetaStoreImpl implements MetaStore {
private final CompressionType ledgerInfoCompressionType;
private final CompressionType cursorInfoCompressionType;

private final Map<String, UpdateCallback<ManagedLedgerInfo>> managedLedgerInfoUpdateCallbackMap;

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
this.store = store;
this.executor = executor;
this.ledgerInfoCompressionType = CompressionType.NONE;
this.cursorInfoCompressionType = CompressionType.NONE;
managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>();
if (store != null) {
store.registerListener(this);
}
}

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType,
Expand All @@ -75,6 +85,10 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledge
this.executor = executor;
this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType);
this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType);
managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>();
if (store != null) {
store.registerListener(this);
}
}

private CompressionType parseCompressionType(String value) {
Expand Down Expand Up @@ -327,6 +341,43 @@ public CompletableFuture<Boolean> asyncExists(String path) {
return store.exists(PREFIX + path);
}

@Override
public void watchManagedLedgerInfo(String ledgerName, UpdateCallback<ManagedLedgerInfo> callback) {
managedLedgerInfoUpdateCallbackMap.put(PREFIX + ledgerName, callback);
}

@Override
public void unwatchManagedLedgerInfo(String ledgerName) {
managedLedgerInfoUpdateCallbackMap.remove(PREFIX + ledgerName);
}

@Override
public void accept(Notification notification) {
if (!notification.getPath().startsWith(PREFIX) || notification.getType() != NotificationType.Modified) {
return;
}
UpdateCallback<ManagedLedgerInfo> callback = managedLedgerInfoUpdateCallbackMap.get(notification.getPath());
if (callback == null) {
return;
}
String ledgerName = notification.getPath().substring(PREFIX.length());
store.get(notification.getPath()).thenAcceptAsync(optResult -> {
if (optResult.isPresent()) {
ManagedLedgerInfo info;
try {
info = parseManagedLedgerInfo(optResult.get().getValue());
info = updateMLInfoTimestamp(info);
callback.onUpdate(info, optResult.get().getStat());
} catch (InvalidProtocolBufferException e) {
log.error("[{}] Error when parseManagedLedgerInfo", ledgerName, e);
}
}
}, executor.chooseThread(ledgerName)).exceptionally(ex -> {
log.error("[{}] Error when read ManagedLedgerInfo", ledgerName, ex);
return null;
});
}

//
// update timestamp if missing or 0
// 3 cases - timestamp does not exist for ledgers serialized before
Expand Down
Loading