diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 667b80696ecd5..f739336b85692 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -514,7 +514,7 @@ private TransactionManager configureTransactionState(ProducerConfig config,
LogContext logContext) {
TransactionManager transactionManager = null;
- if (config.idempotenceEnabled()) {
+ if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 9a698b1ecef3c..afc1e55cdfdad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -27,6 +27,8 @@
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@@ -44,6 +46,7 @@
* href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation
*/
public class ProducerConfig extends AbstractConfig {
+ private static final Logger log = LoggerFactory.getLogger(ProducerConfig.class);
/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND
@@ -104,7 +107,10 @@ public class ProducerConfig extends AbstractConfig {
+ "
acks=all This means the leader will wait for the full set of in-sync replicas to"
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
- + "";
+ + ""
+ + ""
+ + "Note that enabling idempotence requires this config value to be 'all'."
+ + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
/** linger.ms */
public static final String LINGER_MS_CONFIG = "linger.ms";
@@ -202,25 +208,32 @@ public class ProducerConfig extends AbstractConfig {
/** metric.reporters */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+ // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
+ private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
+
/** max.in.flight.requests.per.connection */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
+ " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of"
- + " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";
- // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
- private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
+ + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
+ + " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
+ + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
/** retries */
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
+ " Note that this retry is no different than if the client resent the record upon receiving the error."
- + " Allowing retries without setting " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the"
- + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
- + " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be"
- + " failed before the number of retries has been exhausted if the timeout configured by"
+ + " Produce requests will be failed before the number of retries has been exhausted if the timeout configured by"
+ " " + DELIVERY_TIMEOUT_MS_CONFIG + " expires first before successful acknowledgement. Users should generally"
+ " prefer to leave this config unset and instead use " + DELIVERY_TIMEOUT_MS_CONFIG + " to control"
- + " retry behavior.";
+ + " retry behavior."
+ + "
"
+ + "Enabling idempotence requires this config value to be greater than 0."
+ + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."
+ + "
"
+ + "Allowing retries while setting enable.idempotence to false and " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 will potentially change the"
+ + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
+ + " succeeds, then the records in the second batch may appear first.";
/** key.serializer */
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
@@ -273,8 +286,11 @@ public class ProducerConfig extends AbstractConfig {
+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
+ "Note that enabling idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+ " (with message ordering preserved for any allowable value), " + RETRIES_CONFIG + " to be greater than 0, and "
- + ACKS_CONFIG + " must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible "
- + "values are set, a ConfigException will be thrown.";
+ + ACKS_CONFIG + " must be 'all'. "
+ + "
"
+ + "Idempotence is enabled by default if no conflicting configurations are set. "
+ + "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "
+ + "If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown.";
/** transaction.timeout.ms */
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
@@ -461,27 +477,53 @@ private void postProcessAndValidateIdempotenceConfigs(final Map
final Map originalConfigs = this.originals();
final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
configs.put(ACKS_CONFIG, acksStr);
-
- // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
- if (idempotenceEnabled()) {
- boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
- if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
- throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+ final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+ boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+ boolean shouldDisableIdempotence = false;
+
+ // For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation
+ if (idempotenceEnabled) {
+ final int retries = this.getInt(RETRIES_CONFIG);
+ if (retries == 0) {
+ if (userConfiguredIdempotence) {
+ throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+ }
+ log.info("Idempotence will be disabled because {} is set to 0.", RETRIES_CONFIG, retries);
+ shouldDisableIdempotence = true;
}
- boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
final short acks = Short.valueOf(acksStr);
- if (userConfiguredAcks && acks != (short) -1) {
- throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
+ if (acks != (short) -1) {
+ if (userConfiguredIdempotence) {
+ throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
"producer. Otherwise we cannot guarantee idempotence.");
+ }
+ log.info("Idempotence will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks);
+ shouldDisableIdempotence = true;
}
- boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
- if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
- throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+ final int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+ if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) {
+ if (userConfiguredIdempotence) {
+ throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
+ }
+ log.warn("Idempotence will be disabled because {} is set to {}, which is greater than 5. " +
+ "Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
+ shouldDisableIdempotence = true;
}
}
+
+ if (shouldDisableIdempotence) {
+ configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
+ idempotenceEnabled = false;
+ }
+
+ // validate `transaction.id` after validating idempotence dependant configs because `enable.idempotence` config might be overridden
+ boolean userConfiguredTransactions = originalConfigs.containsKey(TRANSACTIONAL_ID_CONFIG);
+ if (!idempotenceEnabled && userConfiguredTransactions) {
+ throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+ }
}
private static String parseAcks(String acksString) {
@@ -511,15 +553,6 @@ public ProducerConfig(Map props) {
super(CONFIG, props);
}
- boolean idempotenceEnabled() {
- boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
- boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
- if (!idempotenceEnabled && userConfiguredTransactions)
- throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
-
- return idempotenceEnabled;
- }
-
ProducerConfig(Map, ?> props, boolean doLog) {
super(CONFIG, props, doLog);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 05d11d9458105..ed761b7798801 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -233,6 +233,32 @@ public void testAcksAndIdempotenceForIdempotentProducers() {
config.getString(ProducerConfig.ACKS_CONFIG),
"acks should be overwritten");
+ Properties validProps4 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.ACKS_CONFIG, "0");
+ }};
+ config = new ProducerConfig(validProps4);
+ assertFalse(
+ config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+ "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
+ assertEquals(
+ "0",
+ config.getString(ProducerConfig.ACKS_CONFIG),
+ "acks should be set with overridden value");
+
+ Properties validProps5 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.ACKS_CONFIG, "1");
+ }};
+ config = new ProducerConfig(validProps5);
+ assertFalse(
+ config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+ "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
+ assertEquals(
+ "1",
+ config.getString(ProducerConfig.ACKS_CONFIG),
+ "acks should be set with overridden value");
+
Properties invalidProps = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.ACKS_CONFIG, "0");
@@ -258,21 +284,12 @@ public void testAcksAndIdempotenceForIdempotentProducers() {
Properties invalidProps3 = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.ACKS_CONFIG, "0");
- }};
- assertThrows(
- ConfigException.class,
- () -> new ProducerConfig(invalidProps3),
- "Must set acks to all in order to use the idempotent producer");
-
- Properties invalidProps4 = new Properties() {{
- putAll(baseProps);
- setProperty(ProducerConfig.ACKS_CONFIG, "0");
setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
}};
assertThrows(
ConfigException.class,
- () -> new ProducerConfig(invalidProps4),
- "Must set retries to non-zero when using the idempotent producer.");
+ () -> new ProducerConfig(invalidProps3),
+ "Must set acks to all when using the transactional producer.");
}
@Test
@@ -297,6 +314,19 @@ public void testRetriesAndIdempotenceForIdempotentProducers() {
config.getInt(ProducerConfig.RETRIES_CONFIG),
"retries should be overwritten");
+ Properties validProps2 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+ }};
+ config = new ProducerConfig(validProps2);
+ assertFalse(
+ config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+ "idempotence should be disabled when retries set to 0 and `enable.idempotence` config is unset.");
+ assertEquals(
+ 0,
+ config.getInt(ProducerConfig.RETRIES_CONFIG),
+ "retries should be set with overridden value");
+
Properties invalidProps = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.RETRIES_CONFIG, "0");
@@ -311,32 +341,23 @@ public void testRetriesAndIdempotenceForIdempotentProducers() {
Properties invalidProps2 = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.RETRIES_CONFIG, "0");
- }};
- assertThrows(
- ConfigException.class,
- () -> new ProducerConfig(invalidProps2),
- "Must set retries to non-zero when using the idempotent producer.");
-
- Properties invalidProps3 = new Properties() {{
- putAll(baseProps);
- setProperty(ProducerConfig.RETRIES_CONFIG, "0");
// explicitly enabling idempotence should still throw exception
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
}};
assertThrows(
ConfigException.class,
- () -> new ProducerConfig(invalidProps3),
+ () -> new ProducerConfig(invalidProps2),
"Must set retries to non-zero when using the idempotent producer.");
- Properties invalidProps4 = new Properties() {{
+ Properties invalidProps3 = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.RETRIES_CONFIG, "0");
setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
}};
assertThrows(
ConfigException.class,
- () -> new ProducerConfig(invalidProps4),
- "Must set retries to non-zero when using the idempotent producer.");
+ () -> new ProducerConfig(invalidProps3),
+ "Must set retries to non-zero when using the transactional producer.");
}
@Test
@@ -349,7 +370,7 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
Properties validProps = new Properties() {{
putAll(baseProps);
- setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+ setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
}};
ProducerConfig config = new ProducerConfig(validProps);
@@ -357,10 +378,24 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
"idempotence should be overwritten");
assertEquals(
- 10,
+ 6,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
"max.in.flight.requests.per.connection should be overwritten");
+ Properties validProps2 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
+ }};
+ config = new ProducerConfig(validProps2);
+ assertFalse(
+ config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+ "idempotence should be disabled when `max.in.flight.requests.per.connection` is greater than 5 and " +
+ "`enable.idempotence` config is unset.");
+ assertEquals(
+ 6,
+ config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+ "`max.in.flight.requests.per.connection` should be set with overridden value");
+
Properties invalidProps = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
@@ -374,32 +409,23 @@ public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
Properties invalidProps2 = new Properties() {{
putAll(baseProps);
- setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
- }};
- assertThrows(
- ConfigException.class,
- () -> new ProducerConfig(invalidProps2),
- "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
-
- Properties invalidProps3 = new Properties() {{
- putAll(baseProps);
- setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+ setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
// explicitly enabling idempotence should still throw exception
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
}};
assertThrows(
ConfigException.class,
- () -> new ProducerConfig(invalidProps3),
+ () -> new ProducerConfig(invalidProps2),
"Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
- Properties invalidProps4 = new Properties() {{
+ Properties invalidProps3 = new Properties() {{
putAll(baseProps);
- setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+ setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
}};
assertThrows(
ConfigException.class,
- () -> new ProducerConfig(invalidProps4),
+ () -> new ProducerConfig(invalidProps3),
"Must set retries to non-zero when using the idempotent producer.");
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0fbc9fd646ecf..ddcee32053eb5 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -21,7 +21,7 @@
- - Idempotence for the producer is enabled by default. In 3.0.0 and 3.1.0, a bug prevented this default from being applied,
+
- Idempotence for the producer is enabled by default if no conflicting configurations are set. In 3.0.0 and 3.1.0, a bug prevented this default from being applied,
which meant that idempotence remained disabled unless the user had explicitly set
enable.idempotence to true
(See KAFKA-13598for more details).
This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.
@@ -71,7 +71,8 @@