From 3ecbb4d514cda153f59feebc5619ccc3dbffa9dc Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 21 May 2021 16:07:25 -0400 Subject: [PATCH 1/5] QuorumController support for AllocateProducerIds --- .../apache/kafka/common/protocol/ApiKeys.java | 2 +- .../message/AllocateProducerIdsRequest.json | 2 +- .../common/message/InitProducerIdRequest.json | 2 +- .../scala/kafka/server/BrokerServer.scala | 30 +-- .../scala/kafka/server/ControllerApis.scala | 17 ++ .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../metadata/BrokerMetadataListener.scala | 7 +- .../test/java/kafka/test/MockController.java | 7 + .../ProducerIdsIntegrationTest.scala | 3 +- .../apache/kafka/controller/Controller.java | 11 ++ .../controller/ProducerIdControlManager.java | 105 +++++++++++ .../kafka/controller/QuorumController.java | 32 +++- .../kafka/controller/ResultOrError.java | 8 + .../common/metadata/ProducerIdsRecord.json | 29 +++ .../ProducerIdControlManagerTest.java | 174 ++++++++++++++++++ 15 files changed, 403 insertions(+), 28 deletions(-) create mode 100644 metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java create mode 100644 metadata/src/main/resources/common/metadata/ProducerIdsRecord.json create mode 100644 metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index bd28aa26aa977..3f42ee9d6889f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -108,7 +108,7 @@ public enum ApiKeys { UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true), DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), - ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false); + ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json index 0cfa494291a36..6f37313c3a6e4 100644 --- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json +++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 67, "type": "request", - "listeners": ["controller", "zkBroker"], + "listeners": ["zkBroker", "broker", "controller"], "name": "AllocateProducerIdsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json index 5537aa95d3db8..4e75352db6f33 100644 --- a/clients/src/main/resources/common/message/InitProducerIdRequest.json +++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json @@ -16,7 +16,7 @@ { "apiKey": 22, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "InitProducerIdRequest", // Version 1 is the same as version 0. // diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index bea0c53c1d78f..0a7a64e93f730 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} import java.net.InetAddress - import kafka.cluster.Broker.ServerInfo import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} @@ -42,7 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException} +import org.apache.kafka.common.{ClusterResource, Endpoint} import org.apache.kafka.metadata.{BrokerState, VersionRange} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.RaftConfig.AddressSpec @@ -246,9 +245,16 @@ class BrokerServer( // Create transaction coordinator, but don't start it until we've started replica manager. // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue + val producerIdManagerSupplier = () => ProducerIdManager.rpc( + config.brokerId, + brokerEpochSupplier = () => lifecycleManager.brokerEpoch(), + clientToControllerChannelManager, + config.requestTimeoutMs + ) + transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), - createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM) + producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM) autoTopicCreationManager = new DefaultAutoTopicCreationManager( config, Some(clientToControllerChannelManager), None, None, @@ -376,24 +382,6 @@ class BrokerServer( } } - class TemporaryProducerIdManager() extends ProducerIdManager { - val maxProducerIdsPerBrokerEpoch = 1000000 - var currentOffset = -1 - override def generateProducerId(): Long = { - currentOffset = currentOffset + 1 - if (currentOffset >= maxProducerIdsPerBrokerEpoch) { - fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch") - throw new KafkaException("Have exhausted all demo/temporary producerIds.") - } - lifecycleManager.initialCatchUpFuture.get() - lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset - } - } - - def createTemporaryProducerIdManager(): ProducerIdManager = { - new TemporaryProducerIdManager() - } - def shutdown(): Unit = { if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return try { diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 47bc19d69553a..83aea27b5e68e 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -100,6 +100,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.ENVELOPE => handleEnvelopeRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request) + case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request) case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) @@ -767,4 +768,20 @@ class ControllerApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs))) } + + def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = { + val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest] + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + controller.allocateProducerIds(allocatedProducerIdsRequest.data) + .whenComplete((results, exception) => { + if (exception != null) { + requestHelper.handleError(request, exception) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + results.setThrottleTimeMs(requestThrottleMs) + new AllocateProducerIdsResponse(results) + }) + } + }) + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d75e4ae3651d7..ffb24c810e575 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -216,7 +216,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) - case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) + case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 6dfaa1800406c..a10f7ba1bcc0c 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -182,6 +182,7 @@ class BrokerMetadataListener( case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec) case rec: ConfigRecord => handleConfigRecord(rec) case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec) + case rec: ProducerIdsRecord => handleProducerIdRecord(rec) case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType") } } @@ -259,7 +260,11 @@ class BrokerMetadataListener( clientQuotaManager.handleQuotaRecord(record) } - class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) + def handleProducerIdRecord(record: ProducerIdsRecord): Unit = { + // no-op + } + + class HandleNewLeaderEvent(leader: MetaLogLeader) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { val imageBuilder = diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 1fba2952165b0..68e73fd331bd7 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -301,6 +303,11 @@ public CompletableFuture waitForReadyBrokers(int minBrokers) { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture allocateProducerIds(AllocateProducerIdsRequestData request) { + throw new UnsupportedOperationException(); + } + @Override synchronized public CompletableFuture> createPartitions(long deadlineNs, List topicList) { diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala index 6c7c248e63188..d5e082a0a8e57 100644 --- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala @@ -44,7 +44,8 @@ class ProducerIdsIntegrationTest { @ClusterTests(Array( new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"), - new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0") + new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"), + new ClusterTest(clusterType = Type.RAFT, brokers = 3, ibp = "3.0-IV0") )) def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = { verifyUniqueIds(clusterInstance) diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index a34b084ea1302..3cb0d26ee6b60 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -223,6 +225,15 @@ CompletableFuture> alterClientQuotas( Collection quotaAlterations, boolean validateOnly ); + /** + * Allocate a block of producer IDs for transactional and idempotent producers + * @param request The allocate producer IDs request + * @return A future which yields a new producer ID block as a response + */ + CompletableFuture allocateProducerIds( + AllocateProducerIdsRequestData request + ); + /** * Begin writing a controller snapshot. If there was already an ongoing snapshot, it * simply returns information about that snapshot rather than starting a new one. diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java new file mode 100644 index 0000000000000..72c07d216a999 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; + + +public class ProducerIdControlManager { + private static final Object PRODUCER_ID_KEY = new Object(); + private static final int PRODUCER_ID_BLOCK_SIZE = 1000; + + private final ClusterControlManager clusterControlManager; + private final TimelineHashMap lastProducerId; + + ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) { + this.clusterControlManager = clusterControlManager; + this.lastProducerId = new TimelineHashMap<>(snapshotRegistry, 0); + } + + ControllerResult> generateNextProducerId(int brokerId, long brokerEpoch) { + try { + clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch); + } catch (StaleBrokerEpochException e) { + return ControllerResult.of(Collections.emptyList(), ResultOrError.of(ApiError.fromThrowable(e))); + } + + long producerId = lastProducerId.getOrDefault(PRODUCER_ID_KEY, 0L); + + if (producerId > Long.MAX_VALUE - PRODUCER_ID_BLOCK_SIZE) { + ApiError error = new ApiError(UNKNOWN_SERVER_ERROR, + "Exhausted all producerIds as the next block's end producerId " + + "is will has exceeded long type limit"); + return ControllerResult.of(Collections.emptyList(), ResultOrError.of(error)); + } + + long nextProducerId = producerId + PRODUCER_ID_BLOCK_SIZE; + ProducerIdsRecord record = new ProducerIdsRecord() + .setProducerIdsEnd(nextProducerId) + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch); + ProducerIdsBlock block = new ProducerIdsBlock(-1, producerId, PRODUCER_ID_BLOCK_SIZE); + return ControllerResult.of( + Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), ResultOrError.of(block)); + } + + void replay(ProducerIdsRecord record) { + long currentProducerId = lastProducerId.getOrDefault(PRODUCER_ID_KEY, 0L); + if (record.producerIdsEnd() <= currentProducerId) { + throw new RuntimeException("Producer ID from record is not monotonically increasing"); + } else { + lastProducerId.put(PRODUCER_ID_KEY, record.producerIdsEnd()); + } + } + + Iterator> iterator(long epoch) { + List records = new ArrayList<>(1); + + long producerId = 0L; + for (Map.Entry entry : lastProducerId.entrySet(epoch)) { + if (entry.getKey() == PRODUCER_ID_KEY) { + producerId = lastProducerId.getOrDefault(PRODUCER_ID_KEY, 0L); + } else { + throw new IllegalStateException("Unexpected key in producer ids map " + entry.getKey()); + } + } + if (producerId > 0) { + records.add(new ApiMessageAndVersion( + new ProducerIdsRecord() + .setProducerIdsEnd(producerId) + .setBrokerId(0) + .setBrokerEpoch(0L), + (short) 0)); + } + return Collections.singleton(records).iterator(); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 746d9068488d9..03abe0f2e3b0b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -44,6 +46,7 @@ import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.QuotaRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; @@ -359,7 +362,8 @@ void createSnapshotGenerator(long epoch) { new Section("cluster", clusterControl.iterator(epoch)), new Section("replication", replicationControl.iterator(epoch)), new Section("configuration", configurationControl.iterator(epoch)), - new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)))); + new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)), + new Section("producerIds", producerIdControlManager.iterator(epoch)))); reschedule(0); } @@ -855,6 +859,9 @@ private void replay(ApiMessage message, Optional snapshotId, lon case QUOTA_RECORD: clientQuotaControlManager.replay((QuotaRecord) message); break; + case PRODUCER_IDS_RECORD: + producerIdControlManager.replay((ProducerIdsRecord) message); + break; default: throw new RuntimeException("Unhandled record type " + type); } @@ -929,6 +936,12 @@ private void replay(ApiMessage message, Optional snapshotId, lon */ private final FeatureControlManager featureControl; + /** + * An object which stores the controller's view of the latest producer ID + * that has been generated. This must be accessed only by the event queue thread. + */ + private final ProducerIdControlManager producerIdControlManager; + /** * An object which stores the controller's view of topics and partitions. * This must be accessed only by the event queue thread. @@ -995,6 +1008,7 @@ private QuorumController(LogContext logContext, this.clusterControl = new ClusterControlManager(logContext, time, snapshotRegistry, sessionTimeoutNs, replicaPlacer); this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry); + this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder); this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, @@ -1199,6 +1213,22 @@ public CompletableFuture> alterClientQuotas( }); } + @Override + public CompletableFuture allocateProducerIds( + AllocateProducerIdsRequestData request) { + return appendWriteEvent("allocateProducerIds", + () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())) + .thenApply(range -> { + if (range.isError()) { + return new AllocateProducerIdsResponseData().setErrorCode(range.error().error().code()); + } else { + return new AllocateProducerIdsResponseData() + .setProducerIdStart(range.result().producerIdStart()) + .setProducerIdLen(range.result().producerIdLen()); + } + }); + } + @Override public CompletableFuture> createPartitions(long deadlineNs, List topics) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java index 2fedacdb5e1c8..6a548c4e40255 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java @@ -42,6 +42,14 @@ public ResultOrError(T result) { this.result = result; } + public static ResultOrError of(T result) { + return new ResultOrError<>(result); + } + + public static ResultOrError of(ApiError error) { + return new ResultOrError<>(error); + } + public boolean isError() { return error != null; } diff --git a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json new file mode 100644 index 0000000000000..774fd482379a2 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 15, + "type": "metadata", + "name": "ProducerIdsRecord", + "validVersions": "0", + "fields": [ + { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, + { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1", + "about": "The epoch of the requesting broker" }, + { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+", + "about": "The highest producer ID that has been generated"} + ] +} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java new file mode 100644 index 0000000000000..23bbaf0403444 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ProducerIdControlManagerTest { + + private SnapshotRegistry snapshotRegistry; + private ClusterControlManager clusterControl; + private ProducerIdControlManager producerIdControlManager; + + @BeforeEach + public void setUp() { + final LogContext logContext = new LogContext(); + final MockTime time = new MockTime(); + final Random random = new Random(); + snapshotRegistry = new SnapshotRegistry(logContext); + clusterControl = new ClusterControlManager( + logContext, time, snapshotRegistry, 1000, + new SimpleReplicaPlacementPolicy(random)); + + clusterControl.activate(); + for (int i = 0; i < 4; i++) { + RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(i); + brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint(). + setSecurityProtocol(SecurityProtocol.PLAINTEXT.id). + setPort((short) 9092). + setName("PLAINTEXT"). + setHost(String.format("broker-%02d.example.org", i))); + clusterControl.replay(brokerRecord); + } + + this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); + } + + @Test + public void testInitialResult() { + ControllerResult> result = + producerIdControlManager.generateNextProducerId(1, 100); + assertEquals(0, result.response().result().producerIdStart()); + assertEquals(1000, result.response().result().producerIdLen()); + ProducerIdsRecord record = (ProducerIdsRecord) result.records().get(0).message(); + assertEquals(1000, record.producerIdsEnd()); + } + + @Test + public void testMonotonic() { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(42)); + + ProducerIdsBlock range = + producerIdControlManager.generateNextProducerId(1, 100).response().result(); + assertEquals(42, range.producerIdStart()); + + // Can't go backwards in Producer IDs + assertThrows(RuntimeException.class, () -> { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(40)); + }, "Producer ID range must only increase"); + range = producerIdControlManager.generateNextProducerId(1, 100).response().result(); + assertEquals(42, range.producerIdStart()); + + // Gaps in the ID range are okay. + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(50)); + range = producerIdControlManager.generateNextProducerId(1, 100).response().result(); + assertEquals(50, range.producerIdStart()); + } + + @Test + public void testUnknownBrokerOrEpoch() { + ControllerResult> result; + + result = producerIdControlManager.generateNextProducerId(99, 0); + assertEquals(Errors.STALE_BROKER_EPOCH, result.response().error().error()); + + result = producerIdControlManager.generateNextProducerId(1, 99); + assertEquals(Errors.STALE_BROKER_EPOCH, result.response().error().error()); + } + + @Test + public void testMaxValue() { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(Long.MAX_VALUE - 1)); + + ControllerResult> result = + producerIdControlManager.generateNextProducerId(1, 100); + assertEquals(Errors.UNKNOWN_SERVER_ERROR, result.response().error().error()); + } + + @Test + public void testSnapshotIterator() { + ProducerIdsBlock range = null; + for (int i = 0; i < 100; i++) { + range = generateProducerIds(producerIdControlManager, i % 4, 100); + } + + Iterator> snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE); + assertTrue(snapshotIterator.hasNext()); + List batch = snapshotIterator.next(); + assertEquals(1, batch.size(), "Producer IDs record batch should only contain a single record"); + assertEquals(range.producerIdStart() + range.producerIdLen(), ((ProducerIdsRecord) batch.get(0).message()).producerIdsEnd()); + assertFalse(snapshotIterator.hasNext(), "Producer IDs iterator should only contain a single batch"); + + ProducerIdControlManager newProducerIdManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); + snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE); + while (snapshotIterator.hasNext()) { + snapshotIterator.next().forEach(message -> newProducerIdManager.replay((ProducerIdsRecord) message.message())); + } + + // Verify that after reloading state from this "snapshot", we don't produce any overlapping IDs + long lastProducerID = range.producerIdStart() + range.producerIdLen() - 1; + range = generateProducerIds(producerIdControlManager, 1, 100); + assertTrue(range.producerIdStart() > lastProducerID); + } + + static ProducerIdsBlock generateProducerIds( + ProducerIdControlManager producerIdControlManager, int brokerId, long brokerEpoch) { + ControllerResult> result = + producerIdControlManager.generateNextProducerId(brokerId, brokerEpoch); + assertFalse(result.response().isError()); + result.records().forEach(apiMessageAndVersion -> + producerIdControlManager.replay((ProducerIdsRecord) apiMessageAndVersion.message())); + return result.response().result(); + } +} From 07181d9e021b2412b59b9dbdb969ca5bd2e374f3 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 24 May 2021 10:49:19 -0400 Subject: [PATCH 2/5] Used TimelineLong --- .../metadata/BrokerMetadataListener.scala | 2 +- .../controller/ProducerIdControlManager.java | 30 +++++++------------ .../apache/kafka/timeline/TimelineLong.java | 6 +++- .../ProducerIdControlManagerTest.java | 2 +- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index a10f7ba1bcc0c..1dead27cfb2fa 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -264,7 +264,7 @@ class BrokerMetadataListener( // no-op } - class HandleNewLeaderEvent(leader: MetaLogLeader) + class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { val imageBuilder = diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java index 72c07d216a999..dfe6282d559c7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java @@ -23,27 +23,24 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ProducerIdsBlock; import org.apache.kafka.timeline.SnapshotRegistry; -import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineLong; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; public class ProducerIdControlManager { - private static final Object PRODUCER_ID_KEY = new Object(); - private static final int PRODUCER_ID_BLOCK_SIZE = 1000; private final ClusterControlManager clusterControlManager; - private final TimelineHashMap lastProducerId; + private final TimelineLong lastProducerId; ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) { this.clusterControlManager = clusterControlManager; - this.lastProducerId = new TimelineHashMap<>(snapshotRegistry, 0); + this.lastProducerId = new TimelineLong(snapshotRegistry, 0L); } ControllerResult> generateNextProducerId(int brokerId, long brokerEpoch) { @@ -53,45 +50,38 @@ ControllerResult> generateNextProducerId(int bro return ControllerResult.of(Collections.emptyList(), ResultOrError.of(ApiError.fromThrowable(e))); } - long producerId = lastProducerId.getOrDefault(PRODUCER_ID_KEY, 0L); + long producerId = lastProducerId.get(); - if (producerId > Long.MAX_VALUE - PRODUCER_ID_BLOCK_SIZE) { + if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { ApiError error = new ApiError(UNKNOWN_SERVER_ERROR, "Exhausted all producerIds as the next block's end producerId " + "is will has exceeded long type limit"); return ControllerResult.of(Collections.emptyList(), ResultOrError.of(error)); } - long nextProducerId = producerId + PRODUCER_ID_BLOCK_SIZE; + long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE; ProducerIdsRecord record = new ProducerIdsRecord() .setProducerIdsEnd(nextProducerId) .setBrokerId(brokerId) .setBrokerEpoch(brokerEpoch); - ProducerIdsBlock block = new ProducerIdsBlock(-1, producerId, PRODUCER_ID_BLOCK_SIZE); + ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE); return ControllerResult.of( Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), ResultOrError.of(block)); } void replay(ProducerIdsRecord record) { - long currentProducerId = lastProducerId.getOrDefault(PRODUCER_ID_KEY, 0L); + long currentProducerId = lastProducerId.get(); if (record.producerIdsEnd() <= currentProducerId) { throw new RuntimeException("Producer ID from record is not monotonically increasing"); } else { - lastProducerId.put(PRODUCER_ID_KEY, record.producerIdsEnd()); + lastProducerId.set(record.producerIdsEnd()); } } Iterator> iterator(long epoch) { List records = new ArrayList<>(1); - long producerId = 0L; - for (Map.Entry entry : lastProducerId.entrySet(epoch)) { - if (entry.getKey() == PRODUCER_ID_KEY) { - producerId = lastProducerId.getOrDefault(PRODUCER_ID_KEY, 0L); - } else { - throw new IllegalStateException("Unexpected key in producer ids map " + entry.getKey()); - } - } + long producerId = lastProducerId.get(epoch); if (producerId > 0) { records.add(new ApiMessageAndVersion( new ProducerIdsRecord() diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java index e057391c4bcd3..36a300ff94998 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java @@ -47,8 +47,12 @@ public void mergeFrom(long destinationEpoch, Delta delta) { private long value; public TimelineLong(SnapshotRegistry snapshotRegistry) { + this(snapshotRegistry, 0L); + } + + public TimelineLong(SnapshotRegistry snapshotRegistry, long value) { this.snapshotRegistry = snapshotRegistry; - this.value = 0; + this.value = value; } public long get() { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java index 23bbaf0403444..a3eb9737ac276 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -53,7 +53,7 @@ public void setUp() { snapshotRegistry = new SnapshotRegistry(logContext); clusterControl = new ClusterControlManager( logContext, time, snapshotRegistry, 1000, - new SimpleReplicaPlacementPolicy(random)); + new StripedReplicaPlacer(random)); clusterControl.activate(); for (int i = 0; i < 4; i++) { From b3aa2b61070be2b48bda19914b08d78250eecfb6 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 24 May 2021 13:51:33 -0400 Subject: [PATCH 3/5] Better name in callback --- .../org/apache/kafka/controller/QuorumController.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 03abe0f2e3b0b..cf509434f738c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1218,13 +1218,13 @@ public CompletableFuture allocateProducerIds( AllocateProducerIdsRequestData request) { return appendWriteEvent("allocateProducerIds", () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())) - .thenApply(range -> { - if (range.isError()) { - return new AllocateProducerIdsResponseData().setErrorCode(range.error().error().code()); + .thenApply(resultOrError -> { + if (resultOrError.isError()) { + return new AllocateProducerIdsResponseData().setErrorCode(resultOrError.error().error().code()); } else { return new AllocateProducerIdsResponseData() - .setProducerIdStart(range.result().producerIdStart()) - .setProducerIdLen(range.result().producerIdLen()); + .setProducerIdStart(resultOrError.result().producerIdStart()) + .setProducerIdLen(resultOrError.result().producerIdLen()); } }); } From 252f7027db34a266c90509dd44b542ef87de9075 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 24 May 2021 14:32:47 -0400 Subject: [PATCH 4/5] Add newline --- .../src/main/resources/common/metadata/ProducerIdsRecord.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json index 774fd482379a2..09e6b536129e2 100644 --- a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json +++ b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json @@ -26,4 +26,4 @@ { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+", "about": "The highest producer ID that has been generated"} ] -} \ No newline at end of file +} From 96293e0da3bee04afc2c07da17c7c20b03c76813 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 26 May 2021 15:43:43 -0400 Subject: [PATCH 5/5] PR feedback * Adding integration tests for authz * Fixing failing unit test (and adding case for ProducerIdsRecord) --- .../scala/kafka/server/BrokerServer.scala | 5 +-- .../metadata/BrokerMetadataListener.scala | 3 +- .../kafka/server/ControllerApisTest.scala | 8 +++++ .../controller/ProducerIdControlManager.java | 20 +++-------- .../kafka/controller/QuorumController.java | 12 ++----- .../ProducerIdControlManagerTest.java | 35 +++++++++---------- .../controller/QuorumControllerTest.java | 10 +++++- 7 files changed, 47 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 0a7a64e93f730..4c76903173ab3 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} import java.net.InetAddress + import kafka.cluster.Broker.ServerInfo import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} @@ -243,8 +244,6 @@ class BrokerServer( // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics) - // Create transaction coordinator, but don't start it until we've started replica manager. - // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue val producerIdManagerSupplier = () => ProducerIdManager.rpc( config.brokerId, brokerEpochSupplier = () => lifecycleManager.brokerEpoch(), @@ -252,6 +251,8 @@ class BrokerServer( config.requestTimeoutMs ) + // Create transaction coordinator, but don't start it until we've started replica manager. + // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 1dead27cfb2fa..c7085f176d229 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -261,7 +261,8 @@ class BrokerMetadataListener( } def handleProducerIdRecord(record: ProducerIdsRecord): Unit = { - // no-op + // This is a no-op since brokers get their producer ID blocks directly from the controller via + // AllocateProducerIds RPC response } class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index b0fa2b36b940f..59eb2b8f66bad 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -409,6 +409,14 @@ class ControllerApisTest { new AlterPartitionReassignmentsRequestData()).build()))) } + @Test + def testUnauthorizedHandleAllocateProducerIds(): Unit = { + assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( + Some(createDenyAllAuthorizer()), new MockController.Builder().build()). + handleAllocateProducerIdsRequest(buildRequest(new AllocateProducerIdsRequest.Builder( + new AllocateProducerIdsRequestData()).build()))) + } + @Test def testUnauthorizedHandleListPartitionReassignments(): Unit = { assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java index dfe6282d559c7..924605c6d910f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java @@ -17,9 +17,8 @@ package org.apache.kafka.controller; -import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.metadata.ProducerIdsRecord; -import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ProducerIdsBlock; import org.apache.kafka.timeline.SnapshotRegistry; @@ -30,8 +29,6 @@ import java.util.Iterator; import java.util.List; -import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; - public class ProducerIdControlManager { @@ -43,20 +40,14 @@ public class ProducerIdControlManager { this.lastProducerId = new TimelineLong(snapshotRegistry, 0L); } - ControllerResult> generateNextProducerId(int brokerId, long brokerEpoch) { - try { - clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch); - } catch (StaleBrokerEpochException e) { - return ControllerResult.of(Collections.emptyList(), ResultOrError.of(ApiError.fromThrowable(e))); - } + ControllerResult generateNextProducerId(int brokerId, long brokerEpoch) { + clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch); long producerId = lastProducerId.get(); if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { - ApiError error = new ApiError(UNKNOWN_SERVER_ERROR, - "Exhausted all producerIds as the next block's end producerId " + + throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " + "is will has exceeded long type limit"); - return ControllerResult.of(Collections.emptyList(), ResultOrError.of(error)); } long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE; @@ -65,8 +56,7 @@ ControllerResult> generateNextProducerId(int bro .setBrokerId(brokerId) .setBrokerEpoch(brokerEpoch); ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE); - return ControllerResult.of( - Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), ResultOrError.of(block)); + return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block); } void replay(ProducerIdsRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index cf509434f738c..bee0b69eff4e3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1218,15 +1218,9 @@ public CompletableFuture allocateProducerIds( AllocateProducerIdsRequestData request) { return appendWriteEvent("allocateProducerIds", () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())) - .thenApply(resultOrError -> { - if (resultOrError.isError()) { - return new AllocateProducerIdsResponseData().setErrorCode(resultOrError.error().error().code()); - } else { - return new AllocateProducerIdsResponseData() - .setProducerIdStart(resultOrError.result().producerIdStart()) - .setProducerIdLen(resultOrError.result().producerIdLen()); - } - }); + .thenApply(result -> new AllocateProducerIdsResponseData() + .setProducerIdStart(result.producerIdStart()) + .setProducerIdLen(result.producerIdLen())); } @Override diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java index a3eb9737ac276..f96510ddae11e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -17,9 +17,10 @@ package org.apache.kafka.controller; +import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -71,10 +72,10 @@ public void setUp() { @Test public void testInitialResult() { - ControllerResult> result = + ControllerResult result = producerIdControlManager.generateNextProducerId(1, 100); - assertEquals(0, result.response().result().producerIdStart()); - assertEquals(1000, result.response().result().producerIdLen()); + assertEquals(0, result.response().producerIdStart()); + assertEquals(1000, result.response().producerIdLen()); ProducerIdsRecord record = (ProducerIdsRecord) result.records().get(0).message(); assertEquals(1000, record.producerIdsEnd()); } @@ -88,7 +89,7 @@ public void testMonotonic() { .setProducerIdsEnd(42)); ProducerIdsBlock range = - producerIdControlManager.generateNextProducerId(1, 100).response().result(); + producerIdControlManager.generateNextProducerId(1, 100).response(); assertEquals(42, range.producerIdStart()); // Can't go backwards in Producer IDs @@ -99,7 +100,7 @@ public void testMonotonic() { .setBrokerEpoch(100) .setProducerIdsEnd(40)); }, "Producer ID range must only increase"); - range = producerIdControlManager.generateNextProducerId(1, 100).response().result(); + range = producerIdControlManager.generateNextProducerId(1, 100).response(); assertEquals(42, range.producerIdStart()); // Gaps in the ID range are okay. @@ -108,19 +109,19 @@ public void testMonotonic() { .setBrokerId(1) .setBrokerEpoch(100) .setProducerIdsEnd(50)); - range = producerIdControlManager.generateNextProducerId(1, 100).response().result(); + range = producerIdControlManager.generateNextProducerId(1, 100).response(); assertEquals(50, range.producerIdStart()); } @Test public void testUnknownBrokerOrEpoch() { - ControllerResult> result; + ControllerResult result; - result = producerIdControlManager.generateNextProducerId(99, 0); - assertEquals(Errors.STALE_BROKER_EPOCH, result.response().error().error()); + assertThrows(StaleBrokerEpochException.class, () -> + producerIdControlManager.generateNextProducerId(99, 0)); - result = producerIdControlManager.generateNextProducerId(1, 99); - assertEquals(Errors.STALE_BROKER_EPOCH, result.response().error().error()); + assertThrows(StaleBrokerEpochException.class, () -> + producerIdControlManager.generateNextProducerId(1, 99)); } @Test @@ -131,9 +132,8 @@ public void testMaxValue() { .setBrokerEpoch(100) .setProducerIdsEnd(Long.MAX_VALUE - 1)); - ControllerResult> result = - producerIdControlManager.generateNextProducerId(1, 100); - assertEquals(Errors.UNKNOWN_SERVER_ERROR, result.response().error().error()); + assertThrows(UnknownServerException.class, () -> + producerIdControlManager.generateNextProducerId(1, 100)); } @Test @@ -164,11 +164,10 @@ public void testSnapshotIterator() { static ProducerIdsBlock generateProducerIds( ProducerIdControlManager producerIdControlManager, int brokerId, long brokerEpoch) { - ControllerResult> result = + ControllerResult result = producerIdControlManager.generateNextProducerId(brokerId, brokerEpoch); - assertFalse(result.response().isError()); result.records().forEach(apiMessageAndVersion -> producerIdControlManager.replay((ProducerIdsRecord) apiMessageAndVersion.message())); - return result.response().result(); + return result.response(); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index c6114dee0891b..5a39f82c98cfc 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; @@ -54,6 +55,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection; import org.apache.kafka.common.metadata.RegisterBrokerRecord; @@ -267,6 +269,8 @@ public void testSnapshotSaveAndLoad() throws Throwable { setBrokerIds(Arrays.asList(1, 2, 0))). iterator()))).iterator()))).get(); fooId = fooData.topics().find("foo").topicId(); + active.allocateProducerIds( + new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); long snapshotEpoch = active.beginWritingSnapshot().get(); writer = snapshotWriterBuilder.writers.takeFirst(); assertEquals(snapshotEpoch, writer.epoch()); @@ -338,7 +342,11 @@ private void checkSnapshotContents(Uuid fooId, setEndPoints(new BrokerEndpointCollection(Arrays.asList( new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost"). setPort(9095).setSecurityProtocol((short) 0)).iterator())). - setRack(null), (short) 0))), + setRack(null), (short) 0)), + Arrays.asList(new ApiMessageAndVersion(new ProducerIdsRecord(). + setBrokerId(0). + setBrokerEpoch(brokerEpochs.get(0)). + setProducerIdsEnd(1000), (short) 0))), iterator); }