KAFKA-8934: Introduce instance-level metrics for streams applications#7416
KAFKA-8934: Introduce instance-level metrics for streams applications#7416guozhangwang merged 5 commits intoapache:trunkfrom
Conversation
cadonna
left a comment
There was a problem hiding this comment.
Call for review: @guozhangwang @vvcephei @bbejeck
Sorry for the large PR.
There was a problem hiding this comment.
Here I add the instance-level metrics.
There was a problem hiding this comment.
StreamsMetricsImpl is now created at client level and not on thread level anymore.
There was a problem hiding this comment.
This is not needed anymore since the RocksDBMetricsRecordingTrigger is contained in StreamsMetricsImpl.
There was a problem hiding this comment.
The threadId needs to be passed to those methods since StreamsMetricsImpl was moved to KafkaStreams and cannot contain the thread ID anymore.
There was a problem hiding this comment.
Verifications of state, topology description, and application ID metrics.
There was a problem hiding this comment.
Verification of state metric.
|
JDK 11/Scala 2.13 and JDK 8/Scala 2.11 failed due to OOM. In JDK 11/Scala 2.12, the following test failed: Retest this, please |
|
In JDK 8/Scala 2.11, the following test failed: Retest this, please |
There was a problem hiding this comment.
nit: can we log the stacktrace here? IMHO it's better for debugging.
There was a problem hiding this comment.
+1
FYI @cadonna , all you have to do is:
| log.warn("Error while loading kafka-streams-version.properties: {}", exception.getMessage()); | |
| log.warn("Error while loading kafka-streams-version.properties", exception); |
There was a problem hiding this comment.
We seem to use StreamsMetricsImpl in several places, would it be better to accept a type StreamsMetrics and cast it internally to StreamsMetricsImpl? Same comment applies to here and elsewhere. I might be missing some context here though.
There was a problem hiding this comment.
StreamsMetrics is our public interface to interact with metrics. StreamsMetricsImpl is our internal implementation that also has methods to create our built-in metrics. As far as I know, we pass StreamsMetricsImpl only to internal objects where we need the methods for built-in metrics. A Streams user would only see the StreamsMetrics interface. Said that, I think it is fine as it is.
There was a problem hiding this comment.
My two cents: it's better not to cast. If we need the "internal interface" (i.e., Impl) here, then we should simply declare that fact in the parameter type. Then, the calling code would always know it has to pass an Impl, and we wouldn't ever get surprise ClassCastExceptions in tests or runtime code.
There was a problem hiding this comment.
nit: break args on a separate line
There was a problem hiding this comment.
super nit: Maybe add a brief comment that this is used for metrics like app-id etc. I needed to do a search through the code to see why it was necessary.
There was a problem hiding this comment.
I prefer to rename the class to ImmutableMetricValue.
There was a problem hiding this comment.
super nit: make string task a public final variable.
There was a problem hiding this comment.
nit: same here for string literals and below, I won't repeat this comment anymore.
There was a problem hiding this comment.
I will tackle this in the PR for the store-level metrics.
There was a problem hiding this comment.
Is multi-line here for checkstyle to be happy? Honestly I like the previous one as it is easier to compare multi-lines in case there's any parameter mis-set, would be great if we can get back to oneliner.
There was a problem hiding this comment.
It is not checkstyle, but our coding style guidelines. I see what you mean. I guess we should refactor this anyways to hide most of those parameters behind a method. Looking at the other refactorings, this should become something along the lines of putLatencySensor(threadId, taskId, metricsScope, storeName) which would fit again on one line.
There was a problem hiding this comment.
nit: parameters on separate line
There was a problem hiding this comment.
Just going out on a limb here... should we also log the topology description here?
There was a problem hiding this comment.
I thought, we already do, but I couldn't find any output in a log file that I have locally. I would be in favour of logging the topology because it helps us during on-call. On the other hand, it may pollute the logs. Anyways, could we postpone this discussion to after the release?
vvcephei
left a comment
There was a problem hiding this comment.
Just a couple of quick comments.
There was a problem hiding this comment.
+1
FYI @cadonna , all you have to do is:
| log.warn("Error while loading kafka-streams-version.properties: {}", exception.getMessage()); | |
| log.warn("Error while loading kafka-streams-version.properties", exception); |
There was a problem hiding this comment.
This seems to be more like "InstanceMetrics" than "client" metrics, where "client" usually means a Producer or Consumer. In Streams, we sometimes conflate "client" with "thread" because each StreamThread has exactly one Consumer.
There was a problem hiding this comment.
In KIP-444, "client" is used instead of "instance". Also the tag for instance-level is "client-id" whereas for thread it will be "thread-id" with PR #7429. I agree that clients is somehow overloaded, but since Streams is also a client (that uses other clients) it seems consistent to me.
73ab316 to
bf3ec27
Compare
There was a problem hiding this comment.
Is this always right? It would set the threadId as the name of the thread that constructed the sensor, but is this always the StreamThread? Even if it is today, it wouldn't be outrageous to think in the future that another thread might build a task and then pass it to the thread to execute. Such a refactoring would break this logic, but it would be very subtle.
In contrast, the old code here explicitly passed the thread name via the context, so it would never be set incorrectly just by changing the way that the code is invoked. Perhaps we could preserve this property, for example by passing the thread name as a method argument.
There was a problem hiding this comment.
Just checked the code path, passing the thread name via GlobalStreamThread / StreamThread -> StreamTask -> ProcessorContextImpl maybe fine.
There was a problem hiding this comment.
@vvcephei, you are right. This is not general enough. On thread-level -- for instance -- sensors are created by the main thread and then used in the stream thread. Here, instead of passing it through InternalProcessorContext, I would pass it as a separate parameter to lateRecordDropSensor(). According to KIP-444 late-record-drop will be superseded with dropped-records and lateRecordDropSensor() will most probably disappear. Actually, since it is currently not a problem at all because the calling thread is the stream thread, I would leave it as it is.
There was a problem hiding this comment.
Ok, sounds fine to me, as long as we're aware of the risk.
There was a problem hiding this comment.
My two cents: it's better not to cast. If we need the "internal interface" (i.e., Impl) here, then we should simply declare that fact in the parameter type. Then, the calling code would always know it has to pass an Impl, and we wouldn't ever get surprise ClassCastExceptions in tests or runtime code.
guozhangwang
left a comment
There was a problem hiding this comment.
Made a pass. One meta comment: lots of LOCs are actually extending the single line to multi-lines because of the insertion of thread-id, if it can be avoided I'd prefer the old way since if there are multi sensors to be added together it's actually easier to review.
There was a problem hiding this comment.
After the other PR is merged, this part can be leveraged / rebased, just a reminder :)
There was a problem hiding this comment.
Just checked the code path, passing the thread name via GlobalStreamThread / StreamThread -> StreamTask -> ProcessorContextImpl maybe fine.
There was a problem hiding this comment.
Can we rely on ImmutableValue from o.a.k.common?
There was a problem hiding this comment.
ImmutableMetricValue is basically a copy of ImmutableValue. ImmutableValue in o.a.k.common is package private and within AppInfoParser. Additionally, I needed to add hashCode() and equals() for testing purposes. What we can do after the release is refactoring the ImmutableValue in common and use it in streams.
There was a problem hiding this comment.
Is multi-line here for checkstyle to be happy? Honestly I like the previous one as it is easier to compare multi-lines in case there's any parameter mis-set, would be great if we can get back to oneliner.
There was a problem hiding this comment.
See my comment above.
FYI: We have have a bug here. We use fetch instead of get and users have already asked a question about this on Stackoverflow. KAFKA-8906 documents this.
There was a problem hiding this comment.
Ditto here, I think besides the threadId having other paired values in oneline is better to read / review.
There was a problem hiding this comment.
In KIP-444 the one below would be stream-thread-metrics.
There was a problem hiding this comment.
This is refactored in the thread-level refactoring PR that I could not yet open because it is based on PR #7429.
There was a problem hiding this comment.
Learned new ways of functional interfaces :)
|
Java 11/2.12 failed and Java 8 failed; results already cleared out retest this please |
|
@cadonna failures seem related |
bf3ec27 to
64a5fba
Compare
- Moves `StreamsMetricsImpl` from `StreamThread` to `KafkaStreams` - Adds instance-level metrics as specified in KIP-444, i.e.: -- version -- commit-id -- application-id -- topology-description -- state
Also adds comments to tests that possibly fail when started from an IDE due to the missing version file on the classpath.
Fixes compile error introduced by merging apache#5527
ece0eda to
ab3c043
Compare
|
In JDK 11/Scala 2.12, the following test failed: In JDK 11/Scala 2.13, the following test failed: In JDK 8/Scala 2.11, the following test failed: Retest this, please |
|
Jenkins failures are irrelevant. |
|
Merged to trunk. |
StreamsMetricsImplfromStreamThreadtoKafkaStreams-- version
-- commit-id
-- application-id
-- topology-description
-- state
Committer Checklist (excluded from commit message)