From 85cee4fcbcecebd3aae66fc074a560804601b84b Mon Sep 17 00:00:00 2001 From: Said BOUDJELDA Date: Sat, 15 Jul 2023 01:37:07 +0200 Subject: [PATCH 1/4] remove unused variable --- examples/src/main/java/kafka/examples/KafkaProperties.java | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java index e73c8d7c3dae7..57da9c46220b4 100644 --- a/examples/src/main/java/kafka/examples/KafkaProperties.java +++ b/examples/src/main/java/kafka/examples/KafkaProperties.java @@ -17,7 +17,6 @@ package kafka.examples; public class KafkaProperties { - public static final String TOPIC = "topic1"; public static final String KAFKA_SERVER_URL = "localhost"; public static final int KAFKA_SERVER_PORT = 9092; From 5a023a20108fe8327bf712b474a5901a99eebcf0 Mon Sep 17 00:00:00 2001 From: Said BOUDJELDA Date: Sun, 16 Jul 2023 01:10:34 +0200 Subject: [PATCH 2/4] refactor KafkaProperties to reuse kafka bootstrap servers url property --- .../examples/ExactlyOnceMessageProcessor.java | 19 +++++++++-- .../examples/KafkaConsumerProducerDemo.java | 7 ++-- .../kafka/examples/KafkaExactlyOnceDemo.java | 34 ++++++++++++++----- .../java/kafka/examples/KafkaProperties.java | 3 +- 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index b23e567f848fc..fa3a5b81a2e09 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -74,14 +74,29 @@ public ExactlyOnceMessageProcessor(String threadName, int transactionTimeoutMs = 10_000; // A unique transactional.id must be provided in order to properly use EOS. producer = new Producer( - "processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null) + "processor-producer", + KafkaProperties.BOOTSTRAP_SERVERS, + outputTopic, + true, + transactionalId, + true, + -1, + transactionTimeoutMs, + null) .createKafkaProducer(); // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data. // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances. this.groupInstanceId = "giid-" + threadName; boolean readCommitted = true; consumer = new Consumer( - "processor-consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null) + "processor-consumer", + KafkaProperties.BOOTSTRAP_SERVERS, + inputTopic, + "processor-group", + Optional.of(groupInstanceId), + readCommitted, + -1, + null) .createKafkaConsumer(); this.latch = latch; } diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 3c6424c7dcafd..88b957332a9aa 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -32,7 +32,6 @@ * record all the log output together. */ public class KafkaConsumerProducerDemo { - public static final String BOOTSTRAP_SERVERS = "localhost:9092"; public static final String TOPIC_NAME = "my-topic"; public static final String GROUP_NAME = "my-group"; @@ -49,17 +48,17 @@ public static void main(String[] args) { boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync"); // stage 1: clean any topics left from previous runs - Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME); + Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, -1, TOPIC_NAME); CountDownLatch latch = new CountDownLatch(2); // stage 2: produce records to topic1 Producer producerThread = new Producer( - "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch); + "producer", KafkaProperties.BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch); producerThread.start(); // stage 3: consume records from topic1 Consumer consumerThread = new Consumer( - "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch); + "consumer", KafkaProperties.BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch); consumerThread.start(); if (!latch.await(5, TimeUnit.MINUTES)) { diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index 9d94337491dda..584e72d9e81eb 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -25,7 +25,7 @@ /** * This example can be decomposed into the following stages: - * + *

* 1. Clean any topics left from previous runs. * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic. * The demo will block for the record generation to finish, so the producer is synchronous. @@ -37,16 +37,15 @@ * 4. Create a read_committed consumer thread to verify we have all records in the output topic, * and record ordering at the partition level is maintained. * The demo will block for the consumption of all committed records, with transactional guarantee. - * + *

* Broker version must be >= 2.5.0 in order to run, otherwise the example will throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. - * + *

* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`. * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to * record all the log output together. */ public class KafkaExactlyOnceDemo { - public static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String INPUT_TOPIC = "input-topic"; private static final String OUTPUT_TOPIC = "output-topic"; public static final String GROUP_NAME = "check-group"; @@ -66,12 +65,20 @@ public static void main(String[] args) { int numRecords = Integer.parseInt(args[2]); // stage 1: clean any topics left from previous runs - Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC); + Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC); // stage 2: send demo records to the input-topic CountDownLatch producerLatch = new CountDownLatch(1); Producer producerThread = new Producer( - "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch); + "producer", + KafkaProperties.BOOTSTRAP_SERVERS, + INPUT_TOPIC, + false, + null, + true, + numRecords, + -1, + producerLatch); producerThread.start(); if (!producerLatch.await(2, TimeUnit.MINUTES)) { Utils.printErr("Timeout after 2 minutes waiting for data load"); @@ -83,7 +90,11 @@ public static void main(String[] args) { CountDownLatch processorsLatch = new CountDownLatch(numInstances); List processors = IntStream.range(0, numInstances) .mapToObj(id -> new ExactlyOnceMessageProcessor( - "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch)) + "processor-" + id, + KafkaProperties.BOOTSTRAP_SERVERS, + INPUT_TOPIC, + OUTPUT_TOPIC, + processorsLatch)) .collect(Collectors.toList()); processors.forEach(ExactlyOnceMessageProcessor::start); if (!processorsLatch.await(2, TimeUnit.MINUTES)) { @@ -95,7 +106,14 @@ public static void main(String[] args) { // stage 4: check consuming records from the output-topic CountDownLatch consumerLatch = new CountDownLatch(1); Consumer consumerThread = new Consumer( - "consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch); + "consumer", + KafkaProperties.BOOTSTRAP_SERVERS, + OUTPUT_TOPIC, + GROUP_NAME, + Optional.empty(), + true, + numRecords, + consumerLatch); consumerThread.start(); if (!consumerLatch.await(2, TimeUnit.MINUTES)) { Utils.printErr("Timeout after 2 minutes waiting for output read"); diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java index 57da9c46220b4..ccba3ec93018c 100644 --- a/examples/src/main/java/kafka/examples/KafkaProperties.java +++ b/examples/src/main/java/kafka/examples/KafkaProperties.java @@ -17,8 +17,7 @@ package kafka.examples; public class KafkaProperties { - public static final String KAFKA_SERVER_URL = "localhost"; - public static final int KAFKA_SERVER_PORT = 9092; + public static final String BOOTSTRAP_SERVERS = "localhost:9092"; private KafkaProperties() {} } From ee094010e063fbfb3b25d5eee51c0336096fd2da Mon Sep 17 00:00:00 2001 From: Said BOUDJELDA Date: Sun, 16 Jul 2023 01:17:57 +0200 Subject: [PATCH 3/4] some formatting to avoid too long lines on editor --- .../src/main/java/kafka/examples/KafkaExactlyOnceDemo.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index 584e72d9e81eb..ed2b8aee8c70c 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -70,7 +70,7 @@ public static void main(String[] args) { // stage 2: send demo records to the input-topic CountDownLatch producerLatch = new CountDownLatch(1); Producer producerThread = new Producer( - "producer", + "producer", KafkaProperties.BOOTSTRAP_SERVERS, INPUT_TOPIC, false, @@ -90,7 +90,7 @@ public static void main(String[] args) { CountDownLatch processorsLatch = new CountDownLatch(numInstances); List processors = IntStream.range(0, numInstances) .mapToObj(id -> new ExactlyOnceMessageProcessor( - "processor-" + id, + "processor-" + id, KafkaProperties.BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, @@ -106,7 +106,7 @@ public static void main(String[] args) { // stage 4: check consuming records from the output-topic CountDownLatch consumerLatch = new CountDownLatch(1); Consumer consumerThread = new Consumer( - "consumer", + "consumer", KafkaProperties.BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, From ee8461425876cb3859ac201bd20588ed5ff519c6 Mon Sep 17 00:00:00 2001 From: Said BOUDJELDA Date: Sun, 16 Jul 2023 01:20:24 +0200 Subject: [PATCH 4/4] add

tag for to mark new line in javadoc --- .../main/java/kafka/examples/KafkaConsumerProducerDemo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 88b957332a9aa..7ab93d6a8cdbe 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -22,11 +22,11 @@ /** * This example can be decomposed into the following stages: - * + *

* 1. Clean any topics left from previous runs. * 2. Create a producer thread to send a set of records to topic1. * 3. Create a consumer thread to fetch all previously sent records from topic1. - * + *

* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`. * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to * record all the log output together.