Description
Currently, when a Batch object is sealed, it calls onSealExclusive in HttpPostEmitter:
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Lines 250 to 261 in 8eafafb
    void onSealExclusive(Batch batch)
    {
      addBatchToEmitQueue(batch);
      wakeUpEmittingThread();
      if (!isTerminated()) {
        int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(batch.batchNumber);
        if (!concurrentBatch.compareAndSet(batch, new Batch(this, acquireBuffer(), nextBatchNumber))) {
          // If compareAndSet failed, the service is closed concurrently.
          Preconditions.checkState(isTerminated());
        }
      }
    }
Notice that this automatically creates a new Batch object and installs it as the concurrentBatch. What may result, and what we have observed, is an uncontrolled buildup of these objects, especially when the emitting thread does not emit fast enough, which eventually leads to an OOME.
Without altering the existing behavior (e.g., without adding blocking elsewhere in the code), I suggest adding a check on the queue length in emitAndReturnBatch:
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Lines 216 to 227 in 8eafafb
        while (true) {
          Batch batch = concurrentBatch.get();
          if (batch == null) {
            throw new RejectedExecutionException("Service is closed.");
          }
          if (batch.tryAddEvent(eventBytes)) {
            return batch;
          }
          // Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
          // eventually, because concurrentBatch.get() is a volatile read.
        }
      }
While this check is not strictly or directly linked to the queue, it should give some degree of control over the queue length (i.e., by not adding new events to the Batch when there are already plenty of batches waiting to be emitted). At the same time, blocking is already expected in this method call, since it spins until concurrentBatch is updated.
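To make the proposal more concrete, here is a minimal sketch of such a check, written as a standalone helper rather than as a patch to HttpPostEmitter. The class EmitQueueGate, its method names, and the maxQueuedBatches limit are all hypothetical and do not exist in java-util. The assumption is that onBatchQueued() would be called next to addBatchToEmitQueue(batch), onBatchEmitted() would be called by the emitting thread once a batch has been sent or dropped, and awaitCapacity() would be called at the top of the loop in emitAndReturnBatch before tryAddEvent.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
 * Hypothetical helper (not part of HttpPostEmitter): tracks how many sealed
 * batches are waiting to be emitted and lets producers wait for capacity.
 */
final class EmitQueueGate
{
  private final int maxQueuedBatches;
  private final AtomicInteger queuedBatches = new AtomicInteger();

  EmitQueueGate(int maxQueuedBatches)
  {
    if (maxQueuedBatches <= 0) {
      throw new IllegalArgumentException("maxQueuedBatches must be positive");
    }
    this.maxQueuedBatches = maxQueuedBatches;
  }

  /** Assumed call site: where a sealed batch is added to the emit queue. */
  void onBatchQueued()
  {
    queuedBatches.incrementAndGet();
  }

  /** Assumed call site: after the emitting thread has sent or dropped a batch. */
  void onBatchEmitted()
  {
    queuedBatches.decrementAndGet();
  }

  int queuedBatches()
  {
    return queuedBatches.get();
  }

  /**
   * Waits until fewer than maxQueuedBatches batches are queued.
   * Returns false if the timeout elapses or the thread is interrupted first,
   * so the caller can decide whether to drop the event or throw.
   */
  boolean awaitCapacity(long timeoutNanos)
  {
    final long deadline = System.nanoTime() + timeoutNanos;
    while (queuedBatches.get() >= maxQueuedBatches) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0 || Thread.currentThread().isInterrupted()) {
        return false;
      }
      // Park in short slices (at most 1 ms) and re-check the counter.
      LockSupport.parkNanos(Math.min(remaining, 1_000_000L));
    }
    return true;
  }
}
```

With a gate like this, producers slow down once the limit is reached instead of sealing more batches, which bounds the number of Batch objects held in memory to roughly the limit plus the current batch. What to do when awaitCapacity() returns false (drop the event, or throw RejectedExecutionException) is a separate design decision that this sketch leaves open.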