Motivation
See the issue raised by @leventov (#7057).
Proposed changes
Of the five Emitters listed in #7057, I would change all but StatsDEmitter. The code that handles enqueuing for StatsDEmitter lives in the library class NonBlockingStatsDClient, so there isn't really a way to change it without rewriting that class.
The two approaches to refactoring:
- In an ideal world, I'd create an abstract class QueueBasedEmitter which holds the event queue and implements int offerAndHandleFailure(event), and have the four emitters that write to a queue (see the list in #7057, linked under Motivation) extend it. This method would try to offer the event to the queue. If the offer fails, events would be discarded from the queue (based on the configurable policy) until the offer succeeds (this approach assumes that the only reason for an offer() call failing is a full queue). The number of discarded events would be returned from the method so that the Emitter implementation can use it as needed; for example, KafkaEmitter keeps a count of lost events that is periodically logged. However, I'm not sure this is feasible, since AmbariMetricsEmitter already extends AbstractTimelineMetricsSink and Java doesn't support multiple inheritance of classes. It would be possible if AmbariMetricsEmitter were left out of the refactor, but that doesn't seem like a very object-oriented approach.
- I think the more feasible approach would be to create a helper class that accepts the event queue as a constructor argument and implements int offerAndHandleFailure(event). The helper would be instantiated in the Emitter's constructor. It would work in AmbariMetricsEmitter, KafkaEmitter, and GraphEmitter with minimal change, since they all already use BlockingQueues. HttpPostEmitter can use the queue approach if its buffersToEmit Deque is converted to a BlockingQueue. I think this substitution would work, since only addLast() and pollFirst() are called on the Deque.
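To make the helper idea concrete, here is a minimal sketch of what such a class could look like. The class name ThrottlingQueueHelper and the Policy enum are hypothetical (only offerAndHandleFailure comes from the proposal), and the sketch assumes a bounded queue with capacity of at least 1 whose offer() fails only when the queue is full.

```java
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical helper (name is illustrative, not existing code): wraps an
 * emitter's event queue and applies a drop policy when the queue is full.
 */
final class ThrottlingQueueHelper<E> {
  /** Mirrors the two proposed eventThrottlingPolicy values. */
  enum Policy { DROP_OLDEST, DROP_NEWEST }

  private final BlockingQueue<E> queue;
  private final Policy policy;

  ThrottlingQueueHelper(BlockingQueue<E> queue, Policy policy) {
    this.queue = queue;
    this.policy = policy;
  }

  /**
   * Tries to enqueue the event and returns the number of events discarded.
   * Assumes a failed offer() means the queue is full, and that the queue
   * has capacity >= 1 (otherwise DROP_OLDEST could never succeed).
   */
  int offerAndHandleFailure(E event) {
    int dropped = 0;
    while (!queue.offer(event)) {
      if (policy == Policy.DROP_NEWEST) {
        // The offered event itself is the one that gets discarded.
        return dropped + 1;
      }
      // DROP_OLDEST: evict the head and retry. poll() can return null if a
      // consumer drained the queue concurrently, so count only real evictions.
      if (queue.poll() != null) {
        dropped++;
      }
    }
    return dropped;
  }
}
```

An emitter would construct the helper once with its existing queue and add the returned count to whatever lost-event counter it already keeps.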
With regard to configuration, I'm thinking of adding the following config parameters:
...emitter.eventThrottlingPolicy [String]: specifies what policy to follow when the queue is full and a new event is offered (i.e. BlockingQueue#offer() returns false or times out)
- "dropOldest" (default): discards the oldest event from the queue
- "dropNewest": discards the event that was just offered
...emitter.eventThrottlingLoggingFrequency [double from 0 to 1]: specifies what fraction of dropped events should be logged. This policy is pretty naive, as it assumes the offer failure rate is constant. A better (but more challenging) approach might be to tune the logging rate based on the number of failures that have occurred in the past n seconds/minutes.
- 1: Log every dropped event
- 0.001: Log 1 in every 1000 dropped events
- 0: Don't log any dropped events
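One way the logging frequency could be applied is simple random sampling, sketched below. The class name DropLogSampler and the injectable random source are assumptions made for illustration; the proposal does not say whether sampling should be random or counter-based.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.DoubleSupplier;

/**
 * Hypothetical sampler (name is illustrative): decides whether a given
 * dropped event should be logged, based on a frequency in [0, 1].
 */
final class DropLogSampler {
  private final double frequency;
  private final DoubleSupplier random;

  /** Uses ThreadLocalRandom, whose nextDouble() returns a value in [0, 1). */
  DropLogSampler(double frequency) {
    this(frequency, () -> ThreadLocalRandom.current().nextDouble());
  }

  /** The random source is injectable so the behavior can be tested. */
  DropLogSampler(double frequency, DoubleSupplier random) {
    if (!(frequency >= 0.0 && frequency <= 1.0)) {
      throw new IllegalArgumentException("frequency must be in [0, 1]: " + frequency);
    }
    this.frequency = frequency;
    this.random = random;
  }

  /** 1 logs every drop, 0 logs none, 0.001 logs roughly 1 in 1000. */
  boolean shouldLog() {
    return random.getAsDouble() < frequency;
  }
}
```

Random sampling keeps the sampler stateless, at the cost of only approximating the configured ratio over short bursts; a counter that logs every n-th drop would be exact but needs shared mutable state.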
If anyone thinks that any of the above configs should be removed or modified, or another should be added, leave a comment below.
Rationale
I believe the helper-based approach to refactoring event emission is the best one because it is the most object-oriented and avoids doing anything hacky with existing code. The only other option is to have each Emitter implement its own handling of the event throttling policy, but that doesn't really help with code reuse.
Operational impact
Likely none? It would be awesome if someone with more knowledge of users' use cases with metrics could comment on this.
Open questions
- What's the best thing to log when an event is dropped? Count of dropped events, a serialized version of the event, or just the fact that an event has been discarded?
- Does this change have an operational impact?