Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;

/**
Expand Down Expand Up @@ -74,6 +75,18 @@ public interface ManagedLedger {
*/
Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry to the end of a managed ledger.
*
* @param data
* data entry to be persisted
* @param numberOfMessages
* numberOfMessages of entry
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
*/
Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry asynchronously.
*
Expand Down Expand Up @@ -102,6 +115,22 @@ public interface ManagedLedger {
*/
Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry to the end of a managed ledger.
*
* @param data
* data entry to be persisted
* @param numberOfMessages
* numberOfMessages of entry
* @param offset
* offset in the data array
* @param length
* number of bytes
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
*/
Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry asynchronously.
*
Expand All @@ -119,6 +148,26 @@ public interface ManagedLedger {
*/
void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx);

/**
* Append a new entry asynchronously.
*
* @see #addEntry(byte[])
* @param data
* data entry to be persisted
* @param numberOfMessages
* numberOfMessages of entry
* @param offset
* offset in the data array
* @param length
* number of bytes
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback, Object ctx);


/**
* Append a new entry asynchronously.
*
Expand All @@ -132,6 +181,21 @@ public interface ManagedLedger {
*/
void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx);

/**
* Append a new entry asynchronously.
*
* @see #addEntry(byte[])
* @param buffer
* buffer with the data entry
* @param numberOfMessages
* numberOfMessages for data entry
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx);

/**
* Open a ManagedCursor in this ManagedLedger.
*
Expand Down Expand Up @@ -520,4 +584,14 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
* Roll current ledger if it is full
*/
void rollCurrentLedgerIfFull();

/**
* Find position by sequenceId.
* */
CompletableFuture<Position> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);

/**
* Get the ManagedLedgerInterceptor for ManagedLedger.
* */
ManagedLedgerInterceptor getManagedLedgerInterceptor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;

import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;

/**
Expand Down Expand Up @@ -75,6 +76,7 @@ public class ManagedLedgerConfig {
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down Expand Up @@ -637,4 +639,11 @@ public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
}

public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
return managedLedgerInterceptor;
}

public void setManagedLedgerInterceptor(ManagedLedgerInterceptor managedLedgerInterceptor) {
this.managedLedgerInterceptor = managedLedgerInterceptor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ public CursorNotFoundException(String msg) {
}
}

public static class ManagedLedgerInterceptException extends ManagedLedgerException {
public ManagedLedgerInterceptException(String msg) {
super(msg);
}
}

@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
Expand Down
Loading