Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,12 @@ private static boolean isEmittingAllowed(long state)

/**
* Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0.
* It's a boxed Integer rather than int, because we want to minimize the number of allocations done in
* {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}.
* @see HttpPostEmitter#onSealExclusive
* @see HttpPostEmitter#concurrentBatch
*/
final int batchNumber;
final Integer batchNumber;

/**
* The number of events in this batch, needed for event count-based batch emitting.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
private final AtomicInteger approximateBuffersToReuseCount = new AtomicInteger();

/**
* concurrentBatch.get() == null means the service is closed.
* concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Integer,
* it means that some thread has failed with a serious error during {@link #onSealExclusive} (with the batch number
* corresponding to the Integer object) and {@link #tryRecoverCurrentBatch} needs to be called. Otherwise (i. e.
* normally), an instance of {@link Batch} is stored in this atomic reference.
*/
private final AtomicReference<Batch> concurrentBatch = new AtomicReference<>();
private final AtomicReference<Object> concurrentBatch = new AtomicReference<>();

private final ConcurrentLinkedDeque<Batch> buffersToEmit = new ConcurrentLinkedDeque<>();
/**
Expand Down Expand Up @@ -246,10 +249,15 @@ Batch emitAndReturnBatch(Event event)
}

while (true) {
Batch batch = concurrentBatch.get();
if (batch == null) {
Object batchObj = concurrentBatch.get();
if (batchObj instanceof Integer) {
tryRecoverCurrentBatch((Integer) batchObj);
continue;
}
if (batchObj == null) {
throw new RejectedExecutionException("Service is closed.");
}
Batch batch = (Batch) batchObj;
if (batch.tryAddEvent(eventBytes)) {
return batch;
}
Expand Down Expand Up @@ -290,6 +298,25 @@ private void writeLargeEvent(byte[] eventBytes)
* Called from {@link Batch} only once for each Batch in existence.
*/
void onSealExclusive(Batch batch, long elapsedTimeMillis)
{
try {
doOnSealExclusive(batch, elapsedTimeMillis);
}
catch (Throwable t) {
try {
if (!concurrentBatch.compareAndSet(batch, batch.batchNumber)) {
log.error("Unexpected failure to set currentBatch to the failed Batch.batchNumber");
}
log.error(t, "Serious error during onSealExclusive(), set currentBatch to the failed Batch.batchNumber");
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
throw t;
}
}

private void doOnSealExclusive(Batch batch, long elapsedTimeMillis)
{
batchFillingTimeCounter.add((int) Math.max(elapsedTimeMillis, 0));
if (elapsedTimeMillis > 0) {
Expand All @@ -301,13 +328,29 @@ void onSealExclusive(Batch batch, long elapsedTimeMillis)
wakeUpEmittingThread();
if (!isTerminated()) {
int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(batch.batchNumber);
if (!concurrentBatch.compareAndSet(batch, new Batch(this, acquireBuffer(), nextBatchNumber))) {
// If compareAndSet failed, the service is closed concurrently.
byte[] newBuffer = acquireBuffer();
if (!concurrentBatch.compareAndSet(batch, new Batch(this, newBuffer, nextBatchNumber))) {
buffersToReuse.add(newBuffer);
// If compareAndSet failed, the service should be closed concurrently, i. e. we expect isTerminated() = true.
// If we don't see this, there should be some bug in HttpPostEmitter.
Preconditions.checkState(isTerminated());
}
}
}

private void tryRecoverCurrentBatch(Integer failedBatchNumber)
{
log.info("Trying to recover currentBatch");
int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(failedBatchNumber);
byte[] newBuffer = acquireBuffer();
if (concurrentBatch.compareAndSet(failedBatchNumber, new Batch(this, newBuffer, nextBatchNumber))) {
log.info("Successfully recovered currentBatch");
} else {
// It's normal, a concurrent thread could succeed to recover first.
buffersToReuse.add(newBuffer);
}
}

private void addBatchToEmitQueue(Batch batch)
{
limitBuffersToEmitSize();
Expand Down Expand Up @@ -359,7 +402,10 @@ private void wakeUpEmittingThread()
public void flush() throws IOException
{
awaitStarted();
flush(concurrentBatch.get());
Object batchObj = concurrentBatch.get();
if (batchObj instanceof Batch) {
flush((Batch) batchObj);
}
}

private void flush(Batch batch) throws IOException
Expand Down Expand Up @@ -392,8 +438,10 @@ public void close() throws IOException
synchronized (startLock) {
if (running) {
running = false;
Batch lastBatch = concurrentBatch.getAndSet(null);
flush(lastBatch);
Object lastBatch = concurrentBatch.getAndSet(null);
if (lastBatch instanceof Batch) {
flush((Batch) lastBatch);
}
emittingThread.shuttingDown = true;
// EmittingThread is interrupted after the last batch is flushed.
emittingThread.interrupt();
Expand Down Expand Up @@ -474,16 +522,19 @@ private boolean needsToShutdown()
{
boolean needsToShutdown = Thread.interrupted() || shuttingDown;
if (needsToShutdown) {
Batch lastBatch = concurrentBatch.getAndSet(null);
if (lastBatch != null) {
lastBatch.seal();
Object lastBatch = concurrentBatch.getAndSet(null);
if (lastBatch instanceof Batch) {
((Batch) lastBatch).seal();
}
} else {
Batch batch = concurrentBatch.get();
if (batch != null) {
batch.sealIfFlushNeeded();
Object batch = concurrentBatch.get();
if (batch instanceof Batch) {
((Batch) batch).sealIfFlushNeeded();
} else {
needsToShutdown = true;
// batch == null means that HttpPostEmitter is terminated. Batch object could also be Integer, if some
// thread just failed with a serious error in onSealExclusive(), in this case we don't want to shutdown
// the emitter thread.
needsToShutdown = batch == null;
}
}
return needsToShutdown;
Expand Down