From 1eb337329c5c535b19a9d55a8bbf4c86d63da3ae Mon Sep 17 00:00:00 2001 From: m1a2st Date: Fri, 16 Aug 2024 18:29:02 +0800 Subject: [PATCH 1/6] transfer from config.interBrokerProtocolVersion -> metadataCache.metadataVersion --- .../main/scala/kafka/cluster/Partition.scala | 2 +- .../coordinator/group/GroupCoordinator.scala | 2 +- .../TransactionMarkerChannelManager.scala | 9 +++----- .../scala/kafka/server/ConfigHandler.scala | 4 ++-- .../main/scala/kafka/server/KafkaApis.scala | 12 +++++------ .../AbstractCoordinatorConcurrencyTest.scala | 14 ++++++++----- .../GroupCoordinatorConcurrencyTest.scala | 3 +++ .../group/GroupCoordinatorTest.scala | 7 ++++++- ...ransactionCoordinatorConcurrencyTest.scala | 2 ++ .../TransactionMarkerChannelManagerTest.scala | 3 +++ .../unit/kafka/server/KafkaApisTest.scala | 21 ++++++++++++++++++- .../kafka/server/common/MetadataVersion.java | 8 +++++++ .../server/common/MetadataVersionTest.java | 15 +++++++++++++ 13 files changed, 79 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 7584406f39419..bad46a94689f2 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -140,7 +140,7 @@ object Partition { new Partition(topicPartition, _topicId = topicId, replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs, - interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion, + interBrokerProtocolVersion = replicaManager.metadataCache.metadataVersion(), localBrokerId = replicaManager.config.brokerId, localBrokerEpochSupplier = replicaManager.brokerEpochSupplier, time = time, diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index f592f3c9cea82..a4640f6c1f7d8 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -1793,7 +1793,7 @@ object GroupCoordinator { groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize, groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs) - val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion, + val groupMetadataManager = new GroupMetadataManager(config.brokerId, replicaManager.metadataCache.metadataVersion(), offsetConfig, replicaManager, time, metrics) new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, rebalancePurgatory, time, metrics) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 44176d22763e9..688de61849367 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -33,7 +33,6 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition} -import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0 import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} @@ -180,10 +179,6 @@ class TransactionMarkerChannelManager( private val transactionsWithPendingMarkers = new ConcurrentHashMap[String, PendingCompleteTxn] - private val writeTxnMarkersRequestVersion: Short = - if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1 - else 0 - metricsGroup.newGauge(UnknownDestinationQueueSizeMetricName, () => markersQueueForUnknownBroker.totalNumMarkers) metricsGroup.newGauge(LogAppendRetryQueueSizeMetricName, () => txnLogAppendRetryQueue.size) @@ -261,7 +256,9 @@ class TransactionMarkerChannelManager( }.filter { case (_, entries) => !entries.isEmpty }.map { case (node, entries) => val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries) - val request = new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend) + val request = new WriteTxnMarkersRequest.Builder( + metadataCache.metadataVersion().writeTxnMarkersRequestVersion(), markersToSend + ) new RequestAndCompletionHandler( currentTimeMs, diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 77a3874a3ddc5..37cbea7e4b628 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -166,12 +166,12 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, private def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = { // Verify message format version Option(topicConfig.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)).flatMap { versionString => - val messageFormatVersion = new MessageFormatVersion(versionString, kafkaConfig.interBrokerProtocolVersion.version) + val messageFormatVersion = new MessageFormatVersion(versionString, replicaManager.metadataCache.metadataVersion().version) if (messageFormatVersion.shouldIgnore) { if (messageFormatVersion.shouldWarn) warn(messageFormatVersion.topicWarningMessage(topic)) Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) - } else if (kafkaConfig.interBrokerProtocolVersion.isLessThan(messageFormatVersion.messageFormatVersion)) { + } else if (replicaManager.metadataCache.metadataVersion().isLessThan(messageFormatVersion.messageFormatVersion)) { warn(s"Topic configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG} is ignored for `$topic` because `$versionString` " + s"is higher than what is allowed by the inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`") Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 07c8eb0563b94..c180e0fb0ca8f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -455,7 +455,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) - } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + } else if (offsetCommitRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -1799,7 +1799,7 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val joinGroupRequest = request.body[JoinGroupRequest] - if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + if (joinGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -1829,7 +1829,7 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val syncGroupRequest = request.body[SyncGroupRequest] - if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + if (syncGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -1898,7 +1898,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val heartbeatRequest = request.body[HeartbeatRequest] - if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { + if (heartbeatRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. @@ -2529,8 +2529,8 @@ class KafkaApis(val requestChannel: RequestChannel, } def ensureInterBrokerVersion(version: MetadataVersion): Unit = { - if (config.interBrokerProtocolVersion.isLessThan(version)) - throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion} is less than the required version: ${version}") + if (metadataCache.metadataVersion().isLessThan(version)) + throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${metadataCache.metadataVersion()} is less than the required version: ${version}") } def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index d7138a89fef0a..66646bbe562e8 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -62,7 +62,9 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] extend when(mockLogMger.liveLogDirs).thenReturn(Seq.empty) val producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, reaperEnabled = false) val watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala - replicaManager = TestReplicaManager(KafkaConfig.fromProps(serverProps), time, scheduler, timer, mockLogMger, mock(classOf[QuotaManagers], withSettings().stubOnly()), producePurgatory, watchKeys) + replicaManager = TestReplicaManager(KafkaConfig.fromProps(serverProps), + time, scheduler, timer, mockLogMger, mock(classOf[QuotaManagers], + withSettings().stubOnly()), producePurgatory, watchKeys, mock(classOf[MetadataCache])) zkClient = mock(classOf[KafkaZkClient], withSettings().stubOnly()) } @@ -173,7 +175,8 @@ object AbstractCoordinatorConcurrencyTest { val delayedFetchPurgatoryParam: DelayedOperationPurgatory[DelayedFetch], val delayedDeleteRecordsPurgatoryParam: DelayedOperationPurgatory[DelayedDeleteRecords], val delayedElectLeaderPurgatoryParam: DelayedOperationPurgatory[DelayedElectLeader], - val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch]) + val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch], + metadataCache: MetadataCache = null) extends ReplicaManager( config, metrics = null, @@ -182,7 +185,7 @@ object AbstractCoordinatorConcurrencyTest { logManager, None, quotaManagers, - null, + metadataCache, null, null, delayedProducePurgatoryParam = Some(producePurgatory), @@ -281,7 +284,8 @@ object AbstractCoordinatorConcurrencyTest { logManager: LogManager, quotaManagers: QuotaManagers, producePurgatory: DelayedOperationPurgatory[DelayedProduce], - watchKeys: mutable.Set[TopicPartitionOperationKey]): TestReplicaManager = { + watchKeys: mutable.Set[TopicPartitionOperationKey], + metadataCache: MetadataCache): TestReplicaManager = { val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( purgatoryName = "RemoteFetch", timer, reaperEnabled = false) val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( @@ -291,7 +295,7 @@ object AbstractCoordinatorConcurrencyTest { val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( purgatoryName = "ElectLeader", timer, reaperEnabled = false) new TestReplicaManager(config, time, scheduler, logManager, quotaManagers, watchKeys, producePurgatory, - mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory) + mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory, metadataCache = metadataCache) } } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala index 6bdf6ade8d47d..6cd2884d6b81f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala @@ -34,6 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse} import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.Mockito.when @@ -70,6 +71,8 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest override def setUp(): Unit = { super.setUp() + when(replicaManager.metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) when(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)) .thenReturn(Some(numPartitions)) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index de4b7e3e795e1..16112ef616d15 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -19,7 +19,7 @@ package kafka.coordinator.group import java.util.{Optional, OptionalInt} import kafka.common.OffsetAndMetadata -import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager, RequestLocal} +import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, MetadataCache, ReplicaManager, RequestLocal} import kafka.utils._ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -37,6 +37,7 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.log.{AppendOrigin, VerificationGuard} @@ -108,6 +109,10 @@ class GroupCoordinatorTest { ret += (Topic.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1))) replicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.metadataCache) + .thenReturn(mock(classOf[MetadataCache])) + when(replicaManager.metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) zkClient = mock(classOf[KafkaZkClient]) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 3de08c1af7c2c..680ec5085ab81 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -91,6 +91,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren true ) } + when(metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time, new Metrics()) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 3356c4f9e372c..d40932f3226ce 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse} import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.RequestAndCompletionHandler import org.junit.jupiter.api.Assertions._ @@ -86,6 +87,8 @@ class TransactionMarkerChannelManagerTest { .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1)))) when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2))) .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) + when(metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) } @Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 8018f9697a2e2..015d94d4bbbcd 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -129,7 +129,7 @@ class KafkaApisTest extends Logging { private val brokerId = 1 // KRaft tests should override this with a KRaftMetadataCache private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting()) - private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) + private var brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager]) private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager]) private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager]) @@ -2919,6 +2919,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2926,6 +2928,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2933,6 +2937,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2940,6 +2946,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -2947,6 +2955,8 @@ class KafkaApisTest extends Logging { @Test def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = { + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_0_10_2_IV0) assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching)) @@ -8748,6 +8758,8 @@ class KafkaApisTest extends Logging { ).build() val requestChannelRequest = buildRequest(joinGroupRequest) + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching) @@ -8766,6 +8778,8 @@ class KafkaApisTest extends Logging { ).build() val requestChannelRequest = buildRequest(syncGroupRequest) + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching) @@ -8783,6 +8797,8 @@ class KafkaApisTest extends Logging { .setGenerationId(1) ).build() val requestChannelRequest = buildRequest(heartbeatRequest) + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleHeartbeatRequest(requestChannelRequest) @@ -8812,6 +8828,9 @@ class KafkaApisTest extends Logging { ).build() val requestChannelRequest = buildRequest(offsetCommitRequest) + + metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1) + brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None) kafkaApis = createKafkaApis(IBP_2_2_IV1) kafkaApis.handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching) diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 94dff4c91fa7a..0574b97cdedf5 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -646,6 +646,14 @@ private static boolean checkIfMetadataChangedOrdered(MetadataVersion highVersion return version != lowVersion; } + public short writeTxnMarkersRequestVersion() { + if (isAtLeast(IBP_2_8_IV0)) { + return 1; + } else { + return 0; + } + } + public boolean isAtLeast(MetadataVersion otherVersion) { return this.compareTo(otherVersion) >= 0; } diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 3908dd1cf6db9..d2867756bf38a 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.server.common; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.RecordVersion; import org.junit.jupiter.api.Test; @@ -466,6 +467,20 @@ public void assertLatestProductionIsLessThanLatest() { " to be less than the latest of " + MetadataVersion.latestTesting()); } + /** + * We need to ensure that the latest production MV doesn't inadvertently rely on an unstable + * request version. Currently, the broker selects the version for some inter-broker RPCs based on the MV + * rather than using the supported version from the ApiResponse. + */ + @Test + public void testProductionMetadataDontUseUnstableApiVersion() { + MetadataVersion mv = MetadataVersion.latestProduction(); + assertTrue(mv.listOffsetRequestVersion() <= ApiKeys.LIST_OFFSETS.latestVersion(false)); + assertTrue(mv.fetchRequestVersion() <= ApiKeys.FETCH.latestVersion(false)); + assertTrue(mv.offsetForLeaderEpochRequestVersion() <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(false)); + assertTrue(mv.writeTxnMarkersRequestVersion() <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(false)); + } + @Test public void assertLatestProductionIsProduction() { assertTrue(LATEST_PRODUCTION.isProduction()); From a4c11ef21cabe62c6dce3b1569ec69cc78632c07 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 20 Aug 2024 21:44:14 +0800 Subject: [PATCH 2/6] revert replicaManager.metadataVersion --- .../scala/kafka/coordinator/group/GroupCoordinator.scala | 2 +- core/src/main/scala/kafka/server/ConfigHandler.scala | 4 ++-- .../coordinator/AbstractCoordinatorConcurrencyTest.scala | 4 +--- .../group/GroupCoordinatorConcurrencyTest.scala | 3 --- .../kafka/coordinator/group/GroupCoordinatorTest.scala | 8 +------- .../TransactionCoordinatorConcurrencyTest.scala | 2 -- .../transaction/TransactionMarkerChannelManagerTest.scala | 3 --- 7 files changed, 5 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index a4640f6c1f7d8..f592f3c9cea82 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -1793,7 +1793,7 @@ object GroupCoordinator { groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize, groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs) - val groupMetadataManager = new GroupMetadataManager(config.brokerId, replicaManager.metadataCache.metadataVersion(), + val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion, offsetConfig, replicaManager, time, metrics) new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, rebalancePurgatory, time, metrics) diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 37cbea7e4b628..77a3874a3ddc5 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -166,12 +166,12 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, private def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = { // Verify message format version Option(topicConfig.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)).flatMap { versionString => - val messageFormatVersion = new MessageFormatVersion(versionString, replicaManager.metadataCache.metadataVersion().version) + val messageFormatVersion = new MessageFormatVersion(versionString, kafkaConfig.interBrokerProtocolVersion.version) if (messageFormatVersion.shouldIgnore) { if (messageFormatVersion.shouldWarn) warn(messageFormatVersion.topicWarningMessage(topic)) Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) - } else if (replicaManager.metadataCache.metadataVersion().isLessThan(messageFormatVersion.messageFormatVersion)) { + } else if (kafkaConfig.interBrokerProtocolVersion.isLessThan(messageFormatVersion.messageFormatVersion)) { warn(s"Topic configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG} is ignored for `$topic` because `$versionString` " + s"is higher than what is allowed by the inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`") Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 66646bbe562e8..1b3d8f6cefd3a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -62,9 +62,7 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] extend when(mockLogMger.liveLogDirs).thenReturn(Seq.empty) val producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, reaperEnabled = false) val watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala - replicaManager = TestReplicaManager(KafkaConfig.fromProps(serverProps), - time, scheduler, timer, mockLogMger, mock(classOf[QuotaManagers], - withSettings().stubOnly()), producePurgatory, watchKeys, mock(classOf[MetadataCache])) + replicaManager = TestReplicaManager(KafkaConfig.fromProps(serverProps), time, scheduler, timer, mockLogMger, mock(classOf[QuotaManagers], withSettings().stubOnly()), producePurgatory, watchKeys) zkClient = mock(classOf[KafkaZkClient], withSettings().stubOnly()) } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala index 6cd2884d6b81f..6bdf6ade8d47d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala @@ -34,7 +34,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse} import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.Mockito.when @@ -71,8 +70,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest override def setUp(): Unit = { super.setUp() - when(replicaManager.metadataCache.metadataVersion()) - .thenReturn(MetadataVersion.latestProduction()) when(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)) .thenReturn(Some(numPartitions)) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 16112ef616d15..81128af336527 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -19,7 +19,7 @@ package kafka.coordinator.group import java.util.{Optional, OptionalInt} import kafka.common.OffsetAndMetadata -import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, MetadataCache, ReplicaManager, RequestLocal} +import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager, RequestLocal} import kafka.utils._ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -108,12 +108,6 @@ class GroupCoordinatorTest { val ret = mutable.Map[String, Map[Int, Seq[Int]]]() ret += (Topic.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1))) - replicaManager = mock(classOf[ReplicaManager]) - when(replicaManager.metadataCache) - .thenReturn(mock(classOf[MetadataCache])) - when(replicaManager.metadataCache.metadataVersion()) - .thenReturn(MetadataVersion.latestProduction()) - zkClient = mock(classOf[KafkaZkClient]) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator when(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).thenReturn(Some(2)) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 680ec5085ab81..3de08c1af7c2c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -91,8 +91,6 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren true ) } - when(metadataCache.metadataVersion()) - .thenReturn(MetadataVersion.latestProduction()) txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time, new Metrics()) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index d40932f3226ce..3356c4f9e372c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -28,7 +28,6 @@ import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse} import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.{Node, TopicPartition} -import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.RequestAndCompletionHandler import org.junit.jupiter.api.Assertions._ @@ -87,8 +86,6 @@ class TransactionMarkerChannelManagerTest { .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1)))) when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2))) .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) - when(metadataCache.metadataVersion()) - .thenReturn(MetadataVersion.latestProduction()) } @Test From 6bfd3d7b3e31c364df4fc6ebaea24abbdc9f95eb Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 20 Aug 2024 21:47:27 +0800 Subject: [PATCH 3/6] revert replicaManager.metadataVersion --- .../AbstractCoordinatorConcurrencyTest.scala | 10 ++++------ .../kafka/coordinator/group/GroupCoordinatorTest.scala | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 1b3d8f6cefd3a..d7138a89fef0a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -173,8 +173,7 @@ object AbstractCoordinatorConcurrencyTest { val delayedFetchPurgatoryParam: DelayedOperationPurgatory[DelayedFetch], val delayedDeleteRecordsPurgatoryParam: DelayedOperationPurgatory[DelayedDeleteRecords], val delayedElectLeaderPurgatoryParam: DelayedOperationPurgatory[DelayedElectLeader], - val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch], - metadataCache: MetadataCache = null) + val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch]) extends ReplicaManager( config, metrics = null, @@ -183,7 +182,7 @@ object AbstractCoordinatorConcurrencyTest { logManager, None, quotaManagers, - metadataCache, + null, null, null, delayedProducePurgatoryParam = Some(producePurgatory), @@ -282,8 +281,7 @@ object AbstractCoordinatorConcurrencyTest { logManager: LogManager, quotaManagers: QuotaManagers, producePurgatory: DelayedOperationPurgatory[DelayedProduce], - watchKeys: mutable.Set[TopicPartitionOperationKey], - metadataCache: MetadataCache): TestReplicaManager = { + watchKeys: mutable.Set[TopicPartitionOperationKey]): TestReplicaManager = { val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch]( purgatoryName = "RemoteFetch", timer, reaperEnabled = false) val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( @@ -293,7 +291,7 @@ object AbstractCoordinatorConcurrencyTest { val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( purgatoryName = "ElectLeader", timer, reaperEnabled = false) new TestReplicaManager(config, time, scheduler, logManager, quotaManagers, watchKeys, producePurgatory, - mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory, metadataCache = metadataCache) + mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory) } } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 81128af336527..de4b7e3e795e1 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -37,7 +37,6 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.log.{AppendOrigin, VerificationGuard} @@ -108,6 +107,8 @@ class GroupCoordinatorTest { val ret = mutable.Map[String, Map[Int, Seq[Int]]]() ret += (Topic.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1))) + replicaManager = mock(classOf[ReplicaManager]) + zkClient = mock(classOf[KafkaZkClient]) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator when(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).thenReturn(Some(2)) From b2e7e63153f0271b024744afbef87cc27ea93b8b Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 21 Aug 2024 01:10:43 +0800 Subject: [PATCH 4/6] add new mock in TransactionCoordinatorConcurrencyTest --- .../transaction/TransactionCoordinatorConcurrencyTest.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 3de08c1af7c2c..44a35e981a286 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -92,6 +92,9 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren ) } + when(metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) + txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time, new Metrics()) txnStateManager.startup(() => zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME).get, From 0888764d5b3c77256c2472d5abbe42e6a0f18c69 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 21 Aug 2024 02:36:16 +0800 Subject: [PATCH 5/6] change error message from inter.broker.protocol.version to metadata.version --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c180e0fb0ca8f..a9d46fa0296eb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2530,7 +2530,7 @@ class KafkaApis(val requestChannel: RequestChannel, def ensureInterBrokerVersion(version: MetadataVersion): Unit = { if (metadataCache.metadataVersion().isLessThan(version)) - throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${metadataCache.metadataVersion()} is less than the required version: ${version}") + throw new UnsupportedVersionException(s"metadata.version: ${metadataCache.metadataVersion()} is less than the required version: ${version}") } def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { From 764f0e1476897f681c58c3d577a08755015f7cb3 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 21 Aug 2024 07:12:29 +0800 Subject: [PATCH 6/6] fix TransactionMarkerChannelManagerTest --- .../transaction/TransactionMarkerChannelManagerTest.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 3356c4f9e372c..d40932f3226ce 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse} import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.RequestAndCompletionHandler import org.junit.jupiter.api.Assertions._ @@ -86,6 +87,8 @@ class TransactionMarkerChannelManagerTest { .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1)))) when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2))) .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) + when(metadataCache.metadataVersion()) + .thenReturn(MetadataVersion.latestProduction()) } @Test