Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1173,58 +1173,54 @@ void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
* @return the previous acknowledged position
*/
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
lock.writeLock().lock();
try {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
}

if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
// read position forward
PositionImpl oldReadPosition = readPosition;
readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
}

if (log.isDebugEnabled()) {
log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
}
if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
// read position forward
PositionImpl oldReadPosition = readPosition;
readPosition = ledger.getNextValidPosition(newMarkDeletePosition);

oldReadPosition.recycle();
if (log.isDebugEnabled()) {
log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
}

PositionImpl oldMarkDeletePosition = markDeletePosition;
oldReadPosition.recycle();
}

if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
long skippedEntries = 0;
if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId()
&& newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) {
// Mark-deleting the position next to current one
skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1;
} else {
skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition));
}
PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) {
Range<PositionImpl> rangeToBeMarkDeleted = individualDeletedMessages
.rangeContaining(positionAfterNewMarkDelete);
newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint();
}
PositionImpl oldMarkDeletePosition = markDeletePosition;

if (log.isDebugEnabled()) {
log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition,
newMarkDeletePosition, skippedEntries);
}
messagesConsumedCounter += skippedEntries;
if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
long skippedEntries = 0;
if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId()
&& newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) {
// Mark-deleting the position next to current one
skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1;
} else {
skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition));
}
PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) {
Range<PositionImpl> rangeToBeMarkDeleted = individualDeletedMessages
.rangeContaining(positionAfterNewMarkDelete);
newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint();
}

// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
oldMarkDeletePosition.recycle();
} finally {
lock.writeLock().unlock();
if (log.isDebugEnabled()) {
log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition,
newMarkDeletePosition, skippedEntries);
}
messagesConsumedCounter += skippedEntries;
}

// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
oldMarkDeletePosition.recycle();

return newMarkDeletePosition;
}

Expand Down Expand Up @@ -1254,11 +1250,14 @@ public void asyncMarkDelete(final Position position, final MarkDeleteCallback ca
}
PositionImpl newPosition = (PositionImpl) position;

lock.writeLock().lock();
try {
newPosition = setAcknowledgedPosition(newPosition);
} catch (IllegalArgumentException e) {
callback.markDeleteFailed(new ManagedLedgerException(e), ctx);
return;
} finally {
lock.writeLock().unlock();
}

// Apply rate limiting to mark-delete operations
Expand Down Expand Up @@ -1487,6 +1486,12 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba
newMarkDeletePosition = range.upperEndpoint();
}
}

if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}
} catch (Exception e) {
log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
e.getMessage(), e);
Expand All @@ -1496,19 +1501,13 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba
lock.writeLock().unlock();
}

try {
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}

// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
callback.deleteComplete(ctx);
return;
}
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
callback.deleteComplete(ctx);
return;
}

try {
internalAsyncMarkDelete(newMarkDeletePosition, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
Expand Down