
Kafka: Emit production rate #17491

Closed
arunramani wants to merge 2 commits into apache:master from arunramani:master


@arunramani
Contributor

@arunramani arunramani commented Nov 19, 2024

Description

Add a metric to track the Kafka message production rate, which can be useful for correlating with Kafka lag. For each collection period, the latest Kafka broker offset is compared with the offset from the previous minute to calculate the production rate.
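
For illustration only, the offset comparison described above could be sketched as follows. This is not the code in this PR; the class name `ProductionRateSketch` and the method `update` are hypothetical, and partitions are assumed to be keyed by integer id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the PR's implementation.
final class ProductionRateSketch {
    // Latest broker offsets seen at the previous collection, keyed by partition id.
    private final Map<Integer, Long> previousOffsets = new HashMap<>();

    /**
     * Records the latest broker offsets and returns the number of records
     * produced per partition since the previous call. Partitions seen for
     * the first time have no baseline and are left out of the result.
     */
    Map<Integer, Long> update(Map<Integer, Long> latestOffsets) {
        Map<Integer, Long> produced = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestOffsets.entrySet()) {
            // put() returns the previously stored offset, or null if none existed.
            Long previous = previousOffsets.put(entry.getKey(), entry.getValue());
            if (previous != null) {
                produced.put(entry.getKey(), entry.getValue() - previous);
            }
        }
        return produced;
    }
}
```

Each call to `update` corresponds to one collection period, so the returned delta is only a per-minute rate if the calls are actually one minute apart.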

One interesting use case is using this metric to roughly express lag as time. This can be done by finding the time t such that the sum of productionRate from t to now equals the lag, i.e. how many minutes of production equal the current lag.
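
A rough sketch of that calculation, assuming the per-minute production values are available as an array ordered from the most recent minute backwards. The class and method names are hypothetical, and the linear interpolation within the final minute is an added assumption rather than part of this PR.

```java
// Hypothetical sketch, not the PR's implementation.
final class LagAsTimeSketch {
    /**
     * Walks backwards through per-minute production counts until their sum
     * covers the current lag.
     *
     * @param producedNewestFirst records produced in each past minute, most recent first
     * @param lag                 current lag in records
     * @return estimated lag in minutes, or -1 if the history does not cover the lag
     */
    static double estimateLagMinutes(long[] producedNewestFirst, long lag) {
        if (lag <= 0) {
            return 0.0;
        }
        long remaining = lag;
        for (int i = 0; i < producedNewestFirst.length; i++) {
            long produced = producedNewestFirst[i];
            if (produced >= remaining) {
                // Assumption: production is uniform within the minute, so interpolate.
                return i + (double) remaining / produced;
            }
            remaining -= produced;
        }
        return -1.0;
    }
}
```

For example, with production counts of 100, 200 and 300 (newest first) and a lag of 250 records, the first minute covers 100 records and the remaining 150 are three quarters of the second minute, giving an estimate of 1.75 minutes.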

Release note

Added a new streaming ingest metric ingest/kafka/partitionProduction


Key changed/added classes in this PR
  • SeekableStreamingSupervisor
  • KafkaSupervisor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz
Contributor

kfaraz commented Nov 20, 2024

@arunramani, could you please share some details on how this metric would provide additional insight that is not already covered by metrics like Kafka lag, partition lag and/or message gap?

Contributor

@kfaraz kfaraz left a comment

@arunramani, there are some problems with the current approach.
There is no guarantee that metrics are emitted strictly every minute. This could lead to incorrect rate values (and thus an incorrect calculation of time lag downstream), since we are really just emitting the difference between the latestSequence at the last metric emission time and the latestSequence at the current time.
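
To make the concern concrete, here is a minimal sketch of a rate that is normalized by the actual elapsed time between two emissions instead of assuming a fixed one-minute interval. The names are hypothetical and this is not code from the PR.

```java
// Hypothetical sketch, not the PR's implementation.
final class NormalizedRateSketch {
    /**
     * Converts an offset delta into records per minute using the real
     * elapsed time between the two observations.
     */
    static double recordsPerMinute(
            long previousOffset, long previousMillis,
            long currentOffset, long currentMillis) {
        long elapsedMillis = currentMillis - previousMillis;
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("observations must be in increasing time order");
        }
        return (currentOffset - previousOffset) * 60_000.0 / elapsedMillis;
    }
}
```

If an emission is delayed to 90 seconds and 900 records arrive in that window, the raw delta is 900 while the normalized rate is 600 records per minute, the same as 600 records over exactly 60 seconds.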

IIUC, we are trying to calculate the lag in terms of "how many minutes worth of records is the supervisor yet to process".
This value is fairly similar to the message gap but not quite the same.
The message gap is the "difference between the current timestamp of the system and the timestamp of the latest ingested record".
Whereas here, I think we want the "difference between the timestamp of the latest ingested record and the timestamp of the latest record in the stream".

If that is the case, I wonder if we even need the production rate.
Instead, do you think it would make more sense to simply calculate this timestamp difference and emit it?
Or are there challenges with that approach?
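
The two quantities contrasted above can be written side by side. This is only a sketch with hypothetical names; clamping the second value at zero is an added assumption to handle records whose timestamps arrive out of order.

```java
// Hypothetical sketch contrasting the two definitions.
final class TimeLagSketch {
    /** Message gap: current system time minus the timestamp of the latest ingested record. */
    static long messageGapMillis(long nowMillis, long latestIngestedRecordMillis) {
        return nowMillis - latestIngestedRecordMillis;
    }

    /**
     * Proposed time lag: timestamp of the latest record in the stream minus
     * the timestamp of the latest ingested record.
     * Assumption: negative differences are clamped to zero.
     */
    static long streamTimeLagMillis(long latestStreamRecordMillis, long latestIngestedRecordMillis) {
        return Math.max(0L, latestStreamRecordMillis - latestIngestedRecordMillis);
    }
}
```

The two values differ whenever the newest record in the stream is older than the current system time, for example when producers are idle.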

@github-actions

github-actions Bot commented Feb 4, 2025

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions Bot added the stale label Feb 4, 2025

@ac9817
Contributor

ac9817 commented Feb 14, 2025

IIUC, we are trying to calculate the lag in terms of "how many minutes worth of records is the supervisor yet to process". This value is fairly similar to the message gap but not quite the same. The message gap is the "difference between the current timestamp of the system and the timestamp of the latest ingested record". Whereas here, I think we want the "difference between the timestamp of the latest ingested record and the timestamp of the latest record in the stream".

@kfaraz I think this approach would fail for late- or early-arriving data, which happens very frequently with the streams we are dealing with. The best approach we have is to calculate the production rate and then make a best estimate of how many minutes we are lagging behind. Do you think this PR achieves that?

@github-actions github-actions Bot removed the stale label Feb 15, 2025

@kfaraz
Contributor

kfaraz commented Feb 17, 2025

@adithyachakilam, you need not rely on the ingested timestamp for this.
You should use the Kafka record timestamp (kafka.timestamp) instead.
This would be fairly straightforward. (Let me know if you face challenges with implementing this.)
On the contrary, the production rate approach seems complicated to implement, error prone and less intuitive.

PS: You may put the new logic in KafkaSupervisor.getPartitionTimeLag().
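
A per-partition version of this suggestion might look roughly like the sketch below. It uses plain maps of partition id to Kafka record timestamp; in a real consumer those values would come from `ConsumerRecord.timestamp()`. The class and method names are hypothetical, this is not the actual signature of `KafkaSupervisor.getPartitionTimeLag()`, and clamping at zero is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Druid's implementation.
final class PartitionTimeLagSketch {
    /**
     * Computes, per partition, the difference between the Kafka record
     * timestamp of the latest record in the stream and that of the latest
     * ingested record. Partitions with no ingested record yet are skipped.
     */
    static Map<Integer, Long> partitionTimeLagMillis(
            Map<Integer, Long> latestStreamTimestamps,
            Map<Integer, Long> latestIngestedTimestamps) {
        Map<Integer, Long> lagMillis = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestStreamTimestamps.entrySet()) {
            Long ingested = latestIngestedTimestamps.get(entry.getKey());
            if (ingested != null) {
                // Assumption: clamp at zero in case timestamps are not monotonic.
                lagMillis.put(entry.getKey(), Math.max(0L, entry.getValue() - ingested));
            }
        }
        return lagMillis;
    }
}
```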

@ac9817
Contributor

ac9817 commented Feb 17, 2025

@kfaraz Thanks for the comments. I opened a new PR: #17735

@kfaraz
Contributor

kfaraz commented Feb 18, 2025

Closing this PR in favor of #17735

@kfaraz kfaraz closed this Feb 18, 2025