diff --git a/build.gradle b/build.gradle index ee6d6e8..46cd4cd 100644 --- a/build.gradle +++ b/build.gradle @@ -61,8 +61,15 @@ allprojects { } dependencies { - testCompile('junit:junit:4.12') - testCompile "org.mockito:mockito-core:[1.0, 2.0)" + testCompile 'junit:junit:4.12' + testCompile 'org.mockito:mockito-core:[1.0, 2.0)' + testCompile 'ch.qos.logback:logback-classic:1.2.3' + testCompile 'ch.qos.logback:logback-core:1.2.3' + } + + test { + testLogging.showStandardStreams = true + testLogging.exceptionFormat = 'full' } ext.ossrhUsername = System.getenv('OSSRH_JIRA_USERNAME') diff --git a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java index c307a00..174c7c0 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java @@ -99,20 +99,23 @@ public class M3Reporter implements StatsReporter, AutoCloseable { private static final int MIN_METRIC_BUCKET_ID_TAG_LENGTH = 4; + /** + * NOTE: DO NOT CHANGE THIS NUMBER! + * Reporter architecture is not suited for multi-processor setup and might cause some disruption + * to how metrics are processed and eventually submitted to M3 collectors; + */ private static final int NUM_PROCESSORS = 1; private static final ThreadLocal PAYLOAD_SIZE_ESTIMATOR = ThreadLocal.withInitial(SerializedPayloadSizeEstimator::new); - private M3.Client client; - - private Duration maxBufferingDelay; + private final Duration maxBufferingDelay; private final int payloadCapacity; - private String bucketIdTagName; - private String bucketTagName; - private String bucketValFmt; + private final String bucketIdTagName; + private final String bucketTagName; + private final String bucketValFmt; private final Set commonTags; @@ -125,51 +128,35 @@ public class M3Reporter implements StatsReporter, AutoCloseable { // This is a synchronization barrier to make sure that reporter // is being shutdown only after all of its processor had done so - private CountDownLatch shutdownLatch = new CountDownLatch(NUM_PROCESSORS); + private final CountDownLatch processorsShutdownLatch; - private TTransport transport; + private final List processors; - private AtomicBoolean isShutdown = new AtomicBoolean(false); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); // Use inner Builder class to construct an M3Reporter private M3Reporter(Builder builder) { - try { - // Builder verifies non-null, non-empty socketAddresses - SocketAddress[] socketAddresses = builder.socketAddresses; + payloadCapacity = calculatePayloadCapacity(builder.maxPacketSizeBytes, builder.metricTagSet); - TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); + maxBufferingDelay = Duration.ofMillis(builder.maxProcessorWaitUntilFlushMillis); - if (socketAddresses.length > 1) { - transport = new TMultiUdpClient(socketAddresses); - } else { - transport = new TUdpClient(socketAddresses[0]); - } - - transport.open(); - - client = new M3.Client(protocolFactory.getProtocol(transport)); - - payloadCapacity = calculatePayloadCapacity(builder.maxPacketSizeBytes, builder.metricTagSet); + bucketIdTagName = builder.histogramBucketIdName; + bucketTagName = builder.histogramBucketName; + bucketValFmt = String.format("%%.%df", builder.histogramBucketTagPrecision); - maxBufferingDelay = Duration.ofMillis(builder.maxProcessorWaitUntilFlushMillis); + metricQueue = new LinkedBlockingQueue<>(builder.maxQueueSize); - bucketIdTagName = builder.histogramBucketIdName; - bucketTagName = builder.histogramBucketName; - bucketValFmt = String.format("%%.%df", builder.histogramBucketTagPrecision); + executor = builder.executor != null ? builder.executor : Executors.newFixedThreadPool(NUM_PROCESSORS); - metricQueue = new LinkedBlockingQueue<>(builder.maxQueueSize); + clock = Clock.systemUTC(); - executor = builder.executor != null ? builder.executor : Executors.newFixedThreadPool(NUM_PROCESSORS); + commonTags = builder.metricTagSet; - clock = Clock.systemUTC(); + processorsShutdownLatch = new CountDownLatch(NUM_PROCESSORS); - commonTags = builder.metricTagSet; - - for (int i = 0; i < NUM_PROCESSORS; ++i) { - addAndRunProcessor(); - } - } catch (TTransportException | SocketException e) { - throw new RuntimeException("Exception creating M3Reporter", e); + processors = new ArrayList<>(); + for (int i = 0; i < NUM_PROCESSORS; ++i) { + processors.add(bootProcessor(builder.endpointSocketAddresses)); } } @@ -197,8 +184,15 @@ private static String getHostName() { } } - private void addAndRunProcessor() { - executor.execute(new Processor()); + private Processor bootProcessor(SocketAddress[] endpointSocketAddresses) { + try { + Processor processor = new Processor(endpointSocketAddresses); + executor.execute(processor); + return processor; + } catch (TTransportException | SocketException e) { + LOG.error("Failed to boot processor", e); + throw new RuntimeException(e); + } } @Override @@ -212,11 +206,7 @@ public void flush() { return; } - try { - metricQueue.put(SizedMetric.FLUSH); - } catch (InterruptedException e) { - LOG.warn("Interrupted while trying to queue flush sentinel"); - } + processors.forEach(Processor::scheduleFlush); } @Override @@ -226,9 +216,6 @@ public void close() { return; } - // Put sentinal value in queue so that processors know to disregard anything that comes after it. - queueSizedMetric(SizedMetric.CLOSE); - // Important to use `shutdownNow` instead of `shutdown` to interrupt processor // thread(s) or else they will block forever executor.shutdownNow(); @@ -236,7 +223,7 @@ public void close() { try { // Wait a maximum of `MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS` for all processors // to complete - if (!shutdownLatch.await(MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS, TimeUnit.MILLISECONDS)) { + if (!processorsShutdownLatch.await(MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS, TimeUnit.MILLISECONDS)) { LOG.warn( "M3Reporter closing before Processors complete after waiting timeout of {}ms!", MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS @@ -245,8 +232,6 @@ public void close() { } catch (InterruptedException e) { LOG.warn("M3Reporter closing before Processors complete due to being interrupted!"); } - - transport.close(); } private static Set toMetricTagSet(Map tags) { @@ -463,6 +448,14 @@ private void queueSizedMetric(SizedMetric sizedMetric) { } } + private static void runNoThrow(ThrowingRunnable r) { + try { + r.run(); + } catch (Throwable t) { + // no-op + } + } + private class Processor implements Runnable { private final List metricsBuffer = @@ -470,12 +463,39 @@ private class Processor implements Runnable { private Instant lastBufferFlushTimestamp = Instant.now(clock); - private int metricsSize = 0; + private int bufferedBytes = 0; + + private final M3.Client client; + private final TTransport transport; + + private final AtomicBoolean shouldFlush = new AtomicBoolean(false); + + Processor(SocketAddress[] socketAddresses) throws TTransportException, SocketException { + TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); + + if (socketAddresses.length > 1) { + transport = new TMultiUdpClient(socketAddresses); + } else { + transport = new TUdpClient(socketAddresses[0]); + } + + // Open the socket + transport.open(); + + client = new M3.Client(protocolFactory.getProtocol(transport)); + + LOG.info("Booted reporting processor"); + } @Override public void run() { - try { - while (!executor.isShutdown()) { + while (!isShutdown.get()) { + try { + // Check whether flush has been requested by the reporter + if (shouldFlush.compareAndSet(true, false)) { + flushBuffered(); + } + // This `poll` call will block for at most the specified duration to take an item // off the queue. If we get an item, we append it to the queue to be flushed, // otherwise we flush what we have so far. @@ -484,46 +504,49 @@ public void run() { // catch block. SizedMetric sizedMetric = metricQueue.poll(maxBufferingDelay.toMillis(), TimeUnit.MILLISECONDS); - // Drop metrics that came in after close - if (sizedMetric == SizedMetric.CLOSE) { - metricQueue.clear(); - break; - } - if (sizedMetric == null) { // If we didn't get any new metrics after waiting the specified time, // flush what we have so far. - process(SizedMetric.FLUSH); + flushBuffered(); } else { process(sizedMetric); } - + } catch (InterruptedException t) { + // no-op + } catch (Throwable t) { + // This is fly-away guard making sure that uncaught exception + // will be logged + LOG.error("Unhandled exception in processor", t); + throw new RuntimeException(t); } - } catch (InterruptedException e) { - // Don't care if we get interrupted - the finally block will clean up - } finally { - drainQueue(); - flushBuffered(); - // Count down shutdown latch to notify reporter - shutdownLatch.countDown(); } - } - private void process(SizedMetric sizedMetric) { - if (sizedMetric == SizedMetric.FLUSH) { - flushBuffered(); - return; - } + LOG.warn("Processor shutting down"); + + // Drain queue of any remaining metrics submitted prior to shutdown; + runNoThrow(this::drainQueue); + // Flush remaining buffers at last (best effort) + runNoThrow(this::flushBuffered); + + // Close transport + transport.close(); + // Count down shutdown latch to notify reporter + processorsShutdownLatch.countDown(); + + LOG.warn("Processor shut down"); + } + + private void process(SizedMetric sizedMetric) throws TException { int size = sizedMetric.getSize(); - if (metricsSize + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) { + if (bufferedBytes + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) { flushBuffered(); } Metric metric = sizedMetric.getMetric(); metricsBuffer.add(metric); - metricsSize += size; + bufferedBytes += size; } private boolean elapsedMaxDelaySinceLastFlush() { @@ -532,19 +555,15 @@ private boolean elapsedMaxDelaySinceLastFlush() { ); } - private void drainQueue() { - while (!metricQueue.isEmpty()) { - SizedMetric sizedMetric = metricQueue.remove(); - // Don't care about metrics that came in after close - if (sizedMetric == SizedMetric.CLOSE) { - break; - } + private void drainQueue() throws TException { + SizedMetric metrics; - process(sizedMetric); + while ((metrics = metricQueue.poll()) != null) { + process(metrics); } } - private void flushBuffered() { + private void flushBuffered() throws TException { if (metricsBuffer.isEmpty()) { return; } @@ -555,14 +574,24 @@ private void flushBuffered() { .setCommonTags(commonTags) .setMetrics(metricsBuffer) ); - } catch (TException tException) { - LOG.warn("Failed to flush metrics: " + tException.getMessage()); + } catch (TException t) { + LOG.error("Failed to flush metrics", t); + throw t; } metricsBuffer.clear(); - metricsSize = 0; + bufferedBytes = 0; lastBufferFlushTimestamp = Instant.now(clock); } + + public void scheduleFlush() { + shouldFlush.set(true); + } + } + + @FunctionalInterface + interface ThrowingRunnable { + void run() throws Exception; } /** @@ -602,7 +631,7 @@ public int evaluateByteSize(Metric metric) { * Builder pattern to construct an {@link M3Reporter}. */ public static class Builder { - protected SocketAddress[] socketAddresses; + protected SocketAddress[] endpointSocketAddresses; protected String service; protected String env; protected ExecutorService executor; @@ -621,14 +650,14 @@ public static class Builder { /** * Constructs a {@link Builder}. Having at least one {@code SocketAddress} is required. - * @param socketAddresses the array of {@code SocketAddress}es for this {@link M3Reporter} + * @param endpointSocketAddresses the array of {@code SocketAddress}es for this {@link M3Reporter} */ - public Builder(SocketAddress[] socketAddresses) { - if (socketAddresses == null || socketAddresses.length == 0) { + public Builder(SocketAddress[] endpointSocketAddresses) { + if (endpointSocketAddresses == null || endpointSocketAddresses.length == 0) { throw new IllegalArgumentException("Must specify at least one SocketAddress"); } - this.socketAddresses = socketAddresses; + this.endpointSocketAddresses = endpointSocketAddresses; } /** diff --git a/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java b/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java index 40cb56b..c172176 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java @@ -26,9 +26,6 @@ * A metric along with its associated size. */ public class SizedMetric extends Metric { - public static final SizedMetric FLUSH = new SizedMetric(null, -1); - public static final SizedMetric CLOSE = new SizedMetric(null, -2); - private Metric metric; private int size; diff --git a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java index 1f91ccf..bc7bc47 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpClient.java @@ -50,6 +50,8 @@ public TUdpClient(SocketAddress socketAddress) throws SocketException { public void open() throws TTransportException { try { socket.connect(socketAddress); + + logger.info("UDP socket has been opened"); } catch (SocketException e) { throw new TTransportException("Error opening transport", e); } diff --git a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java index 0e6ca72..02cca89 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpServer.java @@ -57,6 +57,8 @@ public void open() throws TTransportException { try { socket.bind(socketAddress); socket.setSoTimeout(timeoutMillis); + + logger.info("UDP socket has been bound to"); } catch (SocketException e) { throw new TTransportException("Error opening transport", e); } diff --git a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java index 894288c..a0b6eaf 100644 --- a/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java +++ b/m3/src/main/java/com/uber/m3/tally/m3/thrift/TUdpTransport.java @@ -23,6 +23,8 @@ import org.apache.http.annotation.GuardedBy; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.DatagramPacket; @@ -41,6 +43,8 @@ public abstract class TUdpTransport extends TTransport implements AutoCloseable // 65535 - 512 = 65023 bytes public static final int PACKET_DATA_PAYLOAD_MAX_SIZE = 65023; + protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected final Object sendLock = new Object(); protected final SocketAddress socketAddress; @@ -82,6 +86,8 @@ public boolean isOpen() { @Override public void close() { socket.close(); + + logger.info("UDP socket has been closed"); } @Override diff --git a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java index b82c45b..14d616e 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/M3ReporterTest.java @@ -49,7 +49,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +// TODO add tests to validate proper shutdown +// TODO add tests to validate uncaughts don't crash processor +// TODO add tests for multi-processor setup public class M3ReporterTest { + + private static final java.time.Duration MAX_WAIT_TIMEOUT = java.time.Duration.ofSeconds(30); + private static double EPSILON = 1e-9; private static SocketAddress socketAddress; @@ -60,31 +66,19 @@ public class M3ReporterTest { "host", "test-host" ); + private M3Reporter.Builder reporterBuilder = + new M3Reporter.Builder(socketAddress) + .service("test-service") + .commonTags(DEFAULT_TAGS); + @BeforeClass - public static void setup() { - try { - socketAddress = new InetSocketAddress(InetAddress.getByName("localhost"), 4448); - } catch (UnknownHostException e) { - throw new RuntimeException("Unable to get localhost"); - } + public static void setup() throws UnknownHostException { + socketAddress = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 12345); } @Test public void reporter() throws InterruptedException { - final MockM3Server server = new MockM3Server(3, socketAddress); - M3Reporter reporter = null; - - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); - - try { - serverThread.start(); - - ImmutableMap commonTags = new ImmutableMap.Builder(5) + ImmutableMap commonTags = new ImmutableMap.Builder(5) .put("env", "development") .put("host", "default") .put("commonTag1", "val1") @@ -92,112 +86,105 @@ public void run() { .put("commonTag3", "val3") .build(); - reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(commonTags) - .includeHost(true) - .build(); - - ImmutableMap tags = new ImmutableMap.Builder(2) + ImmutableMap tags = new ImmutableMap.Builder(2) .put("testTag1", "testVal1") .put("testTag2", "testVal2") .build(); - reporter.reportCounter("my-counter", tags, 10); - reporter.flush(); + M3Reporter.Builder reporterBuilder = + new M3Reporter.Builder(socketAddress) + .service("test-service") + .commonTags(commonTags) + .includeHost(true); - reporter.reportTimer("my-timer", tags, Duration.ofMillis(5)); - reporter.flush(); + List receivedBatches; + List receivedMetrics; - reporter.reportGauge("my-gauge", tags, 42.42); - reporter.flush(); + try (final M3Reporter reporter = reporterBuilder.build()) { + try (final MockM3Server server = bootM3Collector(3)) { + reporter.reportCounter("my-counter", tags, 10); + reporter.flush(); - // Shutdown both reporter and server - reporter.close(); - server.awaitAndClose(); + reporter.reportTimer("my-timer", tags, Duration.ofMillis(5)); + reporter.flush(); - List batches = server.getService().getBatches(); - assertEquals(3, batches.size()); + reporter.reportGauge("my-gauge", tags, 42.42); + reporter.flush(); - // Validate common tags - for (MetricBatch batch : batches) { - assertNotNull(batch); - assertTrue(batch.isSetCommonTags()); - assertEquals(commonTags.size() + 1, batch.getCommonTags().size()); + // Shutdown both reporter and server + reporter.close(); + server.awaitReceiving(MAX_WAIT_TIMEOUT); - for (MetricTag tag : batch.getCommonTags()) { - if (tag.getTagName().equals(M3Reporter.SERVICE_TAG)) { - assertEquals("test-service", tag.getTagValue()); - } else { - assertEquals(commonTags.get(tag.getTagName()), tag.getTagValue()); - } - } + receivedBatches = server.getService().snapshotBatches(); + receivedMetrics = server.getService().snapshotMetrics(); } + } - // Validate metrics - List emittedCounters = batches.get(0).getMetrics(); - assertEquals(1, emittedCounters.size()); - - List emittedTimers = batches.get(1).getMetrics(); - assertEquals(1, emittedTimers.size()); - - List emittedGauges = batches.get(2).getMetrics(); - assertEquals(1, emittedGauges.size()); + // Validate common tags + for (MetricBatch batch : receivedBatches) { + assertNotNull(batch); + assertTrue(batch.isSetCommonTags()); + assertEquals(commonTags.size() + 1, batch.getCommonTags().size()); + + for (MetricTag tag : batch.getCommonTags()) { + if (tag.getTagName().equals(M3Reporter.SERVICE_TAG)) { + assertEquals("test-service", tag.getTagValue()); + } else { + assertEquals(commonTags.get(tag.getTagName()), tag.getTagValue()); + } + } + } - Metric emittedCounter = emittedCounters.get(0); - Metric emittedTimer = emittedTimers.get(0); - Metric emittedGauge = emittedGauges.get(0); + // Validate metrics + assertEquals(3, receivedMetrics.size()); - assertEquals("my-counter", emittedCounter.getName()); - assertTrue(emittedCounter.isSetTags()); - assertEquals(tags.size(), emittedCounter.getTagsSize()); + Metric emittedCounter = receivedMetrics.get(0); + Metric emittedTimer = receivedMetrics.get(1); + Metric emittedGauge = receivedMetrics.get(2); - for (MetricTag tag : emittedCounter.getTags()) { - assertEquals(tags.get(tag.getTagName()), tag.getTagValue()); - } + assertEquals("my-counter", emittedCounter.getName()); + assertTrue(emittedCounter.isSetTags()); + assertEquals(tags.size(), emittedCounter.getTagsSize()); - // Validate counter - assertTrue(emittedCounter.isSetMetricValue()); + for (MetricTag tag : emittedCounter.getTags()) { + assertEquals(tags.get(tag.getTagName()), tag.getTagValue()); + } - MetricValue emittedValue = emittedCounter.getMetricValue(); - assertTrue(emittedValue.isSetCount()); - assertFalse(emittedValue.isSetGauge()); - assertFalse(emittedValue.isSetTimer()); + // Validate counter + assertTrue(emittedCounter.isSetMetricValue()); - CountValue emittedCount = emittedValue.getCount(); - assertTrue(emittedCount.isSetI64Value()); - assertEquals(10, emittedCount.getI64Value()); + MetricValue emittedValue = emittedCounter.getMetricValue(); + assertTrue(emittedValue.isSetCount()); + assertFalse(emittedValue.isSetGauge()); + assertFalse(emittedValue.isSetTimer()); - // Validate timer - assertTrue(emittedTimer.isSetMetricValue()); + CountValue emittedCount = emittedValue.getCount(); + assertTrue(emittedCount.isSetI64Value()); + assertEquals(10, emittedCount.getI64Value()); - emittedValue = emittedTimer.getMetricValue(); - assertFalse(emittedValue.isSetCount()); - assertFalse(emittedValue.isSetGauge()); - assertTrue(emittedValue.isSetTimer()); + // Validate timer + assertTrue(emittedTimer.isSetMetricValue()); - TimerValue emittedTimerValue = emittedValue.getTimer(); - assertTrue(emittedTimerValue.isSetI64Value()); - assertEquals(5_000_000, emittedTimerValue.getI64Value()); + emittedValue = emittedTimer.getMetricValue(); + assertFalse(emittedValue.isSetCount()); + assertFalse(emittedValue.isSetGauge()); + assertTrue(emittedValue.isSetTimer()); - // Validate gauge - assertTrue(emittedGauge.isSetMetricValue()); + TimerValue emittedTimerValue = emittedValue.getTimer(); + assertTrue(emittedTimerValue.isSetI64Value()); + assertEquals(5_000_000, emittedTimerValue.getI64Value()); - emittedValue = emittedGauge.getMetricValue(); - assertFalse(emittedValue.isSetCount()); - assertTrue(emittedValue.isSetGauge()); - assertFalse(emittedValue.isSetTimer()); + // Validate gauge + assertTrue(emittedGauge.isSetMetricValue()); - GaugeValue emittedGaugeValue = emittedValue.getGauge(); - assertTrue(emittedGaugeValue.isSetDValue()); - assertEquals(42.42, emittedGaugeValue.getDValue(), EPSILON); - } finally { - if (reporter != null) { - reporter.close(); - } + emittedValue = emittedGauge.getMetricValue(); + assertFalse(emittedValue.isSetCount()); + assertTrue(emittedValue.isSetGauge()); + assertFalse(emittedValue.isSetTimer()); - server.awaitAndClose(); - } + GaugeValue emittedGaugeValue = emittedValue.getGauge(); + assertTrue(emittedGaugeValue.isSetDValue()); + assertEquals(42.42, emittedGaugeValue.getDValue(), EPSILON); } @Test @@ -229,112 +216,75 @@ public void builderMissingCommonTag() { @Test public void reporterFinalFlush() throws InterruptedException { - final MockM3Server server = new MockM3Server(1, socketAddress); - - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); - - serverThread.start(); - - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); - - reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); + try (final MockM3Server server = bootM3Collector(1)) { + try (final M3Reporter reporter = reporterBuilder.build()) { + reporter.reportTimer("final-flush-timer", null, Duration.ofMillis(10)); + reporter.close(); - reporter.close(); - server.awaitAndClose(); + server.awaitReceiving(MAX_WAIT_TIMEOUT); - List batches = server.getService().getBatches(); - assertEquals(1, batches.size()); - assertNotNull(batches.get(0)); - assertEquals(1, batches.get(0).getMetrics().size()); + List batches = server.getService().snapshotBatches(); + assertEquals(1, batches.size()); + assertNotNull(batches.get(0)); + assertEquals(1, batches.get(0).getMetrics().size()); + } + } } @Test public void reporterAfterCloseNoThrow() throws InterruptedException { - final MockM3Server server = new MockM3Server(0, socketAddress); + try (final MockM3Server server = bootM3Collector(0);) { + try (final M3Reporter reporter = reporterBuilder.build()) { + reporter.close(); - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); + reporter.reportGauge("my-gauge", null, 4.2); + reporter.flush(); } - }); - - try { - serverThread.start(); - - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); - - reporter.close(); - - reporter.reportGauge("my-gauge", null, 4.2); - reporter.flush(); - } finally { - server.awaitAndClose(); } } @Test public void reporterHistogramDurations() throws InterruptedException { - final MockM3Server server = new MockM3Server(2, socketAddress); + List receivedBatches; + + try (final MockM3Server server = bootM3Collector(2)) { + try (final M3Reporter reporter = reporterBuilder.build()) { + Buckets buckets = DurationBuckets.linear(Duration.ZERO, Duration.ofMillis(25), 5); + + Map histogramTags = new HashMap<>(); + histogramTags.put("foo", "bar"); + + reporter.reportHistogramDurationSamples( + "my-histogram", + histogramTags, + buckets, + Duration.ZERO, + Duration.ofMillis(25), + 7 + ); + + reporter.reportHistogramDurationSamples( + "my-histogram", + histogramTags, + buckets, + Duration.ofMillis(50), + Duration.ofMillis(75), + 3 + ); - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); - } - }); - - serverThread.start(); - - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); - - Buckets buckets = DurationBuckets.linear(Duration.ZERO, Duration.ofMillis(25), 5); - - Map histogramTags = new HashMap<>(); - histogramTags.put("foo", "bar"); - - reporter.reportHistogramDurationSamples( - "my-histogram", - histogramTags, - buckets, - Duration.ZERO, - Duration.ofMillis(25), - 7 - ); - - reporter.reportHistogramDurationSamples( - "my-histogram", - histogramTags, - buckets, - Duration.ofMillis(50), - Duration.ofMillis(75), - 3 - ); + reporter.close(); + server.awaitReceiving(MAX_WAIT_TIMEOUT); - reporter.close(); - server.awaitAndClose(); + receivedBatches = server.getService().snapshotBatches(); + } + } - List batches = server.getService().getBatches(); - assertEquals(1, batches.size()); - assertNotNull(batches.get(0)); - assertEquals(2, batches.get(0).getMetrics().size()); + assertEquals(1, receivedBatches.size()); + assertNotNull(receivedBatches.get(0)); + assertEquals(2, receivedBatches.get(0).getMetrics().size()); // Verify first bucket - Metric metric = batches.get(0).getMetrics().get(0); + Metric metric = receivedBatches.get(0).getMetrics().get(0); assertEquals("my-histogram", metric.getName()); assertTrue(metric.isSetTags()); assertEquals(3, metric.getTagsSize()); @@ -359,7 +309,7 @@ public void run() { assertEquals(7, count.getI64Value()); // Verify second bucket - metric = server.getService().getBatches().get(0).getMetrics().get(1); + metric = receivedBatches.get(0).getMetrics().get(1); assertEquals("my-histogram", metric.getName()); assertTrue(metric.isSetTags()); assertEquals(3, metric.getTagsSize()); @@ -384,104 +334,92 @@ public void run() { @Test public void reporterHistogramValues() throws InterruptedException { - final MockM3Server server = new MockM3Server(2, socketAddress); + List receivedBatches; + + try (final MockM3Server server = bootM3Collector(2)) { + try (final M3Reporter reporter = reporterBuilder.build()) { + + Buckets buckets = ValueBuckets.linear(0, 25_000_000, 5); + + Map histogramTags = new HashMap<>(); + histogramTags.put("foo", "bar"); + + reporter.reportHistogramValueSamples( + "my-histogram", + histogramTags, + buckets, + 0, + 25_000_000, + 7 + ); + + reporter.reportHistogramValueSamples( + "my-histogram", + histogramTags, + buckets, + 50_000_000, + 75_000_000, + 3 + ); - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - server.serve(); + reporter.close(); + server.awaitReceiving(MAX_WAIT_TIMEOUT); + + receivedBatches = server.getService().snapshotBatches(); } - }); + } - try { - serverThread.start(); + assertEquals(1, receivedBatches.size()); + assertNotNull(receivedBatches.get(0)); + assertEquals(2, receivedBatches.get(0).getMetrics().size()); - M3Reporter reporter = new M3Reporter.Builder(socketAddress) - .service("test-service") - .commonTags(DEFAULT_TAGS) - .build(); + // Verify first bucket + Metric metric = receivedBatches.get(0).getMetrics().get(0); + assertEquals("my-histogram", metric.getName()); + assertTrue(metric.isSetTags()); + assertEquals(3, metric.getTagsSize()); - Buckets buckets = ValueBuckets.linear(0, 25_000_000, 5); - - Map histogramTags = new HashMap<>(); - histogramTags.put("foo", "bar"); - - reporter.reportHistogramValueSamples( - "my-histogram", - histogramTags, - buckets, - 0, - 25_000_000, - 7 - ); - - reporter.reportHistogramValueSamples( - "my-histogram", - histogramTags, - buckets, - 50_000_000, - 75_000_000, - 3 - ); - - reporter.close(); - server.awaitAndClose(); - - List batches = server.getService().getBatches(); - assertEquals(1, batches.size()); - assertNotNull(batches.get(0)); - assertEquals(2, batches.get(0).getMetrics().size()); - - // Verify first bucket - Metric metric = batches.get(0).getMetrics().get(0); - assertEquals("my-histogram", metric.getName()); - assertTrue(metric.isSetTags()); - assertEquals(3, metric.getTagsSize()); - - Map expectedTags = new HashMap<>(3, 1); - expectedTags.put("foo", "bar"); - expectedTags.put("bucketid", "0001"); - expectedTags.put("bucket", "0.000000-25000000.000000"); - for (MetricTag tag : metric.getTags()) { - assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); - } + Map expectedTags = new HashMap<>(3, 1); + expectedTags.put("foo", "bar"); + expectedTags.put("bucketid", "0001"); + expectedTags.put("bucket", "0.000000-25000000.000000"); + for (MetricTag tag : metric.getTags()) { + assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); + } - assertTrue(metric.isSetMetricValue()); + assertTrue(metric.isSetMetricValue()); - MetricValue value = metric.getMetricValue(); - assertTrue(value.isSetCount()); - assertFalse(value.isSetGauge()); - assertFalse(value.isSetTimer()); + MetricValue value = metric.getMetricValue(); + assertTrue(value.isSetCount()); + assertFalse(value.isSetGauge()); + assertFalse(value.isSetTimer()); - CountValue count = value.getCount(); - assertTrue(count.isSetI64Value()); - assertEquals(7, count.getI64Value()); + CountValue count = value.getCount(); + assertTrue(count.isSetI64Value()); + assertEquals(7, count.getI64Value()); - // Verify second bucket - metric = server.getService().getBatches().get(0).getMetrics().get(1); - assertEquals("my-histogram", metric.getName()); - assertTrue(metric.isSetTags()); - assertEquals(3, metric.getTagsSize()); + // Verify second bucket + metric = receivedBatches.get(0).getMetrics().get(1); + assertEquals("my-histogram", metric.getName()); + assertTrue(metric.isSetTags()); + assertEquals(3, metric.getTagsSize()); - expectedTags.put("bucketid", "0003"); - expectedTags.put("bucket", "50000000.000000-75000000.000000"); - for (MetricTag tag : metric.getTags()) { - assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); - } + expectedTags.put("bucketid", "0003"); + expectedTags.put("bucket", "50000000.000000-75000000.000000"); + for (MetricTag tag : metric.getTags()) { + assertEquals(expectedTags.get(tag.getTagName()), tag.getTagValue()); + } - assertTrue(metric.isSetMetricValue()); + assertTrue(metric.isSetMetricValue()); - value = metric.getMetricValue(); - assertTrue(value.isSetCount()); - assertFalse(value.isSetGauge()); - assertFalse(value.isSetTimer()); + value = metric.getMetricValue(); + assertTrue(value.isSetCount()); + assertFalse(value.isSetGauge()); + assertFalse(value.isSetTimer()); - count = value.getCount(); - assertTrue(count.isSetI64Value()); - assertEquals(3, count.getI64Value()); - } finally { - server.awaitAndClose(); - } + count = value.getCount(); + assertTrue(count.isSetI64Value()); + assertEquals(3, count.getI64Value()); } @Test @@ -494,6 +432,13 @@ public void capability() { assertEquals(CapableOf.REPORTING_TAGGING, reporter.capabilities()); } + private static MockM3Server bootM3Collector(int expectedMetricsCount) throws InterruptedException { + final MockM3Server server = new MockM3Server(expectedMetricsCount, socketAddress); + new Thread(server::serve).start(); + server.awaitStarting(); + return server; + } + @Test public void testSinglePacketPayloadOverflow() throws InterruptedException { // NOTE: Every metric emitted in this test is taking about 22 bytes, @@ -501,50 +446,40 @@ public void testSinglePacketPayloadOverflow() throws InterruptedException { // which should be split across 2 UDP datagrams with the current configuration int expectedMetricsCount = 5_000; - final MockM3Server server = new MockM3Server(expectedMetricsCount, socketAddress); - - Thread serverThread = new Thread(server::serve); - serverThread.start(); - // NOTE: We're using default max packet size - M3Reporter reporter = new M3Reporter.Builder(socketAddress) + M3Reporter.Builder reporterBuilder = new M3Reporter.Builder(socketAddress) .service("test-service") .commonTags( ImmutableMap.of("env", "test") ) // Effectively disable time-based flushing to only keep // size-based one - .maxProcessorWaitUntilFlushMillis(1_000_000) - .build(); - - ImmutableMap emptyTags = - new ImmutableMap.Builder(0).build(); + .maxProcessorWaitUntilFlushMillis(1_000_000); - for (int i = 0; i < expectedMetricsCount; ++i) { - // NOTE: The goal is to minimize the metric size, to make sure - // they're granular enough to detect any transport/reporter configuration - // inconsistencies - reporter.reportCounter("c", emptyTags, 1); - } + List metrics; - // Make sure reporter is flushed - reporter.flush(); + try (final MockM3Server server = bootM3Collector(expectedMetricsCount)) { + try (final M3Reporter reporter = reporterBuilder.build()) { - // Shutdown both reporter and server - reporter.close(); - server.awaitAndClose(); + ImmutableMap emptyTags = + new ImmutableMap.Builder(0).build(); - List batches = server.getService().getBatches(); + for (int i = 0; i < expectedMetricsCount; ++i) { + // NOTE: The goal is to minimize the metric size, to make sure + // they're granular enough to detect any transport/reporter configuration + // inconsistencies + reporter.reportCounter("c", emptyTags, 1); + } - int totalMetrics = 0; + // Shutdown reporter + reporter.close(); - // Validate that all metrics had been received - for (MetricBatch batch : batches) { - assertNotNull(batch); + server.awaitReceiving(MAX_WAIT_TIMEOUT); - totalMetrics += batch.metrics.size(); + metrics = server.getService().snapshotMetrics(); + } } - assertEquals(totalMetrics, expectedMetricsCount); + assertEquals(expectedMetricsCount, metrics.size()); } } diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java index f140081..e742aab 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Server.java @@ -28,6 +28,8 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.SocketAddress; import java.net.SocketException; @@ -35,14 +37,17 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -public class MockM3Server { - private static final Duration MAX_WAIT_TIMEOUT = Duration.ofSeconds(30); +public class MockM3Server implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(MockM3Server.class); private final CountDownLatch expectedMetricsLatch; - private TProcessor processor; - private TTransport transport; - private MockM3Service service; + private final Object startupMonitor = new Object(); + + private final TProcessor processor; + private final TTransport transport; + private final MockM3Service service; public MockM3Server( int expectedMetricsCount, @@ -62,12 +67,18 @@ public MockM3Server( public void serve() { try { transport.open(); + + LOG.info("Opened receiving server socket"); } catch (TTransportException e) { throw new RuntimeException("Failed to open socket", e); } TProtocol protocol = new TCompactProtocol.Factory().getProtocol(transport); + synchronized (startupMonitor) { + startupMonitor.notifyAll(); + } + while (transport.isOpen()) { try { processor.process(protocol, protocol); @@ -81,12 +92,31 @@ public void serve() { } } - public void awaitAndClose() throws InterruptedException { - expectedMetricsLatch.await(MAX_WAIT_TIMEOUT.getSeconds(), TimeUnit.SECONDS); - transport.close(); - } - public MockM3Service getService() { return service; } + + /** + * Awaits receiving of all the expected metrics + */ + public void awaitReceiving(Duration waitTimeout) throws InterruptedException { + expectedMetricsLatch.await(waitTimeout.getSeconds(), TimeUnit.SECONDS); + } + + /** + * Awaits for the server to be fully booted up + */ + public void awaitStarting() throws InterruptedException { + synchronized (startupMonitor) { + startupMonitor.wait(); + } + } + + @Override + public void close() { + // Close immediately without waiting + transport.close(); + + LOG.info("Closing receiving server socket"); + } } diff --git a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java index f5f4d78..7e3265f 100644 --- a/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java +++ b/m3/src/test/java/com/uber/m3/tally/m3/MockM3Service.java @@ -23,7 +23,6 @@ import com.uber.m3.thrift.gen.M3; import com.uber.m3.thrift.gen.Metric; import com.uber.m3.thrift.gen.MetricBatch; -import org.apache.thrift.transport.TTransportException; import java.util.ArrayList; import java.util.List; @@ -41,25 +40,31 @@ public MockM3Service(CountDownLatch metricsCountLatch) { this.metricsCountLatch = metricsCountLatch; } - public List getBatches() { + public List snapshotBatches() { + lock.readLock().lock(); + try { + return new ArrayList<>(batches); + } finally { + lock.readLock().unlock(); + } + } + public List snapshotMetrics() { lock.readLock().lock(); - try { - return batches; + return new ArrayList<>(metrics); } finally { lock.readLock().unlock(); } } @Override - public void emitMetricBatch(MetricBatch batch) throws TTransportException { + public void emitMetricBatch(MetricBatch batch) { lock.writeLock().lock(); batches.add(batch); for (Metric metric : batch.getMetrics()) { metrics.add(metric); - metricsCountLatch.countDown(); }