diff --git a/core/pom.xml b/core/pom.xml
index e0ccfa1a04bf..72fea5468929 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -224,6 +224,10 @@
org.antlr
antlr4-runtime
+
+ io.timeandspace
+ cron-scheduler
+
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
index 029dd4780002..4fbefb88e5c4 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
@@ -22,11 +22,16 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.concurrent.Future;
+
+
/**
*/
public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;
+
+ private volatile Future> scheduledFuture;
@Override
public void start()
@@ -51,4 +56,16 @@ public boolean monitor(ServiceEmitter emitter)
}
public abstract boolean doMonitor(ServiceEmitter emitter);
+
+ @Override
+ public Future> getScheduledFuture() // Returns the handle last stored by setScheduledFuture; null if nothing has been stored yet.
+ {
+ return scheduledFuture; // volatile field, so the read sees the latest write from any thread
+ }
+
+ @Override
+ public void setScheduledFuture(Future> scheduledFuture) // Stores the scheduling handle; MonitorScheduler.startMonitor passes the future returned by scheduleAtFixedRate.
+ {
+ this.scheduledFuture = scheduledFuture; // plain overwrite of the volatile field; no null check is performed
+ }
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
index 9811f584bc0e..6649312d4a2c 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
@@ -24,10 +24,13 @@
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Future;
public abstract class CompoundMonitor implements Monitor
{
private final List monitors;
+
+ private volatile Future> scheduledFuture;
public CompoundMonitor(List monitors)
{
@@ -61,5 +64,17 @@ public boolean monitor(final ServiceEmitter emitter)
return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter)));
}
+ @Override
+ public Future> getScheduledFuture() // Returns the handle last stored by setScheduledFuture for this compound monitor; null if none was stored.
+ {
+ return scheduledFuture; // one handle for the compound as a whole; the wrapped monitors are not consulted here
+ }
+
+ @Override
+ public void setScheduledFuture(Future> scheduledFuture) // Stores the scheduling handle on the compound monitor itself; it is not forwarded to the wrapped monitors.
+ {
+ this.scheduledFuture = scheduledFuture; // plain overwrite of the volatile field; no null check is performed
+ }
+
public abstract boolean shouldReschedule(List reschedules);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index 2ccd5db3ca6f..8a3975e57e29 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -21,6 +21,9 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.concurrent.Future;
+
+
/**
*/
public interface Monitor
@@ -35,4 +38,8 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False otherwise.
*/
boolean monitor(ServiceEmitter emitter);
+
+ Future> getScheduledFuture(); // Handle of this monitor's periodic schedule; MonitorScheduler calls cancel(false) on it when the monitor should stop being rescheduled.
+
+ void setScheduledFuture(Future> scheduledFuture); // Called by MonitorScheduler.startMonitor with the future returned from scheduling this monitor.
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 2adbe9510a36..961f8239482b 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -20,41 +20,52 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.Sets;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
/**
*/
public class MonitorScheduler
{
+
+ private static final Logger log = new Logger(MonitorScheduler.class);
+
private final MonitorSchedulerConfig config;
- private final ScheduledExecutorService exec;
private final ServiceEmitter emitter;
private final Set monitors;
private final Object lock = new Object();
+ private final CronScheduler scheduler;
+ private final ExecutorService executorService;
private volatile boolean started = false;
-
+
public MonitorScheduler( // Wires the scheduler; nothing is started or scheduled in the constructor itself.
MonitorSchedulerConfig config,
- ScheduledExecutorService exec,
+ CronScheduler scheduler, // produces the fixed-rate ticks registered in startMonitor
ServiceEmitter emitter,
- List monitors
+ List monitors,
+ ExecutorService executorService // runs the monitor.monitor(emitter) calls that each tick submits
)
{
this.config = config;
- this.exec = exec;
+ this.scheduler = scheduler;
this.emitter = emitter;
this.monitors = Sets.newHashSet(monitors); // copied into a new hash set, so the caller's list is not retained
+ this.executorService = executorService;
}
@LifecycleStart
@@ -124,24 +135,47 @@ private void startMonitor(final Monitor monitor)
{ // Starts the monitor and registers a fixed-rate CronTask that submits monitor.monitor(emitter) to executorService on every tick.
synchronized (lock) {
monitor.start();
- ScheduledExecutors.scheduleAtFixedRate(
- exec,
- config.getEmitterPeriod(),
- new Callable()
+ long rate = config.getEmitterPeriod().getMillis(); // emitter period in milliseconds
+ Future> scheduledFuture = scheduler.scheduleAtFixedRate(
+ rate, // presumably the initial delay - confirm against the CronScheduler API
+ rate, // presumably the period - confirm against the CronScheduler API
+ TimeUnit.MILLISECONDS,
+ new CronTask()
{
+ private volatile Future monitorFuture = null; // result handle of the most recent submission; null until the first tick has submitted
@Override
- public ScheduledExecutors.Signal call()
+ public void run(long scheduledRunTimeMillis) // scheduledRunTimeMillis is not used in this body
{
- // Run one more time even if the monitor was removed, in case there's some extra data to flush
- if (monitor.monitor(emitter) && hasMonitor(monitor)) {
- return ScheduledExecutors.Signal.REPEAT;
- } else {
- removeMonitor(monitor);
- return ScheduledExecutors.Signal.STOP;
+ try {
+ if (monitorFuture != null && monitorFuture.isDone() // stop path: the previous run has finished ...
+ && !(monitorFuture.get() && hasMonitor(monitor))) { // ... and it returned false, or the monitor is no longer registered
+ removeMonitor(monitor);
+ monitor.getScheduledFuture().cancel(false); // NOTE(review): throws NPE if no handle was stored on the monitor; the outer catch below would only log it
+ log.debug("Stopped rescheduling %s (delay %s)", this, rate);
+ return;
+ }
+ log.trace("Running %s (period %s)", this, rate);
+ monitorFuture = executorService.submit(new Callable() // NOTE(review): also reached while the previous future is not done, so runs of the same monitor may overlap
+ {
+ @Override
+ public Boolean call()
+ {
+ try {
+ return monitor.monitor(emitter);
+ }
+ catch (Throwable e) {
+ log.error(e, "Uncaught exception.");
+ return Boolean.FALSE; // a failing monitor reports false, which sends the next tick down the stop path above
+ }
+ }
+ });
+ }
+ catch (Throwable e) { // nothing is rethrown out of run(); the failure is only logged
+ log.error(e, "Uncaught exception.");
}
}
- }
- );
+ });
+ monitor.setScheduledFuture(scheduledFuture); // keep the handle on the monitor so the task above can cancel its own schedule
}
}
@@ -151,4 +185,5 @@ private boolean hasMonitor(final Monitor monitor)
return monitors.contains(monitor);
}
}
+
}
diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
index 76671968b911..da2ba5926f59 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -19,17 +19,41 @@
package org.apache.druid.java.util.metrics;
+
import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.common.concurrent.Execs;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.time.Duration;
import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class MonitorSchedulerTest
{
+
+ @Mock
+ private CronScheduler cronScheduler;
+
+ @Before
+ public void setUp() // Runs before each test.
+ {
+ MockitoAnnotations.initMocks(this); // populates the @Mock-annotated cronScheduler field
+ }
+
@Test
public void testFindMonitor()
{
@@ -45,12 +69,15 @@ class Monitor3 extends NoopMonitor
final Monitor1 monitor1 = new Monitor1();
final Monitor2 monitor2 = new Monitor2();
+
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
final MonitorScheduler scheduler = new MonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
- Execs.scheduledSingleThreaded("monitor-scheduler-test"),
+ CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
Mockito.mock(ServiceEmitter.class),
- ImmutableList.of(monitor1, monitor2)
+ ImmutableList.of(monitor1, monitor2),
+ executor
);
final Optional maybeFound1 = scheduler.findMonitor(Monitor1.class);
@@ -62,7 +89,264 @@ class Monitor3 extends NoopMonitor
Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
}
+
+ @Test
+ public void testStart_RepeatScheduling() // Two ticks whose futures complete with TRUE: each tick must submit and run the monitor.
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+
+ Mockito.doAnswer(new Answer>()
+ {
+ private int scheduleCount = 0; // number of ticks simulated so far
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3]; // fourth argument of scheduleAtFixedRate is the CronTask
+ CronTask task = ((CronTask) originalArgument);
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @Override
+ public Future answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0]; // the Callable handed to submit
+ ((Callable) originalArgument).call(); // run it inline on the test thread
+ return CompletableFuture.completedFuture(Boolean.TRUE); // always report TRUE, regardless of what the Callable returned
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+ while (scheduleCount < 2) { // simulate two ticks synchronously, inside the mocked scheduleAtFixedRate call
+ scheduleCount++;
+ task.run(123L);
+ }
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+ Monitor monitor = Mockito.mock(Monitor.class);
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); // startMonitor reads getMillis() from this
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class)); // both ticks submitted
+ Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any()); // and both submissions ran the monitor
+
+ }
+
+ @Test
+ public void testStart_RepeatAndStopScheduling() // First tick's future completes with FALSE, so the second tick must stop instead of submitting again.
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+
+ Mockito.doAnswer(new Answer>()
+ {
+ private int scheduleCount = 0; // number of ticks simulated so far
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3]; // fourth argument of scheduleAtFixedRate is the CronTask
+ CronTask task = ((CronTask) originalArgument);
+ Mockito.doAnswer(new Answer>()
+ {
+ @Override
+ public Future answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0]; // the Callable handed to submit
+ ((Callable) originalArgument).call(); // run it inline on the test thread
+ return CompletableFuture.completedFuture(Boolean.FALSE); // report "do not reschedule"
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+ while (scheduleCount < 2) { // tick 1 submits; tick 2 sees the completed FALSE future and takes the stop path
+ scheduleCount++;
+ task.run(123L);
+ }
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); // the stop path calls getScheduledFuture().cancel(false), so it must not be null
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); // startMonitor reads getMillis() from this
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); // only the first tick submitted
+ Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+ Mockito.verify(monitor, Mockito.times(1)).stop(); // presumably issued by removeMonitor, which is not visible in this patch - confirm
+
+ }
+
+ @Test
+ public void testStart_UnexpectedExceptionWhileMonitoring() // monitor.monitor(...) throws; a single tick must still submit once and complete without the exception escaping.
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+ Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException()); // the failure under test
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); // startMonitor reads getMillis() from this
+
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3]; // fourth argument of scheduleAtFixedRate is the CronTask
+ CronTask task = ((CronTask) originalArgument);
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @Override
+ public Future answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0]; // the Callable handed to submit
+ ((Callable) originalArgument).call(); // the scheduler's Callable catches the RuntimeException itself, so this call returns normally
+ return CompletableFuture.completedFuture(Boolean.TRUE);
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+ task.run(123L); // a single simulated tick
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); // the throwing call was attempted exactly once
+ }
+
+
+ @Test
+ public void testStart_UnexpectedExceptionWhileScheduling() // executor.submit(...) throws; the tick must not let the exception escape from start().
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); // startMonitor reads getMillis() from this
+
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3]; // fourth argument of scheduleAtFixedRate is the CronTask
+ CronTask task = ((CronTask) originalArgument);
+
+ Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException()); // the failure under test
+ task.run(123L); // a single simulated tick; the task's outer catch logs the exception instead of rethrowing
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); // submit was attempted exactly once
+ }
+
+
+ private Future createDummyFuture() // Builds an inert Future stand-in: nothing is ever done or cancelled, and get() yields null.
+ {
+ Future> future = new Future()
+ {
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false; // cancellation is a no-op and reports failure
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return false; // never reports completion
+ }
+
+ @Override
+ public Object get()
+ {
+ return null; // returns immediately instead of blocking
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit)
+ {
+ return null; // timeout arguments are ignored
+ }
+
+ };
+
+ return future;
+ }
+
+
private static class NoopMonitor implements Monitor
{
@Override
@@ -82,5 +366,19 @@ public boolean monitor(ServiceEmitter emitter)
{
return true;
}
+
+ @Override
+ public Future> getScheduledFuture()
+ {
+ // This no-op test monitor keeps no scheduling handle, so there is nothing to return.
+ return null;
+ }
+
+ @Override
+ public void setScheduledFuture(Future> scheduledFuture)
+ {
+ // Intentionally empty: this no-op test monitor discards the handle it is given.
+
+ }
}
}
diff --git a/licenses.yaml b/licenses.yaml
index a6d85500ae2b..896a94366003 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -345,6 +345,16 @@ libraries:
---
+name: CronScheduler
+license_category: binary
+module: java-core
+license_name: Apache License version 2.0
+version: 0.1
+libraries:
+ - io.timeandspace: cron-scheduler
+
+---
+
name: LMAX Disruptor
license_category: binary
module: java-core
diff --git a/pom.xml b/pom.xml
index 635d62eb8b21..bfa4a6be9a65 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1254,6 +1254,11 @@
1.19.0
test
+
+ io.timeandspace
+ cron-scheduler
+ 0.1
+
diff --git a/server/pom.xml b/server/pom.xml
index 7fa3a488e715..caddf483bdc5 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -311,6 +311,10 @@
io.github.resilience4j
resilience4j-bulkhead
+
+ io.timeandspace
+ cron-scheduler
+
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index 71b994541252..d7598aeacfd7 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -27,6 +27,7 @@
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
+import io.timeandspace.cronscheduler.CronScheduler;
import org.apache.druid.guice.DruidBinders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -42,6 +43,7 @@
import org.apache.druid.java.util.metrics.SysMonitor;
import org.apache.druid.query.ExecutorServiceMonitor;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -106,9 +108,10 @@ public MonitorScheduler getMonitorScheduler(
return new MonitorScheduler(
config.get(),
- Execs.scheduledSingleThreaded("MonitorScheduler-%s"),
+ CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
emitter,
- monitors
+ monitors,
+ Execs.multiThreaded(64, "MonitorThread-%d")
);
}