diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 7584406f39419..bad46a94689f2 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -140,7 +140,7 @@ object Partition { new Partition(topicPartition, _topicId = topicId, replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs, - interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion, + interBrokerProtocolVersion = replicaManager.metadataCache.metadataVersion(), localBrokerId = replicaManager.config.brokerId, localBrokerEpochSupplier = replicaManager.brokerEpochSupplier, time = time, diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 44176d22763e9..688de61849367 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -33,7 +33,6 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition} -import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0 import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} @@ -180,10 +179,6 @@ class TransactionMarkerChannelManager( private val transactionsWithPendingMarkers = new ConcurrentHashMap[String, PendingCompleteTxn] - private val writeTxnMarkersRequestVersion: Short = - if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1 - else 0 - metricsGroup.newGauge(UnknownDestinationQueueSizeMetricName, () => markersQueueForUnknownBroker.totalNumMarkers) metricsGroup.newGauge(LogAppendRetryQueueSizeMetricName, () => txnLogAppendRetryQueue.size) @@ -261,7 +256,9 @@ class TransactionMarkerChannelManager( }.filter { case (_, entries) => !entries.isEmpty }.map { case (node, entries) => val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries) - val request = new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend) + val request = new WriteTxnMarkersRequest.Builder( + metadataCache.metadataVersion().writeTxnMarkersRequestVersion(), markersToSend + ) new RequestAndCompletionHandler( currentTimeMs, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 07c8eb0563b94..a9d46fa0296eb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -455,7 +455,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) - } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + } else if (offsetCommitRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -1799,7 +1799,7 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val joinGroupRequest = request.body[JoinGroupRequest] - if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + if (joinGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -1829,7 +1829,7 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val syncGroupRequest = request.body[SyncGroupRequest] - if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + if (syncGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -1898,7 +1898,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val heartbeatRequest = request.body[HeartbeatRequest] - if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + if (heartbeatRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -2529,8 +2529,8 @@ class KafkaApis(val requestChannel: RequestChannel, } def ensureInterBrokerVersion(version: MetadataVersion): Unit = { - if (config.interBrokerProtocolVersion.isLessThan(version)) - throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion} is less than the required version: ${version}") + if (metadataCache.metadataVersion().isLessThan(version)) + throw new UnsupportedVersionException(s"metadata.version: ${metadataCache.metadataVersion()} is less than the required version: ${version}") } def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 3de08c1af7c2c..44a35e981a286 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -92,6 +92,9 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren ) } + when(metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) + txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time, new Metrics()) txnStateManager.startup(() => zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME).get, diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 3356c4f9e372c..d40932f3226ce 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse} import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.RequestAndCompletionHandler import org.junit.jupiter.api.Assertions._ @@ -86,6 +87,8 @@ class TransactionMarkerChannelManagerTest { .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1)))) when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2))) .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) + when(metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) } @Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 8018f9697a2e2..015d94d4bbbcd 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -129,7 +129,7 @@ class KafkaApisTest extends Logging { private val brokerId = 1 // KRaft tests should override this with a KRaftMetadataCache private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting()) - private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) + private var brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager]) private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager]) private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager]) @@ -2919,6 +2919,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2926,6 +2928,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2933,6 +2937,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2940,6 +2946,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2947,6 +2955,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -8748,6 +8758,8 @@ class KafkaApisTest extends Logging { ).build() val requestChannelRequest = buildRequest(joinGroupRequest) + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching) @@ -8766,6 +8778,8 @@ class KafkaApisTest extends Logging { ).build() val requestChannelRequest = buildRequest(syncGroupRequest) + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching) @@ -8783,6 +8797,8 @@ class KafkaApisTest extends Logging { .setGenerationId(1) ).build() val requestChannelRequest = buildRequest(heartbeatRequest) + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleHeartbeatRequest(requestChannelRequest) @@ -8812,6 +8828,9 @@ class KafkaApisTest extends Logging { ).build() val requestChannelRequest = buildRequest(offsetCommitRequest) + + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching) diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 94dff4c91fa7a..0574b97cdedf5 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -646,6 +646,14 @@ private static boolean checkIfMetadataChangedOrdered(MetadataVersion highVersion return version != lowVersion; } + public short writeTxnMarkersRequestVersion() { + if (isAtLeast(IBP_2_8_IV0)) { + return 1; + } else { + return 0; + } + } + public boolean isAtLeast(MetadataVersion otherVersion) { return this.compareTo(otherVersion) >= 0; } diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 3908dd1cf6db9..d2867756bf38a 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.server.common; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.RecordVersion; import org.junit.jupiter.api.Test; @@ -466,6 +467,20 @@ public void assertLatestProductionIsLessThanLatest() { " to be less than the latest of " + MetadataVersion.latestTesting()); } + /** + * We need to ensure that the latest production MV doesn't inadvertently rely on an unstable + * request version. Currently, the broker selects the version for some inter-broker RPCs based on the MV + * rather than using the supported version from the ApiResponse. + */ + @Test + public void testProductionMetadataDontUseUnstableApiVersion() { + MetadataVersion mv = MetadataVersion.latestProduction(); + assertTrue(mv.listOffsetRequestVersion() <= ApiKeys.LIST_OFFSETS.latestVersion(false)); + assertTrue(mv.fetchRequestVersion() <= ApiKeys.FETCH.latestVersion(false)); + assertTrue(mv.offsetForLeaderEpochRequestVersion() <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(false)); + assertTrue(mv.writeTxnMarkersRequestVersion() <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(false)); + } + @Test public void assertLatestProductionIsProduction() { assertTrue(LATEST_PRODUCTION.isProduction());