KAFKA-7223: Suppression Buffer Metrics#5795
guozhangwang merged 5 commits into apache:trunk from vvcephei:suppress-buffer-metrics
Conversation
vvcephei
left a comment
@guozhangwang @mjsax @bbejeck ,
Do you mind taking a look at this last PR for the KIP?
I did the metrics in a separate file to keep related tests together for readability.
I'm aware there are a lot of repeated strings in the metric names, but it is on purpose: it's the only place in the code where you can visually confirm that the metrics we're measuring conform to the ones we document.
might as well consolidate this repeated string.
I am always confused by metrics. What is the group name used for (i.e., semantic meaning)?
AFAICT, I don't think it's "used for" anything in particular. It's part of the metric name, but so is the description.
I assume it's intended to be like a namespace.
group name is used as a first "tag" of the metric name in JMX reporter: xxx-metrics:type=[group-name],[tag1]=[value1],...; for other reporters they can use the group name however they like.
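To make the JMX naming concrete, here is a minimal stdlib-only sketch (the class name and `jmxName` helper are hypothetical, not Kafka code) showing how a group name ends up as the `type` key of a JMX `ObjectName`, with tags as further key properties:

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class JmxGroupNameDemo {
    // Builds a JMX-style name following the pattern described above:
    // xxx-metrics:type=[group-name],[tag1]=[value1],...
    static ObjectName jmxName(String domain, String group, String clientId)
            throws MalformedObjectNameException {
        return new ObjectName(domain + ":type=" + group + ",client-id=" + clientId);
    }

    public static void main(String[] args) throws Exception {
        ObjectName name = jmxName("kafka.streams", "stream-buffer-metrics", "app-1");
        // The group name is recoverable as the "type" key property.
        System.out.println(name.getKeyProperty("type"));      // stream-buffer-metrics
        System.out.println(name.getKeyProperty("client-id")); // app-1
    }
}
```

Other (non-JMX) reporters are free to render the same group name however they like; only the JMX reporter maps it to `type=`.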
Sorry for my confusion. Let me rephrase my current understanding:
A "metric" is a single value that we track and report as a key-value pair. A Sensor groups multiple metrics together for ease of use -- each metric gets a globally unique name that is assembled from many parts.
- We prefix metric names (the `xxx-metrics:` part) with the `clientId`, which is different for each `StreamThread`.
- What is the purpose of `group-name` in the metric name, i.e., which metrics should use the same or a different group name?
- Each sensor has a name that is added to all metric names within the sensor (i.e., the sensor name groups all its contained metrics).
- We also put additional tags to add more meta information (task-id, processor-id) if appropriate, to name each metric uniquely within a sensor.
To clarify: by "grouping" I meant using the same string for a specific part of the metric name. I.e., the prefix groups all metrics based on the stream-thread, and the sensor name groups all its contained metrics (as a sub-group within the stream-thread group of metrics). Does this make sense?
To clarify: there are two entities: the metrics registry, which organizes the metrics, and the metrics reporter, which regularly pulls from the registry to report the metric values.
Inside the metrics registry there are sensors, which, as you understand, are just a way of grouping metrics into meaningful clusters. The sensor name is just an id for distinguishing sensors in the metrics registry (i.e., you will see logic like: if this sensor has already been created in the registry, skip this step). A metric name, represented as a MetricName containing groupName, tags, etc., is just a logical entity in the registry. How to represent the metric names is up to the metrics reporter (different reporters can definitely represent them differently). As for the sensor names, they should never be seen outside the registry, as the metrics reporter never exposes them.
Thanks @guozhangwang
I was just checking, and we also define "stream-processor-node-metrics" in ProcessorNode -- should we unify both, to have one constant only?
I think we can also simplify ProcessorNode#createTaskAndNodeLatencyAndThroughputSensors and remove the group parameter.
Actually, someone filed a Jira last week reporting that a heap analysis identified this exact group name as responsible for megabytes of heap space, so perhaps we should consolidate it into one constant!
If this is an issue, maybe we should do a fix first (in its own PR, so we can cherry-pick), and rebase/merge this PR later? Thoughts? Are you planning to do a PR for the reported issue?
oh. I've already added a commit to this PR.
I am planning a separate PR for a couple of other issues he identified. It's https://issues.apache.org/jira/browse/KAFKA-7660, by the way.
vvcephei
left a comment
I've updated this PR, and it's ready for review, now that 2.1 is complete.
These metrics would be included in the 2.2 release.
This sensor is different from the one proposed in the KIP. During implementation, I noticed a weird asymmetry in which the processor node would measure the suppression, but the buffer would measure the "eviction", aka "emission", aka "forwarding".
I'm proposing to update the KIP to make these two measurements symmetrical.
I didn't think of that before, but now that you mention it, the change makes sense to me.
Avoid recording metrics if they haven't changed.
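A minimal sketch of that guard (the `ChangedOnlyRecorder` class and its fields are hypothetical stand-ins for a Sensor, not the PR's actual code): skip the `record` call entirely when the observed value has not changed since the last recording.

```java
import java.util.ArrayList;
import java.util.List;

public class ChangedOnlyRecorder {
    private final List<Double> recorded = new ArrayList<>(); // stand-in for Sensor.record calls
    private double lastValue = Double.NaN;

    // Only record when the value actually changed since the last record.
    public void maybeRecord(double value) {
        if (Double.compare(value, lastValue) != 0) {
            recorded.add(value);
            lastValue = value;
        }
    }

    public int recordCount() {
        return recorded.size();
    }

    public static void main(String[] args) {
        ChangedOnlyRecorder recorder = new ChangedOnlyRecorder();
        recorder.maybeRecord(10.0);
        recorder.maybeRecord(10.0); // unchanged: skipped
        recorder.maybeRecord(12.0);
        System.out.println(recorder.recordCount()); // 2
    }
}
```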
In the KIP, I erroneously proposed to make this a node-level sensor. It should be a store-level sensor instead. I'm proposing to update the KIP if you all agree.
Yeah, that makes sense to me. +1
This is not newly introduced in this PR, but: although we call it stream-buffer-metrics, where the buffer-id is the store name, the store name could be an auto-generated name. Note that for the named cache we call it stream-record-cache-metrics, but the cache name is the store name, and hence we have the same issue.
But here we use the store.name(), which would be KTABLE-SUPPRESS-XXXX-store if auto-generated; maybe it's less confusing to use the processor name instead, which would be KTABLE-SUPPRESS-XXX if auto-generated. Note that even if Suppressed.name() is specified as XYZ, via store.name() we still set the buffer-id as XYZ-store instead.
I think it's good to be aware of this, but it seems like reporting the store name is the right thing to do here.
Note that the buffer in this case is the store for the suppression processor. So, using the processor name instead of the full store name would be more analogous to doing the same thing for a K/V Store.
As written, the metrics that are specific to the buffer (aka store) will be tagged with a name that matches the name of that component when you describe the topology.
Does that seem right, or have I missed your point?
just cleaning up deprecated usages.
need this for the node-level metrics
Inlined an unnecessary method. I must have created it when I thought the cast would cause a compiler warning, but it does not.
I think it should be fine to set the recording level to debug for all unit tests, rather than create a way to configure it.
java 11 had six unrelated failures:
bbejeck
left a comment
I made a pass over the PR and left some comments, overall looks good.
nit: Should the string say The rate of...?
Actually, strange as the sentence is, this is what the other rate metrics say, so I think we should just keep it for consistency.
Seems like the contents of the two sensor#add methods are the same as above; maybe refactor into two methods?
Yeah, that makes sense to me. +1
I feel like I should know the answer to this, but how do we arrive at 51 for the answer here and below?
There was a problem hiding this comment.
Empirically :/
It's just the result of summing all the pieces of the test data.
retest this please
Hi @bbejeck, Thanks for the review! As explained above, that description string for the rate metrics is common across the other rate metrics, so I've left it as-is. Is that ok? Incidentally, I was surprised that the tests didn't fail when I changed the description, since I specifically tested for them. It turns out that MetricName equality is defined to ignore the description. I've refactored the test to verify the description as well, and also fixed a problem with one of the description strings I found.
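The pitfall can be illustrated with a hypothetical stand-in class (not Kafka's actual MetricName, but the same equality contract): when `equals`/`hashCode` deliberately exclude the description, a test that only compares names cannot catch a changed or wrong description string.

```java
import java.util.Map;
import java.util.Objects;

public class SimpleMetricName {
    final String name;
    final String group;
    final String description; // intentionally excluded from equals/hashCode
    final Map<String, String> tags;

    SimpleMetricName(String name, String group, String description, Map<String, String> tags) {
        this.name = name;
        this.group = group;
        this.description = description;
        this.tags = tags;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleMetricName)) return false;
        SimpleMetricName other = (SimpleMetricName) o;
        return name.equals(other.name) && group.equals(other.group) && tags.equals(other.tags);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, group, tags); // description omitted, matching equals
    }

    public static void main(String[] args) {
        Map<String, String> tags = Map.of("task-id", "0_0");
        SimpleMetricName a = new SimpleMetricName("suppression-emit-rate", "g", "The rate of ...", tags);
        SimpleMetricName b = new SimpleMetricName("suppression-emit-rate", "g", "something else", tags);
        System.out.println(a.equals(b)); // true: the description is not compared
    }
}
```

Hence a test that wants to pin down descriptions has to assert on the description field explicitly rather than relying on name equality.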
retest this please
rebased |
|
Java 8 failure unrelated.
Java 11 failures unrelated:
Retest this, please.
hmm. It looks like the java 11 build hung and then timed out. Here's the end of the output: I reported this as https://issues.apache.org/jira/browse/KAFKA-7553 |
|
Retest this, please.
The tests failed again, but this time I didn't catch what failed. I'll take the opportunity to rebase, which will run the tests again.
Java 8 passed, and Java 11 hung again. Retest this, please.
Aha! The tests finally passed. @mjsax or @guozhangwang , do you mind taking a look? |
This seems to effectively count the number of input records (total and average) -- so it's basically the input data rate? Is it intended like this? intermediate-result-suppression sounds more like how often something is suppressed, but I don't see how this would be reflected here.
I struggled a bit with this question as well. Technically, everything that goes into the buffer is suppressed, and then some stuff is emitted later. We have one metric for each of these things.
I agree it seems more like we should be measuring the number of records that are discarded. I guess we could do this by recording the metric when we buffer a record for which there was a pre-existing record in the buffer. This would be straightforward for the in-memory buffer, but I'm concerned about doing an unnecessary get with an on-disk store to maintain the metric.
You should be able to arrive at the same (or a similar) number by subtracting the emit metric from the suppress metric. I was thinking this is what people would probably do, but on second thought, I could put together a composite metric that does it internally.
WDYT?
I'd prefer only providing the non-composite metrics out-of-the-box while letting users compose them whenever they want; this is in line with the same argument @vvcephei had about "always recording at the lowest level of granularity only, and letting users do the roll-up themselves if they like".
Following this line, I'd suggest we keep only one metric, aka the suppressionEmitSensor, and users can then calculate the suppression rate as "processing-rate" minus "suppression-emit-rate".
Ok, I can get behind this. It should be measuring the same thing as the process-rate metric for the suppression ProcessorNode.
I'll remove this metric.
Should we call this in init() and/or close(), too?
Sure; I don't think they're strictly necessary, but it doesn't hurt, and it'll make the system more resilient to future changes.
Does it make sense to use the same group name as above?
This string can effectively never change (it would mean we've renamed all our processor metrics), so I don't think there's any risk from the duplication.
In the wiki there is another metric, suppression-mem-buffer-evict; is that not in the scope of this PR?
Ah, now that you mention it, when I originally created this PR, I noticed there were some shortcomings with the metrics I proposed in the KIP, so I wanted to get the reviewers' feedback on the form it has taken in this PR before proposing an update to the KIP.
In particular, I proposed an asymmetric pair of metrics where the processor node would measure the number of incoming events (intermediate-result-suppression), but the buffer would measure the outgoing (emitted==evicted) events. It seemed better to offer a symmetric pair of in/out metrics on either the processor or the buffer. In this PR, I chose to put them both on the node.
In other words, I'm proposing to replace suppression-mem-buffer-evict with suppression-emit.
Of course, in this same review, you have proposed to drop intermediate-result-suppression in favor of the existing process-rate and process-total node-level metrics. This reduces the argument, but I think it still makes sense to make this a node metric instead of a buffer metric, since (IMHO) it more closely matches the intent of using a suppression node to control the emission rate.
Thanks for the clarification, that sounds good to me.
Is this rate value time-dependent (I remember we saw some issues with it if the test runs too slowly), i.e., could it be non-zero in some edge cases?
IIRC, rate metrics are time-dependent, but I think that this particular comparison is pretty safe for a mock processor unit test.
Hi @mjsax @guozhangwang @bbejeck, Thank you all for your reviews! I have updated this PR to address the outstanding comments. If it looks good to you now, I will go ahead and update the KIP (and send out a notice to the vote thread), and we can merge the PR.
LGTM! @mjsax please feel free to merge if you think all your comments are addressed.
KIP is updated and message sent to the vote thread.
mjsax
left a comment
Two more follow-up questions. Overall LGTM.
dirtyKeys.clear();
memBufferSize = 0;
minTimestamp = Long.MAX_VALUE;
open = false;
so that the buffer will report as "closed" immediately after this method is called, rather than only after the close operation has completed.
In retrospect, it seemed safer for:
- open() to set open = true when opening has completed
- close() to set open = false when closing has started
This way, observers who first check that a store is open before querying would never be fooled into querying a partially closed store.
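That ordering can be sketched with a minimal hypothetical store (this `GuardedStore` class is an illustration of the invariant, not the PR's buffer code): the flag flips on as the last step of opening and off as the first step of closing, so an observer that checks `isOpen()` first never queries a partially closed store.

```java
public class GuardedStore {
    private volatile boolean open = false;

    public void open() {
        // ... allocate buffers, restore state ...
        open = true; // last step: only report open once setup is complete
    }

    public void close() {
        open = false; // first step: report closed before teardown starts
        // ... flush, release resources ...
    }

    public boolean isOpen() {
        return open;
    }

    public static void main(String[] args) {
        GuardedStore store = new GuardedStore();
        store.open();
        System.out.println(store.isOpen()); // true
        store.close();
        System.out.println(store.isOpen()); // false
    }
}
```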
Unrelated test failure: Retest this, please
@mjsax, do you have any more feedback on this? Thanks,
I think there is none, as he said "otherwise LGTM" :) Will merge to trunk now.
There are a bunch of those constants that appear in different classes, and I remember @vvcephei mentioned that the reason for not doing it in this PR is to not introduce mutual dependencies between those classes. If we want to do so, I'd suggest we just extract all these constants into a separate class and let all the other sensor classes refer to this single one, which we can do in another minor PR.
My bad, I did not refresh the file changes and hence was looking at an older version. The above comment is actually already addressed.
Document the new metrics added in #5795 Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Add the final batch of metrics from KIP-328 Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>