Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public Builder(final String transactionalId,
.setGroupInstanceId(groupInstanceId.orElse(null));
}

/**
 * Constructs a builder around an already-populated request payload.
 * The given data is used as-is when the request is built.
 *
 * @param data the pre-built transactional offset commit request data
 */
public Builder(final TxnOffsetCommitRequestData data) {
super(ApiKeys.TXN_OFFSET_COMMIT);
this.data = data;
}

@Override
public TxnOffsetCommitRequest build(short version) {
if (version < 3 && groupMetadataSet()) {
Expand Down Expand Up @@ -179,6 +184,11 @@ public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e)
.setTopics(responseTopicData));
}

/**
 * Builds an error response using the default throttle time; delegates to
 * {@link #getErrorResponse(int, Throwable)} for callers that have no
 * throttle information.
 */
@Override
public TxnOffsetCommitResponse getErrorResponse(Throwable e) {
return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
}

public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new TxnOffsetCommitRequest(new TxnOffsetCommitRequestData(
new ByteBufferAccessor(buffer), version), version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Possible error codes:
Expand All @@ -47,6 +49,84 @@
*/
public class TxnOffsetCommitResponse extends AbstractResponse {

public static class Builder {
    // Response under construction; replaced wholesale by merge() while still empty.
    TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData();
    // Index of the topics already present in `data`, keyed by topic name,
    // so partitions can be appended without a linear scan of data.topics().
    HashMap<String, TxnOffsetCommitResponseTopic> byTopicName = new HashMap<>();

    /**
     * Returns the response topic entry for {@code topicName}, creating it and
     * registering it in both {@code data} and the index if absent.
     */
    private TxnOffsetCommitResponseTopic getOrCreateTopic(
        String topicName
    ) {
        return byTopicName.computeIfAbsent(topicName, name -> {
            TxnOffsetCommitResponseTopic topic = new TxnOffsetCommitResponseTopic().setName(name);
            data.topics().add(topic);
            return topic;
        });
    }

    /**
     * Adds a single partition result with the given error to the response.
     */
    public Builder addPartition(
        String topicName,
        int partitionIndex,
        Errors error
    ) {
        final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);

        topicResponse.partitions().add(new TxnOffsetCommitResponsePartition()
            .setPartitionIndex(partitionIndex)
            .setErrorCode(error.code()));

        return this;
    }

    /**
     * Adds several partition results under one topic, all sharing {@code error}.
     *
     * @param partitionIndex extracts the partition index from each element
     */
    public <P> Builder addPartitions(
        String topicName,
        List<P> partitions,
        Function<P, Integer> partitionIndex,
        Errors error
    ) {
        final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);

        partitions.forEach(partition ->
            topicResponse.partitions().add(new TxnOffsetCommitResponsePartition()
                .setPartitionIndex(partitionIndex.apply(partition))
                .setErrorCode(error.code())));

        return this;
    }

    /**
     * Merges {@code newData} into the response under construction. Partitions of
     * topics already present are appended to the existing topic entry; partitions
     * are expected to be non-overlapping and are not de-duplicated.
     */
    public Builder merge(
        TxnOffsetCommitResponseData newData
    ) {
        if (data.topics().isEmpty()) {
            // If the current data is empty, discard it and adopt the new data.
            // The topic index must be rebuilt from the adopted data, otherwise a
            // later addPartition/merge for one of these topics would create a
            // duplicate topic entry.
            data = newData;
            newData.topics().forEach(topic -> byTopicName.put(topic.name(), topic));
        } else {
            // Otherwise, merge topic by topic.
            newData.topics().forEach(newTopic -> {
                TxnOffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
                if (existingTopic == null) {
                    // If no topic exists, we can directly copy the new topic data.
                    data.topics().add(newTopic);
                    byTopicName.put(newTopic.name(), newTopic);
                } else {
                    // Otherwise, we add the partitions to the existing one. Note we
                    // expect non-overlapping partitions here as we don't verify
                    // if the partition is already in the list before adding it.
                    existingTopic.partitions().addAll(newTopic.partitions());
                }
            });
        }

        return this;
    }

    /**
     * Finalizes the builder into a response object.
     */
    public TxnOffsetCommitResponse build() {
        return new TxnOffsetCommitResponse(data);
    }
}

private final TxnOffsetCommitResponseData data;

