Conversation

@talatuyarer (Contributor) commented Feb 19, 2023

We have a Flink job that does not emit backlog metrics. The metrics are actually emitted in KafkaIO, but I could not see them in the Flink metric system, so the Beam -> Flink wiring looks broken. I set the metric context in the checkpointing phase, which is where the metrics are emitted in the UnboundedReader.

@mxm @tweise @angoenka Could you please review my PR? I tested it in our environment and it works as expected.

@github-actions bot commented

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.

@talatuyarer (Contributor, Author) commented

assign set of reviewers

@apilloud (Member) commented

assign set of reviewers

@github-actions bot commented

No reviewers could be found from any of the labels on the PR or in the fallback reviewers list. Check the config file to make sure reviewers are configured

@apilloud (Member) commented

Run Java PreCommit

@je-ik (Contributor) left a review comment

Could we add a test to verify that the change brings the expected behavior?

The test could probably just verify that the metrics are available after running a simple pipeline with some source that updates the metrics.

@je-ik (Contributor) commented Feb 27, 2023

> Could we add a test to verify that the change brings the expected behavior?
>
> The test could probably just verify that the metrics are available after running a simple pipeline with some source that updates the metrics.

There is an analogous test, SourceInputFormatTest.testAccumulatorRegistrationOnOperatorClose, that can serve as inspiration.
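A minimal sketch of the kind of test suggested here, assuming JUnit plus the Flink runner's own FlinkPipelineOptions/FlinkRunner classes. It asserts on a user counter incremented in a ParDo rather than on the source backlog gauge, so it only illustrates the shape of such a test; a real test for this change would use an unbounded source that updates metrics from getCheckpointMark() and assert on those. The class, test, and metric names below are made up.

```java
import static org.junit.Assert.assertTrue;

import java.io.Serializable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.Test;

public class FlinkMetricsVisibilityTest implements Serializable {

  @Test
  public void metricsAreQueryableAfterSimplePipeline() throws Exception {
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(GenerateSequence.from(0).to(100))
        .apply(ParDo.of(new DoFn<Long, Long>() {
          // User metric updated while the pipeline runs.
          private final Counter elements = Metrics.counter("test", "elements");

          @ProcessElement
          public void processElement(ProcessContext c) {
            elements.inc();
            c.output(c.element());
          }
        }));

    PipelineResult result = p.run();
    result.waitUntilFinish();

    // If the Beam -> Flink metric wiring works, the counter must be visible here.
    MetricQueryResults metrics =
        result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("test", "elements"))
                .build());
    assertTrue(metrics.getCounters().iterator().hasNext());
  }
}
```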

@je-ik commented on the diff:

```java
ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker =
    new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer);
```

This can be moved to a field and reused between calls to run() and snapshotState().
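A short sketch of what that suggestion could look like, assuming the surrounding wrapper class keeps stepName, serializedOptions, and metricContainer as fields (as the diff implies). The lazy-init helper is illustrative only, not the actual UnboundedSourceWrapper code; both run() and snapshotState() would then call readerInvoker() instead of constructing their own instance.

```java
// Illustrative fragment: cache the invoker in a transient field instead of
// building a new ReaderInvocationUtil in each of run() and snapshotState().
private transient ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>>
    readerInvoker;

private ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker() {
  if (readerInvoker == null) {
    // Same constructor call as in the diff above, just created once and reused.
    readerInvoker =
        new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer);
  }
  return readerInvoker;
}
```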

@je-ik (Contributor) commented Feb 27, 2023

> We have a Flink job that does not emit backlog metrics. The metrics are actually emitted in KafkaIO, but I could not see them in the Flink metric system, so the Beam -> Flink wiring looks broken. I set the metric context in the checkpointing phase, which is where the metrics are emitted in the UnboundedReader.
>
> @mxm @tweise @angoenka Could you please review my PR? I tested it in our environment and it works as expected.

I wonder: why do we need to write metrics into the context on checkpoint? The current code does so in the call to advance(); is that not sufficient?

@talatuyarer (Contributor, Author) commented

Hey @je-ik

> I wonder: why do we need to write metrics into the context on checkpoint? The current code does so in the call to advance(); is that not sufficient?

The current code does not emit the backlog metrics on advance(). I see it is done in the checkpointing phase: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L255

Am I missing anything?

@je-ik (Contributor) commented Feb 28, 2023

If I understand that correctly (and I'm definitely not sure about that :)), you are referring to the time when the metrics are computed in the source (the Kafka source in this case). But the wrapper should update the metric container on each call to advance(), see [1]. I wonder why it is needed to do the same when doing a checkpoint (though that might actually make more sense than doing it on each call to advance()). I'm not saying it is wrong, I'd just like to understand the reason.

[1] https://github.com/apache/beam/pull/25554/files#diff-3f0d15eaf4f2c4979c6ac3fb42cd02819b1e5159ad7b28f7ac052ecd0bb21768L60

@talatuyarer (Contributor, Author) commented Feb 28, 2023

> If I understand that correctly (and I'm definitely not sure about that :)), you are referring to the time when the metrics are computed in the source (the Kafka source in this case). But the wrapper should update the metric container on each call to advance(), see [1]. I wonder why it is needed to do the same when doing a checkpoint (though that might actually make more sense than doing it on each call to advance()). I'm not saying it is wrong, I'd just like to understand the reason.

Backlog is reported from getCheckpointMark(), which is called by some other thread; I am not sure why it is done there, but this is the main issue. The advance() function runs on the main thread and has a metrics context, so I am able to see the element count metrics. However, the checkpoint thread does not have a context, which is why it cannot emit metrics.

If I move the reportBacklog() call into the advance() function, I am able to see the backlog metrics too. We could do that in advance(), but that would add unnecessary overhead for every single record. :)
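The change under review essentially gives the checkpoint thread a metrics context as well. Below is a simplified sketch of that mechanism, assuming the SDK's MetricsEnvironment.scopedMetricsContainer and a per-step MetricsContainerImpl; the helper class is illustrative, not the actual wrapper code.

```java
import java.io.Closeable;
import java.io.IOException;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.MetricsEnvironment;

class CheckpointMetricsSketch {
  // Bind the step's metrics container to the *checkpoint* thread for the
  // duration of the call, mirroring what is already done around advance().
  static UnboundedSource.CheckpointMark checkpointWithMetrics(
      UnboundedSource.UnboundedReader<?> reader, MetricsContainerImpl stepContainer)
      throws IOException {
    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(stepContainer)) {
      // Metric updates made inside getCheckpointMark() (e.g. reportBacklog in
      // KafkaUnboundedReader) now land in stepContainer instead of nowhere.
      return reader.getCheckpointMark();
    }
  }
}
```

Without a container bound to the checkpoint thread, those updates never reach the Flink metric system, which is consistent with the symptom described above.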

@je-ik (Contributor) commented Feb 28, 2023

No, I think there is a good reason the backlog is computed on checkpoint only. :)
Yes, there are two threads: one thread runs the source and the other does the checkpoint. There is a lock (getCheckpointLock()) that ensures this is consistent. If the source updates its internal metrics from the checkpoint thread, I would expect these metrics to be reported to Flink on the first subsequent call to advance() - which is apparently not the case; I'm just struggling a little to see why. This might indicate some other bug somewhere.
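For context, a rough sketch of the threading model being discussed, using Flink's legacy SourceFunction API that the Beam wrapper is built on. This is illustrative only and far simpler than the real UnboundedSourceWrapper; the point is that element emission happens under the checkpoint lock on the task thread, while snapshots run on the checkpoint thread holding the same lock.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative source: not the Beam wrapper, just the locking pattern it uses.
class LockingSketchSource implements SourceFunction<Long> {
  private volatile boolean running = true;

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    long i = 0;
    while (running) {
      // Task thread: advance()/collect() happen under the checkpoint lock,
      // and metric updates made here are attributed to the task thread's
      // scoped metrics container.
      synchronized (ctx.getCheckpointLock()) {
        ctx.collect(i++);
      }
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}
```

Checkpoints are triggered while the same lock is held, which is why the reader state (and any metrics derived from it) stays consistent between the two threads.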

@je-ik (Contributor) commented Mar 6, 2023

retest this please

@talatuyarer (Contributor, Author) commented

@je-ik Based on an initial experiment, it looks like this change is not working. Whenever I enable Beam metrics, the pipeline stops processing. Do you have any suggestions? I believe there is a concurrency issue, but I still could not pin it down.

@je-ik (Contributor) commented Mar 9, 2023

What you describe sounds like a deadlock. That was what I was trying to figure out: the metrics are generated in the checkpoint thread, but should be read in the thread that calls advance(). Therefore the backlog metrics should be accessible, and the fact that they are not is quite suspicious. Beyond that, I currently don't have enough understanding of how Beam metrics are bound to Flink metrics. Looking very quickly into the code, I do not see how the backlog metrics could get out of the KafkaUnboundedReader, because I'm struggling to find how the metrics are "registered" in any container for export.

@github-actions bot commented Jun 6, 2023

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

github-actions bot added the stale label Jun 6, 2023
@github-actions bot commented

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions bot closed this Jun 14, 2023
@oswidan97 commented
I am facing a similar issue while using JdbcIO: I can't see the metrics that are emitted in the Flink metric system. I noticed a more recent PR with the same changes, but it is also closed. @talatuyarer, did the changes here fix the issue for you?

I am using Apache Beam 2.56.0 and beam-runners-flink-1.17.

@kennknowles (Member) commented

I'd love to see this or a similar fix merged.

@kennknowles (Member) commented

> If I understand that correctly (and I'm definitely not sure about that :)), you are referring to the time when the metrics are computed in the source (the Kafka source in this case). But the wrapper should update the metric container on each call to advance(), see [1]. I wonder why it is needed to do the same when doing a checkpoint (though that might actually make more sense than doing it on each call to advance()). I'm not saying it is wrong, I'd just like to understand the reason.

> Backlog is reported from getCheckpointMark(), which is called by some other thread; I am not sure why it is done there, but this is the main issue. The advance() function runs on the main thread and has a metrics context, so I am able to see the element count metrics. However, the checkpoint thread does not have a context, which is why it cannot emit metrics.
>
> If I move the reportBacklog() call into the advance() function, I am able to see the backlog metrics too. We could do that in advance(), but that would add unnecessary overhead for every single record. :)

In the current code, advance() calls nextBatch(), which calls reportBacklog().

So there might be some other issue. But if there is a call to reportBacklog() in getCheckpointMark() in KafkaIO, it is a good thing for the runner to provide a working metrics container there. I don't understand this area of the FlinkRunner well enough to know why each method sets up its own temporary container, but we should do it.
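Paraphrasing the call chain described in this thread as a simplified shape (this is not the actual KafkaUnboundedReader code; the method bodies are stubs for illustration):

```java
// Simplified shape of the reader, per the discussion above (not real KafkaIO code).
class KafkaReaderShape {

  // Task thread: the wrapper invokes this inside a scoped metrics container,
  // so a backlog update reported from here reaches Flink.
  boolean advance() {
    nextBatch(); // when the current batch is drained
    return true;
  }

  void nextBatch() {
    // ... poll the consumer for more records ...
    reportBacklog();
  }

  // Checkpoint thread: without the runner binding a container here,
  // the backlog update below is silently dropped.
  Object getCheckpointMark() {
    reportBacklog();
    return new Object(); // stands in for the real CheckpointMark
  }

  void reportBacklog() {
    // update backlog gauges via the Beam metrics API
  }
}
```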

@kennknowles (Member) commented

> If I understand that correctly (and I'm definitely not sure about that :)), you are referring to the time when the metrics are computed in the source (the Kafka source in this case). But the wrapper should update the metric container on each call to advance(), see [1]. I wonder why it is needed to do the same when doing a checkpoint (though that might actually make more sense than doing it on each call to advance()). I'm not saying it is wrong, I'd just like to understand the reason.

> Backlog is reported from getCheckpointMark(), which is called by some other thread; I am not sure why it is done there, but this is the main issue. The advance() function runs on the main thread and has a metrics context, so I am able to see the element count metrics. However, the checkpoint thread does not have a context, which is why it cannot emit metrics.
> If I move the reportBacklog() call into the advance() function, I am able to see the backlog metrics too. We could do that in advance(), but that would add unnecessary overhead for every single record. :)

> In the current code, advance() calls nextBatch(), which calls reportBacklog().
>
> So there might be some other issue. But if there is a call to reportBacklog() in getCheckpointMark() in KafkaIO, it is a good thing for the runner to provide a working metrics container there. I don't understand this area of the FlinkRunner well enough to know why each method sets up its own temporary container, but we should do it.

The change is recent, #32921

github-actions bot removed the stale label Jan 14, 2025
@github-actions bot commented

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

github-actions bot added the stale label Mar 16, 2025
@github-actions bot commented

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions bot closed this Mar 23, 2025