Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ object Partition {
new Partition(topicPartition,
_topicId = topicId,
replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
interBrokerProtocolVersion = replicaManager.metadataCache.metadataVersion(),
localBrokerId = replicaManager.config.brokerId,
localBrokerEpochSupplier = replicaManager.brokerEpochSupplier,
time = time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}

Expand Down Expand Up @@ -180,10 +179,6 @@ class TransactionMarkerChannelManager(

private val transactionsWithPendingMarkers = new ConcurrentHashMap[String, PendingCompleteTxn]

private val writeTxnMarkersRequestVersion: Short =
if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1
else 0

metricsGroup.newGauge(UnknownDestinationQueueSizeMetricName, () => markersQueueForUnknownBroker.totalNumMarkers)
metricsGroup.newGauge(LogAppendRetryQueueSizeMetricName, () => txnLogAppendRetryQueue.size)

Expand Down Expand Up @@ -261,7 +256,9 @@ class TransactionMarkerChannelManager(
}.filter { case (_, entries) => !entries.isEmpty }.map { case (node, entries) =>
val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava
val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries)
val request = new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend)
val request = new WriteTxnMarkersRequest.Builder(
metadataCache.metadataVersion().writeTxnMarkersRequestVersion(), markersToSend
)

new RequestAndCompletionHandler(
currentTimeMs,
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
} else if (offsetCommitRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
Expand Down Expand Up @@ -1799,7 +1799,7 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Unit] = {
val joinGroupRequest = request.body[JoinGroupRequest]

if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
if (joinGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
Expand Down Expand Up @@ -1829,7 +1829,7 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Unit] = {
val syncGroupRequest = request.body[SyncGroupRequest]

if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
if (syncGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
Expand Down Expand Up @@ -1898,7 +1898,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val heartbeatRequest = request.body[HeartbeatRequest]

if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
if (heartbeatRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
Expand Down Expand Up @@ -2529,8 +2529,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}

def ensureInterBrokerVersion(version: MetadataVersion): Unit = {
if (config.interBrokerProtocolVersion.isLessThan(version))
throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion} is less than the required version: ${version}")
if (metadataCache.metadataVersion().isLessThan(version))
throw new UnsupportedVersionException(s"metadata.version: ${metadataCache.metadataVersion()} is less than the required version: ${version}")
}

def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
)
}

when(metadataCache.metadataVersion())
.thenReturn(MetadataVersion.latestProduction())

txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time,
new Metrics())
txnStateManager.startup(() => zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME).get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.RequestAndCompletionHandler
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -86,6 +87,8 @@ class TransactionMarkerChannelManagerTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2)))
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
when(metadataCache.metadataVersion())
.thenReturn(MetadataVersion.latestProduction())
}

@Test
Expand Down
21 changes: 20 additions & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class KafkaApisTest extends Logging {
private val brokerId = 1
// KRaft tests should override this with a KRaftMetadataCache
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting())
private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
private var brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
Expand Down Expand Up @@ -2919,34 +2919,44 @@ class KafkaApisTest extends Logging {

@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}

@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}

@Test
def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}

@Test
def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
}

@Test
def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
Expand Down Expand Up @@ -8748,6 +8758,8 @@ class KafkaApisTest extends Logging {
).build()

val requestChannelRequest = buildRequest(joinGroupRequest)
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

Expand All @@ -8766,6 +8778,8 @@ class KafkaApisTest extends Logging {
).build()

val requestChannelRequest = buildRequest(syncGroupRequest)
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

Expand All @@ -8783,6 +8797,8 @@ class KafkaApisTest extends Logging {
.setGenerationId(1)
).build()
val requestChannelRequest = buildRequest(heartbeatRequest)
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleHeartbeatRequest(requestChannelRequest)

Expand Down Expand Up @@ -8812,6 +8828,9 @@ class KafkaApisTest extends Logging {
).build()

val requestChannelRequest = buildRequest(offsetCommitRequest)

metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,14 @@ private static boolean checkIfMetadataChangedOrdered(MetadataVersion highVersion
return version != lowVersion;
}

public short writeTxnMarkersRequestVersion() {
if (isAtLeast(IBP_2_8_IV0)) {
return 1;
} else {
return 0;
}
}

public boolean isAtLeast(MetadataVersion otherVersion) {
return this.compareTo(otherVersion) >= 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.server.common;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordVersion;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -466,6 +467,20 @@ public void assertLatestProductionIsLessThanLatest() {
" to be less than the latest of " + MetadataVersion.latestTesting());
}

/**
* We need to ensure that the latest production MV doesn't inadvertently rely on an unstable
* request version. Currently, the broker selects the version for some inter-broker RPCs based on the MV
* rather than using the supported version from the ApiResponse.
*/
@Test
public void testProductionMetadataDontUseUnstableApiVersion() {
MetadataVersion mv = MetadataVersion.latestProduction();
assertTrue(mv.listOffsetRequestVersion() <= ApiKeys.LIST_OFFSETS.latestVersion(false));
assertTrue(mv.fetchRequestVersion() <= ApiKeys.FETCH.latestVersion(false));
assertTrue(mv.offsetForLeaderEpochRequestVersion() <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(false));
assertTrue(mv.writeTxnMarkersRequestVersion() <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(false));
}

@Test
public void assertLatestProductionIsProduction() {
assertTrue(LATEST_PRODUCTION.isProduction());
Expand Down