Skip to content

KAFKA-9447: Add new customized EOS model example#8031

Merged
guozhangwang merged 11 commits intoapache:trunkfrom
abbccdda:eos_example
Feb 6, 2020
Merged

KAFKA-9447: Add new customized EOS model example#8031
guozhangwang merged 11 commits intoapache:trunkfrom
abbccdda:eos_example

Conversation

@abbccdda
Copy link
Copy Markdown

@abbccdda abbccdda commented Feb 1, 2020

With the improvement of 447, we are now offering developers a better experience on writing their customized EOS apps with group subscription, instead of manual assignments. With the demo, user should be able to get started more quickly on writing their own EOS app, and understand the processing logic much better.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -51,21 +51,18 @@ public Map<String, KafkaFuture<TopicDescription>> values() {
*/
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup

@@ -48,23 +48,13 @@ public KafkaFuture<Map<String, TopicListing>> namesToListings() {
* Return a future which yields a collection of TopicListing objects.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup

@abbccdda abbccdda force-pushed the eos_example branch 3 times, most recently from 6bf8724 to 340747f Compare February 1, 2020 22:20
Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, just some minor comments.

if (this.mode.equals("groupMode")) {
producer.sendOffsetsToTransaction(positions, consumer.groupMetadata());
} else {
producer.sendOffsetsToTransaction(positions, consumerGroupId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to refresh my memory: are we going to eventually deprecate this API, or are we going to keep both, and let users apply this one with manual assignment (like you did here)? I thought we are going to deprecate, but maybe I remembered it wrong.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we are intended to keep both

throw new KafkaException("Encountered fatal error during processing: " + e.getMessage());
}
}
messageRemaining.set(messagesRemaining(consumer));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need an atomic long here? Seems there's no concurrency.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tricky thing here is that if we define a primitive long outside of the rebalance callback, it won't compile.


private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why divide the key by two?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a way of showing the key gets processed by message copier, all the produced message key are even keys.

messageProcessed += records.count();
} catch (CommitFailedException e) {
// In case of a retriable exception, suggest aborting the ongoing transaction first for correctness.
producer.abortTransaction();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prop: abortTransaction can also throw ProducerFenced.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, which is ok as we will throw on ProducerFenced anyway

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand: we try to capture ProducerFenced below and wrap it as a KafkaException, but here if we throw ProducerFenced it would not be captured and wrapped, is that intentional?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see your pt.

Comment thread examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java Outdated
@guozhangwang guozhangwang merged commit 9d17bf9 into apache:trunk Feb 6, 2020
@omkreddy
Copy link
Copy Markdown
Contributor

omkreddy commented Feb 6, 2020

@abbccdda looks like :examples:spotbugsMain failing for newly added classes.

@abbccdda
Copy link
Copy Markdown
Author

abbccdda commented Feb 6, 2020

@omkreddy Thanks for catching that! Will do a fix in a minute

try {
// Abort previous transaction if instructed.
if (abortPreviousTransaction) {
producer.abortTransaction();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit unconventional to have abort logic at the start of the loop. I think what users would expect is something like this:

try {
  producer.beginTransaction()
  producer.send(...)
  producer.sendOffsetsToTransaction(...)
  producer.commitTransaction()
} catch (Exception ) {
  producer.abortTransaction()
}

ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 7, 2020
Conflicts:
* build.gradle: moved avro plugin definition below
newly added test retry plugin.

* apache-github/trunk:
  MINOR: further InternalTopologyBuilder cleanup  (apache#8046)
  MINOR: Add timer for update limit offsets (apache#8047)
  HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051)
  KAFKA-9447: Add new customized EOS model example (apache#8031)
  KAFKA-8164: Add support for retrying failed (apache#8019)
  HOTFIX: checkstyle for newly added unit test
  KAFKA-9261; Client should handle unavailable leader metadata (apache#7770)
  MINOR: Fix typos introduced in KIP-559 (apache#8042)
  MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679)
  KAFKA-9113: Clean up task management and state management (apache#7997)
  MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038)
  KAFKA-9491; Increment high watermark after full log truncation (apache#8037)
  KAFKA-9477 Document RoundRobinAssignor as an option for partition.assignment.strategy (apache#8007)
  KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (apache#7568)
  KAFKA-9492; Ignore record errors in ProduceResponse for older versions (apache#8030)
guozhangwang pushed a commit that referenced this pull request Feb 12, 2020
With the improvement of 447, we are now offering developers a better experience on writing their customized EOS apps with group subscription, instead of manual assignments. With the demo, user should be able to get started more quickly on writing their own EOS app, and understand the processing logic much better.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
stanislavkozlovski pushed a commit to stanislavkozlovski/kafka that referenced this pull request Feb 18, 2020
With the improvement of 447, we are now offering developers a better experience on writing their customized EOS apps with group subscription, instead of manual assignments. With the demo, user should be able to get started more quickly on writing their own EOS app, and understand the processing logic much better.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
ijuma added a commit to ijuma/kafka that referenced this pull request Apr 28, 2020
…t-for-generated-requests

* apache-github/trunk: (410 commits)
  KAFKA-8843: KIP-515: Zookeeper TLS support
  MINOR: Add missing quote for malformed line content (apache#8070)
  MINOR: Simplify KafkaProducerTest (apache#8044)
  KAFKA-9507; AdminClient should check for missing committed offsets (apache#8057)
  KAFKA-9519: Deprecate the --zookeeper flag in ConfigCommand (apache#8056)
  KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication (apache#8048)
  HOTFIX: Fix two test failures in JDK11 (apache#8063)
  DOCS - clarify transactionalID and idempotent behavior (apache#7821)
  MINOR: further InternalTopologyBuilder cleanup  (apache#8046)
  MINOR: Add timer for update limit offsets (apache#8047)
  HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051)
  KAFKA-9447: Add new customized EOS model example (apache#8031)
  KAFKA-8164: Add support for retrying failed (apache#8019)
  HOTFIX: checkstyle for newly added unit test
  KAFKA-9261; Client should handle unavailable leader metadata (apache#7770)
  MINOR: Fix typos introduced in KIP-559 (apache#8042)
  MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679)
  KAFKA-9113: Clean up task management and state management (apache#7997)
  MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038)
  KAFKA-9491; Increment high watermark after full log truncation (apache#8037)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants