From 2f52c4fd5046e591a9244460f2bce6db6fcce11c Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 10 Mar 2017 14:48:16 -0800 Subject: [PATCH] Fixed race condition in cursor.asyncDelete() --- .../mledger/impl/ManagedCursorImpl.java | 109 +++++++++--------- 1 file changed, 54 insertions(+), 55 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index cd5b60c35e9cd..8a599bae06b60 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1173,58 +1173,54 @@ void initializeCursorPosition(Pair lastPositionCounter) { * @return the previous acknowledged position */ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { - lock.writeLock().lock(); - try { - if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) { - throw new IllegalArgumentException("Mark deleting an already mark-deleted position"); - } - - if (readPosition.compareTo(newMarkDeletePosition) <= 0) { - // If the position that is mark-deleted is past the read position, it - // means that the client has skipped some entries. We need to move - // read position forward - PositionImpl oldReadPosition = readPosition; - readPosition = ledger.getNextValidPosition(newMarkDeletePosition); + if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) { + throw new IllegalArgumentException("Mark deleting an already mark-deleted position"); + } - if (log.isDebugEnabled()) { - log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition); - } + if (readPosition.compareTo(newMarkDeletePosition) <= 0) { + // If the position that is mark-deleted is past the read position, it + // means that the client has skipped some entries. We need to move + // read position forward + PositionImpl oldReadPosition = readPosition; + readPosition = ledger.getNextValidPosition(newMarkDeletePosition); - oldReadPosition.recycle(); + if (log.isDebugEnabled()) { + log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition); } - PositionImpl oldMarkDeletePosition = markDeletePosition; + oldReadPosition.recycle(); + } - if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) { - long skippedEntries = 0; - if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId() - && newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) { - // Mark-deleting the position next to current one - skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1; - } else { - skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition)); - } - PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition); - if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) { - Range rangeToBeMarkDeleted = individualDeletedMessages - .rangeContaining(positionAfterNewMarkDelete); - newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint(); - } + PositionImpl oldMarkDeletePosition = markDeletePosition; - if (log.isDebugEnabled()) { - log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition, - newMarkDeletePosition, skippedEntries); - } - messagesConsumedCounter += skippedEntries; + if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) { + long skippedEntries = 0; + if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId() + && newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) { + // Mark-deleting the position next to current one + skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1; + } else { + skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition)); + } + PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition); + if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) { + Range rangeToBeMarkDeleted = individualDeletedMessages + .rangeContaining(positionAfterNewMarkDelete); + newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint(); } - // markDelete-position and clear out deletedMsgSet - markDeletePosition = PositionImpl.get(newMarkDeletePosition); - individualDeletedMessages.remove(Range.atMost(markDeletePosition)); - oldMarkDeletePosition.recycle(); - } finally { - lock.writeLock().unlock(); + if (log.isDebugEnabled()) { + log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition, + newMarkDeletePosition, skippedEntries); + } + messagesConsumedCounter += skippedEntries; } + + // markDelete-position and clear out deletedMsgSet + markDeletePosition = PositionImpl.get(newMarkDeletePosition); + individualDeletedMessages.remove(Range.atMost(markDeletePosition)); + oldMarkDeletePosition.recycle(); + return newMarkDeletePosition; } @@ -1254,11 +1250,14 @@ public void asyncMarkDelete(final Position position, final MarkDeleteCallback ca } PositionImpl newPosition = (PositionImpl) position; + lock.writeLock().lock(); try { newPosition = setAcknowledgedPosition(newPosition); } catch (IllegalArgumentException e) { callback.markDeleteFailed(new ManagedLedgerException(e), ctx); return; + } finally { + lock.writeLock().unlock(); } // Apply rate limiting to mark-delete operations @@ -1487,6 +1486,12 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba newMarkDeletePosition = range.upperEndpoint(); } } + + if (newMarkDeletePosition != null) { + newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition); + } else { + newMarkDeletePosition = markDeletePosition; + } } catch (Exception e) { log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name, e.getMessage(), e); @@ -1496,19 +1501,13 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba lock.writeLock().unlock(); } - try { - if (newMarkDeletePosition != null) { - newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition); - } else { - newMarkDeletePosition = markDeletePosition; - } - - // Apply rate limiting to mark-delete operations - if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { - callback.deleteComplete(ctx); - return; - } + // Apply rate limiting to mark-delete operations + if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { + callback.deleteComplete(ctx); + return; + } + try { internalAsyncMarkDelete(newMarkDeletePosition, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) {