Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@

import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.concurrent.Future;


/**
*/
public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;

private volatile Future<?> scheduledFuture;

@Override
public void start()
{
Expand All @@ -56,16 +52,4 @@ public boolean monitor(ServiceEmitter emitter)
}

public abstract boolean doMonitor(ServiceEmitter emitter);

@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}

@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.metrics;

import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors.Signal;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

/**
* A {@link MonitorScheduler} implementation based on {@link ScheduledExecutorService}.
*/
public class BasicMonitorScheduler extends MonitorScheduler
{
private final ScheduledExecutorService exec;

public BasicMonitorScheduler(
MonitorSchedulerConfig config,
ServiceEmitter emitter,
List<Monitor> monitors,
ScheduledExecutorService exec
)
{
super(config, emitter, monitors);
this.exec = exec;
}

@Override
void startMonitor(Monitor monitor)
{
monitor.start();
ScheduledExecutors.scheduleAtFixedRate(
exec,
getConfig().getEmitterPeriod(),
() -> {
// Run one more time even if the monitor was removed, in case there's some extra data to flush
if (monitor.monitor(getEmitter()) && hasMonitor(monitor)) {
return Signal.REPEAT;
} else {
removeMonitor(monitor);
return Signal.STOP;
}
}
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.metrics;

import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* A {@link MonitorScheduler} implementation based on {@link CronScheduler}.
*/
public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
{
private static final Logger LOG = new Logger(ClockDriftSafeMonitorScheduler.class);

private final CronScheduler monitorScheduler;
private final ExecutorService monitorRunner;

public ClockDriftSafeMonitorScheduler(
MonitorSchedulerConfig config,
ServiceEmitter emitter,
List<Monitor> monitors,
CronScheduler monitorScheduler,
ExecutorService monitorRunner
)
{
super(config, emitter, monitors);
this.monitorScheduler = monitorScheduler;
this.monitorRunner = monitorRunner;
}

@Override
void startMonitor(final Monitor monitor)
{
monitor.start();
long rate = getConfig().getEmitterPeriod().getMillis();
final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
Future<?> future = monitorScheduler.scheduleAtFixedRate(
rate,
rate,
TimeUnit.MILLISECONDS,
new CronTask()
{
private Future<?> cancellationFuture = null;
private Future<Boolean> monitorFuture = null;

@Override
public void run(long scheduledRunTimeMillis)
{
waitForScheduleFutureToBeSet();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use a CountDownLatch instead of continuously checking in continuous loop that counts down after scheduleFutureReference is set

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use it, but it seems not matter much to me since this loop is not supposed to run at all in production.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if (cancellationFuture == null) {
LOG.error("scheduleFuture is not set. Can't run monitor[%s]", monitor.getClass().getName());
return;
}
try {
// Do nothing if the monitor is still running.
if (monitorFuture == null || monitorFuture.isDone()) {
if (monitorFuture != null) {
// monitorFuture must be done at this moment if it's not null
if (!(monitorFuture.get() && hasMonitor(monitor))) {
stopMonitor(monitor);
return;
}
}

LOG.trace("Running monitor[%s]", monitor.getClass().getName());
monitorFuture = monitorRunner.submit(() -> {
try {
return monitor.monitor(getEmitter());
}
catch (Throwable e) {
LOG.error(
e,
"Exception while executing monitor[%s]. Rescheduling in %s ms",
monitor.getClass().getName(),
rate
);
return Boolean.TRUE;
}
});
}
}
catch (Throwable e) {
LOG.error(e, "Uncaught exception.");
}
}

private void waitForScheduleFutureToBeSet()
{
if (cancellationFuture == null) {
while (!Thread.currentThread().isInterrupted()) {
if (futureReference.get() != null) {
cancellationFuture = futureReference.get();
break;
}
}
}
}

private void stopMonitor(Monitor monitor)
{
removeMonitor(monitor);
cancellationFuture.cancel(false);
LOG.debug("Stopped monitor[%s]", monitor.getClass().getName());
}
}
);
futureReference.set(future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

public abstract class CompoundMonitor implements Monitor
{
private final List<Monitor> monitors;

private volatile Future<?> scheduledFuture;

public CompoundMonitor(List<Monitor> monitors)
{
this.monitors = monitors;
Expand Down Expand Up @@ -64,17 +61,5 @@ public boolean monitor(final ServiceEmitter emitter)
return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter)));
}

@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}

@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}

public abstract boolean shouldReschedule(List<Boolean> reschedules);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import java.util.concurrent.Future;


/**
*/
Expand All @@ -38,8 +36,4 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False otherwise.
*/
boolean monitor(ServiceEmitter emitter);

Future<?> getScheduledFuture();

void setScheduledFuture(Future<?> scheduledFuture);
}
Loading