diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java b/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java index 16dbab4532f1..aaf9f11fee82 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java @@ -20,6 +20,7 @@ package io.druid.java.util.emitter.core; import com.google.common.base.Preconditions; +import io.druid.java.util.common.logger.Logger; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; @@ -44,6 +45,8 @@ */ class Batch extends AbstractQueuedLongSynchronizer { + private static final Logger log = new Logger(Batch.class); + private static final long PARTY = 1L << 32; private static final long SEAL_BIT = 1L << 63; @@ -278,6 +281,9 @@ protected boolean tryReleaseShared(long tag) if (compareAndSetState(state, newState)) { // Ensures only one thread calls emitter.onSealExclusive() for each batch. if (!isSealed(state)) { + log.debug("Unlocked and sealed batch [%d]", batchNumber); + debugLogState("old state", state); + debugLogState("new state", newState); emitter.onSealExclusive( this, firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1 @@ -296,6 +302,9 @@ protected boolean tryReleaseShared(long tag) } long newState = state | SEAL_BIT; if (compareAndSetState(state, newState)) { + log.debug("Sealed batch [%d]", batchNumber); + debugLogState("old state", state); + debugLogState("new state", newState); emitter.onSealExclusive( this, firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1 @@ -323,10 +332,24 @@ protected long tryAcquireShared(long ignored) public String toString() { long state = getState(); - return "Batch{" - + "bufferWatermark=" + bufferWatermark(state) + + return "Batch{" + + "batchNumber=" + batchNumber + + ", bufferWatermark=" + bufferWatermark(state) + ", parties=" + parties(state) + ", isSealed=" + isSealed(state) + "}"; } + + private static void debugLogState(String name, long state) + { + if (log.isDebugEnabled()) { + log.debug( + "%s[bufferWatermark=%d, parties=%d, isSealed=%s]", + name, + bufferWatermark(state), + parties(state), + isSealed(state) + ); + } + } } diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java index 20f29443da45..8cfbff7a5bc9 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java @@ -37,6 +37,7 @@ import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.Response; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.Flushable; import java.io.IOException; @@ -223,6 +224,7 @@ public void emit(Event event) } @VisibleForTesting + @Nullable Batch emitAndReturnBatch(Event event) { awaitStarted(); @@ -251,6 +253,8 @@ Batch emitAndReturnBatch(Event event) } if (batch.tryAddEvent(eventBytes)) { return batch; + } else { + log.debug("Failed to emit an event in batch [%s]", batch); } // Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible // eventually, because concurrentBatch.get() is a volatile read.