From 525c6bd53e95e008cbd1fa8427c867e7a3aa9af3 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Fri, 31 Jul 2020 14:07:43 +0530 Subject: [PATCH 01/11] Added cronScheduler support for MonitorScheduler --- core/pom.xml | 4 ++ .../common/concurrent/ScheduledExecutors.java | 41 +++++++++++++++++++ .../java/util/metrics/MonitorScheduler.java | 8 +++- pom.xml | 5 +++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/core/pom.xml b/core/pom.xml index 86dcd3c4c5b3..0572e0e32c99 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -226,6 +226,10 @@ org.antlr antlr4-runtime + + io.timeandspace + cron-scheduler + diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java index 2850c50200fa..cd4f9a73a0c9 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java +++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java @@ -19,10 +19,12 @@ package org.apache.druid.java.util.common.concurrent; +import io.timeandspace.cronscheduler.CronScheduler; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.joda.time.Duration; +import java.time.Instant; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -167,6 +169,45 @@ public void run() ); } + public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable) + { + scheduleAtFixedRate(exec, rate, rate, callable); + } + + public static void scheduleAtFixedRate( + final CronScheduler exec, + final Duration initialDelay, + final Duration rate, + final Callable callable + ) + { + log.debug("Scheduling periodically: %s with period %s", callable, rate); + Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis()); + exec.scheduleAt(delayInstance, + new Runnable() + { + private volatile Signal prevSignal = null; + + @Override + public void run() + { + if (prevSignal == null || prevSignal == Signal.REPEAT) { + Instant periodInstance = Instant.now().plusMillis(rate.getMillis()); + exec.scheduleAt(periodInstance, this); + } + + try { + log.trace("Running %s (period %s)", callable, rate); + prevSignal = callable.call(); + } + catch (Throwable e) { + log.error(e, "Uncaught exception."); + } + } + } + ); + } + public enum Signal { REPEAT, STOP diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 118f283ac2c3..74dda3edb9a9 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -20,17 +20,20 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.Sets; +import io.timeandspace.cronscheduler.CronScheduler; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import java.time.Duration; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; + /** */ public class MonitorScheduler @@ -40,6 +43,8 @@ public class MonitorScheduler private final ServiceEmitter emitter; private final Set monitors; private final Object lock = new Object(); + private final Duration syncPeriod = Duration.ofSeconds(1L); + private final CronScheduler scheduler; private volatile boolean started = false; @@ -54,6 +59,7 @@ public MonitorScheduler( this.exec = exec; this.emitter = emitter; this.monitors = Sets.newHashSet(monitors); + this.scheduler = CronScheduler.create(syncPeriod); } @LifecycleStart @@ -113,7 +119,7 @@ private void startMonitor(final Monitor monitor) synchronized (lock) { monitor.start(); ScheduledExecutors.scheduleAtFixedRate( - exec, + scheduler, config.getEmitterPeriod(), new Callable() { diff --git a/pom.xml b/pom.xml index 65b2e74c63f8..3fd243ca8a8b 100644 --- a/pom.xml +++ b/pom.xml @@ -1254,6 +1254,11 @@ 1.19.0 test + + io.timeandspace + cron-scheduler + 0.1 + From 4c041bd364def78cf2ea1903c9733e4df4f6aed4 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Mon, 3 Aug 2020 12:04:36 +0530 Subject: [PATCH 02/11] added javadoc and license --- .../util/common/concurrent/ScheduledExecutors.java | 5 +++++ .../druid/java/util/metrics/MonitorScheduler.java | 9 ++------- licenses.yaml | 10 ++++++++++ .../apache/druid/server/metrics/MetricsModule.java | 11 +++++------ 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java index cd4f9a73a0c9..39c9583c519b 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java +++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java @@ -174,6 +174,11 @@ public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callab scheduleAtFixedRate(exec, rate, rate, callable); } + /** + * Run callable once every period, after the given initial delay. Uses + * {@link CronScheduler} for task scheduling. Exceptions are caught and logged + * as errors. + */ public static void scheduleAtFixedRate( final CronScheduler exec, final Duration initialDelay, diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 74dda3edb9a9..7066d6750601 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -27,11 +27,9 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import java.time.Duration; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; /** @@ -39,27 +37,24 @@ public class MonitorScheduler { private final MonitorSchedulerConfig config; - private final ScheduledExecutorService exec; private final ServiceEmitter emitter; private final Set monitors; private final Object lock = new Object(); - private final Duration syncPeriod = Duration.ofSeconds(1L); private final CronScheduler scheduler; private volatile boolean started = false; public MonitorScheduler( MonitorSchedulerConfig config, - ScheduledExecutorService exec, + CronScheduler scheduler, ServiceEmitter emitter, List monitors ) { this.config = config; - this.exec = exec; + this.scheduler = scheduler; this.emitter = emitter; this.monitors = Sets.newHashSet(monitors); - this.scheduler = CronScheduler.create(syncPeriod); } @LifecycleStart diff --git a/licenses.yaml b/licenses.yaml index 6f7c748170c2..0733e20ada0c 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -345,6 +345,16 @@ libraries: --- +name: CronScheduler +license_category: binary +module: java-core +license_name: Apache License version 2.0 +version: 0.1 +libraries: + - io.timeandspace: cron-scheduler + +--- + name: LMAX Disruptor license_category: binary module: java-core diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 71b994541252..b24de9620107 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -27,11 +27,11 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; +import io.timeandspace.cronscheduler.CronScheduler; import org.apache.druid.guice.DruidBinders; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.JvmCpuMonitor; @@ -42,6 +42,7 @@ import org.apache.druid.java.util.metrics.SysMonitor; import org.apache.druid.query.ExecutorServiceMonitor; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -100,16 +101,14 @@ public MonitorScheduler getMonitorScheduler( log.info( "Loaded %d monitors: %s", monitors.size(), - monitors.stream().map(monitor -> monitor.getClass().getName()).collect(Collectors.joining(", ")) - ); + monitors.stream().map(monitor -> monitor.getClass().getName()).collect(Collectors.joining(", "))); } return new MonitorScheduler( config.get(), - Execs.scheduledSingleThreaded("MonitorScheduler-%s"), + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(), emitter, - monitors - ); + monitors); } @Provides From 24b686da7eef2a262732c96e632a803ca6bd186f Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Mon, 3 Aug 2020 17:48:43 +0530 Subject: [PATCH 03/11] Fixed formatting --- .../java/org/apache/druid/server/metrics/MetricsModule.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index b24de9620107..5f97f5229cbe 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -101,14 +101,16 @@ public MonitorScheduler getMonitorScheduler( log.info( "Loaded %d monitors: %s", monitors.size(), - monitors.stream().map(monitor -> monitor.getClass().getName()).collect(Collectors.joining(", "))); + monitors.stream().map(monitor -> monitor.getClass().getName()).collect(Collectors.joining(", ")) + ); } return new MonitorScheduler( config.get(), CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(), emitter, - monitors); + monitors + ); } @Provides From 653be2f6a663d1b4c5be29f7836e88cee64b57c5 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Mon, 12 Oct 2020 16:12:52 +0530 Subject: [PATCH 04/11] Added tests for ScheduledExecutors and MonitorScheduler (#574) --- core/pom.xml | 5 + .../concurrent/ScheduledExecutorsTest.java | 101 ++++++++++++++++++ .../util/metrics/MonitorSchedulerTest.java | 71 ++++++++++++ server/pom.xml | 4 + 4 files changed, 181 insertions(+) create mode 100644 core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java create mode 100644 core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java diff --git a/core/pom.xml b/core/pom.xml index 0572e0e32c99..bcf82a5f1e53 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -297,6 +297,11 @@ easymock test + + org.mockito + mockito-core + test + org.hamcrest hamcrest-all diff --git a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java new file mode 100644 index 000000000000..a48e31855d74 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.concurrent; + +import io.timeandspace.cronscheduler.CronScheduler; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors.Signal; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +public class ScheduledExecutorsTest +{ + + @Mock + private CronScheduler scheduler; + + @Mock + private Callable callable; + + @Before + public void setUp() + { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testScheduleAtFixedRate_Success() throws Exception + { + Mockito.when(callable.call()).thenReturn(Signal.REPEAT); + Mockito.doAnswer(new Answer>() + { + int scheduledCount = 0; + + @Override + public Future answer(InvocationOnMock invocation) throws Throwable + { + final Object originalArgument = (invocation.getArguments())[1]; + scheduledCount++; + if (scheduledCount == 1) { + ((Runnable) originalArgument).run(); + } + return null; + } + }).when(scheduler).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + Duration rate = new Duration(2L); + ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); + Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + Mockito.verify(callable, Mockito.times(1)).call(); + } + + @Test + public void testScheduleAtFixedRate_UnexpectedFailure() throws Exception + { + Mockito.when(callable.call()).thenThrow(new IllegalArgumentException("Unexpected Exception")); + Mockito.doAnswer(new Answer>() + { + int scheduledCount = 0; + + @Override + public Future answer(InvocationOnMock invocation) throws Throwable + { + final Object originalArgument = (invocation.getArguments())[1]; + scheduledCount++; + if (scheduledCount == 1) { + ((Runnable) originalArgument).run(); + } + return null; + } + }).when(scheduler).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + Duration rate = new Duration(2L); + ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); + Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + Mockito.verify(callable, Mockito.times(1)).call(); + } + +} diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java new file mode 100644 index 000000000000..0319d6933f76 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import io.timeandspace.cronscheduler.CronScheduler; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.Arrays; + +public class MonitorSchedulerTest +{ + + private MonitorSchedulerConfig config = new MonitorSchedulerConfig() + { + @Override + public Duration getEmitterPeriod() + { + return new Duration(2L); + } + }; + + @Mock + private CronScheduler cronScheduler; + + @Mock + private ServiceEmitter emitter; + + @Mock + private Monitor monitor; + + public MonitorScheduler scheduler; + + @Before + public void setUp() + { + MockitoAnnotations.initMocks(this); + scheduler = new MonitorScheduler(config, cronScheduler, emitter, Arrays.asList(monitor)); + Mockito.when(monitor.monitor(emitter)).thenReturn(true); + } + + @Test + public void testStart_Success() + { + scheduler.start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + } + +} diff --git a/server/pom.xml b/server/pom.xml index b23bea120495..ae3c038d8f19 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -311,6 +311,10 @@ io.github.resilience4j resilience4j-bulkhead + + io.timeandspace + cron-scheduler + From 35071c2b904f483fbe6fd6c1d6594d489ca022d2 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Wed, 14 Oct 2020 13:11:07 +0530 Subject: [PATCH 05/11] added branch coverage for ScheduledExecutors --- core/pom.xml | 5 -- .../concurrent/ScheduledExecutorsTest.java | 62 +++++++++++++------ 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index ac18c99bee02..72fea5468929 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -295,11 +295,6 @@ easymock test - - org.mockito - mockito-core - test - org.hamcrest hamcrest-all diff --git a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java index a48e31855d74..41ddd74eee2c 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java @@ -24,6 +24,7 @@ import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -49,52 +50,77 @@ public void setUp() } @Test - public void testScheduleAtFixedRate_Success() throws Exception + public void testScheduleAtFixedRateWithCronScheduler_SuccessWithRepeatSignal() throws Exception { Mockito.when(callable.call()).thenReturn(Signal.REPEAT); Mockito.doAnswer(new Answer>() { - int scheduledCount = 0; - @Override - public Future answer(InvocationOnMock invocation) throws Throwable + public Future answer(InvocationOnMock invocation) { final Object originalArgument = (invocation.getArguments())[1]; - scheduledCount++; - if (scheduledCount == 1) { + // mimicking recursive scheduling with a loop + for (int scheduledCount = 0; scheduledCount < 2; scheduledCount++) { + Mockito.when(scheduler.scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class))) + .thenReturn(null); ((Runnable) originalArgument).run(); } return null; } - }).when(scheduler).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + }).when(scheduler).scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class)); Duration rate = new Duration(2L); ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); - Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); - Mockito.verify(callable, Mockito.times(1)).call(); + Mockito.verify(scheduler, Mockito.times(3)).scheduleAt(ArgumentMatchers.any(), + ArgumentMatchers.any(Runnable.class)); + Mockito.verify(callable, Mockito.times(2)).call(); } @Test - public void testScheduleAtFixedRate_UnexpectedFailure() throws Exception + public void testScheduleAtFixedRateWithCronScheduler_SuccessWithStopSignal() throws Exception { - Mockito.when(callable.call()).thenThrow(new IllegalArgumentException("Unexpected Exception")); + Mockito.when(callable.call()).thenReturn(Signal.STOP); Mockito.doAnswer(new Answer>() { - int scheduledCount = 0; - @Override - public Future answer(InvocationOnMock invocation) throws Throwable + public Future answer(InvocationOnMock invocation) { final Object originalArgument = (invocation.getArguments())[1]; - scheduledCount++; - if (scheduledCount == 1) { + // mimicking recursive scheduling with a loop + for (int scheduledCount = 0; scheduledCount < 2; scheduledCount++) { + Mockito.when(scheduler.scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class))) + .thenReturn(null); ((Runnable) originalArgument).run(); } return null; } - }).when(scheduler).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + }).when(scheduler).scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class)); + Duration rate = new Duration(2L); + ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); + Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(ArgumentMatchers.any(), + ArgumentMatchers.any(Runnable.class)); + Mockito.verify(callable, Mockito.times(2)).call(); + } + + @Test + public void testScheduleAtFixedRateWithCronScheduler_UnexpectedFailure() throws Exception + { + Mockito.when(callable.call()).thenThrow(new IllegalArgumentException("Unexpected Exception")); + Mockito.doAnswer(new Answer>() + { + @Override + public Future answer(InvocationOnMock invocation) + { + final Object originalArgument = (invocation.getArguments())[1]; + Mockito.when(scheduler.scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class))) + .thenReturn(null); + ((Runnable) originalArgument).run(); + return null; + } + }).when(scheduler).scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class)); Duration rate = new Duration(2L); ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); - Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(Mockito.any(), Mockito.any(Runnable.class)); + Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(ArgumentMatchers.any(), + ArgumentMatchers.any(Runnable.class)); Mockito.verify(callable, Mockito.times(1)).call(); } From 20f7fe61551d1f3afeaeb06c245089aa4569f568 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Sun, 18 Oct 2020 18:51:00 +0530 Subject: [PATCH 06/11] dummy commit for rerunning build --- .../java/util/common/concurrent/ScheduledExecutorsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java index 41ddd74eee2c..8079b6c3fcd5 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java @@ -124,4 +124,5 @@ public Future answer(InvocationOnMock invocation) Mockito.verify(callable, Mockito.times(1)).call(); } + } From 0de642c6f75cd378915d19ff6cf5514769e9fb0f Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Fri, 30 Oct 2020 19:32:25 +0530 Subject: [PATCH 07/11] added cronScheduler for fixed rate monitoring in MonitorScheduler --- .../common/concurrent/ScheduledExecutors.java | 46 --- .../java/util/metrics/AbstractMonitor.java | 15 + .../java/util/metrics/CompoundMonitor.java | 15 + .../druid/java/util/metrics/Monitor.java | 7 + .../java/util/metrics/MonitorScheduler.java | 65 +++- .../concurrent/ScheduledExecutorsTest.java | 128 -------- .../util/metrics/MonitorSchedulerTest.java | 300 +++++++++++++++++- .../druid/server/metrics/MetricsModule.java | 4 +- 8 files changed, 388 insertions(+), 192 deletions(-) delete mode 100644 core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java index 39c9583c519b..2850c50200fa 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java +++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java @@ -19,12 +19,10 @@ package org.apache.druid.java.util.common.concurrent; -import io.timeandspace.cronscheduler.CronScheduler; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.joda.time.Duration; -import java.time.Instant; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -169,50 +167,6 @@ public void run() ); } - public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable) - { - scheduleAtFixedRate(exec, rate, rate, callable); - } - - /** - * Run callable once every period, after the given initial delay. Uses - * {@link CronScheduler} for task scheduling. Exceptions are caught and logged - * as errors. - */ - public static void scheduleAtFixedRate( - final CronScheduler exec, - final Duration initialDelay, - final Duration rate, - final Callable callable - ) - { - log.debug("Scheduling periodically: %s with period %s", callable, rate); - Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis()); - exec.scheduleAt(delayInstance, - new Runnable() - { - private volatile Signal prevSignal = null; - - @Override - public void run() - { - if (prevSignal == null || prevSignal == Signal.REPEAT) { - Instant periodInstance = Instant.now().plusMillis(rate.getMillis()); - exec.scheduleAt(periodInstance, this); - } - - try { - log.trace("Running %s (period %s)", callable, rate); - prevSignal = callable.call(); - } - catch (Throwable e) { - log.error(e, "Uncaught exception."); - } - } - } - ); - } - public enum Signal { REPEAT, STOP diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java index 029dd4780002..d9f10452e001 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java @@ -22,11 +22,16 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import java.util.concurrent.Future; + + /** */ public abstract class AbstractMonitor implements Monitor { private volatile boolean started = false; + + private volatile Future scheduledFuture; @Override public void start() @@ -51,4 +56,14 @@ public boolean monitor(ServiceEmitter emitter) } public abstract boolean doMonitor(ServiceEmitter emitter); + + public Future getScheduledFuture() + { + return scheduledFuture; + } + + public void setScheduledFuture(Future scheduledFuture) + { + this.scheduledFuture = scheduledFuture; + } } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java index 9811f584bc0e..6649312d4a2c 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java @@ -24,10 +24,13 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.Future; public abstract class CompoundMonitor implements Monitor { private final List monitors; + + private volatile Future scheduledFuture; public CompoundMonitor(List monitors) { @@ -61,5 +64,17 @@ public boolean monitor(final ServiceEmitter emitter) return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter))); } + @Override + public Future getScheduledFuture() + { + return scheduledFuture; + } + + @Override + public void setScheduledFuture(Future scheduledFuture) + { + this.scheduledFuture = scheduledFuture; + } + public abstract boolean shouldReschedule(List reschedules); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java index 2ccd5db3ca6f..8a3975e57e29 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java @@ -21,6 +21,9 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import java.util.concurrent.Future; + + /** */ public interface Monitor @@ -35,4 +38,8 @@ public interface Monitor * @return true if this monitor needs to continue monitoring. False otherwise. */ boolean monitor(ServiceEmitter emitter); + + Future getScheduledFuture(); + + void setScheduledFuture(Future scheduledFuture); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index a7afc416fb28..51726002e137 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -21,41 +21,51 @@ import com.google.common.collect.Sets; import io.timeandspace.cronscheduler.CronScheduler; +import io.timeandspace.cronscheduler.CronTask; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** */ public class MonitorScheduler { + + private static final Logger log = new Logger(MonitorScheduler.class); + private final MonitorSchedulerConfig config; private final ServiceEmitter emitter; private final Set monitors; private final Object lock = new Object(); private final CronScheduler scheduler; + private final ExecutorService executorService; private volatile boolean started = false; - + public MonitorScheduler( MonitorSchedulerConfig config, CronScheduler scheduler, ServiceEmitter emitter, - List monitors + List monitors, + ExecutorService executorService ) { this.config = config; this.scheduler = scheduler; this.emitter = emitter; this.monitors = Sets.newHashSet(monitors); + this.executorService = executorService; } @LifecycleStart @@ -125,24 +135,46 @@ private void startMonitor(final Monitor monitor) { synchronized (lock) { monitor.start(); - ScheduledExecutors.scheduleAtFixedRate( - scheduler, - config.getEmitterPeriod(), - new Callable() + Long rate = config.getEmitterPeriod().getMillis(); + Future scheduledFuture = scheduler.scheduleAtFixedRate( + rate, + rate, + TimeUnit.MILLISECONDS, + new CronTask() { + private volatile Future monitorFuture = null; @Override - public ScheduledExecutors.Signal call() + public void run(long scheduledRunTimeMillis) { - // Run one more time even if the monitor was removed, in case there's some extra data to flush - if (monitor.monitor(emitter) && hasMonitor(monitor)) { - return ScheduledExecutors.Signal.REPEAT; - } else { - removeMonitor(monitor); - return ScheduledExecutors.Signal.STOP; + try { + if (monitorFuture != null && monitorFuture.isDone() + && !(monitorFuture.get() && hasMonitor(monitor))) { + removeMonitor(monitor); + monitor.getScheduledFuture().cancel(false); + log.debug("Stopped rescheduling %s (delay %s)", this, rate); + return; + } + log.trace("Running %s (period %s)", this, rate); + monitorFuture = executorService.submit(new Callable() + { + public Boolean call() + { + try { + return monitor.monitor(emitter); + } + catch (Throwable e) { + log.error(e, "Uncaught exception."); + return Boolean.FALSE; + } + } + }); + } + catch (Throwable e) { + log.error(e, "Uncaught exception."); } } - } - ); + }); + monitor.setScheduledFuture(scheduledFuture); } } @@ -152,4 +184,5 @@ private boolean hasMonitor(final Monitor monitor) return monitors.contains(monitor); } } + } diff --git a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java b/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java deleted file mode 100644 index 8079b6c3fcd5..000000000000 --- a/core/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.common.concurrent; - -import io.timeandspace.cronscheduler.CronScheduler; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors.Signal; -import org.joda.time.Duration; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.concurrent.Callable; -import java.util.concurrent.Future; - -public class ScheduledExecutorsTest -{ - - @Mock - private CronScheduler scheduler; - - @Mock - private Callable callable; - - @Before - public void setUp() - { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testScheduleAtFixedRateWithCronScheduler_SuccessWithRepeatSignal() throws Exception - { - Mockito.when(callable.call()).thenReturn(Signal.REPEAT); - Mockito.doAnswer(new Answer>() - { - @Override - public Future answer(InvocationOnMock invocation) - { - final Object originalArgument = (invocation.getArguments())[1]; - // mimicking recursive scheduling with a loop - for (int scheduledCount = 0; scheduledCount < 2; scheduledCount++) { - Mockito.when(scheduler.scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class))) - .thenReturn(null); - ((Runnable) originalArgument).run(); - } - return null; - } - }).when(scheduler).scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class)); - Duration rate = new Duration(2L); - ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); - Mockito.verify(scheduler, Mockito.times(3)).scheduleAt(ArgumentMatchers.any(), - ArgumentMatchers.any(Runnable.class)); - Mockito.verify(callable, Mockito.times(2)).call(); - } - - @Test - public void testScheduleAtFixedRateWithCronScheduler_SuccessWithStopSignal() throws Exception - { - Mockito.when(callable.call()).thenReturn(Signal.STOP); - Mockito.doAnswer(new Answer>() - { - @Override - public Future answer(InvocationOnMock invocation) - { - final Object originalArgument = (invocation.getArguments())[1]; - // mimicking recursive scheduling with a loop - for (int scheduledCount = 0; scheduledCount < 2; scheduledCount++) { - Mockito.when(scheduler.scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class))) - .thenReturn(null); - ((Runnable) originalArgument).run(); - } - return null; - } - }).when(scheduler).scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class)); - Duration rate = new Duration(2L); - ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); - Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(ArgumentMatchers.any(), - ArgumentMatchers.any(Runnable.class)); - Mockito.verify(callable, Mockito.times(2)).call(); - } - - @Test - public void testScheduleAtFixedRateWithCronScheduler_UnexpectedFailure() throws Exception - { - Mockito.when(callable.call()).thenThrow(new IllegalArgumentException("Unexpected Exception")); - Mockito.doAnswer(new Answer>() - { - @Override - public Future answer(InvocationOnMock invocation) - { - final Object originalArgument = (invocation.getArguments())[1]; - Mockito.when(scheduler.scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class))) - .thenReturn(null); - ((Runnable) originalArgument).run(); - return null; - } - }).when(scheduler).scheduleAt(ArgumentMatchers.any(), ArgumentMatchers.any(Runnable.class)); - Duration rate = new Duration(2L); - ScheduledExecutors.scheduleAtFixedRate(scheduler, rate, callable); - Mockito.verify(scheduler, Mockito.times(2)).scheduleAt(ArgumentMatchers.any(), - ArgumentMatchers.any(Runnable.class)); - Mockito.verify(callable, Mockito.times(1)).call(); - } - - -} diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java index abcbdc56eed3..efb3e5adcb4e 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -22,16 +22,40 @@ import com.google.common.collect.ImmutableList; import io.timeandspace.cronscheduler.CronScheduler; +import io.timeandspace.cronscheduler.CronTask; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.time.Duration; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class MonitorSchedulerTest { + + @Mock + private CronScheduler cronScheduler; + + @Before + public void setUp() + { + MockitoAnnotations.initMocks(this); + } + @Test public void testFindMonitor() { @@ -47,12 +71,15 @@ class Monitor3 extends NoopMonitor final Monitor1 monitor1 = new Monitor1(); final Monitor2 monitor2 = new Monitor2(); + + ExecutorService executor = Mockito.mock(ExecutorService.class); final MonitorScheduler scheduler = new MonitorScheduler( Mockito.mock(MonitorSchedulerConfig.class), CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(), Mockito.mock(ServiceEmitter.class), - ImmutableList.of(monitor1, monitor2) + ImmutableList.of(monitor1, monitor2), + executor ); final Optional maybeFound1 = scheduler.findMonitor(Monitor1.class); @@ -64,7 +91,264 @@ class Monitor3 extends NoopMonitor Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent()); } + + @Test + public void testStart_RepeatScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + + Mockito.doAnswer(new Answer>() + { + private int scheduleCount = 0; + + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.doAnswer(new Answer>() + { + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.TRUE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + Monitor monitor = Mockito.mock(Monitor.class); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any()); + + } + + @Test + public void testStart_RepeatAndStopScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + + Mockito.doAnswer(new Answer>() + { + private int scheduleCount = 0; + + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + Mockito.doAnswer(new Answer>() + { + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.FALSE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); + Mockito.verify(monitor, Mockito.times(1)).stop(); + + } + + @Test + public void testStart_UnexpectedExceptionWhileMonitoring() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException()); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + + Mockito.doAnswer(new Answer>() + { + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.doAnswer(new Answer>() + { + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.TRUE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + task.run(123L); + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); + } + + + @Test + public void testStart_UnexpectedExceptionWhileScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + + Mockito.doAnswer(new Answer>() + { + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException()); + task.run(123L); + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + } + + + private Future createDummyFuture() + { + Future future = new Future() + { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return false; + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return false; + } + @Override + public Object get() throws InterruptedException, ExecutionException + { + return null; + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + return null; + } + + }; + + return future; + } + + private static class NoopMonitor implements Monitor { @Override @@ -84,5 +368,19 @@ public boolean monitor(ServiceEmitter emitter) { return true; } + + @Override + public Future getScheduledFuture() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setScheduledFuture(Future scheduledFuture) + { + // TODO Auto-generated method stub + + } } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 5f97f5229cbe..e24a8b519aa7 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** @@ -109,7 +110,8 @@ public MonitorScheduler getMonitorScheduler( config.get(), CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(), emitter, - monitors + monitors, + Executors.newFixedThreadPool(10) ); } From a24eacb46a324946adce12edb437fec6ef40a986 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Sun, 1 Nov 2020 16:44:50 +0530 Subject: [PATCH 08/11] added override annotation --- .../org/apache/druid/java/util/metrics/AbstractMonitor.java | 2 ++ .../apache/druid/java/util/metrics/MonitorSchedulerTest.java | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java index d9f10452e001..4fbefb88e5c4 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java @@ -57,11 +57,13 @@ public boolean monitor(ServiceEmitter emitter) public abstract boolean doMonitor(ServiceEmitter emitter); + @Override public Future getScheduledFuture() { return scheduledFuture; } + @Override public void setScheduledFuture(Future scheduledFuture) { this.scheduledFuture = scheduledFuture; diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java index efb3e5adcb4e..4898d32dead7 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -332,13 +332,13 @@ public boolean isDone() } @Override - public Object get() throws InterruptedException, ExecutionException + public Object get() { return null; } @Override - public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + public Object get(long timeout, TimeUnit unit) { return null; } From 10ba516b47068b83b1021142fc940c1b788ffce8 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Sun, 1 Nov 2020 17:40:51 +0530 Subject: [PATCH 09/11] fixed checkstyle --- .../org/apache/druid/java/util/metrics/MonitorScheduler.java | 3 ++- .../apache/druid/java/util/metrics/MonitorSchedulerTest.java | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 51726002e137..961f8239482b 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -135,7 +135,7 @@ private void startMonitor(final Monitor monitor) { synchronized (lock) { monitor.start(); - Long rate = config.getEmitterPeriod().getMillis(); + long rate = config.getEmitterPeriod().getMillis(); Future scheduledFuture = scheduler.scheduleAtFixedRate( rate, rate, @@ -157,6 +157,7 @@ public void run(long scheduledRunTimeMillis) log.trace("Running %s (period %s)", this, rate); monitorFuture = executorService.submit(new Callable() { + @Override public Boolean call() { try { diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java index 4898d32dead7..da2ba5926f59 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -38,11 +38,9 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class MonitorSchedulerTest { From 3af13a0e8cc43571969bead1ab787f5b5e2f8a6d Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Sun, 1 Nov 2020 17:43:40 +0530 Subject: [PATCH 10/11] changed executor to use cachedThreadPool --- .../java/org/apache/druid/server/metrics/MetricsModule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index e24a8b519aa7..485df5653d50 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -111,7 +111,7 @@ public MonitorScheduler getMonitorScheduler( CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(), emitter, monitors, - Executors.newFixedThreadPool(10) + Executors.newCachedThreadPool() ); } From d8628ad13629a4a8bbca52e3f6561276be7b5893 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Sat, 7 Nov 2020 18:28:11 +0530 Subject: [PATCH 11/11] changed number of threads to 64 --- .../java/org/apache/druid/server/metrics/MetricsModule.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 485df5653d50..d7598aeacfd7 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -32,6 +32,7 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.JvmCpuMonitor; @@ -47,7 +48,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; import java.util.stream.Collectors; /** @@ -108,10 +108,10 @@ public MonitorScheduler getMonitorScheduler( return new MonitorScheduler( config.get(), - CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(), + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(), emitter, monitors, - Executors.newCachedThreadPool() + Execs.multiThreaded(64, "MonitorThread-%d") ); }