Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1482,18 +1482,18 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
lastLedgerCreationFailureTimestamp = clock.millis();
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;

LedgerInfo newLedger = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
final MetaStoreCallback<Void> cb = new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void v, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;
metadataMutex.unlock();
updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
Expand All @@ -1508,8 +1508,6 @@ public void operationComplete(Void v, Stat stat) {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
// Remove the ledger, since we failed to update the list
ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
mbean.endDataLedgerDeleteOp();
Expand Down Expand Up @@ -1545,21 +1543,22 @@ public void operationFailed(MetaStoreException e) {
}
};

updateLedgersListAfterRollover(cb);
updateLedgersListAfterRollover(cb, newLedger);
}
}

private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
if (!metadataMutex.tryLock()) {
// Defer update for later
scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, newLedger),
100, TimeUnit.MILLISECONDS);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat);
}
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, callback);
ManagedLedgerInfo mlInfo = getManagedLedgerInfo(newLedger);
store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
}

public synchronized void updateLedgersIdsComplete(Stat stat) {
Expand Down Expand Up @@ -3569,8 +3568,17 @@ private ManagedLedgerInfo getManagedLedgerInfo() {
return buildManagedLedgerInfo(ledgers);
}

private ManagedLedgerInfo getManagedLedgerInfo(LedgerInfo newLedger) {
ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values())
.addLedgerInfo(newLedger);
return buildManagedLedgerInfo(mlInfo);
}
private ManagedLedgerInfo buildManagedLedgerInfo(Map<Long, LedgerInfo> ledgers) {
ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values());
return buildManagedLedgerInfo(mlInfo);
}

private ManagedLedgerInfo buildManagedLedgerInfo(ManagedLedgerInfo.Builder mlInfo) {
if (state == State.Terminated) {
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
.setEntryId(lastConfirmedEntry.getEntryId()));
Expand Down