Flink: add monitor metrics for Flink sink #5410
ac24945 to e35087f
private final AtomicLong deleteFilesRecordCount = new AtomicLong();
private final AtomicLong deleteFilesByteCount = new AtomicLong();

CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
Shouldn't all of these metrics come from the commit path? I think that these would be produced by core and then passed to Flink by adding a way to set the metrics context.
@nastra, what do you think?
I think it makes a lot of sense to track those metrics in core itself and then use a metrics context to choose which metrics framework runs underneath.
The problem with the Listeners API is that it is a global static, and there is no unregister API. That makes it difficult to associate a listener at table scope, which is what we want: IcebergFilesCommitter in Flink should register a listener for the table that it writes to.
It would be convenient if PendingUpdate#commit returned a CommitResult (instead of void). What do you think of adding a new UpdateResult PendingUpdate#apply() method that returns the action result? @rdblue @nastra
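To make the scoping concern concrete, here is a minimal sketch of the two shapes being compared. Every type in it (CommitResultSketch, GlobalListenersSketch, AppendSketch) is a hypothetical stand-in written for illustration; none of it is Iceberg's actual Listeners or PendingUpdate API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical commit outcome; stands in for the CommitResult proposed above. */
final class CommitResultSketch {
  final String tableName;
  final int addedDataFiles;

  CommitResultSketch(String tableName, int addedDataFiles) {
    this.tableName = tableName;
    this.addedDataFiles = addedDataFiles;
  }
}

/** Mimics a process-wide registry: listeners can be added but never removed. */
final class GlobalListenersSketch {
  private static final List<Consumer<CommitResultSketch>> LISTENERS = new ArrayList<>();

  static void register(Consumer<CommitResultSketch> listener) {
    LISTENERS.add(listener);
  }

  static void publish(CommitResultSketch event) {
    for (Consumer<CommitResultSketch> listener : LISTENERS) {
      listener.accept(event);
    }
  }
}

/** Hypothetical pending update whose commit() returns a result instead of void. */
final class AppendSketch {
  private final String tableName;
  private int addedDataFiles;

  AppendSketch(String tableName) {
    this.tableName = tableName;
  }

  AppendSketch appendFile() {
    addedDataFiles++;
    return this;
  }

  CommitResultSketch commit() {
    CommitResultSketch result = new CommitResultSketch(tableName, addedDataFiles);
    // Global path: every registered listener sees commits from every table.
    GlobalListenersSketch.publish(result);
    // Scoped path: only the caller that committed receives this result.
    return result;
  }
}
```

With the global registry, a listener registered by one committer also receives commits made against other tables and has to filter them itself. The returned result needs no registration and is naturally scoped to the caller.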
BTW, this class also consolidates the file-count computations that are spread across various places in the code path. FlinkSink already has the information here, so it doesn't have to rely on a callback from the core module. Maybe we can share/reuse the CommitSummary class with the core module?
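As a rough illustration of what consolidating the counts in one class could look like, the sketch below sums file, record, and byte counts over all pending results in a single constructor. FileStats and PendingWrite are hypothetical stand-ins for Iceberg's file metadata and WriteResult, and the data-file counters and accessor names are assumptions; only the deleteFilesRecordCount and deleteFilesByteCount field names and the NavigableMap parameter come from the diff above.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;

/** Stand-in for per-file metadata (hypothetical, not Iceberg's ContentFile). */
final class FileStats {
  final long recordCount;
  final long byteCount;

  FileStats(long recordCount, long byteCount) {
    this.recordCount = recordCount;
    this.byteCount = byteCount;
  }
}

/** Stand-in for one checkpoint's write result (hypothetical, not Iceberg's WriteResult). */
final class PendingWrite {
  final List<FileStats> dataFiles;
  final List<FileStats> deleteFiles;

  PendingWrite(List<FileStats> dataFiles, List<FileStats> deleteFiles) {
    this.dataFiles = dataFiles;
    this.deleteFiles = deleteFiles;
  }
}

/** Aggregates counts across all pending results, keyed by checkpoint id. */
final class CommitSummarySketch {
  private final AtomicLong dataFilesCount = new AtomicLong();
  private final AtomicLong dataFilesRecordCount = new AtomicLong();
  private final AtomicLong dataFilesByteCount = new AtomicLong();
  private final AtomicLong deleteFilesCount = new AtomicLong();
  private final AtomicLong deleteFilesRecordCount = new AtomicLong();
  private final AtomicLong deleteFilesByteCount = new AtomicLong();

  CommitSummarySketch(NavigableMap<Long, PendingWrite> pendingResults) {
    for (PendingWrite write : pendingResults.values()) {
      dataFilesCount.addAndGet(write.dataFiles.size());
      for (FileStats file : write.dataFiles) {
        dataFilesRecordCount.addAndGet(file.recordCount);
        dataFilesByteCount.addAndGet(file.byteCount);
      }
      deleteFilesCount.addAndGet(write.deleteFiles.size());
      for (FileStats file : write.deleteFiles) {
        deleteFilesRecordCount.addAndGet(file.recordCount);
        deleteFilesByteCount.addAndGet(file.byteCount);
      }
    }
  }

  long dataFilesCount() { return dataFilesCount.get(); }
  long dataFilesRecordCount() { return dataFilesRecordCount.get(); }
  long dataFilesByteCount() { return dataFilesByteCount.get(); }
  long deleteFilesCount() { return deleteFilesCount.get(); }
  long deleteFilesRecordCount() { return deleteFilesRecordCount.get(); }
  long deleteFilesByteCount() { return deleteFilesByteCount.get(); }
}
```

Because the summary is computed from data the committer already holds, logging and metric updates can read the same object without waiting for a callback.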
0e3cb16 to 075af33
private final Histogram deleteFilesSizeHistogram;

IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) {
  MetricGroup writerMetrics = metrics.addGroup("IcebergStreamWriter", fullTableName);
The group is equivalent to a tag for a tag-based reporter in Flink (e.g. Prometheus). Would something like table be a clearer name for users who want to aggregate metrics by the full table name?
In addition to that suggestion, you can still use IcebergStreamWriter as a group without a value to identify that the metric belongs to IcebergStreamWriter.
Agreed. Thanks for your suggestions!
Does a change like this require documentation in the Iceberg project? Otherwise it would be very difficult for users to discover these additional metrics.
WriteResult result = writer.complete();
writerMetrics.updateFlushResult(result);
output.collect(new StreamRecord<>(result));
writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
Maybe it would be clearer to use System.currentTimeMillis() and do the calculation in milliseconds? That avoids the unnecessary conversion, since computing in nanoseconds adds no extra precision to a millisecond result. Unless you do want the extra precision, for which the long return type wouldn't suffice. Ditto for the similar instances of this calculation.
This is intentional (though not for the sake of precision).
nanoTime is a monotonically increasing number, so it is the best fit for measuring durations. currentTimeMillis can jump forward or backward when the clock is adjusted (e.g. by NTP).
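A minimal sketch of the pattern described above, assuming a small helper class: FlushTimerSketch and its injectable clock are hypothetical and not part of the PR, while System.nanoTime and TimeUnit.NANOSECONDS.toMillis are standard JDK calls.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper that measures elapsed time from a nanosecond tick source. */
final class FlushTimerSketch {
  private final LongSupplier nanoClock;
  private long startNano;

  FlushTimerSketch(LongSupplier nanoClock) {
    this.nanoClock = nanoClock;
  }

  /** Uses System.nanoTime, which is unaffected by wall-clock adjustments. */
  static FlushTimerSketch system() {
    return new FlushTimerSketch(System::nanoTime);
  }

  void start() {
    startNano = nanoClock.getAsLong();
  }

  /** Subtract first, then convert: the difference stays correct even if the counter wraps. */
  long elapsedMillis() {
    return TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong() - startNano);
  }
}
```

The injected clock exists only so the behavior can be checked deterministically. In production code, FlushTimerSketch.system() gives the same arithmetic as the flushDuration line in the diff.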
Thanks for the explanation!
Doc updates are typically done in separate PRs. Yes, we can add the metrics to the Iceberg docs.
Basically, IcebergSourceReader will become part of the metric name and the table k-v pair will become a tag. This was suggested by Mason Chen in the comment below so that it is more consistent with other Flink connector metrics. apache#5410 (comment)
Thanks, @stevenzwu!