From ccfb24454f9f90a99f5eb01a889d233dc5e03b49 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Tue, 4 Jun 2024 01:12:54 +0100 Subject: [PATCH 1/2] KAFKA-16740: Adding skeleton code for Share Fetch and Acknowledge RPC --- .../java/kafka/server/SharePartition.java | 436 ++++++++++++++++++ .../kafka/server/SharePartitionManager.java | 228 +++++++++ .../main/scala/kafka/server/KafkaApis.scala | 32 ++ .../java/kafka/server/SharePartitionTest.java | 66 +++ 4 files changed, 762 insertions(+) create mode 100644 core/src/main/java/kafka/server/SharePartition.java create mode 100644 core/src/main/java/kafka/server/SharePartitionManager.java create mode 100644 core/src/test/java/kafka/server/SharePartitionTest.java diff --git a/core/src/main/java/kafka/server/SharePartition.java b/core/src/main/java/kafka/server/SharePartition.java new file mode 100644 index 0000000000000..8407f1f2f95ff --- /dev/null +++ b/core/src/main/java/kafka/server/SharePartition.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.storage.internals.log.FetchPartitionData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * The SharePartition is used to track the state of a partition that is shared between multiple + * consumers. The class maintains the state of the records that have been fetched from the leader + * and are in-flight. + */ +public class SharePartition { + + private final static Logger log = LoggerFactory.getLogger(SharePartition.class); + + /** + * empty member id used to indicate when a record is not acquired by any member. + */ + final static String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString(); + + /** + * The RecordState is used to track the state of a record that has been fetched from the leader. + * The state of the records determines if the records should be re-delivered, move the next fetch + * offset, or be state persisted to disk. + */ + public enum RecordState { + AVAILABLE((byte) 0), + ACQUIRED((byte) 1), + ACKNOWLEDGED((byte) 2), + ARCHIVED((byte) 4); + + public final byte id; + + RecordState(byte id) { + this.id = id; + } + + /** + * Validates that the newState is one of the valid transition from the current + * {@code RecordState}. 
+         *
+         * @param newState State into which requesting to transition; must be non-null
+         *
+         * @return {@code RecordState} newState if validation succeeds. Returning
+         *         newState helps state assignment chaining.
+         *
+         * @throws IllegalStateException if the state transition validation fails.
+         */
+        public RecordState validateTransition(RecordState newState) throws IllegalStateException {
+            Objects.requireNonNull(newState, "newState cannot be null");
+            if (this == newState) {
+                throw new IllegalStateException("The state transition is invalid as the new state is"
+                    + " the same as the current state");
+            }
+
+            if (this == ACKNOWLEDGED || this == ARCHIVED) {
+                throw new IllegalStateException("The state transition is invalid from the current state: " + this);
+            }
+
+            if (this == AVAILABLE && newState != ACQUIRED) {
+                throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
+            }
+
+            // Either the transition is from Available -> Acquired or from Acquired -> Available/
+            // Acknowledged/Archived.
+            return newState;
+        }
+
+        public static RecordState forId(byte id) {
+            switch (id) {
+                case 0:
+                    return AVAILABLE;
+                case 1:
+                    return ACQUIRED;
+                case 2:
+                    return ACKNOWLEDGED;
+                case 4:
+                    return ARCHIVED;
+                default:
+                    throw new IllegalArgumentException("Unknown record state id: " + id);
+            }
+        }
+    }
+
+    /**
+     * The group id that the share partition belongs to.
+     */
+    private final String groupId;
+
+    /**
+     * The topic id partition of the share partition.
+     */
+    private final TopicIdPartition topicIdPartition;
+
+    /**
+     * The in-flight record is used to track the state of a record that has been fetched from the
+     * leader. The state of the record is used to determine if the record should be re-fetched or if it
+     * can be acknowledged or archived. Once share partition start offset is moved then the in-flight
+     * records prior to the start offset are removed from the cache. The cache holds data against the
+     * first offset of the in-flight batch.
+ */ + private final NavigableMap cachedState; + + /** + * The lock is used to synchronize access to the in-flight records. The lock is used to ensure that + * the in-flight records are accessed in a thread-safe manner. + */ + private final ReadWriteLock lock; + + /** + * The find next fetch offset is used to indicate if the next fetch offset should be recomputed. + */ + private final AtomicBoolean findNextFetchOffset; + + /** + * The lock to ensure that the same share partition does not enter a fetch queue + * while another one is being fetched within the queue. + */ + private final AtomicBoolean fetchLock; + + /** + * The max in-flight messages is used to limit the number of records that can be in-flight at any + * given time. The max in-flight messages is used to prevent the consumer from fetching too many + * records from the leader and running out of memory. + */ + private final int maxInFlightMessages; + + /** + * The max delivery count is used to limit the number of times a record can be delivered to the + * consumer. The max delivery count is used to prevent the consumer re-delivering the same record + * indefinitely. + */ + private final int maxDeliveryCount; + + /** + * The record lock duration is used to limit the duration for which a consumer can acquire a record. + * Once this time period is elapsed, the record will be made available or archived depending on the delivery count. + */ + private final int recordLockDurationMs; + + /** + * Timer is used to implement acquisition lock on records that guarantees the movement of records from + * acquired to available/archived state upon timeout + */ + private final Timer timer; + + /** + * Time is used to get the currentTime. + */ + private final Time time; + + /** + * The share partition start offset specifies the partition start offset from which the records + * are cached in the cachedState of the sharePartition. 
+ */ + private long startOffset; + + /** + * The share partition end offset specifies the partition end offset from which the records + * are already fetched. + */ + private long endOffset; + + /** + * The state epoch is used to track the version of the state of the share partition. + */ + private int stateEpoch; + + /** + * The replica manager is used to get the earliest offset of the share partition, so we can adjust the start offset. + */ + private final ReplicaManager replicaManager; + + SharePartition( + String groupId, + TopicIdPartition topicIdPartition, + int maxInFlightMessages, + int maxDeliveryCount, + int recordLockDurationMs, + Timer timer, + Time time, + ReplicaManager replicaManager + ) { + this.groupId = groupId; + this.topicIdPartition = topicIdPartition; + this.maxInFlightMessages = maxInFlightMessages; + this.maxDeliveryCount = maxDeliveryCount; + this.cachedState = new ConcurrentSkipListMap<>(); + this.lock = new ReentrantReadWriteLock(); + this.findNextFetchOffset = new AtomicBoolean(false); + this.fetchLock = new AtomicBoolean(false); + this.recordLockDurationMs = recordLockDurationMs; + this.timer = timer; + this.time = time; + this.replicaManager = replicaManager; + // Initialize the partition. + initialize(); + } + + /** + * The next fetch offset is used to determine the next offset that should be fetched from the leader. + * The offset should be the next offset after the last fetched batch but there could be batches/ + * offsets that are either released by acknowledgement API or lock timed out hence the next fetch + * offset might be different from the last batch next offset. Hence, method checks if the next + * fetch offset should be recomputed else returns the last computed next fetch offset. + * + * @return The next fetch offset that should be fetched from the leader. + */ + public long nextFetchOffset() { + // TODO: Implement the logic to compute the next fetch offset. 
+ return 0; + } + + /** + * Acquire the fetched records for the share partition. The acquired records are added to the + * in-flight records and the next fetch offset is updated to the next offset that should be + * fetched from the leader. + * + * @param memberId The member id of the client that is fetching the record. + * @param fetchPartitionData The fetched records for the share partition. + * + * @return A future which is completed when the records are acquired. + */ + public CompletableFuture> acquire( + String memberId, + FetchPartitionData fetchPartitionData + ) { + log.trace("Received acquire request for share partition: {}-{}", memberId, fetchPartitionData); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented")); + + return future; + } + + /** + * Acknowledge the fetched records for the share partition. The accepted batches are removed + * from the in-flight records once persisted. The next fetch offset is updated to the next offset + * that should be fetched from the leader, if required. + * + * @param memberId The member id of the client that is fetching the record. + * @param acknowledgementBatch The acknowledgement batch list for the share partition. + * + * @return A future which is completed when the records are acknowledged. + */ + public CompletableFuture> acknowledge( + String memberId, + List acknowledgementBatch + ) { + log.trace("Acknowledgement batch request for share partition: {}-{}", groupId, topicIdPartition); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented")); + + return future; + } + + /** + * Release the acquired records for the share partition. The next fetch offset is updated to the next offset + * that should be fetched from the leader. + * + * @param memberId The member id of the client whose records shall be released. 
+ * + * @return A future which is completed when the records are released. + */ + public CompletableFuture> releaseAcquiredRecords(String memberId) { + log.trace("Release acquired records request for share partition: {}-{}-{}", groupId, memberId, topicIdPartition); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented")); + + return future; + } + + private void initialize() { + // Initialize the partition. + log.debug("Initializing share partition: {}-{}", groupId, topicIdPartition); + + // TODO: Provide implementation to initialize the share partition. + } + + /** + * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. + */ + private static class InFlightBatch { + /** + * The offset of the first record in the batch that is fetched from the log. + */ + private final long firstOffset; + /** + * The last offset of the batch that is fetched from the log. + */ + private final long lastOffset; + /** + * The in-flight state of the fetched records. If the offset state map is empty then inflightState + * determines the state of the complete batch else individual offset determines the state of + * the respective records. + */ + private InFlightState inFlightState; + + InFlightBatch(String memberId, long firstOffset, long lastOffset, RecordState state, int deliveryCount) { + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + this.inFlightState = new InFlightState(state, deliveryCount, memberId); + } + + @Override + public String toString() { + return "InFlightBatch(" + + " firstOffset=" + firstOffset + + ", lastOffset=" + lastOffset + + ", inFlightState=" + inFlightState + + ")"; + } + } + + /** + * The InFlightState is used to track the state and delivery count of a record that has been + * fetched from the leader. 
The state of the record is used to determine if the record should
+     * be re-delivered or if it can be acknowledged or archived.
+     */
+    private static class InFlightState {
+        /**
+         * The state of the fetch batch records.
+         */
+        private RecordState state;
+        /**
+         * The number of times the record has been delivered to the client.
+         */
+        private int deliveryCount;
+        /**
+         * The member id of the client that is fetching/acknowledging the record.
+         */
+        private String memberId;
+
+        InFlightState(RecordState state, int deliveryCount, String memberId) {
+            this.state = state;
+            this.deliveryCount = deliveryCount;
+            this.memberId = memberId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(state, deliveryCount, memberId);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            InFlightState that = (InFlightState) o;
+            return state == that.state && deliveryCount == that.deliveryCount && memberId.equals(that.memberId);
+        }
+
+        @Override
+        public String toString() {
+            return "InFlightState(" +
+                " state=" + state.toString() +
+                ", deliveryCount=" + deliveryCount +
+                ", memberId=" + memberId +
+                ")";
+        }
+    }
+
+    /**
+     * The AcknowledgementBatch containing the fields required to acknowledge the fetched records.
+ */ + public static class AcknowledgementBatch { + + private final long firstOffset; + private final long lastOffset; + private final List acknowledgeTypes; + + public AcknowledgementBatch(long firstOffset, long lastOffset, List acknowledgeTypes) { + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + this.acknowledgeTypes = acknowledgeTypes; + } + + public long firstOffset() { + return firstOffset; + } + + public long lastOffset() { + return lastOffset; + } + + public List acknowledgeTypes() { + return acknowledgeTypes; + } + + @Override + public String toString() { + return "AcknowledgementBatch(" + + " firstOffset=" + firstOffset + + ", lastOffset=" + lastOffset + + ", acknowledgeTypes=" + ((acknowledgeTypes == null) ? "" : acknowledgeTypes) + + ")"; + } + } +} \ No newline at end of file diff --git a/core/src/main/java/kafka/server/SharePartitionManager.java b/core/src/main/java/kafka/server/SharePartitionManager.java new file mode 100644 index 0000000000000..f377f92a22f03 --- /dev/null +++ b/core/src/main/java/kafka/server/SharePartitionManager.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.message.ShareAcknowledgeResponseData; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.FetchParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. + * It is responsible for fetching messages from the log and acknowledging the messages. + */ +public class SharePartitionManager implements AutoCloseable { + + private final static Logger log = LoggerFactory.getLogger(SharePartitionManager.class); + + /** + * The partition cache map is used to store the SharePartition objects for each share group topic-partition. + */ + private final Map partitionCacheMap; + + /** + * The replica manager is used to fetch messages from the log. + */ + private final ReplicaManager replicaManager; + + /** + * The time instance is used to get the current time. + */ + private final Time time; + + /** + * The share session cache stores the share sessions. + */ + private final ShareSessionCache cache; + + /** + * The fetch queue stores the share fetch requests that are waiting to be processed. + */ + private final ConcurrentLinkedQueue fetchQueue; + + /** + * The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time. + */ + private final AtomicBoolean processFetchQueueLock; + + /** + * The record lock duration is the time in milliseconds that a record lock is held for. 
+ */ + private final int recordLockDurationMs; + + /** + * The max in flight messages is the maximum number of messages that can be in flight at any one time per share-partition. + */ + private final int maxInFlightMessages; + + /** + * The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived. + */ + private final int maxDeliveryCount; + + public SharePartitionManager( + ReplicaManager replicaManager, + Time time, + ShareSessionCache cache, + int recordLockDurationMs, + int maxDeliveryCount, + int maxInFlightMessages + ) { + this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages); + } + + SharePartitionManager( + ReplicaManager replicaManager, + Time time, + ShareSessionCache cache, + Map partitionCacheMap, + int recordLockDurationMs, + int maxDeliveryCount, + int maxInFlightMessages + ) { + this.replicaManager = replicaManager; + this.time = time; + this.cache = cache; + this.partitionCacheMap = partitionCacheMap; + this.fetchQueue = new ConcurrentLinkedQueue<>(); + this.processFetchQueueLock = new AtomicBoolean(false); + this.recordLockDurationMs = recordLockDurationMs; + this.maxDeliveryCount = maxDeliveryCount; + this.maxInFlightMessages = maxInFlightMessages; + } + + /** + * The fetch messages method is used to fetch messages from the log for the specified topic-partitions. + * The method returns a future that will be completed with the fetched messages. + * + * @param groupId The group id, this is used to identify the share group. + * @param memberId The member id, generated by the group-coordinator, this is used to identify the client. + * @param fetchParams The fetch parameters from the share fetch request. + * @param topicIdPartitions The topic-partitions to fetch messages for. + * @param partitionMaxBytes The maximum number of bytes to fetch for each partition. 
+ * + * @return A future that will be completed with the fetched messages. + */ + public CompletableFuture> fetchMessages( + String groupId, + String memberId, + FetchParams fetchParams, + List topicIdPartitions, + Map partitionMaxBytes + ) { + log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}", + topicIdPartitions, groupId, fetchParams); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + + return future; + } + + /** + * The acknowledge method is used to acknowledge the messages that have been fetched. + * The method returns a future that will be completed with the acknowledge response. + * + * @param memberId The member id, generated by the group-coordinator, this is used to identify the client. + * @param groupId The group id, this is used to identify the share group. + * @param acknowledgeTopics The acknowledge topics and their corresponding acknowledge batches. + * + * @return A future that will be completed with the acknowledge response. + */ + public CompletableFuture> acknowledge( + String memberId, + String groupId, + Map> acknowledgeTopics + ) { + log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", + acknowledgeTopics.keySet(), groupId); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + + return future; + } + + /** + * The release acquired records method is used to release the acquired records for the specified topic-partitions. + * The method returns a future that will be completed with the release response. + * + * @param groupId The group id, this is used to identify the share group. + * @param memberId The member id, generated by the group-coordinator, this is used to identify the client. + * @param topicIdPartitions The topic-partitions to release the acquired records for. 
+ * + * @return A future that will be completed with the release response. + */ + public CompletableFuture> releaseAcquiredRecords( + String groupId, + String memberId, + List topicIdPartitions + ) { + log.trace("Release acquired records request for topicIdPartitions: {} with groupId: {}", + topicIdPartitions, groupId); + + CompletableFuture> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + + return future; + } + + @Override + public void close() throws Exception { + // TODO: Provide Implementation + } + + /** + * The SharePartitionKey is used to uniquely identify a share partition. The key is made up of the + * share group id, the topic id and the partition id. The key is used to store the SharePartition + * objects in the partition cache map. + */ + private static class SharePartitionKey { + + } + + /** + * Caches share sessions. + *

+ * See tryEvict for an explanation of the cache eviction strategy. + *

+ * The ShareSessionCache is thread-safe because all of its methods are synchronized. + * Note that individual share sessions have their own locks which are separate from the + * ShareSessionCache lock. In order to avoid deadlock, the ShareSessionCache lock + * must never be acquired while an individual ShareSession lock is already held. + */ + public static class ShareSessionCache { + // TODO: Provide Implementation + } + + /** + * The ShareFetchPartitionData class is used to store the fetch parameters for a share fetch request. + */ + private static class ShareFetchPartitionData { + // TODO: Provide Implementation + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b76ebff59cb78..5eecd17d804d3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -256,6 +256,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request) case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request) case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request) + case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request) + case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3927,6 +3929,36 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleShareFetchRequest(request: RequestChannel.Request): Unit = { + val shareFetchRequest = request.body[ShareFetchRequest] + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. 
+ requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } + // TODO: Implement the ShareFetchRequest handling + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } + + def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = { + val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest] + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. (KIP-848) + requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } + // TODO: Implement the ShareAcknowledgeRequest handling + requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } + private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordValidationStats): Unit = { diff --git a/core/src/test/java/kafka/server/SharePartitionTest.java b/core/src/test/java/kafka/server/SharePartitionTest.java new file mode 100644 index 0000000000000..fcf8935e97532 --- /dev/null +++ b/core/src/test/java/kafka/server/SharePartitionTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import kafka.server.SharePartition.RecordState; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SharePartitionTest { + + @Test + public void testRecordStateValidateTransition() { + // Null check. + assertThrows(NullPointerException.class, () -> RecordState.AVAILABLE.validateTransition(null)); + // Same state transition check. + assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.AVAILABLE)); + assertThrows(IllegalStateException.class, () -> RecordState.ACQUIRED.validateTransition(RecordState.ACQUIRED)); + assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACKNOWLEDGED)); + assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED)); + // Invalid state transition to any other state from Acknowledged state. + assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.AVAILABLE)); + assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACQUIRED)); + assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ARCHIVED)); + // Invalid state transition to any other state from Archived state. 
+        assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.AVAILABLE));
+        assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACQUIRED));
+        assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACKNOWLEDGED));
+        // Invalid state transition to any other state from Available state other than Acquired.
+        assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ACKNOWLEDGED));
+        assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ARCHIVED));
+
+        // Successful transition from Available to Acquired.
+        assertEquals(RecordState.ACQUIRED, RecordState.AVAILABLE.validateTransition(RecordState.ACQUIRED));
+        // Successful transition from Acquired to any state.
+        assertEquals(RecordState.AVAILABLE, RecordState.ACQUIRED.validateTransition(RecordState.AVAILABLE));
+        assertEquals(RecordState.ACKNOWLEDGED, RecordState.ACQUIRED.validateTransition(RecordState.ACKNOWLEDGED));
+        assertEquals(RecordState.ARCHIVED, RecordState.ACQUIRED.validateTransition(RecordState.ARCHIVED));
+    }
+
+    @Test
+    public void testRecordStateForId() {
+        assertEquals(RecordState.AVAILABLE, RecordState.forId((byte) 0));
+        assertEquals(RecordState.ACQUIRED, RecordState.forId((byte) 1));
+        assertEquals(RecordState.ACKNOWLEDGED, RecordState.forId((byte) 2));
+        assertEquals(RecordState.ARCHIVED, RecordState.forId((byte) 4));
+        // Invalid check.
+ assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5)); + } +} \ No newline at end of file From cc0100b22089948feff3e0d8a8c255e2fa170a6e Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Tue, 4 Jun 2024 11:50:57 +0100 Subject: [PATCH 2/2] Addressing review comments --- .../main/java/kafka/server/SharePartition.java | 2 +- .../java/kafka/server/SharePartitionManager.java | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 16 ---------------- .../java/kafka/server/SharePartitionTest.java | 2 +- 4 files changed, 3 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/kafka/server/SharePartition.java b/core/src/main/java/kafka/server/SharePartition.java index 8407f1f2f95ff..ddbdb89930c5c 100644 --- a/core/src/main/java/kafka/server/SharePartition.java +++ b/core/src/main/java/kafka/server/SharePartition.java @@ -433,4 +433,4 @@ public String toString() { ")"; } } -} \ No newline at end of file +} diff --git a/core/src/main/java/kafka/server/SharePartitionManager.java b/core/src/main/java/kafka/server/SharePartitionManager.java index f377f92a22f03..ec4af0265adb3 100644 --- a/core/src/main/java/kafka/server/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/SharePartitionManager.java @@ -225,4 +225,4 @@ public static class ShareSessionCache { private static class ShareFetchPartitionData { // TODO: Provide Implementation } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5eecd17d804d3..3e97f1df88860 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3931,14 +3931,6 @@ class KafkaApis(val requestChannel: RequestChannel, def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - - if (!config.isNewGroupCoordinatorEnabled) { - // The API is not supported by the "old" group 
coordinator (the default). If the - // new one is not enabled, we fail directly here. - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) - return - } // TODO: Implement the ShareFetchRequest handling requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) @@ -3946,14 +3938,6 @@ class KafkaApis(val requestChannel: RequestChannel, def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = { val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest] - - if (!config.isNewGroupCoordinatorEnabled) { - // The API is not supported by the "old" group coordinator (the default). If the - // new one is not enabled, we fail directly here. (KIP-848) - requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) - return - } // TODO: Implement the ShareAcknowledgeRequest handling requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) diff --git a/core/src/test/java/kafka/server/SharePartitionTest.java b/core/src/test/java/kafka/server/SharePartitionTest.java index fcf8935e97532..c5b57bc378da2 100644 --- a/core/src/test/java/kafka/server/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/SharePartitionTest.java @@ -63,4 +63,4 @@ public void testRecordStateForId() { // Invalid check. assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5)); } -} \ No newline at end of file +}