public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
import kafka.server.RequestLocal
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
Expand Down Expand Up @@ -360,21 +360,13 @@ class GroupCoordinatorAdapter(
request.topics.forEach { topic =>
topic.partitions.forEach { partition =>
val tp = new TopicPartition(topic.name, partition.partitionIndex)
partitions += tp -> new OffsetAndMetadata(
offset = partition.committedOffset,
leaderEpoch = partition.committedLeaderEpoch match {
case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch)
},
metadata = partition.committedMetadata match {
case null => OffsetAndMetadata.NoMetadata
case metadata => metadata
},
commitTimestamp = partition.commitTimestamp match {
case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
case customTimestamp => customTimestamp
},
expireTimestamp = expireTimeMs
partitions += tp -> createOffsetAndMetadata(
currentTimeMs,
partition.committedOffset,
partition.committedLeaderEpoch,
partition.committedMetadata,
partition.commitTimestamp,
expireTimeMs
)
}
}
Expand All @@ -391,4 +383,91 @@ class GroupCoordinatorAdapter(

future
}

override def commitTransactionalOffsets(
  context: RequestContext,
  request: TxnOffsetCommitRequestData,
  bufferSupplier: BufferSupplier
): CompletableFuture[TxnOffsetCommitResponseData] = {
  val now = time.milliseconds
  val future = new CompletableFuture[TxnOffsetCommitResponseData]()

  // Translates the coordinator's per-partition error map into the
  // TxnOffsetCommitResponseData wire shape, grouping partitions by topic.
  def onCommitComplete(results: Map[TopicPartition, Errors]): Unit = {
    val response = new TxnOffsetCommitResponseData()
    val topicsByName = new mutable.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]()

    results.forKeyValue { (tp, error) =>
      // Look up the topic entry; create and register it on first sight.
      // getOrElseUpdate is deliberately not used because the update has the
      // side effect of also appending to response.topics.
      val topicResponse = topicsByName.get(tp.topic) match {
        case Some(existing) =>
          existing
        case None =>
          val created = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(tp.topic)
          topicsByName += tp.topic -> created
          response.topics.add(created)
          created
      }

      topicResponse.partitions.add(
        new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
          .setPartitionIndex(tp.partition)
          .setErrorCode(error.code)
      )
    }

    future.complete(response)
  }

  // Convert every requested (topic, partition) into an OffsetAndMetadata.
  val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
  request.topics.forEach { topic =>
    topic.partitions.forEach { partition =>
      val tp = new TopicPartition(topic.name, partition.partitionIndex)
      offsets += tp -> createOffsetAndMetadata(
        now,
        partition.committedOffset,
        partition.committedLeaderEpoch,
        partition.committedMetadata,
        OffsetCommitRequest.DEFAULT_TIMESTAMP, // means that currentTimeMs is used.
        None
      )
    }
  }

  coordinator.handleTxnCommitOffsets(
    request.groupId,
    request.producerId,
    request.producerEpoch,
    request.memberId,
    Option(request.groupInstanceId),
    request.generationId,
    offsets.toMap,
    onCommitComplete,
    RequestLocal(bufferSupplier)
  )

  future
}

/**
 * Builds an OffsetAndMetadata from raw wire-level fields, normalizing the
 * protocol's sentinel values: NO_PARTITION_LEADER_EPOCH becomes an empty
 * Optional, a null metadata string becomes NoMetadata, and DEFAULT_TIMESTAMP
 * resolves to the current time.
 */
private def createOffsetAndMetadata(
  currentTimeMs: Long,
  offset: Long,
  leaderEpoch: Int,
  metadata: String,
  commitTimestamp: Long,
  expireTimestamp: Option[Long]
): OffsetAndMetadata = {
  val resolvedLeaderEpoch =
    if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) Optional.empty[Integer]
    else Optional.of[Integer](leaderEpoch)

  val resolvedMetadata =
    if (metadata == null) OffsetAndMetadata.NoMetadata
    else metadata

  val resolvedCommitTimestamp =
    if (commitTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP) currentTimeMs
    else commitTimestamp

  new OffsetAndMetadata(
    offset = offset,
    leaderEpoch = resolvedLeaderEpoch,
    metadata = resolvedMetadata,
    commitTimestamp = resolvedCommitTimestamp,
    expireTimestamp = expireTimestamp
  )
}
}
Loading