diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index bd28aa26aa977..3f42ee9d6889f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -108,7 +108,7 @@ public enum ApiKeys { UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true), DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), - ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false); + ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json index 0cfa494291a36..6f37313c3a6e4 100644 --- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json +++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 67, "type": "request", - "listeners": ["controller", "zkBroker"], + "listeners": ["zkBroker", "broker", "controller"], "name": "AllocateProducerIdsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json index 5537aa95d3db8..4e75352db6f33 100644 --- a/clients/src/main/resources/common/message/InitProducerIdRequest.json +++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json @@ -16,7 +16,7 @@ { "apiKey": 22, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "InitProducerIdRequest", // Version 1 is the same as version 0. // diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index bea0c53c1d78f..4c76903173ab3 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException} +import org.apache.kafka.common.{ClusterResource, Endpoint} import org.apache.kafka.metadata.{BrokerState, VersionRange} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.RaftConfig.AddressSpec @@ -244,11 +244,18 @@ class BrokerServer( // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics) + val producerIdManagerSupplier = () => ProducerIdManager.rpc( + config.brokerId, + brokerEpochSupplier = () => lifecycleManager.brokerEpoch(), + clientToControllerChannelManager, + config.requestTimeoutMs + ) + // Create transaction coordinator, but don't start it until we've started replica manager. // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), - createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM) + producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM) autoTopicCreationManager = new DefaultAutoTopicCreationManager( config, Some(clientToControllerChannelManager), None, None, @@ -376,24 +383,6 @@ class BrokerServer( } } - class TemporaryProducerIdManager() extends ProducerIdManager { - val maxProducerIdsPerBrokerEpoch = 1000000 - var currentOffset = -1 - override def generateProducerId(): Long = { - currentOffset = currentOffset + 1 - if (currentOffset >= maxProducerIdsPerBrokerEpoch) { - fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch") - throw new KafkaException("Have exhausted all demo/temporary producerIds.") - } - lifecycleManager.initialCatchUpFuture.get() - lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset - } - } - - def createTemporaryProducerIdManager(): ProducerIdManager = { - new TemporaryProducerIdManager() - } - def shutdown(): Unit = { if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return try { diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 47bc19d69553a..83aea27b5e68e 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -100,6 +100,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.ENVELOPE => handleEnvelopeRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request) + case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request) case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) @@ -767,4 +768,20 @@ class ControllerApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs))) } + + def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = { + val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest] + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + controller.allocateProducerIds(allocatedProducerIdsRequest.data) + .whenComplete((results, exception) => { + if (exception != null) { + requestHelper.handleError(request, exception) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + results.setThrottleTimeMs(requestThrottleMs) + new AllocateProducerIdsResponse(results) + }) + } + }) + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d75e4ae3651d7..ffb24c810e575 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -216,7 +216,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) - case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) + case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 6dfaa1800406c..c7085f176d229 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -182,6 +182,7 @@ class BrokerMetadataListener( case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec) case rec: ConfigRecord => handleConfigRecord(rec) case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec) + case rec: ProducerIdsRecord => handleProducerIdRecord(rec) case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType") } } @@ -259,6 +260,11 @@ class BrokerMetadataListener( clientQuotaManager.handleQuotaRecord(record) } + def handleProducerIdRecord(record: ProducerIdsRecord): Unit = { + // This is a no-op since brokers get their producer ID blocks directly from the controller via + // AllocateProducerIds RPC response + } + class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 1fba2952165b0..68e73fd331bd7 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -301,6 +303,11 @@ public CompletableFuture waitForReadyBrokers(int minBrokers) { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture allocateProducerIds(AllocateProducerIdsRequestData request) { + throw new UnsupportedOperationException(); + } + @Override synchronized public CompletableFuture> createPartitions(long deadlineNs, List topicList) { diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala index 6c7c248e63188..d5e082a0a8e57 100644 --- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala @@ -44,7 +44,8 @@ class ProducerIdsIntegrationTest { @ClusterTests(Array( new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"), - new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0") + new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"), + new ClusterTest(clusterType = Type.RAFT, brokers = 3, ibp = "3.0-IV0") )) def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = { verifyUniqueIds(clusterInstance) diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index b0fa2b36b940f..59eb2b8f66bad 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -409,6 +409,14 @@ class ControllerApisTest { new AlterPartitionReassignmentsRequestData()).build()))) } + @Test + def testUnauthorizedHandleAllocateProducerIds(): Unit = { + assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( + Some(createDenyAllAuthorizer()), new MockController.Builder().build()). + handleAllocateProducerIdsRequest(buildRequest(new AllocateProducerIdsRequest.Builder( + new AllocateProducerIdsRequestData()).build()))) + } + @Test def testUnauthorizedHandleListPartitionReassignments(): Unit = { assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index a34b084ea1302..3cb0d26ee6b60 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -223,6 +225,15 @@ CompletableFuture> alterClientQuotas( Collection quotaAlterations, boolean validateOnly ); + /** + * Allocate a block of producer IDs for transactional and idempotent producers + * @param request The allocate producer IDs request + * @return A future which yields a new producer ID block as a response + */ + CompletableFuture allocateProducerIds( + AllocateProducerIdsRequestData request + ); + /** * Begin writing a controller snapshot. If there was already an ongoing snapshot, it * simply returns information about that snapshot rather than starting a new one. diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java new file mode 100644 index 0000000000000..924605c6d910f --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineLong; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + + +public class ProducerIdControlManager { + + private final ClusterControlManager clusterControlManager; + private final TimelineLong lastProducerId; + + ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) { + this.clusterControlManager = clusterControlManager; + this.lastProducerId = new TimelineLong(snapshotRegistry, 0L); + } + + ControllerResult generateNextProducerId(int brokerId, long brokerEpoch) { + clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch); + + long producerId = lastProducerId.get(); + + if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { + throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " + + "is will has exceeded long type limit"); + } + + long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE; + ProducerIdsRecord record = new ProducerIdsRecord() + .setProducerIdsEnd(nextProducerId) + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch); + ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE); + return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block); + } + + void replay(ProducerIdsRecord record) { + long currentProducerId = lastProducerId.get(); + if (record.producerIdsEnd() <= currentProducerId) { + throw new RuntimeException("Producer ID from record is not monotonically increasing"); + } else { + lastProducerId.set(record.producerIdsEnd()); + } + } + + Iterator> iterator(long epoch) { + List records = new ArrayList<>(1); + + long producerId = lastProducerId.get(epoch); + if (producerId > 0) { + records.add(new ApiMessageAndVersion( + new ProducerIdsRecord() + .setProducerIdsEnd(producerId) + .setBrokerId(0) + .setBrokerEpoch(0L), + (short) 0)); + } + return Collections.singleton(records).iterator(); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 746d9068488d9..bee0b69eff4e3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -44,6 +46,7 @@ import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.QuotaRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; @@ -359,7 +362,8 @@ void createSnapshotGenerator(long epoch) { new Section("cluster", clusterControl.iterator(epoch)), new Section("replication", replicationControl.iterator(epoch)), new Section("configuration", configurationControl.iterator(epoch)), - new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)))); + new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)), + new Section("producerIds", producerIdControlManager.iterator(epoch)))); reschedule(0); } @@ -855,6 +859,9 @@ private void replay(ApiMessage message, Optional snapshotId, lon case QUOTA_RECORD: clientQuotaControlManager.replay((QuotaRecord) message); break; + case PRODUCER_IDS_RECORD: + producerIdControlManager.replay((ProducerIdsRecord) message); + break; default: throw new RuntimeException("Unhandled record type " + type); } @@ -929,6 +936,12 @@ private void replay(ApiMessage message, Optional snapshotId, lon */ private final FeatureControlManager featureControl; + /** + * An object which stores the controller's view of the latest producer ID + * that has been generated. This must be accessed only by the event queue thread. + */ + private final ProducerIdControlManager producerIdControlManager; + /** * An object which stores the controller's view of topics and partitions. * This must be accessed only by the event queue thread. @@ -995,6 +1008,7 @@ private QuorumController(LogContext logContext, this.clusterControl = new ClusterControlManager(logContext, time, snapshotRegistry, sessionTimeoutNs, replicaPlacer); this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry); + this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder); this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, @@ -1199,6 +1213,16 @@ public CompletableFuture> alterClientQuotas( }); } + @Override + public CompletableFuture allocateProducerIds( + AllocateProducerIdsRequestData request) { + return appendWriteEvent("allocateProducerIds", + () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())) + .thenApply(result -> new AllocateProducerIdsResponseData() + .setProducerIdStart(result.producerIdStart()) + .setProducerIdLen(result.producerIdLen())); + } + @Override public CompletableFuture> createPartitions(long deadlineNs, List topics) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java index 2fedacdb5e1c8..6a548c4e40255 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java @@ -42,6 +42,14 @@ public ResultOrError(T result) { this.result = result; } + public static ResultOrError of(T result) { + return new ResultOrError<>(result); + } + + public static ResultOrError of(ApiError error) { + return new ResultOrError<>(error); + } + public boolean isError() { return error != null; } diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java index e057391c4bcd3..36a300ff94998 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java @@ -47,8 +47,12 @@ public void mergeFrom(long destinationEpoch, Delta delta) { private long value; public TimelineLong(SnapshotRegistry snapshotRegistry) { + this(snapshotRegistry, 0L); + } + + public TimelineLong(SnapshotRegistry snapshotRegistry, long value) { this.snapshotRegistry = snapshotRegistry; - this.value = 0; + this.value = value; } public long get() { diff --git a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json new file mode 100644 index 0000000000000..09e6b536129e2 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 15, + "type": "metadata", + "name": "ProducerIdsRecord", + "validVersions": "0", + "fields": [ + { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, + { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1", + "about": "The epoch of the requesting broker" }, + { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+", + "about": "The highest producer ID that has been generated"} + ] +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java new file mode 100644 index 0000000000000..f96510ddae11e --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ProducerIdControlManagerTest { + + private SnapshotRegistry snapshotRegistry; + private ClusterControlManager clusterControl; + private ProducerIdControlManager producerIdControlManager; + + @BeforeEach + public void setUp() { + final LogContext logContext = new LogContext(); + final MockTime time = new MockTime(); + final Random random = new Random(); + snapshotRegistry = new SnapshotRegistry(logContext); + clusterControl = new ClusterControlManager( + logContext, time, snapshotRegistry, 1000, + new StripedReplicaPlacer(random)); + + clusterControl.activate(); + for (int i = 0; i < 4; i++) { + RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(i); + brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint(). + setSecurityProtocol(SecurityProtocol.PLAINTEXT.id). + setPort((short) 9092). + setName("PLAINTEXT"). + setHost(String.format("broker-%02d.example.org", i))); + clusterControl.replay(brokerRecord); + } + + this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); + } + + @Test + public void testInitialResult() { + ControllerResult result = + producerIdControlManager.generateNextProducerId(1, 100); + assertEquals(0, result.response().producerIdStart()); + assertEquals(1000, result.response().producerIdLen()); + ProducerIdsRecord record = (ProducerIdsRecord) result.records().get(0).message(); + assertEquals(1000, record.producerIdsEnd()); + } + + @Test + public void testMonotonic() { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(42)); + + ProducerIdsBlock range = + producerIdControlManager.generateNextProducerId(1, 100).response(); + assertEquals(42, range.producerIdStart()); + + // Can't go backwards in Producer IDs + assertThrows(RuntimeException.class, () -> { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(40)); + }, "Producer ID range must only increase"); + range = producerIdControlManager.generateNextProducerId(1, 100).response(); + assertEquals(42, range.producerIdStart()); + + // Gaps in the ID range are okay. + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(50)); + range = producerIdControlManager.generateNextProducerId(1, 100).response(); + assertEquals(50, range.producerIdStart()); + } + + @Test + public void testUnknownBrokerOrEpoch() { + ControllerResult result; + + assertThrows(StaleBrokerEpochException.class, () -> + producerIdControlManager.generateNextProducerId(99, 0)); + + assertThrows(StaleBrokerEpochException.class, () -> + producerIdControlManager.generateNextProducerId(1, 99)); + } + + @Test + public void testMaxValue() { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(Long.MAX_VALUE - 1)); + + assertThrows(UnknownServerException.class, () -> + producerIdControlManager.generateNextProducerId(1, 100)); + } + + @Test + public void testSnapshotIterator() { + ProducerIdsBlock range = null; + for (int i = 0; i < 100; i++) { + range = generateProducerIds(producerIdControlManager, i % 4, 100); + } + + Iterator> snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE); + assertTrue(snapshotIterator.hasNext()); + List batch = snapshotIterator.next(); + assertEquals(1, batch.size(), "Producer IDs record batch should only contain a single record"); + assertEquals(range.producerIdStart() + range.producerIdLen(), ((ProducerIdsRecord) batch.get(0).message()).producerIdsEnd()); + assertFalse(snapshotIterator.hasNext(), "Producer IDs iterator should only contain a single batch"); + + ProducerIdControlManager newProducerIdManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); + snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE); + while (snapshotIterator.hasNext()) { + snapshotIterator.next().forEach(message -> newProducerIdManager.replay((ProducerIdsRecord) message.message())); + } + + // Verify that after reloading state from this "snapshot", we don't produce any overlapping IDs + long lastProducerID = range.producerIdStart() + range.producerIdLen() - 1; + range = generateProducerIds(producerIdControlManager, 1, 100); + assertTrue(range.producerIdStart() > lastProducerID); + } + + static ProducerIdsBlock generateProducerIds( + ProducerIdControlManager producerIdControlManager, int brokerId, long brokerEpoch) { + ControllerResult result = + producerIdControlManager.generateNextProducerId(brokerId, brokerEpoch); + result.records().forEach(apiMessageAndVersion -> + producerIdControlManager.replay((ProducerIdsRecord) apiMessageAndVersion.message())); + return result.response(); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index c6114dee0891b..5a39f82c98cfc 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; @@ -54,6 +55,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection; import org.apache.kafka.common.metadata.RegisterBrokerRecord; @@ -267,6 +269,8 @@ public void testSnapshotSaveAndLoad() throws Throwable { setBrokerIds(Arrays.asList(1, 2, 0))). iterator()))).iterator()))).get(); fooId = fooData.topics().find("foo").topicId(); + active.allocateProducerIds( + new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); long snapshotEpoch = active.beginWritingSnapshot().get(); writer = snapshotWriterBuilder.writers.takeFirst(); assertEquals(snapshotEpoch, writer.epoch()); @@ -338,7 +342,11 @@ private void checkSnapshotContents(Uuid fooId, setEndPoints(new BrokerEndpointCollection(Arrays.asList( new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost"). setPort(9095).setSecurityProtocol((short) 0)).iterator())). - setRack(null), (short) 0))), + setRack(null), (short) 0)), + Arrays.asList(new ApiMessageAndVersion(new ProducerIdsRecord(). + setBrokerId(0). + setBrokerEpoch(brokerEpochs.get(0)). + setProducerIdsEnd(1000), (short) 0))), iterator); }