Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions server/src/main/java/io/druid/client/HttpServerInventoryView.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -104,10 +105,10 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer

private volatile ExecutorService executor;

// a queue of queryable server names for which worker threads in executor initiate the segment list call i.e.
// DruidServerHolder.updateSegmentsListAsync(..) which updates the segment list asynchronously and adds itself
// to this queue again for next update.
private final BlockingQueue<String> queue = new LinkedBlockingDeque<>();
// The work queue: every item in it is processed sequentially by the main thread set up in start().
// It is used to call inventoryInitialized on all SegmentCallbacks and
// to keep the segment list for each queryable server up to date.
private final BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>();

private final HttpClient httpClient;
private final ObjectMapper smileMapper;
Expand Down Expand Up @@ -161,10 +162,7 @@ public void run()

while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
DruidServerHolder holder = servers.get(queue.take());
if (holder != null) {
holder.updateSegmentsListAsync();
}
queue.take().run();
}
catch (InterruptedException ex) {
log.info("main thread interrupted, served segments list is not synced anymore.");
Expand All @@ -184,17 +182,27 @@ public void run()
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
private volatile boolean initialized = false;

@Override
public void nodeAdded(DiscoveryDruidNode node)
public void nodesAdded(List<DiscoveryDruidNode> nodes)
{
serverAddedOrUpdated(toDruidServer(node));
nodes.forEach(
node -> serverAddedOrUpdated(toDruidServer(node))
);

if (!initialized) {
initialized = true;
queue.add(HttpServerInventoryView.this::serverInventoryInitialized);
}
}

@Override
public void nodeRemoved(DiscoveryDruidNode node)
public void nodesRemoved(List<DiscoveryDruidNode> nodes)
{
serverRemoved(toDruidServer(node));
nodes.forEach(
node -> serverRemoved(toDruidServer(node))
);
}

private DruidServer toDruidServer(DiscoveryDruidNode node)
Expand Down Expand Up @@ -572,7 +580,7 @@ public void onSuccess(InputStream stream)
log.error(ex, "error processing segment list response from server [%s]", druidServer.getName());
}
finally {
queue.add(druidServer.getName());
addNextSyncToWorkQueue(druidServer.getName());
}
}

Expand Down Expand Up @@ -611,7 +619,7 @@ public void onFailure(Throwable t)
}
}
finally {
queue.add(druidServer.getName());
addNextSyncToWorkQueue(druidServer.getName());
}
}
},
Expand All @@ -621,7 +629,7 @@ public void onFailure(Throwable t)
return future;
}
catch (Throwable th) {
queue.add(druidServer.getName());
addNextSyncToWorkQueue(druidServer.getName());

String logMsg = StringUtils.nonStrictFormat(
"Fatal error while fetching segment list from server [%s].", druidServer.getName()
Expand All @@ -646,6 +654,18 @@ public void onFailure(Throwable t)
}
}

/**
 * Enqueues a work item that triggers the next segment list sync for the given server.
 *
 * When the work item runs, it looks the server up in {@code servers} again; if no holder is
 * registered under {@code serverId} at that point, the item does nothing.
 *
 * @param serverId key used to look up the server's holder in {@code servers}
 */
private void addNextSyncToWorkQueue(final String serverId)
{
  final Runnable nextSync = () -> {
    final DruidServerHolder target = servers.get(serverId);
    if (target == null) {
      // No holder is registered under this id any more, so there is nothing to sync.
      return;
    }
    target.updateSegmentsListAsync();
  };

  queue.add(nextSync);
}

private boolean hasUnstabilityTimeoutPassed()
{
if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) {
Expand Down
33 changes: 0 additions & 33 deletions server/src/main/java/io/druid/client/selector/HostSelector.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.druid.concurrent.Execs;
import io.druid.concurrent.LifecycleLock;
Expand Down Expand Up @@ -167,6 +168,8 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery

private final Object lock = new Object();

private boolean cacheInitialized = false;

NodeTypeWatcher(
ExecutorService listenerExecutor,
CuratorFramework curatorFramework,
Expand Down Expand Up @@ -201,20 +204,14 @@ public Collection<DiscoveryDruidNode> getAllNodes()
public void registerListener(DruidNodeDiscovery.Listener listener)
{
synchronized (lock) {
for (DiscoveryDruidNode node : nodes.values()) {
listenerExecutor.submit(() -> {
try {
listener.nodeAdded(node);
}
catch (Exception ex) {
log.error(
ex,
"Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].",
node,
listener
);
}
});
if (cacheInitialized) {
ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
},
"Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener
);
}
nodeListeners.add(listener);
}
Expand Down Expand Up @@ -280,8 +277,30 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve

break;
}
case INITIALIZED: {
if (cacheInitialized) {
log.warn("cache is already initialized. ignoring [%s] event, nodeType [%s].", event.getType(), nodeType);
return;
}

log.info("Received INITIALIZED in node watcher for type [%s].", nodeType);

cacheInitialized = true;

ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
for (Listener l : nodeListeners) {
safeSchedule(
() -> {
l.nodesAdded(currNodes);
},
"Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, l
);
}

break;
}
default: {
log.error("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType);
log.info("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType);
}
}
}
Expand All @@ -291,56 +310,59 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve
}
}

