Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ private TransactionManager configureTransactionState(ProducerConfig config,
LogContext logContext) {
TransactionManager transactionManager = null;

if (config.idempotenceEnabled()) {
if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor: the idempotenceEnabled() method is only to get the config of ENABLE_IDEMPOTENCE_CONFIG now.

final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -44,6 +46,7 @@
* href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a>
*/
public class ProducerConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(ProducerConfig.class);

/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND
Expand Down Expand Up @@ -104,7 +107,10 @@ public class ProducerConfig extends AbstractConfig {
+ " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
+ "</ul>";
+ "</ul>"
+ "<p>"
+ "Note that enabling idempotence requires this config value to be 'all'."
+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";

/** <code>linger.ms</code> */
public static final String LINGER_MS_CONFIG = "linger.ms";
Expand Down Expand Up @@ -202,25 +208,32 @@ public class ProducerConfig extends AbstractConfig {
/** <code>metric.reporters</code> */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;

// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;

/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
+ " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";

/** <code>retries</code> */
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
+ " Note that this retry is no different than if the client resent the record upon receiving the error."
+ " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
+ " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
+ " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be"
+ " failed before the number of retries has been exhausted if the timeout configured by"
+ " Produce requests will be failed before the number of retries has been exhausted if the timeout configured by"
+ " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally"
+ " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control"
+ " retry behavior.";
+ " retry behavior."
+ "<p>"
+ "Enabling idempotence requires this config value to be greater than 0."
+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."
+ "<p>"
+ "Allowing retries while setting <code>enable.idempotence</code> to <code>false</code> and <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
+ " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
+ " succeeds, then the records in the second batch may appear first.";

/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
Expand Down Expand Up @@ -273,8 +286,11 @@ public class ProducerConfig extends AbstractConfig {
+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+ " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
+ ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible "
+ "values are set, a <code>ConfigException</code> will be thrown.";
+ ACKS_CONFIG + "</code> must be 'all'. "
+ "<p>"
+ "Idempotence is enabled by default if no conflicting configurations are set. "
+ "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "
+ "If idempotence is explicitly enabled and conflicting configurations are set, a <code>ConfigException</code> is thrown.";

/** <code> transaction.timeout.ms </code> */
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
Expand Down Expand Up @@ -461,27 +477,53 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
final Map<String, Object> originalConfigs = this.originals();
final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
configs.put(ACKS_CONFIG, acksStr);

// For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
if (idempotenceEnabled()) {
boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
boolean shouldDisableIdempotence = false;

// For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation
if (idempotenceEnabled) {
final int retries = this.getInt(RETRIES_CONFIG);
if (retries == 0) {
if (userConfiguredIdempotence) {
throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
Comment thread
ijuma marked this conversation as resolved.
}
log.info("Idempotence will be disabled because {} is set to 0.", RETRIES_CONFIG, retries);
shouldDisableIdempotence = true;
}

boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
final short acks = Short.valueOf(acksStr);
if (userConfiguredAcks && acks != (short) -1) {
throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
if (acks != (short) -1) {
if (userConfiguredIdempotence) {
throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
"producer. Otherwise we cannot guarantee idempotence.");
}
log.info("Idempotence will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks);
shouldDisableIdempotence = true;
}

boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
final int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) {
if (userConfiguredIdempotence) {
throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
log.warn("Idempotence will be disabled because {} is set to {}, which is greater than 5. " +
"Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
shouldDisableIdempotence = true;
}
}

if (shouldDisableIdempotence) {
configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
idempotenceEnabled = false;
}

// validate `transaction.id` after validating idempotence dependant configs because `enable.idempotence` config might be overridden
boolean userConfiguredTransactions = originalConfigs.containsKey(TRANSACTIONAL_ID_CONFIG);
if (!idempotenceEnabled && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}
}

private static String parseAcks(String acksString) {
Expand Down Expand Up @@ -511,15 +553,6 @@ public ProducerConfig(Map<String, Object> props) {
super(CONFIG, props);
}

boolean idempotenceEnabled() {
boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
if (!idempotenceEnabled && userConfiguredTransactions)
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");

return idempotenceEnabled;
}
Comment on lines -514 to -521
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the transaction.id config validation into validation method, so that we don't need to validate it each time when checking idempotenceEnabled.


ProducerConfig(Map<?, ?> props, boolean doLog) {
super(CONFIG, props, doLog);
}
Expand Down
Loading