Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 47 additions & 36 deletions server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
Expand All @@ -41,7 +43,6 @@
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
Expand All @@ -58,7 +59,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -134,12 +135,13 @@ public Void withHandle(Handle handle) throws Exception
private final AtomicReference<ImmutableMap<String, List<Rule>>> rules;
private final AuditManager auditManager;

private volatile ScheduledExecutorService exec;

private final Object lock = new Object();

private volatile boolean started = false;

private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;

@Inject
public SQLMetadataRuleManager(
@Json ObjectMapper jsonMapper,
Expand Down Expand Up @@ -168,21 +170,26 @@ public void start()
return;
}

this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"));

createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper);
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.get().getPollDuration().toStandardDuration(),
future = exec.scheduleWithFixedDelay(
new Runnable()
{
@Override
public void run()
{
poll();
try {
poll();
}
catch (Exception e) {
log.error(e, "uncaught exception in rule manager polling thread");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this emit an alert?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is just replicating the existing behavior, which was handled within ScheduledExecutors.scheduleWithFixedDelay

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nothing against having an alert though

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there should be an alert here without a retry strategy. An acute network hiccup should not cause an alert if it is simply recoverable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed #1380 to track the request

}
}
}
},
0,
config.get().getPollDuration().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);

started = true;
Expand All @@ -199,6 +206,8 @@ public void stop()

rules.set(ImmutableMap.<String, List<Rule>>of());

future.cancel(false);
future = null;
started = false;
exec.shutdownNow();
exec = null;
Expand Down Expand Up @@ -235,7 +244,9 @@ public Pair<String, List<Rule>> map(int index, ResultSet r, StatementContext ctx
return Pair.of(
r.getString("dataSource"),
jsonMapper.<List<Rule>>readValue(
r.getBytes("payload"), new TypeReference<List<Rule>>(){}
r.getBytes("payload"), new TypeReference<List<Rule>>()
{
}
)
);
}
Expand All @@ -245,29 +256,29 @@ public Pair<String, List<Rule>> map(int index, ResultSet r, StatementContext ctx
}
}
)
.fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Pair<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Pair<String, List<Rule>> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
String dataSource = stringObjectMap.lhs;
retVal.put(dataSource, stringObjectMap.rhs);
return retVal;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
.fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Pair<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Pair<String, List<Rule>> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
String dataSource = stringObjectMap.lhs;
retVal.put(dataSource, stringObjectMap.rhs);
return retVal;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
Expand Down Expand Up @@ -56,7 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -74,7 +76,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final IDBI dbi;

private volatile ScheduledExecutorService exec;
private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;

private volatile boolean started = false;

Expand Down Expand Up @@ -103,23 +106,27 @@ public void start()
return;
}

this.exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));

final Duration delay = config.get().getPollDuration().toStandardDuration();
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
delay,
future = exec.scheduleWithFixedDelay(
new Runnable()
{
@Override
public void run()
{
poll();
try {
poll();
}
catch (Exception e) {
log.error(e, "uncaught exception in segment manager polling thread");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized this error will never get logged, SQLMetadataSegmentManager.poll() already logs and swallows all exceptions

}
}
}
},
0,
delay.getMillis(),
TimeUnit.MILLISECONDS
);

started = true;
}
}
Expand All @@ -134,6 +141,8 @@ public void stop()

started = false;
dataSources.set(new ConcurrentHashMap<String, DruidDataSource>());
future.cancel(false);
future = null;
exec.shutdownNow();
exec = null;
}
Expand Down