Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
<dependency>
<groupId>io.timeandspace</groupId>
<artifactId>cron-scheduler</artifactId>
</dependency>



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@

import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.concurrent.Future;


/**
*/
public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;

private volatile Future<?> scheduledFuture;

@Override
public void start()
Expand All @@ -51,4 +56,16 @@ public boolean monitor(ServiceEmitter emitter)
}

public abstract boolean doMonitor(ServiceEmitter emitter);

@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}

@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

public abstract class CompoundMonitor implements Monitor
{
private final List<Monitor> monitors;

private volatile Future<?> scheduledFuture;

public CompoundMonitor(List<Monitor> monitors)
{
Expand Down Expand Up @@ -61,5 +64,17 @@ public boolean monitor(final ServiceEmitter emitter)
return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter)));
}

@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}

@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}

public abstract boolean shouldReschedule(List<Boolean> reschedules);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.concurrent.Future;


/**
*/
public interface Monitor
Expand All @@ -35,4 +38,8 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False otherwise.
*/
boolean monitor(ServiceEmitter emitter);

Future<?> getScheduledFuture();

void setScheduledFuture(Future<?> scheduledFuture);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,52 @@
package org.apache.druid.java.util.metrics;

import com.google.common.collect.Sets;
import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


/**
*/
public class MonitorScheduler
{

private static final Logger log = new Logger(MonitorScheduler.class);

private final MonitorSchedulerConfig config;
private final ScheduledExecutorService exec;
private final ServiceEmitter emitter;
private final Set<Monitor> monitors;
private final Object lock = new Object();
private final CronScheduler scheduler;
private final ExecutorService executorService;

private volatile boolean started = false;

public MonitorScheduler(
MonitorSchedulerConfig config,
ScheduledExecutorService exec,
CronScheduler scheduler,
ServiceEmitter emitter,
List<Monitor> monitors
List<Monitor> monitors,
ExecutorService executorService
)
{
this.config = config;
this.exec = exec;
this.scheduler = scheduler;
this.emitter = emitter;
this.monitors = Sets.newHashSet(monitors);
this.executorService = executorService;
}

@LifecycleStart
Expand Down Expand Up @@ -124,24 +135,47 @@ private void startMonitor(final Monitor monitor)
{
synchronized (lock) {
monitor.start();
ScheduledExecutors.scheduleAtFixedRate(
exec,
config.getEmitterPeriod(),
new Callable<ScheduledExecutors.Signal>()
long rate = config.getEmitterPeriod().getMillis();
Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
rate,
rate,
TimeUnit.MILLISECONDS,
new CronTask()
{
private volatile Future<Boolean> monitorFuture = null;
@Override
public ScheduledExecutors.Signal call()
public void run(long scheduledRunTimeMillis)
{
// Run one more time even if the monitor was removed, in case there's some extra data to flush
if (monitor.monitor(emitter) && hasMonitor(monitor)) {
return ScheduledExecutors.Signal.REPEAT;
} else {
removeMonitor(monitor);
return ScheduledExecutors.Signal.STOP;
try {
if (monitorFuture != null && monitorFuture.isDone()
&& !(monitorFuture.get() && hasMonitor(monitor))) {
removeMonitor(monitor);
monitor.getScheduledFuture().cancel(false);
log.debug("Stopped rescheduling %s (delay %s)", this, rate);
return;
}
log.trace("Running %s (period %s)", this, rate);
monitorFuture = executorService.submit(new Callable<Boolean>()
{
@Override
public Boolean call()
{
try {
return monitor.monitor(emitter);
}
catch (Throwable e) {
log.error(e, "Uncaught exception.");
return Boolean.FALSE;
}
}
});
}
catch (Throwable e) {
log.error(e, "Uncaught exception.");
}
}
}
);
});
monitor.setScheduledFuture(scheduledFuture);
}
}

Expand All @@ -151,4 +185,5 @@ private boolean hasMonitor(final Monitor monitor)
return monitors.contains(monitor);
}
}

}
Loading