-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: fix the bug where metrics are registered in split reader. Also updated reader metric group to be more consistent with Flink metrics style. #5554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Just realized a bug in the source reader metrics. can't register it in split reader as split reader can be re-created as previous fetcher exited. As a result, we will see registration failure later and metrics value will never be updated again. Need to construct the metrics at reader (not split reader) level. |
ce82ddb to
3d6fd94
Compare
|
@stevenzwu: Could we create a test for the bug mentioned above? Also, could we prefix the PR with Thanks, Peter |
| this.assignedBytes = readerMetrics.counter("assignedBytes"); | ||
| this.finishedSplits = readerMetrics.counter("finishedSplits"); | ||
| this.finishedBytes = readerMetrics.counter("finishedBytes"); | ||
| this.splitReaderFetchCalls = readerMetrics.counter("splitReaderFetchCalls"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this metric used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. It is measuring as the counter of polling of record batch. mainly meant for liveness signal. Kind of like the Kafka consumer fetch request count.
I am open to remove it if we think it is not very useful.
Basically, IcebergSourceReader will become part of metric name and table k-v pair will become a tag. This is suggested by Mason Chen in the comment below so that it is more consistent as other Flink connector metrics. apache#5410 (comment)
…r/reader is closed.
3d6fd94 to
22c92a4
Compare
|
@pvary It took some effort. But I managed to add a unit test with a custom metric group class so that counter values can be inspected. I verified that the unit test failed before the fix and passed after the fix. |
| this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA); | ||
| } | ||
|
|
||
| private List<List<Record>> createRecordBatchList(int batchCount) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two util methods are moved to ReaderUtil
| this.counters = Maps.newHashMap(); | ||
| } | ||
|
|
||
| /** Pass along the reference to share the map for child metric groups. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the trick part because IcebergSourceReaderMetrics calls addGroup to create child metric group. We need to share the reference so that we can hijack the counter creation from the child group.
| return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext); | ||
| } | ||
|
|
||
| private static class TestingMetricGroup extends UnregisteredMetricsGroup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question: Is it possible that this class would be useful for other tests later? If so, maybe putting it to the org.apache.flink.connector.testutils.source.reader package would be better. OTOH if we think this will never be used anywhere else, then this is a good place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't know if it will be used by other tests or not. I typically like to refactor later when the needs come. But I see the TestingMetricGroup has been big enough to be a stand-alone class. Hence I moved it to its own file. it is package private for now.
| } | ||
|
|
||
| @Override | ||
| public void setPendingBytesGauge(Gauge<Long> pendingBytesGauge) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not: I have not seen empty methods formatted this way in the Iceberg code yet. Could you please check how should they be formatted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is from latest spotless auto-formatting with google java format.
here is an example from FileIO
default void initialize(Map<String, String> properties) {}
|
LGTM, just 2 small questions. |
…ends on timing. it is not essential to this testing on counters. assignedSplits metric should be good enough and its assertion should be reliable.
|
Thanks @stevenzwu for the fix an @mas-chen for the review! |
can't register it in split reader as split reader can be re-created as previous fetcher exited. As a result, we will see registration failure later and metrics value will never be updated again. Hence we need to construct the metrics object/registration when reader (not split reader) is created.
Basically, IcebergSourceReader will become part of metric name and table k-v pair will become a tag. This is suggested by Mason Chen in the comment below so that it is more consistent as other Flink connector metrics. #5410 (comment)