
KAFKA-15995: Initial API + make Producer/Consumer plugins Monitorable #17511

Merged
mimaison merged 3 commits into apache:trunk from mimaison:kip-877
Jan 31, 2025

@mimaison
Member

First part of KIP-877.

This adds the public APIs and enables the following plugins:

  • Serializer
  • Deserializer
  • ProducerInterceptor
  • ConsumerInterceptor
  • Partitioner

These are all the Producer- and Consumer-specific plugins.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mimaison added the kip (Requires or implements a KIP) label on Oct 15, 2024
@showuon self-assigned this on Oct 18, 2024
Member

@showuon left a comment

Thanks for the PR. Left some comments.

@Override
public void close() throws Exception {
    if (pluginMetrics.isPresent()) pluginMetrics.get().close();
    if (instance instanceof AutoCloseable) ((AutoCloseable) instance).close();

Member

In the KIP, we said:
"The goal is allow this feature to be used by all plugins that are Closeable, AutoCloseable or have a close() methods."

But here, we only close the instance if it is Closeable/AutoCloseable. Are we sure that every interface with a close() method is Closeable/AutoCloseable?

Member Author

So far, all the plugins that support this feature are AutoCloseable. When we add support for plugins that don't implement Closeable/AutoCloseable, we'll need to adjust this method. But I think it's fine for now.

Member

If someday we miss a newly added plugin that isn't AutoCloseable, it might cause a resource leak. I think we should either update the KIP to say "we only support plugins that are Closeable or AutoCloseable", or fix the issue here, with reflection or something else.
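
For illustration only, the reflection-based fallback mentioned above could look roughly like the sketch below. It is not code from this PR: `PluginCloser` and `LegacyPlugin` are hypothetical names, and the sketch assumes the plugin exposes a public no-argument `close()` method that the caller is allowed to invoke reflectively.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical helper, not part of Kafka: closes any object that is
// AutoCloseable or that merely declares a public no-arg close() method.
final class PluginCloser {
    private PluginCloser() { }

    /** Returns true if a close() method was found and invoked. */
    static boolean close(Object instance) throws Exception {
        if (instance instanceof AutoCloseable) {
            // Closeable extends AutoCloseable, so this branch covers both.
            ((AutoCloseable) instance).close();
            return true;
        }
        Method closeMethod;
        try {
            // Fallback: look up a public no-arg close() by name.
            closeMethod = instance.getClass().getMethod("close");
        } catch (NoSuchMethodException e) {
            return false; // nothing to close
        }
        try {
            closeMethod.invoke(instance);
        } catch (InvocationTargetException e) {
            // Unwrap so the caller sees the plugin's own failure.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) throw (Exception) cause;
            if (cause instanceof Error) throw (Error) cause;
            throw e;
        }
        return true;
    }

    // Hypothetical plugin that has close() but does not implement AutoCloseable.
    static class LegacyPlugin {
        boolean closed;

        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) throws Exception {
        LegacyPlugin plugin = new LegacyPlugin();
        System.out.println(close(plugin) + " " + plugin.closed);
    }
}
```

The trade-off is that reflection hides the contract from the compiler, which is one reason restricting the feature to Closeable/AutoCloseable plugins may be the simpler option.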

@mimaison
Member Author

cc @tombentley @C0urante since you also voted on the KIP. Thanks

@mimaison
Member Author

I rebased on trunk to resolve the conflicts.

@mimaison
Member Author

Pinging @gharris1727 as you were also involved in the discussion thread.

Contributor

@gharris1727 left a comment

Thanks for the PR @mimaison!


@Override
public void close() throws Exception {
    if (pluginMetrics.isPresent()) pluginMetrics.get().close();

Contributor

Is there a way for the plugin metrics to throw on close? If so, the instance itself wouldn't get closed, leaking it.

I think propagating the exception from the actual instance close is a good idea.

Member Author

Yes, I changed the logic to capture exceptions thrown by the plugin or the PluginMetrics instance and rethrow the first one.

Comment thread clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java Outdated
}

@Override
public void close() throws Exception {

Contributor

The close order here has some trade-offs.

If we close metrics first, the plugin close() method can't ever emit metrics. I can think of some useful examples that we should permit:

  • is a close operation ongoing
  • how many records were processed during the instance lifetime
  • how long was the plugin open
  • how long did operation X take during closing

Also if there are background threads within a plugin, those threads may want to emit metrics concurrent with the close() call. The close() call may leave that background thread running, intentionally or accidentally. I think that once the close() call completes, we can throw away any further metrics from the plugin, making all of the calls no-ops, or maybe throw exceptions if we want to be harsh.

If we close the plugin first, the MetricValueProvider interface may have an undefined value during and after closing. If the plugin needs to be open in order to provide real data, the MetricValueProvider instance may throw an exception, or provide junk or stale data.

I think we should close the plugin first, and mention that the MetricValueProviders may be called at any time, including after the metric is removed or the plugin is closed.
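
As a rough illustration of a value provider that stays safe to call after the plugin is closed, here is a minimal sketch. It uses `java.util.function.Supplier` as a stand-in for Kafka's MetricValueProvider, and `CountingPlugin` is a hypothetical name; neither is taken from this PR.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical plugin: its gauge reads state that remains valid after close(),
// so a late call to the value provider returns the last value instead of throwing.
final class CountingPlugin implements AutoCloseable {
    private final AtomicLong processed = new AtomicLong();
    private volatile boolean closed = false;

    void process() {
        if (closed) throw new IllegalStateException("plugin is closed");
        processed.incrementAndGet();
    }

    // Stand-in for a metric value provider; it never depends on the plugin being open.
    Supplier<Long> processedGauge() {
        return processed::get;
    }

    @Override
    public void close() {
        closed = true;
    }

    public static void main(String[] args) {
        CountingPlugin plugin = new CountingPlugin();
        Supplier<Long> gauge = plugin.processedGauge();
        plugin.process();
        plugin.close();
        // Reading the gauge after close still works and reports the final count.
        System.out.println(gauge.get());
    }
}
```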

Member Author

That's a good point! And thanks for taking the time to consider the alternatives and suggest solutions.

I agree that the plugin instance should be closed first. This allows the plugin close() method to update metrics. Closing the PluginMetrics instance afterwards also lets us "lock" it, as in theory it should not receive further calls.

I've opted to use Utils.closeQuietly() so we can capture a failure when closing either the plugin or the PluginMetrics instance.
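
The close order and error handling described here can be sketched in plain Java as follows. This is a self-contained approximation, not the PR's code: `OrderedClose` and its local `closeQuietly` helper are hypothetical stand-ins for the wrapper class and for Kafka's Utils.closeQuietly().

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: close the plugin first, then its metrics,
// and rethrow only the first failure so neither close is skipped.
final class OrderedClose {
    private OrderedClose() { }

    /** Closes one resource, remembering only the first failure seen. */
    static void closeQuietly(AutoCloseable closeable, AtomicReference<Throwable> firstError) {
        if (closeable == null) return;
        try {
            closeable.close();
        } catch (Throwable t) {
            firstError.compareAndSet(null, t);
        }
    }

    /** Plugin first (so its close() can still emit metrics), metrics second. */
    static void closeAll(AutoCloseable plugin, AutoCloseable pluginMetrics) throws Exception {
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        closeQuietly(plugin, firstError);
        closeQuietly(pluginMetrics, firstError);
        Throwable t = firstError.get();
        if (t instanceof Exception) throw (Exception) t;
        if (t instanceof Error) throw (Error) t;
        if (t != null) throw new RuntimeException(t);
    }

    public static void main(String[] args) {
        AutoCloseable plugin = () -> { throw new IllegalStateException("plugin failed"); };
        AutoCloseable metrics = () -> System.out.println("metrics closed");
        try {
            closeAll(plugin, metrics);
        } catch (Exception e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

Because the first failure is only recorded rather than thrown immediately, the metrics are still closed when the plugin's close() fails, which addresses the leak raised earlier in the review.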

@mimaison
Member Author

mimaison commented Dec 3, 2024

Rebased to resolve the conflicts.
@showuon @gharris1727 Can you take another look? Thanks

Comment on lines +36 to +40
    private final Metrics metrics;
    private final Map<String, String> tags;
    private final Set<MetricName> metricNames = new HashSet<>();
    private final Set<String> sensors = new HashSet<>();
    private boolean closing = false;

Contributor

I think that this implementation should be thread-safe. At minimum the closing variable should be volatile, but maybe the Sets need to be thread-safe as well.
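
A minimal sketch of that suggestion, assuming a simplified tracker that stores plain strings rather than MetricName objects (`TrackedNames` is a hypothetical name, not the class under review):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker: a volatile flag makes close() visible to other threads,
// and a concurrent set tolerates registrations from several threads at once.
final class TrackedNames implements AutoCloseable {
    private final Set<String> names = ConcurrentHashMap.newKeySet();
    private volatile boolean closing = false;

    /** Registers a name; returns false if it was already registered. */
    boolean add(String name) {
        if (closing) throw new IllegalStateException("already closing");
        return names.add(name);
    }

    int size() {
        return names.size();
    }

    @Override
    public void close() {
        closing = true;
        names.clear();
    }

    public static void main(String[] args) {
        TrackedNames tracker = new TrackedNames();
        tracker.add("records-processed");
        tracker.close();
        System.out.println(tracker.size());
    }
}
```

Note that the check of `closing` and the call to `names.add` are still not atomic together; if a registration racing with close() must never slip through, both methods would need to share a lock instead.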

Member Author

Thanks for the review, I pushed an update.

@mimaison
Member Author

@showuon @gharris1727 Thanks for the reviews! Can you take another look?

@gharris1727
Contributor

@mimaison I don't have any more comments, I think I'm ready to approve this. Can you check the build failures? I've rerun it several times and the ClientUtilsTest seems to be consistently failing, but I'm not sure how your changes could be causing it.

@mimaison
Member Author

mimaison commented Jan 30, 2025

I think that ClientUtilsTest issue has been fixed in #18549.
I rebased the PR on trunk and fixed a compilation issue due to a new test in KafkaProducerTest, which should clear the CI issue.

@mimaison
Member Author

We still have a bunch of failures in the CI but these all look unrelated to this PR.
The failure AbstractCoordinatorTest > testWakeupAfterSyncGroupReceived() is tracked in https://issues.apache.org/jira/browse/KAFKA-18310.

Contributor

@gharris1727 left a comment

LGTM.

I was having pretty good luck earlier this month with green builds, so builds seem less reliable than usual. We can merge and monitor trunk, unblocking the next PR.

Thanks @mimaison for the KIP and implementation! So happy to see this change.

@mimaison merged commit 7131473 into apache:trunk on Jan 31, 2025
@mimaison deleted the kip-877 branch on January 31, 2025 09:40
pdruley pushed a commit to pdruley/kafka that referenced this pull request Feb 12, 2025
…apache#17511)


Reviewers: Greg Harris <gharris1727@gmail.com>, Luke Chen <showuon@gmail.com>
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
…apache#17511)


Reviewers: Greg Harris <gharris1727@gmail.com>, Luke Chen <showuon@gmail.com>

Labels: clients, consumer, kip (Requires or implements a KIP), producer

3 participants