Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,29 @@ public ExactlyOnceMessageProcessor(String threadName,
int transactionTimeoutMs = 10_000;
// A unique transactional.id must be provided in order to properly use EOS.
producer = new Producer(
"processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null)
"processor-producer",
KafkaProperties.BOOTSTRAP_SERVERS,
outputTopic,
true,
transactionalId,
true,
-1,
transactionTimeoutMs,
null)
.createKafkaProducer();
// Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
// Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
this.groupInstanceId = "giid-" + threadName;
boolean readCommitted = true;
consumer = new Consumer(
"processor-consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
"processor-consumer",
KafkaProperties.BOOTSTRAP_SERVERS,
inputTopic,
"processor-group",
Optional.of(groupInstanceId),
readCommitted,
-1,
null)
.createKafkaConsumer();
this.latch = latch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@

/**
* This example can be decomposed into the following stages:
*
* <p>
* 1. Clean any topics left from previous runs.
* 2. Create a producer thread to send a set of records to topic1.
* 3. Create a consumer thread to fetch all previously sent records from topic1.
*
* <p>
* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
* You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
* record all the log output together.
*/
public class KafkaConsumerProducerDemo {
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static final String TOPIC_NAME = "my-topic";
public static final String GROUP_NAME = "my-group";

Expand All @@ -49,17 +48,17 @@ public static void main(String[] args) {
boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");

// stage 1: clean any topics left from previous runs
Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
CountDownLatch latch = new CountDownLatch(2);

// stage 2: produce records to topic1
Producer producerThread = new Producer(
"producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
"producer", KafkaProperties.BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
producerThread.start();

// stage 3: consume records from topic1
Consumer consumerThread = new Consumer(
"consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
"consumer", KafkaProperties.BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
consumerThread.start();

if (!latch.await(5, TimeUnit.MINUTES)) {
Expand Down
34 changes: 26 additions & 8 deletions examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/**
* This example can be decomposed into the following stages:
*
* <p>
* 1. Clean any topics left from previous runs.
* 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
* The demo will block for the record generation to finish, so the producer is synchronous.
Expand All @@ -37,16 +37,15 @@
* 4. Create a read_committed consumer thread to verify we have all records in the output topic,
* and record ordering at the partition level is maintained.
* The demo will block for the consumption of all committed records, with transactional guarantee.
*
* <p>
* Broker version must be >= 2.5.0 in order to run, otherwise the example will throw
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
*
* <p>
* If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
* You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
* record all the log output together.
*/
public class KafkaExactlyOnceDemo {
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
public static final String GROUP_NAME = "check-group";
Expand All @@ -66,12 +65,20 @@ public static void main(String[] args) {
int numRecords = Integer.parseInt(args[2]);

// stage 1: clean any topics left from previous runs
Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);

// stage 2: send demo records to the input-topic
CountDownLatch producerLatch = new CountDownLatch(1);
Producer producerThread = new Producer(
"producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
"producer",
KafkaProperties.BOOTSTRAP_SERVERS,
INPUT_TOPIC,
false,
null,
true,
numRecords,
-1,
producerLatch);
producerThread.start();
if (!producerLatch.await(2, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 2 minutes waiting for data load");
Expand All @@ -83,7 +90,11 @@ public static void main(String[] args) {
CountDownLatch processorsLatch = new CountDownLatch(numInstances);
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
.mapToObj(id -> new ExactlyOnceMessageProcessor(
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
"processor-" + id,
KafkaProperties.BOOTSTRAP_SERVERS,
INPUT_TOPIC,
OUTPUT_TOPIC,
processorsLatch))
.collect(Collectors.toList());
processors.forEach(ExactlyOnceMessageProcessor::start);
if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
Expand All @@ -95,7 +106,14 @@ public static void main(String[] args) {
// stage 4: check consuming records from the output-topic
CountDownLatch consumerLatch = new CountDownLatch(1);
Consumer consumerThread = new Consumer(
"consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch);
"consumer",
KafkaProperties.BOOTSTRAP_SERVERS,
OUTPUT_TOPIC,
GROUP_NAME,
Optional.empty(),
true,
numRecords,
consumerLatch);
consumerThread.start();
if (!consumerLatch.await(2, TimeUnit.MINUTES)) {
Utils.printErr("Timeout after 2 minutes waiting for output read");
Expand Down
4 changes: 1 addition & 3 deletions examples/src/main/java/kafka/examples/KafkaProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package kafka.examples;

public class KafkaProperties {
public static final String TOPIC = "topic1";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please look into when this was added and why it wasn't used? Unused variables can sometimes hint towards bugs where we are not using variables when we should.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya Thank you so much for the review

When this class was first created, it stored all the related properties needed to run all the examples. This is an example of what it looked like when it was created:

  final static String zkConnect = "127.0.0.1:2181";
  final static  String groupId = "group1";
  final static String topic = "topic1";
  final static String kafkaServerURL = "localhost";
  final static int kafkaServerPort = 9092;
  final static int kafkaProducerBufferSize = 64*1024;
  final static int connectionTimeOut = 100000;
  final static int reconnectInterval = 10000;
  final static String topic2 = "topic2";
  final static String topic3 = "topic3";
  final static String clientId = "SimpleConsumerDemoClient";**

Then it was cleaned up, and is now intended to keep only the configuration needed to run the Kafka examples.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya I think even these two variables need to be refactored, since they are used only once (for the exactly-once semantics example) and can be replaced by a single variable that contains the bootstrap servers, instead of concatenating three strings to build a bootstrapServers URL:

    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;

I think I can update this PR to cover this point.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya I updated my pull request and made some suggestions, such as reusing the BOOTSTRAP_SERVERS property in all the examples where it's needed.

Could you please review it again?

public static final String KAFKA_SERVER_URL = "localhost";
public static final int KAFKA_SERVER_PORT = 9092;
public static final String BOOTSTRAP_SERVERS = "localhost:9092";

private KafkaProperties() {}
}