Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors interceptors,
ProducerInterceptors<K, V> interceptors,
Time time) {
ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
valueSerializer));
Expand Down Expand Up @@ -486,7 +486,7 @@ private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);

if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
if (deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
// throw an exception if the user explicitly set an inconsistent value
throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
Expand Down Expand Up @@ -545,7 +545,7 @@ private static int configureInflightRequests(ProducerConfig config) {

private static short configureAcks(ProducerConfig config, Logger log) {
boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
short acks = Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG));
short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));

if (config.idempotenceEnabled()) {
if (!userConfiguredAcks)
Expand Down Expand Up @@ -1051,12 +1051,11 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
* Validate that the record size isn't too large
*/
private void ensureValidRecordSize(int size) {
if (size > this.maxRequestSize)
if (size > maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the maximum request size you have configured with the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
" configuration.");
if (size > this.totalMemorySize)
" bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
if (size > totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
Expand Down Expand Up @@ -1348,7 +1347,7 @@ private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> in
}

public void onCompletion(RecordMetadata metadata, Exception exception) {
metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
this.interceptors.onAcknowledgement(metadata, exception);
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public class ProducerConfig extends AbstractConfig {
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(MAX_REQUEST_SIZE_CONFIG,
Type.INT,
1 * 1024 * 1024,
1024 * 1024,
atLeast(0),
Importance.MEDIUM,
MAX_REQUEST_SIZE_DOC)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object Defaults {
val BrokerIdGenerationEnable = true
val MaxReservedBrokerId = 1000
val BrokerId = -1
val MessageMaxBytes = 1000000 + Records.LOG_OVERHEAD
val MessageMaxBytes = 1024 * 1024 + Records.LOG_OVERHEAD
val NumNetworkThreads = 3
val NumIoThreads = 8
val BackgroundThreads = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import java.util.Properties
import java.util.concurrent.{ExecutionException, Future, TimeUnit}

import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException, TimeoutException}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.utils.ByteUtils
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Assertions.intercept
Expand Down Expand Up @@ -168,4 +171,25 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
verifySendFailure(send(producer2)) // should fail send since buffer is full
verifySendSuccess(future2) // previous batch should be completed and sent now
}

@Test
def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))

val keyLengthSize = 1
val headerLengthSize = 1
val valueLengthSize = 3
val overhead = Records.LOG_OVERHEAD + DefaultRecordBatch.RECORD_BATCH_OVERHEAD + DefaultRecord.MAX_RECORD_OVERHEAD +
keyLengthSize + headerLengthSize + valueLengthSize
val valueSize = Defaults.MessageMaxBytes - overhead

val record0 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize))
assertEquals(record0.value.length, producer.send(record0).get.serializedValueSize)

val record1 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize + 1))
assertEquals(classOf[RecordTooLargeException], intercept[ExecutionException](producer.send(record1).get).getCause.getClass)
}

}