Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@
<artifactId>validation-api</artifactId>
<version>1.1.0.Final</version>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.0.37</version>
</dependency>

<!-- Extra dependencies for server-metrics -->
<dependency>
Expand Down
29 changes: 19 additions & 10 deletions src/main/java/com/metamx/emitter/core/BaseHttpEmittingConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ public class BaseHttpEmittingConfig
public static final long DEFAULT_FLUSH_MILLIS = 60 * 1000;
public static final int DEFAULT_FLUSH_COUNTS = 500;
public static final int DEFAULT_MAX_BATCH_SIZE = 5 * 1024 * 1024;
public static final long DEFAULT_MAX_BUFFER_SIZE = 250 * 1024 * 1024;
/** Do not time out in case flushTimeOut is not set */
public static final long DEFAULT_FLUSH_TIME_OUT = Long.MAX_VALUE;
public static final String DEFAULT_BASIC_AUTHENTICATION = null;
public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY;
public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null;
public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT = 50;
public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 1.5f;

@Min(1)
@JsonProperty
Expand All @@ -54,12 +55,16 @@ public class BaseHttpEmittingConfig
@JsonProperty
int maxBatchSize = DEFAULT_MAX_BATCH_SIZE;

@JsonProperty
ContentEncoding contentEncoding = DEFAULT_CONTENT_ENCODING;

@Min(0)
@JsonProperty
long maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
int batchQueueSizeLimit = DEFAULT_BATCH_QUEUE_SIZE_LIMIT;

@Min(1)
@JsonProperty
ContentEncoding contentEncoding = DEFAULT_CONTENT_ENCODING;
float httpTimeoutAllowanceFactor = DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR;

public long getFlushMillis()
{
Expand Down Expand Up @@ -90,15 +95,18 @@ public int getMaxBatchSize()
return maxBatchSize;
}

public long getMaxBufferSize()
{
return maxBufferSize;
}

public ContentEncoding getContentEncoding() {
return contentEncoding;
}

public int getBatchQueueSizeLimit() {
return batchQueueSizeLimit;
}

public float getHttpTimeoutAllowanceFactor() {
return httpTimeoutAllowanceFactor;
}

@Override
public String toString()
{
Expand All @@ -114,7 +122,8 @@ protected String toStringBase()
", basicAuthentication='" + basicAuthentication + '\'' +
", batchingStrategy=" + batchingStrategy +
", maxBatchSize=" + maxBatchSize +
", maxBufferSize=" + maxBufferSize +
", contentEncoding=" + contentEncoding;
", contentEncoding=" + contentEncoding +
", batchQueueSizeLimit=" + batchQueueSizeLimit +
", httpTimeoutAllowanceFactor=" + httpTimeoutAllowanceFactor;
}
}
10 changes: 8 additions & 2 deletions src/main/java/com/metamx/emitter/core/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ protected boolean tryReleaseShared(long tag)
if (compareAndSetState(state, newState)) {
// Ensures only one thread calls emitter.onSealExclusive() for each batch.
if (!isSealed(state)) {
emitter.onSealExclusive(this);
emitter.onSealExclusive(
this,
firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1
);
}
return isEmittingAllowed(newState);
}
Expand All @@ -290,7 +293,10 @@ protected boolean tryReleaseShared(long tag)
}
long newState = state | SEAL_BIT;
if (compareAndSetState(state, newState)) {
emitter.onSealExclusive(this);
emitter.onSealExclusive(
this,
firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1
);
return isEmittingAllowed(newState);
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/com/metamx/emitter/core/Emitters.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.factory.EmitterFactory;
import com.metamx.http.client.HttpClient;
import org.asynchttpclient.AsyncHttpClient;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand All @@ -36,12 +37,12 @@ public class Emitters
private static final String HTTP_EMITTER_PROP = "com.metamx.emitter.http";
private static final String CUSTOM_EMITTER_TYPE_PROP = "com.metamx.emitter.type";

public static Emitter create(Properties props, HttpClient httpClient, Lifecycle lifecycle)
public static Emitter create(Properties props, AsyncHttpClient httpClient, Lifecycle lifecycle)
{
return create(props, httpClient, new ObjectMapper(), lifecycle);
}

public static Emitter create(Properties props, HttpClient httpClient, ObjectMapper jsonMapper, Lifecycle lifecycle)
public static Emitter create(Properties props, AsyncHttpClient httpClient, ObjectMapper jsonMapper, Lifecycle lifecycle)
{
Map<String, Object> jsonified = Maps.newHashMap();
if (props.getProperty(LOG_EMITTER_PROP) != null) {
Expand Down Expand Up @@ -95,8 +96,11 @@ static Map<String, Object> makeHttpMap(Properties props)
if (props.containsKey("com.metamx.emitter.http.maxBatchSize")) {
httpMap.put("maxBatchSize", Integer.parseInt(props.getProperty("com.metamx.emitter.http.maxBatchSize")));
}
if (props.containsKey("com.metamx.emitter.http.maxBufferSize")) {
httpMap.put("maxBufferSize", Long.parseLong(props.getProperty("com.metamx.emitter.http.maxBufferSize")));
if (props.containsKey("com.metamx.emitter.http.batchQueueSizeLimit")) {
httpMap.put("batchQueueSizeLimit", Integer.parseInt(props.getProperty("com.metamx.emitter.http.batchQueueSizeLimit")));
}
if (props.containsKey("com.metamx.emitter.http.httpTimeoutAllowanceFactor")) {
httpMap.put("httpTimeoutAllowanceFactor", Float.parseFloat(props.getProperty("com.metamx.emitter.http.httpTimeoutAllowanceFactor")));
}
return httpMap;
}
Expand Down
17 changes: 12 additions & 5 deletions src/main/java/com/metamx/emitter/core/HttpEmitterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public HttpEmitterConfig(BaseHttpEmittingConfig base, String recipientBaseUrl)
this.basicAuthentication = base.basicAuthentication;
this.batchingStrategy = base.batchingStrategy;
this.maxBatchSize = base.maxBatchSize;
this.maxBufferSize = base.maxBufferSize;
this.contentEncoding = base.contentEncoding;
this.batchQueueSizeLimit = base.batchQueueSizeLimit;
this.httpTimeoutAllowanceFactor = base.httpTimeoutAllowanceFactor;
}

public String getRecipientBaseUrl()
Expand Down Expand Up @@ -103,15 +104,21 @@ public Builder setMaxBatchSize(int maxBatchSize)
return this;
}

public Builder setMaxBufferSize(long maxBufferSize)
public Builder setContentEncoding(ContentEncoding contentEncoding)
{
this.maxBufferSize = maxBufferSize;
this.contentEncoding = contentEncoding;
return this;
}

public Builder setContentEncoding(ContentEncoding contentEncoding)
public Builder setBatchQueueSizeLimit(int batchQueueSizeLimit)
{
this.contentEncoding = contentEncoding;
this.batchQueueSizeLimit = batchQueueSizeLimit;
return this;
}

public Builder setHttpTimeoutAllowanceFactor(float httpTimeoutAllowanceFactor)
{
this.httpTimeoutAllowanceFactor = httpTimeoutAllowanceFactor;
return this;
}

Expand Down
Loading