Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,37 @@ public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
boolean isDriverSupported(String driverName);

/**
* Create a ledger offloader with the provided configuration, user-metadata and scheduler.
* Create a ledger offloader with the provided configuration, user-metadata, scheduler and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param scheduler scheduler
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler)
throws IOException;


/**
* Create a ledger offloader with the provided configuration, user-metadata, scheduler and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param scheduler scheduler
* @param offloaderStats offloaderStats
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
OrderedScheduler scheduler,
LedgerOffloaderStats offloaderStats)
throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method T create(offloadPolicies, userMetadata, scheduler, offloaderStats), the changes in #13833 can not guarantee forward compatibility, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are four scenarios:

  • old nar & old broker: I think it will work.
  • old nar & new broker: It will not work, and we do not guarantee forward compatibility. So it is okay.
  • new nar & old broker: The broker will call the method without @param offloaderStats. I think it will work.
    • we should make the method without @param offloaderStats non-default.
  • new nar & new broker I think it will work.

All things are good.



/**
* Create a ledger offloader with the provided configuration, user-metadata, schema storage and scheduler.
*
Expand All @@ -66,6 +83,26 @@ T create(OffloadPoliciesImpl offloadPolicies,
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
OrderedScheduler scheduler)
throws IOException {
return create(offloadPolicies, userMetadata, scheduler);
}

/**
* Create a ledger offloader with the provided configuration, user-metadata, schema storage,
* scheduler and offloaderStats.
*
* @param offloadPolicies offload policies
* @param userMetadata user metadata
* @param schemaStorage used for schema lookup in offloader
* @param scheduler scheduler
* @param offloaderStats offloaderStats
* @return the offloader instance
* @throws IOException when fail to create an offloader
*/
default T create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
SchemaStorage schemaStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import java.util.concurrent.TimeUnit;

class LedgerOffloaderStatsDisable implements LedgerOffloaderStats {
public class LedgerOffloaderStatsDisable implements LedgerOffloaderStats {

static final LedgerOffloaderStats INSTANCE = new LedgerOffloaderStatsDisable();
public static final LedgerOffloaderStats INSTANCE = new LedgerOffloaderStatsDisable();

private LedgerOffloaderStatsDisable() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;

Expand All @@ -32,6 +33,13 @@ public boolean isDriverSupported(String driverName) {
return FileSystemManagedLedgerOffloader.driverSupported(driverName);
}

@Override
public FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata, OrderedScheduler scheduler)
throws IOException {
return create(offloadPolicies, userMetadata, scheduler, LedgerOffloaderStatsDisable.INSTANCE);
}

@Override
public FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies,
Map<String, String> userMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
Expand All @@ -44,6 +45,12 @@ public boolean isDriverSupported(String driverName) {
return JCloudBlobStoreProvider.driverSupported(driverName);
}

@Override
public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map<String, String> userMetadata,
OrderedScheduler scheduler) throws IOException {
return create(offloadPolicies, userMetadata, scheduler, LedgerOffloaderStatsDisable.INSTANCE);
}

@Override
public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map<String, String> userMetadata,
OrderedScheduler scheduler,
Expand Down