From e2903d337010fcd7e2c717343012f1bbc912c0a4 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 6 Feb 2020 10:42:00 -0800 Subject: [PATCH 1/8] make code clean --- .../examples/ExactlyOnceMessageProcessor.java | 69 ++++++++++--------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 482e442850938..1d9ed347e73a9 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -124,44 +124,13 @@ public void onPartitionsAssigned(Collection partitions) { } int messageProcessed = 0; - boolean abortPreviousTransaction = false; while (messageRemaining.get() > 0) { ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); if (records.count() > 0) { try { - // Abort previous transaction if instructed. - if (abortPreviousTransaction) { - producer.abortTransaction(); - // The consumer fetch position also needs to be reset. - resetToLastCommittedPositions(consumer); - abortPreviousTransaction = false; - } - // Begin a new transaction session. - producer.beginTransaction(); - for (ConsumerRecord record : records) { - // Process the record and send to downstream. - ProducerRecord customizedRecord = transform(record); - producer.send(customizedRecord); - } - Map positions = new HashMap<>(); - for (TopicPartition topicPartition : consumer.assignment()) { - positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); - } - // Checkpoint the progress by sending offsets to group coordinator broker. - // Under group mode, we must apply consumer group metadata for proper fencing. - if (this.mode.equals("groupMode")) { - producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); - } else { - producer.sendOffsetsToTransaction(positions, consumerGroupId); - } - - // Finish the transaction. All sent records should be visible for consumption now. - producer.commitTransaction(); - messageProcessed += records.count(); - } catch (CommitFailedException e) { - // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. - abortPreviousTransaction = true; + messageProcessed = processMessages(messageProcessed, records); } catch (ProducerFencedException | FencedInstanceIdException e) { + // Normally fenced exceptions are fatal, we catch them here just for demo purpose. throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); } } @@ -173,6 +142,40 @@ public void onPartitionsAssigned(Collection partitions) { latch.countDown(); } + private int processMessages(int messageProcessed, ConsumerRecords records) + throws ProducerFencedException, FencedInstanceIdException { + try { + // Begin a new transaction session. + producer.beginTransaction(); + for (ConsumerRecord record : records) { + // Process the record and send to downstream. + ProducerRecord customizedRecord = transform(record); + producer.send(customizedRecord); + } + Map positions = new HashMap<>(); + for (TopicPartition topicPartition : consumer.assignment()) { + positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); + } + // Checkpoint the progress by sending offsets to group coordinator broker. + // Under group mode, we must apply consumer group metadata for proper fencing. + if (this.mode.equals("groupMode")) { + producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); + } else { + producer.sendOffsetsToTransaction(positions, consumerGroupId); + } + + // Finish the transaction. All sent records should be visible for consumption now. + producer.commitTransaction(); + messageProcessed += records.count(); + } catch (CommitFailedException e) { + // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. + producer.abortTransaction(); + // The consumer fetch position also needs to be reset. + resetToLastCommittedPositions(consumer); + } + return messageProcessed; + } + private void printWithTxnId(final String message) { System.out.println(transactionalId + ": " + message); } From d3053239c3c0ea377c5eed51b4be273e8fca3d43 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 6 Feb 2020 11:38:57 -0800 Subject: [PATCH 2/8] add more exception types --- .../examples/ExactlyOnceMessageProcessor.java | 76 +++++++++++-------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 1d9ed347e73a9..c02d9b8b83006 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -20,14 +20,23 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.ArrayList; @@ -125,14 +134,14 @@ public void onPartitionsAssigned(Collection partitions) { int messageProcessed = 0; while (messageRemaining.get() > 0) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); - if (records.count() > 0) { - try { - messageProcessed = processMessages(messageProcessed, records); - } catch (ProducerFencedException | FencedInstanceIdException e) { - // Normally fenced exceptions are fatal, we catch them here just for demo purpose. - throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); - } + try { + messageProcessed += processMessages(); + } catch (ProducerFencedException | FencedInstanceIdException | AuthorizationException | + AuthenticationException | UnsupportedVersionException | + UnsupportedForMessageFormatException | InvalidTopicException | + InvalidOffsetException e) { + // Normally fenced exceptions are fatal, we catch them here just for demo purpose. + throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); } messageRemaining.set(messagesRemaining(consumer)); printWithTxnId("Message remaining: " + messageRemaining); @@ -142,38 +151,41 @@ public void onPartitionsAssigned(Collection partitions) { latch.countDown(); } - private int processMessages(int messageProcessed, ConsumerRecords records) + private int processMessages() throws ProducerFencedException, FencedInstanceIdException { try { - // Begin a new transaction session. - producer.beginTransaction(); - for (ConsumerRecord record : records) { - // Process the record and send to downstream. - ProducerRecord customizedRecord = transform(record); - producer.send(customizedRecord); - } - Map positions = new HashMap<>(); - for (TopicPartition topicPartition : consumer.assignment()) { - positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); - } - // Checkpoint the progress by sending offsets to group coordinator broker. - // Under group mode, we must apply consumer group metadata for proper fencing. - if (this.mode.equals("groupMode")) { - producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); - } else { - producer.sendOffsetsToTransaction(positions, consumerGroupId); - } + ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); + if (records.count() > 0) { + // Begin a new transaction session. + producer.beginTransaction(); + for (ConsumerRecord record : records) { + // Process the record and send to downstream. + ProducerRecord customizedRecord = transform(record); + producer.send(customizedRecord); + } + Map positions = new HashMap<>(); + for (TopicPartition topicPartition : consumer.assignment()) { + positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); + } + // Checkpoint the progress by sending offsets to group coordinator broker. + // Under group mode, we must apply consumer group metadata for proper fencing. + if (this.mode.equals("groupMode")) { + producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); + } else { + producer.sendOffsetsToTransaction(positions, consumerGroupId); + } - // Finish the transaction. All sent records should be visible for consumption now. - producer.commitTransaction(); - messageProcessed += records.count(); - } catch (CommitFailedException e) { + // Finish the transaction. All sent records should be visible for consumption now. + producer.commitTransaction(); + return records.count(); + } + } catch (CommitFailedException | WakeupException | InterruptException | TimeoutException e) { // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. producer.abortTransaction(); // The consumer fetch position also needs to be reset. resetToLastCommittedPositions(consumer); } - return messageProcessed; + return 0; } private void printWithTxnId(final String message) { From 1c01fbe0211de5a19732d19fc4102162a437b76b Mon Sep 17 00:00:00 2001 From: abbccdda Date: Fri, 14 Feb 2020 00:00:23 -0800 Subject: [PATCH 3/8] address comments --- examples/src/main/java/kafka/examples/Consumer.java | 3 +++ .../kafka/examples/ExactlyOnceMessageProcessor.java | 10 ++++++---- .../java/kafka/examples/KafkaConsumerProducerDemo.java | 3 ++- .../main/java/kafka/examples/KafkaExactlyOnceDemo.java | 3 ++- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 19cb67ccc9158..d7488327ea661 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.Collections; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -37,6 +38,7 @@ public class Consumer extends ShutdownableThread { public Consumer(final String topic, final String groupId, + final Optional instanceId, final boolean readCommitted, final int numMessageToConsume, final CountDownLatch latch) { @@ -45,6 +47,7 @@ public Consumer(final String topic, Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id)); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index c02d9b8b83006..2c07ba2f4d139 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; @@ -91,7 +92,9 @@ public ExactlyOnceMessageProcessor(final String mode, // A unique transactional.id must be provided in order to properly use EOS. producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data. - consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, -1, null).get(); + // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances. + Optional groupInstanceId = Optional.of("Txn-consumer-" + instanceIdx); + consumer = new Consumer(inputTopic, consumerGroupId, groupInstanceId, READ_COMMITTED, -1, null).get(); this.latch = latch; } @@ -138,8 +141,7 @@ public void onPartitionsAssigned(Collection partitions) { messageProcessed += processMessages(); } catch (ProducerFencedException | FencedInstanceIdException | AuthorizationException | AuthenticationException | UnsupportedVersionException | - UnsupportedForMessageFormatException | InvalidTopicException | - InvalidOffsetException e) { + UnsupportedForMessageFormatException | InvalidTopicException e) { // Normally fenced exceptions are fatal, we catch them here just for demo purpose. throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); } @@ -179,7 +181,7 @@ private int processMessages() producer.commitTransaction(); return records.count(); } - } catch (CommitFailedException | WakeupException | InterruptException | TimeoutException e) { + } catch (CommitFailedException | TimeoutException | InvalidOffsetException e) { // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. producer.abortTransaction(); // The consumer fetch position also needs to be reset. diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 8a29402836cea..9fc911acac02b 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.errors.TimeoutException; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,7 +29,7 @@ public static void main(String[] args) throws InterruptedException { Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch); producerThread.start(); - Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch); + Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch); consumerThread.start(); if (!latch.await(5, TimeUnit.MINUTES)) { diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index 6da159cf92170..6437944bdf105 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -112,7 +113,7 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc CountDownLatch consumeLatch = new CountDownLatch(1); /* Stage 4: consume all processed messages to verify exactly once */ - Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch); + Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch); consumerThread.start(); if (!consumeLatch.await(5, TimeUnit.MINUTES)) { From 5c1166f7f85393ed8db7732b876e9cea2b02d604 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Tue, 18 Feb 2020 11:34:07 -0800 Subject: [PATCH 4/8] avoid doing the separate func --- .../examples/ExactlyOnceMessageProcessor.java | 74 +++++++++---------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 2c07ba2f4d139..0221b7cadfc17 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -30,13 +30,11 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.FencedInstanceIdException; -import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.ArrayList; @@ -138,11 +136,42 @@ public void onPartitionsAssigned(Collection partitions) { int messageProcessed = 0; while (messageRemaining.get() > 0) { try { - messageProcessed += processMessages(); + ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); + if (records.count() > 0) { + // Begin a new transaction session. + producer.beginTransaction(); + for (ConsumerRecord record : records) { + // Process the record and send to downstream. + ProducerRecord customizedRecord = transform(record); + producer.send(customizedRecord); + } + Map positions = new HashMap<>(); + for (TopicPartition topicPartition : consumer.assignment()) { + positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); + } + // Checkpoint the progress by sending offsets to group coordinator broker. + // Under group mode, we must apply consumer group metadata for proper fencing. + if (this.mode.equals("groupMode")) { + producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); + } else { + producer.sendOffsetsToTransaction(positions, consumerGroupId); + } + + // Finish the transaction. All sent records should be visible for consumption now. + producer.commitTransaction(); + messageProcessed += records.count(); + } + } catch (CommitFailedException | InvalidOffsetException e) { + // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. + // Note that abort transaction call could also throw fatal exceptions such as producer fenced. + producer.abortTransaction(); + + // The consumer fetch position also needs to be reset. + resetToLastCommittedPositions(consumer); } catch (ProducerFencedException | FencedInstanceIdException | AuthorizationException | AuthenticationException | UnsupportedVersionException | UnsupportedForMessageFormatException | InvalidTopicException e) { - // Normally fenced exceptions are fatal, we catch them here just for demo purpose. + // Normally the above exceptions are fatal, we catch them here just for demo purpose. throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); } messageRemaining.set(messagesRemaining(consumer)); @@ -153,43 +182,6 @@ public void onPartitionsAssigned(Collection partitions) { latch.countDown(); } - private int processMessages() - throws ProducerFencedException, FencedInstanceIdException { - try { - ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); - if (records.count() > 0) { - // Begin a new transaction session. - producer.beginTransaction(); - for (ConsumerRecord record : records) { - // Process the record and send to downstream. - ProducerRecord customizedRecord = transform(record); - producer.send(customizedRecord); - } - Map positions = new HashMap<>(); - for (TopicPartition topicPartition : consumer.assignment()) { - positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); - } - // Checkpoint the progress by sending offsets to group coordinator broker. - // Under group mode, we must apply consumer group metadata for proper fencing. - if (this.mode.equals("groupMode")) { - producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); - } else { - producer.sendOffsetsToTransaction(positions, consumerGroupId); - } - - // Finish the transaction. All sent records should be visible for consumption now. - producer.commitTransaction(); - return records.count(); - } - } catch (CommitFailedException | TimeoutException | InvalidOffsetException e) { - // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. - producer.abortTransaction(); - // The consumer fetch position also needs to be reset. - resetToLastCommittedPositions(consumer); - } - return 0; - } - private void printWithTxnId(final String message) { System.out.println(transactionalId + ": " + message); } From eed195dfeb3a10a04b4d33cb26d1e79cfed8b477 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Tue, 18 Feb 2020 13:34:49 -0800 Subject: [PATCH 5/8] simply catch --- .../examples/ExactlyOnceMessageProcessor.java | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 0221b7cadfc17..2e60ef8ee66b2 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -16,25 +16,17 @@ */ package kafka.examples; -import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.AuthenticationException; -import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.FencedInstanceIdException; -import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.ProducerFencedException; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; -import org.apache.kafka.common.errors.UnsupportedVersionException; import java.time.Duration; import java.util.ArrayList; @@ -63,6 +55,7 @@ public class ExactlyOnceMessageProcessor extends Thread { private final int numInstances; private final int instanceIdx; private final String transactionalId; + private final String groupInstanceId; private final KafkaProducer producer; private final KafkaConsumer consumer; @@ -91,8 +84,9 @@ public ExactlyOnceMessageProcessor(final String mode, producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data. // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances. - Optional groupInstanceId = Optional.of("Txn-consumer-" + instanceIdx); - consumer = new Consumer(inputTopic, consumerGroupId, groupInstanceId, READ_COMMITTED, -1, null).get(); + groupInstanceId = "Txn-consumer-" + instanceIdx; + consumer = new Consumer(inputTopic, consumerGroupId, + Optional.of(groupInstanceId), READ_COMMITTED, -1, null).get(); this.latch = latch; } @@ -161,19 +155,19 @@ public void onPartitionsAssigned(Collection partitions) { producer.commitTransaction(); messageProcessed += records.count(); } - } catch (CommitFailedException | InvalidOffsetException e) { - // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. - // Note that abort transaction call could also throw fatal exceptions such as producer fenced. + } catch (ProducerFencedException e) { + throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId)); + } catch (FencedInstanceIdException e) { + throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId)); + } catch (KafkaException e) { + // If we have not been fenced, try to abort the transaction and continue. This will raise immediately + // if the producer has hit a fatal error. producer.abortTransaction(); // The consumer fetch position also needs to be reset. resetToLastCommittedPositions(consumer); - } catch (ProducerFencedException | FencedInstanceIdException | AuthorizationException | - AuthenticationException | UnsupportedVersionException | - UnsupportedForMessageFormatException | InvalidTopicException e) { - // Normally the above exceptions are fatal, we catch them here just for demo purpose. - throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); } + messageRemaining.set(messagesRemaining(consumer)); printWithTxnId("Message remaining: " + messageRemaining); } From 5b56d6c0d897b076b0cfefb87485ae9e87e42651 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Tue, 18 Feb 2020 13:56:09 -0800 Subject: [PATCH 6/8] intellij recommendation --- .../src/main/java/kafka/examples/KafkaExactlyOnceDemo.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index 6437944bdf105..50410c9a168b8 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -38,7 +38,11 @@ * - partition: number of partitions for input/output topic * - instances: number of instances * - records: number of records - * An example argument list would be `groupMode 6 3 50000` + * An example argument list would be `groupMode 6 3 50000`. + * + * If you are using Intellij, the above arguments should be put in the configuration's `Program Arguments`. + * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console + * output to file` to record all the log output together. * * The driver could be decomposed as following stages: * From 0bb8234164e8c5cc561456fffbf1345a8e7fcb55 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Tue, 18 Feb 2020 14:36:58 -0800 Subject: [PATCH 7/8] remove standalone mode --- examples/README | 12 ++- .../examples/ExactlyOnceMessageProcessor.java | 83 +++++++------------ .../kafka/examples/KafkaExactlyOnceDemo.java | 25 +++--- .../java/kafka/examples/KafkaProperties.java | 5 -- 4 files changed, 44 insertions(+), 81 deletions(-) diff --git a/examples/README b/examples/README index 2efe71ac182ee..bff6cd39bf2bb 100644 --- a/examples/README +++ b/examples/README @@ -6,10 +6,8 @@ To run the demo: 2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh` 3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync` 4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh` - 5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh standaloneMode 6 3 50000`, - this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records - 6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh groupMode 6 3 50000`, - this means the same as the standalone demo, except consumers are using subscription mode. - 7. Some notes for exactly once demo: - 7.1. The Kafka server has to be on broker version 2.5 or higher to be able to run group mode. - 7.2. You could also use Intellij to run the example directly by configuring parameters as "Program arguments" + 5. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`, + this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records. + 6. Some notes for exactly once demo: + 6.1. The Kafka server has to be on broker version 2.5 or higher. + 6.2. You could also use Intellij to run the example directly by configuring parameters as "Program arguments" diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 2e60ef8ee66b2..5ced32c092a62 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -33,7 +33,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -47,13 +46,8 @@ public class ExactlyOnceMessageProcessor extends Thread { private static final boolean READ_COMMITTED = true; - private final String mode; private final String inputTopic; private final String outputTopic; - private final String consumerGroupId; - private final int numPartitions; - private final int numInstances; - private final int instanceIdx; private final String transactionalId; private final String groupInstanceId; @@ -62,20 +56,12 @@ public class ExactlyOnceMessageProcessor extends Thread { private final CountDownLatch latch; - public ExactlyOnceMessageProcessor(final String mode, - final String inputTopic, + public ExactlyOnceMessageProcessor(final String inputTopic, final String outputTopic, - final int numPartitions, - final int numInstances, final int instanceIdx, final CountDownLatch latch) { - this.mode = mode; this.inputTopic = inputTopic; this.outputTopic = outputTopic; - this.consumerGroupId = "Eos-consumer"; - this.numPartitions = numPartitions; - this.numInstances = numInstances; - this.instanceIdx = instanceIdx; this.transactionalId = "Processor-" + instanceIdx; // If we are using the group mode, it is recommended to have a relatively short txn timeout // in order to clear pending offsets faster. @@ -84,8 +70,8 @@ public ExactlyOnceMessageProcessor(final String mode, producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data. // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances. - groupInstanceId = "Txn-consumer-" + instanceIdx; - consumer = new Consumer(inputTopic, consumerGroupId, + this.groupInstanceId = "Txn-consumer-" + instanceIdx; + consumer = new Consumer(inputTopic, "Eos-consumer", Optional.of(groupInstanceId), READ_COMMITTED, -1, null).get(); this.latch = latch; } @@ -97,35 +83,18 @@ public void run() { final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - // Under group mode, topic based subscription is sufficient as EOS apps are safe to cooperate transactionally after 2.5. - // Under standalone mode, user needs to manually assign the topic partitions and make sure the assignment is unique - // across the consumer group instances. - if (this.mode.equals("groupMode")) { - consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection partitions) { - printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - printWithTxnId("Received partition assignment after rebalancing: " + partitions); - messageRemaining.set(messagesRemaining(consumer)); - } - }); - } else { - // Do a range assignment of topic partitions. - List topicPartitions = new ArrayList<>(); - int rangeSize = numPartitions / numInstances; - int startPartition = rangeSize * instanceIdx; - int endPartition = Math.min(numPartitions - 1, startPartition + rangeSize - 1); - for (int partition = startPartition; partition <= endPartition; partition++) { - topicPartitions.add(new TopicPartition(inputTopic, partition)); + consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); } - consumer.assign(topicPartitions); - printWithTxnId("Manually assign partitions: " + topicPartitions); - } + @Override + public void onPartitionsAssigned(Collection partitions) { + printWithTxnId("Received partition assignment after rebalancing: " + partitions); + messageRemaining.set(messagesRemaining(consumer)); + } + }); int messageProcessed = 0; while (messageRemaining.get() > 0) { @@ -139,17 +108,12 @@ public void onPartitionsAssigned(Collection partitions) { ProducerRecord customizedRecord = transform(record); producer.send(customizedRecord); } - Map positions = new HashMap<>(); - for (TopicPartition topicPartition : consumer.assignment()) { - positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); - } + + Map offsets = consumerOffsets(); + // Checkpoint the progress by sending offsets to group coordinator broker. - // Under group mode, we must apply consumer group metadata for proper fencing. - if (this.mode.equals("groupMode")) { - producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); - } else { - producer.sendOffsetsToTransaction(positions, consumerGroupId); - } + // Note that this API is only available for broker >= 2.5. + producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); // Finish the transaction. All sent records should be visible for consumption now. producer.commitTransaction(); @@ -164,7 +128,8 @@ public void onPartitionsAssigned(Collection partitions) { // if the producer has hit a fatal error. producer.abortTransaction(); - // The consumer fetch position also needs to be reset. + // The consumer fetch position needs to be restored to the committed offset + // before the transaction started. resetToLastCommittedPositions(consumer); } @@ -176,6 +141,14 @@ public void onPartitionsAssigned(Collection partitions) { latch.countDown(); } + private Map consumerOffsets() { + Map offsets = new HashMap<>(); + for (TopicPartition topicPartition : consumer.assignment()) { + offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); + } + return offsets; + } + private void printWithTxnId(final String message) { System.out.println(transactionalId + ": " + message); } diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index 50410c9a168b8..50a1ad14bc499 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -33,12 +33,11 @@ import java.util.concurrent.TimeUnit; /** - * This exactly once demo driver takes 4 arguments: - * - mode: whether to run as standalone app, or a group + * This exactly once demo driver takes 3 arguments: * - partition: number of partitions for input/output topic * - instances: number of instances * - records: number of records - * An example argument list would be `groupMode 6 3 50000`. + * An example argument list would be `6 3 50000`. * * If you are using Intellij, the above arguments should be put in the configuration's `Program Arguments`. * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console @@ -65,10 +64,10 @@ * The driver will block for the consumption of all committed records. * * From this demo, you could see that all the records from pre-population are processed exactly once, - * in either standalone mode or group mode, with strong partition level ordering guarantee. + * with strong partition level ordering guarantee. * * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5 - * in order to run group mode, otherwise the app could throw + * in order to run, otherwise the app could throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. */ public class KafkaExactlyOnceDemo { @@ -77,15 +76,14 @@ public class KafkaExactlyOnceDemo { private static final String OUTPUT_TOPIC = "output-topic"; public static void main(String[] args) throws InterruptedException, ExecutionException { - if (args.length != 4) { - throw new IllegalArgumentException("Should accept 4 parameters: [mode], " + + if (args.length != 3) { + throw new IllegalArgumentException("Should accept 3 parameters: " + "[number of partitions], [number of instances], [number of records]"); } - String mode = args[0]; - int numPartitions = Integer.parseInt(args[1]); - int numInstances = Integer.parseInt(args[2]); - int numRecords = Integer.parseInt(args[3]); + int numPartitions = Integer.parseInt(args[0]); + int numInstances = Integer.parseInt(args[1]); + int numRecords = Integer.parseInt(args[2]); /* Stage 1: topic cleanup and recreation */ recreateTopics(numPartitions); @@ -104,9 +102,8 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc /* Stage 3: transactionally process all messages */ for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) { - ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(mode, - INPUT_TOPIC, OUTPUT_TOPIC, numPartitions, - numInstances, instanceIdx, transactionalCopyLatch); + ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor( + INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch); messageProcessor.start(); } diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java index cd737cf900e29..e73c8d7c3dae7 100644 --- a/examples/src/main/java/kafka/examples/KafkaProperties.java +++ b/examples/src/main/java/kafka/examples/KafkaProperties.java @@ -20,11 +20,6 @@ public class KafkaProperties { public static final String TOPIC = "topic1"; public static final String KAFKA_SERVER_URL = "localhost"; public static final int KAFKA_SERVER_PORT = 9092; - public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024; - public static final int CONNECTION_TIMEOUT = 100000; - public static final String TOPIC2 = "topic2"; - public static final String TOPIC3 = "topic3"; - public static final String CLIENT_ID = "SimpleConsumerDemoClient"; private KafkaProperties() {} } From b1905ec71d61b8134284881f706306d1f14c122b Mon Sep 17 00:00:00 2001 From: abbccdda Date: Wed, 19 Feb 2020 09:27:54 -0800 Subject: [PATCH 8/8] fix group mode --- .../java/kafka/examples/ExactlyOnceMessageProcessor.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 5ced32c092a62..8f31b19626783 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -63,9 +63,8 @@ public ExactlyOnceMessageProcessor(final String inputTopic, this.inputTopic = inputTopic; this.outputTopic = outputTopic; this.transactionalId = "Processor-" + instanceIdx; - // If we are using the group mode, it is recommended to have a relatively short txn timeout - // in order to clear pending offsets faster. - final int transactionTimeoutMs = this.mode.equals("groupMode") ? 10000 : -1; + // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster. + final int transactionTimeoutMs = 10000; // A unique transactional.id must be provided in order to properly use EOS. producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.