From fdbd0857651adc060bbd056d429c131cde725221 Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 17 Oct 2017 13:27:31 -0500 Subject: [PATCH 1/9] Add Emitter monitoring --- .../ambari-metrics-emitter/pom.xml | 2 +- extensions-contrib/graphite-emitter/pom.xml | 2 +- .../kafka-eight-simpleConsumer/pom.xml | 2 +- extensions-contrib/kafka-emitter/pom.xml | 2 +- extensions-contrib/statsd-emitter/pom.xml | 2 +- extensions-core/hdfs-storage/pom.xml | 2 +- extensions-core/s3-extensions/pom.xml | 2 +- .../simple-client-sslcontext/pom.xml | 2 +- pom.xml | 14 +------ processing/pom.xml | 6 +-- server/pom.xml | 6 +-- .../emitter/EmitterMonitorProvider.java | 42 +++++++++++++++++++ .../server/emitter/HttpEmitterModule.java | 19 ++++++++- .../emitter/ParametrizedUriEmitterModule.java | 13 +++++- .../druid/server/metrics/MetricsModule.java | 6 +++ 15 files changed, 88 insertions(+), 34 deletions(-) create mode 100644 server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml index 702eee4a7c9a..bdf4b543cf29 100644 --- a/extensions-contrib/ambari-metrics-emitter/pom.xml +++ b/extensions-contrib/ambari-metrics-emitter/pom.xml @@ -49,7 +49,7 @@ com.metamx - emitter + java-util provided diff --git a/extensions-contrib/graphite-emitter/pom.xml b/extensions-contrib/graphite-emitter/pom.xml index d81450f71858..6e4b433305ff 100644 --- a/extensions-contrib/graphite-emitter/pom.xml +++ b/extensions-contrib/graphite-emitter/pom.xml @@ -55,7 +55,7 @@ com.metamx - emitter + java-util provided diff --git a/extensions-contrib/kafka-eight-simpleConsumer/pom.xml b/extensions-contrib/kafka-eight-simpleConsumer/pom.xml index 018e2b301596..1f351b2f5780 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/pom.xml +++ b/extensions-contrib/kafka-eight-simpleConsumer/pom.xml @@ -40,7 +40,7 @@ com.metamx - emitter + java-util provided diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml index 01766f6c43ca..24ce7f363331 100644 --- a/extensions-contrib/kafka-emitter/pom.xml +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -54,7 +54,7 @@ com.metamx - emitter + java-util provided diff --git a/extensions-contrib/statsd-emitter/pom.xml b/extensions-contrib/statsd-emitter/pom.xml index 475fae574d6b..488782111238 100644 --- a/extensions-contrib/statsd-emitter/pom.xml +++ b/extensions-contrib/statsd-emitter/pom.xml @@ -48,7 +48,7 @@ com.metamx - emitter + java-util provided diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index b066f2050538..1a842ac76d4d 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -137,7 +137,7 @@ com.metamx - emitter + java-util provided diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml index 16e5974338b6..2c89929620a6 100644 --- a/extensions-core/s3-extensions/pom.xml +++ b/extensions-core/s3-extensions/pom.xml @@ -53,7 +53,7 @@ com.metamx - emitter + java-util provided diff --git a/extensions-core/simple-client-sslcontext/pom.xml b/extensions-core/simple-client-sslcontext/pom.xml index 4446b377723b..96b33eaafc38 100644 --- a/extensions-core/simple-client-sslcontext/pom.xml +++ b/extensions-core/simple-client-sslcontext/pom.xml @@ -36,7 +36,7 @@ com.metamx - emitter + java-util provided diff --git a/pom.xml b/pom.xml index 6029a1393305..747322204c66 100644 --- a/pom.xml +++ b/pom.xml @@ -145,18 +145,8 @@ com.metamx - emitter - 0.6.0 - - - com.metamx - http-client - 1.1.0 - - - com.metamx - server-metrics - 0.5.2 + java-util + 1.0 commons-codec diff --git a/processing/pom.xml b/processing/pom.xml index 8f6ce7c5283d..54963de0fb8c 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -51,11 +51,7 @@ com.metamx - emitter - - - com.metamx - server-metrics + java-util com.ning diff --git a/server/pom.xml b/server/pom.xml index 67e0268782ac..7c9b7f7b8c93 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -47,11 +47,7 @@ com.metamx - http-client - - - com.metamx - server-metrics + java-util commons-cli diff --git a/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java b/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java new file mode 100644 index 000000000000..d7803bdf2098 --- /dev/null +++ b/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.emitter; + +import com.metamx.metrics.Monitor; +import io.druid.guice.LazySingleton; + +import javax.annotation.Nullable; + +@LazySingleton +public class EmitterMonitorProvider +{ + private Monitor emitterMontor; + + public void setEmitterMontor(Monitor emitterMontor) + { + this.emitterMontor = emitterMontor; + } + + @Nullable + public Monitor getEmitterMontor() + { + return emitterMontor; + } +} diff --git a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java index 4dfe7166d57c..9fcd98f2f5b8 100644 --- a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; @@ -30,6 +31,8 @@ import com.metamx.emitter.core.HttpPostEmitter; import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientInit; +import com.metamx.metrics.FeedDefiningMonitor; +import com.metamx.metrics.HttpPostEmitterMonitor; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; @@ -72,7 +75,8 @@ public Emitter getEmitter( Supplier config, @Nullable SSLContext sslContext, Lifecycle lifecycle, - ObjectMapper jsonMapper + ObjectMapper jsonMapper, + EmitterMonitorProvider emitterMonitorProvider ) { final HttpClientConfig.Builder builder = HttpClientConfig @@ -83,6 +87,17 @@ public Emitter getEmitter( if (sslContext != null) { builder.withSslContext(sslContext); } - return new HttpPostEmitter(config.get(), HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)), jsonMapper); + HttpPostEmitter emitter = new HttpPostEmitter( + config.get(), + HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)), + jsonMapper + ); + HttpPostEmitterMonitor emitterMonitor = new HttpPostEmitterMonitor( + FeedDefiningMonitor.DEFAULT_METRICS_FEED, + emitter, + ImmutableMap.of() + ); + emitterMonitorProvider.setEmitterMontor(emitterMonitor); + return emitter; } } diff --git a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java index 7e9ea0946149..feaccb063049 100644 --- a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java @@ -29,6 +29,8 @@ import com.metamx.emitter.core.ParametrizedUriEmitter; import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientInit; +import com.metamx.metrics.FeedDefiningMonitor; +import com.metamx.metrics.ParametrizedUriEmitterMonitor; import io.druid.guice.JsonConfigProvider; import io.druid.guice.ManageLifecycle; import io.druid.guice.http.LifecycleUtils; @@ -53,7 +55,8 @@ public Emitter getEmitter( Supplier config, @Nullable SSLContext sslContext, Lifecycle lifecycle, - ObjectMapper jsonMapper + ObjectMapper jsonMapper, + EmitterMonitorProvider emitterMonitorProvider ) { final HttpClientConfig.Builder builder = HttpClientConfig @@ -63,10 +66,16 @@ public Emitter getEmitter( if (sslContext != null) { builder.withSslContext(sslContext); } - return new ParametrizedUriEmitter( + ParametrizedUriEmitter emitter = new ParametrizedUriEmitter( config.get(), HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)), jsonMapper ); + ParametrizedUriEmitterMonitor emitterMonitor = new ParametrizedUriEmitterMonitor( + FeedDefiningMonitor.DEFAULT_METRICS_FEED, + emitter + ); + emitterMonitorProvider.setEmitterMontor(emitterMonitor); + return emitter; } } diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index 6d3c4f37404d..b878dda22a80 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -41,6 +41,7 @@ import io.druid.guice.ManageLifecycle; import io.druid.java.util.common.logger.Logger; import io.druid.query.ExecutorServiceMonitor; +import io.druid.server.emitter.EmitterMonitorProvider; import java.util.List; import java.util.Set; @@ -84,6 +85,7 @@ public MonitorScheduler getMonitorScheduler( MonitorsConfig monitorsConfig, Set> monitorSet, ServiceEmitter emitter, + EmitterMonitorProvider emitterMonitorProvider, Injector injector ) { @@ -96,6 +98,10 @@ public MonitorScheduler getMonitorScheduler( monitors.add(monitor); } + Monitor emitterMontor = emitterMonitorProvider.getEmitterMontor(); + if (emitterMontor != null) { + monitors.add(emitterMontor); + } return new MonitorScheduler( config.get(), From f72c5e2f3286a8a383a1ee7b31c93b68dfc93848 Mon Sep 17 00:00:00 2001 From: leventov Date: Thu, 19 Oct 2017 23:15:28 -0500 Subject: [PATCH 2/9] Fix typo --- .../druid/server/emitter/EmitterMonitorProvider.java | 10 +++++----- .../io/druid/server/emitter/HttpEmitterModule.java | 2 +- .../server/emitter/ParametrizedUriEmitterModule.java | 2 +- .../java/io/druid/server/metrics/MetricsModule.java | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java b/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java index d7803bdf2098..7a9b23a7d338 100644 --- a/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java +++ b/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java @@ -27,16 +27,16 @@ @LazySingleton public class EmitterMonitorProvider { - private Monitor emitterMontor; + private Monitor emitterMonitor; - public void setEmitterMontor(Monitor emitterMontor) + public void setEmitterMonitor(Monitor emitterMonitor) { - this.emitterMontor = emitterMontor; + this.emitterMonitor = emitterMonitor; } @Nullable - public Monitor getEmitterMontor() + public Monitor getEmitterMonitor() { - return emitterMontor; + return emitterMonitor; } } diff --git a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java index 9fcd98f2f5b8..6a9252d9578d 100644 --- a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java @@ -97,7 +97,7 @@ public Emitter getEmitter( emitter, ImmutableMap.of() ); - emitterMonitorProvider.setEmitterMontor(emitterMonitor); + emitterMonitorProvider.setEmitterMonitor(emitterMonitor); return emitter; } } diff --git a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java index feaccb063049..b54afaa3dab9 100644 --- a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java @@ -75,7 +75,7 @@ public Emitter getEmitter( FeedDefiningMonitor.DEFAULT_METRICS_FEED, emitter ); - emitterMonitorProvider.setEmitterMontor(emitterMonitor); + emitterMonitorProvider.setEmitterMonitor(emitterMonitor); return emitter; } } diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index b878dda22a80..63a976b14c24 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -98,7 +98,7 @@ public MonitorScheduler getMonitorScheduler( monitors.add(monitor); } - Monitor emitterMontor = emitterMonitorProvider.getEmitterMontor(); + Monitor emitterMontor = emitterMonitorProvider.getEmitterMonitor(); if (emitterMontor != null) { monitors.add(emitterMontor); } From 0f09b694eaaaf9c2222173699706fe9316753db6 Mon Sep 17 00:00:00 2001 From: leventov Date: Fri, 20 Oct 2017 12:59:44 -0500 Subject: [PATCH 3/9] Fixes --- .../main/java/io/druid/server/metrics/MetricsModule.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index 63a976b14c24..e1be36772078 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -85,6 +85,9 @@ public MonitorScheduler getMonitorScheduler( MonitorsConfig monitorsConfig, Set> monitorSet, ServiceEmitter emitter, + // emitterMonitorProvider is guaranteed to be initialized, because ServiceEmitter is injected as the previous + // parameter, which depends on Emitter (e. g. HttpPostEmitter or ParametrizedUriEmitter), which initialize + // EmitterMonitorProvider in their @Provider methods. EmitterMonitorProvider emitterMonitorProvider, Injector injector ) @@ -98,9 +101,9 @@ public MonitorScheduler getMonitorScheduler( monitors.add(monitor); } - Monitor emitterMontor = emitterMonitorProvider.getEmitterMonitor(); - if (emitterMontor != null) { - monitors.add(emitterMontor); + Monitor emitterMonitor = emitterMonitorProvider.getEmitterMonitor(); + if (emitterMonitor != null) { + monitors.add(emitterMonitor); } return new MonitorScheduler( From 798e9d86955cae0d92ea591f3b974f7b12db28b6 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Thu, 26 Oct 2017 23:41:54 -0500 Subject: [PATCH 4/9] testing new emitter --- .../java/util/common/lifecycle/Lifecycle.java | 40 +++++++++++++++++++ pom.xml | 2 +- .../server/emitter/HttpEmitterConfig.java | 36 ----------------- .../server/emitter/HttpEmitterModule.java | 29 +++++++------- .../emitter/ParametrizedUriEmitterConfig.java | 34 ---------------- .../emitter/ParametrizedUriEmitterModule.java | 28 ++++++------- 6 files changed, 69 insertions(+), 100 deletions(-) delete mode 100644 server/src/main/java/io/druid/server/emitter/HttpEmitterConfig.java delete mode 100644 server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterConfig.java diff --git a/java-util/src/main/java/io/druid/java/util/common/lifecycle/Lifecycle.java b/java-util/src/main/java/io/druid/java/util/common/lifecycle/Lifecycle.java index 74eef87c8af1..05999be54566 100644 --- a/java-util/src/main/java/io/druid/java/util/common/lifecycle/Lifecycle.java +++ b/java-util/src/main/java/io/druid/java/util/common/lifecycle/Lifecycle.java @@ -23,6 +23,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; +import java.io.Closeable; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.List; @@ -238,6 +239,16 @@ public T addMaybeStartStartCloseInstance(T o, Stage stage) throws Exception return o; } + /** + * Adds a Closeable instance to the lifecycle at {@link Stage#NORMAL} stage, doesn't try to call any "start" method on + * it, use {@link #addStartCloseInstance(Object)} instead if you need the latter behaviour. + */ + public T addCloseableInstance(T o) + { + addHandler(new CloseableHandler(o)); + return o; + } + /** * Adds a handler to the Lifecycle at the Stage.NORMAL stage and starts it if the lifecycle has already been started. * @@ -472,4 +483,33 @@ public void stop() } } } + + private static class CloseableHandler implements Handler + { + private static final Logger log = new Logger(CloseableHandler.class); + private final Closeable o; + + private CloseableHandler(Closeable o) + { + this.o = o; + } + + @Override + public void start() throws Exception + { + // do nothing + } + + @Override + public void stop() + { + log.info("Closing object[%s]", o); + try { + o.close(); + } + catch (Exception e) { + log.error(e, "Exception when closing object [%s]", o); + } + } + } } diff --git a/pom.xml b/pom.xml index 747322204c66..4ce5964f8b1a 100644 --- a/pom.xml +++ b/pom.xml @@ -146,7 +146,7 @@ com.metamx java-util - 1.0 + 1.3.0 commons-codec diff --git a/server/src/main/java/io/druid/server/emitter/HttpEmitterConfig.java b/server/src/main/java/io/druid/server/emitter/HttpEmitterConfig.java deleted file mode 100644 index 55dc4f8a0929..000000000000 --- a/server/src/main/java/io/druid/server/emitter/HttpEmitterConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.emitter; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.joda.time.Period; - -/** - */ -public class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig -{ - @JsonProperty - private Period readTimeout = new Period("PT5M"); - - public Period getReadTimeout() - { - return readTimeout; - } -} diff --git a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java index 6a9252d9578d..8493fac55df0 100644 --- a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java @@ -28,16 +28,19 @@ import com.google.inject.name.Named; import com.google.inject.util.Providers; import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.core.HttpEmitterConfig; import com.metamx.emitter.core.HttpPostEmitter; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; import com.metamx.metrics.FeedDefiningMonitor; import com.metamx.metrics.HttpPostEmitterMonitor; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; -import io.druid.guice.http.LifecycleUtils; import io.druid.java.util.common.lifecycle.Lifecycle; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.JdkSslContext; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; @@ -79,20 +82,16 @@ public Emitter getEmitter( EmitterMonitorProvider emitterMonitorProvider ) { - final HttpClientConfig.Builder builder = HttpClientConfig - .builder() - .withNumConnections(1) - .withReadTimeout(config.get().getReadTimeout().toStandardDuration()); - + final DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder(); if (sslContext != null) { - builder.withSslContext(sslContext); + builder.setSslContext(new JdkSslContext(sslContext, true, ClientAuth.NONE)); } - HttpPostEmitter emitter = new HttpPostEmitter( - config.get(), - HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)), - jsonMapper - ); - HttpPostEmitterMonitor emitterMonitor = new HttpPostEmitterMonitor( + final AsyncHttpClient client = new DefaultAsyncHttpClient(builder.build()); + lifecycle.addCloseableInstance(client); + + final HttpPostEmitter emitter = new HttpPostEmitter(config.get(), client, jsonMapper); + + final HttpPostEmitterMonitor emitterMonitor = new HttpPostEmitterMonitor( FeedDefiningMonitor.DEFAULT_METRICS_FEED, emitter, ImmutableMap.of() diff --git a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterConfig.java b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterConfig.java deleted file mode 100644 index 665c6f2fb705..000000000000 --- a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterConfig.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.emitter; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.joda.time.Period; - -public class ParametrizedUriEmitterConfig extends com.metamx.emitter.core.ParametrizedUriEmitterConfig -{ - @JsonProperty - private Period readTimeout = new Period("PT5M"); - - public Period getReadTimeout() - { - return readTimeout; - } -} diff --git a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java index b54afaa3dab9..a8f436ef6246 100644 --- a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java @@ -27,14 +27,17 @@ import com.google.inject.name.Named; import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.ParametrizedUriEmitter; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; +import com.metamx.emitter.core.ParametrizedUriEmitterConfig; import com.metamx.metrics.FeedDefiningMonitor; import com.metamx.metrics.ParametrizedUriEmitterMonitor; import io.druid.guice.JsonConfigProvider; import io.druid.guice.ManageLifecycle; -import io.druid.guice.http.LifecycleUtils; import io.druid.java.util.common.lifecycle.Lifecycle; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.JdkSslContext; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; @@ -59,19 +62,16 @@ public Emitter getEmitter( EmitterMonitorProvider emitterMonitorProvider ) { - final HttpClientConfig.Builder builder = HttpClientConfig - .builder() - .withNumConnections(1) - .withReadTimeout(config.get().getReadTimeout().toStandardDuration()); + final DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder(); if (sslContext != null) { - builder.withSslContext(sslContext); + builder.setSslContext(new JdkSslContext(sslContext, true, ClientAuth.NONE)); } - ParametrizedUriEmitter emitter = new ParametrizedUriEmitter( - config.get(), - HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)), - jsonMapper - ); - ParametrizedUriEmitterMonitor emitterMonitor = new ParametrizedUriEmitterMonitor( + final AsyncHttpClient client = new DefaultAsyncHttpClient(builder.build()); + lifecycle.addCloseableInstance(client); + + final ParametrizedUriEmitter emitter = new ParametrizedUriEmitter(config.get(), client, jsonMapper); + + final ParametrizedUriEmitterMonitor emitterMonitor = new ParametrizedUriEmitterMonitor( FeedDefiningMonitor.DEFAULT_METRICS_FEED, emitter ); From 208697c556c06c3690f37be18fa99f46d0316632 Mon Sep 17 00:00:00 2001 From: Goh Wei Xiang Date: Fri, 27 Oct 2017 08:58:49 -0700 Subject: [PATCH 5/9] Fix failed test (#71) * testing new emitter * fix on failed test --- .../src/test/java/io/druid/server/emitter/EmitterModuleTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/io/druid/server/emitter/EmitterModuleTest.java b/server/src/test/java/io/druid/server/emitter/EmitterModuleTest.java index d693d989d956..52a6876a5b62 100644 --- a/server/src/test/java/io/druid/server/emitter/EmitterModuleTest.java +++ b/server/src/test/java/io/druid/server/emitter/EmitterModuleTest.java @@ -52,7 +52,6 @@ public void testParametrizedUriEmitterConfig() props.setProperty("druid.emitter.parametrized.httpEmitting.basicAuthentication", "a:b"); props.setProperty("druid.emitter.parametrized.httpEmitting.batchingStrategy", "NEWLINES"); props.setProperty("druid.emitter.parametrized.httpEmitting.maxBatchSize", "4"); - props.setProperty("druid.emitter.parametrized.httpEmitting.maxBufferSize", "8"); props.setProperty("druid.emitter.parametrized.httpEmitting.flushTimeOut", "1000"); final Emitter emitter = From 7f39154c9474e5af11605130432d8178d1165d99 Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 31 Oct 2017 17:58:42 -0500 Subject: [PATCH 6/9] Remove emitter's readTimeout from docs --- docs/content/configuration/index.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 9bd7b97ce49c..2f4a1966564d 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -216,7 +216,6 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts |Property|Description|Default| |--------|-----------|-------| -|`druid.emitter.http.readTimeout`|The timeout for data reads.|PT5M| |`druid.emitter.http.flushMillis`|How often the internal message buffer is flushed (data is sent).|60000| |`druid.emitter.http.flushCount`|How many messages the internal message buffer can hold before flushing (sending).|500| |`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification| @@ -228,14 +227,13 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts #### Parametrized Http Emitter Module `druid.emitter.parametrized.httpEmitting.*` configs correspond to the configs of Http Emitter Modules, see above. -Except `readTimeout` and `recipientBaseUrl`. E. g. `druid.emitter.parametrized.httpEmitting.flushMillis`, +Except `recipientBaseUrl`. E. g. `druid.emitter.parametrized.httpEmitting.flushMillis`, `druid.emitter.parametrized.httpEmitting.flushCount`, etc. The additional configs are: |Property|Description|Default| |--------|-----------|-------| -|`druid.emitter.parametrized.readTimeout`|The timeout for data reads.|PT5M| |`druid.emitter.parametrized.recipientBaseUrlPattern`|The URL pattern to send an event to, based on the event's feed. E. g. `http://foo.bar/{feed}`, that will send event to `http://foo.bar/metrics` if the event's feed is "metrics".|none, required config| #### Composing Emitter Module From 70d0a815e51af5c08d10a66e010192251f5cf1e3 Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 31 Oct 2017 18:19:03 -0500 Subject: [PATCH 7/9] Update docs --- docs/content/configuration/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 2f4a1966564d..47f1e0da1c16 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -222,6 +222,8 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts |`druid.emitter.http.flushTimeOut|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout| |`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY| |`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)| +|`druid.emitter.http.batchQueueSizeLimit`|The maximum number of batches in emitter queue, if there are problems with emitting.|50| +|`druid.emitter.http.minHttpTimeoutMillis`|If the speed of filling batches imposes timeout smaller than that, not even trying to send batch to endpoint, because it will likely fail, not being able to send the data that fast. Configure this depending based on emitter/successfulSending/minTimeMs metric. Reasonable values are 10ms..100ms.|0| |`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config| #### Parametrized Http Emitter Module From de1cc0a4743d69b8e250bd2f354fc4e41c733979 Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 1 Nov 2017 16:52:14 -0500 Subject: [PATCH 8/9] Add HttpEmittingMonitor --- docs/content/configuration/index.md | 1 + .../emitter/EmitterMonitorProvider.java | 42 ------------ .../server/emitter/HttpEmitterModule.java | 16 +---- .../server/emitter/HttpEmittingMonitor.java | 67 +++++++++++++++++++ .../emitter/ParametrizedUriEmitterModule.java | 14 +--- .../druid/server/metrics/MetricsModule.java | 11 +-- 6 files changed, 73 insertions(+), 78 deletions(-) delete mode 100644 server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java create mode 100644 server/src/main/java/io/druid/server/emitter/HttpEmittingMonitor.java diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 47f1e0da1c16..66b8ca41f76f 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -196,6 +196,7 @@ The following monitors are available: |`io.druid.segment.realtime.RealtimeMetricsMonitor`|Reports statistics on Realtime nodes.| |`io.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.| |`io.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.| +|`io.druid.server.emitter.HttpEmitterMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/druid-io/druid/pull/4973.| ### Emitting Metrics diff --git a/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java b/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java deleted file mode 100644 index 7a9b23a7d338..000000000000 --- a/server/src/main/java/io/druid/server/emitter/EmitterMonitorProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.emitter; - -import com.metamx.metrics.Monitor; -import io.druid.guice.LazySingleton; - -import javax.annotation.Nullable; - -@LazySingleton -public class EmitterMonitorProvider -{ - private Monitor emitterMonitor; - - public void setEmitterMonitor(Monitor emitterMonitor) - { - this.emitterMonitor = emitterMonitor; - } - - @Nullable - public Monitor getEmitterMonitor() - { - return emitterMonitor; - } -} diff --git a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java index 8493fac55df0..7497c21f8f12 100644 --- a/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/HttpEmitterModule.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; @@ -30,8 +29,6 @@ import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.HttpEmitterConfig; import com.metamx.emitter.core.HttpPostEmitter; -import com.metamx.metrics.FeedDefiningMonitor; -import com.metamx.metrics.HttpPostEmitterMonitor; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; @@ -78,8 +75,7 @@ public Emitter getEmitter( Supplier config, @Nullable SSLContext sslContext, Lifecycle lifecycle, - ObjectMapper jsonMapper, - EmitterMonitorProvider emitterMonitorProvider + ObjectMapper jsonMapper ) { final DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder(); @@ -89,14 +85,6 @@ public Emitter getEmitter( final AsyncHttpClient client = new DefaultAsyncHttpClient(builder.build()); lifecycle.addCloseableInstance(client); - final HttpPostEmitter emitter = new HttpPostEmitter(config.get(), client, jsonMapper); - - final HttpPostEmitterMonitor emitterMonitor = new HttpPostEmitterMonitor( - FeedDefiningMonitor.DEFAULT_METRICS_FEED, - emitter, - ImmutableMap.of() - ); - emitterMonitorProvider.setEmitterMonitor(emitterMonitor); - return emitter; + return new HttpPostEmitter(config.get(), client, jsonMapper); } } diff --git a/server/src/main/java/io/druid/server/emitter/HttpEmittingMonitor.java b/server/src/main/java/io/druid/server/emitter/HttpEmittingMonitor.java new file mode 100644 index 000000000000..0bb85ef55ce8 --- /dev/null +++ b/server/src/main/java/io/druid/server/emitter/HttpEmittingMonitor.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.emitter; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.core.HttpPostEmitter; +import com.metamx.emitter.core.ParametrizedUriEmitter; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.AbstractMonitor; +import com.metamx.metrics.FeedDefiningMonitor; +import com.metamx.metrics.HttpPostEmitterMonitor; +import com.metamx.metrics.ParametrizedUriEmitterMonitor; + +/** + * Able to monitor {@link HttpPostEmitter} or {@link ParametrizedUriEmitter}, which is based on the former. + */ +public class HttpEmittingMonitor extends AbstractMonitor +{ + private AbstractMonitor delegate; + + @Inject + public HttpEmittingMonitor(Emitter emitter) + { + if (emitter instanceof HttpPostEmitter) { + delegate = new HttpPostEmitterMonitor( + FeedDefiningMonitor.DEFAULT_METRICS_FEED, + (HttpPostEmitter) emitter, + ImmutableMap.of() + ); + } else if (emitter instanceof ParametrizedUriEmitter) { + delegate = new ParametrizedUriEmitterMonitor( + FeedDefiningMonitor.DEFAULT_METRICS_FEED, + (ParametrizedUriEmitter) emitter + ); + } else { + throw new IllegalStateException( + "Unable to use HttpEmittingMonitor with emitter other than HttpPostEmitter or ParametrizedUriEmitter, " + + emitter.getClass() + " is used" + ); + } + } + + @Override + public boolean doMonitor(ServiceEmitter serviceEmitter) + { + return delegate.doMonitor(serviceEmitter); + } +} diff --git a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java index a8f436ef6246..6d8633c9f6c3 100644 --- a/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java +++ b/server/src/main/java/io/druid/server/emitter/ParametrizedUriEmitterModule.java @@ -28,8 +28,6 @@ import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.ParametrizedUriEmitter; import com.metamx.emitter.core.ParametrizedUriEmitterConfig; -import com.metamx.metrics.FeedDefiningMonitor; -import com.metamx.metrics.ParametrizedUriEmitterMonitor; import io.druid.guice.JsonConfigProvider; import io.druid.guice.ManageLifecycle; import io.druid.java.util.common.lifecycle.Lifecycle; @@ -58,8 +56,7 @@ public Emitter getEmitter( Supplier config, @Nullable SSLContext sslContext, Lifecycle lifecycle, - ObjectMapper jsonMapper, - EmitterMonitorProvider emitterMonitorProvider + ObjectMapper jsonMapper ) { final DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder(); @@ -69,13 +66,6 @@ public Emitter getEmitter( final AsyncHttpClient client = new DefaultAsyncHttpClient(builder.build()); lifecycle.addCloseableInstance(client); - final ParametrizedUriEmitter emitter = new ParametrizedUriEmitter(config.get(), client, jsonMapper); - - final ParametrizedUriEmitterMonitor emitterMonitor = new ParametrizedUriEmitterMonitor( - FeedDefiningMonitor.DEFAULT_METRICS_FEED, - emitter - ); - emitterMonitorProvider.setEmitterMonitor(emitterMonitor); - return emitter; + return new ParametrizedUriEmitter(config.get(), client, jsonMapper); } } diff --git a/server/src/main/java/io/druid/server/metrics/MetricsModule.java b/server/src/main/java/io/druid/server/metrics/MetricsModule.java index e1be36772078..1f24942c9351 100644 --- a/server/src/main/java/io/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/io/druid/server/metrics/MetricsModule.java @@ -34,14 +34,13 @@ import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.SysMonitor; -import io.druid.java.util.common.concurrent.Execs; import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.ManageLifecycle; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.logger.Logger; import io.druid.query.ExecutorServiceMonitor; -import io.druid.server.emitter.EmitterMonitorProvider; import java.util.List; import java.util.Set; @@ -85,10 +84,6 @@ public MonitorScheduler getMonitorScheduler( MonitorsConfig monitorsConfig, Set> monitorSet, ServiceEmitter emitter, - // emitterMonitorProvider is guaranteed to be initialized, because ServiceEmitter is injected as the previous - // parameter, which depends on Emitter (e. g. HttpPostEmitter or ParametrizedUriEmitter), which initialize - // EmitterMonitorProvider in their @Provider methods. - EmitterMonitorProvider emitterMonitorProvider, Injector injector ) { @@ -101,10 +96,6 @@ public MonitorScheduler getMonitorScheduler( monitors.add(monitor); } - Monitor emitterMonitor = emitterMonitorProvider.getEmitterMonitor(); - if (emitterMonitor != null) { - monitors.add(emitterMonitor); - } return new MonitorScheduler( config.get(), From a997bc2693e386eea16635c6e34fb462de4c62b8 Mon Sep 17 00:00:00 2001 From: leventov Date: Thu, 2 Nov 2017 15:25:16 -0500 Subject: [PATCH 9/9] Update java-util to 1.3.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4ce5964f8b1a..6a002942d643 100644 --- a/pom.xml +++ b/pom.xml @@ -146,7 +146,7 @@ com.metamx java-util - 1.3.0 + 1.3.2 commons-codec