Skip to content

Conversation

@horizonzy
Copy link
Member

@horizonzy horizonzy commented Apr 10, 2022

Master Issue: #5669 #5809

Motivation

Relational code:

public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
OpReadEntry op = RECYCLER.get();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
op.cursor = cursor;
op.count = count;
op.callback = callback;
op.entries = Lists.newArrayList();
if (maxPosition == null) {
maxPosition = PositionImpl.LATEST;
}
op.maxPosition = maxPosition;
op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
}

PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
if (null == ledgerId) {
opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key"
+ ") method is used to return the least key greater than or equal to the given key, "
+ "or null if there is no such key"), null);
}
if (ledgerId != position.getLedgerId()) {
// The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
// to skip on the next available ledger
position = new PositionImpl(ledgerId, 0);
}
return position;
}

In #5809, there be 3 potential problem.

case 1: In OpAddEntry construction, it use ManagedLedgerImpl#startReadOperationOnLedger to setup readPosition, in startReadOperationOnLedger, it will callback OpAddEntry, but now some property not initial yet(like curtos,callback, entries...), cause npe.
case 2:
in startReadOperationOnLedger, the callback did't pass on ctx, the callback process need it, also case npe.
case 3:
in startReadOperationOnLedger, it didn't return when not found ledger id. The judgement unbox also case npe.

Modifications

patch #5809

Documentation

  • no-need-doc

@github-actions
Copy link

@horizonzy:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@github-actions
Copy link

@horizonzy:Thanks for providing doc info!

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Apr 10, 2022
Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, but i have a question:
Could we check for an OpReadEntry earlier (to avoid creating useless objects) or throw an exception (to achieve fail fast) when creating it?
Please let me know what you think, thanks~

@horizonzy
Copy link
Member Author

Overall LGTM, but i have a question: Could we check for an OpReadEntry earlier (to avoid creating useless objects) or throw an exception (to achieve fail fast) when creating it? Please let me know what you think, thanks~

yep, I have thought it, but it's not atomic, it need user make invalid outside. This way more secure.

@horizonzy
Copy link
Member Author

Overall LGTM, but i have a question: Could we check for an OpReadEntry earlier (to avoid creating useless objects) or throw an exception (to achieve fail fast) when creating it? Please let me know what you think, thanks~

And the operation startReadOperationOnLedger not only happen at opAddEntry construction, so I change the invalid in the method.

@mattisonchao
Copy link
Member

mattisonchao commented Apr 10, 2022

Overall LGTM, but i have a question: Could we check for an OpReadEntry earlier (to avoid creating useless objects) or throw an exception (to achieve fail fast) when creating it? Please let me know what you think, thanks~

yep, I have thought it, but it's not atomic, it need user make invalid outside. This way more secure.

Got it. I was just wondering if there is a race condition here.
I would also like to say that we can throw an exception when we get an empty position. But it doesn't look particularly different from your approach.

Copy link
Member

@mattisonchao mattisonchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! LGTM +1

opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key"
+ ") method is used to return the least key greater than or equal to the given key, "
+ "or null if there is no such key"), null);
opReadEntry.makeInvalid();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the behavior of the method. So we need to go through all the usage of this method.
As far as I can see,

  1. This line makes no sense any more.
    cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
  2. Also affects the OpReadEntry.create in
    OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your reminder. I have checked it, found another problem, I will fix it at another pr.
This one patch just for npe.

Copy link
Member Author

@horizonzy horizonzy Apr 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

location 1:
It's useless code, I remove it at #15104. So needn't check it.

location 2:
It's a wait opReadEntry, when opAddEntry completed, it will nofity wait opReadEntry read again. The logicment is not conflict with this changes. And I fix the bad behavior when wait opReadEntry read at #15102

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a wait opReadEntry, when opAddEntry completed, it will nofity wait opReadEntry read again.

Can you add a unit test to cover this path?
It seems readPosition of this invalid OpReadEntry is used before asyncReadEntries in hasMoreEntries

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a unit test to cover this path? It seems readPosition of this invalid OpReadEntry is used before asyncReadEntries in hasMoreEntries

Maybe not, it will still be used in asyncReadEntries.

At line_768, the wait OpReadEntry set to property WAITING_READ_OP_UPDATER.

if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {

In notifyEntriesAvailable, get wait OpReadEntry from WAITING_READ_OP_UPDATER, and handle it to asyncReadEntries .

And in notifyEntriesAvailable, there be a bad behavior. fixes at #15102

void notifyEntriesAvailable() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received ml notification", ledger.getName(), name);
}
OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this, null);
if (opReadEntry != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification of new messages persisted, reading at {} -- last: {}",
ledger.getName(), name, opReadEntry.readPosition, ledger.lastConfirmedEntry);
log.debug("[{}] Consumer {} cursor notification: other counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
}
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
opReadEntry.readPosition = (PositionImpl) getReadPosition();
ledger.asyncReadEntries(opReadEntry);
} else {
// No one is waiting to be notified. Ignore
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received notification but had no pending read operation", ledger.getName(), name);
}
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add unit to cover wait opReadEntry case. And close #15102, the handle push at this pr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all done, pls check it again when you convenient.

@lhotari
Copy link
Member

lhotari commented Apr 10, 2022

There's also #12396 which fixes a NPE in OpReadEntry. That might be another problem that it's fixing.

@horizonzy
Copy link
Member Author

There's also #12396 which fixes a NPE in OpReadEntry. That might be another problem that it's fixing.

well, I will check it together.

return null;
} else {
// for wait opReadEntry, the readPosition will recalculate.
opReadEntry.makeValid();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here for wait opReadEntry, it maybe change from invalid to valid. So add this logicment.

@codelipenghui codelipenghui added this to the 2.11.0 milestone Apr 25, 2022
@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@horizonzy horizonzy closed this Jun 30, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs lifecycle/stale Stale

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants