From 0db3bbb6af76fc86254b507f1e0ba8bfa3a89101 Mon Sep 17 00:00:00 2001 From: Mauro Franceschini Date: Mon, 11 Aug 2014 10:32:14 +0200 Subject: [PATCH 1/7] Updated the creation of message from String.format to StringBuilder in order to improve performance. --- .../java/com/timgroup/statsd/NonBlockingStatsDClient.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index bfbb263..e26c836 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -212,8 +212,12 @@ private String messageFor(String aspect, Object value, String type) { } private String messageFor(String aspect, Object value, String type, double sampleRate) { - final String messageFormat = (sampleRate == 1.0) ? "%s%s:%s|%s" : "%s%s:%s|%s@%f"; - return String.format((Locale)null, messageFormat, prefix, aspect, value, type, sampleRate); + final StringBuilder builder = new StringBuilder(); + builder.append(prefix).append(aspect).append(':').append(value).append('|').append(type); + if (sampleRate != 1.0) { + builder.append(sampleRate); + } + return builder.toString(); } private void send(final String message) { From 3d51d920a10d5250895ade11765f3a0bd66e3dbf Mon Sep 17 00:00:00 2001 From: Mauro Franceschini Date: Mon, 11 Aug 2014 10:43:00 +0200 Subject: [PATCH 2/7] Updated the submit of task without creating a new runnable each time. --- .../statsd/NonBlockingStatsDClient.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index e26c836..2edbc67 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -5,10 +5,7 @@ import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.Locale; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * A simple StatsD client implementation facilitating metrics recording. @@ -37,6 +34,7 @@ * */ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient { + private static final int STATS_QUEUE_MAX_SIZE = 64 * 1024; private static final Charset STATS_D_ENCODING = Charset.forName("UTF-8"); @@ -47,6 +45,8 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta private final String prefix; private final DatagramSocket clientSocket; private final StatsDClientErrorHandler handler; + private final BlockingQueue statsQueue; + private boolean stopping = false; private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { final ThreadFactory delegate = Executors.defaultThreadFactory(); @@ -106,6 +106,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException { this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + "."); this.handler = errorHandler; + this.statsQueue = new ArrayBlockingQueue(STATS_QUEUE_MAX_SIZE); try { this.clientSocket = new DatagramSocket(); @@ -113,6 +114,19 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC } catch (Exception e) { throw new StatsDClientException("Failed to start StatsD client", e); } + + executor.execute(new Runnable() { + @Override + public void run() { + while (!stopping) { + try { + blockingSend(statsQueue.take()); + } catch (InterruptedException ex) { + handler.handle(ex); + } + } + } + }); } /** @@ -122,6 +136,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC @Override public void stop() { try { + stopping = true; executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); } @@ -222,11 +237,7 @@ private String messageFor(String aspect, Object value, String type, double sampl private void send(final String message) { try { - executor.execute(new Runnable() { - @Override public void run() { - blockingSend(message); - } - }); + statsQueue.add(message); } catch (Exception e) { handler.handle(e); From 50c60c81f395f42fdcc5f6d55a84563c4788b04d Mon Sep 17 00:00:00 2001 From: Mauro Franceschini Date: Sun, 17 Aug 2014 07:19:03 +0200 Subject: [PATCH 3/7] Updated the client in order to allow the test to run successfully. --- .../timgroup/statsd/NonBlockingStatsDClient.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 2edbc67..9fffa35 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -31,7 +31,7 @@ * on any StatsD clients.

* * @author Tom Denley - * + * @author Mauro Franceschini */ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient { private static final int STATS_QUEUE_MAX_SIZE = 64 * 1024; @@ -46,6 +46,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta private final DatagramSocket clientSocket; private final StatsDClientErrorHandler handler; private final BlockingQueue statsQueue; + private Future executorFuture; private boolean stopping = false; private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @@ -115,7 +116,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC throw new StatsDClientException("Failed to start StatsD client", e); } - executor.execute(new Runnable() { + this.executorFuture = executor.submit(new Runnable() { @Override public void run() { while (!stopping) { @@ -137,6 +138,7 @@ public void run() { public void stop() { try { stopping = true; + executorFuture.cancel(true); executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); } @@ -227,12 +229,8 @@ private String messageFor(String aspect, Object value, String type) { } private String messageFor(String aspect, Object value, String type, double sampleRate) { - final StringBuilder builder = new StringBuilder(); - builder.append(prefix).append(aspect).append(':').append(value).append('|').append(type); - if (sampleRate != 1.0) { - builder.append(sampleRate); - } - return builder.toString(); + final String messageFormat = (sampleRate == 1.0) ? "%s%s:%s|%s" : "%s%s:%s|%s@%f"; + return String.format(Locale.US, messageFormat, prefix, aspect, value, type, sampleRate); } private void send(final String message) { From e6c53822fadc143f168b0602e8d712c4d094808d Mon Sep 17 00:00:00 2001 From: Mauro Franceschini Date: Mon, 11 Aug 2014 10:32:14 +0200 Subject: [PATCH 4/7] Updated the creation of message from String.format to StringBuilder in order to improve performance. --- .../java/com/timgroup/statsd/NonBlockingStatsDClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index ed8311d..8362c3c 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -151,7 +151,7 @@ public void stop() { */ @Override public void count(String aspect, long delta, double sampleRate) { - send(messageFor(aspect, Long.toString(delta), "c", sampleRate)); + send(messageFor(aspect, delta, "c", sampleRate)); } /** @@ -221,10 +221,10 @@ public void recordSetEvent(String aspect, String eventName) { */ @Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) { - send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate)); + send(messageFor(aspect, timeInMs, "ms", sampleRate)); } - private String messageFor(String aspect, String value, String type) { + private String messageFor(String aspect, Object value, String type) { return messageFor(aspect, value, type, 1.0); } From 2015d62771127098124b16b1ac1d4fa74c710955 Mon Sep 17 00:00:00 2001 From: Mauro Franceschini Date: Mon, 11 Aug 2014 10:43:00 +0200 Subject: [PATCH 5/7] Updated the submit of task without creating a new runnable each time. --- .../statsd/NonBlockingStatsDClient.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 8362c3c..7b9778f 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -38,6 +38,7 @@ * */ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient { + private static final int STATS_QUEUE_MAX_SIZE = 64 * 1024; private static final Charset STATS_D_ENCODING = Charset.forName("UTF-8"); @@ -48,6 +49,8 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta private final String prefix; private final DatagramSocket clientSocket; private final StatsDClientErrorHandler handler; + private final BlockingQueue statsQueue; + private boolean stopping = false; private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { final ThreadFactory delegate = Executors.defaultThreadFactory(); @@ -107,6 +110,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException { this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + "."); this.handler = errorHandler; + this.statsQueue = new ArrayBlockingQueue(STATS_QUEUE_MAX_SIZE); try { this.clientSocket = new DatagramSocket(); @@ -114,6 +118,19 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC } catch (Exception e) { throw new StatsDClientException("Failed to start StatsD client", e); } + + executor.execute(new Runnable() { + @Override + public void run() { + while (!stopping) { + try { + blockingSend(statsQueue.take()); + } catch (InterruptedException ex) { + handler.handle(ex); + } + } + } + }); } /** @@ -123,6 +140,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC @Override public void stop() { try { + stopping = true; executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); } @@ -235,11 +253,7 @@ private String messageFor(String aspect, String value, String type, double sampl private void send(final String message) { try { - executor.execute(new Runnable() { - @Override public void run() { - blockingSend(message); - } - }); + statsQueue.add(message); } catch (Exception e) { handler.handle(e); From ddc21ac46d33506a6324138dbe8a29bc367ffb26 Mon Sep 17 00:00:00 2001 From: Mauro Franceschini Date: Sun, 17 Aug 2014 07:19:03 +0200 Subject: [PATCH 6/7] Updated the client in order to allow the test to run successfully. --- .../com/timgroup/statsd/NonBlockingStatsDClient.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 7b9778f..0655de0 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -35,7 +35,7 @@ * on any StatsD clients.

* * @author Tom Denley - * + * @author Mauro Franceschini */ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient { private static final int STATS_QUEUE_MAX_SIZE = 64 * 1024; @@ -50,6 +50,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta private final DatagramSocket clientSocket; private final StatsDClientErrorHandler handler; private final BlockingQueue statsQueue; + private Future executorFuture; private boolean stopping = false; private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @@ -119,7 +120,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC throw new StatsDClientException("Failed to start StatsD client", e); } - executor.execute(new Runnable() { + this.executorFuture = executor.submit(new Runnable() { @Override public void run() { while (!stopping) { @@ -141,6 +142,7 @@ public void run() { public void stop() { try { stopping = true; + executorFuture.cancel(true); executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); } @@ -246,9 +248,9 @@ private String messageFor(String aspect, Object value, String type) { return messageFor(aspect, value, type, 1.0); } - private String messageFor(String aspect, String value, String type, double sampleRate) { + private String messageFor(String aspect, Object value, String type, double sampleRate) { final String messageFormat = (sampleRate == 1.0) ? "%s%s:%s|%s" : "%s%s:%s|%s@%f"; - return String.format((Locale)null, messageFormat, prefix, aspect, value, type, sampleRate); + return String.format(Locale.US, messageFormat, prefix, aspect, value, type, sampleRate); } private void send(final String message) { From 076bd55a03cf7f832b4e3846195e9a770be7d2c6 Mon Sep 17 00:00:00 2001 From: Mauro Franceschini Date: Sun, 17 Aug 2014 07:33:36 +0200 Subject: [PATCH 7/7] Merge from upstream. --- .../java/com/timgroup/statsd/NonBlockingStatsDClient.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 0655de0..33a44b5 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -5,10 +5,7 @@ import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.Locale; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.text.NumberFormat; /** @@ -189,7 +186,6 @@ public void recordGaugeValue(String aspect, long value) { recordGaugeCommon(aspect, Long.toString(value), value < 0, false); } - @Override public void recordGaugeValue(String aspect, double value) { recordGaugeCommon(aspect, stringValueOf(value), value < 0, false); } @@ -199,7 +195,6 @@ public void recordGaugeDelta(String aspect, long value) { recordGaugeCommon(aspect, Long.toString(value), value < 0, true); } - @Override public void recordGaugeDelta(String aspect, double value) { recordGaugeCommon(aspect, stringValueOf(value), value < 0, true); }