Skip to content
45 changes: 28 additions & 17 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.text.NumberFormat;

/**
Expand All @@ -35,9 +32,10 @@
* on any StatsD clients.</p>
*
* @author Tom Denley
*
* @author Mauro Franceschini
*/
public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient {
private static final int STATS_QUEUE_MAX_SIZE = 64 * 1024;

private static final Charset STATS_D_ENCODING = Charset.forName("UTF-8");

Expand All @@ -48,6 +46,9 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
private final String prefix;
private final DatagramSocket clientSocket;
private final StatsDClientErrorHandler handler;
private final BlockingQueue<String> statsQueue;
private Future<?> executorFuture;
private boolean stopping = false;

private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
Expand Down Expand Up @@ -107,13 +108,27 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws
public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException {
this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + ".");
this.handler = errorHandler;
this.statsQueue = new ArrayBlockingQueue<String>(STATS_QUEUE_MAX_SIZE);

try {
this.clientSocket = new DatagramSocket();
this.clientSocket.connect(new InetSocketAddress(hostname, port));
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}

this.executorFuture = executor.submit(new Runnable() {
@Override
public void run() {
while (!stopping) {
try {
blockingSend(statsQueue.take());
} catch (InterruptedException ex) {
handler.handle(ex);
}
}
}
});
}

/**
Expand All @@ -123,6 +138,8 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC
@Override
public void stop() {
try {
stopping = true;
executorFuture.cancel(true);
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -151,7 +168,7 @@ public void stop() {
*/
@Override
public void count(String aspect, long delta, double sampleRate) {
send(messageFor(aspect, Long.toString(delta), "c", sampleRate));
send(messageFor(aspect, delta, "c", sampleRate));
}

/**
Expand All @@ -169,7 +186,6 @@ public void recordGaugeValue(String aspect, long value) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, false);
}

@Override
public void recordGaugeValue(String aspect, double value) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, false);
}
Expand All @@ -179,7 +195,6 @@ public void recordGaugeDelta(String aspect, long value) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, true);
}

@Override
public void recordGaugeDelta(String aspect, double value) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, true);
}
Expand Down Expand Up @@ -221,25 +236,21 @@ public void recordSetEvent(String aspect, String eventName) {
*/
@Override
public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) {
send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate));
send(messageFor(aspect, timeInMs, "ms", sampleRate));
}

private String messageFor(String aspect, String value, String type) {
private String messageFor(String aspect, Object value, String type) {
return messageFor(aspect, value, type, 1.0);
}

private String messageFor(String aspect, String value, String type, double sampleRate) {
private String messageFor(String aspect, Object value, String type, double sampleRate) {
final String messageFormat = (sampleRate == 1.0) ? "%s%s:%s|%s" : "%s%s:%s|%s@%f";
return String.format((Locale)null, messageFormat, prefix, aspect, value, type, sampleRate);
return String.format(Locale.US, messageFormat, prefix, aspect, value, type, sampleRate);
}

private void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
blockingSend(message);
}
});
statsQueue.add(message);
}
catch (Exception e) {
handler.handle(e);
Expand Down