From 2724f22f8ea4b54f138b64655b79b27024a2c975 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 23 May 2023 14:04:29 -0700 Subject: [PATCH 1/8] Add ZooKeeper connection state alerts and metrics. - New metric "zk/connected" is an indicator showing 1 when connected, 0 when disconnected. - New metric "zk/disconnected/time" measures time spent disconnected. - New alert when Curator connection state enters LOST or SUSPENDED. --- docs/operations/metrics.md | 9 ++ .../java/util/emitter/EmittingLogger.java | 18 +-- .../util/emitter/service/AlertBuilder.java | 20 ++- .../apache/druid/curator/CuratorModule.java | 81 +++++++++--- .../curator/DruidConnectionStateListener.java | 121 ++++++++++++++++++ 5 files changed, 214 insertions(+), 35 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 58334f44b289..7364dc02b59e 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -358,6 +358,15 @@ These metrics are only available if the `JVMMonitor` module is included. |`jvm/gc/count`|Garbage collection count|`gcName` (cms/g1/parallel/etc.), `gcGen` (old/young)|Varies| |`jvm/gc/cpu`|Count of CPU time in Nanoseconds spent on garbage collection. Note: `jvm/gc/cpu` represents the total time over multiple GC cycles; divide by `jvm/gc/count` to get the mean GC time per cycle.|`gcName`, `gcGen`|Sum of `jvm/gc/cpu` should be within 10-30% of sum of `jvm/cpu/total`, depending on the GC algorithm used (reported by [`JvmCpuMonitor`](../configuration/index.md#enabling-metrics)). | +### ZooKeeper, Curator + +These metrics are available unless `druid.zk.service.enabled = false`. + +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`zk/connected`|Indicator of connection status. `1` for connected, `0` for disconnected. 
Emitted once per monitor period.|None|1| +|`zk/disconnected/time`|Amount of time, in milliseconds, that the server was disconnected from ZooKeeper. Emitted on reconnection. Note that this means the metric is not emitted if connection to ZooKeeper is permanently lost.|None|Not present| + ### EventReceiverFirehose The following metric is only available if the `EventReceiverFirehoseMonitor` module is included. diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java index 7ad5d1091de9..66df7ea7d740 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java @@ -29,8 +29,6 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import javax.annotation.Nullable; -import java.io.PrintWriter; -import java.io.StringWriter; /** */ @@ -93,19 +91,9 @@ public AlertBuilder makeAlert(@Nullable Throwable t, String message, Object... 
o throw e; } - final AlertBuilder retVal = new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter) - .addData("class", className); - - if (t != null) { - final StringWriter trace = new StringWriter(); - final PrintWriter pw = new PrintWriter(trace); - t.printStackTrace(pw); - retVal.addData("exceptionType", t.getClass()); - retVal.addData("exceptionMessage", t.getMessage()); - retVal.addData("exceptionStackTrace", trace.toString()); - } - - return retVal; + return new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter) + .addData("class", className) + .addThrowable(t); } public class EmittingAlertBuilder extends AlertBuilder diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java index 9938e7cb7ae3..2dc8222bd771 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java @@ -24,10 +24,14 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; +import javax.annotation.Nullable; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Map; /** -*/ + * + */ public class AlertBuilder extends ServiceEventBuilder { protected final Map dataMap = Maps.newLinkedHashMap(); @@ -67,6 +71,20 @@ public AlertBuilder addData(Map data) return this; } + public AlertBuilder addThrowable(@Nullable final Throwable t) + { + if (t != null) { + final StringWriter trace = new StringWriter(); + final PrintWriter pw = new PrintWriter(trace); + t.printStackTrace(pw); + addData("exceptionType", t.getClass().getName()); + addData("exceptionMessage", t.getMessage()); + addData("exceptionStackTrace", trace.toString()); + } + + return this; + } + public AlertBuilder severity(AlertEvent.Severity severity) { this.severity = severity; 
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java b/server/src/main/java/org/apache/druid/curator/CuratorModule.java index 201c96bfdff0..43d1f7d651e8 100644 --- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java +++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java @@ -36,6 +36,9 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.AlertBuilder; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -58,7 +61,6 @@ public CuratorModule() } /** - * * @param haltOnFailedStart set to true if the JVM needs to be halted within 30 seconds of failed initialization * due to unhandled curator exceptions. */ @@ -88,7 +90,8 @@ public static CuratorFramework createCurator(CuratorConfig config) ); } - RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries()); + final RetryPolicy retryPolicy = + new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, config.getMaxZkRetries()); return builder .ensembleProvider(new FixedEnsembleProvider(config.getZkHosts())) @@ -105,7 +108,13 @@ public static CuratorFramework createCurator(CuratorConfig config) */ @Provides @LazySingleton - public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, Lifecycle lifecycle) + public CuratorFramework makeCurator( + final ZkEnablementConfig zkEnablementConfig, + final CuratorConfig config, + final DruidConnectionStateListener connectionStateListener, + final ServiceEmitter emitter, + final Lifecycle lifecycle + ) { if (!zkEnablementConfig.isEnabled()) { throw new RuntimeException("Zookeeper is disabled, 
cannot create CuratorFramework."); @@ -113,7 +122,37 @@ public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, Curat final CuratorFramework framework = createCurator(config); + framework.getConnectionStateListenable().addListener(connectionStateListener); + addUnhandledErrorListener(framework, emitter, lifecycle); + addLifecycleHandler(framework, lifecycle); + + return framework; + } + + /** + * Provide an instance of {@link DruidConnectionStateListener} for monitoring connection state. + */ + @Provides + @LazySingleton + public DruidConnectionStateListener makeConnectionStateListener( + final ServiceEmitter emitter, + final MonitorScheduler monitorScheduler + ) + { + return new DruidConnectionStateListener(monitorScheduler, emitter); + } + + /** + * Add unhandled error listener that shuts down the JVM. + */ + private void addUnhandledErrorListener( + final CuratorFramework framework, + final ServiceEmitter emitter, + final Lifecycle lifecycle + ) + { framework.getUnhandledErrorListenable().addListener((message, e) -> { + emitter.emit(AlertBuilder.create("Unhandled Curator error").addThrowable(e)); log.error(e, "Unhandled error in Curator, stopping server."); if (haltOnFailedStart) { @@ -140,7 +179,13 @@ public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, Curat shutdown(lifecycle); }); + } + /** + * Add a {@link Lifecycle.Handler} for the Curator framework to the given lifecycle. 
+ */ + private void addLifecycleHandler(final CuratorFramework framework, final Lifecycle lifecycle) + { lifecycle.addHandler( new Lifecycle.Handler() { @@ -159,11 +204,23 @@ public void stop() } } ); + } - return framework; + private void shutdown(Lifecycle lifecycle) + { + //noinspection finally (not completing the 'finally' block normally is intentional) + try { + lifecycle.stop(); + } + catch (Throwable t) { + log.error(t, "Exception when stopping server after unhandled Curator error."); + } + finally { + System.exit(1); + } } - static class SecuredACLProvider implements ACLProvider + private static class SecuredACLProvider implements ACLProvider { @Override public List getDefaultAcl() @@ -177,18 +234,4 @@ public List getAclForPath(String path) return ZooDefs.Ids.CREATOR_ALL_ACL; } } - - private void shutdown(Lifecycle lifecycle) - { - //noinspection finally (not completing the 'finally' block normally is intentional) - try { - lifecycle.stop(); - } - catch (Throwable t) { - log.error(t, "Exception when stopping server after unhandled Curator error."); - } - finally { - System.exit(1); - } - } } diff --git a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java new file mode 100644 index 000000000000..4bb8c28e2526 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.druid.java.util.emitter.service.AlertBuilder; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.apache.druid.java.util.metrics.MonitorScheduler; + +import javax.annotation.concurrent.GuardedBy; + +/** + * Curator {@link ConnectionStateListener} that uses a {@link ServiceEmitter} to send alerts on ZK connection loss, + * and emit metrics about ZK connection status. + */ +public class DruidConnectionStateListener implements ConnectionStateListener +{ + private static final String METRIC_IS_CONNECTED = "zk/connected"; + private static final String METRIC_DISCONNECTED_TIME = "zk/disconnected/time"; + private static final int NIL = -1; + + private final MonitorScheduler monitorScheduler; + private final ServiceEmitter emitter; + + /** + * Current connection state. + */ + @GuardedBy("this") + private ConnectionState currentState; + + /** + * Time given by {@link System#currentTimeMillis()} at last disconnect. 
+ */ + @GuardedBy("this") + private long lastDisconnectTime = NIL; + + public DruidConnectionStateListener(final MonitorScheduler monitorScheduler, final ServiceEmitter emitter) + { + this.monitorScheduler = monitorScheduler; + this.emitter = emitter; + } + + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState) + { + if (newState.isConnected()) { + final boolean isFirst; + final long disconnectDuration; + + synchronized (this) { + if (lastDisconnectTime != NIL) { + disconnectDuration = System.currentTimeMillis() - lastDisconnectTime; + } else { + disconnectDuration = 0; + } + + isFirst = currentState == null; + currentState = newState; + lastDisconnectTime = NIL; + } + + if (isFirst) { + monitorScheduler.addMonitor(createMonitor()); + } + + if (disconnectDuration > 0) { + emitter.emit(ServiceMetricEvent.builder().build(METRIC_DISCONNECTED_TIME, disconnectDuration)); + } + } else { + synchronized (this) { + currentState = newState; + lastDisconnectTime = Math.max(lastDisconnectTime, System.currentTimeMillis()); + } + + emitter.emit(AlertBuilder.create("ZooKeeper connection state[%s]", newState)); + } + } + + public boolean isConnected() + { + synchronized (this) { + return currentState != null && currentState.isConnected(); + } + } + + public Monitor createMonitor() + { + return new Monitor(); + } + + private class Monitor extends AbstractMonitor + { + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + emitter.emit(ServiceMetricEvent.builder().build(METRIC_IS_CONNECTED, isConnected() ? 1 : 0)); + return true; + } + } +} From b376e519543448c8c423e5aebcdec9b65114c70f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 23 May 2023 14:12:48 -0700 Subject: [PATCH 2/8] Use right GuardedBy. 
--- .../org/apache/druid/curator/DruidConnectionStateListener.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java index 4bb8c28e2526..2c39d778ca52 100644 --- a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java +++ b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java @@ -19,6 +19,7 @@ package org.apache.druid.curator; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; @@ -28,8 +29,6 @@ import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.java.util.metrics.MonitorScheduler; -import javax.annotation.concurrent.GuardedBy; - /** * Curator {@link ConnectionStateListener} that uses a {@link ServiceEmitter} to send alerts on ZK connection loss, * and emit metrics about ZK connection status. From 9aa91eebb706d3f8b88af224f6d4d046c21c7d5f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 23 Jun 2023 14:50:06 -0700 Subject: [PATCH 3/8] Test fixes, coverage. 
--- .../emitter/service/AlertBuilderTest.java | 54 +++++++++++++++++++ .../druid/curator/CuratorModuleTest.java | 6 +++ 2 files changed, 60 insertions(+) create mode 100644 processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java new file mode 100644 index 000000000000..275dc8be7033 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/AlertBuilderTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.emitter.service; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class AlertBuilderTest +{ + @Test + public void testAlertBuilder() + { + final AlertEvent alertEvent = + AlertBuilder.create("alert[%s]", "oops") + .addData(ImmutableMap.of("foo", "bar")) + .addData(ImmutableMap.of("baz", "qux")) + .addThrowable(new RuntimeException("an exception!")) + .build("druid/test", "example.com"); + + final EventMap alertMap = alertEvent.toMap(); + + Assert.assertEquals("alerts", alertMap.get("feed")); + Assert.assertEquals("alert[oops]", alertMap.get("description")); + Assert.assertEquals("druid/test", alertMap.get("service")); + Assert.assertEquals("example.com", alertMap.get("host")); + + final Map dataMap = (Map) alertMap.get("data"); + Assert.assertEquals("java.lang.RuntimeException", dataMap.get("exceptionType")); + Assert.assertEquals("an exception!", dataMap.get("exceptionMessage")); + Assert.assertEquals("bar", dataMap.get("foo")); + Assert.assertEquals("qux", dataMap.get("baz")); + } +} diff --git a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java index 5b88cb59f434..239b5718aef4 100644 --- a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java +++ b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java @@ -27,9 +27,13 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.testing.junit.LoggerCaptureRule; import org.apache.logging.log4j.Level; import 
org.apache.logging.log4j.core.LogEvent; +import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -121,6 +125,8 @@ private Injector newInjector(final Properties props) .add( new LifecycleModule(), new CuratorModule(false), + binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class), + binder -> binder.bind(MonitorScheduler.class).toInstance(EasyMock.mock(MonitorScheduler.class)), binder -> binder.bind(Properties.class).toInstance(props) ) .build(); From 0369f6556990daa89c48ff2be7ed9d78a0b1eabd Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 23 Jun 2023 14:54:06 -0700 Subject: [PATCH 4/8] Adjustment. --- .../org/apache/druid/java/util/emitter/EmittingLogger.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java index 66df7ea7d740..3ae99a86566b 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java @@ -92,8 +92,7 @@ public AlertBuilder makeAlert(@Nullable Throwable t, String message, Object... o } return new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter) - .addData("class", className) - .addThrowable(t); + .addData("class", className); } public class EmittingAlertBuilder extends AlertBuilder @@ -106,6 +105,7 @@ private EmittingAlertBuilder(Throwable t, String description, ServiceEmitter emi { super(description, emitter); this.t = t; + addThrowable(t); } @Override From fd40a7255bb8338ddf26edcb92ecf61d2d68f3e0 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 23 Jun 2023 15:00:46 -0700 Subject: [PATCH 5/8] Fix tests. 
--- .../kafka/supervisor/KafkaSupervisorTest.java | 2 +- .../kinesis/supervisor/KinesisSupervisorTest.java | 2 +- .../druid/java/util/emitter/EmittingLogger.java | 4 ---- .../java/util/emitter/service/AlertBuilder.java | 10 +++++++--- .../metrics/ExceptionCapturingServiceEmitter.java | 12 ++++++------ 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 97297c085e22..47875e106f35 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3440,7 +3440,7 @@ public void testCheckpointForUnknownTaskGroup() "Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]", serviceEmitter.getExceptionMessage() ); - Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); + Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass()); } @Test diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 19caa96ead94..37691b1a7e53 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3461,7 +3461,7 @@ public void testCheckpointForUnknownTaskGroup() "Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]", serviceEmitter.getExceptionMessage() ); - 
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); + Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass()); } @Test diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java index 3ae99a86566b..6531c6bd3a8a 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java @@ -34,10 +34,6 @@ */ public class EmittingLogger extends Logger { - public static final String EXCEPTION_TYPE_KEY = "exceptionType"; - public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage"; - public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace"; - private static volatile ServiceEmitter emitter = null; private final String className; diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java index 2dc8222bd771..352272d67bc4 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/AlertBuilder.java @@ -34,6 +34,10 @@ */ public class AlertBuilder extends ServiceEventBuilder { + public static final String EXCEPTION_TYPE_KEY = "exceptionType"; + public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage"; + public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace"; + protected final Map dataMap = Maps.newLinkedHashMap(); protected final String description; protected final ServiceEmitter emitter; @@ -77,9 +81,9 @@ public AlertBuilder addThrowable(@Nullable final Throwable t) final StringWriter trace = new StringWriter(); final PrintWriter pw = new PrintWriter(trace); t.printStackTrace(pw); - addData("exceptionType", 
t.getClass().getName()); - addData("exceptionMessage", t.getMessage()); - addData("exceptionStackTrace", trace.toString()); + addData(EXCEPTION_TYPE_KEY, t.getClass().getName()); + addData(EXCEPTION_MESSAGE_KEY, t.getMessage()); + addData(EXCEPTION_STACK_TRACE_KEY, trace.toString()); } return this; diff --git a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java index c28744220f17..8a80b3df3e52 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/ExceptionCapturingServiceEmitter.java @@ -19,8 +19,8 @@ package org.apache.druid.server.metrics; -import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertBuilder; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import javax.annotation.Nullable; @@ -28,7 +28,7 @@ public class ExceptionCapturingServiceEmitter extends ServiceEmitter { - private volatile Class exceptionClass; + private volatile String exceptionClass; private volatile String exceptionMessage; private volatile String stackTrace; @@ -42,10 +42,10 @@ public void emit(Event event) { //noinspection unchecked final Map dataMap = (Map) event.toMap().get("data"); - final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY); + final String exceptionClass = (String) dataMap.get(AlertBuilder.EXCEPTION_TYPE_KEY); if (exceptionClass != null) { - final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY); - final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY); + final String exceptionMessage = (String) dataMap.get(AlertBuilder.EXCEPTION_MESSAGE_KEY); + final String stackTrace = (String) 
dataMap.get(AlertBuilder.EXCEPTION_STACK_TRACE_KEY); this.exceptionClass = exceptionClass; this.exceptionMessage = exceptionMessage; this.stackTrace = stackTrace; @@ -53,7 +53,7 @@ public void emit(Event event) } @Nullable - public Class getExceptionClass() + public String getExceptionClass() { return exceptionClass; } From 87e6dfbe0d7d5ddf82bcbb680744ff622c331830 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 24 Jun 2023 16:59:00 -0700 Subject: [PATCH 6/8] Fix ITs. --- .../druid/testsEx/config/Initializer.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java index 80d1dec0e668..4205cd2a19e1 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java @@ -55,12 +55,16 @@ import org.apache.druid.jackson.ToStringObjectPairListDeserializer; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.LoggingEmitter; import org.apache.druid.java.util.emitter.core.LoggingEmitterConfig; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.metrics.BasicMonitorScheduler; +import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.java.util.metrics.MonitorSchedulerConfig; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import 
org.apache.druid.metadata.MetadataStorageProvider; @@ -77,9 +81,11 @@ import org.apache.druid.testing.guice.TestClient; import org.apache.druid.testsEx.cluster.DruidClusterClient; import org.apache.druid.testsEx.cluster.MetastoreClient; +import org.joda.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -168,7 +174,11 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, MetadataStorageTablesConfig.PROPERTY_BASE, MetadataStorageTablesConfig.class); // Build from properties provided in the config - JsonConfigProvider.bind(binder, MetadataStorageConnectorConfig.PROPERTY_BASE, MetadataStorageConnectorConfig.class); + JsonConfigProvider.bind( + binder, + MetadataStorageConnectorConfig.PROPERTY_BASE, + MetadataStorageConnectorConfig.class + ); } @Provides @@ -189,6 +199,28 @@ public ServiceEmitter getServiceEmitter(ObjectMapper jsonMapper) return new ServiceEmitter("", "", new LoggingEmitter(new LoggingEmitterConfig(), jsonMapper)); } + @Provides + @ManageLifecycle + public MonitorScheduler getMonitorScheduler(ServiceEmitter emitter) + { + final MonitorSchedulerConfig config = + new MonitorSchedulerConfig() + { + @Override + public Duration getEmitterPeriod() + { + return Duration.standardSeconds(60); + } + }; + + return new BasicMonitorScheduler( + config, + emitter, + Collections.emptyList(), + ScheduledExecutors.fixed(1, "MonitorScheduler-%d") + ); + } + // From ServerModule to allow deserialization of DiscoveryDruidNode objects from ZK. // We don't want the other dependencies of that module. @Override @@ -327,7 +359,7 @@ public Builder test(Object test) *

* The builder registers {@code DruidNodeDiscoveryProvider} by default: add any * test-specific instances as needed. - */ + */ public Builder eagerInstance(Class theClass) { this.eagerCreation.add(theClass); @@ -343,7 +375,7 @@ public Builder modules(List modules) return this; } - public Builder modules(Module...modules) + public Builder modules(Module... modules) { return modules(Arrays.asList(modules)); } From 7bdf56a179e04c64d2f8448a1050f0b50e8a333b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 25 Jun 2023 10:21:55 -0700 Subject: [PATCH 7/8] Improved injection. --- .../druid/testsEx/config/Initializer.java | 28 ------------ .../testing/junit/LoggerCaptureRule.java | 45 +++++++++++++++---- .../apache/druid/curator/CuratorModule.java | 10 ++--- .../curator/DruidConnectionStateListener.java | 29 +++--------- .../druid/curator/CuratorModuleTest.java | 15 +++---- 5 files changed, 54 insertions(+), 73 deletions(-) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java index 4205cd2a19e1..7a2eae93e16a 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java @@ -55,16 +55,12 @@ import org.apache.druid.jackson.ToStringObjectPairListDeserializer; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.LoggingEmitter; import org.apache.druid.java.util.emitter.core.LoggingEmitterConfig; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; 
-import org.apache.druid.java.util.metrics.BasicMonitorScheduler; -import org.apache.druid.java.util.metrics.MonitorScheduler; -import org.apache.druid.java.util.metrics.MonitorSchedulerConfig; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageProvider; @@ -81,11 +77,9 @@ import org.apache.druid.testing.guice.TestClient; import org.apache.druid.testsEx.cluster.DruidClusterClient; import org.apache.druid.testsEx.cluster.MetastoreClient; -import org.joda.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -199,28 +193,6 @@ public ServiceEmitter getServiceEmitter(ObjectMapper jsonMapper) return new ServiceEmitter("", "", new LoggingEmitter(new LoggingEmitterConfig(), jsonMapper)); } - @Provides - @ManageLifecycle - public MonitorScheduler getMonitorScheduler(ServiceEmitter emitter) - { - final MonitorSchedulerConfig config = - new MonitorSchedulerConfig() - { - @Override - public Duration getEmitterPeriod() - { - return Duration.standardSeconds(60); - } - }; - - return new BasicMonitorScheduler( - config, - emitter, - Collections.emptyList(), - ScheduledExecutors.fixed(1, "MonitorScheduler-%d") - ); - } - // From ServerModule to allow deserialization of DiscoveryDruidNode objects from ZK. // We don't want the other dependencies of that module. 
@Override diff --git a/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java b/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java index ae7a1dfde055..70c566b3f0e3 100644 --- a/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java +++ b/processing/src/test/java/org/apache/druid/testing/junit/LoggerCaptureRule.java @@ -19,6 +19,8 @@ package org.apache.druid.testing.junit; +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LogEvent; @@ -28,8 +30,8 @@ import org.apache.logging.log4j.core.config.LoggerConfig; import org.junit.rules.ExternalResource; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; /** * JUnit rule to capture a class's logger output to an in-memory buffer to allow verification of log messages in tests. 
@@ -73,6 +75,14 @@ public void clearLogEvents() inMemoryAppender.clearLogEvents(); } + /** + * Wait for at least one log event to be captured. Returns immediately if any have already been captured. + */ + public void awaitLogEvents() throws InterruptedException + { + inMemoryAppender.awaitLogEvents(); + } + private static class InMemoryAppender extends AbstractAppender { static final String NAME = InMemoryAppender.class.getName(); @@ -80,32 +90,51 @@ private static class InMemoryAppender extends AbstractAppender private final String targetLoggerName; // logEvents has concurrent iteration and modification in CuratorModuleTest::exitsJvmWhenMaxRetriesExceeded(), needs to be thread safe - private final CopyOnWriteArrayList logEvents; + @GuardedBy("logEvents") + private final List logEvents; InMemoryAppender(Class targetClass) { super(NAME, null, null); targetLoggerName = targetClass.getName(); - logEvents = new CopyOnWriteArrayList<>(); + logEvents = new ArrayList<>(); } @Override public void append(LogEvent logEvent) { - if (logEvent.getLoggerName().equals(targetLoggerName)) { - logEvents.add(logEvent); + synchronized (logEvents) { + if (logEvent.getLoggerName().equals(targetLoggerName)) { + logEvents.add(logEvent); + logEvents.notifyAll(); + } } } List getLogEvents() { - return logEvents; + synchronized (logEvents) { + return ImmutableList.copyOf(logEvents); + } } void clearLogEvents() { - logEvents.clear(); + synchronized (logEvents) { + logEvents.clear(); + } + } + + /** + * Wait for "logEvents" to be nonempty. If it is already nonempty, return immediately.
+ */ + void awaitLogEvents() throws InterruptedException + { + synchronized (logEvents) { + while (logEvents.isEmpty()) { + logEvents.wait(); + } + } } } } - diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java b/server/src/main/java/org/apache/druid/curator/CuratorModule.java index 43d1f7d651e8..a7e76af474a3 100644 --- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java +++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java @@ -38,7 +38,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.AlertBuilder; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.server.metrics.MetricsModule; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -74,6 +74,7 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, ZkEnablementConfig.class); JsonConfigProvider.bind(binder, CuratorConfig.CONFIG_PREFIX, CuratorConfig.class); + MetricsModule.register(binder, DruidConnectionStateListener.class); } /** @@ -134,12 +135,9 @@ public CuratorFramework makeCurator( */ @Provides @LazySingleton - public DruidConnectionStateListener makeConnectionStateListener( - final ServiceEmitter emitter, - final MonitorScheduler monitorScheduler - ) + public DruidConnectionStateListener makeConnectionStateListener(final ServiceEmitter emitter) { - return new DruidConnectionStateListener(monitorScheduler, emitter); + return new DruidConnectionStateListener(emitter); } /** diff --git a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java index 2c39d778ca52..acbadd865fc1 100644 --- a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java +++ 
b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java @@ -27,19 +27,17 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; -import org.apache.druid.java.util.metrics.MonitorScheduler; /** * Curator {@link ConnectionStateListener} that uses a {@link ServiceEmitter} to send alerts on ZK connection loss, * and emit metrics about ZK connection status. */ -public class DruidConnectionStateListener implements ConnectionStateListener +public class DruidConnectionStateListener extends AbstractMonitor implements ConnectionStateListener { private static final String METRIC_IS_CONNECTED = "zk/connected"; private static final String METRIC_DISCONNECTED_TIME = "zk/disconnected/time"; private static final int NIL = -1; - private final MonitorScheduler monitorScheduler; private final ServiceEmitter emitter; /** @@ -54,9 +52,8 @@ public class DruidConnectionStateListener implements ConnectionStateListener @GuardedBy("this") private long lastDisconnectTime = NIL; - public DruidConnectionStateListener(final MonitorScheduler monitorScheduler, final ServiceEmitter emitter) + public DruidConnectionStateListener(final ServiceEmitter emitter) { - this.monitorScheduler = monitorScheduler; this.emitter = emitter; } @@ -64,7 +61,6 @@ public DruidConnectionStateListener(final MonitorScheduler monitorScheduler, fin public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState) { if (newState.isConnected()) { - final boolean isFirst; final long disconnectDuration; synchronized (this) { @@ -74,15 +70,10 @@ public void stateChanged(CuratorFramework curatorFramework, ConnectionState newS disconnectDuration = 0; } - isFirst = currentState == null; currentState = newState; lastDisconnectTime = NIL; } - if (isFirst) { - monitorScheduler.addMonitor(createMonitor()); - } - if (disconnectDuration > 0) { 
emitter.emit(ServiceMetricEvent.builder().build(METRIC_DISCONNECTED_TIME, disconnectDuration)); } @@ -103,18 +94,10 @@ public boolean isConnected() } } - public Monitor createMonitor() - { - return new Monitor(); - } - - private class Monitor extends AbstractMonitor + @Override + public boolean doMonitor(ServiceEmitter emitter) { - @Override - public boolean doMonitor(ServiceEmitter emitter) - { - emitter.emit(ServiceMetricEvent.builder().build(METRIC_IS_CONNECTED, isConnected() ? 1 : 0)); - return true; - } + emitter.emit(ServiceMetricEvent.builder().build(METRIC_IS_CONNECTED, isConnected() ? 1 : 0)); + return true; } } diff --git a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java index 239b5718aef4..46257efc0557 100644 --- a/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java +++ b/server/src/test/java/org/apache/druid/curator/CuratorModuleTest.java @@ -28,12 +28,10 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.testing.junit.LoggerCaptureRule; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.LogEvent; -import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -73,23 +71,25 @@ public void createsCuratorFrameworkAsConfigured() Assert.assertEquals(CuratorModule.MAX_SLEEP_TIME_MS, retryPolicy.getMaxSleepTimeMs()); } - @Test + @Test(timeout = 60_000L) public void exitsJvmWhenMaxRetriesExceeded() throws Exception { Properties props = new Properties(); props.setProperty(CURATOR_CONNECTION_TIMEOUT_MS_KEY, "0"); Injector injector = newInjector(props); - CuratorFramework curatorFramework = 
createCuratorFramework(injector, 0); - curatorFramework.start(); - exit.expectSystemExitWithStatus(1); logger.clearLogEvents(); + exit.expectSystemExitWithStatus(1); // This will result in a curator unhandled error since the connection timeout is 0 and retries are disabled + CuratorFramework curatorFramework = createCuratorFramework(injector, 0); + curatorFramework.start(); curatorFramework.create().inBackground().forPath("/foo"); // org.apache.curator.framework.impl.CuratorFrameworkImpl logs "Background retry gave up" unhandled error twice + logger.awaitLogEvents(); List loggingEvents = logger.getLogEvents(); + Assert.assertTrue( "Logging events: " + loggingEvents, loggingEvents.stream() @@ -126,9 +126,8 @@ private Injector newInjector(final Properties props) new LifecycleModule(), new CuratorModule(false), binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class), - binder -> binder.bind(MonitorScheduler.class).toInstance(EasyMock.mock(MonitorScheduler.class)), binder -> binder.bind(Properties.class).toInstance(props) - ) + ) .build(); } From 709feeeba6eff97ab089bfa7793cb1ae5b1d1a32 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 25 Jun 2023 15:10:37 -0700 Subject: [PATCH 8/8] Adjust metric name, add tests. --- docs/operations/metrics.md | 4 +- .../curator/DruidConnectionStateListener.java | 12 +- .../DruidConnectionStateListenerTest.java | 150 ++++++++++++++++++ 3 files changed, 158 insertions(+), 8 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index fb9e05427977..33549719b18d 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -361,14 +361,14 @@ These metrics are only available if the `JVMMonitor` module is included. 
|`jvm/gc/count`|Garbage collection count|`gcName` (cms/g1/parallel/etc.), `gcGen` (old/young)|Varies| |`jvm/gc/cpu`|Count of CPU time in Nanoseconds spent on garbage collection. Note: `jvm/gc/cpu` represents the total time over multiple GC cycles; divide by `jvm/gc/count` to get the mean GC time per cycle.|`gcName`, `gcGen`|Sum of `jvm/gc/cpu` should be within 10-30% of sum of `jvm/cpu/total`, depending on the GC algorithm used (reported by [`JvmCpuMonitor`](../configuration/index.md#enabling-metrics)). | -### ZooKeeper, Curator +### ZooKeeper These metrics are available unless `druid.zk.service.enabled = false`. |Metric|Description|Dimensions|Normal Value| |------|-----------|----------|------------| |`zk/connected`|Indicator of connection status. `1` for connected, `0` for disconnected. Emitted once per monitor period.|None|1| -|`zk/disconnected/time`|Amount of time, in milliseconds, that the server was disconnected from ZooKeeper. Emitted on reconnection. Note that this means the metric is not emitted if connection to ZooKeeper is permanently lost.|None|Not present| +|`zk/reconnect/time`|Amount of time, in milliseconds, that a server was disconnected from ZooKeeper before reconnecting. Emitted on reconnection. 
Not emitted if connection to ZooKeeper is permanently lost, because in this case, there is no reconnection.|None|Not present| ### EventReceiverFirehose diff --git a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java index acbadd865fc1..b0ba274d5014 100644 --- a/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java +++ b/server/src/main/java/org/apache/druid/curator/DruidConnectionStateListener.java @@ -35,7 +35,7 @@ public class DruidConnectionStateListener extends AbstractMonitor implements ConnectionStateListener { private static final String METRIC_IS_CONNECTED = "zk/connected"; - private static final String METRIC_DISCONNECTED_TIME = "zk/disconnected/time"; + private static final String METRIC_RECONNECT_TIME = "zk/reconnect/time"; private static final int NIL = -1; private final ServiceEmitter emitter; @@ -65,17 +65,17 @@ public void stateChanged(CuratorFramework curatorFramework, ConnectionState newS synchronized (this) { if (lastDisconnectTime != NIL) { - disconnectDuration = System.currentTimeMillis() - lastDisconnectTime; + disconnectDuration = Math.max(0, System.currentTimeMillis() - lastDisconnectTime); } else { - disconnectDuration = 0; + disconnectDuration = NIL; } currentState = newState; lastDisconnectTime = NIL; } - if (disconnectDuration > 0) { - emitter.emit(ServiceMetricEvent.builder().build(METRIC_DISCONNECTED_TIME, disconnectDuration)); + if (disconnectDuration != NIL) { + emitter.emit(ServiceMetricEvent.builder().build(METRIC_RECONNECT_TIME, disconnectDuration)); } } else { synchronized (this) { @@ -83,7 +83,7 @@ public void stateChanged(CuratorFramework curatorFramework, ConnectionState newS lastDisconnectTime = Math.max(lastDisconnectTime, System.currentTimeMillis()); } - emitter.emit(AlertBuilder.create("ZooKeeper connection state[%s]", newState)); + emitter.emit(AlertBuilder.create("ZooKeeper 
connection[%s]", newState)); } } diff --git a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java new file mode 100644 index 000000000000..b173015959d0 --- /dev/null +++ b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.curator; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class DruidConnectionStateListenerTest +{ + private TestEmitter emitter; + private DruidConnectionStateListener listener; + + @Before + public void setUp() + { + emitter = new TestEmitter(); + listener = new DruidConnectionStateListener(emitter); + } + + @Test + public void test_isConnected() + { + Assert.assertFalse(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.CONNECTED); + Assert.assertTrue(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.SUSPENDED); + Assert.assertFalse(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.RECONNECTED); + Assert.assertTrue(listener.isConnected()); + + listener.stateChanged(null, ConnectionState.LOST); + Assert.assertFalse(listener.isConnected()); + } + + @Test + public void test_doMonitor_init() + { + listener.doMonitor(emitter); + Assert.assertEquals(1, emitter.getEvents().size()); + + final Map eventMap = emitter.getEvents().get(0).toMap(); + Assert.assertEquals("zk/connected", eventMap.get("metric")); + Assert.assertEquals(0, eventMap.get("value")); + } + + @Test + public void test_doMonitor_connected() + { + listener.stateChanged(null, ConnectionState.CONNECTED); + listener.doMonitor(emitter); + Assert.assertEquals(1, emitter.getEvents().size()); + + final Map eventMap = emitter.getEvents().get(0).toMap(); + Assert.assertEquals("zk/connected", eventMap.get("metric")); 
+ Assert.assertEquals(1, eventMap.get("value")); + } + + @Test + public void test_doMonitor_notConnected() + { + listener.stateChanged(null, ConnectionState.SUSPENDED); + listener.doMonitor(emitter); + Assert.assertEquals(2, emitter.getEvents().size()); // 2 because stateChanged emitted an alert + + final Map eventMap = emitter.getEvents().get(1).toMap(); + Assert.assertEquals("zk/connected", eventMap.get("metric")); + Assert.assertEquals(0, eventMap.get("value")); + } + + @Test + public void test_suspendedAlert() + { + listener.stateChanged(null, ConnectionState.SUSPENDED); + Assert.assertEquals(1, emitter.getEvents().size()); + + final Map alertMap = emitter.getEvents().get(0).toMap(); + Assert.assertEquals("alerts", alertMap.get("feed")); + Assert.assertEquals("ZooKeeper connection[SUSPENDED]", alertMap.get("description")); + } + + @Test + public void test_reconnectedMetric() + { + listener.stateChanged(null, ConnectionState.SUSPENDED); + Assert.assertEquals(1, emitter.getEvents().size()); // the first stateChanged emits an alert + + listener.stateChanged(null, ConnectionState.RECONNECTED); + Assert.assertEquals(2, emitter.getEvents().size()); // the second stateChanged emits a metric + + final Map eventMap = emitter.getEvents().get(1).toMap(); + Assert.assertEquals("metrics", eventMap.get("feed")); + Assert.assertEquals("zk/reconnect/time", eventMap.get("metric")); + MatcherAssert.assertThat(eventMap.get("value"), CoreMatchers.instanceOf(Long.class)); + MatcherAssert.assertThat(((Number) eventMap.get("value")).longValue(), Matchers.greaterThanOrEqualTo(0L)); + } + + private static class TestEmitter extends NoopServiceEmitter + { + @GuardedBy("events") + private final List events = new ArrayList<>(); + + @Override + public void emit(Event event) + { + synchronized (events) { + events.add(event); + } + } + + public List getEvents() + { + synchronized (events) { + return ImmutableList.copyOf(events); + } + } + } +}