Search before asking
Motivation
As the code below shows, even with transactions (txn) disabled, we use topic.getMaxReadPosition(), which returns the topic's lastConfirmedEntry, as the OpReadEntry's maxPosition.
However, the following situation can occur:
- An OpReadEntry is created with the topic's current lastConfirmedEntry as its maxPosition.
- The cursor has no more entries, so addWaitingCursor is called and the opReadEntry becomes a waiting op.
- The topic adds a new entry, which moves its lastConfirmedEntry forward and triggers notifyEntriesAvailable() to notify the opReadEntry.
- opReadEntry.maxPosition is still the previous lastConfirmedEntry, so the op's readPosition is beyond its maxPosition. The check in internalReadFromLedger sends the op straight to opReadEntry.checkReadCompletion, which triggers the callback without reading any entry.
- As a result, one extra readEntry request is needed to read the new entry.
This is where we can improve: we should be able to execute the waiting opReadEntry directly.
By the way, in version 2.9, if txn is disabled, topic.getMaxReadPosition() returns Position.latest, so opReadEntry.maxPosition is Position.latest too. That version does not have this issue, since op.maxPosition is always greater than op.readPosition.
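The sequence above can be reproduced with a small toy model. This is only a sketch: it does not use any Pulsar classes, positions are reduced to plain entry ids, and the names StaleMaxPositionDemo, ReadOp, ToyLedger and readsUntilDelivery are hypothetical. It counts how many read requests are needed before the newly added entry is delivered, once with maxPosition taken from lastConfirmedEntry and once with a "latest" sentinel as in 2.9.

```java
import java.util.ArrayList;
import java.util.List;

public class StaleMaxPositionDemo {

    /** Hypothetical, simplified stand-in for OpReadEntry: positions are plain entry ids. */
    static final class ReadOp {
        final long readPosition;
        final long maxPosition;

        ReadOp(long readPosition, long maxPosition) {
            this.readPosition = readPosition;
            this.maxPosition = maxPosition;
        }
    }

    /** Toy ledger in which entry ids 0..lastConfirmedEntry are readable. */
    static final class ToyLedger {
        long lastConfirmedEntry;

        ToyLedger(long lastConfirmedEntry) {
            this.lastConfirmedEntry = lastConfirmedEntry;
        }

        void addEntry() {
            lastConfirmedEntry++;
        }

        /** Mirrors the guard quoted from internalReadFromLedger: nothing is read when readPosition > maxPosition. */
        List<Long> read(ReadOp op) {
            List<Long> entries = new ArrayList<>();
            if (op.readPosition > op.maxPosition) {
                return entries; // the callback would fire with an empty result
            }
            long upTo = Math.min(op.maxPosition, lastConfirmedEntry);
            for (long id = op.readPosition; id <= upTo; id++) {
                entries.add(id);
            }
            return entries;
        }
    }

    /** Returns the number of read requests needed before the new entry is delivered. */
    public static int readsUntilDelivery(boolean useLatestAsMax) {
        ToyLedger ledger = new ToyLedger(4);
        long readPosition = ledger.lastConfirmedEntry + 1; // the cursor has caught up

        // The op is created while no entries are available and then waits.
        long max = useLatestAsMax ? Long.MAX_VALUE : ledger.lastConfirmedEntry;
        ReadOp op = new ReadOp(readPosition, max);

        // A new entry arrives and the waiting op is executed as it was created.
        ledger.addEntry();
        int reads = 1;
        List<Long> entries = ledger.read(op);

        // An empty result forces the caller to issue a fresh read request.
        while (entries.isEmpty()) {
            max = useLatestAsMax ? Long.MAX_VALUE : ledger.lastConfirmedEntry;
            op = new ReadOp(readPosition, max);
            reads++;
            entries = ledger.read(op);
        }
        return reads;
    }

    public static void main(String[] args) {
        System.out.println("maxPosition = lastConfirmedEntry: " + readsUntilDelivery(false) + " read(s)");
        System.out.println("maxPosition = latest: " + readsUntilDelivery(true) + " read(s)");
    }
}
```

In this model the stale maxPosition costs exactly one extra read request per wake-up, which is the overhead described in the list above.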
TransactionBufferDisable.java:
@Override
public Position getMaxReadPosition() {
    return topic.getLastPosition();
}

PersistentDispatcherSingleActiveConsumer.java:
havePendingRead = true;
if (consumer.readCompacted()) {
    boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
            && (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
            || hasValidMarkDeletePosition(cursor));
    TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
    CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
            bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
} else {
    ReadEntriesCtx readEntriesCtx =
            ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
    cursor.asyncReadEntriesOrWait(messagesToRead,
            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
}

ManagedCursorImpl.java:
public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
                                           Object ctx, Position maxPosition,
                                           Predicate<Position> skipCondition) {
    checkArgument(maxEntries > 0);
    if (isClosed()) {
        callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
        return;
    }

    int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);

    if (hasMoreEntries()) {
        // If we have available entries, we can read them immediately
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
        }
        asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
                maxPosition, skipCondition);
    } else {
        // Skip deleted entries.
        skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
        OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
                ctx, maxPosition, skipCondition);

        if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
            op.recycle();
            callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition);
        }

        // Check again for new entries after the configured time, then if still no entries are available register
        // to be notified
        if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
            ledger.getScheduledExecutor()
                    .schedule(() -> checkForNewEntries(op, callback, ctx),
                            getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
        } else {
            // If there's no delay, check directly from the same thread
            checkForNewEntries(op, callback, ctx);
        }
    }
}

ManagedLedgerImpl.java:
private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {

    if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
        opReadEntry.checkReadCompletion();
        return;
    }

OpReadEntry.java:
void checkReadCompletion() {
    // op readPosition is smaller or equals maxPosition then can read again
    if (entries.size() < count && cursor.hasMoreEntries()
            && maxPosition.compareTo(readPosition) > 0) {

        // We still have more entries to read from the next ledger, schedule a new async operation
        cursor.ledger.getExecutor().execute(() -> {
            readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
            cursor.ledger.asyncReadEntries(OpReadEntry.this);
        });
    } else {
        // The reading was already completed, release resources and trigger callback
        try {
            cursor.readOperationCompleted();

        } finally {
            cursor.ledger.getExecutor().execute(() -> {
                callback.readEntriesComplete(entries, ctx);
                recycle();
            });
        }
    }
}
Solution
Update opReadEntry.maxPosition when notifyEntriesAvailable() is triggered, so that the waiting op can read the new entries directly.
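A minimal sketch of that idea, again as a toy model rather than the real managed-ledger code: RefreshMaxPositionSketch, WaitingReadOp and ToyCursor are hypothetical names, positions are plain entry ids, and the LongSupplier stands in for whatever currently supplies topic.getMaxReadPosition(). The waiting op's maxPosition is made mutable and re-read from the supplier when the notification arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class RefreshMaxPositionSketch {

    /** Hypothetical waiting read op; maxPosition is mutable so it can be refreshed. */
    static final class WaitingReadOp {
        final long readPosition;
        long maxPosition;

        WaitingReadOp(long readPosition, long maxPosition) {
            this.readPosition = readPosition;
            this.maxPosition = maxPosition;
        }
    }

    /** Toy cursor that holds at most one waiting op. */
    static final class ToyCursor {
        private final LongSupplier maxReadPosition; // stand-in for topic.getMaxReadPosition()
        private WaitingReadOp waitingOp;

        ToyCursor(LongSupplier maxReadPosition) {
            this.maxReadPosition = maxReadPosition;
        }

        /** Registers a waiting op that captures the max read position at creation time. */
        void registerWaiting(long readPosition) {
            waitingOp = new WaitingReadOp(readPosition, maxReadPosition.getAsLong());
        }

        /** Toy counterpart of notifyEntriesAvailable(): optionally refreshes maxPosition before reading. */
        List<Long> notifyEntriesAvailable(boolean refreshMaxPosition) {
            WaitingReadOp op = waitingOp;
            waitingOp = null;
            if (refreshMaxPosition) {
                op.maxPosition = maxReadPosition.getAsLong(); // the proposed change
            }
            List<Long> entries = new ArrayList<>();
            for (long id = op.readPosition; id <= op.maxPosition; id++) {
                entries.add(id);
            }
            return entries;
        }
    }

    /** Runs the scenario from the Motivation section and returns what the woken op reads. */
    public static List<Long> run(boolean refreshMaxPosition) {
        AtomicLong lastConfirmedEntry = new AtomicLong(4);
        ToyCursor cursor = new ToyCursor(lastConfirmedEntry::get);
        cursor.registerWaiting(5);            // cursor is caught up, op waits at entry 5
        lastConfirmedEntry.incrementAndGet(); // a new entry (id 5) is added
        return cursor.notifyEntriesAvailable(refreshMaxPosition);
    }

    public static void main(String[] args) {
        System.out.println("without refresh: " + run(false));
        System.out.println("with refresh: " + run(true));
    }
}
```

Re-querying the same source that supplied the original value, instead of hard-coding the last confirmed entry, is an assumption of this sketch; the intent is that any lower limit imposed while transactions are enabled would still be respected.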
Alternatives
No response
Anything else?
No response
Are you willing to submit a PR?
Referenced code, in the order the snippets appear above:
- pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java, lines 109 to 113 in c160cc9
- pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java, lines 368 to 381 in c160cc9
- pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java, lines 934 to 979 in 5dc0304
- pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java, lines 2051 to 2056 in 5dc0304
- pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java, lines 164 to 186 in 5dc0304