diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 708755e59311..06f7e2ac77c1 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -118,6 +118,7 @@ The following monitors are available: |`io.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical nodes.| |`com.metamx.metrics.JvmMonitor`|Reports JVM-related statistics.| |`io.druid.segment.realtime.RealtimeMetricsMonitor`|Reports statistics on Realtime nodes.| +|`io.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.| ### Emitting Metrics diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index 0a445a29655f..68b57a8ab669 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -156,6 +156,14 @@ These metrics are only available if the JVMMonitor module is included. |`jvm/gc/count`|Garbage collection count.|gcName.|< 100| |`jvm/gc/time`|Garbage collection time.|gcName.|< 1s| +### EventReceiverFirehose + +The following metric is only available if the EventReceiverFirehoseMonitor module is included. + +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`ingest/events/buffered`|Number of events queued in the EventReceiverFirehose's buffer|serviceName, bufferCapacity.|Equal to current # of events in the buffer queue.| + ## Sys These metrics are only available if the SysMonitor module is included. diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index cb6c947dd03b..c58147f05014 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -33,12 +33,12 @@ import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; -import io.druid.data.input.Rows; import io.druid.data.input.impl.MapInputRowParser; - import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; -import java.io.InputStream; +import io.druid.server.metrics.EventReceiverFirehoseMetric; +import io.druid.server.metrics.EventReceiverFirehoseRegister; + import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.POST; @@ -48,6 +48,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.List; import java.util.Map; @@ -70,6 +71,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory chatHandlerProvider; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; + private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister; @JsonCreator public EventReceiverFirehoseFactory( @@ -77,7 +79,8 @@ public EventReceiverFirehoseFactory( @JsonProperty("bufferSize") Integer bufferSize, @JacksonInject ChatHandlerProvider chatHandlerProvider, @JacksonInject @Json ObjectMapper jsonMapper, - @JacksonInject @Smile ObjectMapper smileMapper + @JacksonInject @Smile ObjectMapper smileMapper, + @JacksonInject EventReceiverFirehoseRegister eventReceiverFirehoseRegister ) { Preconditions.checkNotNull(serviceName, "serviceName"); @@ -87,13 +90,13 @@ public EventReceiverFirehoseFactory( this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; + this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister; } @Override public Firehose connect(MapInputRowParser firehoseParser) throws IOException { log.info("Connecting firehose: %s", serviceName); - final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser); if (chatHandlerProvider.isPresent()) { @@ -106,6 +109,8 @@ public Firehose connect(MapInputRowParser firehoseParser) throws IOException log.info("No chathandler detected"); } + eventReceiverFirehoseRegister.register(serviceName, firehose); + return firehose; } @@ -121,7 +126,7 @@ public int getBufferSize() return bufferSize; } - public class EventReceiverFirehose implements ChatHandler, Firehose + public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric { private final BlockingQueue buffer; private final MapInputRowParser parser; @@ -243,12 +248,25 @@ public void run() }; } + @Override + public int getCurrentBufferSize() + { + // ArrayBlockingQueue's implementation of size() is thread-safe, so we can use that + return buffer.size(); + } + + @Override + public int getCapacity() + { + return bufferSize; + } + @Override public void close() throws IOException { log.info("Firehose closing."); closed = true; - + eventReceiverFirehoseRegister.unregister(serviceName); if (chatHandlerProvider.isPresent()) { chatHandlerProvider.get().unregister(serviceName); } diff --git a/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseMetric.java b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseMetric.java new file mode 100644 index 000000000000..8f286d05c588 --- /dev/null +++ b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseMetric.java @@ -0,0 +1,41 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.metrics; + +/** + * An EventReceiverFirehoseMetric is an object with metrics about EventReceiverFirehose objects. + * It is not likely that anything other than an EventReceiverFirehose actually implements this. + * This interface is not part of the public API and backwards incompatible changes can occur without + * requiring a major (or even minor) version change. + * The interface's primary purpose is to be able to share metrics via the EventReceiverFirehoseRegister + * without exposing the entire EventReceiverFirehose + */ +public interface EventReceiverFirehoseMetric +{ + /** + * Return the current number of {@link io.druid.data.input.InputRow} that are stored in the buffer. + */ + int getCurrentBufferSize(); + + /** + * Return the capacity of the buffer. + */ + int getCapacity(); +} diff --git a/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseMonitor.java b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseMonitor.java new file mode 100644 index 000000000000..aa11bf589d61 --- /dev/null +++ b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseMonitor.java @@ -0,0 +1,61 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.metrics; + +import com.google.inject.Inject; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import com.metamx.metrics.AbstractMonitor; + +import java.util.Map; + +public class EventReceiverFirehoseMonitor extends AbstractMonitor +{ + + private final EventReceiverFirehoseRegister register; + + @Inject + public EventReceiverFirehoseMonitor( + EventReceiverFirehoseRegister eventReceiverFirehoseRegister + ) + { + this.register = eventReceiverFirehoseRegister; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + for (Map.Entry entry : register.getMetrics()) { + final String serviceName = entry.getKey(); + final EventReceiverFirehoseMetric metric = entry.getValue(); + + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder() + .setDimension("serviceName", serviceName) + .setDimension( + "bufferCapacity", + String.valueOf(metric.getCapacity()) + ); + + emitter.emit(builder.build("ingest/events/buffered", metric.getCurrentBufferSize())); + } + + return true; + } +} diff --git a/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseRegister.java b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseRegister.java new file mode 100644 index 000000000000..da1f073e461c --- /dev/null +++ b/server/src/main/java/io/druid/server/metrics/EventReceiverFirehoseRegister.java @@ -0,0 +1,56 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.metrics; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class EventReceiverFirehoseRegister +{ + + private static final Logger log = new Logger(EventReceiverFirehoseRegister.class); + + private final ConcurrentMap metrics = new ConcurrentHashMap<>(); + + public void register(String serviceName, EventReceiverFirehoseMetric metric) + { + log.info("Registering EventReceiverFirehoseMetric for service [%s]", serviceName); + if (metrics.putIfAbsent(serviceName, metric) != null) { + throw new ISE("Service [%s] is already registered!", serviceName); + } + } + + public Iterable> getMetrics() + { + return metrics.entrySet(); + } + + public void unregister(String serviceName) + { + log.info("Unregistering EventReceiverFirehoseMetric for service [%s]", serviceName); + if (metrics.remove(serviceName) == null) { + log.warn("Unregistering a non-exist service. Service [%s] never exists."); + } + } +} diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index a395b8b317eb..2d80a760d2e3 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -33,6 +33,7 @@ import io.druid.concurrent.Execs; import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; import java.util.List; @@ -59,6 +60,8 @@ public void configure(Binder binder) DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum. + binder.bind(EventReceiverFirehoseRegister.class).in(LazySingleton.class); + // Instantiate eagerly so that we get everything registered and put into the Lifecycle binder.bind(Key.get(MonitorScheduler.class, Names.named("ForTheEagerness"))) .to(MonitorScheduler.class) diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java new file mode 100644 index 000000000000..9df0170f70c7 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.firehose; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.metamx.common.ISE; +import io.druid.concurrent.Execs; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.metrics.EventReceiverFirehoseMetric; +import io.druid.server.metrics.EventReceiverFirehoseRegister; +import org.apache.commons.io.IOUtils; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class EventReceiverFirehoseTest +{ + private static final int CAPACITY = 300; + private static final int NUM_EVENTS = 100; + private static final String SERVICE_NAME = "test_firehose"; + + private final String inputRow = "[{\n" + + " \"timestamp\":123,\n" + + " \"d1\":\"v1\"\n" + + "}]"; + + private EventReceiverFirehoseFactory eventReceiverFirehoseFactory; + private EventReceiverFirehoseFactory.EventReceiverFirehose firehose; + private EventReceiverFirehoseRegister register = new EventReceiverFirehoseRegister(); + private HttpServletRequest req; + + @Before + public void setUp() throws Exception + { + req = EasyMock.createMock(HttpServletRequest.class); + eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory( + SERVICE_NAME, + CAPACITY, + null, + new DefaultObjectMapper(), + new DefaultObjectMapper(), + register + ); + firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory.connect( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), new DimensionsSpec(ImmutableList.of("d1"), null, null) + ) + ) + ); + } + + @Test + public void testSingleThread() throws IOException + { + EasyMock.expect(req.getContentType()).andReturn("application/json").times(NUM_EVENTS); + EasyMock.replay(req); + + for (int i = 0; i < NUM_EVENTS; ++i) { + final InputStream inputStream = IOUtils.toInputStream(inputRow); + firehose.addAll(inputStream, req); + Assert.assertEquals(i + 1, firehose.getCurrentBufferSize()); + inputStream.close(); + } + + EasyMock.verify(req); + + final Iterable> metrics = register.getMetrics(); + Assert.assertEquals(1, Iterables.size(metrics)); + + final Map.Entry entry = Iterables.getLast(metrics); + Assert.assertEquals(SERVICE_NAME, entry.getKey()); + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(NUM_EVENTS, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(NUM_EVENTS, firehose.getCurrentBufferSize()); + + for (int i = NUM_EVENTS - 1; i >= 0; --i) { + Assert.assertTrue(firehose.hasMore()); + Assert.assertNotNull(firehose.nextRow()); + Assert.assertEquals(i, firehose.getCurrentBufferSize()); + } + + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(0, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(0, firehose.getCurrentBufferSize()); + + firehose.close(); + Assert.assertFalse(firehose.hasMore()); + Assert.assertEquals(0, Iterables.size(register.getMetrics())); + + } + + @Test + public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException + { + EasyMock.expect(req.getContentType()).andReturn("application/json").times(2 * NUM_EVENTS); + EasyMock.replay(req); + + final ExecutorService executorService = Execs.singleThreaded("single_thread"); + final Future future = executorService.submit( + new Callable() + { + @Override + public Boolean call() throws Exception + { + for (int i = 0; i < NUM_EVENTS; ++i) { + final InputStream inputStream = IOUtils.toInputStream(inputRow); + firehose.addAll(inputStream, req); + inputStream.close(); + } + return true; + } + } + ); + + for (int i = 0; i < NUM_EVENTS; ++i) { + final InputStream inputStream = IOUtils.toInputStream(inputRow); + firehose.addAll(inputStream, req); + inputStream.close(); + } + + future.get(10, TimeUnit.SECONDS); + + EasyMock.verify(req); + + final Iterable> metrics = register.getMetrics(); + Assert.assertEquals(1, Iterables.size(metrics)); + + final Map.Entry entry = Iterables.getLast(metrics); + + Assert.assertEquals(SERVICE_NAME, entry.getKey()); + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(2 * NUM_EVENTS, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(2 * NUM_EVENTS, firehose.getCurrentBufferSize()); + + for (int i = 2 * NUM_EVENTS - 1; i >= 0; --i) { + Assert.assertTrue(firehose.hasMore()); + Assert.assertNotNull(firehose.nextRow()); + Assert.assertEquals(i, firehose.getCurrentBufferSize()); + } + + Assert.assertEquals(CAPACITY, entry.getValue().getCapacity()); + Assert.assertEquals(CAPACITY, firehose.getCapacity()); + Assert.assertEquals(0, entry.getValue().getCurrentBufferSize()); + Assert.assertEquals(0, firehose.getCurrentBufferSize()); + + firehose.close(); + Assert.assertFalse(firehose.hasMore()); + Assert.assertEquals(0, Iterables.size(register.getMetrics())); + + executorService.shutdownNow(); + } + + @Test(expected = ISE.class) + public void testDuplicateRegistering() throws IOException + { + EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new EventReceiverFirehoseFactory( + SERVICE_NAME, + CAPACITY, + null, + new DefaultObjectMapper(), + new DefaultObjectMapper(), + register + ); + EventReceiverFirehoseFactory.EventReceiverFirehose firehose2 = + (EventReceiverFirehoseFactory.EventReceiverFirehose) eventReceiverFirehoseFactory2 + .connect( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), new DimensionsSpec(ImmutableList.of("d1"), null, null) + ) + ) + ); + } +}