From 462432d0e2e4efc0495d5f7c72728c8e9524b9c0 Mon Sep 17 00:00:00 2001 From: Yang Yu Date: Wed, 23 Mar 2022 12:09:17 -0500 Subject: [PATCH 1/3] add config to explicitly disable idempotence --- .../java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 23272a2cb5d3f..c4cac00dc52a5 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -43,6 +43,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; import static org.apache.kafka.common.config.SaslConfigs.SASL_KERBEROS_SERVICE_NAME; import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; @@ -290,6 +291,7 @@ public void activateOptions() { props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); props.put(LINGER_MS_CONFIG, lingerMs); props.put(BATCH_SIZE_CONFIG, batchSize); + props.put(ENABLE_IDEMPOTENCE_CONFIG, false); if (securityProtocol != null) { props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol); From 3d88be4da9d2364ed98ddb327fcc1965aa1dcab4 Mon Sep 17 00:00:00 2001 From: Yang Yu Date: Wed, 23 Mar 2022 15:03:42 -0500 Subject: [PATCH 2/3] add comment to explain reason for disabling idempotence --- .../java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index c4cac00dc52a5..1b6e6522b9b99 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -291,6 +291,8 @@ public void activateOptions() { props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); props.put(LINGER_MS_CONFIG, lingerMs); props.put(BATCH_SIZE_CONFIG, batchSize); + // Disabling idempotence as a temporary workaround for KAFKA-13761. This is to avoid deadlock when producer + // network thread appends to log while interacting with TransactionManager. props.put(ENABLE_IDEMPOTENCE_CONFIG, false); if (securityProtocol != null) { From 21abd03005e23188cc0cb54df2296ac522895cab Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 23 Mar 2022 13:07:59 -0700 Subject: [PATCH 3/3] Tweak comment --- .../org/apache/kafka/log4jappender/KafkaLog4jAppender.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java index 1b6e6522b9b99..c561fc23608bf 100644 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java @@ -291,8 +291,8 @@ public void activateOptions() { props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); props.put(LINGER_MS_CONFIG, lingerMs); props.put(BATCH_SIZE_CONFIG, batchSize); - // Disabling idempotence as a temporary workaround for KAFKA-13761. This is to avoid deadlock when producer - // network thread appends to log while interacting with TransactionManager. + // Disable idempotence to avoid deadlock when the producer network thread writes a log line while interacting + // with the TransactionManager, see KAFKA-13761 for more information. props.put(ENABLE_IDEMPOTENCE_CONFIG, false); if (securityProtocol != null) {