Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 36 additions & 27 deletions server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditInfo;
Expand Down Expand Up @@ -63,6 +60,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -72,7 +70,6 @@
public class SQLMetadataRuleManager implements MetadataRuleManager
{


public static void createDefaultRule(
final IDBI dbi,
final String ruleTable,
Expand Down Expand Up @@ -142,13 +139,19 @@ public Void withHandle(Handle handle) throws Exception
private final AuditManager auditManager;

private final Object lock = new Object();

private volatile boolean started = false;

private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;

private volatile long retryStartTime = 0;
/** The number of times this SQLMetadataRuleManager was started. */
private long startCount = 0;
/**
* Equal to the current {@link #startCount} value, if the SQLMetadataRuleManager is currently started; -1 if
* currently stopped.
*
* This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
* the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if
* the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly.
*/
private long currentStartOrder = -1;
private ScheduledExecutorService exec = null;
private long retryStartTime = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks that this can be a local variable.


@Inject
public SQLMetadataRuleManager(
Expand All @@ -169,31 +172,42 @@ public SQLMetadataRuleManager(
Preconditions.checkNotNull(config.getAlertThreshold().toStandardDuration());
Preconditions.checkNotNull(config.getPollDuration().toStandardDuration());

this.rules = new AtomicReference<>(
ImmutableMap.<String, List<Rule>>of()
);
this.rules = new AtomicReference<>(ImmutableMap.of());
}

@Override
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
if (currentStartOrder >= 0) {
return;
}

exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"));
startCount++;
currentStartOrder = startCount;
long localStartedOrder = currentStartOrder;

exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");

createDefaultRule(dbi, getRulesTable(), config.getDefaultRule(), jsonMapper);
future = exec.scheduleWithFixedDelay(
exec.scheduleWithFixedDelay(
new Runnable()
{
@Override
public void run()
{
try {
poll();
// poll() is synchronized together with start() and stop() to ensure that when stop() exists, poll()
// won't actually run anymore after that (it could only enter the syncrhonized section and exit
// immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed
// to avoid flakiness in SQLMetadataRuleManagerTest.
// See https://github.com/apache/incubator-druid/issues/6028
synchronized (lock) {
if (localStartedOrder == currentStartOrder) {
poll();
}
}
}
catch (Exception e) {
log.error(e, "uncaught exception in rule manager polling thread");
Expand All @@ -204,8 +218,6 @@ public void run()
config.getPollDuration().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);

started = true;
}
}

Expand All @@ -214,15 +226,12 @@ public void run()
public void stop()
{
synchronized (lock) {
if (!started) {
if (currentStartOrder == -1) {
return;
}

rules.set(ImmutableMap.<String, List<Rule>>of());

future.cancel(false);
future = null;
started = false;
rules.set(ImmutableMap.of());
currentStartOrder = -1;
// This call cancels the periodic poll() task, scheduled in start().
exec.shutdownNow();
exec = null;
}
Expand Down