Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.java.util.emitter.core;

import com.google.common.base.Preconditions;
import io.druid.java.util.common.logger.Logger;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
Expand All @@ -44,6 +45,8 @@
*/
class Batch extends AbstractQueuedLongSynchronizer
{
private static final Logger log = new Logger(Batch.class);

private static final long PARTY = 1L << 32;
private static final long SEAL_BIT = 1L << 63;

Expand Down Expand Up @@ -278,6 +281,9 @@ protected boolean tryReleaseShared(long tag)
if (compareAndSetState(state, newState)) {
// Ensures only one thread calls emitter.onSealExclusive() for each batch.
if (!isSealed(state)) {
log.debug("Unlocked and sealed batch [%d]", batchNumber);
debugLogState("old state", state);
debugLogState("new state", newState);
emitter.onSealExclusive(
this,
firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1
Expand All @@ -296,6 +302,9 @@ protected boolean tryReleaseShared(long tag)
}
long newState = state | SEAL_BIT;
if (compareAndSetState(state, newState)) {
log.debug("Sealed batch [%d]", batchNumber);
debugLogState("old state", state);
debugLogState("new state", newState);
emitter.onSealExclusive(
this,
firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1
Expand Down Expand Up @@ -323,10 +332,24 @@ protected long tryAcquireShared(long ignored)
public String toString()
{
long state = getState();
return "Batch{"
+ "bufferWatermark=" + bufferWatermark(state) +
return "Batch{" +
"batchNumber=" + batchNumber +
", bufferWatermark=" + bufferWatermark(state) +
", parties=" + parties(state) +
", isSealed=" + isSealed(state) +
"}";
}

private static void debugLogState(String name, long state)
{
if (log.isDebugEnabled()) {
log.debug(
"%s[bufferWatermark=%d, parties=%d, isSealed=%s]",
name,
bufferWatermark(state),
parties(state),
isSealed(state)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
Expand Down Expand Up @@ -224,6 +225,7 @@ public void emit(Event event)
}

@VisibleForTesting
@Nullable
Batch emitAndReturnBatch(Event event)
{
awaitStarted();
Expand Down Expand Up @@ -252,6 +254,8 @@ Batch emitAndReturnBatch(Event event)
}
if (batch.tryAddEvent(eventBytes)) {
return batch;
} else {
log.debug("Failed to emit an event in batch [%s]", batch);
}
// Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
// eventually, because concurrentBatch.get() is a volatile read.
Expand Down