diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index eab77bdb88a07..1c9e118c0fdda 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionAbortedException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; @@ -59,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; @@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map entry : produceResponse.responses().entrySet()) { - TopicPartition tp = entry.getKey(); - ProduceResponse.PartitionResponse partResp = entry.getValue(); + produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { + TopicPartition tp = new TopicPartition(r.name(), p.index()); + ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse( + Errors.forCode(p.errorCode()), + p.baseOffset(), + p.logAppendTimeMs(), + p.logStartOffset(), + p.recordErrors() + .stream() + .map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage())) + .collect(Collectors.toList()), + p.errorMessage()); ProducerBatch batch = batches.get(tp); completeBatch(batch, partResp, correlationId, now); - } + })); this.sensors.recordLatency(response.destination(), response.requestLatencyMs()); } else { // this is the acks = 0 case, just complete all requests @@ -721,7 +734,6 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo if (batches.isEmpty()) return; - Map produceRecordsByPartition = new HashMap<>(batches.size()); final Map recordsByPartition = new HashMap<>(batches.size()); // find the minimum magic version used when creating the record sets @@ -730,7 +742,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo if (batch.magic() < minUsedMagic) minUsedMagic = batch.magic(); } - + ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection(); for (ProducerBatch batch : batches) { TopicPartition tp = batch.topicPartition; MemoryRecords records = batch.records(); @@ -744,7 +756,14 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo // which is supporting the new magic version to one which doesn't, then we will need to convert. if (!records.hasMatchingMagic(minUsedMagic)) records = batch.records().downConvert(minUsedMagic, 0, time).records(); - produceRecordsByPartition.put(tp, records); + ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic()); + if (tpData == null) { + tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic()); + tpd.add(tpData); + } + tpData.partitionData().add(new ProduceRequestData.PartitionProduceData() + .setIndex(tp.partition()) + .setRecords(records)); recordsByPartition.put(tp, batch); } @@ -752,8 +771,13 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo if (transactionManager != null && transactionManager.isTransactional()) { transactionalId = transactionManager.transactionalId(); } - ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout, - produceRecordsByPartition, transactionalId); + + ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, + new ProduceRequestData() + .setAcks(acks) + .setTimeoutMs(timeout) + .setTransactionalId(transactionalId) + .setTopicData(tpd)); RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds()); String nodeId = Integer.toString(destination); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 7bed902837272..03ee450511c83 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -111,6 +111,8 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData; import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.ProduceRequestData; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.RenewDelegationTokenResponseData; import org.apache.kafka.common.message.SaslAuthenticateRequestData; @@ -138,8 +140,6 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; -import org.apache.kafka.common.requests.ProduceRequest; -import org.apache.kafka.common.requests.ProduceResponse; import java.nio.ByteBuffer; import java.util.Arrays; @@ -157,7 +157,7 @@ * Identifiers for all the Kafka APIs */ public enum ApiKeys { - PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()), + PRODUCE(0, "Produce", ProduceRequestData.SCHEMAS, ProduceResponseData.SCHEMAS), FETCH(1, "Fetch", FetchRequestData.SCHEMAS, FetchResponseData.SCHEMAS), LIST_OFFSETS(2, "ListOffsets", ListOffsetRequestData.SCHEMAS, ListOffsetResponseData.SCHEMAS), METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS), diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java index fe756a8597e7a..122d4da602324 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java @@ -32,14 +32,4 @@ public class CommonFields { "If the epoch provided by the client is larger than the current epoch known to the broker, then " + "the UNKNOWN_LEADER_EPOCH error code will be returned. If the provided epoch is smaller, then " + "the FENCED_LEADER_EPOCH error code will be returned."); - - // Group APIs - public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier"); - - // Transactional APIs - public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction."); - public static final Field.NullableStr NULLABLE_TRANSACTIONAL_ID = new Field.NullableStr("transactional_id", - "The transactional id or null if the producer is not transactional"); - public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id."); - public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id."); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index f2c6ed487258e..a0121666dfcf2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.AlterIsrRequestData; +import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -146,7 +147,7 @@ public Map errorCounts(Throwable e) { public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) { switch (apiKey) { case PRODUCE: - return new ProduceRequest(struct, apiVersion); + return new ProduceRequest(new ProduceRequestData(struct, apiVersion), apiVersion); case FETCH: return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion); case LIST_OFFSETS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 2ff143c0eb4ab..104148d75c0da 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.message.EnvelopeResponseData; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.AlterIsrResponseData; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -96,7 +97,7 @@ public static AbstractResponse parseResponse(ByteBuffer byteBuffer, RequestHeade public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) { switch (apiKey) { case PRODUCE: - return new ProduceResponse(struct); + return new ProduceResponse(new ProduceResponseData(struct, version)); case FETCH: return new FetchResponse<>(new FetchResponseData(struct, version)); case LIST_OFFSETS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 210b71da22158..6860a6f2327ed 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -19,162 +19,60 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.ProduceRequestData; +import org.apache.kafka.common.message.ProduceResponseData; +import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.CommonFields; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.SendBuilder; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.BaseRecords; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; +import java.util.stream.Collectors; -import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT16; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.RECORDS; +import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET; public class ProduceRequest extends AbstractRequest { - private static final String ACKS_KEY_NAME = "acks"; - private static final String TIMEOUT_KEY_NAME = "timeout"; - private static final String TOPIC_DATA_KEY_NAME = "topic_data"; - // topic level field names - private static final String PARTITION_DATA_KEY_NAME = "data"; - - // partition level field names - private static final String RECORD_SET_KEY_NAME = "record_set"; - - - private static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema( - TOPIC_NAME, - new Field(PARTITION_DATA_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - new Field(RECORD_SET_KEY_NAME, RECORDS))))); - - private static final Schema PRODUCE_REQUEST_V0 = new Schema( - new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " + - "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for " + - "only the leader and -1 for the full ISR."), - new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."), - new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0))); - - /** - * The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0. - * The version number is bumped up to indicate that the client supports quota throttle time field in the response. - */ - private static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0; - /** - * The body of PRODUCE_REQUEST_V2 is the same as PRODUCE_REQUEST_V1. - * The version number is bumped up to indicate that message format V1 is used which has relative offset and - * timestamp. - */ - private static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1; - - // Produce request V3 adds the transactional id which is used for authorization when attempting to write - // transactional data. This version also adds support for message format V2. - private static final Schema PRODUCE_REQUEST_V3 = new Schema( - CommonFields.NULLABLE_TRANSACTIONAL_ID, - new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " + - "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 " + - "for only the leader and -1 for the full ISR."), - new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."), - new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0))); - - /** - * The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3. - * The version number is bumped up to indicate that the client supports KafkaStorageException. - * The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3 - */ - private static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3; - - /** - * The body of the PRODUCE_REQUEST_V5 is the same as PRODUCE_REQUEST_V4. - * The version number is bumped since the PRODUCE_RESPONSE_V5 includes an additional partition level - * field: the log_start_offset. - */ - private static final Schema PRODUCE_REQUEST_V5 = PRODUCE_REQUEST_V4; - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema PRODUCE_REQUEST_V6 = PRODUCE_REQUEST_V5; - - /** - * V7 bumped up to indicate ZStandard capability. (see KIP-110) - */ - private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6; - - /** - * V8 bumped up to add two new fields record_errors offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} - * (See KIP-467) - */ - private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7; + public static Builder forMagic(byte magic, ProduceRequestData data) { + // Message format upgrades correspond with a bump in the produce request version. Older + // message format versions are generally not supported by the produce request versions + // following the bump. + + final short minVersion; + final short maxVersion; + if (magic < RecordBatch.MAGIC_VALUE_V2) { + minVersion = 2; + maxVersion = 2; + } else { + minVersion = 3; + maxVersion = ApiKeys.PRODUCE.latestVersion(); + } + return new Builder(minVersion, maxVersion, data); + } - public static Schema[] schemaVersions() { - return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, - PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7, PRODUCE_REQUEST_V8}; + public static Builder forCurrentMagic(ProduceRequestData data) { + return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, data); } public static class Builder extends AbstractRequest.Builder { - private final short acks; - private final int timeout; - private final Map partitionRecords; - private final String transactionalId; - - public static Builder forCurrentMagic(short acks, - int timeout, - Map partitionRecords) { - return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, acks, timeout, partitionRecords, null); - } - - public static Builder forMagic(byte magic, - short acks, - int timeout, - Map partitionRecords, - String transactionalId) { - // Message format upgrades correspond with a bump in the produce request version. Older - // message format versions are generally not supported by the produce request versions - // following the bump. - - final short minVersion; - final short maxVersion; - if (magic < RecordBatch.MAGIC_VALUE_V2) { - minVersion = 2; - maxVersion = 2; - } else { - minVersion = 3; - maxVersion = ApiKeys.PRODUCE.latestVersion(); - } - return new Builder(minVersion, maxVersion, acks, timeout, partitionRecords, transactionalId); - } + private final ProduceRequestData data; public Builder(short minVersion, short maxVersion, - short acks, - int timeout, - Map partitionRecords, - String transactionalId) { + ProduceRequestData data) { super(ApiKeys.PRODUCE, minVersion, maxVersion); - this.acks = acks; - this.timeout = timeout; - this.partitionRecords = partitionRecords; - this.transactionalId = transactionalId; + this.data = data; } @Override @@ -190,85 +88,82 @@ public ProduceRequest buildUnsafe(short version) { private ProduceRequest build(short version, boolean validate) { if (validate) { // Validate the given records first - for (MemoryRecords records : partitionRecords.values()) { - ProduceRequest.validateRecords(version, records); - } + data.topicData().forEach(tpd -> + tpd.partitionData().forEach(partitionProduceData -> + ProduceRequest.validateRecords(version, partitionProduceData.records()))); } - return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId); + return new ProduceRequest(data, version); } @Override public String toString() { StringBuilder bld = new StringBuilder(); bld.append("(type=ProduceRequest") - .append(", acks=").append(acks) - .append(", timeout=").append(timeout) - .append(", partitionRecords=(").append(partitionRecords) - .append("), transactionalId='").append(transactionalId != null ? transactionalId : "") + .append(", acks=").append(data.acks()) + .append(", timeout=").append(data.timeoutMs()) + .append(", partitionRecords=(").append(data.topicData().stream().flatMap(d -> d.partitionData().stream()).collect(Collectors.toList())) + .append("), transactionalId='").append(data.transactionalId() != null ? data.transactionalId() : "") .append("'"); return bld.toString(); } } + /** + * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate + * the reference to ByteBuffer but those metadata are still useful. + */ private final short acks; private final int timeout; private final String transactionalId; - - private final Map partitionSizes; - // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is // put in the purgatory (due to client throttling, it can take a while before the response is sent). // Care should be taken in methods that use this field. - private volatile Map partitionRecords; - private boolean hasTransactionalRecords = false; - private boolean hasIdempotentRecords = false; + private volatile ProduceRequestData data; + // the partitionSizes is lazily initialized since it is used by server-side in production. + private volatile Map partitionSizes; - private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) { + public ProduceRequest(ProduceRequestData produceRequestData, short version) { super(ApiKeys.PRODUCE, version); - this.acks = acks; - this.timeout = timeout; - - this.transactionalId = transactionalId; - this.partitionRecords = partitionRecords; - this.partitionSizes = createPartitionSizes(partitionRecords); - - for (MemoryRecords records : partitionRecords.values()) { - setFlags(records); - } + this.data = produceRequestData; + this.acks = data.acks(); + this.timeout = data.timeoutMs(); + this.transactionalId = data.transactionalId(); } - private static Map createPartitionSizes(Map partitionRecords) { - Map result = new HashMap<>(partitionRecords.size()); - for (Map.Entry entry : partitionRecords.entrySet()) - result.put(entry.getKey(), entry.getValue().sizeInBytes()); - return result; + @Override + public Send toSend(String destination, RequestHeader header) { + return SendBuilder.buildRequestSend(destination, header, dataOrException()); } - public ProduceRequest(Struct struct, short version) { - super(ApiKeys.PRODUCE, version); - partitionRecords = new HashMap<>(); - for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { - Struct topicData = (Struct) topicDataObj; - String topic = topicData.get(TOPIC_NAME); - for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME); - setFlags(records); - partitionRecords.put(new TopicPartition(topic, partition), records); + // visible for testing + Map partitionSizes() { + if (partitionSizes == null) { + // this method may be called by different thread (see the comment on data) + synchronized (this) { + if (partitionSizes == null) { + partitionSizes = new HashMap<>(); + data.topicData().forEach(topicData -> + topicData.partitionData().forEach(partitionData -> + partitionSizes.compute(new TopicPartition(topicData.name(), partitionData.index()), + (ignored, previousValue) -> + partitionData.records().sizeInBytes() + (previousValue == null ? 0 : previousValue)) + ) + ); + } } } - partitionSizes = createPartitionSizes(partitionRecords); - acks = struct.getShort(ACKS_KEY_NAME); - timeout = struct.getInt(TIMEOUT_KEY_NAME); - transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); + return partitionSizes; } - private void setFlags(MemoryRecords records) { - Iterator iterator = records.batches().iterator(); - MutableRecordBatch entry = iterator.next(); - hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId(); - hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional(); + /** + * @return data or IllegalStateException if the data is removed (to prevent unnecessary memory retention). + */ + public ProduceRequestData dataOrException() { + // Store it in a local variable to protect against concurrent updates + ProduceRequestData tmp = data; + if (tmp == null) + throw new IllegalStateException("The partition records are no longer available because clearPartitionRecords() has been invoked."); + return tmp; } /** @@ -276,32 +171,7 @@ private void setFlags(MemoryRecords records) { */ @Override public Struct toStruct() { - // Store it in a local variable to protect against concurrent updates - Map partitionRecords = partitionRecordsOrFail(); - short version = version(); - Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version)); - Map> recordsByTopic = CollectionUtils.groupPartitionDataByTopic(partitionRecords); - struct.set(ACKS_KEY_NAME, acks); - struct.set(TIMEOUT_KEY_NAME, timeout); - struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId); - - List topicDatas = new ArrayList<>(recordsByTopic.size()); - for (Map.Entry> topicEntry : recordsByTopic.entrySet()) { - Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME); - topicData.set(TOPIC_NAME, topicEntry.getKey()); - List partitionArray = new ArrayList<>(); - for (Map.Entry partitionEntry : topicEntry.getValue().entrySet()) { - MemoryRecords records = partitionEntry.getValue(); - Struct part = topicData.instance(PARTITION_DATA_KEY_NAME) - .set(PARTITION_ID, partitionEntry.getKey()) - .set(RECORD_SET_KEY_NAME, records); - partitionArray.add(part); - } - topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray()); - topicDatas.add(topicData); - } - struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray()); - return struct; + return dataOrException().toStruct(version()); } @Override @@ -312,9 +182,9 @@ public String toString(boolean verbose) { .append(",timeout=").append(timeout); if (verbose) - bld.append(",partitionSizes=").append(Utils.mkString(partitionSizes, "[", "]", "=", ",")); + bld.append(",partitionSizes=").append(Utils.mkString(partitionSizes(), "[", "]", "=", ",")); else - bld.append(",numPartitions=").append(partitionSizes.size()); + bld.append(",numPartitions=").append(partitionSizes().size()); bld.append("}"); return bld.toString(); @@ -323,27 +193,31 @@ public String toString(boolean verbose) { @Override public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { /* In case the producer doesn't actually want any response */ - if (acks == 0) - return null; - + if (acks == 0) return null; Errors error = Errors.forException(e); - Map responseMap = new HashMap<>(); - ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(error); - - for (TopicPartition tp : partitions()) - responseMap.put(tp, partitionResponse); - - return new ProduceResponse(responseMap, throttleTimeMs); + ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs); + partitionSizes().forEach((tp, ignored) -> { + ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic()); + if (tpr == null) { + tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()); + data.responses().add(tpr); + } + tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp.partition()) + .setRecordErrors(Collections.emptyList()) + .setBaseOffset(INVALID_OFFSET) + .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP) + .setLogStartOffset(INVALID_OFFSET) + .setErrorMessage(e.getMessage()) + .setErrorCode(error.code())); + }); + return new ProduceResponse(data); } @Override public Map errorCounts(Throwable e) { Errors error = Errors.forException(e); - return Collections.singletonMap(error, partitions().size()); - } - - private Collection partitions() { - return partitionSizes.keySet(); + return Collections.singletonMap(error, partitionSizes().size()); } public short acks() { @@ -358,49 +232,34 @@ public String transactionalId() { return transactionalId; } - public boolean hasTransactionalRecords() { - return hasTransactionalRecords; - } - - public boolean hasIdempotentRecords() { - return hasIdempotentRecords; - } - - /** - * Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked. - */ - public Map partitionRecordsOrFail() { - // Store it in a local variable to protect against concurrent updates - Map partitionRecords = this.partitionRecords; - if (partitionRecords == null) - throw new IllegalStateException("The partition records are no longer available because " + - "clearPartitionRecords() has been invoked."); - return partitionRecords; - } - public void clearPartitionRecords() { - partitionRecords = null; + // lazily initialize partitionSizes. + partitionSizes(); + data = null; } - public static void validateRecords(short version, MemoryRecords records) { + public static void validateRecords(short version, BaseRecords baseRecords) { if (version >= 3) { - Iterator iterator = records.batches().iterator(); - if (!iterator.hasNext()) - throw new InvalidRecordException("Produce requests with version " + version + " must have at least " + - "one record batch"); - - MutableRecordBatch entry = iterator.next(); - if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) - throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + - "contain record batches with magic version 2"); - if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { - throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " + - "use ZStandard compression"); - } + if (baseRecords instanceof Records) { + Records records = (Records) baseRecords; + Iterator iterator = records.batches().iterator(); + if (!iterator.hasNext()) + throw new InvalidRecordException("Produce requests with version " + version + " must have at least " + + "one record batch"); + + RecordBatch entry = iterator.next(); + if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) + throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + + "contain record batches with magic version 2"); + if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { + throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " + + "use ZStandard compression"); + } - if (iterator.hasNext()) - throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + - "contain exactly one record batch"); + if (iterator.hasNext()) + throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + + "contain exactly one record batch"); + } } // Note that we do not do similar validation for older versions to ensure compatibility with @@ -409,7 +268,7 @@ public static void validateRecords(short version, MemoryRecords records) { } public static ProduceRequest parse(ByteBuffer buffer, short version) { - return new ProduceRequest(ApiKeys.PRODUCE.parseRequest(version, buffer), version); + return new ProduceRequest(new ProduceRequestData(new ByteBufferAccessor(buffer), version), version); } public static byte requiredMagicForVersion(short produceRequestVersion) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 205d9aba5a683..3b089cdda9935 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -17,183 +17,57 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.message.ProduceResponseData; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.SendBuilder; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.AbstractMap; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT64; +import java.util.Objects; +import java.util.stream.Collectors; /** - * This wrapper supports both v0 and v1 of ProduceResponse. + * This wrapper supports both v0 and v8 of ProduceResponse. + * + * Possible error code: + * + * {@link Errors#CORRUPT_MESSAGE} + * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} + * {@link Errors#NOT_LEADER_OR_FOLLOWER} + * {@link Errors#MESSAGE_TOO_LARGE} + * {@link Errors#INVALID_TOPIC_EXCEPTION} + * {@link Errors#RECORD_LIST_TOO_LARGE} + * {@link Errors#NOT_ENOUGH_REPLICAS} + * {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND} + * {@link Errors#INVALID_REQUIRED_ACKS} + * {@link Errors#TOPIC_AUTHORIZATION_FAILED} + * {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} + * {@link Errors#INVALID_PRODUCER_EPOCH} + * {@link Errors#CLUSTER_AUTHORIZATION_FAILED} + * {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} + * {@link Errors#INVALID_RECORD} */ public class ProduceResponse extends AbstractResponse { - - private static final String RESPONSES_KEY_NAME = "responses"; - - // topic level field names - private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; - public static final long INVALID_OFFSET = -1L; + private final ProduceResponseData data; - /** - * Possible error code: - * - * {@link Errors#CORRUPT_MESSAGE} - * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} - * {@link Errors#NOT_LEADER_OR_FOLLOWER} - * {@link Errors#MESSAGE_TOO_LARGE} - * {@link Errors#INVALID_TOPIC_EXCEPTION} - * {@link Errors#RECORD_LIST_TOO_LARGE} - * {@link Errors#NOT_ENOUGH_REPLICAS} - * {@link Errors#NOT_ENOUGH_REPLICAS_AFTER_APPEND} - * {@link Errors#INVALID_REQUIRED_ACKS} - * {@link Errors#TOPIC_AUTHORIZATION_FAILED} - * {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} - * {@link Errors#INVALID_PRODUCER_EPOCH} - * {@link Errors#CLUSTER_AUTHORIZATION_FAILED} - * {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} - * {@link Errors#INVALID_RECORD} - */ - - private static final String BASE_OFFSET_KEY_NAME = "base_offset"; - private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; - private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; - private static final String RECORD_ERRORS_KEY_NAME = "record_errors"; - private static final String BATCH_INDEX_KEY_NAME = "batch_index"; - private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message"; - private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; - - private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, - "The start offset of the log at the time this produce response was created", INVALID_OFFSET); - private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME, - "The error message of the record that caused the batch to be dropped"); - private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME, - "The global error message summarizing the common root cause of the records that caused the batch to be dropped"); - - private static final Schema PRODUCE_RESPONSE_V0 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64)))))))); - - private static final Schema PRODUCE_RESPONSE_V1 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64))))))), - THROTTLE_TIME_MS); - - /** - * PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status. - * The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create - * time is used for the topic. - */ - private static final Schema PRODUCE_RESPONSE_V2 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64), - new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + - "the messages. If CreateTime is used for the topic, the timestamp will be -1. " + - "If LogAppendTime is used for the topic, the timestamp will be " + - "the broker local time when the messages are appended."))))))), - THROTTLE_TIME_MS); - - private static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2; - - /** - * The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3. - * The version number is bumped up to indicate that the client supports KafkaStorageException. - * The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3 - */ - private static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3; - - - /** - * Add in the log_start_offset field to the partition response to filter out spurious OutOfOrderSequencExceptions - * on the client. - */ - public static final Schema PRODUCE_RESPONSE_V5 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64), - new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + - "the messages. If CreateTime is used for the topic, the timestamp will be -1. " + - "If LogAppendTime is used for the topic, the timestamp will be the broker local " + - "time when the messages are appended."), - LOG_START_OFFSET_FIELD)))))), - THROTTLE_TIME_MS); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema PRODUCE_RESPONSE_V6 = PRODUCE_RESPONSE_V5; - - /** - * V7 bumped up to indicate ZStandard capability. (see KIP-110) - */ - private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6; - - /** - * V8 adds record_errors and error_message. (see KIP-467) - */ - public static final Schema PRODUCE_RESPONSE_V8 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64), - new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + - "the messages. If CreateTime is used for the topic, the timestamp will be -1. " + - "If LogAppendTime is used for the topic, the timestamp will be the broker local " + - "time when the messages are appended."), - LOG_START_OFFSET_FIELD, - new Field(RECORD_ERRORS_KEY_NAME, new ArrayOf(new Schema( - new Field.Int32(BATCH_INDEX_KEY_NAME, "The batch index of the record " + - "that caused the batch to be dropped"), - BATCH_INDEX_ERROR_MESSAGE_FIELD - )), "The batch indices of records that caused the batch to be dropped"), - ERROR_MESSAGE_FIELD)))))), - THROTTLE_TIME_MS); - - public static Schema[] schemaVersions() { - return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, - PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7, PRODUCE_RESPONSE_V8}; + public ProduceResponse(ProduceResponseData produceResponseData) { + this.data = produceResponseData; } - private final Map responses; - private final int throttleTimeMs; - /** * Constructor for Version 0 * @param responses Produced data grouped by topic-partition */ + @Deprecated public ProduceResponse(Map responses) { this(responses, DEFAULT_THROTTLE_TIME); } @@ -203,119 +77,88 @@ public ProduceResponse(Map responses) { * @param responses Produced data grouped by topic-partition * @param throttleTimeMs Time in milliseconds the response was throttled */ + @Deprecated public ProduceResponse(Map responses, int throttleTimeMs) { - this.responses = responses; - this.throttleTimeMs = throttleTimeMs; + this(toData(responses, throttleTimeMs)); } - /** - * Constructor from a {@link Struct}. - */ - public ProduceResponse(Struct struct) { - responses = new HashMap<>(); - for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicRespStruct = (Struct) topicResponse; - String topic = topicRespStruct.get(TOPIC_NAME); - - for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) { - Struct partRespStruct = (Struct) partResponse; - int partition = partRespStruct.get(PARTITION_ID); - Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE)); - long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); - long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ? - partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP; - long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); - - List recordErrors = Collections.emptyList(); - if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) { - Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME); - if (recordErrorsArray.length > 0) { - recordErrors = new ArrayList<>(recordErrorsArray.length); - for (Object indexAndMessage : recordErrorsArray) { - Struct indexAndMessageStruct = (Struct) indexAndMessage; - recordErrors.add(new RecordError( - indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME), - indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD) - )); - } - } - } + @Override + protected Send toSend(String destination, ResponseHeader header, short apiVersion) { + return SendBuilder.buildResponseSend(destination, header, this.data, apiVersion); + } - String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null); - TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage)); + private static ProduceResponseData toData(Map responses, int throttleTimeMs) { + ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs); + responses.forEach((tp, response) -> { + ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic()); + if (tpr == null) { + tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()); + data.responses().add(tpr); } - } - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + tpr.partitionResponses() + .add(new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp.partition()) + .setBaseOffset(response.baseOffset) + .setLogStartOffset(response.logStartOffset) + .setLogAppendTimeMs(response.logAppendTime) + .setErrorMessage(response.errorMessage) + .setErrorCode(response.error.code()) + .setRecordErrors(response.recordErrors + .stream() + .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(e.batchIndex) + .setBatchIndexErrorMessage(e.message)) + .collect(Collectors.toList()))); + }); + return data; } + /** + * Visible for testing. + */ @Override - protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version)); - - Map> responseByTopic = CollectionUtils.groupPartitionDataByTopic(responses); - List topicDatas = new ArrayList<>(responseByTopic.size()); - for (Map.Entry> entry : responseByTopic.entrySet()) { - Struct topicData = struct.instance(RESPONSES_KEY_NAME); - topicData.set(TOPIC_NAME, entry.getKey()); - List partitionArray = new ArrayList<>(); - for (Map.Entry partitionEntry : entry.getValue().entrySet()) { - PartitionResponse part = partitionEntry.getValue(); - short errorCode = part.error.code(); - // If producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code - // for KafkaStorageException. In this case the client library will translate KafkaStorageException to - // UnknownServerException which is not retriable. We can ensure that producer will update metadata and retry - // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if ProduceRequest version <= 3 - if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3) - errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code(); - Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME) - .set(PARTITION_ID, partitionEntry.getKey()) - .set(ERROR_CODE, errorCode) - .set(BASE_OFFSET_KEY_NAME, part.baseOffset); - partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); - partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); - - if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) { - List recordErrors = Collections.emptyList(); - if (!part.recordErrors.isEmpty()) { - recordErrors = new ArrayList<>(); - for (RecordError indexAndMessage : part.recordErrors) { - Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME) - .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex) - .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message); - recordErrors.add(indexAndMessageStruct); - } - } - partStruct.set(RECORD_ERRORS_KEY_NAME, recordErrors.toArray()); - } - - partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); - partitionArray.add(partStruct); - } - topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); - topicDatas.add(topicData); - } - struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + public Struct toStruct(short version) { + return data.toStruct(version); + } - return struct; + public ProduceResponseData data() { + return this.data; } + /** + * this method is used by testing only. + * refactor the tests which are using this method and then remove this method from production code. + * https://issues.apache.org/jira/browse/KAFKA-10697 + */ + @Deprecated public Map responses() { - return this.responses; + return data.responses() + .stream() + .flatMap(t -> t.partitionResponses() + .stream() + .map(p -> new AbstractMap.SimpleEntry<>(new TopicPartition(t.name(), p.index()), + new PartitionResponse( + Errors.forCode(p.errorCode()), + p.baseOffset(), + p.logAppendTimeMs(), + p.logStartOffset(), + p.recordErrors() + .stream() + .map(e -> new RecordError(e.batchIndex(), e.batchIndexErrorMessage())) + .collect(Collectors.toList()), + p.errorMessage())))) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); } @Override public int throttleTimeMs() { - return this.throttleTimeMs; + return this.data.throttleTimeMs(); } @Override public Map errorCounts() { Map errorCounts = new HashMap<>(); - responses.values().forEach(response -> - updateErrorCounts(errorCounts, response.error) - ); + data.responses().forEach(t -> t.partitionResponses().forEach(p -> updateErrorCounts(errorCounts, Errors.forCode(p.errorCode())))); return errorCounts; } @@ -348,6 +191,24 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long this.errorMessage = errorMessage; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PartitionResponse that = (PartitionResponse) o; + return baseOffset == that.baseOffset && + logAppendTime == that.logAppendTime && + logStartOffset == that.logStartOffset && + error == that.error && + Objects.equals(recordErrors, that.recordErrors) && + Objects.equals(errorMessage, that.errorMessage); + } + + @Override + public int hashCode() { + return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage); + } + @Override public String toString() { StringBuilder b = new StringBuilder(); @@ -387,10 +248,23 @@ public RecordError(int batchIndex) { this.message = null; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RecordError that = (RecordError) o; + return batchIndex == that.batchIndex && + Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(batchIndex, message); + } } public static ProduceResponse parse(ByteBuffer buffer, short version) { - return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer)); + return new ProduceResponse(new ProduceResponseData(new ByteBufferAccessor(buffer), version)); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java index be72edbd8ef9c..ff42914ebff34 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java @@ -16,11 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.BaseRecords; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; import java.nio.ByteBuffer; +import java.util.AbstractMap; +import java.util.Iterator; import java.util.Optional; public final class RequestUtils { @@ -48,4 +53,41 @@ public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) { buffer.rewind(); return buffer; } + + // visible for testing + public static boolean hasIdempotentRecords(ProduceRequest request) { + return flags(request).getKey(); + } + + // visible for testing + public static boolean hasTransactionalRecords(ProduceRequest request) { + return flags(request).getValue(); + } + + /** + * Get both hasIdempotentRecords flag and hasTransactionalRecords flag from produce request. + * Noted that we find all flags at once to avoid duplicate loop and record batch construction. + * @return first flag is "hasIdempotentRecords" and another is "hasTransactionalRecords" + */ + public static AbstractMap.SimpleEntry flags(ProduceRequest request) { + boolean hasIdempotentRecords = false; + boolean hasTransactionalRecords = false; + for (ProduceRequestData.TopicProduceData tpd : request.dataOrException().topicData()) { + for (ProduceRequestData.PartitionProduceData ppd : tpd.partitionData()) { + BaseRecords records = ppd.records(); + if (records instanceof Records) { + Iterator iterator = ((Records) records).batches().iterator(); + if (iterator.hasNext()) { + RecordBatch batch = iterator.next(); + hasIdempotentRecords = hasIdempotentRecords || batch.hasProducerId(); + hasTransactionalRecords = hasTransactionalRecords || batch.isTransactional(); + } + } + // return early + if (hasIdempotentRecords && hasTransactionalRecords) + return new AbstractMap.SimpleEntry<>(true, true); + } + } + return new AbstractMap.SimpleEntry<>(hasIdempotentRecords, hasTransactionalRecords); + } } diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index a872da75c7e9b..f5e3c2bb81341 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -33,21 +33,21 @@ "validVersions": "0-8", "flexibleVersions": "none", "fields": [ - { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId", + { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId", "about": "The transactional ID, or null if the producer is not transactional." }, { "name": "Acks", "type": "int16", "versions": "0+", "about": "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR." }, { "name": "TimeoutMs", "type": "int32", "versions": "0+", "about": "The timeout to await a response in miliseconds." }, - { "name": "Topics", "type": "[]TopicProduceData", "versions": "0+", + { "name": "TopicData", "type": "[]TopicProduceData", "versions": "0+", "about": "Each topic to produce to.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The topic name." }, - { "name": "Partitions", "type": "[]PartitionProduceData", "versions": "0+", + { "name": "PartitionData", "type": "[]PartitionProduceData", "versions": "0+", "about": "Each partition to produce to.", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", + { "name": "Index", "type": "int32", "versions": "0+", "about": "The partition index." }, - { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+", + { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data to be produced." } ]} ]} diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 77ab0655be57f..e27acaa47658d 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -35,11 +35,11 @@ "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", "about": "Each produce response", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The topic name" }, - { "name": "Partitions", "type": "[]PartitionProduceResponse", "versions": "0+", + { "name": "PartitionResponses", "type": "[]PartitionProduceResponse", "versions": "0+", "about": "Each partition that we produced to within the topic.", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", + { "name": "Index", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, @@ -60,7 +60,7 @@ "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped"} ]} ]}, - { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, + { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." } ] } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 23edfcc0137f5..034a0d473e70b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -25,9 +25,10 @@ import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ProduceRequestData; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.CommonFields; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.ApiVersionsResponse; @@ -154,8 +155,10 @@ public void testClose() { client.poll(1, time.milliseconds()); assertTrue("The client should be ready", client.isReady(node, time.milliseconds())); - ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, - Collections.emptyMap()); + ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks((short) 1) + .setTimeoutMs(1000)); ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true); client.send(request, time.milliseconds()); assertEquals("There should be 1 in-flight request after send", 1, @@ -185,10 +188,9 @@ private void checkSimpleRequestResponse(NetworkClient networkClient) { ProduceRequest.Builder builder = new ProduceRequest.Builder( PRODUCE.latestVersion(), PRODUCE.latestVersion(), - (short) 1, - 1000, - Collections.emptyMap(), - null); + new ProduceRequestData() + .setAcks((short) 1) + .setTimeoutMs(1000)); TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(), true, defaultRequestTimeoutMs, handler); @@ -198,8 +200,9 @@ private void checkSimpleRequestResponse(NetworkClient networkClient) { ResponseHeader respHeader = new ResponseHeader(request.correlationId(), request.apiKey().responseHeaderVersion(PRODUCE.latestVersion())); - Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion())); - resp.set("responses", new Object[0]); + Struct resp = new ProduceResponseData() + .setThrottleTimeMs(100) + .toStruct(PRODUCE.latestVersion()); Struct responseHeaderStruct = respHeader.toStruct(); int size = responseHeaderStruct.sizeOf() + resp.sizeOf(); ByteBuffer buffer = ByteBuffer.allocate(size); @@ -431,8 +434,10 @@ public void testUnsupportedApiVersionsRequestWithoutVersionProvidedByTheBroker() @Test public void testRequestTimeout() { awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0 - ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, - 1000, Collections.emptyMap()); + ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks((short) 1) + .setTimeoutMs(1000)); TestCallbackHandler handler = new TestCallbackHandler(); int requestTimeoutMs = defaultRequestTimeoutMs + 5000; ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, @@ -444,8 +449,10 @@ public void testRequestTimeout() { @Test public void testDefaultRequestTimeout() { awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0 - ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, - 1000, Collections.emptyMap()); + ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks((short) 1) + .setTimeoutMs(1000)); ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true); assertEquals(defaultRequestTimeoutMs, request.requestTimeoutMs()); testRequestTimeout(request); @@ -498,10 +505,9 @@ public void testConnectionThrottling() { ProduceRequest.Builder builder = new ProduceRequest.Builder( PRODUCE.latestVersion(), PRODUCE.latestVersion(), - (short) 1, - 1000, - Collections.emptyMap(), - null); + new ProduceRequestData() + .setAcks((short) 1) + .setTimeoutMs(1000)); TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, defaultRequestTimeoutMs, handler); @@ -510,9 +516,9 @@ public void testConnectionThrottling() { ResponseHeader respHeader = new ResponseHeader(request.correlationId(), request.apiKey().responseHeaderVersion(PRODUCE.latestVersion())); - Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion())); - resp.set("responses", new Object[0]); - resp.set(CommonFields.THROTTLE_TIME_MS, 100); + Struct resp = new ProduceResponseData() + .setThrottleTimeMs(100) + .toStruct(PRODUCE.latestVersion()); Struct responseHeaderStruct = respHeader.toStruct(); int size = responseHeaderStruct.sizeOf() + resp.sizeOf(); ByteBuffer buffer = ByteBuffer.allocate(size); @@ -586,8 +592,10 @@ private int sendEmptyProduceRequest() { } private int sendEmptyProduceRequest(String nodeId) { - ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, - Collections.emptyMap()); + ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks((short) 1) + .setTimeoutMs(1000)); TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, defaultRequestTimeoutMs, handler); @@ -606,9 +614,9 @@ private void sendResponse(ResponseHeader respHeader, Struct response) { } private void sendThrottledProduceResponse(int correlationId, int throttleMs) { - Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion())); - resp.set("responses", new Object[0]); - resp.set(CommonFields.THROTTLE_TIME_MS, throttleMs); + Struct resp = new ProduceResponseData() + .setThrottleTimeMs(throttleMs) + .toStruct(PRODUCE.latestVersion()); sendResponse(new ResponseHeader(correlationId, PRODUCE.responseHeaderVersion(PRODUCE.latestVersion())), resp); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index d5302595cee29..a0c8d5639aafc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -26,8 +26,10 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.TransactionAbortedException; +import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.RequestUtils; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; @@ -156,6 +158,15 @@ public void tearDown() { this.metrics.close(); } + private static Map partitionRecords(ProduceRequest request) { + Map partitionRecords = new HashMap<>(); + request.dataOrException().topicData().forEach(tpData -> tpData.partitionData().forEach(p -> { + TopicPartition tp = new TopicPartition(tpData.name(), p.index()); + partitionRecords.put(tp, (MemoryRecords) p.records()); + })); + return Collections.unmodifiableMap(partitionRecords); + } + @Test public void testSimple() throws Exception { long offset = 0; @@ -195,7 +206,7 @@ public void testMessageFormatDownConversion() throws Exception { if (request.version() != 2) return false; - MemoryRecords records = request.partitionRecordsOrFail().get(tp0); + MemoryRecords records = partitionRecords(request).get(tp0); return records != null && records.sizeInBytes() > 0 && records.hasMatchingMagic(RecordBatch.MAGIC_VALUE_V1); @@ -208,6 +219,7 @@ public void testMessageFormatDownConversion() throws Exception { assertEquals(offset, future.get().offset()); } + @SuppressWarnings("deprecation") @Test public void testDownConversionForMismatchedMagicValues() throws Exception { // it can happen that we construct a record set with mismatching magic values (perhaps @@ -241,7 +253,7 @@ public void testDownConversionForMismatchedMagicValues() throws Exception { if (request.version() != 2) return false; - Map recordsMap = request.partitionRecordsOrFail(); + Map recordsMap = partitionRecords(request); if (recordsMap.size() != 2) return false; @@ -262,6 +274,7 @@ public void testDownConversionForMismatchedMagicValues() throws Exception { /* * Send multiple requests. Verify that the client side quota metrics have the right values */ + @SuppressWarnings("deprecation") @Test public void testQuotaMetrics() { MockSelector selector = new MockSelector(time); @@ -284,8 +297,10 @@ public void testQuotaMetrics() { for (int i = 1; i <= 3; i++) { int throttleTimeMs = 100 * i; - ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, - Collections.emptyMap()); + ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks((short) 1) + .setTimeoutMs(1000)); ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true); client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); @@ -648,7 +663,7 @@ public void testCanRetryWithoutIdempotence() throws Exception { client.respond(body -> { ProduceRequest request = (ProduceRequest) body; - assertFalse(request.hasIdempotentRecords()); + assertFalse(RequestUtils.hasIdempotentRecords(request)); return true; }, produceResponse(tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0)); sender.runOnce(); @@ -1806,9 +1821,9 @@ void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Err void sendIdempotentProducerResponse(final int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset, long logStartOffset) { client.respond(body -> { ProduceRequest produceRequest = (ProduceRequest) body; - assertTrue(produceRequest.hasIdempotentRecords()); + assertTrue(RequestUtils.hasIdempotentRecords(produceRequest)); - MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp0); + MemoryRecords records = partitionRecords(produceRequest).get(tp0); Iterator batchIterator = records.batches().iterator(); RecordBatch firstBatch = batchIterator.next(); assertFalse(batchIterator.hasNext()); @@ -1829,8 +1844,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception // cluster authorization is a fatal error for the producer Future future = appendToAccumulator(tp0); client.prepareResponse( - body -> body instanceof ProduceRequest && - ((ProduceRequest) body).hasIdempotentRecords(), + body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body), produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0)); sender.runOnce(); @@ -1858,8 +1872,7 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception { sender.runOnce(); client.respond( - body -> body instanceof ProduceRequest && - ((ProduceRequest) body).hasIdempotentRecords(), + body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body), produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0)); sender.runOnce(); @@ -1871,8 +1884,7 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception { // Should be fine if the second response eventually returns client.respond( - body -> body instanceof ProduceRequest && - ((ProduceRequest) body).hasIdempotentRecords(), + body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body), produceResponse(tp1, 0, Errors.NONE, 0)); sender.runOnce(); } @@ -1888,8 +1900,7 @@ public void testUnsupportedForMessageFormatInProduceRequest() throws Exception { Future future = appendToAccumulator(tp0); client.prepareResponse( - body -> body instanceof ProduceRequest && - ((ProduceRequest) body).hasIdempotentRecords(), + body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body), produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0)); sender.runOnce(); @@ -1910,8 +1921,7 @@ public void testUnsupportedVersionInProduceRequest() throws Exception { Future future = appendToAccumulator(tp0); client.prepareUnsupportedVersionResponse( - body -> body instanceof ProduceRequest && - ((ProduceRequest) body).hasIdempotentRecords()); + body -> body instanceof ProduceRequest && RequestUtils.hasIdempotentRecords((ProduceRequest) body)); sender.runOnce(); assertFutureFailure(future, UnsupportedVersionException.class); @@ -1940,7 +1950,7 @@ public void testSequenceNumberIncrement() throws InterruptedException { client.prepareResponse(body -> { if (body instanceof ProduceRequest) { ProduceRequest request = (ProduceRequest) body; - MemoryRecords records = request.partitionRecordsOrFail().get(tp0); + MemoryRecords records = partitionRecords(request).get(tp0); Iterator batchIterator = records.batches().iterator(); assertTrue(batchIterator.hasNext()); RecordBatch batch = batchIterator.next(); @@ -2057,6 +2067,7 @@ public void testTransactionalSplitBatchAndSend() throws Exception { testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp); } + @SuppressWarnings("deprecation") private void testSplitBatchAndSend(TransactionManager txnManager, ProducerIdAndEpoch producerIdAndEpoch, TopicPartition tp) throws Exception { @@ -2170,6 +2181,7 @@ public void testNoDoubleDeallocation() throws Exception { assertEquals(0, sender.inFlightBatches(tp0).size()); } + @SuppressWarnings("deprecation") @Test public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException { long deliveryTimeoutMs = 1500L; @@ -2304,6 +2316,7 @@ public void testResetNextBatchExpiry() throws Exception { } + @SuppressWarnings("deprecation") @Test public void testExpiredBatchesInMultiplePartitions() throws Exception { long deliveryTimeoutMs = 1500L; @@ -2601,7 +2614,7 @@ private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition tp, return false; ProduceRequest request = (ProduceRequest) body; - Map recordsMap = request.partitionRecordsOrFail(); + Map recordsMap = partitionRecords(request); MemoryRecords records = recordsMap.get(tp); if (records == null) return false; @@ -2637,6 +2650,7 @@ private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timesta null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; } + @SuppressWarnings("deprecation") private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage); @@ -2644,6 +2658,7 @@ private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors e return new ProduceResponse(partResp, throttleTimeMs); } + @SuppressWarnings("deprecation") private ProduceResponse produceResponse(Map responses) { Map partResponses = new LinkedHashMap<>(); for (Map.Entry entry : responses.entrySet()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index ee3f8925e8636..cd71aabb3dfd0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -3356,7 +3356,16 @@ private void prepareProduceResponse(Errors error, final long producerId, final s private MockClient.RequestMatcher produceRequestMatcher(final long producerId, final short epoch, TopicPartition tp) { return body -> { ProduceRequest produceRequest = (ProduceRequest) body; - MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp); + MemoryRecords records = produceRequest.dataOrException().topicData() + .stream() + .filter(t -> t.name().equals(tp.topic())) + .findAny() + .get() + .partitionData() + .stream() + .filter(p -> p.index() == tp.partition()) + .map(p -> (MemoryRecords) p.records()) + .findAny().get(); assertNotNull(records); Iterator batchIterator = records.batches().iterator(); assertTrue(batchIterator.hasNext()); @@ -3481,6 +3490,7 @@ private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors e return produceResponse(tp, offset, error, throttleTimeMs, 10); } + @SuppressWarnings("deprecation") private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, int logStartOffset) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset); Map partResp = singletonMap(tp, resp); diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index f02db2374ea87..36dde722c8772 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -678,48 +678,47 @@ public void testProduceResponseVersions() throws Exception { String errorMessage = "global error message"; testAllMessageRoundTrips(new ProduceResponseData() - .setResponses(singletonList( - new ProduceResponseData.TopicProduceResponse() - .setName(topicName) - .setPartitions(singletonList( - new ProduceResponseData.PartitionProduceResponse() - .setPartitionIndex(partitionIndex) - .setErrorCode(errorCode) - .setBaseOffset(baseOffset)))))); - - Supplier response = - () -> new ProduceResponseData() - .setResponses(singletonList( - new ProduceResponseData.TopicProduceResponse() - .setName(topicName) - .setPartitions(singletonList( - new ProduceResponseData.PartitionProduceResponse() - .setPartitionIndex(partitionIndex) - .setErrorCode(errorCode) - .setBaseOffset(baseOffset) - .setLogAppendTimeMs(logAppendTimeMs) - .setLogStartOffset(logStartOffset) - .setRecordErrors(singletonList( - new ProduceResponseData.BatchIndexAndErrorMessage() - .setBatchIndex(batchIndex) - .setBatchIndexErrorMessage(batchIndexErrorMessage))) - .setErrorMessage(errorMessage))))) - .setThrottleTimeMs(throttleTimeMs); + .setResponses(new ProduceResponseData.TopicProduceResponseCollection(singletonList( + new ProduceResponseData.TopicProduceResponse() + .setName(topicName) + .setPartitionResponses(singletonList( + new ProduceResponseData.PartitionProduceResponse() + .setIndex(partitionIndex) + .setErrorCode(errorCode) + .setBaseOffset(baseOffset)))).iterator()))); + + Supplier response = () -> new ProduceResponseData() + .setResponses(new ProduceResponseData.TopicProduceResponseCollection(singletonList( + new ProduceResponseData.TopicProduceResponse() + .setName(topicName) + .setPartitionResponses(singletonList( + new ProduceResponseData.PartitionProduceResponse() + .setIndex(partitionIndex) + .setErrorCode(errorCode) + .setBaseOffset(baseOffset) + .setLogAppendTimeMs(logAppendTimeMs) + .setLogStartOffset(logStartOffset) + .setRecordErrors(singletonList( + new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(batchIndex) + .setBatchIndexErrorMessage(batchIndexErrorMessage))) + .setErrorMessage(errorMessage)))).iterator())) + .setThrottleTimeMs(throttleTimeMs); for (short version = 0; version <= ApiKeys.PRODUCE.latestVersion(); version++) { ProduceResponseData responseData = response.get(); if (version < 8) { - responseData.responses().get(0).partitions().get(0).setRecordErrors(Collections.emptyList()); - responseData.responses().get(0).partitions().get(0).setErrorMessage(null); + responseData.responses().iterator().next().partitionResponses().get(0).setRecordErrors(Collections.emptyList()); + responseData.responses().iterator().next().partitionResponses().get(0).setErrorMessage(null); } if (version < 5) { - responseData.responses().get(0).partitions().get(0).setLogStartOffset(-1); + responseData.responses().iterator().next().partitionResponses().get(0).setLogStartOffset(-1); } if (version < 2) { - responseData.responses().get(0).partitions().get(0).setLogAppendTimeMs(-1); + responseData.responses().iterator().next().partitionResponses().get(0).setLogAppendTimeMs(-1); } if (version < 1) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index 95d719adb3bb7..156af86b53756 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.InvalidRecordException; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; @@ -30,10 +30,8 @@ import org.junit.Test; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -50,30 +48,47 @@ public class ProduceRequestTest { public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exception { final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0, CompressionType.NONE, 1L, (short) 1, 1, 1, simpleRecord); - final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1, - 10, Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build(); - assertTrue(request.hasTransactionalRecords()); + + final ProduceRequest request = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("topic") + .setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(1) + .setRecords(memoryRecords)))).iterator())) + .setAcks((short) -1) + .setTimeoutMs(10)).build(); + assertTrue(RequestUtils.hasTransactionalRecords(request)); } @Test public void shouldNotBeFlaggedAsTransactionalWhenNoRecords() throws Exception { final ProduceRequest request = createNonIdempotentNonTransactionalRecords(); - assertFalse(request.hasTransactionalRecords()); + assertFalse(RequestUtils.hasTransactionalRecords(request)); } @Test public void shouldNotBeFlaggedAsIdempotentWhenRecordsNotIdempotent() throws Exception { final ProduceRequest request = createNonIdempotentNonTransactionalRecords(); - assertFalse(request.hasTransactionalRecords()); + assertFalse(RequestUtils.hasTransactionalRecords(request)); } @Test public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception { final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1, CompressionType.NONE, 1L, (short) 1, 1, 1, simpleRecord); - final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1, 10, - Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build(); - assertTrue(request.hasIdempotentRecords()); + final ProduceRequest request = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("topic") + .setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(1) + .setRecords(memoryRecords)))).iterator())) + .setAcks((short) -1) + .setTimeoutMs(10)).build(); + assertTrue(RequestUtils.hasIdempotentRecords(request)); } @Test @@ -82,11 +97,14 @@ public void testBuildWithOldMessageFormat() { MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); builder.append(10L, null, "a".getBytes()); - Map produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), builder.build()); - - ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.MAGIC_VALUE_V1, (short) 1, - 5000, produceData, null); + ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(RecordBatch.MAGIC_VALUE_V1, + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData().setName("test").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build())))) + .iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000)); assertEquals(2, requestBuilder.oldestAllowedVersion()); assertEquals(2, requestBuilder.latestAllowedVersion()); } @@ -97,11 +115,14 @@ public void testBuildWithCurrentMessageFormat() { MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); builder.append(10L, null, "a".getBytes()); - Map produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), builder.build()); - - ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, - (short) 1, 5000, produceData, null); + ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData().setName("test").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build())))) + .iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000)); assertEquals(3, requestBuilder.oldestAllowedVersion()); assertEquals(ApiKeys.PRODUCE.latestVersion(), requestBuilder.latestAllowedVersion()); } @@ -120,17 +141,31 @@ public void testV3AndAboveShouldContainOnlyOneRecordBatch() { buffer.flip(); - Map produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer)); - ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData); + ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("test") + .setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(0) + .setRecords(MemoryRecords.readableRecords(buffer))))).iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000)); assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); } @Test public void testV3AndAboveCannotHaveNoRecordBatches() { - Map produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY); - ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData); + ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("test") + .setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(0) + .setRecords(MemoryRecords.EMPTY)))).iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000)); assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); } @@ -141,9 +176,16 @@ public void testV3AndAboveCannotUseMagicV0() { TimestampType.NO_TIMESTAMP_TYPE, 0L); builder.append(10L, null, "a".getBytes()); - Map produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), builder.build()); - ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData); + ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("test") + .setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(0) + .setRecords(builder.build())))).iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000)); assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); } @@ -154,9 +196,16 @@ public void testV3AndAboveCannotUseMagicV1() { TimestampType.CREATE_TIME, 0L); builder.append(10L, null, "a".getBytes()); - Map produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), builder.build()); - ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData); + ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("test") + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(0) + .setRecords(builder.build())))) + .iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000)); assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); } @@ -167,17 +216,25 @@ public void testV6AndBelowCannotUseZStdCompression() { TimestampType.CREATE_TIME, 0L); builder.append(10L, null, "a".getBytes()); - Map produceData = new HashMap<>(); - produceData.put(new TopicPartition("test", 0), builder.build()); - + ProduceRequestData produceData = new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("test") + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(0) + .setRecords(builder.build())))) + .iterator())) + .setAcks((short) 1) + .setTimeoutMs(1000); // Can't create ProduceRequest instance with version within [3, 7) for (short version = 3; version < 7; version++) { - ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, (short) 1, 5000, produceData, null); + + ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData); assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); } // Works fine with current version (>= 7) - ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData); + ProduceRequest.forCurrentMagic(produceData); } @Test @@ -192,16 +249,19 @@ public void testMixedTransactionalData() { final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, new SimpleRecord("bar".getBytes())); - final Map recordsByPartition = new LinkedHashMap<>(); - recordsByPartition.put(new TopicPartition("foo", 0), txnRecords); - recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords); - - final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000, - recordsByPartition, transactionalId); - + ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList( + new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))), + new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords)))) + .iterator())) + .setAcks((short) -1) + .setTimeoutMs(5000)); final ProduceRequest request = builder.build(); - assertTrue(request.hasTransactionalRecords()); - assertTrue(request.hasIdempotentRecords()); + assertTrue(RequestUtils.hasTransactionalRecords(request)); + assertTrue(RequestUtils.hasIdempotentRecords(request)); } @Test @@ -215,16 +275,20 @@ public void testMixedIdempotentData() { final MemoryRecords txnRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, new SimpleRecord("bar".getBytes())); - final Map recordsByPartition = new LinkedHashMap<>(); - recordsByPartition.put(new TopicPartition("foo", 0), txnRecords); - recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords); - - final ProduceRequest.Builder builder = ProduceRequest.Builder.forMagic(RecordVersion.current().value, (short) -1, 5000, - recordsByPartition, null); + ProduceRequest.Builder builder = ProduceRequest.forMagic(RecordVersion.current().value, + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList( + new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(txnRecords))), + new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(nonTxnRecords)))) + .iterator())) + .setAcks((short) -1) + .setTimeoutMs(5000)); final ProduceRequest request = builder.build(); - assertFalse(request.hasTransactionalRecords()); - assertTrue(request.hasIdempotentRecords()); + assertFalse(RequestUtils.hasTransactionalRecords(request)); + assertTrue(RequestUtils.hasIdempotentRecords(request)); } private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) { @@ -244,8 +308,15 @@ private void assertThrowsInvalidRecordException(ProduceRequest.Builder builder, } private ProduceRequest createNonIdempotentNonTransactionalRecords() { - final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, simpleRecord); - return ProduceRequest.Builder.forCurrentMagic((short) -1, 10, - Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build(); + return ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("topic") + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(1) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, simpleRecord))))) + .iterator())) + .setAcks((short) -1) + .setTimeoutMs(10)).build(); } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java index ea6e99882d200..37cac84db0c15 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -31,10 +32,12 @@ import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class ProduceResponseTest { + @SuppressWarnings("deprecation") @Test public void produceResponseV5Test() { Map responseData = new HashMap<>(); @@ -64,6 +67,7 @@ public void produceResponseV5Test() { assertEquals(responseData, v5Response.responses()); } + @SuppressWarnings("deprecation") @Test public void produceResponseVersionTest() { Map responseData = new HashMap<>(); @@ -86,6 +90,7 @@ public void produceResponseVersionTest() { assertEquals("Response data does not match", responseData, v2Response.responses()); } + @SuppressWarnings("deprecation") @Test public void produceResponseRecordErrorsTest() { Map responseData = new HashMap<>(); @@ -100,7 +105,7 @@ public void produceResponseRecordErrorsTest() { ProduceResponse response = new ProduceResponse(responseData); Struct struct = response.toStruct(ver); assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema()); - ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp); + ProduceResponse.PartitionResponse deserialized = new ProduceResponse(new ProduceResponseData(struct, ver)).responses().get(tp); if (ver >= 8) { assertEquals(1, deserialized.recordErrors.size()); assertEquals(3, deserialized.recordErrors.get(0).batchIndex); @@ -108,7 +113,7 @@ public void produceResponseRecordErrorsTest() { assertEquals("Produce failed", deserialized.errorMessage); } else { assertEquals(0, deserialized.recordErrors.size()); - assertEquals(null, deserialized.errorMessage); + assertNull(deserialized.errorMessage); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 71048d8752dde..7e51fa2716990 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -131,6 +131,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection; +import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.RenewDelegationTokenResponseData; import org.apache.kafka.common.message.SaslAuthenticateRequestData; @@ -625,10 +626,35 @@ public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() { builder.build((short) 0); } + @Test + public void testPartitionSize() { + TopicPartition tp0 = new TopicPartition("test", 0); + TopicPartition tp1 = new TopicPartition("test", 1); + MemoryRecords records0 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, + CompressionType.NONE, new SimpleRecord("woot".getBytes())); + MemoryRecords records1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, + CompressionType.NONE, new SimpleRecord("woot".getBytes()), new SimpleRecord("woot".getBytes())); + ProduceRequest request = ProduceRequest.forMagic(RecordBatch.MAGIC_VALUE_V2, + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList( + new ProduceRequestData.TopicProduceData().setName(tp0.topic()).setPartitionData( + Collections.singletonList(new ProduceRequestData.PartitionProduceData().setIndex(tp0.partition()).setRecords(records0))), + new ProduceRequestData.TopicProduceData().setName(tp1.topic()).setPartitionData( + Collections.singletonList(new ProduceRequestData.PartitionProduceData().setIndex(tp1.partition()).setRecords(records1)))) + .iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000) + .setTransactionalId("transactionalId")) + .build((short) 3); + assertEquals(2, request.partitionSizes().size()); + assertEquals(records0.sizeInBytes(), (int) request.partitionSizes().get(tp0)); + assertEquals(records1.sizeInBytes(), (int) request.partitionSizes().get(tp1)); + } + @Test public void produceRequestToStringTest() { ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion()); - assertEquals(1, request.partitionRecordsOrFail().size()); + assertEquals(1, request.dataOrException().topicData().size()); assertFalse(request.toString(false).contains("partitionSizes")); assertTrue(request.toString(false).contains("numPartitions=1")); assertTrue(request.toString(true).contains("partitionSizes")); @@ -636,8 +662,8 @@ public void produceRequestToStringTest() { request.clearPartitionRecords(); try { - request.partitionRecordsOrFail(); - fail("partitionRecordsOrFail should fail after clearPartitionRecords()"); + request.dataOrException(); + fail("dataOrException should fail after clearPartitionRecords()"); } catch (IllegalStateException e) { // OK } @@ -649,10 +675,11 @@ public void produceRequestToStringTest() { assertFalse(request.toString(true).contains("numPartitions")); } + @SuppressWarnings("deprecation") @Test public void produceRequestGetErrorResponseTest() { ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion()); - Set partitions = new HashSet<>(request.partitionRecordsOrFail().keySet()); + Set partitions = new HashSet<>(request.partitionSizes().keySet()); ProduceResponse errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException()); assertEquals(partitions, errorResponse.responses().keySet()); @@ -1389,17 +1416,27 @@ private OffsetFetchResponse createOffsetFetchResponse() { return new OffsetFetchResponse(Errors.NONE, responseData); } + @SuppressWarnings("deprecation") private ProduceRequest createProduceRequest(int version) { if (version < 2) throw new IllegalArgumentException("Produce request version 2 is not supported"); - byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : RecordBatch.MAGIC_VALUE_V2; MemoryRecords records = MemoryRecords.withRecords(magic, CompressionType.NONE, new SimpleRecord("woot".getBytes())); - Map produceData = Collections.singletonMap(new TopicPartition("test", 0), records); - return ProduceRequest.Builder.forMagic(magic, (short) 1, 5000, produceData, "transactionalId") + return ProduceRequest.forMagic(magic, + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("test") + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(0) + .setRecords(records)))).iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000) + .setTransactionalId(version >= 3 ? "transactionalId" : null)) .build((short) version); } + @SuppressWarnings("deprecation") private ProduceResponse createProduceResponse() { Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, @@ -1407,6 +1444,7 @@ private ProduceResponse createProduceResponse() { return new ProduceResponse(responseData, 0); } + @SuppressWarnings("deprecation") private ProduceResponse createProduceResponseWithErrorMessage() { Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index dca78b4de5de9..4392406de5c55 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -90,6 +90,8 @@ import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.util.{Failure, Success, Try} import kafka.coordinator.group.GroupOverview +import scala.annotation.nowarn + /** * Logic to handle the various Kafka requests */ @@ -566,7 +568,11 @@ class KafkaApis(val requestChannel: RequestChannel, val produceRequest = request.body[ProduceRequest] val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes - if (produceRequest.hasTransactionalRecords) { + val (hasIdempotentRecords, hasTransactionalRecords) = { + val flags = RequestUtils.flags(produceRequest) + (flags.getKey, flags.getValue) + } + if (hasTransactionalRecords) { val isAuthorizedTransactional = produceRequest.transactionalId != null && authorize(request.context, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId) if (!isAuthorizedTransactional) { @@ -575,19 +581,25 @@ class KafkaApis(val requestChannel: RequestChannel, } // Note that authorization to a transactionalId implies ProducerId authorization - } else if (produceRequest.hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { + } else if (hasIdempotentRecords && !authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) { sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception) return } - val produceRecords = produceRequest.partitionRecordsOrFail.asScala val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() - val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, produceRecords)(_._1.topic) - - for ((topicPartition, memoryRecords) <- produceRecords) { + // cache the result to avoid redundant authorization calls + val authorizedTopics = filterByAuthorized(request.context, WRITE, TOPIC, + produceRequest.dataOrException().topicData().asScala)(_.name()) + + produceRequest.dataOrException.topicData.forEach(topic => topic.partitionData.forEach { partition => + val topicPartition = new TopicPartition(topic.name, partition.index) + // This caller assumes the type is MemoryRecords and that is true on current serialization + // We cast the type to avoid causing big change to code base. + // https://issues.apache.org/jira/browse/KAFKA-10698 + val memoryRecords = partition.records.asInstanceOf[MemoryRecords] if (!authorizedTopics.contains(topicPartition.topic)) unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition)) @@ -600,9 +612,13 @@ class KafkaApis(val requestChannel: RequestChannel, case e: ApiException => invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e)) } - } + }) // the callback for sending a produce response + // The construction of ProduceResponse is able to accept auto-generated protocol data so + // KafkaApis#handleProduceRequest should apply auto-generated protocol to avoid extra conversion. + // https://issues.apache.org/jira/browse/KAFKA-10730 + @nowarn("cat=deprecation") def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses var errorInResponse = false diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c50c5cb776c3a..a6f67ca060530 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME -import org.apache.kafka.common.message.AddOffsetsToTxnRequestData +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData} import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection} @@ -45,7 +45,6 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic} import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} -import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord} @@ -152,6 +151,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { classOf[PrincipalBuilder].getName) } + @nowarn("cat=deprecation") val requestKeyToError = Map[ApiKeys, Nothing => Errors]( ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error), @@ -290,11 +290,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest { new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build() } - private def createProduceRequest = { - requests.ProduceRequest.Builder.forCurrentMagic(1, 5000, - collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava). - build() - } + private def createProduceRequest = + requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + Collections.singletonList(new ProduceRequestData.TopicProduceData() + .setName(tp.topic()).setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(tp.partition()) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes)))))) + .iterator)) + .setAcks(1.toShort) + .setTimeoutMs(5000)) + .build() private def createFetchRequest = { val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala index 78eeeba47f671..c67852cef7e4d 100644 --- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala @@ -20,22 +20,24 @@ package kafka.network import java.io.IOException import java.net.{InetAddress, Socket} -import java.util.Properties import java.util.concurrent._ +import java.util.{Collections, Properties} import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} +import org.apache.kafka.common.message.ProduceRequestData import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.{KafkaException, requests} import org.junit.Assert._ import org.junit.{After, Before, Test} import org.scalatest.Assertions.intercept +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ class DynamicConnectionQuotaTest extends BaseRequestTest { @@ -265,12 +267,20 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { } } - private def produceRequest: ProduceRequest = { - val topicPartition = new TopicPartition(topic, 0) - val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) - val partitionRecords = Map(topicPartition -> memoryRecords) - ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build() - } + private def produceRequest: ProduceRequest = + requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + Collections.singletonList(new ProduceRequestData.TopicProduceData() + .setName(topic) + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(0) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)))))) + .iterator)) + .setAcks((-1).toShort) + .setTimeoutMs(3000) + .setTransactionalId(null)) + .build() def connectionCount: Int = servers.head.socketServer.connectionCount(localAddress) @@ -288,6 +298,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { } } + @nowarn("cat=deprecation") private def verifyConnection(socket: Socket): Unit = { val produceResponse = sendAndReceive[ProduceResponse](produceRequest, socket) assertEquals(1, produceResponse.responses.size) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 34f5c916110e8..730446726912a 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -24,7 +24,7 @@ import java.nio.channels.{SelectionKey, SocketChannel} import java.nio.charset.StandardCharsets import java.util import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, Executors, TimeUnit} -import java.util.{HashMap, Properties, Random} +import java.util.{Properties, Random} import com.yammer.metrics.core.{Gauge, Meter} import javax.net.ssl._ @@ -33,29 +33,26 @@ import kafka.security.CredentialProvider import kafka.server.{KafkaConfig, ThrottledChannel} import kafka.utils.Implicits._ import kafka.utils.TestUtils -import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.memory.MemoryPool -import org.apache.kafka.common.message.{SaslAuthenticateRequestData, SaslHandshakeRequestData, VoteRequestData} +import org.apache.kafka.common.message.{ProduceRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData, VoteRequestData} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.KafkaChannel.ChannelMuteState -import org.apache.kafka.common.network._ +import org.apache.kafka.common.network.{ClientInformation, _} import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, ProduceRequest, RequestHeader, SaslAuthenticateRequest, SaslHandshakeRequest, VoteRequest} +import org.apache.kafka.common.requests +import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.scram.internals.ScramMechanism -import org.apache.kafka.common.utils.AppInfoParser -import org.apache.kafka.common.utils.{LogContext, MockTime, Time} +import org.apache.kafka.common.utils.{AppInfoParser, LogContext, MockTime, Time} import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils} import org.apache.log4j.Level import org.junit.Assert._ import org.junit._ import org.scalatest.Assertions.fail -import scala.jdk.CollectionConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ import scala.util.control.ControlThrowable class SocketServerTest { @@ -210,8 +207,12 @@ class SocketServerTest { val clientId = "" val ackTimeoutMs = 10000 - val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs, - new HashMap[TopicPartition, MemoryRecords]()).build() + val emptyRequest = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks(ack) + .setTimeoutMs(ackTimeoutMs) + .setTransactionalId(null)) + .build() val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId) val byteBuffer = emptyRequest.serialize(emptyHeader) byteBuffer.rewind() @@ -897,8 +898,12 @@ class SocketServerTest { val clientId = "" val ackTimeoutMs = 10000 val ack = 0: Short - val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs, - new HashMap[TopicPartition, MemoryRecords]()).build() + val emptyRequest = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks(ack) + .setTimeoutMs(ackTimeoutMs) + .setTransactionalId(null)) + .build() val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId) val byteBuffer = emptyRequest.serialize(emptyHeader) @@ -980,8 +985,12 @@ class SocketServerTest { // ...and now send something to trigger the disconnection val ackTimeoutMs = 10000 val ack = 0: Short - val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs, - new HashMap[TopicPartition, MemoryRecords]()).build() + val emptyRequest = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) + .setAcks(ack) + .setTimeoutMs(ackTimeoutMs) + .setTransactionalId(null)) + .build() val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId) sendApiRequest(socket, emptyRequest, emptyHeader) // wait a little bit for the server-side disconnection to occur since it happens asynchronously diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala index 73527a67b210e..a877037be96bd 100755 --- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala @@ -20,21 +20,23 @@ package kafka.server import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.nio.ByteBuffer +import java.util.Collections import kafka.integration.KafkaServerTestHarness import kafka.network.SocketServer import kafka.utils._ -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.ProduceRequestData import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.types.Type import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} -import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse, ResponseHeader} +import org.apache.kafka.common.requests.{ProduceResponse, ResponseHeader} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.{TopicPartition, requests} import org.junit.Assert._ import org.junit.Test -import scala.jdk.CollectionConverters._ +import scala.annotation.nowarn class EdgeCaseRequestTest extends KafkaServerTestHarness { @@ -108,6 +110,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { } } + @nowarn("cat=deprecation") @Test def testProduceRequestWithNullClientId(): Unit = { val topic = "topic" @@ -119,8 +122,18 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { val (serializedBytes, responseHeaderVersion) = { val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null, correlationId) - val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes)) - val request = ProduceRequest.Builder.forCurrentMagic(1, 10000, Map(topicPartition -> records).asJava).build() + val request = requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + Collections.singletonList(new ProduceRequestData.TopicProduceData() + .setName(topicPartition.topic()).setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(topicPartition.partition()) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes)))))) + .iterator)) + .setAcks(1.toShort) + .setTimeoutMs(10000) + .setTransactionalId(null)) + .build() val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf) byteBuffer.put(headerBytes) request.toStruct.writeTo(byteBuffer) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 1a534120bd5ea..3a2f99d63a6f7 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -71,6 +71,7 @@ import org.easymock.{Capture, EasyMock, IAnswer, IArgumentMatcher} import org.junit.Assert.{assertArrayEquals, assertEquals, assertNull, assertTrue} import org.junit.{After, Test} +import scala.annotation.nowarn import scala.collection.{Map, Seq, mutable} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ @@ -1232,6 +1233,7 @@ class KafkaApisTest { } } + @nowarn("cat=deprecation") @Test def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): Unit = { val topic = "topic" @@ -1245,10 +1247,17 @@ class KafkaApisTest { val tp = new TopicPartition("topic", 0) - val produceRequest = ProduceRequest.Builder.forCurrentMagic(1, 5000, - collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, - new SimpleRecord("test".getBytes))).asJava) - .build(version.toShort) + val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + Collections.singletonList(new ProduceRequestData.TopicProduceData() + .setName(tp.topic()).setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(tp.partition()) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes)))))) + .iterator)) + .setAcks(1.toShort) + .setTimeoutMs(5000)) + .build(version.toShort) val request = buildRequest(produceRequest) EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 4368442a4ca74..ca43d3b8dba05 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -18,13 +18,14 @@ package kafka.server import java.nio.ByteBuffer -import java.util.Properties +import java.util.{Collections, Properties} import kafka.log.LogConfig import kafka.message.ZStdCompressionCodec import kafka.metrics.KafkaYammerMetrics import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.ProduceRequestData import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} @@ -32,6 +33,7 @@ import org.junit.Assert._ import org.junit.Test import org.scalatest.Assertions.fail +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ /** @@ -42,15 +44,24 @@ class ProduceRequestTest extends BaseRequestTest { val metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala + @nowarn("cat=deprecation") @Test def testSimpleProduceRequest(): Unit = { val (partition, leader) = createTopicAndFindPartitionWithLeader("topic") def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse = { val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = Map(topicPartition -> memoryRecords) val produceResponse = sendProduceRequest(leader, - ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()) + ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName(topicPartition.topic()) + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(topicPartition.partition()) + .setRecords(memoryRecords)))).iterator)) + .setAcks((-1).toShort) + .setTimeoutMs(3000) + .setTransactionalId(null)).build()) assertEquals(1, produceResponse.responses.size) val (tp, partitionResponse) = produceResponse.responses.asScala.head assertEquals(topicPartition, tp) @@ -69,6 +80,7 @@ class ProduceRequestTest extends BaseRequestTest { new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) } + @nowarn("cat=deprecation") @Test def testProduceWithInvalidTimestamp(): Unit = { val topic = "topic" @@ -89,8 +101,16 @@ class ProduceRequestTest extends BaseRequestTest { val records = createRecords(RecordBatch.MAGIC_VALUE_V2, System.currentTimeMillis() - 1001L, CompressionType.GZIP) val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = Map(topicPartition -> records) - val produceResponse = sendProduceRequest(leader, ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()) + val produceResponse = sendProduceRequest(leader, ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName(topicPartition.topic()) + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(topicPartition.partition()) + .setRecords(records)))).iterator)) + .setAcks((-1).toShort) + .setTimeoutMs(3000) + .setTransactionalId(null)).build()) val (tp, partitionResponse) = produceResponse.responses.asScala.head assertEquals(topicPartition, tp) assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error) @@ -105,6 +125,7 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals("One or more records have been rejected due to invalid timestamp", partitionResponse.errorMessage) } + @nowarn("cat=deprecation") @Test def testProduceToNonReplica(): Unit = { val topic = "topic" @@ -120,8 +141,16 @@ class ProduceRequestTest extends BaseRequestTest { // Send the produce request to the non-replica val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("key".getBytes, "value".getBytes)) val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = Map(topicPartition -> records) - val produceRequest = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build() + val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName(topicPartition.topic()) + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(topicPartition.partition()) + .setRecords(records)))).iterator)) + .setAcks((-1).toShort) + .setTimeoutMs(3000) + .setTransactionalId(null)).build() val produceResponse = sendProduceRequest(nonReplicaId, produceRequest) assertEquals(1, produceResponse.responses.size) @@ -136,6 +165,7 @@ class ProduceRequestTest extends BaseRequestTest { }.getOrElse(fail(s"No leader elected for topic $topic")) } + @nowarn("cat=deprecation") @Test def testCorruptLz4ProduceRequest(): Unit = { val (partition, leader) = createTopicAndFindPartitionWithLeader("topic") @@ -146,9 +176,16 @@ class ProduceRequestTest extends BaseRequestTest { val lz4ChecksumOffset = 6 memoryRecords.buffer.array.update(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + lz4ChecksumOffset, 0) val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = Map(topicPartition -> memoryRecords) - val produceResponse = sendProduceRequest(leader, - ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()) + val produceResponse = sendProduceRequest(leader, ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName(topicPartition.topic()) + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(topicPartition.partition()) + .setRecords(memoryRecords)))).iterator)) + .setAcks((-1).toShort) + .setTimeoutMs(3000) + .setTransactionalId(null)).build()) assertEquals(1, produceResponse.responses.size) val (tp, partitionResponse) = produceResponse.responses.asScala.head assertEquals(topicPartition, tp) @@ -159,6 +196,7 @@ class ProduceRequestTest extends BaseRequestTest { assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0) } + @nowarn("cat=deprecation") @Test def testZSTDProduceRequest(): Unit = { val topic = "topic" @@ -172,11 +210,21 @@ class ProduceRequestTest extends BaseRequestTest { val memoryRecords = MemoryRecords.withRecords(CompressionType.ZSTD, new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = Map(topicPartition -> memoryRecords) + val partitionRecords = new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("topic").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(partition) + .setRecords(memoryRecords)))) + .iterator)) + .setAcks((-1).toShort) + .setTimeoutMs(3000) + .setTransactionalId(null) // produce request with v7: works fine! val res1 = sendProduceRequest(leader, - new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build()) + new ProduceRequest.Builder(7, 7, partitionRecords).build()) val (tp1, partitionResponse1) = res1.responses.asScala.head assertEquals(topicPartition, tp1) assertEquals(Errors.NONE, partitionResponse1.error) @@ -185,7 +233,7 @@ class ProduceRequestTest extends BaseRequestTest { // produce request with v3: returns Errors.UNSUPPORTED_COMPRESSION_TYPE. val res2 = sendProduceRequest(leader, - new ProduceRequest.Builder(3, 3, -1, 3000, partitionRecords.asJava, null) + new ProduceRequest.Builder(3, 3, partitionRecords) .buildUnsafe(3)) val (tp2, partitionResponse2) = res2.responses.asScala.head assertEquals(topicPartition, tp2) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index ed886363b174a..7ea0dba9c51e5 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -26,7 +26,6 @@ import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.message.AddOffsetsToTxnRequestData import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection @@ -35,7 +34,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetRequestData.{ListOffsetPartition, ListOffsetTopic} import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} -import org.apache.kafka.common.message._ +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, _} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys @@ -43,15 +42,15 @@ import org.apache.kafka.common.quota.ClientQuotaFilter import org.apache.kafka.common.record._ import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResourceType} -import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, KafkaPrincipalSerde, SecurityProtocol} +import org.apache.kafka.common.security.auth._ import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils} -import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition} +import org.apache.kafka.common._ import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} import org.junit.Assert._ import org.junit.{After, Before, Test} -import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer +import scala.jdk.CollectionConverters._ class RequestQuotaTest extends BaseRequestTest { @@ -211,8 +210,16 @@ class RequestQuotaTest extends BaseRequestTest { private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = { apiKey match { case ApiKeys.PRODUCE => - ProduceRequest.Builder.forCurrentMagic(1, 5000, - collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava) + requests.ProduceRequest.forCurrentMagic(new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + Collections.singletonList(new ProduceRequestData.TopicProduceData() + .setName(tp.topic()).setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(tp.partition()) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes)))))) + .iterator)) + .setAcks(1.toShort) + .setTimeoutMs(5000)) case ApiKeys.FETCH => val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerRequestBenchmark.java new file mode 100644 index 0000000000000..58b35f354f92e --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerRequestBenchmark.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.producer; + +import org.apache.kafka.common.message.ProduceRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.requests.ProduceRequest; +import org.apache.kafka.common.requests.ProduceResponse; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ProducerRequestBenchmark { + private static final int NUMBER_OF_PARTITIONS = 3; + private static final int NUMBER_OF_RECORDS = 3; + private static final List TOPIC_PRODUCE_DATA = Collections.singletonList(new ProduceRequestData.TopicProduceData() + .setName("tp") + .setPartitionData(IntStream.range(0, NUMBER_OF_PARTITIONS).mapToObj(partitionIndex -> new ProduceRequestData.PartitionProduceData() + .setIndex(partitionIndex) + .setRecords(MemoryRecords.withRecords(CompressionType.NONE, IntStream.range(0, NUMBER_OF_RECORDS) + .mapToObj(recordIndex -> new SimpleRecord(100, "hello0".getBytes(StandardCharsets.UTF_8))) + .collect(Collectors.toList()) + .toArray(new SimpleRecord[0])))) + .collect(Collectors.toList())) + ); + private static final ProduceRequestData PRODUCE_REQUEST_DATA = new ProduceRequestData() + .setTimeoutMs(100) + .setAcks((short) 1) + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(TOPIC_PRODUCE_DATA.iterator())); + + private static ProduceRequest request() { + return ProduceRequest.forMagic(RecordBatch.CURRENT_MAGIC_VALUE, PRODUCE_REQUEST_DATA).build(); + } + + private static final ProduceRequest REQUEST = request(); + + @Benchmark + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public ProduceRequest constructorProduceRequest() { + return request(); + } + + @Benchmark + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public ProduceResponse constructorErrorResponse() { + return REQUEST.getErrorResponse(0, Errors.INVALID_REQUEST.exception()); + } + + @Benchmark + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public Struct constructorStruct() { + return REQUEST.toStruct(); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java new file mode 100644 index 0000000000000..307e194431818 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/producer/ProducerResponseBenchmark.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.producer; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ProduceResponse; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ProducerResponseBenchmark { + private static final int NUMBER_OF_PARTITIONS = 3; + private static final int NUMBER_OF_RECORDS = 3; + private static final Map PARTITION_RESPONSE_MAP = IntStream.range(0, NUMBER_OF_PARTITIONS) + .mapToObj(partitionIndex -> new AbstractMap.SimpleEntry<>( + new TopicPartition("tp", partitionIndex), + new ProduceResponse.PartitionResponse( + Errors.NONE, + 0, + 0, + 0, + IntStream.range(0, NUMBER_OF_RECORDS) + .mapToObj(ProduceResponse.RecordError::new) + .collect(Collectors.toList())) + )) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); + + /** + * this method is still used by production so we benchmark it. + * see https://issues.apache.org/jira/browse/KAFKA-10730 + */ + @SuppressWarnings("deprecation") + private static ProduceResponse response() { + return new ProduceResponse(PARTITION_RESPONSE_MAP); + } + + private static final ProduceResponse RESPONSE = response(); + + @Benchmark + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public AbstractResponse constructorProduceResponse() { + return response(); + } + + @Benchmark + @OutputTimeUnit(TimeUnit.NANOSECONDS) + public Struct constructorStruct() { + return RESPONSE.toStruct(PRODUCE.latestVersion()); + } +}