From ded6d8fc81cdaa7e03481ad10dcffdd72d59ff00 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Wed, 6 Jul 2022 20:50:18 +0800 Subject: [PATCH 1/9] fix No such ledger exception --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ee1da48bbd46d..04cff9dbe5a23 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1483,7 +1483,6 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct } else { log.info("[{}] Created new ledger {}", name, lh.getId()); ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); - currentLedger = lh; currentLedgerEntries = 0; currentLedgerSize = 0; @@ -1494,6 +1493,7 @@ public void operationComplete(Void v, Stat stat) { log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat); } ledgersStat = stat; + currentLedger = lh; metadataMutex.unlock(); updateLedgersIdsComplete(stat); synchronized (ManagedLedgerImpl.this) { From 140f29cf160e6c26ff6130dd80a8105093baf742 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Wed, 6 Jul 2022 21:47:43 +0800 Subject: [PATCH 2/9] move currentLedgerEntries and currentLedgerSize --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 04cff9dbe5a23..f92156ba2b196 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1483,9 +1483,6 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct } else { log.info("[{}] Created new ledger {}", name, lh.getId()); ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); - currentLedgerEntries = 0; - currentLedgerSize = 0; - final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1494,6 +1491,8 @@ public void operationComplete(Void v, Stat stat) { } ledgersStat = stat; currentLedger = lh; + currentLedgerEntries = 0; + currentLedgerSize = 0; metadataMutex.unlock(); updateLedgersIdsComplete(stat); synchronized (ManagedLedgerImpl.this) { From c698df75f2727783f755f51eb1c462038316c017 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Wed, 6 Jul 2022 22:40:15 +0800 Subject: [PATCH 3/9] move ledgers.put to operationComplete --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f92156ba2b196..ea622890b4532 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1482,7 +1482,6 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct lastLedgerCreationFailureTimestamp = clock.millis(); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); - ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1490,6 +1489,7 @@ public void operationComplete(Void v, Stat stat) { log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat); } ledgersStat = stat; + ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); currentLedger = lh; currentLedgerEntries = 0; currentLedgerSize = 0; @@ -1507,8 +1507,6 @@ public void operationComplete(Void v, Stat stat) { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage()); - // Remove the ledger, since we failed to update the list - ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> { mbean.endDataLedgerDeleteOp(); From dc505b792f5b172bfc35e3a5fbdc8c14cbe9e62e Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Thu, 7 Jul 2022 08:51:16 +0800 Subject: [PATCH 4/9] fix ledgers.put --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ea622890b4532..f92156ba2b196 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1482,6 +1482,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct lastLedgerCreationFailureTimestamp = clock.millis(); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); + ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1489,7 +1490,6 @@ public void operationComplete(Void v, Stat stat) { log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat); } ledgersStat = stat; - ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); currentLedger = lh; currentLedgerEntries = 0; currentLedgerSize = 0; @@ -1507,6 +1507,8 @@ public void operationComplete(Void v, Stat stat) { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage()); + // Remove the ledger, since we failed to update the list + ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> { mbean.endDataLedgerDeleteOp(); From 76334b7402b0652484eeaa57ceb85c0215bf1e27 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Thu, 7 Jul 2022 13:31:10 +0800 Subject: [PATCH 5/9] use ledgersTmp to write zookkeeper,after sucess,update ldgers --- .../mledger/impl/ManagedLedgerImpl.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f92156ba2b196..130fde1fce74e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1482,7 +1482,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct lastLedgerCreationFailureTimestamp = clock.millis(); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); - ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); + NavigableMap ledgersTmp = new ConcurrentSkipListMap<>(ledgers); final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1490,6 +1490,7 @@ public void operationComplete(Void v, Stat stat) { log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat); } ledgersStat = stat; + ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); currentLedger = lh; currentLedgerEntries = 0; currentLedgerSize = 0; @@ -1544,7 +1545,7 @@ public void operationFailed(MetaStoreException e) { } }; - updateLedgersListAfterRollover(cb); + updateLedgersListAfterRollover(cb, getManagedLedgerInfo(ledgersTmp)); } } @@ -1561,6 +1562,20 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, callback); } + private void updateLedgersListAfterRollover(MetaStoreCallback callback, ManagedLedgerInfo mlInfo) { + if (!metadataMutex.tryLock()) { + // Defer update for later + scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, mlInfo), + 100, TimeUnit.MILLISECONDS); + return; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat); + } + store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback); + } + public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); updateLastLedgerCreatedTimeAndScheduleRolloverTask(); @@ -3568,6 +3583,10 @@ private ManagedLedgerInfo getManagedLedgerInfo() { return buildManagedLedgerInfo(ledgers); } + private ManagedLedgerInfo getManagedLedgerInfo(NavigableMap ledgersTmp) { + return buildManagedLedgerInfo(ledgersTmp); + } + private ManagedLedgerInfo buildManagedLedgerInfo(Map ledgers) { ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()); if (state == State.Terminated) { From 07a1645421f162d804cadcdd0a4670a2815944b7 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Thu, 7 Jul 2022 14:48:59 +0800 Subject: [PATCH 6/9] update updateLedgersListAfterRollover --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 130fde1fce74e..2a01327c947d1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1550,16 +1550,7 @@ public void operationFailed(MetaStoreException e) { } private void updateLedgersListAfterRollover(MetaStoreCallback callback) { - if (!metadataMutex.tryLock()) { - // Defer update for later - scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS); - return; - } - - if (log.isDebugEnabled()) { - log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat); - } - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, callback); + updateLedgersListAfterRollover(callback, getManagedLedgerInfo(ledgers)); } private void updateLedgersListAfterRollover(MetaStoreCallback callback, ManagedLedgerInfo mlInfo) { From e13e2fcb02081d5a579e30eb0a269dfffffa632a Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Thu, 7 Jul 2022 14:53:08 +0800 Subject: [PATCH 7/9] put lh to ledgersTmp --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 2a01327c947d1..809ae7c3edbf9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1483,6 +1483,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct } else { log.info("[{}] Created new ledger {}", name, lh.getId()); NavigableMap ledgersTmp = new ConcurrentSkipListMap<>(ledgers); + ledgersTmp.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1548,11 +1549,6 @@ public void operationFailed(MetaStoreException e) { updateLedgersListAfterRollover(cb, getManagedLedgerInfo(ledgersTmp)); } } - - private void updateLedgersListAfterRollover(MetaStoreCallback callback) { - updateLedgersListAfterRollover(callback, getManagedLedgerInfo(ledgers)); - } - private void updateLedgersListAfterRollover(MetaStoreCallback callback, ManagedLedgerInfo mlInfo) { if (!metadataMutex.tryLock()) { // Defer update for later From b36da7baead47892dee2fd7220a22098cf14d453 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Thu, 7 Jul 2022 15:52:05 +0800 Subject: [PATCH 8/9] extend buildManagedLedgerInfo --- .../mledger/impl/ManagedLedgerImpl.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 809ae7c3edbf9..71cd224b4a73d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1482,8 +1482,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct lastLedgerCreationFailureTimestamp = clock.millis(); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); - NavigableMap ledgersTmp = new ConcurrentSkipListMap<>(ledgers); - ledgersTmp.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); + LedgerInfo newLedger = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1491,7 +1490,7 @@ public void operationComplete(Void v, Stat stat) { log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat); } ledgersStat = stat; - ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); + ledgers.put(lh.getId(), newLedger); currentLedger = lh; currentLedgerEntries = 0; currentLedgerSize = 0; @@ -1509,8 +1508,6 @@ public void operationComplete(Void v, Stat stat) { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage()); - // Remove the ledger, since we failed to update the list - ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> { mbean.endDataLedgerDeleteOp(); @@ -1546,7 +1543,7 @@ public void operationFailed(MetaStoreException e) { } }; - updateLedgersListAfterRollover(cb, getManagedLedgerInfo(ledgersTmp)); + updateLedgersListAfterRollover(cb, getManagedLedgerInfo(newLedger)); } } private void updateLedgersListAfterRollover(MetaStoreCallback callback, ManagedLedgerInfo mlInfo) { @@ -3570,12 +3567,17 @@ private ManagedLedgerInfo getManagedLedgerInfo() { return buildManagedLedgerInfo(ledgers); } - private ManagedLedgerInfo getManagedLedgerInfo(NavigableMap ledgersTmp) { - return buildManagedLedgerInfo(ledgersTmp); + private ManagedLedgerInfo getManagedLedgerInfo(LedgerInfo newLedger) { + ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()) + .addLedgerInfo(newLedger); + return buildManagedLedgerInfo(mlInfo); } - private ManagedLedgerInfo buildManagedLedgerInfo(Map ledgers) { ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()); + return buildManagedLedgerInfo(mlInfo); + } + + private ManagedLedgerInfo buildManagedLedgerInfo(ManagedLedgerInfo.Builder mlInfo) { if (state == State.Terminated) { mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId()) .setEntryId(lastConfirmedEntry.getEntryId())); From 1200640c589133c3a6b2e63431e8d54f95a9fe15 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Thu, 7 Jul 2022 17:37:52 +0800 Subject: [PATCH 9/9] getManagedLedgerInfo(newLedger) after get metadataMutex --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 71cd224b4a73d..a5612bc0eb9fb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1543,13 +1543,13 @@ public void operationFailed(MetaStoreException e) { } }; - updateLedgersListAfterRollover(cb, getManagedLedgerInfo(newLedger)); + updateLedgersListAfterRollover(cb, newLedger); } } - private void updateLedgersListAfterRollover(MetaStoreCallback callback, ManagedLedgerInfo mlInfo) { + private void updateLedgersListAfterRollover(MetaStoreCallback callback, LedgerInfo newLedger) { if (!metadataMutex.tryLock()) { // Defer update for later - scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, mlInfo), + scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, newLedger), 100, TimeUnit.MILLISECONDS); return; } @@ -1557,6 +1557,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback, Ma if (log.isDebugEnabled()) { log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat); } + ManagedLedgerInfo mlInfo = getManagedLedgerInfo(newLedger); store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback); }