diff --git a/pom.xml b/pom.xml
index bd0fae49..fd6641c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,11 @@
validation-api
1.1.0.Final
+
+ org.asynchttpclient
+ async-http-client
+ 2.0.37
+
diff --git a/src/main/java/com/metamx/emitter/core/BaseHttpEmittingConfig.java b/src/main/java/com/metamx/emitter/core/BaseHttpEmittingConfig.java
index 909f3c37..c909d8d8 100644
--- a/src/main/java/com/metamx/emitter/core/BaseHttpEmittingConfig.java
+++ b/src/main/java/com/metamx/emitter/core/BaseHttpEmittingConfig.java
@@ -25,12 +25,13 @@ public class BaseHttpEmittingConfig
public static final long DEFAULT_FLUSH_MILLIS = 60 * 1000;
public static final int DEFAULT_FLUSH_COUNTS = 500;
public static final int DEFAULT_MAX_BATCH_SIZE = 5 * 1024 * 1024;
- public static final long DEFAULT_MAX_BUFFER_SIZE = 250 * 1024 * 1024;
/** Do not time out in case flushTimeOut is not set */
public static final long DEFAULT_FLUSH_TIME_OUT = Long.MAX_VALUE;
public static final String DEFAULT_BASIC_AUTHENTICATION = null;
public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY;
public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null;
+ public static final int DEFAULT_BATCH_QUEUE_SIZE_LIMIT = 50;
+ public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 1.5f;
@Min(1)
@JsonProperty
@@ -54,12 +55,16 @@ public class BaseHttpEmittingConfig
@JsonProperty
int maxBatchSize = DEFAULT_MAX_BATCH_SIZE;
+ @JsonProperty
+ ContentEncoding contentEncoding = DEFAULT_CONTENT_ENCODING;
+
@Min(0)
@JsonProperty
- long maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
+ int batchQueueSizeLimit = DEFAULT_BATCH_QUEUE_SIZE_LIMIT;
+ @Min(1)
@JsonProperty
- ContentEncoding contentEncoding = DEFAULT_CONTENT_ENCODING;
+ float httpTimeoutAllowanceFactor = DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR;
public long getFlushMillis()
{
@@ -90,15 +95,18 @@ public int getMaxBatchSize()
return maxBatchSize;
}
- public long getMaxBufferSize()
- {
- return maxBufferSize;
- }
-
public ContentEncoding getContentEncoding() {
return contentEncoding;
}
+ public int getBatchQueueSizeLimit() {
+ return batchQueueSizeLimit;
+ }
+
+ public float getHttpTimeoutAllowanceFactor() {
+ return httpTimeoutAllowanceFactor;
+ }
+
@Override
public String toString()
{
@@ -114,7 +122,8 @@ protected String toStringBase()
", basicAuthentication='" + basicAuthentication + '\'' +
", batchingStrategy=" + batchingStrategy +
", maxBatchSize=" + maxBatchSize +
- ", maxBufferSize=" + maxBufferSize +
- ", contentEncoding=" + contentEncoding;
+ ", contentEncoding=" + contentEncoding +
+ ", batchQueueSizeLimit=" + batchQueueSizeLimit +
+ ", httpTimeoutAllowanceFactor=" + httpTimeoutAllowanceFactor;
}
}
diff --git a/src/main/java/com/metamx/emitter/core/Batch.java b/src/main/java/com/metamx/emitter/core/Batch.java
index 61940397..b9d23603 100644
--- a/src/main/java/com/metamx/emitter/core/Batch.java
+++ b/src/main/java/com/metamx/emitter/core/Batch.java
@@ -275,7 +275,10 @@ protected boolean tryReleaseShared(long tag)
if (compareAndSetState(state, newState)) {
// Ensures only one thread calls emitter.onSealExclusive() for each batch.
if (!isSealed(state)) {
- emitter.onSealExclusive(this);
+ emitter.onSealExclusive(
+ this,
+ firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1
+ );
}
return isEmittingAllowed(newState);
}
@@ -290,7 +293,10 @@ protected boolean tryReleaseShared(long tag)
}
long newState = state | SEAL_BIT;
if (compareAndSetState(state, newState)) {
- emitter.onSealExclusive(this);
+ emitter.onSealExclusive(
+ this,
+ firstEventTimestamp > 0 ? System.currentTimeMillis() - firstEventTimestamp : -1
+ );
return isEmittingAllowed(newState);
}
}
diff --git a/src/main/java/com/metamx/emitter/core/Emitters.java b/src/main/java/com/metamx/emitter/core/Emitters.java
index 5db20e8b..a8fcd384 100644
--- a/src/main/java/com/metamx/emitter/core/Emitters.java
+++ b/src/main/java/com/metamx/emitter/core/Emitters.java
@@ -23,7 +23,8 @@
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.factory.EmitterFactory;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -36,12 +37,12 @@ public class Emitters
private static final String HTTP_EMITTER_PROP = "com.metamx.emitter.http";
private static final String CUSTOM_EMITTER_TYPE_PROP = "com.metamx.emitter.type";
- public static Emitter create(Properties props, HttpClient httpClient, Lifecycle lifecycle)
+ public static Emitter create(Properties props, AsyncHttpClient httpClient, Lifecycle lifecycle)
{
return create(props, httpClient, new ObjectMapper(), lifecycle);
}
- public static Emitter create(Properties props, HttpClient httpClient, ObjectMapper jsonMapper, Lifecycle lifecycle)
+ public static Emitter create(Properties props, AsyncHttpClient httpClient, ObjectMapper jsonMapper, Lifecycle lifecycle)
{
Map jsonified = Maps.newHashMap();
if (props.getProperty(LOG_EMITTER_PROP) != null) {
@@ -95,8 +96,11 @@ static Map makeHttpMap(Properties props)
if (props.containsKey("com.metamx.emitter.http.maxBatchSize")) {
httpMap.put("maxBatchSize", Integer.parseInt(props.getProperty("com.metamx.emitter.http.maxBatchSize")));
}
- if (props.containsKey("com.metamx.emitter.http.maxBufferSize")) {
- httpMap.put("maxBufferSize", Long.parseLong(props.getProperty("com.metamx.emitter.http.maxBufferSize")));
+ if (props.containsKey("com.metamx.emitter.http.batchQueueSizeLimit")) {
+ httpMap.put("batchQueueSizeLimit", Integer.parseInt(props.getProperty("com.metamx.emitter.http.batchQueueSizeLimit")));
+ }
+ if (props.containsKey("com.metamx.emitter.http.httpTimeoutAllowanceFactor")) {
+ httpMap.put("httpTimeoutAllowanceFactor", Float.parseFloat(props.getProperty("com.metamx.emitter.http.httpTimeoutAllowanceFactor")));
}
return httpMap;
}
diff --git a/src/main/java/com/metamx/emitter/core/HttpEmitterConfig.java b/src/main/java/com/metamx/emitter/core/HttpEmitterConfig.java
index 5a4817db..2cbb72c4 100644
--- a/src/main/java/com/metamx/emitter/core/HttpEmitterConfig.java
+++ b/src/main/java/com/metamx/emitter/core/HttpEmitterConfig.java
@@ -42,8 +42,9 @@ public HttpEmitterConfig(BaseHttpEmittingConfig base, String recipientBaseUrl)
this.basicAuthentication = base.basicAuthentication;
this.batchingStrategy = base.batchingStrategy;
this.maxBatchSize = base.maxBatchSize;
- this.maxBufferSize = base.maxBufferSize;
this.contentEncoding = base.contentEncoding;
+ this.batchQueueSizeLimit = base.batchQueueSizeLimit;
+ this.httpTimeoutAllowanceFactor = base.httpTimeoutAllowanceFactor;
}
public String getRecipientBaseUrl()
@@ -103,15 +104,21 @@ public Builder setMaxBatchSize(int maxBatchSize)
return this;
}
- public Builder setMaxBufferSize(long maxBufferSize)
+ public Builder setContentEncoding(ContentEncoding contentEncoding)
{
- this.maxBufferSize = maxBufferSize;
+ this.contentEncoding = contentEncoding;
return this;
}
- public Builder setContentEncoding(ContentEncoding contentEncoding)
+ public Builder setBatchQueueSizeLimit(int batchQueueSizeLimit)
{
- this.contentEncoding = contentEncoding;
+ this.batchQueueSizeLimit = batchQueueSizeLimit;
+ return this;
+ }
+
+ public Builder setHttpTimeoutAllowanceFactor(float httpTimeoutAllowanceFactor)
+ {
+ this.httpTimeoutAllowanceFactor = httpTimeoutAllowanceFactor;
return this;
}
diff --git a/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java b/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
index 12d2ecef..3cab6b99 100644
--- a/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
+++ b/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
@@ -18,22 +18,21 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.Request;
-import com.metamx.http.client.response.StatusResponseHandler;
-import com.metamx.http.client.response.StatusResponseHolder;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.Response;
import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
import java.io.Closeable;
import java.io.Flushable;
@@ -41,10 +40,14 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
+import java.util.Base64;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -59,6 +62,26 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
private static final int MAX_EVENT_SIZE = 1023 * 1024; // Set max size slightly less than 1M to allow for metadata
private static final int MAX_SEND_RETRIES = 3;
+
+ /**
+ * Threshold of the size of {@link #buffersToEmit} when switch from using {@link
+ * BaseHttpEmittingConfig#getHttpTimeoutAllowanceFactor()} to {@link #EQUILIBRIUM_ALLOWANCE_FACTOR}
+ */
+ private static final int EMIT_QUEUE_THRESHOLD_1 = 5;
+
+ /**
+ * Threshold of the size of {@link #buffersToEmit} when switch from using {@link #EQUILIBRIUM_ALLOWANCE_FACTOR}
+ * to {@link #TIGHT_ALLOWANCE_FACTOR}.
+ */
+ private static final int EMIT_QUEUE_THRESHOLD_2 = 10;
+
+ /**
+ * 0.9 is to give room for unexpected latency or time out not being respected rigorously.
+ */
+ private static final double EQUILIBRIUM_ALLOWANCE_FACTOR = 0.9;
+
+ private static final double TIGHT_ALLOWANCE_FACTOR = 0.5;
+
/**
* Used in {@link EmittingThread#emitLargeEvents()} to ensure fair emitting of both large events and batched events.
*/
@@ -72,9 +95,9 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
private final int bufferSize;
final int maxBufferWatermark;
private final int largeEventThreshold;
- private final HttpClient client;
+ private final AsyncHttpClient client;
private final ObjectMapper jsonMapper;
- private final URL url;
+ private final String url;
private final ConcurrentLinkedQueue buffersToReuse = new ConcurrentLinkedQueue<>();
/**
@@ -88,7 +111,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
*/
private final AtomicReference concurrentBatch = new AtomicReference<>();
- private final ConcurrentLinkedQueue buffersToEmit = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedDeque buffersToEmit = new ConcurrentLinkedDeque<>();
/** See {@link #approximateBuffersToReuseCount} */
private final AtomicInteger approximateBuffersToEmitCount = new AtomicInteger();
/** See {@link #approximateBuffersToReuseCount} */
@@ -102,17 +125,20 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
private final EmittingThread emittingThread = new EmittingThread();
private final AtomicLong totalEmittedEvents = new AtomicLong();
private final AtomicInteger allocatedBuffers = new AtomicInteger();
+ private final AtomicInteger droppedBuffers = new AtomicInteger();
+
+ private volatile long lastFillTimeMillis = 0;
private final Object startLock = new Object();
private final CountDownLatch startLatch = new CountDownLatch(1);
private boolean running = false;
- public HttpPostEmitter(HttpEmitterConfig config, HttpClient client)
+ public HttpPostEmitter(HttpEmitterConfig config, AsyncHttpClient client)
{
this(config, client, new ObjectMapper());
}
- public HttpPostEmitter(HttpEmitterConfig config, HttpClient client, ObjectMapper jsonMapper)
+ public HttpPostEmitter(HttpEmitterConfig config, AsyncHttpClient client, ObjectMapper jsonMapper)
{
batchingStrategy = config.getBatchingStrategy();
final int batchOverhead = batchingStrategy.batchStartLength() + batchingStrategy.batchEndLength();
@@ -124,13 +150,6 @@ public HttpPostEmitter(HttpEmitterConfig config, HttpClient client, ObjectMapper
batchOverhead
)
);
- Preconditions.checkArgument(
- config.getMaxBufferSize() >= MAX_EVENT_SIZE,
- String.format(
- "maxBufferSize must be greater than MAX_EVENT_SIZE[%,d].",
- MAX_EVENT_SIZE
- )
- );
this.config = config;
this.bufferSize = config.getMaxBatchSize();
this.maxBufferWatermark = bufferSize - batchingStrategy.batchEndLength();
@@ -139,7 +158,7 @@ public HttpPostEmitter(HttpEmitterConfig config, HttpClient client, ObjectMapper
this.client = client;
this.jsonMapper = jsonMapper;
try {
- this.url = new URL(config.getRecipientBaseUrl());
+ this.url = new URL(config.getRecipientBaseUrl()).toString();
}
catch (MalformedURLException e) {
throw new ISE(e, "Bad URL: %s", config.getRecipientBaseUrl());
@@ -247,8 +266,11 @@ private void writeLargeEvent(byte[] eventBytes)
/**
* Called from {@link Batch} only once for each Batch in existence.
*/
- void onSealExclusive(Batch batch)
+ void onSealExclusive(Batch batch, long elapsedTimeMillis)
{
+ if (elapsedTimeMillis > 0) {
+ lastFillTimeMillis = elapsedTimeMillis;
+ }
addBatchToEmitQueue(batch);
wakeUpEmittingThread();
if (!isTerminated()) {
@@ -262,14 +284,34 @@ void onSealExclusive(Batch batch)
private void addBatchToEmitQueue(Batch batch)
{
- buffersToEmit.add(batch);
+ limitBuffersToEmitSize();
+ buffersToEmit.addLast(batch);
approximateBuffersToEmitCount.incrementAndGet();
approximateEventsToEmitCount.addAndGet(batch.eventCount.get());
}
+ private void limitBuffersToEmitSize()
+ {
+ if (approximateBuffersToEmitCount.get() >= config.getBatchQueueSizeLimit()) {
+ Batch droppedBatch = buffersToEmit.pollFirst();
+ if (droppedBatch != null) {
+ finalizeBatch(droppedBatch);
+ approximateBuffersToEmitCount.decrementAndGet();
+ approximateEventsToEmitCount.addAndGet(-droppedBatch.eventCount.get());
+ droppedBuffers.incrementAndGet();
+ log.error("buffersToEmit queue size reached the limit, dropping the oldest buffer to emit");
+ }
+ }
+ }
+
+ private void finalizeBatch(Batch batch) {
+ // Notify HttpPostEmitter.flush(), that the batch is emitted, or failed, or dropped.
+ emittedBatchCounter.batchEmitted(batch.batchNumber);
+ }
+
private Batch pollBatchFromEmitQueue()
{
- Batch result = buffersToEmit.poll();
+ Batch result = buffersToEmit.pollFirst();
if (result == null) {
return null;
}
@@ -428,17 +470,27 @@ private void emit(final Batch batch)
);
int bufferEndOffset = batchingStrategy.writeBatchEnd(batch.buffer, bufferWatermark);
- if (sendWithRetries(batch.buffer, bufferEndOffset, eventCount)) {
+ if (sendWithRetries(batch.buffer, bufferEndOffset, eventCount, true)) {
buffersToReuse.add(batch.buffer);
approximateBuffersToReuseCount.incrementAndGet();
} else {
- failedBuffers.add(new FailedBuffer(batch.buffer, bufferEndOffset, eventCount));
+ limitFailedBuffersSize();
+ failedBuffers.addLast(new FailedBuffer(batch.buffer, bufferEndOffset, eventCount));
approximateFailedBuffersCount.incrementAndGet();
}
}
finally {
- // Notify HttpPostEmitter.flush(), that the batch is emitted (or failed).
- emittedBatchCounter.batchEmitted(batch.batchNumber);
+ finalizeBatch(batch);
+ }
+ }
+
+ private void limitFailedBuffersSize()
+ {
+ if (failedBuffers.size() >= config.getBatchQueueSizeLimit()) {
+ failedBuffers.removeFirst();
+ approximateFailedBuffersCount.decrementAndGet();
+ droppedBuffers.incrementAndGet();
+ log.error("failedBuffers queue size reached the limit, dropping the oldest failed buffer");
}
}
@@ -464,22 +516,23 @@ private void emitLargeEvent(byte[] eventBytes)
System.arraycopy(eventBytes, 0, buffer, bufferOffset, eventBytes.length);
bufferOffset += eventBytes.length;
bufferOffset = batchingStrategy.writeBatchEnd(buffer, bufferOffset);
- if (sendWithRetries(buffer, bufferOffset, 1)) {
+ if (sendWithRetries(buffer, bufferOffset, 1, true)) {
buffersToReuse.add(buffer);
approximateBuffersToReuseCount.incrementAndGet();
} else {
- failedBuffers.add(new FailedBuffer(buffer, bufferOffset, 1));
+ limitFailedBuffersSize();
+ failedBuffers.addLast(new FailedBuffer(buffer, bufferOffset, 1));
approximateFailedBuffersCount.incrementAndGet();
}
}
private void tryEmitOneFailedBuffer()
{
- FailedBuffer failedBuffer = failedBuffers.peek();
+ FailedBuffer failedBuffer = failedBuffers.peekFirst();
if (failedBuffer != null) {
- if (sendWithRetries(failedBuffer.buffer, failedBuffer.length, failedBuffer.eventCount)) {
+ if (sendWithRetries(failedBuffer.buffer, failedBuffer.length, failedBuffer.eventCount, false)) {
// Remove from the queue of failed buffer.
- failedBuffers.poll();
+ failedBuffers.pollFirst();
approximateFailedBuffersCount.decrementAndGet();
// Don't add the failed buffer back to the buffersToReuse queue here, because in a situation when we were not
// able to emit events for a while we don't have a way to discard buffers that were used to accumulate events
@@ -491,8 +544,8 @@ private void tryEmitOneFailedBuffer()
private void tryEmitAndDrainAllFailedBuffers()
{
- for (FailedBuffer failedBuffer; (failedBuffer = failedBuffers.poll()) != null; ) {
- sendWithRetries(failedBuffer.buffer, failedBuffer.length, failedBuffer.eventCount);
+ for (FailedBuffer failedBuffer; (failedBuffer = failedBuffers.pollFirst()) != null; ) {
+ sendWithRetries(failedBuffer.buffer, failedBuffer.length, failedBuffer.eventCount, false);
approximateFailedBuffersCount.decrementAndGet();
}
}
@@ -500,8 +553,9 @@ private void tryEmitAndDrainAllFailedBuffers()
/**
* Returns true if sent successfully.
*/
- private boolean sendWithRetries(final byte[] buffer, final int length, final int eventCount)
+ private boolean sendWithRetries(final byte[] buffer, final int length, final int eventCount, boolean withTimeout)
{
+ long deadLineMillis = System.currentTimeMillis() + sendRequestTimeoutMillis();
try {
RetryUtils.retry(
new Callable()
@@ -518,6 +572,9 @@ public Void call() throws Exception
@Override
public boolean apply(Throwable input)
{
+ if (withTimeout && deadLineMillis - System.currentTimeMillis() <= 0) { // overflow-aware
+ return false;
+ }
return !(input instanceof InterruptedException);
}
},
@@ -537,7 +594,8 @@ public boolean apply(Throwable input)
private void send(byte[] buffer, int length) throws Exception
{
- final Request request = new Request(HttpMethod.POST, url);
+ final RequestBuilder request = new RequestBuilder("POST");
+ request.setUrl(url);
byte[] payload;
int payloadLength;
ContentEncoding contentEncoding = config.getContentEncoding();
@@ -560,18 +618,40 @@ private void send(byte[] buffer, int length) throws Exception
}
- request.setContent("application/json", payload, 0, payloadLength);
+ request.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+ request.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(payloadLength));
+ request.setBody(ByteBuffer.wrap(payload, 0, payloadLength));
if (config.getBasicAuthentication() != null) {
final String[] parts = config.getBasicAuthentication().split(":", 2);
final String user = parts[0];
final String password = parts.length > 1 ? parts[1] : "";
- request.setBasicAuthentication(user, password);
+ String encoded = Base64.getEncoder().encodeToString((user + ':' + password).getBytes(StandardCharsets.UTF_8));
+ request.setHeader(HttpHeaders.Names.AUTHORIZATION, "Basic " + encoded);
}
- final StatusResponseHolder response = client.go(request, new StatusResponseHandler(Charsets.UTF_8)).get();
+ long lastFillTimeMillis = HttpPostEmitter.this.lastFillTimeMillis;
+ final long timeoutMillis = sendRequestTimeoutMillis();
+ request.setRequestTimeout(Ints.saturatedCast(timeoutMillis));
- if (response.getStatus().getCode() == 413) {
+ ListenableFuture future = client.executeRequest(request);
+ Response response;
+ try {
+ response = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ if (e instanceof TimeoutException ||
+ (e instanceof ExecutionException && e.getCause() instanceof TimeoutException)) {
+ log.error(
+ "Timing out emitter batch send, last batch fill time [%,d] ms, timeout [%,d] ms",
+ lastFillTimeMillis,
+ timeoutMillis
+ );
+ }
+ throw e;
+ }
+
+ if (response.getStatusCode() == 413) {
throw new ISE(
"Received HTTP status 413 from [%s]. Batch size of [%d] may be too large, "
+ "try adjusting maxBatchSizeBatch property",
@@ -580,15 +660,30 @@ private void send(byte[] buffer, int length) throws Exception
);
}
- if (response.getStatus().getCode() / 100 != 2) {
+ if (response.getStatusCode() / 100 != 2) {
throw new ISE(
- "Emissions of events not successful[%s], with message[%s].",
- response.getStatus(),
- response.getContent().trim()
+ "Emissions of events not successful[%d: %s], with message[%s].",
+ response.getStatusCode(),
+ response.getStatusText(),
+ response.getResponseBody(StandardCharsets.UTF_8).trim()
);
}
}
+ private long sendRequestTimeoutMillis()
+ {
+ int emitQueueSize = approximateBuffersToEmitCount.get();
+ if (emitQueueSize < EMIT_QUEUE_THRESHOLD_1) {
+ return (long) (lastFillTimeMillis * config.httpTimeoutAllowanceFactor);
+ }
+ if (emitQueueSize < EMIT_QUEUE_THRESHOLD_2) {
+ // The idea is to not let buffersToEmit queue to grow faster than we can emit buffers.
+ return (long) (lastFillTimeMillis * EQUILIBRIUM_ALLOWANCE_FACTOR);
+ }
+ // If buffersToEmit still grows, try to restrict even more
+ return (long) (lastFillTimeMillis * TIGHT_ALLOWANCE_FACTOR);
+ }
+
GZIPOutputStream acquireGzipOutputStream(int length) throws IOException
{
if (gzipBaos == null) {
@@ -656,6 +751,11 @@ public int getFailedBuffers()
return emittingThread.approximateFailedBuffersCount.get();
}
+ public int getDroppedBuffers()
+ {
+ return droppedBuffers.get();
+ }
+
public long getTotalEmittedEvents()
{
return totalEmittedEvents.get();
diff --git a/src/main/java/com/metamx/emitter/core/ParametrizedUriEmitter.java b/src/main/java/com/metamx/emitter/core/ParametrizedUriEmitter.java
index 66081eee..d62ec8a5 100644
--- a/src/main/java/com/metamx/emitter/core/ParametrizedUriEmitter.java
+++ b/src/main/java/com/metamx/emitter/core/ParametrizedUriEmitter.java
@@ -7,7 +7,7 @@
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
@@ -47,13 +47,13 @@ private static UriExtractor makeUriExtractor(ParametrizedUriEmitterConfig config
@GuardedBy("startCloseLock")
private boolean closed = false;
private final Lifecycle innerLifecycle = new Lifecycle();
- private final HttpClient client;
+ private final AsyncHttpClient client;
private final ObjectMapper jsonMapper;
private final ParametrizedUriEmitterConfig config;
public ParametrizedUriEmitter(
ParametrizedUriEmitterConfig config,
- HttpClient client,
+ AsyncHttpClient client,
ObjectMapper jsonMapper
)
{
@@ -62,7 +62,7 @@ public ParametrizedUriEmitter(
public ParametrizedUriEmitter(
ParametrizedUriEmitterConfig config,
- HttpClient client,
+ AsyncHttpClient client,
ObjectMapper jsonMapper,
UriExtractor uriExtractor
)
diff --git a/src/main/java/com/metamx/emitter/core/factory/EmitterFactory.java b/src/main/java/com/metamx/emitter/core/factory/EmitterFactory.java
index cd8822ce..5959c3dc 100644
--- a/src/main/java/com/metamx/emitter/core/factory/EmitterFactory.java
+++ b/src/main/java/com/metamx/emitter/core/factory/EmitterFactory.java
@@ -5,7 +5,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.core.Emitter;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@@ -16,5 +16,5 @@
})
public interface EmitterFactory
{
- Emitter makeEmitter(ObjectMapper objectMapper, HttpClient httpClient, Lifecycle lifecycle);
+ Emitter makeEmitter(ObjectMapper objectMapper, AsyncHttpClient httpClient, Lifecycle lifecycle);
}
diff --git a/src/main/java/com/metamx/emitter/core/factory/HttpEmitterFactory.java b/src/main/java/com/metamx/emitter/core/factory/HttpEmitterFactory.java
index b6d5541f..7e032a07 100644
--- a/src/main/java/com/metamx/emitter/core/factory/HttpEmitterFactory.java
+++ b/src/main/java/com/metamx/emitter/core/factory/HttpEmitterFactory.java
@@ -5,13 +5,13 @@
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.HttpEmitterConfig;
import com.metamx.emitter.core.HttpPostEmitter;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
public class HttpEmitterFactory extends HttpEmitterConfig implements EmitterFactory
{
@Override
- public Emitter makeEmitter(ObjectMapper objectMapper, HttpClient httpClient, Lifecycle lifecycle)
+ public Emitter makeEmitter(ObjectMapper objectMapper, AsyncHttpClient httpClient, Lifecycle lifecycle)
{
Emitter retVal = new HttpPostEmitter(this, httpClient, objectMapper);
lifecycle.addManagedInstance(retVal);
diff --git a/src/main/java/com/metamx/emitter/core/factory/LoggingEmitterFactory.java b/src/main/java/com/metamx/emitter/core/factory/LoggingEmitterFactory.java
index 7753d186..547ec2ce 100644
--- a/src/main/java/com/metamx/emitter/core/factory/LoggingEmitterFactory.java
+++ b/src/main/java/com/metamx/emitter/core/factory/LoggingEmitterFactory.java
@@ -5,14 +5,14 @@
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.core.LoggingEmitterConfig;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
public class LoggingEmitterFactory extends LoggingEmitterConfig implements EmitterFactory
{
public LoggingEmitterFactory() {}
@Override
- public Emitter makeEmitter(ObjectMapper objectMapper, HttpClient httpClient, Lifecycle lifecycle)
+ public Emitter makeEmitter(ObjectMapper objectMapper, AsyncHttpClient httpClient, Lifecycle lifecycle)
{
return makeEmitter(objectMapper, lifecycle);
}
diff --git a/src/main/java/com/metamx/emitter/core/factory/NoopEmiterFactory.java b/src/main/java/com/metamx/emitter/core/factory/NoopEmiterFactory.java
index c600be1d..44a47b9b 100644
--- a/src/main/java/com/metamx/emitter/core/factory/NoopEmiterFactory.java
+++ b/src/main/java/com/metamx/emitter/core/factory/NoopEmiterFactory.java
@@ -4,12 +4,12 @@
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.NoopEmitter;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
public class NoopEmiterFactory implements EmitterFactory
{
@Override
- public Emitter makeEmitter(ObjectMapper objectMapper, HttpClient httpClient, Lifecycle lifecycle)
+ public Emitter makeEmitter(ObjectMapper objectMapper, AsyncHttpClient httpClient, Lifecycle lifecycle)
{
return makeEmitter(lifecycle);
}
diff --git a/src/main/java/com/metamx/emitter/core/factory/ParametrizedUriEmitterFactory.java b/src/main/java/com/metamx/emitter/core/factory/ParametrizedUriEmitterFactory.java
index 44942763..a9f7bf01 100644
--- a/src/main/java/com/metamx/emitter/core/factory/ParametrizedUriEmitterFactory.java
+++ b/src/main/java/com/metamx/emitter/core/factory/ParametrizedUriEmitterFactory.java
@@ -5,13 +5,13 @@
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.ParametrizedUriEmitter;
import com.metamx.emitter.core.ParametrizedUriEmitterConfig;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
public class ParametrizedUriEmitterFactory extends ParametrizedUriEmitterConfig implements EmitterFactory
{
@Override
- public Emitter makeEmitter(ObjectMapper objectMapper, HttpClient httpClient, Lifecycle lifecycle)
+ public Emitter makeEmitter(ObjectMapper objectMapper, AsyncHttpClient httpClient, Lifecycle lifecycle)
{
final Emitter retVal = new ParametrizedUriEmitter(this, httpClient, objectMapper);
lifecycle.addManagedInstance(retVal);
diff --git a/src/main/java/com/metamx/http/client/CredentialedHttpClient.java b/src/main/java/com/metamx/http/client/CredentialedHttpClient.java
index daba088e..0bed9849 100644
--- a/src/main/java/com/metamx/http/client/CredentialedHttpClient.java
+++ b/src/main/java/com/metamx/http/client/CredentialedHttpClient.java
@@ -22,11 +22,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.http.client.auth.Credentials;
import com.metamx.http.client.response.HttpResponseHandler;
-import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
-import java.net.URL;
-
/**
*/
public class CredentialedHttpClient extends AbstractHttpClient
diff --git a/src/main/java/com/metamx/metrics/HttpPostEmitterMonitor.java b/src/main/java/com/metamx/metrics/HttpPostEmitterMonitor.java
index 9dff7858..499c8ac5 100644
--- a/src/main/java/com/metamx/metrics/HttpPostEmitterMonitor.java
+++ b/src/main/java/com/metamx/metrics/HttpPostEmitterMonitor.java
@@ -28,7 +28,7 @@ public class HttpPostEmitterMonitor extends FeedDefiningMonitor
private final ImmutableMap extraDimensions;
private final ServiceMetricEvent.Builder builder;
private long lastTotalEmittedEvents = 0;
- private int lastTotalAllocatedBuffers = 0;
+ private int lastDroppedBuffers = 0;
public HttpPostEmitterMonitor(
String feed,
@@ -50,13 +50,14 @@ public boolean doMonitor(ServiceEmitter emitter)
emitter.emit(builder.build("emitter/events/emitted", totalEmittedEventsDiff));
lastTotalEmittedEvents = newTotalEmittedEvents;
- int newTotalAllocatedBuffers = httpPostEmitter.getTotalAllocatedBuffers();
- int totalAllocatedBuffersDiff = newTotalAllocatedBuffers - lastTotalAllocatedBuffers;
- emitter.emit(builder.build("emitter/buffers/allocated", totalAllocatedBuffersDiff));
- lastTotalAllocatedBuffers = newTotalAllocatedBuffers;
+ int newDroppedBuffers = httpPostEmitter.getDroppedBuffers();
+ int droppedBuffersDiff = newDroppedBuffers - lastDroppedBuffers;
+ emitter.emit(builder.build("emitter/buffers/dropped", droppedBuffersDiff));
+ lastDroppedBuffers = newDroppedBuffers;
emitter.emit(builder.build("emitter/events/emitQueue", httpPostEmitter.getEventsToEmit()));
emitter.emit(builder.build("emitter/events/large/emitQueue", httpPostEmitter.getLargeEventsToEmit()));
+ emitter.emit(builder.build("emitter/buffers/totalAllocated", httpPostEmitter.getTotalAllocatedBuffers()));
emitter.emit(builder.build("emitter/buffers/emitQueue", httpPostEmitter.getBuffersToEmit()));
emitter.emit(builder.build("emitter/buffers/failed", httpPostEmitter.getFailedBuffers()));
emitter.emit(builder.build("emitter/buffers/reuseQueue", httpPostEmitter.getBuffersToReuse()));
diff --git a/src/test/java/com/metamx/emitter/core/CustomEmitterFactoryTest.java b/src/test/java/com/metamx/emitter/core/CustomEmitterFactoryTest.java
index 7cc3ff83..2099eccd 100644
--- a/src/test/java/com/metamx/emitter/core/CustomEmitterFactoryTest.java
+++ b/src/test/java/com/metamx/emitter/core/CustomEmitterFactoryTest.java
@@ -5,7 +5,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.core.factory.EmitterFactory;
-import com.metamx.http.client.HttpClient;
+import org.asynchttpclient.AsyncHttpClient;
import org.junit.Assert;
import org.junit.Test;
@@ -23,7 +23,7 @@ public static class TestEmitterConfig implements EmitterFactory
private int intProperty;
@Override
- public Emitter makeEmitter(ObjectMapper objectMapper, HttpClient httpClient, Lifecycle lifecycle)
+ public Emitter makeEmitter(ObjectMapper objectMapper, AsyncHttpClient httpClient, Lifecycle lifecycle)
{
return new StubEmitter(stringProperty, intProperty);
}
diff --git a/src/test/java/com/metamx/emitter/core/EmitterTest.java b/src/test/java/com/metamx/emitter/core/EmitterTest.java
index acf6f3a0..a80ddb94 100644
--- a/src/test/java/com/metamx/emitter/core/EmitterTest.java
+++ b/src/test/java/com/metamx/emitter/core/EmitterTest.java
@@ -16,27 +16,25 @@
package com.metamx.emitter.core;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
import com.google.common.io.BaseEncoding;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.CompressionUtils;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.service.UnitEvent;
-import com.metamx.http.client.GoHandler;
-import com.metamx.http.client.GoHandlers;
-import com.metamx.http.client.MockHttpClient;
-import com.metamx.http.client.Request;
-import com.metamx.http.client.response.HttpResponseHandler;
-import com.metamx.http.client.response.StatusResponseHandler;
-import com.metamx.http.client.response.StatusResponseHolder;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.joda.time.Duration;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.asynchttpclient.netty.EagerResponseBodyPart;
+import org.asynchttpclient.netty.NettyResponseStatus;
+import org.asynchttpclient.uri.Uri;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -45,7 +43,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -57,16 +56,29 @@
public class EmitterTest
{
private static final ObjectMapper jsonMapper = new ObjectMapper();
- public static final StatusResponseHolder OK_RESPONSE = new StatusResponseHolder(
- new HttpResponseStatus(201, "Created"),
- new StringBuilder("Yay")
- );
public static String TARGET_URL = "http://metrics.foo.bar/";
+ public static final Response OK_RESPONSE = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED)
+ .accumulate(new EagerResponseBodyPart(Unpooled.wrappedBuffer("Yay".getBytes(StandardCharsets.UTF_8)), true))
+ .build();
+
+ private static Response.ResponseBuilder responseBuilder(HttpVersion version, HttpResponseStatus status)
+ {
+ return new Response.ResponseBuilder()
+ .accumulate(
+ new NettyResponseStatus(
+ Uri.create(TARGET_URL),
+ new DefaultAsyncHttpClientConfig.Builder().build(),
+ new DefaultHttpResponse(version, status),
+ null
+ )
+ );
+ }
+
MockHttpClient httpClient;
HttpPostEmitter emitter;
- public static StatusResponseHolder okResponse()
+ public static Response okResponse()
{
return OK_RESPONSE;
}
@@ -157,7 +169,6 @@ private HttpPostEmitter manualFlushEmitterWithBasicAuthenticationAndNewlineSepar
.setBasicAuthentication(authentication)
.setBatchingStrategy(BatchingStrategy.NEWLINES)
.setMaxBatchSize(1024 * 1024)
- .setMaxBufferSize(100 * 1024 * 1024)
.build();
HttpPostEmitter emitter = new HttpPostEmitter(
config,
@@ -174,7 +185,6 @@ private HttpPostEmitter manualFlushEmitterWithBatchSizeAndBufferSize(int batchSi
.setFlushMillis(Long.MAX_VALUE)
.setFlushCount(Integer.MAX_VALUE)
.setMaxBatchSize(batchSize)
- .setMaxBufferSize(bufferSize)
.build();
HttpPostEmitter emitter = new HttpPostEmitter(
config,
@@ -198,11 +208,11 @@ public void testSanity() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(Request request, HttpResponseHandler handler, Duration requestReadTimeout) throws Exception
+ protected ListenableFuture go(Request request) throws JsonProcessingException
{
- Assert.assertEquals(new URL(TARGET_URL), request.getUrl());
+ Assert.assertEquals(TARGET_URL, request.getUrl());
Assert.assertEquals(
- ImmutableList.of("application/json"),
+ "application/json",
request.getHeaders().get(HttpHeaders.Names.CONTENT_TYPE)
);
Assert.assertEquals(
@@ -211,14 +221,10 @@ public ListenableFuture go(Request request, HttpRes
jsonMapper.writeValueAsString(events.get(0)),
jsonMapper.writeValueAsString(events.get(1))
),
- request.getContent().toString(Charsets.UTF_8)
- );
- Assert.assertTrue(
- "handler is a StatusResponseHandler",
- handler instanceof StatusResponseHandler
+ Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
);
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
@@ -244,11 +250,11 @@ public void testSanityWithGeneralizedCreation() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(Request request, HttpResponseHandler handler, Duration requestReadTimeout) throws Exception
+ protected ListenableFuture go(Request request) throws JsonProcessingException
{
- Assert.assertEquals(new URL(TARGET_URL), request.getUrl());
+ Assert.assertEquals(TARGET_URL, request.getUrl());
Assert.assertEquals(
- ImmutableList.of("application/json"),
+ "application/json",
request.getHeaders().get(HttpHeaders.Names.CONTENT_TYPE)
);
Assert.assertEquals(
@@ -257,14 +263,10 @@ public ListenableFuture go(Request request, HttpRes
jsonMapper.writeValueAsString(events.get(0)),
jsonMapper.writeValueAsString(events.get(1))
),
- request.getContent().toString(Charsets.UTF_8)
- );
- Assert.assertTrue(
- "handler is a StatusResponseHandler",
- handler instanceof StatusResponseHandler
+ Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
);
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
@@ -310,11 +312,10 @@ public void testTimeBasedEmission() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(Request intermediateFinalRequest, HttpResponseHandler handler, Duration requestReadTimeout)
- throws Exception
+ protected ListenableFuture go(Request request)
{
latch.countDown();
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
@@ -336,11 +337,10 @@ public ListenableFuture go(Request intermediateFina
new GoHandler()
{
@Override
- public ListenableFuture go(Request intermediateFinalRequest, HttpResponseHandler handler, Duration requestReadTimeout)
- throws Exception
+ protected ListenableFuture go(Request request)
{
thisLatch.countDown();
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
@@ -372,19 +372,10 @@ public void testFailedEmission() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(Request request, HttpResponseHandler handler, Duration requestReadTimeout)
- throws Exception
+ protected ListenableFuture go(Request request)
{
- final Intermediate obj = handler
- .handleResponse(
- new DefaultHttpResponse(
- HttpVersion.HTTP_1_1,
- HttpResponseStatus.BAD_REQUEST
- )
- )
- .getObj();
- Assert.assertNotNull(obj);
- return Futures.immediateFuture((Final) obj);
+ Response response = responseBuilder(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST).build();
+ return GoHandlers.immediateFuture(response);
}
}
);
@@ -400,10 +391,9 @@ public ListenableFuture go(Request request, HttpRes
new GoHandler()
{
@Override
- public ListenableFuture go(Request request, HttpResponseHandler handler, Duration requestReadTimeout)
- throws Exception
+ protected ListenableFuture go(Request request)
{
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(2)
);
@@ -432,15 +422,15 @@ public void testBasicAuthenticationAndNewlineSeparating() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(Request request, HttpResponseHandler handler, Duration requestReadTimeout) throws Exception
+ protected ListenableFuture go(Request request) throws JsonProcessingException
{
- Assert.assertEquals(new URL(TARGET_URL), request.getUrl());
+ Assert.assertEquals(TARGET_URL, request.getUrl());
Assert.assertEquals(
- ImmutableList.of("application/json"),
+ "application/json",
request.getHeaders().get(HttpHeaders.Names.CONTENT_TYPE)
);
Assert.assertEquals(
- ImmutableList.of("Basic " + BaseEncoding.base64().encode("foo:bar".getBytes())),
+ "Basic " + BaseEncoding.base64().encode("foo:bar".getBytes()),
request.getHeaders().get(HttpHeaders.Names.AUTHORIZATION)
);
Assert.assertEquals(
@@ -449,14 +439,10 @@ public ListenableFuture go(Request request, HttpRes
jsonMapper.writeValueAsString(events.get(0)),
jsonMapper.writeValueAsString(events.get(1))
),
- request.getContent().toString(Charsets.UTF_8)
- );
- Assert.assertTrue(
- "handler is a StatusResponseHandler",
- handler instanceof StatusResponseHandler
+ Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
);
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
@@ -492,11 +478,11 @@ public void testBatchSplitting() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(Request request, HttpResponseHandler handler, Duration requestReadTimeout) throws Exception
+ protected ListenableFuture go(Request request) throws JsonProcessingException
{
- Assert.assertEquals(new URL(TARGET_URL), request.getUrl());
+ Assert.assertEquals(TARGET_URL, request.getUrl());
Assert.assertEquals(
- ImmutableList.of("application/json"),
+ "application/json",
request.getHeaders().get(HttpHeaders.Names.CONTENT_TYPE)
);
Assert.assertEquals(
@@ -505,14 +491,10 @@ public ListenableFuture go(Request request, HttpRes
jsonMapper.writeValueAsString(events.get(counter.getAndIncrement())),
jsonMapper.writeValueAsString(events.get(counter.getAndIncrement()))
),
- request.getContent().toString(Charsets.UTF_8)
- );
- Assert.assertTrue(
- "handler is a StatusResponseHandler",
- handler instanceof StatusResponseHandler
+ Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
);
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(3)
);
@@ -544,20 +526,23 @@ public void testGzipContentEncoding() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(Request request, HttpResponseHandler handler, Duration requestReadTimeout) throws Exception
+ protected ListenableFuture go(Request request) throws IOException
{
- Assert.assertEquals(new URL(TARGET_URL), request.getUrl());
+ Assert.assertEquals(TARGET_URL, request.getUrl());
Assert.assertEquals(
- ImmutableList.of("application/json"),
+ "application/json",
request.getHeaders().get(HttpHeaders.Names.CONTENT_TYPE)
);
Assert.assertEquals(
- ImmutableList.of(HttpHeaders.Values.GZIP),
+ HttpHeaders.Values.GZIP,
request.getHeaders().get(HttpHeaders.Names.CONTENT_ENCODING)
);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- CompressionUtils.gunzip(new ByteArrayInputStream(request.getContent().array()), baos);
+ ByteBuffer data = request.getByteBufferData().slice();
+ byte[] dataArray = new byte[data.remaining()];
+ data.get(dataArray);
+ CompressionUtils.gunzip(new ByteArrayInputStream(dataArray), baos);
Assert.assertEquals(
String.format(
@@ -567,12 +552,8 @@ public ListenableFuture go(Request request, HttpRes
),
baos.toString(Charsets.UTF_8.name())
);
- Assert.assertTrue(
- "handler is a StatusResponseHandler",
- handler instanceof StatusResponseHandler
- );
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
diff --git a/src/test/java/com/metamx/http/client/GoHandler.java b/src/test/java/com/metamx/emitter/core/GoHandler.java
similarity index 52%
rename from src/test/java/com/metamx/http/client/GoHandler.java
rename to src/test/java/com/metamx/emitter/core/GoHandler.java
index fcdeaee3..72767bbe 100644
--- a/src/test/java/com/metamx/http/client/GoHandler.java
+++ b/src/test/java/com/metamx/emitter/core/GoHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2011 - 2015 Metamarkets Group Inc.
+ * Copyright 2016 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -12,15 +12,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package com.metamx.http.client;
+package com.metamx.emitter.core;
import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.ISE;
-import com.metamx.http.client.response.HttpResponseHandler;
-import org.joda.time.Duration;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,11 +30,7 @@
public abstract class GoHandler
{
/******* Abstract Methods *********/
- protected abstract ListenableFuture go(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- ) throws Exception;
+ protected abstract ListenableFuture go(Request request) throws X;
/******* Non Abstract Methods ********/
private volatile boolean succeeded = false;
@@ -43,28 +40,15 @@ public boolean succeeded()
return succeeded;
}
- public ListenableFuture run(
- Request request,
- HttpResponseHandler handler
- ) throws Exception
- {
- return run(request, handler, null);
- }
-
- public ListenableFuture run(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- ) throws Exception
+ public ListenableFuture run(Request request)
{
try {
- final ListenableFuture retVal = go(request, handler, requestReadTimeout);
+ final ListenableFuture retVal = go(request);
succeeded = true;
return retVal;
}
catch (Throwable e) {
succeeded = false;
- Throwables.propagateIfPossible(e, Exception.class);
throw Throwables.propagate(e);
}
}
@@ -78,14 +62,10 @@ public GoHandler times(final int n)
AtomicInteger counter = new AtomicInteger(0);
@Override
- public ListenableFuture go(
- final Request request,
- final HttpResponseHandler handler,
- final Duration requestReadTimeout
- ) throws Exception
+ public ListenableFuture go(final Request request)
{
if (counter.getAndIncrement() < n) {
- return myself.go(request, handler, requestReadTimeout);
+ return myself.go(request);
}
succeeded = false;
throw new ISE("Called more than %d times", n);
diff --git a/src/test/java/com/metamx/emitter/core/GoHandlers.java b/src/test/java/com/metamx/emitter/core/GoHandlers.java
new file mode 100644
index 00000000..2013f175
--- /dev/null
+++ b/src/test/java/com/metamx/emitter/core/GoHandlers.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2016 Metamarkets Group Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.metamx.emitter.core;
+
+import com.metamx.common.ISE;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ */
+public class GoHandlers
+{
+ public static GoHandler failingHandler()
+ {
+ return new GoHandler()
+ {
+ @Override
+ protected ListenableFuture go(Request request)
+ {
+ throw new ISE("Shouldn't be called");
+ }
+ };
+ }
+
+ public static GoHandler passingHandler(final Response retVal)
+ {
+ return new GoHandler()
+ {
+ @Override
+ protected ListenableFuture go(Request request)
+ {
+ return immediateFuture(retVal);
+ }
+ };
+ }
+
+ static ListenableFuture immediateFuture(T val)
+ {
+ CompletableFuture future = CompletableFuture.completedFuture(val);
+ return new ListenableFuture()
+ {
+ @Override
+ public void done()
+ {
+ }
+
+ @Override
+ public void abort(Throwable t)
+ {
+ }
+
+ @Override
+ public void touch()
+ {
+ }
+
+ @Override
+ public ListenableFuture addListener(Runnable listener, Executor exec)
+ {
+ future.thenAcceptAsync(r -> listener.run(), exec);
+ return this;
+ }
+
+ @Override
+ public CompletableFuture toCompletableFuture()
+ {
+ return future;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return true;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException
+ {
+ return future.get();
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return future.get(timeout, unit);
+ }
+ };
+ }
+}
diff --git a/src/test/java/com/metamx/emitter/core/HttpEmitterConfigTest.java b/src/test/java/com/metamx/emitter/core/HttpEmitterConfigTest.java
index e8570d85..b2dca547 100644
--- a/src/test/java/com/metamx/emitter/core/HttpEmitterConfigTest.java
+++ b/src/test/java/com/metamx/emitter/core/HttpEmitterConfigTest.java
@@ -38,8 +38,9 @@ public void testDefaults(){
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
- Assert.assertEquals(250 * 1024 * 1024, config.getMaxBufferSize());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
+ Assert.assertEquals(50, config.getBatchQueueSizeLimit());
+ Assert.assertEquals(1.5f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
}
@Test
@@ -57,8 +58,9 @@ public void testDefaultsLegacy()
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
- Assert.assertEquals(250 * 1024 * 1024, config.getMaxBufferSize());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
+ Assert.assertEquals(50, config.getBatchQueueSizeLimit());
+ Assert.assertEquals(1.5f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
}
@Test
@@ -71,8 +73,9 @@ public void testSettingEverything()
props.setProperty("com.metamx.emitter.basicAuthentication", "a:b");
props.setProperty("com.metamx.emitter.batchingStrategy", "NEWLINES");
props.setProperty("com.metamx.emitter.maxBatchSize", "4");
- props.setProperty("com.metamx.emitter.maxBufferSize", "8");
props.setProperty("com.metamx.emitter.flushTimeOut", "1000");
+ props.setProperty("com.metamx.emitter.batchQueueSizeLimit", "2500");
+ props.setProperty("com.metamx.emitter.httpTimeoutAllowanceFactor", "3.0");
final ObjectMapper objectMapper = new ObjectMapper();
final HttpEmitterConfig config = objectMapper.convertValue(Emitters.makeCustomFactoryMap(props), HttpEmitterConfig.class);
@@ -83,8 +86,9 @@ public void testSettingEverything()
Assert.assertEquals("a:b", config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize());
- Assert.assertEquals(8, config.getMaxBufferSize());
Assert.assertEquals(1000, config.getFlushTimeOut());
+ Assert.assertEquals(2500, config.getBatchQueueSizeLimit());
+ Assert.assertEquals(3.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
}
@Test
@@ -97,8 +101,9 @@ public void testSettingEverythingLegacy()
props.setProperty("com.metamx.emitter.http.basicAuthentication", "a:b");
props.setProperty("com.metamx.emitter.http.batchingStrategy", "newlines");
props.setProperty("com.metamx.emitter.http.maxBatchSize", "4");
- props.setProperty("com.metamx.emitter.http.maxBufferSize", "8");
props.setProperty("com.metamx.emitter.http.flushTimeOut", "1000");
+ props.setProperty("com.metamx.emitter.http.batchQueueSizeLimit", "2500");
+ props.setProperty("com.metamx.emitter.http.httpTimeoutAllowanceFactor", "3.0");
final ObjectMapper objectMapper = new ObjectMapper();
final HttpEmitterConfig config = objectMapper.convertValue(Emitters.makeHttpMap(props), HttpEmitterConfig.class);
@@ -109,7 +114,8 @@ public void testSettingEverythingLegacy()
Assert.assertEquals("a:b", config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize());
- Assert.assertEquals(8, config.getMaxBufferSize());
Assert.assertEquals(1000, config.getFlushTimeOut());
+ Assert.assertEquals(2500, config.getBatchQueueSizeLimit());
+ Assert.assertEquals(3.0f, config.getHttpTimeoutAllowanceFactor(), 0.0f);
}
}
diff --git a/src/test/java/com/metamx/emitter/core/HttpEmitterTest.java b/src/test/java/com/metamx/emitter/core/HttpEmitterTest.java
new file mode 100644
index 00000000..e932e3fa
--- /dev/null
+++ b/src/test/java/com/metamx/emitter/core/HttpEmitterTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2017 Metamarkets Group Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.metamx.emitter.core;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Ints;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class HttpEmitterTest
+{
+ private final MockHttpClient httpClient = new MockHttpClient();
+ private static final ObjectMapper objectMapper = new ObjectMapper()
+ {
+ @Override
+ public byte[] writeValueAsBytes(Object value) throws JsonProcessingException
+ {
+ return Ints.toByteArray(((IntEvent) value).index);
+ }
+ };
+
+ private final AtomicLong timeoutUsed = new AtomicLong();
+
+ @Before
+ public void setup()
+ {
+ timeoutUsed.set(-1L);
+
+ httpClient.setGoHandler(new GoHandler()
+ {
+ @Override
+ protected org.asynchttpclient.ListenableFuture go(Request request)
+ {
+ int timeout = request.getRequestTimeout();
+ timeoutUsed.set(timeout);
+ return GoHandlers.immediateFuture(EmitterTest.okResponse());
+ }
+ });
+ }
+
+ @Test
+ public void timeoutEmptyQueue() throws IOException, InterruptedException
+ {
+ final HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
+ .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
+ .setHttpTimeoutAllowanceFactor(2.0f)
+ .build();
+ final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
+
+ emitter.start();
+ emitter.emitAndReturnBatch(new IntEvent());
+ emitter.flush();
+ Assert.assertEquals(0, timeoutUsed.get());
+
+ final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
+ Thread.sleep(1000);
+ batch.seal();
+ emitter.flush();
+ Assert.assertTrue(timeoutUsed.get() >= 2000 && timeoutUsed.get() < 3000);
+ }
+}
diff --git a/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java b/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java
index db8f1509..a9b63f4a 100644
--- a/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java
+++ b/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java
@@ -20,15 +20,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.metamx.http.client.GoHandler;
-import com.metamx.http.client.MockHttpClient;
-import com.metamx.http.client.Request;
-import com.metamx.http.client.response.HttpResponseHandler;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.joda.time.Duration;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
import org.junit.Test;
import java.io.IOException;
@@ -63,7 +59,6 @@ public void eventCountBased() throws InterruptedException, IOException
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
- .setMaxBufferSize(1024 * 1024)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
@@ -80,15 +75,13 @@ public void eventCountBased() throws InterruptedException, IOException
httpClient.setGoHandler(new GoHandler()
{
@Override
- protected ListenableFuture go(
- Request request, HttpResponseHandler httpResponseHandler, Duration duration
- ) throws Exception
+ protected ListenableFuture go(Request request)
{
- ChannelBuffer batch = request.getContent();
- while (batch.readerIndex() != batch.writerIndex()) {
- emittedEvents.set(batch.readInt());
+ ByteBuffer batch = request.getByteBufferData().slice();
+ while (batch.remaining() > 0) {
+ emittedEvents.set(batch.getInt());
}
- return (ListenableFuture) OK_FUTURE;
+ return GoHandlers.immediateFuture(EmitterTest.okResponse());
}
});
emitter.start();
diff --git a/src/test/java/com/metamx/emitter/core/MockHttpClient.java b/src/test/java/com/metamx/emitter/core/MockHttpClient.java
new file mode 100644
index 00000000..eb92a60e
--- /dev/null
+++ b/src/test/java/com/metamx/emitter/core/MockHttpClient.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2016 Metamarkets Group Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.metamx.emitter.core;
+
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+
+/**
+ */
+public class MockHttpClient extends DefaultAsyncHttpClient
+{
+ private volatile GoHandler goHandler;
+
+ public MockHttpClient()
+ {
+ }
+
+ public GoHandler getGoHandler()
+ {
+ return goHandler;
+ }
+
+ public void setGoHandler(GoHandler goHandler)
+ {
+ this.goHandler = goHandler;
+ }
+
+ public boolean succeeded()
+ {
+ return goHandler.succeeded();
+ }
+
+ @Override
+ public ListenableFuture executeRequest(Request request)
+ {
+ return goHandler.run(request);
+ }
+}
diff --git a/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterConfigTest.java b/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterConfigTest.java
index 7ff1a3a2..bfce5661 100644
--- a/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterConfigTest.java
+++ b/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterConfigTest.java
@@ -23,7 +23,6 @@ public void testDefaults()
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Assert.assertEquals(5 * 1024 * 1024, config.getMaxBatchSize());
- Assert.assertEquals(250 * 1024 * 1024, config.getMaxBufferSize());
Assert.assertEquals(Long.MAX_VALUE, config.getFlushTimeOut());
}
@@ -36,7 +35,6 @@ public void testSettingEverything()
props.setProperty("com.metamx.emitter.httpEmitting.basicAuthentication", "a:b");
props.setProperty("com.metamx.emitter.httpEmitting.batchingStrategy", "NEWLINES");
props.setProperty("com.metamx.emitter.httpEmitting.maxBatchSize", "4");
- props.setProperty("com.metamx.emitter.httpEmitting.maxBufferSize", "8");
props.setProperty("com.metamx.emitter.httpEmitting.flushTimeOut", "1000");
final ObjectMapper objectMapper = new ObjectMapper();
@@ -49,7 +47,6 @@ public void testSettingEverything()
Assert.assertEquals("a:b", config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize());
- Assert.assertEquals(8, config.getMaxBufferSize());
Assert.assertEquals(1000, config.getFlushTimeOut());
}
}
diff --git a/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterTest.java b/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterTest.java
index 5832eec1..f3d8f3e9 100644
--- a/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterTest.java
+++ b/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterTest.java
@@ -1,28 +1,25 @@
package com.metamx.emitter.core;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.service.UnitEvent;
-import com.metamx.http.client.GoHandler;
-import com.metamx.http.client.GoHandlers;
-import com.metamx.http.client.MockHttpClient;
-import com.metamx.http.client.Request;
-import com.metamx.http.client.response.HttpResponseHandler;
-import java.net.URL;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+
import static com.metamx.emitter.core.EmitterTest.okResponse;
import static org.junit.Assert.assertEquals;
@@ -78,23 +75,19 @@ public void testEmitterWithFeedUriExtractor() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- ) throws Exception
+ public ListenableFuture go(Request request) throws JsonProcessingException
{
- Assert.assertEquals(new URL("http://example.com/test"), request.getUrl());
+ Assert.assertEquals("http://example.com/test", request.getUrl());
Assert.assertEquals(
String.format(
"[%s,%s]\n",
jsonMapper.writeValueAsString(events.get(0)),
jsonMapper.writeValueAsString(events.get(1))
),
- request.getContent().toString(Charsets.UTF_8)
+ Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
);
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
@@ -121,14 +114,13 @@ public void testEmitterWithMultipleFeeds() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- ) throws Exception
+ protected ListenableFuture go(Request request)
{
- results.put(request.getUrl().toString(), request.getContent().toString(Charsets.UTF_8));
- return Futures.immediateFuture((Final) okResponse());
+ results.put(
+ request.getUrl().toString(),
+ Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
+ );
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(2)
);
@@ -157,23 +149,19 @@ public void testEmitterWithParametrizedUriExtractor() throws Exception
new GoHandler()
{
@Override
- public ListenableFuture go(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- ) throws Exception
+ protected ListenableFuture go(Request request) throws JsonProcessingException
{
- Assert.assertEquals(new URL("http://example.com/val1/val2"), request.getUrl());
+ Assert.assertEquals("http://example.com/val1/val2", request.getUrl());
Assert.assertEquals(
String.format(
"[%s,%s]\n",
jsonMapper.writeValueAsString(events.get(0)),
jsonMapper.writeValueAsString(events.get(1))
),
- request.getContent().toString(Charsets.UTF_8)
+ Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
);
- return Futures.immediateFuture((Final) okResponse());
+ return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
diff --git a/src/test/java/com/metamx/http/client/AsyncHttpClientTest.java b/src/test/java/com/metamx/http/client/AsyncHttpClientTest.java
new file mode 100644
index 00000000..af636d88
--- /dev/null
+++ b/src/test/java/com/metamx/http/client/AsyncHttpClientTest.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2016 Metamarkets Group Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.metamx.http.client;
+
+import com.google.common.base.Charsets;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class AsyncHttpClientTest
+{
+
+ @Test
+ public void testRequestTimeout() throws Exception
+ {
+ final ExecutorService exec = Executors.newSingleThreadExecutor();
+ final ServerSocket serverSocket = new ServerSocket(0);
+ exec.submit(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ while (!Thread.currentThread().isInterrupted()) {
+ try (
+ Socket clientSocket = serverSocket.accept();
+ BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
+ OutputStream out = clientSocket.getOutputStream()
+ ) {
+ while (!in.readLine().equals("")); // skip lines
+ Thread.sleep(5000); // times out
+ out.write("HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nhello!".getBytes(Charsets.UTF_8));
+ }
+ catch (Exception e) {
+ // Suppress
+ }
+ }
+ }
+ }
+ );
+
+ long requestStart = 0;
+ DefaultAsyncHttpClient client = new DefaultAsyncHttpClient();
+ // Creation of connection for the first time takes long time, probably because of DNS cache or something like that
+ warmUp(serverSocket, client);
+ try {
+ requestStart = System.currentTimeMillis();
+ Future> future = client
+ .prepareGet(String.format("http://localhost:%d/", serverSocket.getLocalPort()))
+ .setRequestTimeout(2000)
+ .execute();
+ System.out.println("created future in: " + (System.currentTimeMillis() - requestStart));
+ future.get(3000, TimeUnit.MILLISECONDS);
+ Assert.fail("Expected timeout");
+ }
+ catch (ExecutionException | TimeoutException e) {
+ long elapsed = System.currentTimeMillis() - requestStart;
+ // Within 10% of timeout
+ Assert.assertTrue("elapsed: " + elapsed, elapsed < 2200);
+ }
+ finally {
+ exec.shutdownNow();
+ serverSocket.close();
+ }
+ }
+
+ private void warmUp(ServerSocket serverSocket, DefaultAsyncHttpClient client)
+ {
+ try {
+ Future> future = client
+ .prepareGet(String.format("http://localhost:%d/", serverSocket.getLocalPort()))
+ .setRequestTimeout(100)
+ .execute();
+ future.get();
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+}
diff --git a/src/test/java/com/metamx/http/client/GoHandlers.java b/src/test/java/com/metamx/http/client/GoHandlers.java
deleted file mode 100644
index e353c5ea..00000000
--- a/src/test/java/com/metamx/http/client/GoHandlers.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2011 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.metamx.http.client;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.metamx.common.ISE;
-import com.metamx.http.client.response.HttpResponseHandler;
-import org.joda.time.Duration;
-
-import java.util.concurrent.Future;
-
-/**
- */
-public class GoHandlers
-{
- public static GoHandler failingHandler()
- {
- return new GoHandler()
- {
- @Override
- public ListenableFuture go(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- ) throws Exception
- {
- throw new ISE("Shouldn't be called");
- }
- };
- }
-
- public static GoHandler passingHandler(final Object retVal)
- {
- return new GoHandler()
- {
- @SuppressWarnings("unchecked")
- @Override
- public ListenableFuture go(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- ) throws Exception
- {
- return Futures.immediateFuture((Final) retVal);
- }
- };
- }
-}
diff --git a/src/test/java/com/metamx/http/client/MockHttpClient.java b/src/test/java/com/metamx/http/client/MockHttpClient.java
deleted file mode 100644
index 63d73e31..00000000
--- a/src/test/java/com/metamx/http/client/MockHttpClient.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2011 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.metamx.http.client;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.metamx.http.client.pool.ResourceFactory;
-import com.metamx.http.client.pool.ResourcePool;
-import com.metamx.http.client.pool.ResourcePoolConfig;
-import com.metamx.http.client.response.HttpResponseHandler;
-import org.jboss.netty.channel.ChannelFuture;
-import org.joda.time.Duration;
-
-/**
- */
-public class MockHttpClient extends NettyHttpClient
-{
- private volatile GoHandler goHandler;
-
- public MockHttpClient()
- {
- super(
- new ResourcePool(
- new ResourceFactory()
- {
- @Override
- public ChannelFuture generate(String key)
- {
- return null;
- }
-
- @Override
- public boolean isGood(ChannelFuture resource)
- {
- return false;
- }
-
- @Override
- public void close(ChannelFuture resource)
- {
-
- }
- },
- new ResourcePoolConfig(1)
- )
- );
- }
-
- public GoHandler getGoHandler()
- {
- return goHandler;
- }
-
- public void setGoHandler(GoHandler goHandler)
- {
- this.goHandler = goHandler;
- }
-
- public boolean succeeded()
- {
- return goHandler.succeeded();
- }
-
- @Override
- public ListenableFuture go(
- Request request,
- HttpResponseHandler handler,
- Duration requestReadTimeout
- )
- {
- try {
- return goHandler.run(request, handler, requestReadTimeout);
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-}