Skip to content

KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style#19955

Merged
bbejeck merged 19 commits intoapache:trunkfrom
m1a2st:KAFKA-18193
Oct 7, 2025
Merged

KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style#19955
bbejeck merged 19 commits intoapache:trunkfrom
m1a2st:KAFKA-18193

Conversation

@m1a2st
Copy link
Copy Markdown
Collaborator

@m1a2st m1a2st commented Jun 12, 2025

In Kafka Streams, configuration classes typically follow a fluent API
pattern to ensure a consistent and intuitive developer experience.
However, the current implementation of
org.apache.kafka.streams.KafkaStreams$CloseOptions deviates from this
convention by exposing a public constructor, breaking the uniformity
expected across the API.

To address this inconsistency, we propose introducing a new
CloseOptions class that adheres to the fluent API style, replacing the
existing implementation. The new class will retain the existing
timeout(Duration) and leaveGroup(boolean) methods but will enforce
fluent instantiation and configuration. Given the design shift, we will
not maintain backward compatibility with the current class.

This change aligns with the goal of standardizing configuration objects
across Kafka Streams, offering developers a more cohesive and
predictable API.

Reviewers: Bill Bejeckbbejeck@apache.org

@github-actions github-actions Bot added triage PRs from the community streams tools labels Jun 12, 2025
@m1a2st
Copy link
Copy Markdown
Collaborator Author

m1a2st commented Jun 12, 2025

This patch should wait until the KIP-1153 vote concludes.

@m1a2st
Copy link
Copy Markdown
Collaborator Author

m1a2st commented Jun 13, 2025

This KIP has passed. LINK

@github-actions
Copy link
Copy Markdown

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@github-actions
Copy link
Copy Markdown

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@github-actions
Copy link
Copy Markdown

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Aug 6, 2025

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@github-actions
Copy link
Copy Markdown

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@mjsax mjsax requested a review from bbejeck August 29, 2025 22:24
@mjsax mjsax added kip Requires or implements a KIP and removed triage PRs from the community needs-attention labels Aug 29, 2025
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Oct 3, 2025

@m1a2st can you rebase to fix the merge conflicts? Thanks

Apologies for the delay in the reviewing we'll get this in soon.

m1a2st added 2 commits October 4, 2025 09:41
# Conflicts:
#	streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @m1a2st overall looks good, I have a couple of comments

* threads to join.
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag
* {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation#LEAVE_GROUP} to
* trigger consumer leave call
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this just specify that users can provide either option for remaining in the group or leaving vs. explicity showing LEAVE_GROUP


public class CloseOptions {
/**
* Enum to specify the group membership operation upon leaving group.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should say "upon closing closing the Kafka Streams application" or something similar

// further state reports from the thread since we're shutting down
int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
int numStreamThreads = processStreamThread(
streamThread -> streamThread.shutdown(operation == org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we could update the StreamThread#shutdown to take a CloseOptions.GroupMembershipOperation parameter, then change the AtomicBoolean to AtomicReference<GroupMembershipOperation> and we can pass the close option directly to the consumer. WDYT?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think updating the StreamThread#shutdown parameter makes sense to me, so I will update the PR.

@m1a2st
Copy link
Copy Markdown
Collaborator Author

m1a2st commented Oct 5, 2025

Thanks for @bbejeck review, I have updated the PR based on your comments, PTAL

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st thanks for the update, just one minor comment to address, then we can get this in.

private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
private final AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation> leaveGroupRequested =
new AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default should be REMAIN_IN_GROUP, which corresponds to the default of leaveGroupRequested = false

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out, that was my mistake.

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @m1a2st ! LGTM, I'll merge once the build passes.

@bbejeck bbejeck merged commit ebae768 into apache:trunk Oct 7, 2025
34 of 36 checks passed
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Oct 7, 2025

Merged #19955 into trunk

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st sorry for late review. I left a few comments

final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to pass an Optional<Duration> to prepareMillisCheckFailMsgPrefix? I assume the accepted type is a Duration.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s safe to pass an Optional object, but we shouldn’t change the error message. Therefore, I’ll update it to use Duration instead.

Invalid value for parameter "timeout" (value was: PT1M). 
Invalid value for parameter "timeout" (value was: Optional[PT1M]). 

final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the timeout could be empty, right?

/**
* Fluent method to set the timeout for the close process.
*
* @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If {@code null}, the default timeout will be used.

It looks like the implementation is ignoring the comment 😢

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I missed this comment. If null values are not allowed, KafkaStreams at L1637 should not be empty.

@m1a2st
Copy link
Copy Markdown
Collaborator Author

m1a2st commented Oct 7, 2025

Thanks for @chia7712 review, followup PR

chia7712 pushed a commit that referenced this pull request Oct 12, 2025
This PR is a follow-up to KAFKA-18193. It addresses the need for a null
check and an improved error message. Please refer to the previous
comments and the review of
#19955 (review)
for more context.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…pache#19955)

In Kafka Streams, configuration classes typically follow a fluent API
pattern to ensure a consistent and intuitive developer experience.
However, the current implementation of
`org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this
convention by exposing a public constructor, breaking the uniformity
expected across the API.

To address this inconsistency, we propose introducing a new
`CloseOptions` class that adheres to the fluent API style, replacing the
existing implementation. The new class will retain the existing
`timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce
fluent instantiation and configuration. Given the design shift, we will
not maintain backward compatibility with the current class.

This change aligns with the goal of standardizing configuration objects
across Kafka Streams, offering developers a more cohesive and
predictable API.

Reviewers: Bill Bejeck<bbejeck@apache.org>
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…20650)

This PR is a follow-up to KAFKA-18193. It addresses the need for a null
check and an improved error message. Please refer to the previous
comments and the review of
apache#19955 (review)
for more context.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
shashankhs11 pushed a commit to shashankhs11/kafka that referenced this pull request Dec 15, 2025
…pache#19955)

In Kafka Streams, configuration classes typically follow a fluent API
pattern to ensure a consistent and intuitive developer experience.
However, the current implementation of
`org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this
convention by exposing a public constructor, breaking the uniformity
expected across the API.

To address this inconsistency, we propose introducing a new
`CloseOptions` class that adheres to the fluent API style, replacing the
existing implementation. The new class will retain the existing
`timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce
fluent instantiation and configuration. Given the design shift, we will
not maintain backward compatibility with the current class.

This change aligns with the goal of standardizing configuration objects
across Kafka Streams, offering developers a more cohesive and
predictable API.

Reviewers: Bill Bejeck<bbejeck@apache.org>
shashankhs11 pushed a commit to shashankhs11/kafka that referenced this pull request Dec 15, 2025
…20650)

This PR is a follow-up to KAFKA-18193. It addresses the need for a null
check and an improved error message. Please refer to the previous
comments and the review of
apache#19955 (review)
for more context.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams tools

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants