From 7a620046151592dfffa55b8120a0287f3395010e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 12:36:02 -0700 Subject: [PATCH 01/31] Fixed processors improperly sharing the transport --- .../java/com/uber/m3/tally/m3/M3Reporter.java | 88 ++++++++++--------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index c307a00..d87507c 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -104,8 +104,6 @@ public class M3Reporter implements StatsReporter, AutoCloseable { private static final ThreadLocal PAYLOAD_SIZE_ESTIMATOR = ThreadLocal.withInitial(SerializedPayloadSizeEstimator::new); - private M3.Client client; - private Duration maxBufferingDelay; private final int payloadCapacity; @@ -127,49 +125,28 @@ public class M3Reporter implements StatsReporter, AutoCloseable { // is being shutdown only after all of its processor had done so private CountDownLatch shutdownLatch = new CountDownLatch(NUM_PROCESSORS); - private TTransport transport; - private AtomicBoolean isShutdown = new AtomicBoolean(false); // Use inner Builder class to construct an M3Reporter private M3Reporter(Builder builder) { - try { - // Builder verifies non-null, non-empty socketAddresses - SocketAddress[] socketAddresses = builder.socketAddresses; + payloadCapacity = calculatePayloadCapacity(builder.maxPacketSizeBytes, builder.metricTagSet); - TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); + maxBufferingDelay = Duration.ofMillis(builder.maxProcessorWaitUntilFlushMillis); - if (socketAddresses.length > 1) { - transport = new TMultiUdpClient(socketAddresses); - } else { - transport = new TUdpClient(socketAddresses[0]); - } + bucketIdTagName = builder.histogramBucketIdName; + bucketTagName = builder.histogramBucketName; + bucketValFmt = String.format("%%.%df", builder.histogramBucketTagPrecision); - transport.open(); + metricQueue = new LinkedBlockingQueue<>(builder.maxQueueSize); - client = new M3.Client(protocolFactory.getProtocol(transport)); - - payloadCapacity = calculatePayloadCapacity(builder.maxPacketSizeBytes, builder.metricTagSet); - - maxBufferingDelay = Duration.ofMillis(builder.maxProcessorWaitUntilFlushMillis); - - bucketIdTagName = builder.histogramBucketIdName; - bucketTagName = builder.histogramBucketName; - bucketValFmt = String.format("%%.%df", builder.histogramBucketTagPrecision); - - metricQueue = new LinkedBlockingQueue<>(builder.maxQueueSize); + executor = builder.executor != null ? builder.executor : Executors.newFixedThreadPool(NUM_PROCESSORS); - executor = builder.executor != null ? builder.executor : Executors.newFixedThreadPool(NUM_PROCESSORS); + clock = Clock.systemUTC(); - clock = Clock.systemUTC(); + commonTags = builder.metricTagSet; - commonTags = builder.metricTagSet; - - for (int i = 0; i < NUM_PROCESSORS; ++i) { - addAndRunProcessor(); - } - } catch (TTransportException | SocketException e) { - throw new RuntimeException("Exception creating M3Reporter", e); + for (int i = 0; i < NUM_PROCESSORS; ++i) { + bootProcessors(builder.endpointSocketAddresses); } } @@ -197,8 +174,13 @@ private static String getHostName() { } } - private void addAndRunProcessor() { - executor.execute(new Processor()); + private void bootProcessors(SocketAddress[] endpointSocketAddresses) { + try { + executor.execute(new Processor(endpointSocketAddresses)); + } catch (TTransportException | SocketException e) { + LOG.error("Failed to boot processor", e); + throw new RuntimeException(e); + } } @Override @@ -245,8 +227,6 @@ public void close() { } catch (InterruptedException e) { LOG.warn("M3Reporter closing before Processors complete due to being interrupted!"); } - - transport.close(); } private static Set toMetricTagSet(Map tags) { @@ -472,6 +452,24 @@ private class Processor implements Runnable { private int metricsSize = 0; + private final M3.Client client; + private final TTransport transport; + + Processor(SocketAddress[] socketAddresses) throws TTransportException, SocketException { + TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); + + if (socketAddresses.length > 1) { + transport = new TMultiUdpClient(socketAddresses); + } else { + transport = new TUdpClient(socketAddresses[0]); + } + + // Open the socket + transport.open(); + + client = new M3.Client(protocolFactory.getProtocol(transport)); + } + @Override public void run() { try { @@ -504,6 +502,10 @@ public void run() { } finally { drainQueue(); flushBuffered(); + + // Close transport + transport.close(); + // Count down shutdown latch to notify reporter shutdownLatch.countDown(); } @@ -602,7 +604,7 @@ public int evaluateByteSize(Metric metric) { * Builder pattern to construct an {@link M3Reporter}. */ public static class Builder { - protected SocketAddress[] socketAddresses; + protected SocketAddress[] endpointSocketAddresses; protected String service; protected String env; protected ExecutorService executor; @@ -621,14 +623,14 @@ public static class Builder { /** * Constructs a {@link Builder}. Having at least one {@code SocketAddress} is required. - * @param socketAddresses the array of {@code SocketAddress}es for this {@link M3Reporter} + * @param endpointSocketAddresses the array of {@code SocketAddress}es for this {@link M3Reporter} */ - public Builder(SocketAddress[] socketAddresses) { - if (socketAddresses == null || socketAddresses.length == 0) { + public Builder(SocketAddress[] endpointSocketAddresses) { + if (endpointSocketAddresses == null || endpointSocketAddresses.length == 0) { throw new IllegalArgumentException("Must specify at least one SocketAddress"); } - this.socketAddresses = socketAddresses; + this.endpointSocketAddresses = endpointSocketAddresses; } /** From 8163bbf42279a59c2d6163b4897144f75101a1b9 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 12:36:40 -0700 Subject: [PATCH 02/31] Tidying up --- .../main/java/com/uber/m3/tally/m3/M3Reporter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index d87507c..0456c88 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -104,13 +104,13 @@ public class M3Reporter implements StatsReporter, AutoCloseable { private static final ThreadLocal PAYLOAD_SIZE_ESTIMATOR = ThreadLocal.withInitial(SerializedPayloadSizeEstimator::new); - private Duration maxBufferingDelay; + private final Duration maxBufferingDelay; private final int payloadCapacity; - private String bucketIdTagName; - private String bucketTagName; - private String bucketValFmt; + private final String bucketIdTagName; + private final String bucketTagName; + private final String bucketValFmt; private final Set commonTags; @@ -123,9 +123,9 @@ public class M3Reporter implements StatsReporter, AutoCloseable { // This is a synchronization barrier to make sure that reporter // is being shutdown only after all of its processor had done so - private CountDownLatch shutdownLatch = new CountDownLatch(NUM_PROCESSORS); + private final CountDownLatch shutdownLatch = new CountDownLatch(NUM_PROCESSORS); - private AtomicBoolean isShutdown = new AtomicBoolean(false); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); // Use inner Builder class to construct an M3Reporter private M3Reporter(Builder builder) { From 516b3f770b186d41506ea48d062a463541a46c16 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 12:47:37 -0700 Subject: [PATCH 03/31] Added disclaimer --- m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index 0456c88..4effcdc 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -99,6 +99,11 @@ public class M3Reporter implements StatsReporter, AutoCloseable { private static final int MIN_METRIC_BUCKET_ID_TAG_LENGTH = 4; + /** + * NOTE: DO NOT CHANGE THIS NUMBER! + * Reporter architecture is not suited for multi-processor setup and might cause some disruption + * to how metrics are processed and eventually submitted to M3 collectors; + */ private static final int NUM_PROCESSORS = 1; private static final ThreadLocal PAYLOAD_SIZE_ESTIMATOR = From e75a180dc95dbe856513880cf28d901e4371ddec Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 12:51:14 -0700 Subject: [PATCH 04/31] Tidying up --- m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index 4effcdc..155628e 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -455,7 +455,7 @@ private class Processor implements Runnable { private Instant lastBufferFlushTimestamp = Instant.now(clock); - private int metricsSize = 0; + private int bufferedBytes = 0; private final M3.Client client; private final TTransport transport; @@ -478,7 +478,7 @@ private class Processor implements Runnable { @Override public void run() { try { - while (!executor.isShutdown()) { + while (!isShutdown.get()) { // This `poll` call will block for at most the specified duration to take an item // off the queue. If we get an item, we append it to the queue to be flushed, // otherwise we flush what we have so far. @@ -523,14 +523,14 @@ private void process(SizedMetric sizedMetric) { } int size = sizedMetric.getSize(); - if (metricsSize + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) { + if (bufferedBytes + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) { flushBuffered(); } Metric metric = sizedMetric.getMetric(); metricsBuffer.add(metric); - metricsSize += size; + bufferedBytes += size; } private boolean elapsedMaxDelaySinceLastFlush() { @@ -567,7 +567,7 @@ private void flushBuffered() { } metricsBuffer.clear(); - metricsSize = 0; + bufferedBytes = 0; lastBufferFlushTimestamp = Instant.now(clock); } } From 816a29e84dda09940f2f15e021714d8ba8d90560 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 12:58:04 -0700 Subject: [PATCH 05/31] Put a guard-rail against uncaughts in the `Processor` --- .../java/com/uber/m3/tally/m3/M3Reporter.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index 155628e..69ea315 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -128,7 +128,7 @@ public class M3Reporter implements StatsReporter, AutoCloseable { // This is a synchronization barrier to make sure that reporter // is being shutdown only after all of its processor had done so - private final CountDownLatch shutdownLatch = new CountDownLatch(NUM_PROCESSORS); + private final CountDownLatch shutdownLatch; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -150,6 +150,7 @@ private M3Reporter(Builder builder) { commonTags = builder.metricTagSet; + shutdownLatch = new CountDownLatch(NUM_PROCESSORS); for (int i = 0; i < NUM_PROCESSORS; ++i) { bootProcessors(builder.endpointSocketAddresses); } @@ -477,8 +478,8 @@ private class Processor implements Runnable { @Override public void run() { - try { - while (!isShutdown.get()) { + while (!isShutdown.get()) { + try { // This `poll` call will block for at most the specified duration to take an item // off the queue. If we get an item, we append it to the queue to be flushed, // otherwise we flush what we have so far. @@ -500,20 +501,21 @@ public void run() { } else { process(sizedMetric); } - + } catch (Throwable t) { + // This is fly-away guard making sure that uncaught exception + // will not crash the processor + LOG.error("Unhandled exception in processor", t); } - } catch (InterruptedException e) { - // Don't care if we get interrupted - the finally block will clean up - } finally { - drainQueue(); - flushBuffered(); + } - // Close transport - transport.close(); + drainQueue(); + flushBuffered(); - // Count down shutdown latch to notify reporter - shutdownLatch.countDown(); - } + // Close transport + transport.close(); + + // Count down shutdown latch to notify reporter + shutdownLatch.countDown(); } private void process(SizedMetric sizedMetric) { @@ -562,8 +564,8 @@ private void flushBuffered() { .setCommonTags(commonTags) .setMetrics(metricsBuffer) ); - } catch (TException tException) { - LOG.warn("Failed to flush metrics: " + tException.getMessage()); + } catch (Throwable t) { + LOG.error("Failed to flush metrics", t); } metricsBuffer.clear(); From ef84bfca6383247ea0fc139f53909cd186a4f15e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 13:21:54 -0700 Subject: [PATCH 06/31] Fixed tests --- .../com/uber/m3/tally/m3/M3ReporterTest.java | 461 ++++++++---------- .../com/uber/m3/tally/m3/MockM3Server.java | 27 +- .../com/uber/m3/tally/m3/MockM3Service.java | 1 - 3 files changed, 226 insertions(+), 263 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index b82c45b..ef5a297 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -33,6 +33,8 @@ import com.uber.m3.thrift.gen.TimerValue; import com.uber.m3.util.Duration; import com.uber.m3.util.ImmutableMap; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -60,6 +62,8 @@ public class M3ReporterTest { "host", "test-host" ); + private M3Reporter reporter; + @BeforeClass public static void setup() { try { @@ -69,22 +73,25 @@ public static void setup() { } } - @Test - public void reporter() throws InterruptedException { - final MockM3Server server = new MockM3Server(3, socketAddress); - M3Reporter reporter = null; - - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); + @Before + public void setupTest() { + reporter = + new M3Reporter.Builder(socketAddress) + .service("test-service") + .commonTags(DEFAULT_TAGS) + .maxQueueSize(MAX_QUEUE_SIZE) + .maxPacketSizeBytes(MAX_PACKET_SIZE_BYTES) + .build(); + } - try { - serverThread.start(); + @After + public void teardownTest() { + reporter.close(); + } - ImmutableMap commonTags = new ImmutableMap.Builder(5) + @Test + public void reporter() throws InterruptedException { + ImmutableMap commonTags = new ImmutableMap.Builder(5) .put("env", "development") .put("host", "default") .put("commonTag1", "val1") @@ -92,112 +99,112 @@ public void run() { .put("commonTag3", "val3") .build(); - reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(commonTags) - .includeHost(true) - .build(); - - ImmutableMap tags = new ImmutableMap.Builder(2) + ImmutableMap tags = new ImmutableMap.Builder(2) .put("testTag1", "testVal1") .put("testTag2", "testVal2") .build(); - reporter.reportCounter("my-counter", tags, 10); - reporter.flush(); + M3Reporter.Builder reporterBuilder = + new M3Reporter.Builder(socketAddress) + .service("test-service") + .commonTags(commonTags) + .includeHost(true) - reporter.reportTimer("my-timer", tags, Duration.ofMillis(5)); - reporter.flush(); + List receivedBatches; - reporter.reportGauge("my-gauge", tags, 42.42); - reporter.flush(); + try (final M3Reporter reporter = reporterBuilder.build()) { + try (final MockM3Server server = bootM3Collector(3)) { + reporter.reportCounter("my-counter", tags, 10); + reporter.flush(); - // Shutdown both reporter and server - reporter.close(); - server.awaitAndClose(); + reporter.reportTimer("my-timer", tags, Duration.ofMillis(5)); + reporter.flush(); - List batches = server.getService().getBatches(); - assertEquals(3, batches.size()); - - // Validate common tags - for (MetricBatch batch : batches) { - assertNotNull(batch); - assertTrue(batch.isSetCommonTags()); - assertEquals(commonTags.size() + 1, batch.getCommonTags().size()); - - for (MetricTag tag : batch.getCommonTags()) { - if (tag.getTagName().equals(M3Reporter.SERVICE_TAG)) { - assertEquals("test-service", tag.getTagValue()); - } else { - assertEquals(commonTags.get(tag.getTagName()), tag.getTagValue()); - } - } + reporter.reportGauge("my-gauge", tags, 42.42); + reporter.flush(); + + // Shutdown both reporter and server + reporter.close(); + server.await(); + + receivedBatches = server.getService().getBatches(); } + } - // Validate metrics - List emittedCounters = batches.get(0).getMetrics(); - assertEquals(1, emittedCounters.size()); + assertEquals(3, receivedBatches.size()); - List emittedTimers = batches.get(1).getMetrics(); - assertEquals(1, emittedTimers.size()); + // Validate common tags + for (MetricBatch batch : receivedBatches) { + assertNotNull(batch); + assertTrue(batch.isSetCommonTags()); + assertEquals(commonTags.size() + 1, batch.getCommonTags().size()); + + for (MetricTag tag : batch.getCommonTags()) { + if (tag.getTagName().equals(M3Reporter.SERVICE_TAG)) { + assertEquals("test-service", tag.getTagValue()); + } else { + assertEquals(commonTags.get(tag.getTagName()), tag.getTagValue()); + } + } + } - List emittedGauges = batches.get(2).getMetrics(); - assertEquals(1, emittedGauges.size()); + // Validate metrics + List emittedCounters = receivedBatches.get(0).getMetrics(); + assertEquals(1, emittedCounters.size()); - Metric emittedCounter = emittedCounters.get(0); - Metric emittedTimer = emittedTimers.get(0); - Metric emittedGauge = emittedGauges.get(0); + List emittedTimers = receivedBatches.get(1).getMetrics(); + assertEquals(1, emittedTimers.size()); - assertEquals("my-counter", emittedCounter.getName()); - assertTrue(emittedCounter.isSetTags()); - assertEquals(tags.size(), emittedCounter.getTagsSize()); + List emittedGauges = receivedBatches.get(2).getMetrics(); + assertEquals(1, emittedGauges.size()); - for (MetricTag tag : emittedCounter.getTags()) { - assertEquals(tags.get(tag.getTagName()), tag.getTagValue()); - } + Metric emittedCounter = emittedCounters.get(0); + Metric emittedTimer = emittedTimers.get(0); + Metric emittedGauge = emittedGauges.get(0); - // Validate counter - assertTrue(emittedCounter.isSetMetricValue()); + assertEquals("my-counter", emittedCounter.getName()); + assertTrue(emittedCounter.isSetTags()); + assertEquals(tags.size(), emittedCounter.getTagsSize()); - MetricValue emittedValue = emittedCounter.getMetricValue(); - assertTrue(emittedValue.isSetCount()); - assertFalse(emittedValue.isSetGauge()); - assertFalse(emittedValue.isSetTimer()); + for (MetricTag tag : emittedCounter.getTags()) { + assertEquals(tags.get(tag.getTagName()), tag.getTagValue()); + } - CountValue emittedCount = emittedValue.getCount(); - assertTrue(emittedCount.isSetI64Value()); - assertEquals(10, emittedCount.getI64Value()); + // Validate counter + assertTrue(emittedCounter.isSetMetricValue()); - // Validate timer - assertTrue(emittedTimer.isSetMetricValue()); + MetricValue emittedValue = emittedCounter.getMetricValue(); + assertTrue(emittedValue.isSetCount()); + assertFalse(emittedValue.isSetGauge()); + assertFalse(emittedValue.isSetTimer()); - emittedValue = emittedTimer.getMetricValue(); - assertFalse(emittedValue.isSetCount()); - assertFalse(emittedValue.isSetGauge()); - assertTrue(emittedValue.isSetTimer()); + CountValue emittedCount = emittedValue.getCount(); + assertTrue(emittedCount.isSetI64Value()); + assertEquals(10, emittedCount.getI64Value()); - TimerValue emittedTimerValue = emittedValue.getTimer(); - assertTrue(emittedTimerValue.isSetI64Value()); - assertEquals(5_000_000, emittedTimerValue.getI64Value()); + // Validate timer + assertTrue(emittedTimer.isSetMetricValue()); - // Validate gauge - assertTrue(emittedGauge.isSetMetricValue()); + emittedValue = emittedTimer.getMetricValue(); + assertFalse(emittedValue.isSetCount()); + assertFalse(emittedValue.isSetGauge()); + assertTrue(emittedValue.isSetTimer()); - emittedValue = emittedGauge.getMetricValue(); - assertFalse(emittedValue.isSetCount()); - assertTrue(emittedValue.isSetGauge()); - assertFalse(emittedValue.isSetTimer()); + TimerValue emittedTimerValue = emittedValue.getTimer(); + assertTrue(emittedTimerValue.isSetI64Value()); + assertEquals(5_000_000, emittedTimerValue.getI64Value()); - GaugeValue emittedGaugeValue = emittedValue.getGauge(); - assertTrue(emittedGaugeValue.isSetDValue()); - assertEquals(42.42, emittedGaugeValue.getDValue(), EPSILON); - } finally { - if (reporter != null) { - reporter.close(); - } + // Validate gauge + assertTrue(emittedGauge.isSetMetricValue()); - server.awaitAndClose(); - } + emittedValue = emittedGauge.getMetricValue(); + assertFalse(emittedValue.isSetCount()); + assertTrue(emittedValue.isSetGauge()); + assertFalse(emittedValue.isSetTimer()); + + GaugeValue emittedGaugeValue = emittedValue.getGauge(); + assertTrue(emittedGaugeValue.isSetDValue()); + assertEquals(42.42, emittedGaugeValue.getDValue(), EPSILON); } @Test @@ -229,112 +236,69 @@ public void builderMissingCommonTag() { @Test public void reporterFinalFlush() throws InterruptedException { - final MockM3Server server = new MockM3Server(1, socketAddress); - - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); - - serverThread.start(); - - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); - - reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); + try (final MockM3Server server = bootM3Collector(1)) { + reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); + reporter.close(); - reporter.close(); - server.awaitAndClose(); + server.await(); - List batches = server.getService().getBatches(); - assertEquals(1, batches.size()); - assertNotNull(batches.get(0)); - assertEquals(1, batches.get(0).getMetrics().size()); + List batches = server.getService().getBatches(); + assertEquals(1, batches.size()); + assertNotNull(batches.get(0)); + assertEquals(1, batches.get(0).getMetrics().size()); + } } @Test public void reporterAfterCloseNoThrow() throws InterruptedException { - final MockM3Server server = new MockM3Server(0, socketAddress); - - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); - - try { - serverThread.start(); - - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); - + try (final MockM3Server server = bootM3Collector(0);) { reporter.close(); reporter.reportGauge("my-gauge", null, 4.2); reporter.flush(); - } finally { - server.awaitAndClose(); } } @Test public void reporterHistogramDurations() throws InterruptedException { - final MockM3Server server = new MockM3Server(2, socketAddress); - - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); + List receivedBatches; - serverThread.start(); - - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); + try (final MockM3Server server = bootM3Collector(2)) { + Buckets buckets = DurationBuckets.linear(Duration.ZERO, Duration.ofMillis(25), 5); - Buckets buckets = DurationBuckets.linear(Duration.ZERO, Duration.ofMillis(25), 5); + Map histogramTags = new HashMap<>(); + histogramTags.put("foo", "bar"); - Map histogramTags = new HashMap<>(); - histogramTags.put("foo", "bar"); + reporter.reportHistogramDurationSamples( + "my-histogram", + histogramTags, + buckets, + Duration.ZERO, + Duration.ofMillis(25), + 7 + ); - reporter.reportHistogramDurationSamples( - "my-histogram", - histogramTags, - buckets, - Duration.ZERO, - Duration.ofMillis(25), - 7 - ); + reporter.reportHistogramDurationSamples( + "my-histogram", + histogramTags, + buckets, + Duration.ofMillis(50), + Duration.ofMillis(75), + 3 + ); - reporter.reportHistogramDurationSamples( - "my-histogram", - histogramTags, - buckets, - Duration.ofMillis(50), - Duration.ofMillis(75), - 3 - ); + reporter.close(); + server.await(); - reporter.close(); - server.awaitAndClose(); + receivedBatches = server.getService().getBatches(); + } - List batches = server.getService().getBatches(); - assertEquals(1, batches.size()); - assertNotNull(batches.get(0)); - assertEquals(2, batches.get(0).getMetrics().size()); + assertEquals(1, receivedBatches.size()); + assertNotNull(receivedBatches.get(0)); + assertEquals(2, receivedBatches.get(0).getMetrics().size()); // Verify first bucket - Metric metric = batches.get(0).getMetrics().get(0); + Metric metric = receivedBatches.get(0).getMetrics().get(0); assertEquals("my-histogram", metric.getName()); assertTrue(metric.isSetTags()); assertEquals(3, metric.getTagsSize()); @@ -359,7 +323,7 @@ public void run() { assertEquals(7, count.getI64Value()); // Verify second bucket - metric = server.getService().getBatches().get(0).getMetrics().get(1); + metric = receivedBatches.get(0).getMetrics().get(1); assertEquals("my-histogram", metric.getName()); assertTrue(metric.isSetTags()); assertEquals(3, metric.getTagsSize()); @@ -384,104 +348,89 @@ public void run() { @Test public void reporterHistogramValues() throws InterruptedException { - final MockM3Server server = new MockM3Server(2, socketAddress); - - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); - - try { - serverThread.start(); - - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); + List receivedBatches; + try(final MockM3Server server = bootM3Collector(2);) { Buckets buckets = ValueBuckets.linear(0, 25_000_000, 5); Map histogramTags = new HashMap<>(); histogramTags.put("foo", "bar"); reporter.reportHistogramValueSamples( - "my-histogram", - histogramTags, - buckets, - 0, - 25_000_000, - 7 + "my-histogram", + histogramTags, + buckets, + 0, + 25_000_000, + 7 ); reporter.reportHistogramValueSamples( - "my-histogram", - histogramTags, - buckets, - 50_000_000, - 75_000_000, - 3 + "my-histogram", + histogramTags, + buckets, + 50_000_000, + 75_000_000, + 3 ); reporter.close(); - server.awaitAndClose(); + server.await(); - List batches = server.getService().getBatches(); - assertEquals(1, batches.size()); - assertNotNull(batches.get(0)); - assertEquals(2, batches.get(0).getMetrics().size()); - - // Verify first bucket - Metric metric = batches.get(0).getMetrics().get(0); - assertEquals("my-histogram", metric.getName()); - assertTrue(metric.isSetTags()); - assertEquals(3, metric.getTagsSize()); - - Map expectedTags = new HashMap<>(3, 1); - expectedTags.put("foo", "bar"); - expectedTags.put("bucketid", "0001"); - expectedTags.put("bucket", "0.000000-25000000.000000"); - for (MetricTag tag : metric.getTags()) { - assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); - } + receivedBatches = server.getService().getBatches(); + } - assertTrue(metric.isSetMetricValue()); + assertEquals(1, receivedBatches.size()); + assertNotNull(receivedBatches.get(0)); + assertEquals(2, receivedBatches.get(0).getMetrics().size()); - MetricValue value = metric.getMetricValue(); - assertTrue(value.isSetCount()); - assertFalse(value.isSetGauge()); - assertFalse(value.isSetTimer()); + // Verify first bucket + Metric metric = receivedBatches.get(0).getMetrics().get(0); + assertEquals("my-histogram", metric.getName()); + assertTrue(metric.isSetTags()); + assertEquals(3, metric.getTagsSize()); - CountValue count = value.getCount(); - assertTrue(count.isSetI64Value()); - assertEquals(7, count.getI64Value()); + Map expectedTags = new HashMap<>(3, 1); + expectedTags.put("foo", "bar"); + expectedTags.put("bucketid", "0001"); + expectedTags.put("bucket", "0.000000-25000000.000000"); + for (MetricTag tag : metric.getTags()) { + assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); + } - // Verify second bucket - metric = server.getService().getBatches().get(0).getMetrics().get(1); - assertEquals("my-histogram", metric.getName()); - assertTrue(metric.isSetTags()); - assertEquals(3, metric.getTagsSize()); + assertTrue(metric.isSetMetricValue()); - expectedTags.put("bucketid", "0003"); - expectedTags.put("bucket", "50000000.000000-75000000.000000"); - for (MetricTag tag : metric.getTags()) { - assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); - } + MetricValue value = metric.getMetricValue(); + assertTrue(value.isSetCount()); + assertFalse(value.isSetGauge()); + assertFalse(value.isSetTimer()); - assertTrue(metric.isSetMetricValue()); + CountValue count = value.getCount(); + assertTrue(count.isSetI64Value()); + assertEquals(7, count.getI64Value()); - value = metric.getMetricValue(); - assertTrue(value.isSetCount()); - assertFalse(value.isSetGauge()); - assertFalse(value.isSetTimer()); + // Verify second bucket + metric = receivedBatches.get(0).getMetrics().get(1); + assertEquals("my-histogram", metric.getName()); + assertTrue(metric.isSetTags()); + assertEquals(3, metric.getTagsSize()); - count = value.getCount(); - assertTrue(count.isSetI64Value()); - assertEquals(3, count.getI64Value()); - } finally { - server.awaitAndClose(); + expectedTags.put("bucketid", "0003"); + expectedTags.put("bucket", "50000000.000000-75000000.000000"); + for (MetricTag tag : metric.getTags()) { + assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); } + + assertTrue(metric.isSetMetricValue()); + + value = metric.getMetricValue(); + assertTrue(value.isSetCount()); + assertFalse(value.isSetGauge()); + assertFalse(value.isSetTimer()); + + count = value.getCount(); + assertTrue(count.isSetI64Value()); + assertEquals(3, count.getI64Value()); } @Test @@ -494,6 +443,12 @@ public void capability() { assertEquals(CapableOf.REPORTING_TAGGING, reporter.capabilities()); } + private static MockM3Server bootM3Collector(int expectedMetricsCount) { + final MockM3Server server = new MockM3Server(expectedMetricsCount, socketAddress); + new Thread(server::serve).start(); + return server; + } + @Test public void testSinglePacketPayloadOverflow() throws InterruptedException { // NOTE: Every metric emitted in this test is taking about 22 bytes, diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java index f140081..bc26d9b 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java @@ -35,14 +35,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -public class MockM3Server { +public class MockM3Server implements AutoCloseable { private static final Duration MAX_WAIT_TIMEOUT = Duration.ofSeconds(30); - private final CountDownLatch expectedMetricsLatch; - private TProcessor processor; - private TTransport transport; - private MockM3Service service; + private final TProcessor processor; + private final TTransport transport; + private final MockM3Service service; public MockM3Server( int expectedMetricsCount, @@ -81,12 +80,22 @@ public void serve() { } } - public void awaitAndClose() throws InterruptedException { + public MockM3Service getService() { + return service; + } + + /** + * Awaits receiving of all the expected metrics + * + * @throws InterruptedException + */ + public void await() throws InterruptedException { expectedMetricsLatch.await(MAX_WAIT_TIMEOUT.getSeconds(), TimeUnit.SECONDS); - transport.close(); } - public MockM3Service getService() { - return service; + @Override + public void close() { + // Close immediately without waiting + transport.close(); } } diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java index f5f4d78..56ac7e8 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java @@ -59,7 +59,6 @@ public void emitMetricBatch(MetricBatch batch) throws TTransportException { for (Metric metric : batch.getMetrics()) { metrics.add(metric); - metricsCountLatch.countDown(); } From 168d73228c9aa8d8985b9fa72394f45255766915 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 13:32:33 -0700 Subject: [PATCH 07/31] Added TODOs --- m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index ef5a297..8dc5948 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -51,6 +51,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +// TODO add tests to validate proper shutdown +// TODO add tests to validate uncaughts don't crash processor +// TODO add tests for multi-processor setup public class M3ReporterTest { private static double EPSILON = 1e-9; From fb19ef6fd9c9706e8088ee81fc69e26be7061206 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 13:32:47 -0700 Subject: [PATCH 08/31] Fixed closing in multi-processor setup --- .../java/com/uber/m3/tally/m3/M3Reporter.java | 19 +++---------------- .../com/uber/m3/tally/m3/SizedMetric.java | 1 - 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index 69ea315..12558a1 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -214,9 +214,6 @@ public void close() { return; } - // Put sentinal value in queue so that processors know to disregard anything that comes after it. - queueSizedMetric(SizedMetric.CLOSE); - // Important to use `shutdownNow` instead of `shutdown` to interrupt processor // thread(s) or else they will block forever executor.shutdownNow(); @@ -488,12 +485,6 @@ public void run() { // catch block. SizedMetric sizedMetric = metricQueue.poll(maxBufferingDelay.toMillis(), TimeUnit.MILLISECONDS); - // Drop metrics that came in after close - if (sizedMetric == SizedMetric.CLOSE) { - metricQueue.clear(); - break; - } - if (sizedMetric == null) { // If we didn't get any new metrics after waiting the specified time, // flush what we have so far. @@ -542,14 +533,10 @@ private boolean elapsedMaxDelaySinceLastFlush() { } private void drainQueue() { - while (!metricQueue.isEmpty()) { - SizedMetric sizedMetric = metricQueue.remove(); - // Don't care about metrics that came in after close - if (sizedMetric == SizedMetric.CLOSE) { - break; - } + SizedMetric metrics; - process(sizedMetric); + while ((metrics = metricQueue.poll()) != null) { + process(metrics); } } diff --git a/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java b/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java index 40cb56b..2ffc94a 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java @@ -27,7 +27,6 @@ */ public class SizedMetric extends Metric { public static final SizedMetric FLUSH = new SizedMetric(null, -1); - public static final SizedMetric CLOSE = new SizedMetric(null, -2); private Metric metric; private int size; From 7c7e7c7c8105bf6519d2f059cf4d55413d771ab1 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 13:41:49 -0700 Subject: [PATCH 09/31] Fixed flushing sequence to work properly in multi-processor setup --- .../java/com/uber/m3/tally/m3/M3Reporter.java | 44 +++++++++++-------- .../com/uber/m3/tally/m3/SizedMetric.java | 2 - 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index 12558a1..db3c14d 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -128,7 +128,9 @@ public class M3Reporter implements StatsReporter, AutoCloseable { // This is a synchronization barrier to make sure that reporter // is being shutdown only after all of its processor had done so - private final CountDownLatch shutdownLatch; + private final CountDownLatch processorsShutdownLatch; + + private final List processors; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -150,9 +152,11 @@ private M3Reporter(Builder builder) { commonTags = builder.metricTagSet; - shutdownLatch = new CountDownLatch(NUM_PROCESSORS); + processorsShutdownLatch = new CountDownLatch(NUM_PROCESSORS); + + processors = new ArrayList<>(); for (int i = 0; i < NUM_PROCESSORS; ++i) { - bootProcessors(builder.endpointSocketAddresses); + processors.add(bootProcessors(builder.endpointSocketAddresses)); } } @@ -180,9 +184,11 @@ private static String getHostName() { } } - private void bootProcessors(SocketAddress[] endpointSocketAddresses) { + private Processor bootProcessors(SocketAddress[] endpointSocketAddresses) { try { - executor.execute(new Processor(endpointSocketAddresses)); + Processor processor = new Processor(endpointSocketAddresses); + executor.execute(processor); + return processor; } catch (TTransportException | SocketException e) { LOG.error("Failed to boot processor", e); throw new RuntimeException(e); @@ -200,11 +206,7 @@ public void flush() { return; } - try { - metricQueue.put(SizedMetric.FLUSH); - } catch (InterruptedException e) { - LOG.warn("Interrupted while trying to queue flush sentinel"); - } + processors.forEach(Processor::scheduleFlush); } @Override @@ -221,7 +223,7 @@ public void close() { try { // Wait a maximum of `MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS` for all processors // to complete - if (!shutdownLatch.await(MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS, TimeUnit.MILLISECONDS)) { + if (!processorsShutdownLatch.await(MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS, TimeUnit.MILLISECONDS)) { LOG.warn( "M3Reporter closing before Processors complete after waiting timeout of {}ms!", MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS @@ -458,6 +460,8 @@ private class Processor implements Runnable { private final M3.Client client; private final TTransport transport; + private final AtomicBoolean shouldFlush = new AtomicBoolean(false); + Processor(SocketAddress[] socketAddresses) throws TTransportException, SocketException { TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); @@ -477,6 +481,11 @@ private class Processor implements Runnable { public void run() { while (!isShutdown.get()) { try { + // Check whether flush has been requested by the reporter + if (shouldFlush.compareAndSet(true, false)) { + flushBuffered(); + } + // This `poll` call will block for at most the specified duration to take an item // off the queue. If we get an item, we append it to the queue to be flushed, // otherwise we flush what we have so far. @@ -488,7 +497,7 @@ public void run() { if (sizedMetric == null) { // If we didn't get any new metrics after waiting the specified time, // flush what we have so far. - process(SizedMetric.FLUSH); + flushBuffered(); } else { process(sizedMetric); } @@ -506,15 +515,10 @@ public void run() { transport.close(); // Count down shutdown latch to notify reporter - shutdownLatch.countDown(); + processorsShutdownLatch.countDown(); } private void process(SizedMetric sizedMetric) { - if (sizedMetric == SizedMetric.FLUSH) { - flushBuffered(); - return; - } - int size = sizedMetric.getSize(); if (bufferedBytes + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) { flushBuffered(); @@ -559,6 +563,10 @@ private void flushBuffered() { bufferedBytes = 0; lastBufferFlushTimestamp = Instant.now(clock); } + + public void scheduleFlush() { + shouldFlush.set(true); + } } /** diff --git a/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java b/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java index 2ffc94a..c172176 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java @@ -26,8 +26,6 @@ * A metric along with its associated size. */ public class SizedMetric extends Metric { - public static final SizedMetric FLUSH = new SizedMetric(null, -1); - private Metric metric; private int size; From 69104f62257876b07e9fd196a06b389916a6ad91 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 13:48:44 -0700 Subject: [PATCH 10/31] Tidying up --- m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index db3c14d..b8d4bf0 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -156,7 +156,7 @@ private M3Reporter(Builder builder) { processors = new ArrayList<>(); for (int i = 0; i < NUM_PROCESSORS; ++i) { - processors.add(bootProcessors(builder.endpointSocketAddresses)); + processors.add(bootProcessor(builder.endpointSocketAddresses)); } } @@ -184,7 +184,7 @@ private static String getHostName() { } } - private Processor bootProcessors(SocketAddress[] endpointSocketAddresses) { + private Processor bootProcessor(SocketAddress[] endpointSocketAddresses) { try { Processor processor = new Processor(endpointSocketAddresses); executor.execute(processor); @@ -508,7 +508,11 @@ public void run() { } } + LOG.warn("Processor shutting down"); + + // Drain queue of any remaining metrics submitted prior to shutdown; drainQueue(); + // Flush remaining buffers at last flushBuffered(); // Close transport @@ -516,6 +520,8 @@ public void run() { // Count down shutdown latch to notify reporter processorsShutdownLatch.countDown(); + + LOG.warn("Processor shut down"); } private void process(SizedMetric sizedMetric) { From 7fb2f3ad27c0957195559ae040dc1248fa920aaa Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 13:59:05 -0700 Subject: [PATCH 11/31] Fixed tests --- .../com/uber/m3/tally/m3/M3ReporterTest.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 8dc5948..5bdf8fb 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -114,6 +114,7 @@ public void reporter() throws InterruptedException { .includeHost(true) List receivedBatches; + List receivedMetrics; try (final M3Reporter reporter = reporterBuilder.build()) { try (final MockM3Server server = bootM3Collector(3)) { @@ -131,11 +132,10 @@ public void reporter() throws InterruptedException { server.await(); receivedBatches = server.getService().getBatches(); + receivedMetrics = server.getService().getMetrics(); } } - assertEquals(3, receivedBatches.size()); - // Validate common tags for (MetricBatch batch : receivedBatches) { assertNotNull(batch); @@ -152,18 +152,11 @@ public void reporter() throws InterruptedException { } // Validate metrics - List emittedCounters = receivedBatches.get(0).getMetrics(); - assertEquals(1, emittedCounters.size()); - - List emittedTimers = receivedBatches.get(1).getMetrics(); - assertEquals(1, emittedTimers.size()); - - List emittedGauges = receivedBatches.get(2).getMetrics(); - assertEquals(1, emittedGauges.size()); + assertEquals(3, receivedMetrics.size()); - Metric emittedCounter = emittedCounters.get(0); - Metric emittedTimer = emittedTimers.get(0); - Metric emittedGauge = emittedGauges.get(0); + Metric emittedCounter = receivedMetrics.get(0); + Metric emittedTimer = receivedMetrics.get(1); + Metric emittedGauge = receivedMetrics.get(2); assertEquals("my-counter", emittedCounter.getName()); assertTrue(emittedCounter.isSetTags()); From 4cc32e09e462a51570cbe18900a3566e020adcb9 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 14:11:20 -0700 Subject: [PATCH 12/31] Rebased --- .../com/uber/m3/tally/m3/M3ReporterTest.java | 53 +++++++------------ .../com/uber/m3/tally/m3/MockM3Service.java | 12 +++-- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 5bdf8fb..30c097a 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -82,8 +82,6 @@ public void setupTest() { new M3Reporter.Builder(socketAddress) .service("test-service") .commonTags(DEFAULT_TAGS) - .maxQueueSize(MAX_QUEUE_SIZE) - .maxPacketSizeBytes(MAX_PACKET_SIZE_BYTES) .build(); } @@ -111,7 +109,7 @@ public void reporter() throws InterruptedException { new M3Reporter.Builder(socketAddress) .service("test-service") .commonTags(commonTags) - .includeHost(true) + .includeHost(true); List receivedBatches; List receivedMetrics; @@ -452,50 +450,39 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { // which should be split across 2 UDP datagrams with the current configuration int expectedMetricsCount = 5_000; - final MockM3Server server = new MockM3Server(expectedMetricsCount, socketAddress); - - Thread serverThread = new Thread(server::serve); - serverThread.start(); - // NOTE: We're using default max packet size - M3Reporter reporter = new M3Reporter.Builder(socketAddress) + M3Reporter.Builder reporterBuilder = new M3Reporter.Builder(socketAddress) .service("test-service") .commonTags( ImmutableMap.of("env", "test") ) // Effectively disable time-based flushing to only keep // size-based one - .maxProcessorWaitUntilFlushMillis(1_000_000) - .build(); - - ImmutableMap emptyTags = - new ImmutableMap.Builder(0).build(); - - for (int i = 0; i < expectedMetricsCount; ++i) { - // NOTE: The goal is to minimize the metric size, to make sure - // they're granular enough to detect any transport/reporter configuration - // inconsistencies - reporter.reportCounter("c", emptyTags, 1); - } + .maxProcessorWaitUntilFlushMillis(1_000_000); - // Make sure reporter is flushed - reporter.flush(); + List metrics; - // Shutdown both reporter and server - reporter.close(); - server.awaitAndClose(); + try (final M3Reporter reporter = reporterBuilder.build()) { + try (final MockM3Server server = new MockM3Server(expectedMetricsCount, socketAddress)) { - List batches = server.getService().getBatches(); + ImmutableMap emptyTags = + new ImmutableMap.Builder(0).build(); - int totalMetrics = 0; + for (int i = 0; i < expectedMetricsCount; ++i) { + // NOTE: The goal is to minimize the metric size, to make sure + // they're granular enough to detect any transport/reporter configuration + // inconsistencies + reporter.reportCounter("c", emptyTags, 1); + } - // Validate that all metrics had been received - for (MetricBatch batch : batches) { - assertNotNull(batch); + // Shutdown both reporter and server + reporter.close(); + server.await(); - totalMetrics += batch.metrics.size(); + metrics = server.getService().getMetrics(); + } } - assertEquals(totalMetrics, expectedMetricsCount); + assertEquals(metrics.size(), expectedMetricsCount); } } diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java index 56ac7e8..d136a82 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java @@ -23,7 +23,6 @@ import com.uber.m3.thrift.gen.M3; import com.uber.m3.thrift.gen.Metric; import com.uber.m3.thrift.gen.MetricBatch; -import org.apache.thrift.transport.TTransportException; import java.util.ArrayList; import java.util.List; @@ -43,16 +42,23 @@ public MockM3Service(CountDownLatch metricsCountLatch) { public List getBatches() { lock.readLock().lock(); - try { return batches; } finally { lock.readLock().unlock(); } } + public List getMetrics() { + lock.readLock().lock(); + try { + return metrics; + } finally { + lock.readLock().unlock(); + } + } @Override - public void emitMetricBatch(MetricBatch batch) throws TTransportException { + public void emitMetricBatch(MetricBatch batch) { lock.writeLock().lock(); batches.add(batch); From 89777f4173126f9b82f5fa6dc254e92b83f93441 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 May 2020 14:15:40 -0700 Subject: [PATCH 13/31] Fixed tests --- .../test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 30c097a..82c314f 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -462,8 +462,8 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { List metrics; - try (final M3Reporter reporter = reporterBuilder.build()) { - try (final MockM3Server server = new MockM3Server(expectedMetricsCount, socketAddress)) { + try (final MockM3Server server =bootM3Collector(expectedMetricsCount)) { + try (final M3Reporter reporter = reporterBuilder.build()) { ImmutableMap emptyTags = new ImmutableMap.Builder(0).build(); @@ -475,14 +475,15 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { reporter.reportCounter("c", emptyTags, 1); } - // Shutdown both reporter and server + // Shutdown reporter reporter.close(); + server.await(); metrics = server.getService().getMetrics(); } } - assertEquals(metrics.size(), expectedMetricsCount); + assertEquals(expectedMetricsCount, metrics.size()); } } From 15e757c9578573c09ac506568be8f2e2071d4bbf Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 21 Jul 2020 16:36:22 -0700 Subject: [PATCH 14/31] Make metrics snapshotting explicit --- m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 4 ++-- m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 82c314f..9939dee 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -130,7 +130,7 @@ public void reporter() throws InterruptedException { server.await(); receivedBatches = server.getService().getBatches(); - receivedMetrics = server.getService().getMetrics(); + receivedMetrics = server.getService().snapshotMetrics(); } } @@ -480,7 +480,7 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { server.await(); - metrics = server.getService().getMetrics(); + metrics = server.getService().snapshotMetrics(); } } diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java index d136a82..edca061 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java @@ -48,10 +48,10 @@ public List getBatches() { lock.readLock().unlock(); } } - public List getMetrics() { + public List snapshotMetrics() { lock.readLock().lock(); try { - return metrics; + return new ArrayList<>(metrics); } finally { lock.readLock().unlock(); } From 54695aeb0d392f426f24f0c14bcbcf6a829e9e4b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 09:58:51 -0700 Subject: [PATCH 15/31] Tidying up --- m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 8 ++++---- m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 9939dee..47f1355 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -129,7 +129,7 @@ public void reporter() throws InterruptedException { reporter.close(); server.await(); - receivedBatches = server.getService().getBatches(); + receivedBatches = server.getService().snapshotBatches(); receivedMetrics = server.getService().snapshotMetrics(); } } @@ -236,7 +236,7 @@ public void reporterFinalFlush() throws InterruptedException { server.await(); - List batches = server.getService().getBatches(); + List batches = server.getService().snapshotBatches(); assertEquals(1, batches.size()); assertNotNull(batches.get(0)); assertEquals(1, batches.get(0).getMetrics().size()); @@ -284,7 +284,7 @@ public void reporterHistogramDurations() throws InterruptedException { reporter.close(); server.await(); - receivedBatches = server.getService().getBatches(); + receivedBatches = server.getService().snapshotBatches(); } assertEquals(1, receivedBatches.size()); @@ -371,7 +371,7 @@ public void reporterHistogramValues() throws InterruptedException { reporter.close(); server.await(); - receivedBatches = server.getService().getBatches(); + receivedBatches = server.getService().snapshotBatches(); } assertEquals(1, receivedBatches.size()); diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java index edca061..7e3265f 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java @@ -40,10 +40,10 @@ public MockM3Service(CountDownLatch metricsCountLatch) { this.metricsCountLatch = metricsCountLatch; } - public List getBatches() { + public List snapshotBatches() { lock.readLock().lock(); try { - return batches; + return new ArrayList<>(batches); } finally { lock.readLock().unlock(); } From 6ff5e52165af2093b24b30389a1fc81b4c8a5f59 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 14:26:00 -0700 Subject: [PATCH 16/31] Tidying up; Increased t/o to 1 minute (to make sure it accommodates for being run on heavily loaded CI instance) --- m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 4 ++-- m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 47f1355..589d699 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -344,7 +344,7 @@ public void reporterHistogramDurations() throws InterruptedException { public void reporterHistogramValues() throws InterruptedException { List receivedBatches; - try(final MockM3Server server = bootM3Collector(2);) { + try (final MockM3Server server = bootM3Collector(2);) { Buckets buckets = ValueBuckets.linear(0, 25_000_000, 5); Map histogramTags = new HashMap<>(); @@ -462,7 +462,7 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { List metrics; - try (final MockM3Server server =bootM3Collector(expectedMetricsCount)) { + try (final MockM3Server server = bootM3Collector(expectedMetricsCount)) { try (final M3Reporter reporter = reporterBuilder.build()) { ImmutableMap emptyTags = diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java index bc26d9b..db14545 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit; public class MockM3Server implements AutoCloseable { - private static final Duration MAX_WAIT_TIMEOUT = Duration.ofSeconds(30); + private static final Duration MAX_WAIT_TIMEOUT = Duration.ofMinutes(1); private final CountDownLatch expectedMetricsLatch; private final TProcessor processor; From dbcefa3e9505feb15f17fba9d1de74127017a72d Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 14:50:30 -0700 Subject: [PATCH 17/31] Extracted wait-timeout to `M3ReporterTest` --- .../com/uber/m3/tally/m3/M3ReporterTest.java | 26 +++++++------------ .../com/uber/m3/tally/m3/MockM3Server.java | 5 ++-- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 589d699..d9c25f8 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -35,7 +35,6 @@ import com.uber.m3.util.ImmutableMap; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import java.net.InetAddress; @@ -55,6 +54,9 @@ // TODO add tests to validate uncaughts don't crash processor // TODO add tests for multi-processor setup public class M3ReporterTest { + + private static final java.time.Duration MAX_WAIT_TIMEOUT = java.time.Duration.ofSeconds(10); + private static double EPSILON = 1e-9; private static SocketAddress socketAddress; @@ -67,17 +69,9 @@ public class M3ReporterTest { private M3Reporter reporter; - @BeforeClass - public static void setup() { - try { - socketAddress = new InetSocketAddress(InetAddress.getByName("localhost"), 4448); - } catch (UnknownHostException e) { - throw new RuntimeException("Unable to get localhost"); - } - } - @Before - public void setupTest() { + public void setupTest() throws UnknownHostException { + socketAddress = new InetSocketAddress(InetAddress.getByName("localhost"), 12345); reporter = new M3Reporter.Builder(socketAddress) .service("test-service") @@ -127,7 +121,7 @@ public void reporter() throws InterruptedException { // Shutdown both reporter and server reporter.close(); - server.await(); + server.await(MAX_WAIT_TIMEOUT); receivedBatches = server.getService().snapshotBatches(); receivedMetrics = server.getService().snapshotMetrics(); @@ -234,7 +228,7 @@ public void reporterFinalFlush() throws InterruptedException { reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); reporter.close(); - server.await(); + server.await(MAX_WAIT_TIMEOUT); List batches = server.getService().snapshotBatches(); assertEquals(1, batches.size()); @@ -282,7 +276,7 @@ public void reporterHistogramDurations() throws InterruptedException { ); reporter.close(); - server.await(); + server.await(MAX_WAIT_TIMEOUT); receivedBatches = server.getService().snapshotBatches(); } @@ -369,7 +363,7 @@ public void reporterHistogramValues() throws InterruptedException { ); reporter.close(); - server.await(); + server.await(MAX_WAIT_TIMEOUT); receivedBatches = server.getService().snapshotBatches(); } @@ -478,7 +472,7 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { // Shutdown reporter reporter.close(); - server.await(); + server.await(MAX_WAIT_TIMEOUT); metrics = server.getService().snapshotMetrics(); } diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java index db14545..570e0e3 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; public class MockM3Server implements AutoCloseable { - private static final Duration MAX_WAIT_TIMEOUT = Duration.ofMinutes(1); private final CountDownLatch expectedMetricsLatch; private final TProcessor processor; @@ -89,8 +88,8 @@ public MockM3Service getService() { * * @throws InterruptedException */ - public void await() throws InterruptedException { - expectedMetricsLatch.await(MAX_WAIT_TIMEOUT.getSeconds(), TimeUnit.SECONDS); + public void await(Duration waitTimeout) throws InterruptedException { + expectedMetricsLatch.await(waitTimeout.getSeconds(), TimeUnit.SECONDS); } @Override From 7b3ec45132c498a9c814175a73d97f10c40fcc18 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 14:58:44 -0700 Subject: [PATCH 18/31] Make gradle test output verbose --- gradle.properties | 1 + 1 file changed, 1 insertion(+) diff --git a/gradle.properties b/gradle.properties index 30e00a8..4288754 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,3 @@ # DO NOT REMOVE - USED FOR INTERNAL TRACKING devexp.template.version = 'lib-1.0.0' +org.gradle.logging.level=debug From ce72a9480cf260c29c0b1e13d4975712ebc159ce Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 15:20:40 -0700 Subject: [PATCH 19/31] Reverted gradle debug output; Enabled tests output; Adding logger to test deps to properly print tests output --- build.gradle | 11 +++++++++-- gradle.properties | 1 - 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index ee6d6e8..46cd4cd 100644 --- a/build.gradle +++ b/build.gradle @@ -61,8 +61,15 @@ allprojects { } dependencies { - testCompile('junit:junit:4.12') - testCompile "org.mockito:mockito-core:[1.0, 2.0)" + testCompile 'junit:junit:4.12' + testCompile 'org.mockito:mockito-core:[1.0, 2.0)' + testCompile 'ch.qos.logback:logback-classic:1.2.3' + testCompile 'ch.qos.logback:logback-core:1.2.3' + } + + test { + testLogging.showStandardStreams = true + testLogging.exceptionFormat = 'full' } ext.ossrhUsername = System.getenv('OSSRH_JIRA_USERNAME') diff --git a/gradle.properties b/gradle.properties index 4288754..30e00a8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,2 @@ # DO NOT REMOVE - USED FOR INTERNAL TRACKING devexp.template.version = 'lib-1.0.0' -org.gradle.logging.level=debug From 69dc2b06c104964d1cc196bf4532446ea78fc03a Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 15:20:50 -0700 Subject: [PATCH 20/31] Additional logging --- m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index b8d4bf0..2979114 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -475,6 +475,8 @@ private class Processor implements Runnable { transport.open(); client = new M3.Client(protocolFactory.getProtocol(transport)); + + LOG.info("Booted reporting processor"); } @Override From d764cc6ed66c02f46ef1b0aa1da5504de306900e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 15:26:53 -0700 Subject: [PATCH 21/31] Replaced "localhost" w/ "127.0.0.1" --- m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index d9c25f8..88bf411 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -71,7 +71,7 @@ public class M3ReporterTest { @Before public void setupTest() throws UnknownHostException { - socketAddress = new InetSocketAddress(InetAddress.getByName("localhost"), 12345); + socketAddress = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 12345); reporter = new M3Reporter.Builder(socketAddress) .service("test-service") From 804a39bb5dcfb05d14c3a55eb08e243a5f1000cb Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 15:29:51 -0700 Subject: [PATCH 22/31] Increasing the t/o value --- m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 88bf411..4e36a2f 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -55,7 +55,7 @@ // TODO add tests for multi-processor setup public class M3ReporterTest { - private static final java.time.Duration MAX_WAIT_TIMEOUT = java.time.Duration.ofSeconds(10); + private static final java.time.Duration MAX_WAIT_TIMEOUT = java.time.Duration.ofSeconds(30); private static double EPSILON = 1e-9; From 2546bcc6ab0b176c751bdc65d5606d9f7e0dee8e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 15:52:03 -0700 Subject: [PATCH 23/31] Fixed tests to not boot reporter twice --- .../com/uber/m3/tally/m3/M3ReporterTest.java | 160 +++++++++--------- 1 file changed, 81 insertions(+), 79 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 4e36a2f..3b3e114 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -33,8 +33,7 @@ import com.uber.m3.thrift.gen.TimerValue; import com.uber.m3.util.Duration; import com.uber.m3.util.ImmutableMap; -import org.junit.After; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.net.InetAddress; @@ -67,22 +66,16 @@ public class M3ReporterTest { "host", "test-host" ); - private M3Reporter reporter; + private M3Reporter.Builder reporterBuilder = + new M3Reporter.Builder(socketAddress) + .service("test-service") + .commonTags(DEFAULT_TAGS); - @Before - public void setupTest() throws UnknownHostException { + @BeforeClass + public static void setup() throws UnknownHostException { socketAddress = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 12345); - reporter = - new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); } - @After - public void teardownTest() { - reporter.close(); - } @Test public void reporter() throws InterruptedException { @@ -225,25 +218,29 @@ public void builderMissingCommonTag() { @Test public void reporterFinalFlush() throws InterruptedException { try (final MockM3Server server = bootM3Collector(1)) { - reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); - reporter.close(); + try (final M3Reporter reporter = reporterBuilder.build()) { + reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); + reporter.close(); - server.await(MAX_WAIT_TIMEOUT); + server.await(MAX_WAIT_TIMEOUT); - List batches = server.getService().snapshotBatches(); - assertEquals(1, batches.size()); - assertNotNull(batches.get(0)); - assertEquals(1, batches.get(0).getMetrics().size()); + List batches = server.getService().snapshotBatches(); + assertEquals(1, batches.size()); + assertNotNull(batches.get(0)); + assertEquals(1, batches.get(0).getMetrics().size()); + } } } @Test public void reporterAfterCloseNoThrow() throws InterruptedException { try (final MockM3Server server = bootM3Collector(0);) { - reporter.close(); + try (final M3Reporter reporter = reporterBuilder.build()) { + reporter.close(); - reporter.reportGauge("my-gauge", null, 4.2); - reporter.flush(); + reporter.reportGauge("my-gauge", null, 4.2); + reporter.flush(); + } } } @@ -252,33 +249,35 @@ public void reporterHistogramDurations() throws InterruptedException { List receivedBatches; try (final MockM3Server server = bootM3Collector(2)) { - Buckets buckets = DurationBuckets.linear(Duration.ZERO, Duration.ofMillis(25), 5); - - Map histogramTags = new HashMap<>(); - histogramTags.put("foo", "bar"); - - reporter.reportHistogramDurationSamples( - "my-histogram", - histogramTags, - buckets, - Duration.ZERO, - Duration.ofMillis(25), - 7 - ); - - reporter.reportHistogramDurationSamples( - "my-histogram", - histogramTags, - buckets, - Duration.ofMillis(50), - Duration.ofMillis(75), - 3 - ); - - reporter.close(); - server.await(MAX_WAIT_TIMEOUT); - - receivedBatches = server.getService().snapshotBatches(); + try (final M3Reporter reporter = reporterBuilder.build()) { + Buckets buckets = DurationBuckets.linear(Duration.ZERO, Duration.ofMillis(25), 5); + + Map histogramTags = new HashMap<>(); + histogramTags.put("foo", "bar"); + + reporter.reportHistogramDurationSamples( + "my-histogram", + histogramTags, + buckets, + Duration.ZERO, + Duration.ofMillis(25), + 7 + ); + + reporter.reportHistogramDurationSamples( + "my-histogram", + histogramTags, + buckets, + Duration.ofMillis(50), + Duration.ofMillis(75), + 3 + ); + + reporter.close(); + server.await(MAX_WAIT_TIMEOUT); + + receivedBatches = server.getService().snapshotBatches(); + } } assertEquals(1, receivedBatches.size()); @@ -338,34 +337,37 @@ public void reporterHistogramDurations() throws InterruptedException { public void reporterHistogramValues() throws InterruptedException { List receivedBatches; - try (final MockM3Server server = bootM3Collector(2);) { - Buckets buckets = ValueBuckets.linear(0, 25_000_000, 5); - - Map histogramTags = new HashMap<>(); - histogramTags.put("foo", "bar"); - - reporter.reportHistogramValueSamples( - "my-histogram", - histogramTags, - buckets, - 0, - 25_000_000, - 7 - ); - - reporter.reportHistogramValueSamples( - "my-histogram", - histogramTags, - buckets, - 50_000_000, - 75_000_000, - 3 - ); - - reporter.close(); - server.await(MAX_WAIT_TIMEOUT); - - receivedBatches = server.getService().snapshotBatches(); + try (final MockM3Server server = bootM3Collector(2)) { + try (final M3Reporter reporter = reporterBuilder.build()) { + + Buckets buckets = ValueBuckets.linear(0, 25_000_000, 5); + + Map histogramTags = new HashMap<>(); + histogramTags.put("foo", "bar"); + + reporter.reportHistogramValueSamples( + "my-histogram", + histogramTags, + buckets, + 0, + 25_000_000, + 7 + ); + + reporter.reportHistogramValueSamples( + "my-histogram", + histogramTags, + buckets, + 50_000_000, + 75_000_000, + 3 + ); + + reporter.close(); + server.await(MAX_WAIT_TIMEOUT); + + receivedBatches = server.getService().snapshotBatches(); + } } assertEquals(1, receivedBatches.size()); From e1dd6fe11dccaaf18fed2319d60465d649fa018f Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 15:59:52 -0700 Subject: [PATCH 24/31] Added more logs --- m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java index 570e0e3..a9078f1 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java @@ -28,6 +28,8 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.SocketAddress; import java.net.SocketException; @@ -36,6 +38,9 @@ import java.util.concurrent.TimeUnit; public class MockM3Server implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(MockM3Server.class); + private final CountDownLatch expectedMetricsLatch; private final TProcessor processor; @@ -60,6 +65,8 @@ public MockM3Server( public void serve() { try { transport.open(); + + LOG.info("Opened receiving server socket"); } catch (TTransportException e) { throw new RuntimeException("Failed to open socket", e); } @@ -96,5 +103,7 @@ public void await(Duration waitTimeout) throws InterruptedException { public void close() { // Close immediately without waiting transport.close(); + + LOG.info("Closing receiving server socket"); } } From d980ce274fa0d38bcda3b406274ad4ccd8ea397d Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 16:11:38 -0700 Subject: [PATCH 25/31] Added a little more logs --- .../main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java | 2 ++ .../java/com/uber/m3/tally/m3/thrift/TUdpTransport.java | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java index 1f91ccf..bc7bc47 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java @@ -50,6 +50,8 @@ public TUdpClient(SocketAddress socketAddress) throws SocketException { public void open() throws TTransportException { try { socket.connect(socketAddress); + + logger.info("UDP socket has been opened"); } catch (SocketException e) { throw new TTransportException("Error opening transport", e); } diff --git a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java index 894288c..a0b6eaf 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java @@ -23,6 +23,8 @@ import org.apache.http.annotation.GuardedBy; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.DatagramPacket; @@ -41,6 +43,8 @@ public abstract class TUdpTransport extends TTransport implements AutoCloseable // 65535 - 512 = 65023 bytes public static final int PACKET_DATA_PAYLOAD_MAX_SIZE = 65023; + protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected final Object sendLock = new Object(); protected final SocketAddress socketAddress; @@ -82,6 +86,8 @@ public boolean isOpen() { @Override public void close() { socket.close(); + + logger.info("UDP socket has been closed"); } @Override From 34441effdd3c977ae458d5a0ff1c64941407a272 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 16:15:11 -0700 Subject: [PATCH 26/31] A little more logs --- m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java index 0e6ca72..02cca89 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java @@ -57,6 +57,8 @@ public void open() throws TTransportException { try { socket.bind(socketAddress); socket.setSoTimeout(timeoutMillis); + + logger.info("UDP socket has been bound to"); } catch (SocketException e) { throw new TTransportException("Error opening transport", e); } From aa6fab43e7c24c0eede1962f0a8d7f8aa2550101 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 16:15:24 -0700 Subject: [PATCH 27/31] `lint` --- m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 3b3e114..472821b 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -76,7 +76,6 @@ public static void setup() throws UnknownHostException { socketAddress = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 12345); } - @Test public void reporter() throws InterruptedException { ImmutableMap commonTags = new ImmutableMap.Builder(5) From c3512ba0dd785511d2356d7a6515f4222ba87b91 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 16:26:00 -0700 Subject: [PATCH 28/31] Fixed RC of `MockM3Server` not being waited for to boot up --- .../com/uber/m3/tally/m3/M3ReporterTest.java | 13 +++++++------ .../com/uber/m3/tally/m3/MockM3Server.java | 18 +++++++++++++++--- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index 472821b..14d616e 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -113,7 +113,7 @@ public void reporter() throws InterruptedException { // Shutdown both reporter and server reporter.close(); - server.await(MAX_WAIT_TIMEOUT); + server.awaitReceiving(MAX_WAIT_TIMEOUT); receivedBatches = server.getService().snapshotBatches(); receivedMetrics = server.getService().snapshotMetrics(); @@ -221,7 +221,7 @@ public void reporterFinalFlush() throws InterruptedException { reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); reporter.close(); - server.await(MAX_WAIT_TIMEOUT); + server.awaitReceiving(MAX_WAIT_TIMEOUT); List batches = server.getService().snapshotBatches(); assertEquals(1, batches.size()); @@ -273,7 +273,7 @@ public void reporterHistogramDurations() throws InterruptedException { ); reporter.close(); - server.await(MAX_WAIT_TIMEOUT); + server.awaitReceiving(MAX_WAIT_TIMEOUT); receivedBatches = server.getService().snapshotBatches(); } @@ -363,7 +363,7 @@ public void reporterHistogramValues() throws InterruptedException { ); reporter.close(); - server.await(MAX_WAIT_TIMEOUT); + server.awaitReceiving(MAX_WAIT_TIMEOUT); receivedBatches = server.getService().snapshotBatches(); } @@ -432,9 +432,10 @@ public void capability() { assertEquals(CapableOf.REPORTING_TAGGING, reporter.capabilities()); } - private static MockM3Server bootM3Collector(int expectedMetricsCount) { + private static MockM3Server bootM3Collector(int expectedMetricsCount) throws InterruptedException { final MockM3Server server = new MockM3Server(expectedMetricsCount, socketAddress); new Thread(server::serve).start(); + server.awaitStarting(); return server; } @@ -473,7 +474,7 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { // Shutdown reporter reporter.close(); - server.await(MAX_WAIT_TIMEOUT); + server.awaitReceiving(MAX_WAIT_TIMEOUT); metrics = server.getService().snapshotMetrics(); } diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java index a9078f1..6513810 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java @@ -36,6 +36,9 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; public class MockM3Server implements AutoCloseable { @@ -43,6 +46,8 @@ public class MockM3Server implements AutoCloseable { private final CountDownLatch expectedMetricsLatch; + private final Condition started = new ReentrantLock().newCondition(); + private final TProcessor processor; private final TTransport transport; private final MockM3Service service; @@ -73,6 +78,8 @@ public void serve() { TProtocol protocol = new TCompactProtocol.Factory().getProtocol(transport); + started.notifyAll(); + while (transport.isOpen()) { try { processor.process(protocol, protocol); @@ -92,13 +99,18 @@ public MockM3Service getService() { /** * Awaits receiving of all the expected metrics - * - * @throws InterruptedException */ - public void await(Duration waitTimeout) throws InterruptedException { + public void awaitReceiving(Duration waitTimeout) throws InterruptedException { expectedMetricsLatch.await(waitTimeout.getSeconds(), TimeUnit.SECONDS); } + /** + * Awaits for the server to be fully booted up + */ + public void awaitStarting() throws InterruptedException { + started.await(); + } + @Override public void close() { // Close immediately without waiting From e3b7a2f14ed1736b84d1efec59a9e3017fd865b7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 22 Jul 2020 16:29:12 -0700 Subject: [PATCH 29/31] Properly sync on the monitor --- .../java/com/uber/m3/tally/m3/MockM3Server.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java index 6513810..e742aab 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java @@ -36,9 +36,6 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; public class MockM3Server implements AutoCloseable { @@ -46,7 +43,7 @@ public class MockM3Server implements AutoCloseable { private final CountDownLatch expectedMetricsLatch; - private final Condition started = new ReentrantLock().newCondition(); + private final Object startupMonitor = new Object(); private final TProcessor processor; private final TTransport transport; @@ -78,7 +75,9 @@ public void serve() { TProtocol protocol = new TCompactProtocol.Factory().getProtocol(transport); - started.notifyAll(); + synchronized (startupMonitor) { + startupMonitor.notifyAll(); + } while (transport.isOpen()) { try { @@ -108,7 +107,9 @@ public void awaitReceiving(Duration waitTimeout) throws InterruptedException { * Awaits for the server to be fully booted up */ public void awaitStarting() throws InterruptedException { - started.await(); + synchronized (startupMonitor) { + startupMonitor.wait(); + } } @Override From 56382f35fbaefd7587cc077f79739b6ef370e06d Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 24 Jul 2020 19:40:32 -0700 Subject: [PATCH 30/31] Reduced scope of catch-clause, re-throw un-actionable exceptions; --- .../java/com/uber/m3/tally/m3/M3Reporter.java | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index 2979114..e0a5cf1 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -503,19 +503,22 @@ public void run() { } else { process(sizedMetric); } + } catch (InterruptedException t) { + // no-op } catch (Throwable t) { // This is fly-away guard making sure that uncaught exception - // will not crash the processor + // will be logged LOG.error("Unhandled exception in processor", t); + throw new RuntimeException(t); } } LOG.warn("Processor shutting down"); // Drain queue of any remaining metrics submitted prior to shutdown; - drainQueue(); - // Flush remaining buffers at last - flushBuffered(); + runNoThrow(this::drainQueue); + // Flush remaining buffers at last (best effort) + runNoThrow(this::flushBuffered); // Close transport transport.close(); @@ -526,7 +529,7 @@ public void run() { LOG.warn("Processor shut down"); } - private void process(SizedMetric sizedMetric) { + private void process(SizedMetric sizedMetric) throws TException { int size = sizedMetric.getSize(); if (bufferedBytes + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) { flushBuffered(); @@ -544,7 +547,7 @@ private boolean elapsedMaxDelaySinceLastFlush() { ); } - private void drainQueue() { + private void drainQueue() throws TException { SizedMetric metrics; while ((metrics = metricQueue.poll()) != null) { @@ -552,7 +555,7 @@ private void drainQueue() { } } - private void flushBuffered() { + private void flushBuffered() throws TException { if (metricsBuffer.isEmpty()) { return; } @@ -563,8 +566,9 @@ private void flushBuffered() { .setCommonTags(commonTags) .setMetrics(metricsBuffer) ); - } catch (Throwable t) { + } catch (TException t) { LOG.error("Failed to flush metrics", t); + throw t; } metricsBuffer.clear(); @@ -577,6 +581,19 @@ public void scheduleFlush() { } } + private static void runNoThrow(ThrowingRunnable r) { + try { + r.run(); + } catch (Throwable t) { + // no-op + } + } + + @FunctionalInterface + interface ThrowingRunnable { + void run() throws Exception; + } + /** * This class provides the facility to calculate the size of the payload serialized through {@link TCompactProtocol}, * using phony {@link TCalcTransport} as a measurer From bf8513b21c1dd8b8705440e1b12161ff36d96944 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 24 Jul 2020 19:45:25 -0700 Subject: [PATCH 31/31] `lint` --- .../java/com/uber/m3/tally/m3/M3Reporter.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index e0a5cf1..174c7c0 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -448,6 +448,14 @@ private void queueSizedMetric(SizedMetric sizedMetric) { } } + private static void runNoThrow(ThrowingRunnable r) { + try { + r.run(); + } catch (Throwable t) { + // no-op + } + } + private class Processor implements Runnable { private final List metricsBuffer = @@ -581,14 +589,6 @@ public void scheduleFlush() { } } - private static void runNoThrow(ThrowingRunnable r) { - try { - r.run(); - } catch (Throwable t) { - // no-op - } - } - @FunctionalInterface interface ThrowingRunnable { void run() throws Exception;