Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kafka-impl/conf/kop.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ saslAllowedMechanisms=
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=false

allowAutoTopicCreation=true

allowAutoTopicCreationType=partitioned

# Name of the cluster to which this broker belongs to
clusterName=kafka-cluster

Expand Down
4 changes: 4 additions & 0 deletions kafka-impl/conf/kop_standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ saslAllowedMechanisms=
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=false

allowAutoTopicCreation=true

allowAutoTopicCreationType=partitioned

### --- General broker settings --- ###

# Max Rate(in 1 seconds) of Message allowed to publish for a broker if broker publish rate limiting enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -47,12 +46,7 @@
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ReaderBuilderImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -290,6 +284,7 @@ public void close() {
if (groupCoordinator != null) {
groupCoordinator.shutdown();
}
KafkaTopicManager.LOOKUP_CACHE.clear();
}

public void initGroupCoordinator(BrokerService service) throws Exception {
Expand All @@ -314,19 +309,8 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
// topicName in pulsar format: tenant/ns/topic
createKafkaOffsetsTopic(service);

ProducerBuilder<ByteBuffer> groupCoordinatorTopicProducer = service.pulsar().getClient()
.newProducer(Schema.BYTEBUFFER)
.maxPendingMessages(100000);

// TODO: replace this back to service.pulsar().getClient().newReader after merge pulsar PR:
// https://github.com/apache/pulsar/pull/5923
ReaderBuilder<ByteBuffer> groupCoordinatorTopicReader =
new ReaderBuilderImpl<>((PulsarClientImpl) (service.pulsar().getClient()), Schema.BYTEBUFFER);
groupCoordinatorTopicReader.startMessageId(MessageId.earliest);

this.groupCoordinator = GroupCoordinator.of(
groupCoordinatorTopicProducer,
groupCoordinatorTopicReader,
(PulsarClientImpl) (service.pulsar().getClient()),
groupConfig,
offsetConfig,
SystemTimer.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ private Producer registerInPersistentTopic(PersistentTopic persistentTopic) thro
// call pulsarclient.lookup.getbroker to get and own a topic
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName) {
return LOOKUP_CACHE.computeIfAbsent(topicName, t -> {
if (log.isDebugEnabled()) {
log.debug("topic {} not in Lookup_cache, call lookupBroker",
topicName);
}
CompletableFuture<InetSocketAddress> returnFuture = new CompletableFuture<>();
Backoff backoff = new Backoff(
100, TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -314,10 +315,16 @@ public void readEntriesComplete(List<Entry> list, Object o) {
if (!list.isEmpty()) {
entry = list.get(0);
long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId());
PositionImpl currentPosition = PositionImpl
.get(entry.getLedgerId(), entry.getEntryId());

// commit the offset, so backlog not affect by this cursor.
commitOffset((NonDurableCursorImpl) cursor, currentPosition);

// get next offset
PositionImpl nextPosition = ((NonDurableCursorImpl ) cursor)
.getNextAvailablePosition(PositionImpl
.get(entry.getLedgerId(), entry.getEntryId()));
PositionImpl nextPosition = ((NonDurableCursorImpl) cursor)
.getNextAvailablePosition(currentPosition);

long nextOffset = MessageIdUtils
.getOffset(nextPosition.getLedgerId(), nextPosition.getEntryId());

Expand Down Expand Up @@ -366,4 +373,23 @@ public void readEntriesFailed(ManagedLedgerException e, Object o) {
return readFutures;
}

// commit the offset, so backlog not affect by this cursor.
private static void commitOffset(NonDurableCursorImpl cursor, PositionImpl currentPosition) {
cursor.asyncMarkDelete(currentPosition, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("Mark delete success for position: {}", currentPosition);
}
}

// this is OK, since this is kind of cumulative ack, following commit will come.
@Override
public void markDeleteFailed(ManagedLedgerException e, Object ctx) {
log.warn("Mark delete success for position: {} with error:",
currentPosition, e);
}
}, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -61,10 +62,14 @@
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ReaderBuilderImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.FutureUtil;

Expand All @@ -75,8 +80,7 @@
public class GroupCoordinator {

public static GroupCoordinator of(
ProducerBuilder<ByteBuffer> producer,
ReaderBuilder<ByteBuffer> reader,
PulsarClientImpl pulsarClient,
GroupConfig groupConfig,
OffsetConfig offsetConfig,
Timer timer,
Expand All @@ -86,6 +90,13 @@ public static GroupCoordinator of(
.name("group-coordinator-executor")
.build();

// __offset partitions producers and readers builder.
ProducerBuilder<ByteBuffer> producer = pulsarClient
.newProducer(Schema.BYTEBUFFER)
.maxPendingMessages(100000);
ReaderBuilder<ByteBuffer> reader = new ReaderBuilderImpl<>(pulsarClient, Schema.BYTEBUFFER);

reader.startMessageId(MessageId.earliest);
GroupMetadataManager metadataManager = new GroupMetadataManager(
offsetConfig,
producer,
Expand All @@ -105,12 +116,14 @@ public static GroupCoordinator of(
.timeoutTimer(timer)
.build();

OffsetAcker offsetAcker = new OffsetAcker(pulsarClient);
return new GroupCoordinator(
groupConfig,
metadataManager,
heartbeatPurgatory,
joinPurgatory,
time
time,
offsetAcker
);
}

Expand All @@ -133,6 +146,9 @@ public static GroupCoordinator of(
Collections.emptyList()
);

// for topic backlog tracking.
@Getter
private final OffsetAcker offsetAcker;
private final AtomicBoolean isActive = new AtomicBoolean(false);
private final GroupConfig groupConfig;
private final GroupMetadataManager groupManager;
Expand All @@ -145,12 +161,14 @@ public GroupCoordinator(
GroupMetadataManager groupManager,
DelayedOperationPurgatory<DelayedHeartbeat> heartbeatPurgatory,
DelayedOperationPurgatory<DelayedJoin> joinPurgatory,
Time time) {
Time time,
OffsetAcker offsetAcker) {
this.groupConfig = groupConfig;
this.groupManager = groupManager;
this.heartbeatPurgatory = heartbeatPurgatory;
this.joinPurgatory = joinPurgatory;
this.time = time;
this.offsetAcker = offsetAcker;
}

/**
Expand All @@ -173,6 +191,7 @@ public void shutdown() {
groupManager.shutdown();
heartbeatPurgatory.shutdown();
joinPurgatory.shutdown();
offsetAcker.close();
log.info("Shutdown group coordinator completely.");
}

Expand Down Expand Up @@ -433,6 +452,12 @@ public CompletableFuture<KeyValue<Errors, byte[]>> handleSyncGroup(
(assignment, errors) -> resultFuture.complete(
new KeyValue<>(errors, assignment))
);

resultFuture.whenCompleteAsync((kv, throwable) -> {
if (throwable == null && kv.getKey() == Errors.NONE) {
offsetAcker.addOffsetsTracker(groupId, kv.getValue());
}
});
return resultFuture;
}

Expand Down Expand Up @@ -642,6 +667,7 @@ public Map<String, Errors> handleDeleteGroups(Set<String> groupIds) {
);
}

offsetAcker.close(groupIds);
return groupErrors;
}

Expand Down Expand Up @@ -740,7 +766,7 @@ public CompletableFuture<Map<TopicPartition, Errors>> handleCommitOffsets(
int generationId,
Map<TopicPartition, OffsetAndMetadata> offsetMetadata
) {
return validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT)
CompletableFuture<Map<TopicPartition, Errors>> result = validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT)
.map(error ->
CompletableFuture.completedFuture(
CoreUtils.mapValue(
Expand Down Expand Up @@ -771,6 +797,14 @@ public CompletableFuture<Map<TopicPartition, Errors>> handleCommitOffsets(
}
});
});

result.whenCompleteAsync((ignore, e) ->{
if (e == null){
offsetAcker.ackOffsets(groupId, offsetMetadata);
}
});

return result;
}

public Future<?> scheduleHandleTxnCompletion(
Expand Down
Loading