Skip to content

KAFKA-13945: add bytes/records consumed and produced metrics#12235

Merged
cadonna merged 30 commits intoapache:trunkfrom
ableegoldman:13945-add-bytes/records-produced-metric
Jun 7, 2022
Merged

KAFKA-13945: add bytes/records consumed and produced metrics#12235
cadonna merged 30 commits intoapache:trunkfrom
ableegoldman:13945-add-bytes/records-produced-metric

Conversation

@ableegoldman
Copy link
Copy Markdown
Member

Implementation of KIP-846: Source/sink node metrics for Consumed/Produced throughput in Streams

Adds the following INFO node-level metrics for the total bytes/records consumed and produced:

  • bytes-consumed-total
  • records-consumed-total
  • bytes-produced-total
  • records-produced-total

Comment thread docs/ops.html Outdated
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll follow up with a docs PR separately

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization to hopefully bypass any potential concern or risk of performance regression -- it may be paranoid or an over-optimization at this point but imo it doesn't hurt to report these metrics at a slightly more course-grained time interval. Happy to go either way on this however

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine as you do it since we do it similarly in other places like processAtSourceSensor.record(1.0d, context.currentSystemTimeMs()); in SourceNode.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also remove the comment.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just clearing up an overly-generic method name (the metric name itself is not changed)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman Thank you for the PR!

Here my feedback:

Comment on lines 64 to 68
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually do not understand this comment. Backwards compatibility of what? What does "empty sensor without parent" mean? recordsProducedSensor() and bytesProducedSensor() do not create sensors with parents.

Copy link
Copy Markdown
Member

@cadonna cadonna Jun 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I just realized that you copied that comment from code that I wrote! 🙂
I guess the backwards compatibility was the one with the old metrics structure that we changed in KIP-444. We removed the old structure in 3.0, so I guess that this is an instance of comments that started lying.
Moreover, from where you copied the comment (I guess it was SourceNode) the sensor processAtSourceSensor has indeed a parent. I think the comment does not make sense in this class and we need to verify if it still makes sense in the other class. That does not need to be verifed in this PR.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah I was wondering about it but I hoped you'd see this and remember the context. I'll remove the comment from my PR and we can follow up separately if it needs to be removed from the SourceNode class (where I copied it from, yes)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fine as you do it since we do it similarly in other places like processAtSourceSensor.record(1.0d, context.currentSystemTimeMs()); in SourceNode.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also remove the comment.

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass, overall LGTM modulo one minor suggestion to use one sensor for both metrics.

Please feel free to merge once KIP is accepted!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor suggestion: I think we can use one sensor that contains two metrics, one as "sum" and one as "count", and then call record here once on that sensor which would update both metrics. Similar for consumed.

Examples can be found in those addInvocationRateAndCountToSensor etc functions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we always record 1 for this sensor, I think we can combine this and the bytes metrics in a single sensor with sum and count (see my other comment above). Ditto for produced.

@ableegoldman ableegoldman force-pushed the 13945-add-bytes/records-produced-metric branch from 3879617 to a2228f7 Compare June 2, 2022 10:03
Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman Thank you for the updates!

Here my feedback:

Comment thread streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java Outdated
@ableegoldman
Copy link
Copy Markdown
Member Author

@cadonna ok this is ready for another pass -- besides addressing your comments I made two changes worth noting:

  1. I moved the produced/consumed metrics to a new TopicMetrics class as they didn't exactly fit into ProcessorNodeMetrics since I realized that due to dynamic topic routing you could have more than one topic produced to by a given sink node.
  2. To address your (imo valid) concern about over-counting bytes that were sent to the producer but never made it to the topic, I moved the "-produced" sensor into RecordCollectorImpl and have it record inside the send callback after we've checked for errors

The PR has definitely ballooned in size since you last reviewed it but that's almost all due to the new test coverage for the topic-level metrics. The actual logical changes are still relatively minor, so I'm hoping you can give this a +1 by now (and again, if so, please go ahead and merge it yourself)

Thanks for the thorough reviews! :P

@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Jun 3, 2022

The prior build was hung. I cancelled it so the latest commit could be built.

@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Jun 3, 2022

@ableegoldman , you might want to take a look at the build I cancelled. Before it got stuck, there were a large number of failures: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12235/9/#showFailuresLink

@ableegoldman ableegoldman force-pushed the 13945-add-bytes/records-produced-metric branch 2 times, most recently from c555982 to 61d5a51 Compare June 3, 2022 20:35
Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman Thank you for the updates!

I had just a couple of nits that can also be done in a follow-up PR.

I did not quite get all the changes in the scala tests and the test-utils tests. Are those intended or just debugging side-effects?

}

if (!topic.endsWith("-changelog")) {
// we may not have created a sensor yet if the node uses dynamic topic routing
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is a bit misleading here. AFAIU it refers to the else-branch. Please move it or remove it. I think you know my preference 🙂.

Copy link
Copy Markdown
Member Author

@ableegoldman ableegoldman Jun 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it does refer to the if branch -- if sinkNodeToProducedSensorByTopic.get returns null, that means we haven't seen this topic yet/constructed a sensor for it during initialization.
edit: this is wrong, see following comment

I do see how it might be confusing if it's interpreted as referring to the line above, and I do know your preference, but I stand by this particular comment -- it follows the rule of explaining WHY we do something rather than WHAT, and I believe in this case the why is very much not obvious* and worry someone reading this code in the future might think this branch would actually indicate a bug and decide to throw an exception or something. So I'll adjust the spacing to clarify which line it's referring to but I would like to keep it.

I believe in this case the why is very much not obvious

* Source: I myself did not remember about dynamic topics and only realized this was possible when looking at some TTD tests

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait sorry I'm being dumb, yes, it does refer to the else case -- I've moved it accordingly.

@cadonna
Copy link
Copy Markdown
Member

cadonna commented Jun 7, 2022

The test failures are unrelated.

@cadonna cadonna merged commit a6c5a74 into apache:trunk Jun 7, 2022
wcarlson5 added a commit to confluentinc/kafka that referenced this pull request Jun 9, 2022
* KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" (apache#11796)

Implements KIP-770

Reviewers: Guozhang Wang <wangguoz@gmail.com>
(cherry picked from commit 0924fd3)

* KAFKA-13945: add bytes/records consumed and produced metrics (apache#12235)

Implementation of KIP-846: Source/sink node metrics for Consumed/Produced throughput in Streams

Adds the following INFO topic-level metrics for the total bytes/records consumed and produced:

    bytes-consumed-total
    records-consumed-total
    bytes-produced-total
    records-produced-total

Reviewers: Kvicii <Karonazaba@gmail.com>, Guozhang Wang <guozhang@apache.org>, Bruno Cadonna <cadonna@apache.org>

* Update CODEOWNERS (#721)

(cherry picked from commit ad18b43)

Co-authored-by: vamossagar12 <sagarmeansocean@gmail.com>
Co-authored-by: A. Sophie Blee-Goldman <sophie@confluent.io>
Co-authored-by: Chris Johnson <53450846+cjohnson-confluent@users.noreply.github.com>
ableegoldman added a commit that referenced this pull request Jun 14, 2022
…12288)

Minor followup to #12235 that adds a null check on the record key in the new ClientUtils#producerRecordSizeInBytes utility method, as there are valid cases in which we might be sending records with null keys to the Producer, such as a simple builder.stream("non-keyed-input-topic").filter(...).to("output-topic")

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants