Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2257,37 +2257,51 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
}

// Snapshot all positions into local variables to avoid race condition.
Position newPosition = ackBatchPosition(position);
Position moveForwardPosition = newPosition;
Position markDeletePos = markDeletePosition;
Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
if (lastConfirmedEntry.compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
boolean shouldCursorMoveForward = false;
try {
if (lastConfirmedEntry.getLedgerId() >= newPosition.getLedgerId()) {
LedgerInfo curMarkDeleteLedgerInfo = ledger.getLedgerInfo(newPosition.getLedgerId()).get();
Long nextValidLedger = ledger.getNextValidLedger(newPosition.getLedgerId());
shouldCursorMoveForward = (nextValidLedger != null)
&& (curMarkDeleteLedgerInfo != null
&& newPosition.getEntryId() + 1 >= curMarkDeleteLedgerInfo.getEntries());
if (shouldCursorMoveForward) {
moveForwardPosition = PositionFactory.create(nextValidLedger, -1);
}
Comment on lines +2269 to +2275
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't behave in the expected way in all cases. the getNextValidLedger will return the given ledger id if it's already the last ledger.
Therefore moveForwardPosition = PositionFactory.create(nextValidLedger, -1); would result in the wrong behavior.

btw. For a LedgerInfo instance, it's currently possible to detect the rollover by checking the timestamp field. It's != 0 for rolled over ledgers.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another detail of batch acknowledgements can be seen in ackBatchPosition method. If the last entry isn't fully acknowledged, the acked position would be the previous position. I'm not sure if this impacts this logic.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the getNextValidLedger will return the given ledger id if it's already the last ledger.

the getNextValidLedger method calls ledgers.ceilingKey(ledgerId + 1), if ledgerId + 1 doesn't exists in this NavigableMap, the method will return null, indicating the current ledger is the last ledger.

public Long getNextValidLedger(long ledgerId) {
return ledgers.ceilingKey(ledgerId + 1);
}

btw. For a LedgerInfo instance, it's currently possible to detect the rollover by checking the timestamp field. It's != 0 for rolled over ledgers.

I think nextValidLedger != null is ok to detect the rollover, the timestamp field may be modified by mistake.

Another detail of batch acknowledgements can be seen in ackBatchPosition method. If the last entry isn't fully acknowledged, the acked position would be the previous position. I'm not sure if this impacts this logic.

Similar logic in the ManagedLedgerImpl#maybeUpdateCursorBeforeTrimmingConsumedLedger() method, I'm wondering if this method also affects the ackBatchPosition method.

// Snapshot positions into a local variables to avoid race condition.
Position markDeletedPosition = cursor.getMarkDeletedPosition();
Position lastAckedPosition = markDeletedPosition;
LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
.map(Map.Entry::getValue).orElse(null);
if (curPointedLedger != null) {
if (nextPointedLedger != null) {
if (lastAckedPosition.getEntryId() != -1
&& lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) {
lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
}
} else {
log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor);
}
} else {
log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
}

} else {
Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId());
shouldCursorMoveForward = nextValidLedger != null
&& (markDeletePos.getEntryId() + 1 >= ledgerEntries)
&& (newPosition.getLedgerId() == nextValidLedger);
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
shouldCursorMoveForward = (nextValidLedger != null)
&& (newPosition.getLedgerId() == nextValidLedger)
&& (newPosition.getEntryId() == -1);
}
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
}

if (shouldCursorMoveForward) {
log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
ledger.getName(), markDeletePos, newPosition);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
+ " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
if (shouldCursorMoveForward) {
log.info("[{}] move cursor {} mark-delete-position from {} to {} since all the entries have been consumed,"
+ " last-confirmed-entry: {}, attempt position: {}, ack-batch position: {}", ledger.getName(),
name, markDeletePos, moveForwardPosition, lastConfirmedEntry, position, newPosition);
} else if (lastConfirmedEntry.compareTo(newPosition) < 0) {
// If newPosition>lastConfirmedEntry and current ledger entries are not fully consumed,
// in which case the newPosition is an invalid mark delete position.
if (log.isDebugEnabled()) {
log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
+ " for cursor [{}], attempt position: {}", ledger.getName(), newPosition, lastConfirmedEntry,
name, position);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
}

lock.writeLock().lock();
try {
newPosition = setAcknowledgedPosition(newPosition);
moveForwardPosition = setAcknowledgedPosition(moveForwardPosition);
} catch (IllegalArgumentException e) {
callback.markDeleteFailed(getManagedLedgerException(e), ctx);
return;
Expand All @@ -2298,11 +2312,11 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
updateLastMarkDeleteEntryToLatest(newPosition, properties);
updateLastMarkDeleteEntryToLatest(moveForwardPosition, properties);
callback.markDeleteComplete(ctx);
return;
}
internalAsyncMarkDelete(newPosition, properties, callback, ctx, null);
internalAsyncMarkDelete(moveForwardPosition, properties, callback, ctx, null);
}

private Position ackBatchPosition(Position position) {
Expand Down
Loading