diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 8bfc6298604be..7bbab0879bf42 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -222,7 +222,8 @@ public ProduceResponse(Struct struct) { int partition = partRespStruct.get(PARTITION_ID); Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE)); long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); - long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); + long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ? + partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP; long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); List recordErrors = Collections.emptyList(); @@ -274,19 +275,20 @@ protected Struct toStruct(short version) { partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); - List recordErrors = Collections.emptyList(); - if (!part.recordErrors.isEmpty()) { - recordErrors = new ArrayList<>(); - for (RecordError indexAndMessage : part.recordErrors) { - Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME) - .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex) - .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message); - recordErrors.add(indexAndMessageStruct); + if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) { + List recordErrors = Collections.emptyList(); + if (!part.recordErrors.isEmpty()) { + recordErrors = new ArrayList<>(); + for (RecordError indexAndMessage : part.recordErrors) { + Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME) + .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex) + .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message); + recordErrors.add(indexAndMessageStruct); + } } + partStruct.set(RECORD_ERRORS_KEY_NAME, recordErrors.toArray()); } - partStruct.setIfExists(RECORD_ERRORS_KEY_NAME, recordErrors.toArray()); - partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage); partitionArray.add(partStruct); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java new file mode 100644 index 0000000000000..ea6e99882d200 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ProduceResponseTest { + + @Test + public void produceResponseV5Test() { + Map responseData = new HashMap<>(); + TopicPartition tp0 = new TopicPartition("test", 0); + responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, + 10000, RecordBatch.NO_TIMESTAMP, 100)); + + ProduceResponse v5Response = new ProduceResponse(responseData, 10); + short version = 5; + + ByteBuffer buffer = v5Response.serialize(ApiKeys.PRODUCE, version, 0); + buffer.rewind(); + + ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away. + + Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer); + + ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, + deserializedStruct, version); + + assertEquals(1, v5FromBytes.responses().size()); + assertTrue(v5FromBytes.responses().containsKey(tp0)); + ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0); + assertEquals(100, partitionResponse.logStartOffset); + assertEquals(10000, partitionResponse.baseOffset); + assertEquals(10, v5FromBytes.throttleTimeMs()); + assertEquals(responseData, v5Response.responses()); + } + + @Test + public void produceResponseVersionTest() { + Map responseData = new HashMap<>(); + responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, + 10000, RecordBatch.NO_TIMESTAMP, 100)); + ProduceResponse v0Response = new ProduceResponse(responseData); + ProduceResponse v1Response = new ProduceResponse(responseData, 10); + ProduceResponse v2Response = new ProduceResponse(responseData, 10); + assertEquals("Throttle time must be zero", 0, v0Response.throttleTimeMs()); + assertEquals("Throttle time must be 10", 10, v1Response.throttleTimeMs()); + assertEquals("Throttle time must be 10", 10, v2Response.throttleTimeMs()); + assertEquals("Should use schema version 0", ApiKeys.PRODUCE.responseSchema((short) 0), + v0Response.toStruct((short) 0).schema()); + assertEquals("Should use schema version 1", ApiKeys.PRODUCE.responseSchema((short) 1), + v1Response.toStruct((short) 1).schema()); + assertEquals("Should use schema version 2", ApiKeys.PRODUCE.responseSchema((short) 2), + v2Response.toStruct((short) 2).schema()); + assertEquals("Response data does not match", responseData, v0Response.responses()); + assertEquals("Response data does not match", responseData, v1Response.responses()); + assertEquals("Response data does not match", responseData, v2Response.responses()); + } + + @Test + public void produceResponseRecordErrorsTest() { + Map responseData = new HashMap<>(); + TopicPartition tp = new TopicPartition("test", 0); + ProduceResponse.PartitionResponse partResponse = new ProduceResponse.PartitionResponse(Errors.NONE, + 10000, RecordBatch.NO_TIMESTAMP, 100, + Collections.singletonList(new ProduceResponse.RecordError(3, "Record error")), + "Produce failed"); + responseData.put(tp, partResponse); + + for (short ver = 0; ver <= PRODUCE.latestVersion(); ver++) { + ProduceResponse response = new ProduceResponse(responseData); + Struct struct = response.toStruct(ver); + assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema()); + ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp); + if (ver >= 8) { + assertEquals(1, deserialized.recordErrors.size()); + assertEquals(3, deserialized.recordErrors.get(0).batchIndex); + assertEquals("Record error", deserialized.recordErrors.get(0).message); + assertEquals("Produce failed", deserialized.errorMessage); + } else { + assertEquals(0, deserialized.recordErrors.size()); + assertEquals(null, deserialized.errorMessage); + } + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 7943101760fd0..6ce9405100fc0 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -602,57 +602,6 @@ public void produceRequestGetErrorResponseTest() { assertEquals(RecordBatch.NO_TIMESTAMP, partitionResponse.logAppendTime); } - @Test - public void produceResponseV5Test() { - Map responseData = new HashMap<>(); - TopicPartition tp0 = new TopicPartition("test", 0); - responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100)); - - ProduceResponse v5Response = new ProduceResponse(responseData, 10); - short version = 5; - - ByteBuffer buffer = v5Response.serialize(ApiKeys.PRODUCE, version, 0); - buffer.rewind(); - - ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away. - - Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer); - - ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, - deserializedStruct, version); - - assertEquals(1, v5FromBytes.responses().size()); - assertTrue(v5FromBytes.responses().containsKey(tp0)); - ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0); - assertEquals(100, partitionResponse.logStartOffset); - assertEquals(10000, partitionResponse.baseOffset); - assertEquals(10, v5FromBytes.throttleTimeMs()); - assertEquals(responseData, v5Response.responses()); - } - - @Test - public void produceResponseVersionTest() { - Map responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100)); - ProduceResponse v0Response = new ProduceResponse(responseData); - ProduceResponse v1Response = new ProduceResponse(responseData, 10); - ProduceResponse v2Response = new ProduceResponse(responseData, 10); - assertEquals("Throttle time must be zero", 0, v0Response.throttleTimeMs()); - assertEquals("Throttle time must be 10", 10, v1Response.throttleTimeMs()); - assertEquals("Throttle time must be 10", 10, v2Response.throttleTimeMs()); - assertEquals("Should use schema version 0", ApiKeys.PRODUCE.responseSchema((short) 0), - v0Response.toStruct((short) 0).schema()); - assertEquals("Should use schema version 1", ApiKeys.PRODUCE.responseSchema((short) 1), - v1Response.toStruct((short) 1).schema()); - assertEquals("Should use schema version 2", ApiKeys.PRODUCE.responseSchema((short) 2), - v2Response.toStruct((short) 2).schema()); - assertEquals("Response data does not match", responseData, v0Response.responses()); - assertEquals("Response data does not match", responseData, v1Response.responses()); - assertEquals("Response data does not match", responseData, v2Response.responses()); - } - @Test public void fetchResponseVersionTest() { LinkedHashMap> responseData = new LinkedHashMap<>();