Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,14 @@ add configurations in Pulsar's configuration file, such as `broker.conf` or `sta
listeners=PLAINTEXT://127.0.0.1:9092
advertisedAddress=127.0.0.1
```
3. Offset Management

Offset management in KoP depends on the "Broker Entry Metadata" feature of Pulsar. Therefore, you should set `brokerEntryMetadataInterceptors` to `org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor`.

**Example**
```properties
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
```
### Restart Pulsar brokers to load KoP

After you have installed the KoP protocol handler to Pulsar broker, you can restart the Pulsar brokers to load KoP.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ public void initGroupCoordinator(BrokerService service) throws Exception {


this.groupCoordinator = GroupCoordinator.of(
brokerService,
(PulsarClientImpl) (service.pulsar().getClient()),
groupConfig,
offsetConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,29 +703,15 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
}

ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) perTopic.getManagedLedger();
PositionImpl lac = (PositionImpl) managedLedger.getLastConfirmedEntry();
if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
if (log.isDebugEnabled()) {
log.debug("Get latest position for topic {} time {}. result: {}",
perTopic.getName(), timestamp, position);
}

// no entry in ledger, then entry id could be -1
long entryId = position.getEntryId();

if (legacyMode) {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
Collections.singletonList(MessageIdUtils
.getOffset(position.getLedgerId(), entryId == -1 ? 0 : entryId))));

} else {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
DEFAULT_TIMESTAMP,
MessageIdUtils
.getOffset(position.getLedgerId(), entryId == -1 ? 0 : entryId)));
}
long offset = MessageIdUtils.getCurrentOffset(managedLedger);
fetchOffsetForTimestampSuccess(partitionData, legacyMode, offset);

} else if (timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) {
PositionImpl position = OffsetFinder.getFirstValidPosition(managedLedger);
Expand All @@ -734,17 +720,18 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
log.debug("Get earliest position for topic {} time {}. result: {}",
perTopic.getName(), timestamp, position);
}

if (legacyMode) {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
Collections.singletonList(MessageIdUtils.getOffset(position.getLedgerId(),
position.getEntryId()))));
if (position.compareTo(lac) > 0 || MessageIdUtils.getCurrentOffset(managedLedger) < 0) {
long offset = Math.max(0, MessageIdUtils.getCurrentOffset(managedLedger));
fetchOffsetForTimestampSuccess(partitionData, legacyMode, offset);
} else {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
DEFAULT_TIMESTAMP,
MessageIdUtils.getOffset(position.getLedgerId(), position.getEntryId())));
MessageIdUtils.getOffsetOfPosition(managedLedger, position).whenComplete((offset, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get offset for position {}", perTopic, position, throwable);
fetchOffsetForTimestampFailed(partitionData, legacyMode);
return;
}
fetchOffsetForTimestampSuccess(partitionData, legacyMode, offset);
});
}

} else {
Expand All @@ -760,40 +747,33 @@ public void findEntryComplete(Position position, Object ctx) {
if (finalPosition == null) {
log.warn("Unable to find position for topic {} time {}. get NULL position",
perTopic.getName(), timestamp);

if (legacyMode) {
partitionData.complete(new ListOffsetResponse
.PartitionData(
Errors.UNKNOWN_SERVER_ERROR,
Collections.emptyList()));
} else {
partitionData.complete(new ListOffsetResponse
.PartitionData(
Errors.UNKNOWN_SERVER_ERROR,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET));
}
fetchOffsetForTimestampFailed(partitionData, legacyMode);
return;
}
} else {
finalPosition = (PositionImpl) position;
}


if (log.isDebugEnabled()) {
log.debug("Find position for topic {} time {}. position: {}",
perTopic.getName(), timestamp, finalPosition);
}
if (legacyMode) {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
Collections.singletonList(
MessageIdUtils.getOffset(
finalPosition.getLedgerId(), finalPosition.getEntryId()))));

if (finalPosition.compareTo(lac) > 0 || MessageIdUtils.getCurrentOffset(managedLedger) < 0) {
long offset = Math.max(0, MessageIdUtils.getCurrentOffset(managedLedger));
fetchOffsetForTimestampSuccess(partitionData, legacyMode, offset);
} else {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
DEFAULT_TIMESTAMP,
MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId())));
MessageIdUtils.getOffsetOfPosition(managedLedger, finalPosition)
.whenComplete((offset, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get offset for position {}",
perTopic, finalPosition, throwable);
fetchOffsetForTimestampFailed(partitionData, legacyMode);
return;
}
fetchOffsetForTimestampSuccess(partitionData, legacyMode, offset);
});
}
}

Expand All @@ -802,18 +782,7 @@ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> position, Object ctx) {
log.warn("Unable to find position for topic {} time {}. Exception:",
perTopic.getName(), timestamp, exception);
if (legacyMode) {
partitionData.complete(new ListOffsetResponse
.PartitionData(
Errors.UNKNOWN_SERVER_ERROR,
Collections.emptyList()));
} else {
partitionData.complete(new ListOffsetResponse
.PartitionData(
Errors.UNKNOWN_SERVER_ERROR,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET));
}
fetchOffsetForTimestampFailed(partitionData, legacyMode);
return;
}
});
Expand All @@ -823,6 +792,37 @@ public void findEntryFailed(ManagedLedgerException exception,
return partitionData;
}

/**
 * Completes the given partition-data future with an {@code UNKNOWN_SERVER_ERROR}
 * list-offset response, using the response shape expected by the client version.
 *
 * @param partitionData future to complete with the error response
 * @param legacyMode    true for ListOffset v0 clients (offset-list form);
 *                      false for v1+ clients (timestamp/offset form)
 */
private void fetchOffsetForTimestampFailed(CompletableFuture<ListOffsetResponse.PartitionData> partitionData,
                                           boolean legacyMode) {
    final ListOffsetResponse.PartitionData errorResponse;
    if (legacyMode) {
        // v0 response carries a (here empty) list of offsets
        errorResponse = new ListOffsetResponse.PartitionData(
                Errors.UNKNOWN_SERVER_ERROR,
                Collections.emptyList());
    } else {
        // v1+ response carries explicit unknown timestamp/offset sentinels
        errorResponse = new ListOffsetResponse.PartitionData(
                Errors.UNKNOWN_SERVER_ERROR,
                ListOffsetResponse.UNKNOWN_TIMESTAMP,
                ListOffsetResponse.UNKNOWN_OFFSET);
    }
    partitionData.complete(errorResponse);
}

/**
 * Completes the given partition-data future with a successful list-offset
 * response carrying {@code offset}, formatted for the client's protocol version.
 *
 * @param partitionData future to complete with the success response
 * @param legacyMode    true for ListOffset v0 clients (offset-list form);
 *                      false for v1+ clients (timestamp/offset form)
 * @param offset        the resolved offset to return to the client
 */
private void fetchOffsetForTimestampSuccess(CompletableFuture<ListOffsetResponse.PartitionData> partitionData,
                                            boolean legacyMode,
                                            long offset) {
    // v0 wraps the single offset in a list; v1+ pairs it with a timestamp.
    final ListOffsetResponse.PartitionData responseData = legacyMode
            ? new ListOffsetResponse.PartitionData(
                    Errors.NONE,
                    Collections.singletonList(offset))
            : new ListOffsetResponse.PartitionData(
                    Errors.NONE,
                    DEFAULT_TIMESTAMP,
                    offset);
    partitionData.complete(responseData);
}

private void handleListOffsetRequestV1AndAbove(KafkaHeaderAndRequest listOffset,
CompletableFuture<AbstractResponse> resultFuture) {
ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {

@FieldContext(
category = CATEGORY_KOP,
doc = "Maximum number of entries that are read from cursor once per time"
doc = "Maximum number of entries that are read from cursor once per time, default is 1"
)
private int maxReadEntriesNum = 5;
private int maxReadEntriesNum = 1;
Comment thread
BewareMyPower marked this conversation as resolved.

@FieldContext(
category = CATEGORY_KOP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.streamnative.pulsar.handlers.kop;

import static com.google.common.base.Preconditions.checkArgument;
import static io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils.offsetAfterBatchIndex;

import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.Closeable;
Expand Down Expand Up @@ -95,8 +94,8 @@ void deleteOneExpiredCursor(long offset) {

if (pair != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor timed out for offset: {} - {}, cursors cache size: {}",
requestHandler.ctx.channel(), offset, MessageIdUtils.getPosition(offset), consumers.size());
log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}",
requestHandler.ctx.channel(), offset, consumers.size());
}

ManagedCursor managedCursor = pair.getKey();
Expand Down Expand Up @@ -146,8 +145,8 @@ public Pair<ManagedCursor, Long> remove(long offset) {

if (cursor != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Get cursor for offset: {} - {} in cache. cache size: {}",
requestHandler.ctx.channel(), offset, MessageIdUtils.getPosition(offset), consumers.size());
log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
requestHandler.ctx.channel(), offset, consumers.size());
}
return cursor;
}
Expand All @@ -156,8 +155,6 @@ public Pair<ManagedCursor, Long> remove(long offset) {
}

private Pair<ManagedCursor, Long> createCursorIfNotExists(long offset) {
// This is for read a new entry, first check if offset is from a batched message request.
offset = offsetAfterBatchIndex(offset);

Pair<ManagedCursor, Long> cursor;

Expand All @@ -170,14 +167,15 @@ private Pair<ManagedCursor, Long> createCursorIfNotExists(long offset) {
consumers.computeIfAbsent(
offset,
off -> {
PositionImpl position = MessageIdUtils.getPosition(off);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();

PositionImpl position = MessageIdUtils.getPositionForOffset(ledger, off);

String cursorName = "kop-consumer-cursor-" + topic.getName()
+ "-" + position.getLedgerId() + "-" + position.getEntryId()
+ "-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);

// get previous position, because NonDurableCursor is read from next position.
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
PositionImpl previous = ledger.getPreviousPosition(position);
if (log.isDebugEnabled()) {
log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}",
Expand Down Expand Up @@ -231,8 +229,8 @@ public void add(long offset, Pair<ManagedCursor, Long> pair) {
lastAccessTimes.put(offset, System.currentTimeMillis());

if (log.isDebugEnabled()) {
log.debug("[{}] Add cursor back {} for offset: {} - {}",
requestHandler.ctx.channel(), pair.getLeft().getName(), offset, MessageIdUtils.getPosition(offset));
log.debug("[{}] Add cursor back {} for offset: {}",
requestHandler.ctx.channel(), pair.getLeft().getName(), offset);
}
}

Expand Down
Loading