Skip to content

Conversation

@liangyepianzhou
Copy link
Contributor

@liangyepianzhou liangyepianzhou commented Oct 21, 2021

Master Issue: #12482
PIP : streamnative/community#6

Motivation

Redundant information does not need to be stored for a long time. The messages that have been aborted in a transaction and the Txn Marks that have been read are all redundant information

Modifications

Only Offload the length and entryId of entries that do not need to be Offloaded.

Modified the offload method in ledgerOffload interface, and added an OffloadFilter type parameter.
OffloadFilter is a newly added interface, responsible for judging from the perspective of transaction,
whether the ledger can be uninstalled, and the entry needs to be offloaded and so on. The main implementation is OffloadFilterImp and OffloadFilterDisabled

Verifying this change

Add a unit test in pulsar-broker.

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

Check the box below and label this PR (if you have committer privilege).

Need to update docs?

  • no-need-doc
    This is a perfection of existing functions

@eolivelli eolivelli added the doc-not-needed Your PR changes do not impact docs label Oct 21, 2021
reader.seek(key);
while (entriesToRead > 0) {
reader.next(key, value);
reader.next(key, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use while do? if reader.next(key, value) return false, value.getLength() will throw NPE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value.getLength() will be 0. And the entries will get. a entry which the length is 0.
And then the exception will not be thrown when LedgerEntriesImpl.create(entries).

continue;
long entryId;
int length;
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont use try catch, this code have been created in try catch line 110

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two try catch here are different. The try catch here is to obtain the EOF exception caused by the last few entries in the ledger that have been fittered, and exit the reading loop when the EOF exception is obtained.

Copy link
Contributor

@congbobo184 congbobo184 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some comments


while (entriesToRead > 0) {
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should delete entriesToRead right? because we may not read the entriesToRead number of entries

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no transaction messages, this is still useful. Can we consider keeping it?

Comment on lines 176 to 179
if (entries.isEmpty()) {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(0, 0);
entries.add(LedgerEntryImpl.create(ledgerId, 0, 0, buf));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we should add this logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

promise.complete(LedgerEntriesImpl.create(entries));
The method of LedgerEntriesImpl.create(entries) will check if entries are empty.

.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
.getDataOffset());
continue;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as BlobStoreBackedReadHandleImpl.java

@liangyepianzhou
Copy link
Contributor Author

/pulsarbot run-failure-checks

if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
//entries that has been offloaded do not need to be restricted by TransactionBuffer`maxReadPosition
LedgerInfo info = ledgers.get(ledger.getId());
if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only can offload small than maxPosition? why we should add this logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don’t know if the messages after MaxReadPosition are aborted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the messages are aborted or not is related to offloading?

// can read max position entryId
if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
lastEntryInLedger = min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger);
if(!info.getOffloadContext().getComplete()){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no transaction message has been sent, then MaxReadPosition does not need to be considered when recovering from tiered storage.

}
entryId = groupedReader.dataStream.readLong();
} catch (EOFException ioException) {
//Some entries in this ledger may be filtered. If the last entry in this ledger was filtered,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a test for it when produce EOFException

Comment on lines 158 to 161
groupedReader.inputStream
.seek(groupedReader.index
.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
.getDataOffset());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotate this seems to have no effect on test

}
if (entries.isEmpty()) {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(0, 0);
entries.add(LedgerEntryImpl.create(ledgerId, 0, 0, buf));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't do this, because LedgerEntryImpl.create(ledgerId, 0, 0, buf) will be sent to consumer but it don't have any data

}
}
entryId = key.get();
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't return length is 0 entry, same as above BlockAwareSegmentInputStream.java

Copy link
Contributor

@congbobo184 congbobo184 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! left some comment

* @param uid unique id to identity this offload attempt
* @param extraMetadata metadata to be stored with the offloaded ledger for informational
* purposes
* @param offloadFilter Filter out unnecessary entries
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filter lower case

}

// can read max position entryId
if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ledger.getId() == opReadEntry.maxPosition.getLedgerId() && !info.getOffloadContext().getComplete()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this check?


if (persistentTopic.getBrokerService().getPulsar().getConfig()
.isTransactionCoordinatorEnabled()){
if (ledger instanceof ManagedLedgerImpl) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a interface to ManagedLedger is better, right?

private static final String NAMESPACE1 = TENANT + "/ns1";
private static final int NUM_BROKERS = 1;
private static final int NUM_PARTITIONS = 1;
private Properties properties = new Properties();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seem not to use

Comment on lines 90 to 111
this.setBrokerCount(NUM_BROKERS);
this.internalSetup();
String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
.serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
pulsarClient.close();
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
// wait tc init success to ready state
waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same to TransactionTestBase.setUpBase, could you please code like MockedPulsarServiceBaseTest.internalSetup?

fileSystemManagedLedgerOffloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(1L);
setLedgerOffloader(fileSystemManagedLedgerOffloader);
setMaxEntriesPerLedger(4);
setProperties(properties);
Copy link
Contributor

@congbobo184 congbobo184 Nov 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seem not to use

Copy link
Contributor

@gaoran10 gaoran10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! Left some comments.

if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
//entries that has been offloaded do not need to be restricted by TransactionBuffer`maxReadPosition
LedgerInfo info = ledgers.get(ledger.getId());
if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the messages are aborted or not is related to offloading?

}

// can read max position entryId
if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this check?


private ManagedLedgerInterceptor managedLedgerInterceptor;

private OffloadFilter offloadFilter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use a list here? It seems that the offload filter may have different implementations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we add methods in it?

if (entry == null) {
continue;
} else if (entry.getLength() == 0) {
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a description here.

@codelipenghui codelipenghui added this to the 2.11.0 milestone Jan 21, 2022
@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs lifecycle/stale

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants