
KAFKA-3111: Fix ConsumerPerformance reporting to use time-based instead of message-based intervals#788

Closed
vahidhashemian wants to merge 1 commit into apache:trunk from vahidhashemian:KAFKA-3111

Conversation

@vahidhashemian
Contributor

Interval lengths in ConsumerPerformance could sometimes be calculated as zero. In such cases, when the bytes read or messages read are also zero, a NaN output is returned for mbRead per second or for nMsg per second, whereas zero would be a more appropriate output.

In cases where the interval length is zero but there have been bytes and messages read, an output of Infinity is returned, as expected.
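The NaN case described above can be sketched as follows. This is a minimal, hypothetical Java analogue with illustrative names; the actual tool (kafka.tools.ConsumerPerformance) is written in Scala.

```java
// Minimal, hypothetical analogue of the guard described above; names are
// illustrative, not the actual ConsumerPerformance code.
public class RateGuard {
    // Without the guard, elapsedMs == 0 and mbRead == 0.0 would produce
    // 0.0 / 0 == NaN; the guard reports 0.0 instead. A non-zero numerator
    // over a zero-length interval still yields Infinity, as expected.
    static double mbReadPerSec(double mbRead, long elapsedMs) {
        if (elapsedMs == 0 && mbRead == 0.0)
            return 0.0;
        return 1000.0 * (mbRead / elapsedMs);
    }

    public static void main(String[] args) {
        System.out.println(mbReadPerSec(0.0, 0));     // 0.0 rather than NaN
        System.out.println(mbReadPerSec(1.0, 0));     // Infinity, as expected
        System.out.println(mbReadPerSec(10.0, 2000)); // 5.0
    }
}
```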

@junrao
Contributor

junrao commented Jan 19, 2016

Thanks for the patch. Thinking a bit more. It seems the main issue is that the reporting interval is based on messages, instead of time. Reporting the stats at some fixed time interval is more intuitive and is also used in org.apache.kafka.tools.ProducerPerformance. So, to be consistent, we probably should just change the reporting interval to time.

@vahidhashemian
Contributor Author

Thanks for the feedback. If I understand correctly, your suggestion is to change the implementation of the "reporting-interval" argument so that, instead of the number of messages in each interval, it takes the (perhaps minimum) length of each interval in milliseconds. This change could impact both kafka.tools.ConsumerPerformance and kafka.tools.ProducerPerformance, as they both use kafka.tools.PerfConfig to extract the args. Do we want kafka.tools.ProducerPerformance modified to use time-based intervals too?

@vahidhashemian
Contributor Author

I made some minor changes so the reporting interval for ConsumerPerformance becomes time-based. Although kafka.tools.ProducerPerformance indirectly uses PerfConfig, it does not implement similar interval reporting and will remain unchanged.

@vahidhashemian vahidhashemian changed the title KAFKA-3111: Fix ConsumerPerformance output for zero interval lengths KAFKA-3111: Fix ConsumerPerformance reporting to use time-based instead of message-based intervals Jan 20, 2016
Member

We don't need to change the method printProgressMessage() if we require currentTimeMillis - lastReportTime >= config.reportingInterval, right?

Contributor Author

Could you please clarify the question? This method is called only when currentTimeMillis - lastReportTime >= config.reportingInterval (and we want a detailed report).

Member

Thanks for the reply.

Here you changed the code from

1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))

to

val mbReadPerSec =
  if ((endMs == startMs) && (mbRead == 0.0))
    0.0
  else
    1000.0 * (mbRead / elapsedMs)

val nMsgPerSec =
  if ((endMs == startMs) && (messagesRead == lastMessagesRead))
    0.0
  else
    ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0

The reason is that we want to handle the case where endMs == startMs. But since we will only call this method when currentTimeMillis - lastReportTime >= config.reportingInterval is true, it is guaranteed that endMs > startMs, making the code change unnecessary, right?

Member

Besides, printProgressMessage is also called at line 144. Should that logic be changed as well so that it is called at given time interval?

Contributor Author

Thank you for your feedback again.

To answer your first comment: there doesn't seem to be any constraint on the value of reporting-interval. I can simply pass in a value of 0 (or a negative value) and no validation is performed. So the added checks might prove useful, unless we validate the value given for reporting-interval. I hope I didn't misunderstand your point.

On your second comment, you are right. I'll update the other usage as well.

Member

@vahidhashemian I agree with you that currently reporting-interval may be 0 or even negative. But does such a value even make sense for reporting-interval? In my opinion, if reporting-interval <= 0 we should either throw an exception or disable printMessage until the end of the experiment. What do you think?
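The validation being suggested could be sketched like this. Names here are hypothetical, not the actual PerfConfig API (the real option parsing lives in kafka.tools.PerfConfig, in Scala):

```java
// Hypothetical sketch of validating reporting-interval up front, as
// suggested above; names are illustrative, not the actual PerfConfig API.
public class ReportingIntervalCheck {
    static long validateReportingInterval(long intervalMs) {
        if (intervalMs <= 0)
            throw new IllegalArgumentException(
                "reporting-interval must be a positive number of milliseconds, got " + intervalMs);
        return intervalMs;
    }

    public static void main(String[] args) {
        System.out.println(validateReportingInterval(5000)); // accepted
        try {
            validateReportingInterval(0);                    // rejected at startup
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```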

Contributor Author

@lindong28 Throwing an exception sounds good. I don't see any validation for the other argument values either; not sure why. For example, the messages argument could be validated too (to be >= 0).

Member

@vahidhashemian This code was written in late 2011, and developers at that time probably didn't spend much time on such issues in the tools, focusing instead on implementing new features, since they were probably the only users of these tools. Recent committers are enforcing much higher standards and will probably ask questions like this when reviewing your code. I myself have been asked to make my patches bullet-proof to such questions before submitting them for review.

@vahidhashemian
Contributor Author

@lindong28 Thank you for your feedback. I responded to your comments inline.

@vahidhashemian vahidhashemian force-pushed the KAFKA-3111 branch 2 times, most recently from 5230a55 to 4c7d8dc Compare June 2, 2016 20:13
@vahidhashemian
Contributor Author

@lindong28 I updated the PR based on the feedback and the discussion.

@lindong28
Member

Thanks @vahidhashemian, LGTM. @ijuma Do you have time to review this patch?

Contributor

Could we avoid the System.currentTimeMillis() call in line 129?

Contributor Author

Sure, and nice catch. One option is to make currentTimeMillis a variable and initially assign it before the while loop at line 129, and then re-assign the current time to it inside the while loop (to replace the current val definition at line 140). I'll update the PR.
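The restructuring described here can be sketched as follows. This is a simplified, hypothetical analogue of the loop, not the actual ConsumerPerformance code:

```java
// Simplified, hypothetical analogue of the restructured loop: the current
// time is read into a variable once before the loop and once per iteration,
// so no extra System.currentTimeMillis() call is needed in the condition.
public class TimeBasedReporting {
    static int runLoop(long durationMs, long reportingIntervalMs) throws InterruptedException {
        int reports = 0;
        long currentTimeMillis = System.currentTimeMillis();  // assigned before the loop
        long endMs = currentTimeMillis + durationMs;
        long lastReportTime = currentTimeMillis;
        while (currentTimeMillis < endMs) {
            Thread.sleep(5);                                  // stand-in for consuming records
            currentTimeMillis = System.currentTimeMillis();   // re-assigned once per iteration
            if (currentTimeMillis - lastReportTime >= reportingIntervalMs) {
                reports++;                                    // progress would be printed here
                lastReportTime = currentTimeMillis;
            }
        }
        return reports;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runLoop(300, 100));
    }
}
```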

@vahidhashemian vahidhashemian force-pushed the KAFKA-3111 branch 2 times, most recently from 477d8d9 to e5fab1e Compare June 2, 2016 22:51
@vahidhashemian
Contributor Author

@junrao Thank you for your feedback. I updated the PR based on your suggestions.

Contributor

For reporting purposes, I think it's enough to call System.currentTimeMillis once per while loop, instead of once per record. Also, the patch changed the behavior of the tool a bit: currently, if there are no new messages, the tool will exit after 1 sec, but with the patch the tool will block forever. We will need to revert to the previous behavior.

Finally, since our system tests use this tool, could you run them (see test/README.md) and make sure that the patch doesn't break any test? If needed, @granders can give you access to our jenkins job so that you can test your branch on EC2.
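The idle-exit behavior being asked for can be sketched like this (hypothetical names; the actual tool's timeout logic differs in detail):

```java
// Hypothetical sketch of the idle-timeout check described above: if no
// message has arrived within idleTimeoutMs, the consume loop should exit
// instead of blocking forever when the topic goes quiet.
public class IdleTimeout {
    static boolean shouldExit(long currentMs, long lastMessageReceivedMs, long idleTimeoutMs) {
        return currentMs - lastMessageReceivedMs > idleTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(shouldExit(2500, 1000, 1000)); // true: idle for 1.5 s
        System.out.println(shouldExit(1500, 1000, 1000)); // false: idle for only 0.5 s
    }
}
```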

Contributor Author

vahidhashemian commented Jun 3, 2016

@junrao Thanks again for your feedback. To respond to your comments:

  1. Sure, I'll update that for the new consumer. It seems to me the old consumer is OK.
  2. Right, I see it happening for the new consumer only, and it's because we don't refresh the value of the currentTimeMillis variable in case the for loop doesn't run. Actually, the fix for issue (1) above will fix this one too.
  3. I'll try to run the system tests locally, but my resources are limited. In any case, @granders would you please give me access so I can run these tests on EC2? Thanks.

@vahidhashemian vahidhashemian force-pushed the KAFKA-3111 branch 3 times, most recently from 9105d09 to 6a890c1 Compare June 3, 2016 18:35
@vahidhashemian
Contributor Author

I made the changes discussed. Just waiting for EC2 access so I can run system tests.

@granders
Contributor

granders commented Jun 3, 2016

@vahidhashemian
You should now be able to trigger test runs here:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/

I already triggered a run of system tests on your branch here:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/460/console

@vahidhashemian
Contributor Author

@granders Thank you. Appreciate the quick help.

@granders
Contributor

granders commented Jun 3, 2016

No problem @vahidhashemian !

@vahidhashemian
Contributor Author

@junrao Looks like the system tests ran without an issue. They failed the first time on one test perhaps due to a transient failure.

@vahidhashemian
Contributor Author

@junrao It's been a while since the PR was updated and passed system tests. Do you see anything further that needs to be addressed? Thanks.

Contributor

Could we move this line to before line 132? That will save a System.currentTimeMillis call in 132.

Contributor Author

Sure, I hope I got it right in the updated patch.

@junrao
Contributor

junrao commented Jul 8, 2016

@vahidhashemian : Sorry for the delay. Looks good. Just had a minor comment.

@vahidhashemian
Contributor Author

@junrao Thanks for taking another look. I updated the patch as you suggested. Do you think another system test is required?

@junrao
Contributor

junrao commented Jul 8, 2016

Thanks for the patch. LGTM

@asfgit asfgit closed this in 0edaa89 Jul 8, 2016
@Meghajnct

Hi, I am using Kafka 2.11_1.1.0 and am still facing this issue. My understanding is that this patch is available in the Kafka 1.1.0 version. Please let me know if I still need to pull it.
Exception ::
java.lang.ArithmeticException: / by zero
at kafka.network.Acceptor.run(SocketServer.scala:354)
at java.lang.Thread.run(Thread.java:748)
[2018-06-08 09:00:53,962] ERROR Error while accepting connection (kafka.network.Acceptor)
java.lang.ArithmeticException: / by zero
at kafka.network.Acceptor.run(SocketServer.scala:354)
at java.lang.Thread.run

Later I got it ::
java.io.IOException: Connection to 1 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:220)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:146)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-06-08 09:07:00,463] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions Phenom_Track_TEST7_TOPIC-4 (kafka.server.ReplicaFetcherManager)
[2018-06-08

@vahidhashemian
Contributor Author

@Meghajnct, this PR has been included in the past few releases; but at least from the stack trace you shared, it doesn't seem the issue you're experiencing is related to this PR. I'd suggest you open a JIRA ticket and explain the issue in as much detail as possible. Thanks!

omkreddy pushed a commit to omkreddy/kafka that referenced this pull request Sep 15, 2022