From b3aad65175b01e941d599018bb542ac938990630 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Wed, 14 Feb 2018 04:00:58 +0300 Subject: [PATCH] Make HttpPostEmitter more robust in the face of serious errors (like OutOfMemoryError) during onSealExclusive() (#5386) --- .../druid/java/util/emitter/core/Batch.java | 6 +- .../util/emitter/core/HttpPostEmitter.java | 83 +++++++++++++++---- 2 files changed, 72 insertions(+), 17 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java b/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java index 16dbab4532f1..81a3aefe9697 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java @@ -87,8 +87,12 @@ private static boolean isEmittingAllowed(long state) /** * Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0. + * It's a boxed Integer rather than int, because we want to minimize the number of allocations done in + * {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}. + * @see HttpPostEmitter#onSealExclusive + * @see HttpPostEmitter#concurrentBatch */ - final int batchNumber; + final Integer batchNumber; /** * The number of events in this batch, needed for event count-based batch emitting. diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java index a9303d9ccaed..d6d51a423ee4 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java @@ -110,9 +110,12 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter private final AtomicInteger approximateBuffersToReuseCount = new AtomicInteger(); /** - * concurrentBatch.get() == null means the service is closed. + * concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Integer, + * it means that some thread has failed with a serious error during {@link #onSealExclusive} (with the batch number + * corresponding to the Integer object) and {@link #tryRecoverCurrentBatch} needs to be called. Otherwise (i. e. + * normally), an instance of {@link Batch} is stored in this atomic reference. */ - private final AtomicReference concurrentBatch = new AtomicReference<>(); + private final AtomicReference concurrentBatch = new AtomicReference<>(); private final ConcurrentLinkedDeque buffersToEmit = new ConcurrentLinkedDeque<>(); /** @@ -246,10 +249,15 @@ Batch emitAndReturnBatch(Event event) } while (true) { - Batch batch = concurrentBatch.get(); - if (batch == null) { + Object batchObj = concurrentBatch.get(); + if (batchObj instanceof Integer) { + tryRecoverCurrentBatch((Integer) batchObj); + continue; + } + if (batchObj == null) { throw new RejectedExecutionException("Service is closed."); } + Batch batch = (Batch) batchObj; if (batch.tryAddEvent(eventBytes)) { return batch; } @@ -290,6 +298,25 @@ private void writeLargeEvent(byte[] eventBytes) * Called from {@link Batch} only once for each Batch in existence. */ void onSealExclusive(Batch batch, long elapsedTimeMillis) + { + try { + doOnSealExclusive(batch, elapsedTimeMillis); + } + catch (Throwable t) { + try { + if (!concurrentBatch.compareAndSet(batch, batch.batchNumber)) { + log.error("Unexpected failure to set currentBatch to the failed Batch.batchNumber"); + } + log.error(t, "Serious error during onSealExclusive(), set currentBatch to the failed Batch.batchNumber"); + } + catch (Throwable t2) { + t.addSuppressed(t2); + } + throw t; + } + } + + private void doOnSealExclusive(Batch batch, long elapsedTimeMillis) { batchFillingTimeCounter.add((int) Math.max(elapsedTimeMillis, 0)); if (elapsedTimeMillis > 0) { @@ -301,13 +328,29 @@ void onSealExclusive(Batch batch, long elapsedTimeMillis) wakeUpEmittingThread(); if (!isTerminated()) { int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(batch.batchNumber); - if (!concurrentBatch.compareAndSet(batch, new Batch(this, acquireBuffer(), nextBatchNumber))) { - // If compareAndSet failed, the service is closed concurrently. + byte[] newBuffer = acquireBuffer(); + if (!concurrentBatch.compareAndSet(batch, new Batch(this, newBuffer, nextBatchNumber))) { + buffersToReuse.add(newBuffer); + // If compareAndSet failed, the service should be closed concurrently, i. e. we expect isTerminated() = true. + // If we don't see this, there should be some bug in HttpPostEmitter. Preconditions.checkState(isTerminated()); } } } + private void tryRecoverCurrentBatch(Integer failedBatchNumber) + { + log.info("Trying to recover currentBatch"); + int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(failedBatchNumber); + byte[] newBuffer = acquireBuffer(); + if (concurrentBatch.compareAndSet(failedBatchNumber, new Batch(this, newBuffer, nextBatchNumber))) { + log.info("Successfully recovered currentBatch"); + } else { + // It's normal, a concurrent thread could succeed to recover first. + buffersToReuse.add(newBuffer); + } + } + private void addBatchToEmitQueue(Batch batch) { limitBuffersToEmitSize(); @@ -359,7 +402,10 @@ private void wakeUpEmittingThread() public void flush() throws IOException { awaitStarted(); - flush(concurrentBatch.get()); + Object batchObj = concurrentBatch.get(); + if (batchObj instanceof Batch) { + flush((Batch) batchObj); + } } private void flush(Batch batch) throws IOException @@ -392,8 +438,10 @@ public void close() throws IOException synchronized (startLock) { if (running) { running = false; - Batch lastBatch = concurrentBatch.getAndSet(null); - flush(lastBatch); + Object lastBatch = concurrentBatch.getAndSet(null); + if (lastBatch instanceof Batch) { + flush((Batch) lastBatch); + } emittingThread.shuttingDown = true; // EmittingThread is interrupted after the last batch is flushed. emittingThread.interrupt(); @@ -474,16 +522,19 @@ private boolean needsToShutdown() { boolean needsToShutdown = Thread.interrupted() || shuttingDown; if (needsToShutdown) { - Batch lastBatch = concurrentBatch.getAndSet(null); - if (lastBatch != null) { - lastBatch.seal(); + Object lastBatch = concurrentBatch.getAndSet(null); + if (lastBatch instanceof Batch) { + ((Batch) lastBatch).seal(); } } else { - Batch batch = concurrentBatch.get(); - if (batch != null) { - batch.sealIfFlushNeeded(); + Object batch = concurrentBatch.get(); + if (batch instanceof Batch) { + ((Batch) batch).sealIfFlushNeeded(); } else { - needsToShutdown = true; + // batch == null means that HttpPostEmitter is terminated. Batch object could also be Integer, if some + // thread just failed with a serious error in onSealExclusive(), in this case we don't want to shutdown + // the emitter thread. + needsToShutdown = batch == null; } } return needsToShutdown;