diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ee1da48bbd46d..a5612bc0eb9fb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1482,11 +1482,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct lastLedgerCreationFailureTimestamp = clock.millis(); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); - ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); - currentLedger = lh; - currentLedgerEntries = 0; - currentLedgerSize = 0; - + LedgerInfo newLedger = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1494,6 +1490,10 @@ public void operationComplete(Void v, Stat stat) { log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat); } ledgersStat = stat; + ledgers.put(lh.getId(), newLedger); + currentLedger = lh; + currentLedgerEntries = 0; + currentLedgerSize = 0; metadataMutex.unlock(); updateLedgersIdsComplete(stat); synchronized (ManagedLedgerImpl.this) { @@ -1508,8 +1508,6 @@ public void operationComplete(Void v, Stat stat) { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage()); - // Remove the ledger, since we failed to update the list - ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> { mbean.endDataLedgerDeleteOp(); @@ -1545,21 +1543,22 @@ public void operationFailed(MetaStoreException e) { } }; - updateLedgersListAfterRollover(cb); + updateLedgersListAfterRollover(cb, newLedger); } } - - private void updateLedgersListAfterRollover(MetaStoreCallback callback) { + private void updateLedgersListAfterRollover(MetaStoreCallback callback, LedgerInfo newLedger) { if (!metadataMutex.tryLock()) { // Defer update for later - scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS); + scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, newLedger), + 100, TimeUnit.MILLISECONDS); return; } if (log.isDebugEnabled()) { log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat); } - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, callback); + ManagedLedgerInfo mlInfo = getManagedLedgerInfo(newLedger); + store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback); } public synchronized void updateLedgersIdsComplete(Stat stat) { @@ -3569,8 +3568,17 @@ private ManagedLedgerInfo getManagedLedgerInfo() { return buildManagedLedgerInfo(ledgers); } + private ManagedLedgerInfo getManagedLedgerInfo(LedgerInfo newLedger) { + ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()) + .addLedgerInfo(newLedger); + return buildManagedLedgerInfo(mlInfo); + } private ManagedLedgerInfo buildManagedLedgerInfo(Map ledgers) { ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()); + return buildManagedLedgerInfo(mlInfo); + } + + private ManagedLedgerInfo buildManagedLedgerInfo(ManagedLedgerInfo.Builder mlInfo) { if (state == State.Terminated) { mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId()) .setEntryId(lastConfirmedEntry.getEntryId()));