/**
 * Submits {@code runnable} to {@code listenerExecutor}, wrapped so that an {@link Exception}
 * thrown by it is caught and logged instead of propagating out of the submitted task.
 *
 * @param runnable     the work to run on the listener executor
 * @param errMsgFormat format string for the error message logged if {@code runnable} throws
 * @param args         arguments for {@code errMsgFormat}
 */
private void safeSchedule(
    Runnable runnable,
    String errMsgFormat, Object... args
)
{
  listenerExecutor.submit(() -> {
    try {
      runnable.run();
    }
    catch (Exception ex) {
      // Pass the exception itself to the logger so the cause is not lost; logging only the
      // formatted message hides why the listener failed.
      log.error(ex, errMsgFormat, args);
    }
  });
}

private void addNode(DiscoveryDruidNode druidNode)
{
synchronized (lock) {
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
for (DruidNodeDiscovery.Listener l : nodeListeners) {
listenerExecutor.submit(() -> {
try {
l.nodeAdded(druidNode);
}
catch (Exception ex) {
log.error(
ex,
"Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].",
druidNode,
l
);
}
});
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
if (cacheInitialized) {
List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
for (Listener l : nodeListeners) {
safeSchedule(
() -> {
l.nodesAdded(newNode);
},
"Exception occured in nodeAdded(node=[%s]) in listener [%s].", druidNode, l
);
}
} else {
log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev);
}
} else {
log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev);
}
}

private void removeNode(DiscoveryDruidNode druidNode)
{
synchronized (lock) {
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());

if (prev == null) {
log.warn("Noticed disappearance of unknown druid node [%s:%s].", druidNode.getNodeType(), druidNode);
return;
}
if (prev == null) {
log.warn("Noticed disappearance of unknown druid node [%s:%s].", druidNode.getNodeType(), druidNode);
return;
}

for (DruidNodeDiscovery.Listener l : nodeListeners) {
listenerExecutor.submit(() -> {
try {
l.nodeRemoved(druidNode);
}
catch (Exception ex) {
log.error(
ex,
"Exception occured in DiscoveryDruidNode.nodeRemoved(node=[%s]) in listener [%s].",
druidNode,
l
);
}
});
if (cacheInitialized) {
List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
for (Listener l : nodeListeners) {
safeSchedule(
() -> {
l.nodesRemoved(nodeRemoved);
},
"Exception occured in nodeRemoved(node=[%s]) in listener [%s].", druidNode, l
);
}
}
}
Expand All @@ -351,7 +373,7 @@ public void start()
cache.getListenable().addListener(
(client, event) -> handleChildEvent(client, event)
);
cache.start();
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
catch (Exception ex) {
throw Throwables.propagate(ex);
Expand Down
19 changes: 17 additions & 2 deletions server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.discovery;

import java.util.Collection;
import java.util.List;

/**
* Interface for discovering Druid Nodes announced by DruidNodeAnnouncer.
Expand All @@ -29,9 +30,23 @@ public interface DruidNodeDiscovery
Collection<DiscoveryDruidNode> getAllNodes();
void registerListener(Listener listener);

/**
* Listener for watching nodes in a DruidNodeDiscovery instance obtained via DruidNodeDiscoveryProvider.getXXX().
* A DruidNodeDiscovery implementation should assume that a Listener is not thread-safe and must never call
* methods on a Listener concurrently.
*
* Implementations of Listener must not do any time-consuming work or block in any of the methods.
*/
interface Listener
{
void nodeAdded(DiscoveryDruidNode node);
void nodeRemoved(DiscoveryDruidNode node);
/**
* Called with the list of nodes that were added.
* The first call to this method also signals that the underlying cache in the DruidNodeDiscovery
* implementation has been initialized.
* @param nodes the nodes that were added
*/
void nodesAdded(List<DiscoveryDruidNode> nodes);

void nodesRemoved(List<DiscoveryDruidNode> nodes);
}
}
Loading