Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,15 @@ allprojects {
}

dependencies {
testCompile('junit:junit:4.12')
testCompile "org.mockito:mockito-core:[1.0, 2.0)"
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:[1.0, 2.0)'
testCompile 'ch.qos.logback:logback-classic:1.2.3'
testCompile 'ch.qos.logback:logback-core:1.2.3'
}

// Test task configuration: controls what the build logs while tests run.
test {
// Log the output the tests write to the standard streams
testLogging.showStandardStreams = true
// Log test exceptions using the 'full' exception format
testLogging.exceptionFormat = 'full'
}

ext.ossrhUsername = System.getenv('OSSRH_JIRA_USERNAME')
Expand Down
217 changes: 123 additions & 94 deletions m3/src/main/java/com/uber/m3/tally/m3/M3Reporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,23 @@ public class M3Reporter implements StatsReporter, AutoCloseable {

private static final int MIN_METRIC_BUCKET_ID_TAG_LENGTH = 4;

/**
* NOTE: DO NOT CHANGE THIS NUMBER!
* The reporter architecture is not suited for a multi-processor setup; changing this number
* might disrupt how metrics are processed and eventually submitted to M3 collectors.
*/
private static final int NUM_PROCESSORS = 1;

private static final ThreadLocal<SerializedPayloadSizeEstimator> PAYLOAD_SIZE_ESTIMATOR =
ThreadLocal.withInitial(SerializedPayloadSizeEstimator::new);

private M3.Client client;

private Duration maxBufferingDelay;
private final Duration maxBufferingDelay;

private final int payloadCapacity;

private String bucketIdTagName;
private String bucketTagName;
private String bucketValFmt;
private final String bucketIdTagName;
private final String bucketTagName;
private final String bucketValFmt;

private final Set<MetricTag> commonTags;

Expand All @@ -125,51 +128,35 @@ public class M3Reporter implements StatsReporter, AutoCloseable {

// This is a synchronization barrier to make sure that the reporter
// is shut down only after all of its processors have done so
private CountDownLatch shutdownLatch = new CountDownLatch(NUM_PROCESSORS);
private final CountDownLatch processorsShutdownLatch;

private TTransport transport;
private final List<Processor> processors;

private AtomicBoolean isShutdown = new AtomicBoolean(false);
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

// Use inner Builder class to construct an M3Reporter
private M3Reporter(Builder builder) {
try {
// Builder verifies non-null, non-empty socketAddresses
SocketAddress[] socketAddresses = builder.socketAddresses;
payloadCapacity = calculatePayloadCapacity(builder.maxPacketSizeBytes, builder.metricTagSet);

TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
maxBufferingDelay = Duration.ofMillis(builder.maxProcessorWaitUntilFlushMillis);

if (socketAddresses.length > 1) {
transport = new TMultiUdpClient(socketAddresses);
} else {
transport = new TUdpClient(socketAddresses[0]);
}

transport.open();

client = new M3.Client(protocolFactory.getProtocol(transport));

payloadCapacity = calculatePayloadCapacity(builder.maxPacketSizeBytes, builder.metricTagSet);
bucketIdTagName = builder.histogramBucketIdName;
bucketTagName = builder.histogramBucketName;
bucketValFmt = String.format("%%.%df", builder.histogramBucketTagPrecision);

maxBufferingDelay = Duration.ofMillis(builder.maxProcessorWaitUntilFlushMillis);
metricQueue = new LinkedBlockingQueue<>(builder.maxQueueSize);

bucketIdTagName = builder.histogramBucketIdName;
bucketTagName = builder.histogramBucketName;
bucketValFmt = String.format("%%.%df", builder.histogramBucketTagPrecision);
executor = builder.executor != null ? builder.executor : Executors.newFixedThreadPool(NUM_PROCESSORS);

metricQueue = new LinkedBlockingQueue<>(builder.maxQueueSize);
clock = Clock.systemUTC();

executor = builder.executor != null ? builder.executor : Executors.newFixedThreadPool(NUM_PROCESSORS);
commonTags = builder.metricTagSet;

clock = Clock.systemUTC();
processorsShutdownLatch = new CountDownLatch(NUM_PROCESSORS);

commonTags = builder.metricTagSet;

for (int i = 0; i < NUM_PROCESSORS; ++i) {
addAndRunProcessor();
}
} catch (TTransportException | SocketException e) {
throw new RuntimeException("Exception creating M3Reporter", e);
processors = new ArrayList<>();
for (int i = 0; i < NUM_PROCESSORS; ++i) {
processors.add(bootProcessor(builder.endpointSocketAddresses));
}
}

Expand Down Expand Up @@ -197,8 +184,15 @@ private static String getHostName() {
}
}

private void addAndRunProcessor() {
executor.execute(new Processor());
/**
 * Creates a {@link Processor} for the given endpoints and submits it to {@code executor}.
 *
 * @param endpointSocketAddresses addresses handed to the {@link Processor} constructor
 * @return the processor that has been submitted for execution
 * @throws RuntimeException wrapping the {@code TTransportException} or {@code SocketException}
 *     caught while creating the processor
 */
private Processor bootProcessor(SocketAddress[] endpointSocketAddresses) {
try {
Processor processor = new Processor(endpointSocketAddresses);
executor.execute(processor);
return processor;
} catch (TTransportException | SocketException e) {
// Log the failure, then rethrow it unchecked with the original exception as the cause
LOG.error("Failed to boot processor", e);
throw new RuntimeException(e);
}
}

@Override
Expand All @@ -212,11 +206,7 @@ public void flush() {
return;
}

try {
metricQueue.put(SizedMetric.FLUSH);
} catch (InterruptedException e) {
LOG.warn("Interrupted while trying to queue flush sentinel");
}
processors.forEach(Processor::scheduleFlush);
}

@Override
Expand All @@ -226,17 +216,14 @@ public void close() {
return;
}

// Put sentinel value in queue so that processors know to disregard anything that comes after it.
queueSizedMetric(SizedMetric.CLOSE);

// Important to use `shutdownNow` instead of `shutdown` to interrupt processor
// thread(s) or else they will block forever
executor.shutdownNow();

try {
// Wait a maximum of `MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS` for all processors
// to complete
if (!shutdownLatch.await(MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS, TimeUnit.MILLISECONDS)) {
if (!processorsShutdownLatch.await(MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS, TimeUnit.MILLISECONDS)) {
LOG.warn(
"M3Reporter closing before Processors complete after waiting timeout of {}ms!",
MAX_PROCESSOR_WAIT_ON_CLOSE_MILLIS
Expand All @@ -245,8 +232,6 @@ public void close() {
} catch (InterruptedException e) {
LOG.warn("M3Reporter closing before Processors complete due to being interrupted!");
}

transport.close();
}

private static Set<MetricTag> toMetricTagSet(Map<String, String> tags) {
Expand Down Expand Up @@ -463,19 +448,54 @@ private void queueSizedMetric(SizedMetric sizedMetric) {
}
}

/**
 * Runs {@code r} and suppresses anything it throws, so the caller can continue with
 * its remaining steps.
 *
 * @param r the action to run
 */
private static void runNoThrow(ThrowingRunnable r) {
try {
r.run();
} catch (Throwable t) {
// no-op: the Throwable is discarded here without being logged or rethrown
}
}

private class Processor implements Runnable {

private final List<Metric> metricsBuffer =
new ArrayList<>(payloadCapacity / 10);

private Instant lastBufferFlushTimestamp = Instant.now(clock);

private int metricsSize = 0;
private int bufferedBytes = 0;

private final M3.Client client;
private final TTransport transport;

private final AtomicBoolean shouldFlush = new AtomicBoolean(false);

/**
 * Creates the transport for the given endpoints, opens it, and builds the
 * {@code M3.Client} that writes through it.
 *
 * @param socketAddresses endpoint addresses; element 0 is read whenever the array does not
 *     hold more than one element, so the array must not be empty
 *     (NOTE(review): the Builder constructor rejects null/empty arrays - confirm there is
 *     no other construction path)
 * @throws TTransportException declared by this constructor; presumably raised while
 *     creating or opening the transport - TODO confirm
 * @throws SocketException declared by this constructor; presumably raised while
 *     creating the transport - TODO confirm
 */
Processor(SocketAddress[] socketAddresses) throws TTransportException, SocketException {
TProtocolFactory protocolFactory = new TCompactProtocol.Factory();

// Several addresses go through TMultiUdpClient; otherwise TUdpClient is given the first one
if (socketAddresses.length > 1) {
transport = new TMultiUdpClient(socketAddresses);
} else {
transport = new TUdpClient(socketAddresses[0]);
}

// Open the socket
transport.open();

// The client uses the protocol that the compact-protocol factory creates over the transport
client = new M3.Client(protocolFactory.getProtocol(transport));

LOG.info("Booted reporting processor");
}

@Override
public void run() {
try {
while (!executor.isShutdown()) {
while (!isShutdown.get()) {
try {
// Check whether flush has been requested by the reporter
if (shouldFlush.compareAndSet(true, false)) {
flushBuffered();
}

// This `poll` call will block for at most the specified duration to take an item
// off the queue. If we get an item, we append it to the buffer to be flushed,
// otherwise we flush what we have so far.
Expand All @@ -484,46 +504,49 @@ public void run() {
// catch block.
SizedMetric sizedMetric = metricQueue.poll(maxBufferingDelay.toMillis(), TimeUnit.MILLISECONDS);

// Drop metrics that came in after close
if (sizedMetric == SizedMetric.CLOSE) {
metricQueue.clear();
break;
}

if (sizedMetric == null) {
// If we didn't get any new metrics after waiting the specified time,
// flush what we have so far.
process(SizedMetric.FLUSH);
flushBuffered();
} else {
process(sizedMetric);
}

} catch (InterruptedException t) {
// no-op
} catch (Throwable t) {
// This is a catch-all guard making sure that any otherwise-uncaught exception
// gets logged
LOG.error("Unhandled exception in processor", t);
throw new RuntimeException(t);
}
} catch (InterruptedException e) {
// Don't care if we get interrupted - the finally block will clean up
} finally {
drainQueue();
flushBuffered();
// Count down shutdown latch to notify reporter
shutdownLatch.countDown();
}
}

private void process(SizedMetric sizedMetric) {
if (sizedMetric == SizedMetric.FLUSH) {
flushBuffered();
return;
}
LOG.warn("Processor shutting down");

// Drain queue of any remaining metrics submitted prior to shutdown;
runNoThrow(this::drainQueue);
// Flush remaining buffers at last (best effort)
runNoThrow(this::flushBuffered);

// Close transport
transport.close();

// Count down shutdown latch to notify reporter
processorsShutdownLatch.countDown();

LOG.warn("Processor shut down");
}

private void process(SizedMetric sizedMetric) throws TException {
int size = sizedMetric.getSize();
if (metricsSize + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) {
if (bufferedBytes + size > payloadCapacity || elapsedMaxDelaySinceLastFlush()) {
flushBuffered();
}

Metric metric = sizedMetric.getMetric();

metricsBuffer.add(metric);
metricsSize += size;
bufferedBytes += size;
}

private boolean elapsedMaxDelaySinceLastFlush() {
Expand All @@ -532,19 +555,15 @@ private boolean elapsedMaxDelaySinceLastFlush() {
);
}

private void drainQueue() {
while (!metricQueue.isEmpty()) {
SizedMetric sizedMetric = metricQueue.remove();
// Don't care about metrics that came in after close
if (sizedMetric == SizedMetric.CLOSE) {
break;
}
private void drainQueue() throws TException {
SizedMetric metrics;

process(sizedMetric);
while ((metrics = metricQueue.poll()) != null) {
process(metrics);
}
}

private void flushBuffered() {
private void flushBuffered() throws TException {
if (metricsBuffer.isEmpty()) {
return;
}
Expand All @@ -555,14 +574,24 @@ private void flushBuffered() {
.setCommonTags(commonTags)
.setMetrics(metricsBuffer)
);
} catch (TException tException) {
LOG.warn("Failed to flush metrics: " + tException.getMessage());
} catch (TException t) {
LOG.error("Failed to flush metrics", t);
throw t;
}

metricsBuffer.clear();
metricsSize = 0;
bufferedBytes = 0;
lastBufferFlushTimestamp = Instant.now(clock);
}

/**
 * Requests a flush by setting the {@code shouldFlush} flag; this method does not flush
 * anything itself. The {@code run()} loop consumes the flag with
 * {@code compareAndSet(true, false)} and then calls {@code flushBuffered()}.
 */
public void scheduleFlush() {
shouldFlush.set(true);
}
}

/**
 * A runnable-like action whose {@code run} method is allowed to throw a checked
 * {@link Exception}.
 */
@FunctionalInterface
interface ThrowingRunnable {
/**
 * Executes the action.
 *
 * @throws Exception if the action fails
 */
void run() throws Exception;
}

/**
Expand Down Expand Up @@ -602,7 +631,7 @@ public int evaluateByteSize(Metric metric) {
* Builder pattern to construct an {@link M3Reporter}.
*/
public static class Builder {
protected SocketAddress[] socketAddresses;
protected SocketAddress[] endpointSocketAddresses;
protected String service;
protected String env;
protected ExecutorService executor;
Expand All @@ -621,14 +650,14 @@ public static class Builder {

/**
* Constructs a {@link Builder}. Having at least one {@code SocketAddress} is required.
* @param socketAddresses the array of {@code SocketAddress}es for this {@link M3Reporter}
* @param endpointSocketAddresses the array of {@code SocketAddress}es for this {@link M3Reporter}
*/
public Builder(SocketAddress[] socketAddresses) {
if (socketAddresses == null || socketAddresses.length == 0) {
public Builder(SocketAddress[] endpointSocketAddresses) {
if (endpointSocketAddresses == null || endpointSocketAddresses.length == 0) {
throw new IllegalArgumentException("Must specify at least one SocketAddress");
}

this.socketAddresses = socketAddresses;
this.endpointSocketAddresses = endpointSocketAddresses;
}

/**
Expand Down
3 changes: 0 additions & 3 deletions m3/src/main/java/com/uber/m3/tally/m3/SizedMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
* A metric along with its associated size.
*/
public class SizedMetric extends Metric {
public static final SizedMetric FLUSH = new SizedMetric(null, -1);
public static final SizedMetric CLOSE = new SizedMetric(null, -2);

private Metric metric;
private int size;

Expand Down
Loading