KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown#13929
KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown#13929hudeqi wants to merge 9 commits intoapache:trunkfrom
Conversation
…er manager instance shutdown
clolov
left a comment
There was a problem hiding this comment.
As with the other two PRs I have a preference for variable names to start with a lowercase unless there is a reason (i.e. a convention) for them to be with uppercase, but beyond that, this seems reasonable to me.
|
And please help to review this @divijvaidya |
divijvaidya
left a comment
There was a problem hiding this comment.
Thanks for these code changes @hudeqi. I have seen cases of OOM caused by metric leaks in production and the changes you are making go a long way towards ensure safe code.
|
Are there any hidden problems in this PR? @divijvaidya |
|
I will get to all your PRs on metrics close by end of this week @hudeqi. Right now I am focusing on releasing 3.5.1. |
@divijvaidya hi, reminding~ |
|
I still need to get 3.5.1 out and focus on KIP-405 PRs (targeting 3.6 release) before I get some more time @hudeqi! |
Hi, do you have time to review this PR now? I want to combine all the currently opened PRs about metric leaks into one PR. I don’t know if this will help or burden your review? @divijvaidya |
…tric_replicaManager
|
@nizhikov Hi, can you help review the PR of this series? Looks like @divijvaidya doesn't have time. |
|
Hello. I will review this PR in the nearest time. |
| private val metricsGroup = new KafkaMetricsGroup(this.getClass) | ||
|
|
||
| // Visible for test | ||
| private[server] var metricNamesToTags: Map[String, java.util.Map[String, String]] = Map.empty |
There was a problem hiding this comment.
Can we use mutable map like it's done for fetcherThreadMap?
Or even Set<MetricName>, see comment below
| }, tags) | ||
| private[server] def newMetrics(): Unit = { | ||
| val tags = Map("clientId" -> clientId).asJava | ||
| metricsGroup.newGauge(MaxLagMetricName, () => { |
There was a problem hiding this comment.
We can use
Set<MetricName> metrics = ...;
MetricName name = metricGroup.metricName(MaxLagMetricName, tags);
KafkaYammerMetrics.defaultRegistry().newGauge(name, () => { ... });
metrics.add(name);
And store explicit MetricName instead of implicit name and tags.
metrics.forEach(KafkaYammerMetrics.defaultRegistry()::removeMetric);
`` `
There was a problem hiding this comment.
We have metrigGroup#meter and other methods that takes MetricName as parameter.
It's look convinient to create one for Gauge.
There was a problem hiding this comment.
I wonder, do we really need metricNamesToTags set. It's just 4 metric so may be it will be more clear to create explicit MetricName for each and use them while closing fetcher manager.
There was a problem hiding this comment.
Hi, @nizhikov , thank you for taking the time to review. The significance of metricNamesToTags is that it can be used in the unit test to check whether all the metrics of the corresponding module are removed when closed, to prevent someone from adding new metrics but forgetting remove, with this collection, you only need to add the new metric to this collection.
Removing the leaking metrics is a long and potentially fixed process, as can be seen from the original reviews reached with @divijvaidya.
There was a problem hiding this comment.
It seems you can use MetricName to check metrics registered and removed during test.
|
Hello. @hudeqi , thanks for the PR. |
This pr is used to remove the metrics in
AbstractFetcherManagerwhenReplicaFetcherManager/ReplicaAlterLogDirsManagerinstance shutdown.This pr has passed the corresponding unit test, and it is part of KAFKA-15129.