diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java index 5bc598a724de..fbcfb23f0f82 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java @@ -92,6 +92,9 @@ private static boolean isEmittingAllowed(long state) * Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0. * It's a boxed Long rather than primitive long, because we want to minimize the number of allocations done in * {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}. + * + * See {@link HttpPostEmitter#concurrentBatch} which may store this object. + * * @see HttpPostEmitter#onSealExclusive * @see HttpPostEmitter#concurrentBatch */ diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java index 85ad787bcdcf..1036fb965329 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java @@ -109,10 +109,10 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter private final AtomicInteger approximateBuffersToReuseCount = new AtomicInteger(); /** - * concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Integer, - * it means that some thread has failed with a serious error during {@link #onSealExclusive} (with the batch number - * corresponding to the Integer object) and {@link #tryRecoverCurrentBatch} needs to be called. Otherwise (i. e. - * normally), an instance of {@link Batch} is stored in this atomic reference. + * concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Long (i. e. the + * type of {@link Batch#batchNumber}), it means that some thread has failed with a serious error during {@link + * #onSealExclusive} (with the batch number corresponding to the Long object) and {@link #tryRecoverCurrentBatch} + * needs to be called. Otherwise (i. e. normally), an instance of {@link Batch} is stored in this atomic reference. */ private final AtomicReference concurrentBatch = new AtomicReference<>(); @@ -251,8 +251,8 @@ Batch emitAndReturnBatch(Event event) while (true) { Object batchObj = concurrentBatch.get(); - if (batchObj instanceof Integer) { - tryRecoverCurrentBatch((Integer) batchObj); + if (batchObj instanceof Long) { + tryRecoverCurrentBatch((Long) batchObj); continue; } if (batchObj == null) { @@ -342,7 +342,7 @@ private void doOnSealExclusive(Batch batch, long elapsedTimeMillis) } } - private void tryRecoverCurrentBatch(Integer failedBatchNumber) + private void tryRecoverCurrentBatch(Long failedBatchNumber) { log.info("Trying to recover currentBatch"); long nextBatchNumber = ConcurrentAwaitableCounter.nextCount(failedBatchNumber); @@ -535,8 +535,8 @@ private boolean needsToShutdown() if (batch instanceof Batch) { ((Batch) batch).sealIfFlushNeeded(); } else { - // batch == null means that HttpPostEmitter is terminated. Batch object could also be Integer, if some - // thread just failed with a serious error in onSealExclusive(), in this case we don't want to shutdown + // batch == null means that HttpPostEmitter is terminated. Batch object might also be a Long object if some + // thread just failed with a serious error in onSealExclusive(). In this case we don't want to shutdown // the emitter thread. needsToShutdown = batch == null; }