From f0a78899b22268007608a857fa16b93c68d3be1a Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Sat, 28 Oct 2017 16:46:44 +0100
Subject: [PATCH 1/2] KAFKA-4203: Align broker default for max.message.bytes with Java producer default

---
 .../java/org/apache/kafka/clients/producer/ProducerConfig.java | 2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala             | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 75af5d64ecce4..cf400cb658c5b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -280,7 +280,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(MAX_REQUEST_SIZE_CONFIG,
                                         Type.INT,
-                                        1 * 1024 * 1024,
+                                        1024 * 1024,
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7bf9eae2d370a..96a2be67451e0 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -55,7 +55,7 @@ object Defaults {
   val BrokerIdGenerationEnable = true
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
-  val MessageMaxBytes = 1000000 + Records.LOG_OVERHEAD
+  val MessageMaxBytes = 1024 * 1024 + Records.LOG_OVERHEAD
   val NumNetworkThreads = 3
   val NumIoThreads = 8
   val BackgroundThreads = 10

From 390fdaf0d4629d013af9154536232af30d090e36 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 29 Jan 2020 02:06:31 -0800
Subject: [PATCH 2/2] Add test, improve error message and minor tweaks

---
 .../kafka/clients/producer/KafkaProducer.java | 17 ++++++-----
 .../kafka/api/PlaintextProducerSendTest.scala | 28 +++++++++++++++++--
 2 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index fc80d29efdab3..742ac61c51a90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -321,7 +321,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali
                   Serializer valueSerializer,
                   ProducerMetadata metadata,
                   KafkaClient kafkaClient,
-                  ProducerInterceptors interceptors,
+                  ProducerInterceptors interceptors,
                   Time time) {
         ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
                 valueSerializer));
@@ -486,7 +486,7 @@ private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
         int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
 
-        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
+        if (deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
            if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
                 // throw an exception if the user explicitly set an inconsistent value
                 throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
@@ -545,7 +545,7 @@ private static int configureInflightRequests(ProducerConfig config) {
 
     private static short configureAcks(ProducerConfig config, Logger log) {
         boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
-        short acks = Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG));
+        short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
 
         if (config.idempotenceEnabled()) {
             if (!userConfiguredAcks)
@@ -1051,12 +1051,11 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
      * Validate that the record size isn't too large
      */
     private void ensureValidRecordSize(int size) {
-        if (size > this.maxRequestSize)
+        if (size > maxRequestSize)
             throw new RecordTooLargeException("The message is " + size +
-                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
-                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
-                    " configuration.");
-        if (size > this.totalMemorySize)
+                    " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
+                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
+        if (size > totalMemorySize)
             throw new RecordTooLargeException("The message is " + size +
                 " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                 ProducerConfig.BUFFER_MEMORY_CONFIG +
@@ -1348,7 +1347,7 @@ private InterceptorCallback(Callback userCallback, ProducerInterceptors in
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
+            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
             this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 4b82783db4f29..7f707a1dd0df2 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -21,10 +21,13 @@ import java.util.Properties
 import java.util.concurrent.{ExecutionException, Future, TimeUnit}
 
 import kafka.log.LogConfig
+import kafka.server.{Defaults, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException, TimeoutException}
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
+import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.utils.ByteUtils
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.Assertions.intercept
@@ -168,4 +171,25 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     verifySendFailure(send(producer2)) // should fail send since buffer is full
     verifySendSuccess(future2) // previous batch should be completed and sent now
   }
+
+  @Test
+  def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = {
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
+
+    val keyLengthSize = 1
+    val headerLengthSize = 1
+    val valueLengthSize = 3
+    val overhead = Records.LOG_OVERHEAD + DefaultRecordBatch.RECORD_BATCH_OVERHEAD + DefaultRecord.MAX_RECORD_OVERHEAD +
+      keyLengthSize + headerLengthSize + valueLengthSize
+    val valueSize = Defaults.MessageMaxBytes - overhead
+
+    val record0 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize))
+    assertEquals(record0.value.length, producer.send(record0).get.serializedValueSize)
+
+    val record1 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize + 1))
+    assertEquals(classOf[RecordTooLargeException], intercept[ExecutionException](producer.send(record1).get).getCause.getClass)
+  }
+
 }
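
For context, a minimal standalone sketch (not part of the patches above) of the arithmetic behind the alignment. The only value assumed here is Records.LOG_OVERHEAD = 12 bytes (an 8-byte offset plus a 4-byte size prefix per log entry); that constant's value is not shown in the diff, while the other numbers come from the defaults changed in patch 1.

// Standalone sketch of the default alignment; the value of Records.LOG_OVERHEAD (12)
// is an assumption, not taken from the diff above.
object DefaultAlignmentSketch {
  val ProducerMaxRequestSize: Int = 1024 * 1024                         // new ProducerConfig default for max.request.size
  val AssumedLogOverhead: Int = 8 + 4                                   // assumed Records.LOG_OVERHEAD (offset + size fields)
  val OldBrokerMessageMaxBytes: Int = 1000000 + AssumedLogOverhead      // old broker default for message.max.bytes
  val NewBrokerMessageMaxBytes: Int = 1024 * 1024 + AssumedLogOverhead  // new broker default for message.max.bytes

  def main(args: Array[String]): Unit = {
    // With the old defaults, a producer could build a batch of up to 1048576 bytes
    // while the broker's default limit was only 1000012, so sends with out-of-the-box
    // settings could fail with RecordTooLargeException on the broker side.
    println(s"old broker message.max.bytes default: $OldBrokerMessageMaxBytes")  // 1000012
    println(s"producer max.request.size default:    $ProducerMaxRequestSize")    // 1048576
    println(s"new broker message.max.bytes default: $NewBrokerMessageMaxBytes")  // 1048588
    assert(ProducerMaxRequestSize <= NewBrokerMessageMaxBytes)                   // defaults are now compatible
  }
}

The new test in patch 2 exercises exactly this boundary: a value sized to Defaults.MessageMaxBytes minus the batch and record overheads is accepted, and one byte more fails with RecordTooLargeException.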