From b396cace075efe63ae6ba23a449aa6bcb07746bd Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 17 Feb 2022 21:11:57 +0800 Subject: [PATCH 1/9] KAFKA-13673: disable idempotence when config conflicts --- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../clients/producer/ProducerConfig.java | 54 ++++++++---- .../clients/producer/KafkaProducerTest.java | 85 +++++++++++-------- 3 files changed, 88 insertions(+), 53 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 667b80696ecd5..f739336b85692 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -514,7 +514,7 @@ private TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext) { TransactionManager transactionManager = null; - if (config.idempotenceEnabled()) { + if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG); final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 9a698b1ecef3c..6663b3cbad870 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -27,6 +27,8 @@ import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; @@ -44,6 +46,7 @@ * href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation */ public class ProducerConfig extends AbstractConfig { + private static final Logger log = LoggerFactory.getLogger(ProducerConfig.class); /* * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND @@ -461,27 +464,55 @@ private void postProcessAndValidateIdempotenceConfigs(final Map final Map originalConfigs = this.originals(); final String acksStr = parseAcks(this.getString(ACKS_CONFIG)); configs.put(ACKS_CONFIG, acksStr); + final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG); + boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG); + boolean shouldDisableIdempotence = false; - // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation - if (idempotenceEnabled()) { + // For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation + if (idempotenceEnabled) { boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG); - if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) { - throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer."); + int retries = this.getInt(RETRIES_CONFIG); + if (userConfiguredRetries && retries == 0) { + if (userConfiguredIdempotence) { + throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer."); + } + log.warn("`enable.idempotence` will be disabled because {} config is set to 0.", RETRIES_CONFIG, retries); + shouldDisableIdempotence = true; } boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG); final short acks = Short.valueOf(acksStr); if (userConfiguredAcks && acks != (short) -1) { - throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " + + if (userConfiguredIdempotence) { + throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " + "producer. Otherwise we cannot guarantee idempotence."); + } + log.warn("`enable.idempotence` will be disabled because {} config is set to {}, not set to 'all'.", ACKS_CONFIG, acks); + shouldDisableIdempotence = true; } boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); - if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { - throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" + + int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); + if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) { + if (userConfiguredIdempotence) { + throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" + " to use the idempotent producer."); + } + log.warn("`enable.idempotence` will be disabled because {} config is set to {}, which is greater than 5.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection); + shouldDisableIdempotence = true; } } + + if (shouldDisableIdempotence) { + configs.put(ENABLE_IDEMPOTENCE_CONFIG, false); + } + + // validate `transaction.id` after validating idempotence dependant configs because `enable.idempotence` config might be overridden + idempotenceEnabled = idempotenceEnabled && !shouldDisableIdempotence; + boolean userConfiguredTransactions = originalConfigs.containsKey(TRANSACTIONAL_ID_CONFIG); + if (!idempotenceEnabled && userConfiguredTransactions) { + throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); + } } private static String parseAcks(String acksString) { @@ -511,15 +542,6 @@ public ProducerConfig(Map props) { super(CONFIG, props); } - boolean idempotenceEnabled() { - boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG); - boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG); - if (!idempotenceEnabled && userConfiguredTransactions) - throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); - - return idempotenceEnabled; - } - ProducerConfig(Map props, boolean doLog) { super(CONFIG, props, doLog); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 05d11d9458105..00c50f0af2b36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -233,6 +233,19 @@ public void testAcksAndIdempotenceForIdempotentProducers() { config.getString(ProducerConfig.ACKS_CONFIG), "acks should be overwritten"); + Properties validProps4 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.ACKS_CONFIG, "0"); + }}; + config = new ProducerConfig(validProps4); + assertFalse( + config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), + "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset."); + assertEquals( + "0", + config.getString(ProducerConfig.ACKS_CONFIG), + "acks should be set with overridden value"); + Properties invalidProps = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.ACKS_CONFIG, "0"); @@ -258,21 +271,12 @@ public void testAcksAndIdempotenceForIdempotentProducers() { Properties invalidProps3 = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.ACKS_CONFIG, "0"); - }}; - assertThrows( - ConfigException.class, - () -> new ProducerConfig(invalidProps3), - "Must set acks to all in order to use the idempotent producer"); - - Properties invalidProps4 = new Properties() {{ - putAll(baseProps); - setProperty(ProducerConfig.ACKS_CONFIG, "0"); setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); }}; assertThrows( ConfigException.class, - () -> new ProducerConfig(invalidProps4), - "Must set retries to non-zero when using the idempotent producer."); + () -> new ProducerConfig(invalidProps3), + "Must set acks to all when using the transactional producer."); } @Test @@ -297,6 +301,19 @@ public void testRetriesAndIdempotenceForIdempotentProducers() { config.getInt(ProducerConfig.RETRIES_CONFIG), "retries should be overwritten"); + Properties validProps2 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.RETRIES_CONFIG, "0"); + }}; + config = new ProducerConfig(validProps2); + assertFalse( + config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), + "idempotence should be disabled when retries set to 0 and `enable.idempotence` config is unset."); + assertEquals( + 0, + config.getInt(ProducerConfig.RETRIES_CONFIG), + "retries should be set with overridden value"); + Properties invalidProps = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.RETRIES_CONFIG, "0"); @@ -311,32 +328,23 @@ public void testRetriesAndIdempotenceForIdempotentProducers() { Properties invalidProps2 = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.RETRIES_CONFIG, "0"); - }}; - assertThrows( - ConfigException.class, - () -> new ProducerConfig(invalidProps2), - "Must set retries to non-zero when using the idempotent producer."); - - Properties invalidProps3 = new Properties() {{ - putAll(baseProps); - setProperty(ProducerConfig.RETRIES_CONFIG, "0"); // explicitly enabling idempotence should still throw exception setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); }}; assertThrows( ConfigException.class, - () -> new ProducerConfig(invalidProps3), + () -> new ProducerConfig(invalidProps2), "Must set retries to non-zero when using the idempotent producer."); - Properties invalidProps4 = new Properties() {{ + Properties invalidProps3 = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.RETRIES_CONFIG, "0"); setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); }}; assertThrows( ConfigException.class, - () -> new ProducerConfig(invalidProps4), - "Must set retries to non-zero when using the idempotent producer."); + () -> new ProducerConfig(invalidProps3), + "Must set retries to non-zero when using the transactional producer."); } @Test @@ -361,6 +369,20 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() { config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), "max.in.flight.requests.per.connection should be overwritten"); + Properties validProps2 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); + }}; + config = new ProducerConfig(validProps2); + assertFalse( + config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), + "idempotence should be disabled when `max.in.flight.requests.per.connection` is greater than 5 and " + + "`enable.idempotence` config is unset."); + assertEquals( + 10, + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + "`max.in.flight.requests.per.connection` should be set with overridden value"); + Properties invalidProps = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); @@ -375,31 +397,22 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() { Properties invalidProps2 = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); - }}; - assertThrows( - ConfigException.class, - () -> new ProducerConfig(invalidProps2), - "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer."); - - Properties invalidProps3 = new Properties() {{ - putAll(baseProps); - setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); // explicitly enabling idempotence should still throw exception setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); }}; assertThrows( ConfigException.class, - () -> new ProducerConfig(invalidProps3), + () -> new ProducerConfig(invalidProps2), "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer."); - Properties invalidProps4 = new Properties() {{ + Properties invalidProps3 = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); }}; assertThrows( ConfigException.class, - () -> new ProducerConfig(invalidProps4), + () -> new ProducerConfig(invalidProps3), "Must set retries to non-zero when using the idempotent producer."); } From a90c960105f2aa0900f367aadfae0837abbf2284 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sun, 20 Feb 2022 10:50:14 +0800 Subject: [PATCH 2/9] KAFKA-13673: info log to acks/retries, and add upgrade doc --- .../apache/kafka/clients/producer/ProducerConfig.java | 7 ++++--- docs/upgrade.html | 9 ++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 6663b3cbad870..2356e48c8ffda 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -476,7 +476,7 @@ private void postProcessAndValidateIdempotenceConfigs(final Map if (userConfiguredIdempotence) { throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer."); } - log.warn("`enable.idempotence` will be disabled because {} config is set to 0.", RETRIES_CONFIG, retries); + log.info("`enable.idempotence` will be disabled because {} config is set to 0.", RETRIES_CONFIG, retries); shouldDisableIdempotence = true; } @@ -487,7 +487,7 @@ private void postProcessAndValidateIdempotenceConfigs(final Map throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " + "producer. Otherwise we cannot guarantee idempotence."); } - log.warn("`enable.idempotence` will be disabled because {} config is set to {}, not set to 'all'.", ACKS_CONFIG, acks); + log.info("`enable.idempotence` will be disabled because {} config is set to {}, not set to 'all'.", ACKS_CONFIG, acks); shouldDisableIdempotence = true; } @@ -498,7 +498,8 @@ private void postProcessAndValidateIdempotenceConfigs(final Map throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" + " to use the idempotent producer."); } - log.warn("`enable.idempotence` will be disabled because {} config is set to {}, which is greater than 5.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection); + log.warn("`enable.idempotence` will be disabled because {} config is set to {}, which is greater than 5. " + + "Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection); shouldDisableIdempotence = true; } } diff --git a/docs/upgrade.html b/docs/upgrade.html index 0fbc9fd646ecf..221bb721bc41c 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -24,7 +24,8 @@
Notable changes in 3
  • Idempotence for the producer is enabled by default. In 3.0.0 and 3.1.0, a bug prevented this default from being applied, which meant that idempotence remained disabled unless the user had explicitly set enable.idempotence to true (See KAFKA-13598for more details). - This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.
  • + This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0. + However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid break existing producers.

    Upgrading to 3.1.0 from any version 0.8.x through 3.0.x

    @@ -73,7 +74,8 @@
    Notable changes in 3
    Notable changes in 3.1.0
    @@ -143,7 +145,8 @@
    Notable changes in 3
    Notable changes in 3.0.0
    From 29bb358a2e63046ea332981e8a5864caa9b25e78 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 22 Feb 2022 14:47:15 +0800 Subject: [PATCH 3/9] KAFKA-13673: rephrase comments and add note in doc --- .../apache/kafka/clients/producer/ProducerConfig.java | 10 +++++----- docs/upgrade.html | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 2356e48c8ffda..3893a9f91f629 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -276,8 +276,8 @@ public class ProducerConfig extends AbstractConfig { + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. " + "Note that enabling idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + " (with message ordering preserved for any allowable value), " + RETRIES_CONFIG + " to be greater than 0, and " - + ACKS_CONFIG + " must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible " - + "values are set, a ConfigException will be thrown."; + + ACKS_CONFIG + " must be 'all'. If incompatible values are set, a ConfigException will be thrown. " + + "The default value is `true`. But if incompatible values are set and this config is not set explicitly, idempotent producer will be disabled automatically."; /** transaction.timeout.ms */ public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms"; @@ -476,7 +476,7 @@ private void postProcessAndValidateIdempotenceConfigs(final Map if (userConfiguredIdempotence) { throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer."); } - log.info("`enable.idempotence` will be disabled because {} config is set to 0.", RETRIES_CONFIG, retries); + log.info("`enable.idempotence` will be disabled because {} is set to 0.", RETRIES_CONFIG, retries); shouldDisableIdempotence = true; } @@ -487,7 +487,7 @@ private void postProcessAndValidateIdempotenceConfigs(final Map throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " + "producer. Otherwise we cannot guarantee idempotence."); } - log.info("`enable.idempotence` will be disabled because {} config is set to {}, not set to 'all'.", ACKS_CONFIG, acks); + log.info("`enable.idempotence` will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks); shouldDisableIdempotence = true; } @@ -498,7 +498,7 @@ private void postProcessAndValidateIdempotenceConfigs(final Map throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" + " to use the idempotent producer."); } - log.warn("`enable.idempotence` will be disabled because {} config is set to {}, which is greater than 5. " + + log.warn("`enable.idempotence` will be disabled because {} is set to {}, which is greater than 5. " + "Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection); shouldDisableIdempotence = true; } diff --git a/docs/upgrade.html b/docs/upgrade.html index 221bb721bc41c..6a96fe4aa20c0 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -25,7 +25,7 @@
    Notable changes in 3 which meant that idempotence remained disabled unless the user had explicitly set enable.idempotence to true (See KAFKA-13598for more details). This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0. - However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid break existing producers. + However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid breaking existing producers.

    Upgrading to 3.1.0 from any version 0.8.x through 3.0.x

    @@ -75,7 +75,7 @@
    Notable changes in 3
  • A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set enable.idempotence to true. See KAFKA-13598for more details. This issue was fixed and the default is properly applied. - However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid break existing producers.
  • + However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid breaking existing producers.
    Notable changes in 3.1.0
    @@ -146,7 +146,7 @@
    Notable changes in 3
  • A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set enable.idempotence to true. See KAFKA-13598for more details. This issue was fixed and the default is properly applied. - However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid break existing producers.
  • + However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid breaking existing producers.
    Notable changes in 3.0.0
    From 8a5c21b1e67abb40556acdc3f95b9c116af5bc11 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 23 Feb 2022 11:26:42 +0800 Subject: [PATCH 4/9] KAFKA-13673: improve upgrade doc --- docs/upgrade.html | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 6a96fe4aa20c0..8b8a48d87bd88 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -21,11 +21,10 @@
    Notable changes in 3.2.0
      -
    • Idempotence for the producer is enabled by default. In 3.0.0 and 3.1.0, a bug prevented this default from being applied, +
    • Idempotence for the producer is enabled by default if no conflicting configurations are set explicitly. In 3.0.0 and 3.1.0, a bug prevented this default from being applied, which meant that idempotence remained disabled unless the user had explicitly set enable.idempotence to true (See KAFKA-13598for more details). - This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0. - However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid breaking existing producers.
    • + This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.

    Upgrading to 3.1.0 from any version 0.8.x through 3.0.x

    @@ -72,10 +71,10 @@

    Upgrading to 3.1.0 from any vers
    Notable changes in 3.1.1
      -
    • A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set +
    • Idempotence for the producer is enabled by default if no conflicting configurations are set explicitly. + A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set enable.idempotence to true. See KAFKA-13598for more details. - This issue was fixed and the default is properly applied. - However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid breaking existing producers.
    • + This issue was fixed and the default is properly applied.

    Notable changes in 3.1.0
    @@ -143,10 +142,10 @@

    Upgrading to 3.0.0 from any vers
    Notable changes in 3.0.1
      -
    • A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set +
    • Idempotence for the producer is enabled by default if no conflicting configurations are set explicitly. + A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set enable.idempotence to true. See KAFKA-13598for more details. - This issue was fixed and the default is properly applied. - However, if enable.idempotence is unset and the other configs conflicts with it, idempotence will be disabled to avoid breaking existing producers.
    • + This issue was fixed and the default is properly applied.

    Notable changes in 3.0.0
    From d0e8b07f94ed26bbf03d2ea8bd274c5693f03b66 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 24 Feb 2022 10:31:58 +0800 Subject: [PATCH 5/9] KAFKA-13673: refactor to make the description clear --- .../clients/producer/ProducerConfig.java | 26 +++++++++---------- .../clients/producer/KafkaProducerTest.java | 25 +++++++++++++----- docs/upgrade.html | 6 ++--- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 3893a9f91f629..8f254cee1f936 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -276,8 +276,11 @@ public class ProducerConfig extends AbstractConfig { + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. " + "Note that enabling idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + " (with message ordering preserved for any allowable value), " + RETRIES_CONFIG + " to be greater than 0, and " - + ACKS_CONFIG + " must be 'all'. If incompatible values are set, a ConfigException will be thrown. " - + "The default value is `true`. But if incompatible values are set and this config is not set explicitly, idempotent producer will be disabled automatically."; + + ACKS_CONFIG + " must be 'all'. " + + "

    " + + "Idempotence is enabled by default if no conflicting configurations are set. " + + "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. " + + "If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown."; /** transaction.timeout.ms */ public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms"; @@ -470,35 +473,32 @@ private void postProcessAndValidateIdempotenceConfigs(final Map // For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation if (idempotenceEnabled) { - boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG); - int retries = this.getInt(RETRIES_CONFIG); - if (userConfiguredRetries && retries == 0) { + final int retries = this.getInt(RETRIES_CONFIG); + if (retries == 0) { if (userConfiguredIdempotence) { throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer."); } - log.info("`enable.idempotence` will be disabled because {} is set to 0.", RETRIES_CONFIG, retries); + log.info("Idempotence will be disabled because {} is set to 0.", RETRIES_CONFIG, retries); shouldDisableIdempotence = true; } - boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG); final short acks = Short.valueOf(acksStr); - if (userConfiguredAcks && acks != (short) -1) { + if (acks != (short) -1) { if (userConfiguredIdempotence) { throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " + "producer. Otherwise we cannot guarantee idempotence."); } - log.info("`enable.idempotence` will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks); + log.info("Idempotence will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks); shouldDisableIdempotence = true; } - boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); - int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); - if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) { + final int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); + if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) { if (userConfiguredIdempotence) { throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" + " to use the idempotent producer."); } - log.warn("`enable.idempotence` will be disabled because {} is set to {}, which is greater than 5. " + + log.warn("Idempotence will be disabled because {} is set to {}, which is greater than 5. " + "Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection); shouldDisableIdempotence = true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 00c50f0af2b36..ed761b7798801 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -246,6 +246,19 @@ public void testAcksAndIdempotenceForIdempotentProducers() { config.getString(ProducerConfig.ACKS_CONFIG), "acks should be set with overridden value"); + Properties validProps5 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.ACKS_CONFIG, "1"); + }}; + config = new ProducerConfig(validProps5); + assertFalse( + config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), + "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset."); + assertEquals( + "1", + config.getString(ProducerConfig.ACKS_CONFIG), + "acks should be set with overridden value"); + Properties invalidProps = new Properties() {{ putAll(baseProps); setProperty(ProducerConfig.ACKS_CONFIG, "0"); @@ -357,7 +370,7 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() { Properties validProps = new Properties() {{ putAll(baseProps); - setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); + setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6"); setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); }}; ProducerConfig config = new ProducerConfig(validProps); @@ -365,13 +378,13 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() { config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), "idempotence should be overwritten"); assertEquals( - 10, + 6, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), "max.in.flight.requests.per.connection should be overwritten"); Properties validProps2 = new Properties() {{ putAll(baseProps); - setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); + setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6"); }}; config = new ProducerConfig(validProps2); assertFalse( @@ -379,7 +392,7 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() { "idempotence should be disabled when `max.in.flight.requests.per.connection` is greater than 5 and " + "`enable.idempotence` config is unset."); assertEquals( - 10, + 6, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), "`max.in.flight.requests.per.connection` should be set with overridden value"); @@ -396,7 +409,7 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() { Properties invalidProps2 = new Properties() {{ putAll(baseProps); - setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); + setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6"); // explicitly enabling idempotence should still throw exception setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); }}; @@ -407,7 +420,7 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() { Properties invalidProps3 = new Properties() {{ putAll(baseProps); - setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10"); + setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6"); setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); }}; assertThrows( diff --git a/docs/upgrade.html b/docs/upgrade.html index 8b8a48d87bd88..ddcee32053eb5 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -21,7 +21,7 @@

    Notable changes in 3.2.0
      -
    • Idempotence for the producer is enabled by default if no conflicting configurations are set explicitly. In 3.0.0 and 3.1.0, a bug prevented this default from being applied, +
    • Idempotence for the producer is enabled by default if no conflicting configurations are set. In 3.0.0 and 3.1.0, a bug prevented this default from being applied, which meant that idempotence remained disabled unless the user had explicitly set enable.idempotence to true (See KAFKA-13598for more details). This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.
    • @@ -71,7 +71,7 @@

      Upgrading to 3.1.0 from any vers
      Notable changes in 3.1.1
        -
      • Idempotence for the producer is enabled by default if no conflicting configurations are set explicitly. +
      • Idempotence for the producer is enabled by default if no conflicting configurations are set. A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set enable.idempotence to true. See KAFKA-13598for more details. This issue was fixed and the default is properly applied.
      • @@ -142,7 +142,7 @@

        Upgrading to 3.0.0 from any vers
        Notable changes in 3.0.1
          -
        • Idempotence for the producer is enabled by default if no conflicting configurations are set explicitly. +
        • Idempotence for the producer is enabled by default if no conflicting configurations are set. A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set enable.idempotence to true. See KAFKA-13598for more details. This issue was fixed and the default is properly applied.
        • From fcb4dfa141c9f400bc53dcb1d213cac1b19f2c5f Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sat, 26 Feb 2022 11:20:23 +0800 Subject: [PATCH 6/9] KAFKA-13673: refactor and add note in docs for idempotence --- .../clients/producer/ProducerConfig.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 8f254cee1f936..4467b418f901d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -107,7 +107,10 @@ public class ProducerConfig extends AbstractConfig { + "
        • acks=all This means the leader will wait for the full set of in-sync replicas to" + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting." - + "
        "; + + "

      " + + "

      " + + "Note that enabling idempotence requires this config value to be 'all'." + + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; /** linger.ms */ public static final String LINGER_MS_CONFIG = "linger.ms"; @@ -205,25 +208,31 @@ public class ProducerConfig extends AbstractConfig { /** metric.reporters */ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; + // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering + private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5; + /** max.in.flight.requests.per.connection */ public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of" - + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."; - // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering - private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5; + + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)." + + " Note additionally that enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." + + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; /** retries */ public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the record upon receiving the error." - + " Allowing retries without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the" + + " Allowing retries and disabling enable.idempotence but without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the" + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second" + " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be" + " failed before the number of retries has been exhausted if the timeout configured by" + " " + DELIVERY_TIMEOUT_MS_CONFIG + " expires first before successful acknowledgement. Users should generally" + " prefer to leave this config unset and instead use " + DELIVERY_TIMEOUT_MS_CONFIG + " to control" - + " retry behavior."; + + " retry behavior." + + "

      " + + "Enabling idempotence requires this config value to be greater than 0." + + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; /** key.serializer */ public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; @@ -506,10 +515,10 @@ private void postProcessAndValidateIdempotenceConfigs(final Map if (shouldDisableIdempotence) { configs.put(ENABLE_IDEMPOTENCE_CONFIG, false); + idempotenceEnabled = false; } // validate `transaction.id` after validating idempotence dependant configs because `enable.idempotence` config might be overridden - idempotenceEnabled = idempotenceEnabled && !shouldDisableIdempotence; boolean userConfiguredTransactions = originalConfigs.containsKey(TRANSACTIONAL_ID_CONFIG); if (!idempotenceEnabled && userConfiguredTransactions) { throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); From 53f0aa4087ee4856c1eeeb131f7768bb54c712bf Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 2 Mar 2022 09:54:07 +0800 Subject: [PATCH 7/9] KAFKA-13673: refactor the description --- .../kafka/clients/producer/ProducerConfig.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 4467b418f901d..7a808ed1ec1c2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -216,21 +216,22 @@ public class ProducerConfig extends AbstractConfig { private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of" + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)." - + " Note additionally that enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." + + " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; /** retries */ public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the record upon receiving the error." - + " Allowing retries and disabling enable.idempotence but without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the" - + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second" - + " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be" - + " failed before the number of retries has been exhausted if the timeout configured by" + + " Produce requests will be failed before the number of retries has been exhausted if the timeout configured by" + " " + DELIVERY_TIMEOUT_MS_CONFIG + " expires first before successful acknowledgement. Users should generally" + " prefer to leave this config unset and instead use " + DELIVERY_TIMEOUT_MS_CONFIG + " to control" + " retry behavior." - + "

      " + + "

      " + + "Allowing retries and disabling enable.idempotence but without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the" + + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second" + + " succeeds, then the records in the second batch may appear first." + + "

      " + "Enabling idempotence requires this config value to be greater than 0." + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; From 5b7a35ca1eacfc5ce9c5990f968e439c084816c0 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 3 Mar 2022 09:56:34 +0800 Subject: [PATCH 8/9] KAFKA-13673: refactor retries doc --- .../apache/kafka/clients/producer/ProducerConfig.java | 5 ++--- config/server.properties | 9 +++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 7a808ed1ec1c2..f28739f1802b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -228,11 +228,10 @@ public class ProducerConfig extends AbstractConfig { + " prefer to leave this config unset and instead use " + DELIVERY_TIMEOUT_MS_CONFIG + " to control" + " retry behavior." + "

      " - + "Allowing retries and disabling enable.idempotence but without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the" + + "Allowing retries while setting enable.idempotence to false and " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the" + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second" + " succeeds, then the records in the second batch may appear first." - + "

      " - + "Enabling idempotence requires this config value to be greater than 0." + + " Note that enabling idempotence requires this config value to be greater than 0." + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; /** key.serializer */ diff --git a/config/server.properties b/config/server.properties index f5172c297ec66..00e6011a6735d 100644 --- a/config/server.properties +++ b/config/server.properties @@ -31,7 +31,7 @@ broker.id=0 # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 -#listeners=PLAINTEXT://:9092 +#listeners=PLAINTEXT://:9093 # Listener name, hostname and port the broker will advertise to clients. # If not set, it uses the value for "listeners". @@ -102,18 +102,19 @@ transaction.state.log.min.isr=1 # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age -log.retention.hours=168 +log.retention.ms=10000 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. -log.segment.bytes=1073741824 +log.segment.bytes=1000 +log.retention.check.interval.ms=10000 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies -log.retention.check.interval.ms=300000 + ############################# Zookeeper ############################# From 187d4fc5331c4d2dc4d933448b52e00e82350e2b Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 3 Mar 2022 14:11:56 +0800 Subject: [PATCH 9/9] KAFKA-13673: refactor retries doc --- .../apache/kafka/clients/producer/ProducerConfig.java | 7 ++++--- config/server.properties | 9 ++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index f28739f1802b6..afc1e55cdfdad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -228,11 +228,12 @@ public class ProducerConfig extends AbstractConfig { + " prefer to leave this config unset and instead use " + DELIVERY_TIMEOUT_MS_CONFIG + " to control" + " retry behavior." + "

      " + + "Enabling idempotence requires this config value to be greater than 0." + + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled." + + "

      " + "Allowing retries while setting enable.idempotence to false and " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the" + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second" - + " succeeds, then the records in the second batch may appear first." - + " Note that enabling idempotence requires this config value to be greater than 0." - + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; + + " succeeds, then the records in the second batch may appear first."; /** key.serializer */ public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; diff --git a/config/server.properties b/config/server.properties index 00e6011a6735d..f5172c297ec66 100644 --- a/config/server.properties +++ b/config/server.properties @@ -31,7 +31,7 @@ broker.id=0 # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 -#listeners=PLAINTEXT://:9093 +#listeners=PLAINTEXT://:9092 # Listener name, hostname and port the broker will advertise to clients. # If not set, it uses the value for "listeners". @@ -102,19 +102,18 @@ transaction.state.log.min.isr=1 # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age -log.retention.ms=10000 +log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. -log.segment.bytes=1000 -log.retention.check.interval.ms=10000 +log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies - +log.retention.check.interval.ms=300000 ############################# Zookeeper #############################