Skip to content

Minor clean up of LatchableEmitter / StubServiceEmitter#18255

Merged
kfaraz merged 6 commits intoapache:masterfrom
kfaraz:clean_up_stub_emitter
Jul 16, 2025
Merged

Minor clean up of LatchableEmitter / StubServiceEmitter#18255
kfaraz merged 6 commits intoapache:masterfrom
kfaraz:clean_up_stub_emitter

Conversation

@kfaraz
Copy link
Copy Markdown
Contributor

@kfaraz kfaraz commented Jul 15, 2025

Follow up to #18249 discussion

Changes

  • Maintain a List of processed events in LatchableEmitter.
    This is an improvement over the current flow where a copy of events is created upon receiving every new event.
  • When a new condition is registered, evaluate all past events upfront, then add it to the set of wait conditions
  • Evaluate each new event as it is received

Other changes

  • Hide the internal queue implementation of StubServiceEmitter from both tests and the sub-class LatchableEmitter
  • Reduce the usage of StubServiceEmitter.getEvents(). Use the inbuilt verifyValue methods instead.

@kfaraz kfaraz requested a review from kgyrtkirk July 15, 2025 10:26
@kfaraz
Copy link
Copy Markdown
Contributor Author

kfaraz commented Jul 15, 2025

@kgyrtkirk , I have tried to clean up the LatchableEmitter since it seemed wasteful to create a new list every time.

I have also removed the processedUntil flag to simplify the flow, while retaining readability.
The LatchableEmitter is now agnostic of how StubServiceEmitter stores events.

Please let me know if this makes sense.

Copy link
Copy Markdown
Member

@kgyrtkirk kgyrtkirk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @kfaraz, this cleaned up some stuff pretty nicely!
left a few notes here and there - I think only the comment on processedEvents matters


for (Event event : events) {
EventMap map = event.toMap();
for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: right now there is event.getUserDims() and event.getMetricEvent().getUserDims() ; I have no clear understanding why this ServiceMetricEventSnapshot is necessary ; but if the userDims or some other things might be clobbered externally; it would be more straight to just add a copy constructor to ServiceMetricEvent -

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I am not sure why this class ServiceMetricEventSnapshot was added either.
Let me see what I can do to clean up this part.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a look at this. Apparently the userDims can change if the builder is reused for emitting another event.
But this is problematic anyway.

If the same event is to be emitted using (one or more) emitters, it is possible that before we even get to emit the event (as emission is always async), the dimensions of the event have changed.

I wonder if we should do this in a separate PR though as it is addressing a specific race condition. Let me know what you think.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a comment on the original PR #17170 (comment)
I have added a ServiceMetricEvent.copy() for now as you suggested.

Comment thread server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java Outdated
Comment on lines +194 to +196
if (condition.predicate.test(event)) {
condition.countDownLatch.countDown();
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shouldn't this be condition#evaluate ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, let me fix this up!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked this. It seems correct. The condition is evaluated (using condition.predicate.test()) against all the events and the latch is counted down if the condition is satisfied at any point.
I think we can probably break the loop once the condition is satisfied.

condition.countDownLatch.countDown();
}
}
waitConditions.add(condition);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't necessarily need to add if the condition is already satisfied

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true, let me check this again.

Comment thread server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java Outdated
}
}
catch (Exception e) {
log.error(e, "Error while evaluating wait conditions");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is partially unrelated; but I think this should be a serious error - (possibly the predicate is broken?)...repack into RuntimeException if necessary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be good but this is running on a separate thread. So the thrown exception will not cause any real failure except for the countdown latch eventually timing out.

I could use futures instead of countdown latches, but the latch gives a cleaner flow.

Let me know if you have any suggestion to handle this.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the need to put it on a different thread? it doesn't seem like its a lot of computation...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yeah, you are right. The original design needed the other thread because some conditions had to be evaluated from the start. But now we evaluate only one event at a time, so I guess we can do it synchronously after all.

Thanks for the suggestion!

Copy link
Copy Markdown
Contributor Author

@kfaraz kfaraz Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still not sure if throwing an exception here will help though.
The contract of Emitter.emit() says that it shouldn't throw an exception and only log warnings/errors.
An exception thrown here would still not fail the test directly as the emit() call might be happening on some service thread that just swallows up the exception.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid we can't tell future bugs to not happen - so that apidoc is hard to live up to ; it may suggest to try to avoid that ; but the system invoking this api should fully prepared that something might go wrong even in this method

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, I will throw a runtime exception here. Hopefully, it helps us catch issues early.

@kfaraz
Copy link
Copy Markdown
Contributor Author

kfaraz commented Jul 16, 2025

@kgyrtkirk , thanks a lot for the prompt review and for the suggestions!
I have incorporated your feedback. Please let me know if the changes look good.

@Akshat-Jain Akshat-Jain reopened this Jul 16, 2025
@kfaraz kfaraz merged commit 7aec41b into apache:master Jul 16, 2025
76 checks passed
@kfaraz kfaraz deleted the clean_up_stub_emitter branch July 16, 2025 14:13
ashibhardwaj pushed a commit to ashibhardwaj/druid that referenced this pull request Jul 23, 2025
Follow up to apache#18249

Changes:
- Maintain a List of processed events in `LatchableEmitter`.
This is an improvement over the current flow where a copy of events is created upon receiving every new event.
- When a new condition is registered, evaluate all past events upfront, then add it to the set of wait conditions
- Evaluate each new event as it is received

Other changes:
- Hide the internal queue implementation of `StubServiceEmitter` from tests and sub-classes
- Reduce the usage of `StubServiceEmitter.getEvents()`. Use the inbuilt `verifyValue` methods instead.
@cecemei cecemei added this to the 35.0.0 milestone Oct 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants