diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index cda566996cfc..5a91d556a897 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -1254,8 +1254,8 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)| |`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir| |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| -|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from from deep storage.|10| -|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads| +|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that the work of loading segments involves downloading segments from deep storage, decompressing them and loading them to a memory mapped location. So the work is not all I/O bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores| +|`druid.coordinator.curator.loadqueuepeon.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. 
balancing segments across historical nodes.|2| In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index b09bd0c5bea8..80f0fbc9fdf4 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.utils.JvmUtils; import org.hibernate.validator.constraints.NotEmpty; import java.io.File; @@ -46,7 +47,7 @@ public class SegmentLoaderConfig private int announceIntervalMillis = 0; // do not background announce @JsonProperty("numLoadingThreads") - private int numLoadingThreads = 10; + private int numLoadingThreads = JvmUtils.getRuntimeInfo().getAvailableProcessors(); @JsonProperty("numBootstrapThreads") private Integer numBootstrapThreads = null; diff --git a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java index fcf86b322e67..ca56b10024e7 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java @@ -32,9 +32,11 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import 
org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.server.initialization.ZkPathsConfig; import java.io.IOException; +import java.util.concurrent.ExecutorService; /** * Use {@link org.apache.druid.server.coordinator.HttpLoadQueuePeon} for segment load/drops. @@ -54,6 +56,7 @@ public class ZkCoordinator private volatile PathChildrenCache loadQueueCache; private volatile boolean started = false; + private final ExecutorService segmentLoadUnloadService; @Inject public ZkCoordinator( @@ -61,7 +64,8 @@ public ZkCoordinator( ObjectMapper jsonMapper, ZkPathsConfig zkPaths, DruidServerMetadata me, - CuratorFramework curator + CuratorFramework curator, + SegmentLoaderConfig config ) { this.dataSegmentChangeHandler = loadDropHandler; @@ -69,6 +73,10 @@ public ZkCoordinator( this.zkPaths = zkPaths; this.me = me; this.curator = curator; + this.segmentLoadUnloadService = Execs.multiThreaded( + config.getNumLoadingThreads(), + "ZKCoordinator--%d" + ); } @LifecycleStart @@ -102,63 +110,12 @@ public void start() throws IOException new PathChildrenCacheListener() { @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { final ChildData child = event.getData(); switch (event.getType()) { case CHILD_ADDED: - final String path = child.getPath(); - final DataSegmentChangeRequest request = jsonMapper.readValue( - child.getData(), DataSegmentChangeRequest.class - ); - - log.info("New request[%s] with zNode[%s].", request.asString(), path); - - try { - request.go( - dataSegmentChangeHandler, - new DataSegmentChangeCallback() - { - boolean hasRun = false; - - @Override - public void execute() - { - try { - if (!hasRun) { - curator.delete().guaranteed().forPath(path); - log.info("Completed request [%s]", 
request.asString()); - hasRun = true; - } - } - catch (Exception e) { - try { - curator.delete().guaranteed().forPath(path); - } - catch (Exception e1) { - log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); - } - log.error(e, "Exception while removing zNode[%s]", path); - throw new RuntimeException(e); - } - } - } - ); - } - catch (Exception e) { - try { - curator.delete().guaranteed().forPath(path); - } - catch (Exception e1) { - log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); - } - - log.makeAlert(e, "Segment load/unload: uncaught exception.") - .addData("node", path) - .addData("nodeProperties", request) - .emit(); - } - + childAdded(child); break; case CHILD_REMOVED: log.info("zNode[%s] was removed", event.getData().getPath()); @@ -168,6 +125,7 @@ public void execute() } } } + ); loadQueueCache.start(); } @@ -180,6 +138,59 @@ public void execute() } } + private void childAdded(ChildData child) + { + segmentLoadUnloadService.submit(() -> { + final String path = child.getPath(); + DataSegmentChangeRequest request = new SegmentChangeRequestNoop(); + try { + final DataSegmentChangeRequest finalRequest = jsonMapper.readValue( + child.getData(), + DataSegmentChangeRequest.class + ); + + finalRequest.go( + dataSegmentChangeHandler, + new DataSegmentChangeCallback() + { + @Override + public void execute() + { + try { + curator.delete().guaranteed().forPath(path); + log.info("Completed request [%s]", finalRequest.asString()); + } + catch (Exception e) { + try { + curator.delete().guaranteed().forPath(path); + } + catch (Exception e1) { + log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); + } + log.error(e, "Exception while removing zNode[%s]", path); + throw new RuntimeException(e); + } + } + } + ); + } + catch (Exception e) { + // Something went wrong in either deserializing the request using jsonMapper or when invoking it + try { + curator.delete().guaranteed().forPath(path); + } + catch 
(Exception e1) { + log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); + } + + log.makeAlert(e, "Segment load/unload: uncaught exception.") + .addData("node", path) + .addData("nodeProperties", request) + .emit(); + } + }); + } + @LifecycleStop public void stop() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java index a6b10bc345c4..a4d5d948cf07 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -21,36 +21,46 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.utils.ZKPaths; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.DataSegmentChangeRequest; import org.apache.druid.server.coordination.SegmentChangeRequestDrop; import org.apache.druid.server.coordination.SegmentChangeRequestLoad; import org.apache.druid.server.coordination.SegmentChangeRequestNoop; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import 
java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * Use {@link HttpLoadQueuePeon} instead. + *

+ * Objects of this class can be accessed by multiple threads. State wise, this class + * is thread safe and callers of the public methods can expect thread safe behavior. + * Though, like a typical object being accessed by multiple threads, + * callers shouldn't expect strict consistency in results between two calls + * of the same or different methods. */ @Deprecated public class CuratorLoadQueuePeon extends LoadQueuePeon @@ -59,40 +69,48 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon private static final int DROP = 0; private static final int LOAD = 1; - private static void executeCallbacks(List callbacks) - { - for (LoadPeonCallback callback : callbacks) { - if (callback != null) { - callback.execute(); - } - } - } - private final CuratorFramework curator; private final String basePath; private final ObjectMapper jsonMapper; private final ScheduledExecutorService processingExecutor; + + /** + * Threadpool with daemon threads that execute callback actions associated + * with loading or dropping segments. 
+ */ private final ExecutorService callBackExecutor; private final DruidCoordinatorConfig config; private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicInteger failedAssignCount = new AtomicInteger(0); + /** + * Needs to be thread safe since it can be concurrently accessed via + * {@link #loadSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)}, + * {@link #getSegmentsToLoad()} and {@link #stop()} + */ private final ConcurrentSkipListMap segmentsToLoad = new ConcurrentSkipListMap<>( DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST ); + + /** + * Needs to be thread safe since it can be concurrently accessed via + * {@link #dropSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)}, + * {@link #getSegmentsToDrop()} and {@link #stop()} + */ private final ConcurrentSkipListMap segmentsToDrop = new ConcurrentSkipListMap<>( DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST ); + + /** + * Needs to be thread safe since it can be concurrently accessed via + * {@link #markSegmentToDrop(DataSegment)}, {@link #unmarkSegmentToDrop(DataSegment)} + * and {@link #getSegmentsToDrop()} + */ private final ConcurrentSkipListSet segmentsMarkedToDrop = new ConcurrentSkipListSet<>( DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST ); - private final Object lock = new Object(); - - private volatile SegmentHolder currentlyProcessing = null; - private boolean stopped = false; - CuratorLoadQueuePeon( CuratorFramework curator, String basePath, @@ -150,61 +168,30 @@ public int getNumberOfSegmentsInQueue() } @Override - public void loadSegment(final DataSegment segment, final LoadPeonCallback callback) + public void loadSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback) { - synchronized (lock) { - if ((currentlyProcessing != null) && - currentlyProcessing.getSegmentId().equals(segment.getId())) { - if (callback != null) { - currentlyProcessing.addCallback(callback); - } - return; - } - } - - 
synchronized (lock) { - final SegmentHolder existingHolder = segmentsToLoad.get(segment); - if (existingHolder != null) { - if ((callback != null)) { - existingHolder.addCallback(callback); - } - return; - } + SegmentHolder segmentHolder = new SegmentHolder(segment, LOAD, Collections.singletonList(callback)); + final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder); + if (existingHolder != null) { + existingHolder.addCallback(callback); + return; } - log.debug("Asking server peon[%s] to load segment[%s]", basePath, segment.getId()); queuedSize.addAndGet(segment.getSize()); - segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback))); + processingExecutor.submit(new SegmentChangeProcessor(segmentHolder)); } @Override - public void dropSegment( - final DataSegment segment, - final LoadPeonCallback callback - ) + public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback) { - synchronized (lock) { - if ((currentlyProcessing != null) && - currentlyProcessing.getSegmentId().equals(segment.getId())) { - if (callback != null) { - currentlyProcessing.addCallback(callback); - } - return; - } - } - - synchronized (lock) { - final SegmentHolder existingHolder = segmentsToDrop.get(segment); - if (existingHolder != null) { - if (callback != null) { - existingHolder.addCallback(callback); - } - return; - } + SegmentHolder segmentHolder = new SegmentHolder(segment, DROP, Collections.singletonList(callback)); + final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder); + if (existingHolder != null) { + existingHolder.addCallback(callback); + return; } - log.debug("Asking server peon[%s] to drop segment[%s]", basePath, segment.getId()); - segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback))); + processingExecutor.submit(new SegmentChangeProcessor(segmentHolder)); } @Override @@ -219,206 +206,198 @@ 
public void unmarkSegmentToDrop(DataSegment dataSegment) segmentsMarkedToDrop.remove(dataSegment); } - private void processSegmentChangeRequest() + private class SegmentChangeProcessor implements Runnable { - if (currentlyProcessing != null) { - log.debug( - "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].", - basePath, - currentlyProcessing.getSegmentId() - ); + private final SegmentHolder segmentHolder; - return; + private SegmentChangeProcessor(SegmentHolder segmentHolder) + { + this.segmentHolder = segmentHolder; } - if (!segmentsToDrop.isEmpty()) { - currentlyProcessing = segmentsToDrop.firstEntry().getValue(); - log.debug("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentId()); - } else if (!segmentsToLoad.isEmpty()) { - currentlyProcessing = segmentsToLoad.firstEntry().getValue(); - log.debug("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentId()); - } else { - return; + @Override + public void run() + { + try { + final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier()); + final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest()); + curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); + log.debug( + "ZKNode created for server to [%s] %s [%s]", + basePath, + segmentHolder.getType() == LOAD ? 
"load" : "drop", + segmentHolder.getSegmentIdentifier() + ); + final ScheduledFuture nodeDeletedCheck = scheduleNodeDeletedCheck(path); + final Stat stat = curator.checkExists().usingWatcher( + (CuratorWatcher) watchedEvent -> { + switch (watchedEvent.getType()) { + case NodeDeleted: + // Cancel the check node deleted task since we have already + // been notified by the zk watcher + nodeDeletedCheck.cancel(true); + entryRemoved(segmentHolder, watchedEvent.getPath()); + break; + default: + // do nothing + } + } + ).forPath(path); + + if (stat == null) { + final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop()); + + // Create a node and then delete it to remove the registered watcher. This is a work-around for + // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event + // that happens for that node. If no events happen, the watcher stays registered foreverz. + // Couple that with the fact that you cannot set a watcher when you create a node, but what we + // want is to create a node and then watch for it to get deleted. The solution is that you *can* + // set a watcher when you check to see if it exists so, we first create the node and then set a + // watcher on its existence. However, if already does not exist by the time the existence check + // returns, then the watcher that was set will never fire (nobody will ever create the node + // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause + // that watcher to fire and delete it right away. + // + // We do not create the existence watcher first, because then it will fire when we create the + // node and we'll have the same race when trying to refresh that watcher. 
+ curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload); + entryRemoved(segmentHolder, path); + } + } + catch (KeeperException.NodeExistsException ne) { + // This is expected when historicals haven't yet picked up processing this segment and coordinator + // tries reassigning it to the same node. + log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed"); + failAssign(segmentHolder); + } + catch (Exception e) { + failAssign(segmentHolder, e); + } } - try { - final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentId().toString()); - final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest()); - curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - - processingExecutor.schedule( + @Nonnull + private ScheduledFuture scheduleNodeDeletedCheck(String path) + { + return processingExecutor.schedule( () -> { try { if (curator.checkExists().forPath(path) != null) { - failAssign(new ISE("%s was never removed! Failing this operation!", path)); + failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path)); + } else { + log.debug("%s detected to be removed. ", path); } } catch (Exception e) { - failAssign(e); + log.error(e, "Exception caught and ignored when checking whether zk node was deleted"); + failAssign(segmentHolder, e); } }, config.getLoadTimeoutDelay().getMillis(), TimeUnit.MILLISECONDS ); - - final Stat stat = curator.checkExists().usingWatcher( - (CuratorWatcher) watchedEvent -> { - switch (watchedEvent.getType()) { - case NodeDeleted: - entryRemoved(watchedEvent.getPath()); - break; - default: - // do nothing - } - } - ).forPath(path); - - if (stat == null) { - final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop()); - - // Create a node and then delete it to remove the registered watcher. This is a work-around for - // a zookeeper race condition. 
Specifically, when you set a watcher, it fires on the next event - // that happens for that node. If no events happen, the watcher stays registered foreverz. - // Couple that with the fact that you cannot set a watcher when you create a node, but what we - // want is to create a node and then watch for it to get deleted. The solution is that you *can* - // set a watcher when you check to see if it exists so, we first create the node and then set a - // watcher on its existence. However, if already does not exist by the time the existence check - // returns, then the watcher that was set will never fire (nobody will ever create the node - // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause - // that watcher to fire and delete it right away. - // - // We do not create the existence watcher first, because then it will fire when we create the - // node and we'll have the same race when trying to refresh that watcher. - curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload); - - entryRemoved(path); - } - } - catch (Exception e) { - failAssign(e); } } - private void actionCompleted() + private void actionCompleted(SegmentHolder segmentHolder) { - if (currentlyProcessing != null) { - switch (currentlyProcessing.getType()) { - case LOAD: - segmentsToLoad.remove(currentlyProcessing.getSegment()); - queuedSize.addAndGet(-currentlyProcessing.getSegmentSize()); - break; - case DROP: - segmentsToDrop.remove(currentlyProcessing.getSegment()); - break; - default: - throw new UnsupportedOperationException(); - } - - final List callbacks = currentlyProcessing.getCallbacks(); - currentlyProcessing = null; - callBackExecutor.execute( - () -> executeCallbacks(callbacks) - ); + switch (segmentHolder.getType()) { + case LOAD: + segmentsToLoad.remove(segmentHolder.getSegment()); + queuedSize.addAndGet(-segmentHolder.getSegmentSize()); + break; + case DROP: + segmentsToDrop.remove(segmentHolder.getSegment()); + break; + 
default: + throw new UnsupportedOperationException(); } + executeCallbacks(segmentHolder); } + @Override public void start() - { - ScheduledExecutors.scheduleAtFixedRate( - processingExecutor, - config.getLoadQueuePeonRepeatDelay(), - config.getLoadQueuePeonRepeatDelay(), - () -> { - processSegmentChangeRequest(); - - if (stopped) { - return ScheduledExecutors.Signal.STOP; - } else { - return ScheduledExecutors.Signal.REPEAT; - } - } - ); - } + { } @Override public void stop() { - synchronized (lock) { - if (currentlyProcessing != null) { - executeCallbacks(currentlyProcessing.getCallbacks()); - currentlyProcessing = null; - } - - if (!segmentsToDrop.isEmpty()) { - for (SegmentHolder holder : segmentsToDrop.values()) { - executeCallbacks(holder.getCallbacks()); - } - } - segmentsToDrop.clear(); - - if (!segmentsToLoad.isEmpty()) { - for (SegmentHolder holder : segmentsToLoad.values()) { - executeCallbacks(holder.getCallbacks()); - } - } - segmentsToLoad.clear(); + for (SegmentHolder holder : segmentsToDrop.values()) { + executeCallbacks(holder); + } + segmentsToDrop.clear(); - queuedSize.set(0L); - failedAssignCount.set(0); - stopped = true; + for (SegmentHolder holder : segmentsToLoad.values()) { + executeCallbacks(holder); } + segmentsToLoad.clear(); + + queuedSize.set(0L); + failedAssignCount.set(0); + processingExecutor.shutdown(); + callBackExecutor.shutdown(); } - private void entryRemoved(String path) + private void entryRemoved(SegmentHolder segmentHolder, String path) { - synchronized (lock) { - if (currentlyProcessing == null) { - log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path); - return; - } - if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentId().toString())) { - log.warn( - "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]", - basePath, path, currentlyProcessing - ); - return; - } - log.debug( - "Server[%s] done processing %s of segment [%s]", - 
basePath, - currentlyProcessing.getType() == LOAD ? "load" : "drop", - path + if (!ZKPaths.getNodeFromPath(path).equals(segmentHolder.getSegmentIdentifier())) { + log.warn( + "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]", + basePath, path, segmentHolder ); - actionCompleted(); + return; } + actionCompleted(segmentHolder); + log.debug( + "Server[%s] done processing %s of segment [%s]", + basePath, + segmentHolder.getType() == LOAD ? "load" : "drop", + path + ); } - private void failAssign(Exception e) + private void failAssign(SegmentHolder segmentHolder) { - synchronized (lock) { - log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing); - failedAssignCount.getAndIncrement(); - // Act like it was completed so that the coordinator gives it to someone else - actionCompleted(); + failAssign(segmentHolder, null); + } + + private void failAssign(SegmentHolder segmentHolder, Exception e) + { + if (e != null) { + log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder); } + failedAssignCount.getAndIncrement(); + // Act like it was completed so that the coordinator gives it to someone else + actionCompleted(segmentHolder); } + private static class SegmentHolder { private final DataSegment segment; private final DataSegmentChangeRequest changeRequest; private final int type; + // Guaranteed to store only non-null elements private final List callbacks = new ArrayList<>(); - private SegmentHolder(DataSegment segment, int type, Collection callbacks) + private SegmentHolder( + DataSegment segment, + int type, + Collection callbacksParam + ) { this.segment = segment; this.type = type; this.changeRequest = (type == LOAD) ? 
new SegmentChangeRequestLoad(segment) : new SegmentChangeRequestDrop(segment); - this.callbacks.addAll(callbacks); + Iterator itr = callbacksParam.iterator(); + while (itr.hasNext()) { + LoadPeonCallback c = itr.next(); + if (c != null) { + callbacks.add(c); + } + } } public DataSegment getSegment() @@ -431,9 +410,9 @@ public int getType() return type; } - public SegmentId getSegmentId() + public String getSegmentIdentifier() { - return segment.getId(); + return segment.getId().toString(); } public long getSegmentSize() @@ -441,24 +420,20 @@ public long getSegmentSize() return segment.getSize(); } - public void addCallbacks(Collection newCallbacks) + public void addCallback(@Nullable LoadPeonCallback newCallback) { - synchronized (callbacks) { - callbacks.addAll(newCallbacks); - } - } - - public void addCallback(LoadPeonCallback newCallback) - { - synchronized (callbacks) { - callbacks.add(newCallback); + if (newCallback != null) { + synchronized (callbacks) { + callbacks.add(newCallback); + } } } - public List getCallbacks() + List snapshotCallbacks() { synchronized (callbacks) { - return callbacks; + // Return an immutable copy so that callers don't have to worry about concurrent modification + return ImmutableList.copyOf(callbacks); } } @@ -473,4 +448,11 @@ public String toString() return changeRequest.toString(); } } + + private void executeCallbacks(SegmentHolder holder) + { + for (LoadPeonCallback callback : holder.snapshotCallbacks()) { + callBackExecutor.submit(() -> callback.execute()); + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 32eb87c1f675..b4adebe851c9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -75,6 +75,12 @@ public String getLoadQueuePeonType() 
return "curator"; } + @Config("druid.coordinator.curator.loadqueuepeon.numCallbackThreads") + public int getNumCuratorCallBackThreads() + { + return 2; + } + @Config("druid.coordinator.loadqueuepeon.http.repeatDelay") public Duration getHttpLoadQueuePeonRepeatDelay() { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java index 1be65cb86ac8..9bdd84ed70fb 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java @@ -133,7 +133,8 @@ public void removeSegment(DataSegment s, DataSegmentChangeCallback callback) jsonMapper, zkPaths, me, - curator + curator, + new SegmentLoaderConfig() ); zkCoordinator.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 25dce8cf577f..3b2223514f59 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -162,7 +162,8 @@ public void setUp() throws Exception null, false, false, - new Duration("PT0s") + new Duration("PT0s"), + Duration.millis(10) ); sourceLoadQueueChildrenCache = new PathChildrenCache( curator, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 1f4f7e2844b2..793bd287b950 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -139,7 +139,8 @@ public void setUp() throws Exception null, false, false, - new Duration("PT0s") + 
new Duration("PT0s"), + Duration.millis(10) ); pathChildrenCache = new PathChildrenCache( curator, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java index 0f73695c167f..894472a2cbcc 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -84,7 +84,8 @@ public class HttpLoadQueuePeonTest null, false, false, - Duration.ZERO + Duration.ZERO, + Duration.millis(10) ) { @Override diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java index 437b2e47cea7..8d8271dab1dc 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java @@ -39,6 +39,7 @@ import org.apache.druid.server.coordination.SegmentChangeRequestDrop; import org.apache.druid.server.coordination.SegmentChangeRequestLoad; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NoneShardSpec; import org.joda.time.Duration; import org.junit.After; @@ -47,8 +48,10 @@ import org.junit.Test; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; public class LoadQueuePeonTest extends CuratorTestBase { @@ -79,46 +82,34 @@ public void setUp() throws Exception @Test public void testMultipleLoadDropSegments() throws Exception { - final AtomicInteger requestSignalIdx = new AtomicInteger(0); - final AtomicInteger segmentSignalIdx = new AtomicInteger(0); - 
loadQueuePeon = new CuratorLoadQueuePeon( curator, LOAD_QUEUE_PATH, jsonMapper, Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), Execs.singleThreaded("test_load_queue_peon-%d"), - new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) + new TestDruidCoordinatorConfig( + null, + null, + null, + null, + null, + null, + 10, + null, + false, + false, + Duration.millis(0), + Duration.millis(10) + ) ); loadQueuePeon.start(); - final CountDownLatch[] loadRequestSignal = new CountDownLatch[5]; - final CountDownLatch[] dropRequestSignal = new CountDownLatch[5]; - final CountDownLatch[] segmentLoadedSignal = new CountDownLatch[5]; - final CountDownLatch[] segmentDroppedSignal = new CountDownLatch[5]; - - for (int i = 0; i < 5; ++i) { - loadRequestSignal[i] = new CountDownLatch(1); - dropRequestSignal[i] = new CountDownLatch(1); - segmentLoadedSignal[i] = new CountDownLatch(1); - segmentDroppedSignal[i] = new CountDownLatch(1); - } - - final DataSegmentChangeHandler handler = new DataSegmentChangeHandler() - { - @Override - public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - loadRequestSignal[requestSignalIdx.get()].countDown(); - } - - @Override - public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - dropRequestSignal[requestSignalIdx.get()].countDown(); - } - }; + ConcurrentMap loadRequestSignals = new ConcurrentHashMap<>(5); + ConcurrentMap dropRequestSignals = new ConcurrentHashMap<>(5); + ConcurrentMap segmentLoadedSignals = new ConcurrentHashMap<>(5); + ConcurrentMap segmentDroppedSignals = new ConcurrentHashMap<>(5); final List segmentToDrop = Lists.transform( ImmutableList.of( @@ -132,11 +123,24 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac @Override public DataSegment apply(String intervalStr) { - return dataSegmentWithInterval(intervalStr); + DataSegment dataSegment = 
dataSegmentWithInterval(intervalStr); + return dataSegment; } } ); + final CountDownLatch[] dropRequestLatches = new CountDownLatch[5]; + final CountDownLatch[] dropSegmentLatches = new CountDownLatch[5]; + for (int i = 0; i < 5; i++) { + dropRequestLatches[i] = new CountDownLatch(1); + dropSegmentLatches[i] = new CountDownLatch(1); + } + int i = 0; + for (DataSegment s : segmentToDrop) { + dropRequestSignals.put(s.getId(), dropRequestLatches[i]); + segmentDroppedSignals.put(s.getId(), dropSegmentLatches[i++]); + } + final List segmentToLoad = Lists.transform( ImmutableList.of( "2014-10-27T00:00:00Z/P1D", @@ -149,11 +153,26 @@ public DataSegment apply(String intervalStr) @Override public DataSegment apply(String intervalStr) { - return dataSegmentWithInterval(intervalStr); + DataSegment dataSegment = dataSegmentWithInterval(intervalStr); + loadRequestSignals.put(dataSegment.getId(), new CountDownLatch(1)); + segmentLoadedSignals.put(dataSegment.getId(), new CountDownLatch(1)); + return dataSegment; } } ); + final CountDownLatch[] loadRequestLatches = new CountDownLatch[5]; + final CountDownLatch[] segmentLoadedLatches = new CountDownLatch[5]; + for (i = 0; i < 5; i++) { + loadRequestLatches[i] = new CountDownLatch(1); + segmentLoadedLatches[i] = new CountDownLatch(1); + } + i = 0; + for (DataSegment s : segmentToLoad) { + loadRequestSignals.put(s.getId(), loadRequestLatches[i]); + segmentLoadedSignals.put(s.getId(), segmentLoadedLatches[i++]); + } + // segment with latest interval should be loaded first final List expectedLoadOrder = Lists.transform( ImmutableList.of( @@ -162,59 +181,48 @@ public DataSegment apply(String intervalStr) "2014-10-30T00:00:00Z/P1D", "2014-10-28T00:00:00Z/P1D", "2014-10-27T00:00:00Z/P1D" - ), new Function() - { - @Override - public DataSegment apply(String intervalStr) - { - return dataSegmentWithInterval(intervalStr); - } - } + ), intervalStr -> dataSegmentWithInterval(intervalStr) ); + final DataSegmentChangeHandler handler = new
DataSegmentChangeHandler() + { + @Override + public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) + { + loadRequestSignals.get(segment.getId()).countDown(); + } + + @Override + public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) + { + dropRequestSignals.get(segment.getId()).countDown(); + } + }; + loadQueueCache.getListenable().addListener( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { - DataSegmentChangeRequest request = jsonMapper.readValue( - event.getData().getData(), - DataSegmentChangeRequest.class - ); - request.go(handler, null); - } + (client, event) -> { + if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { + DataSegmentChangeRequest request = jsonMapper.readValue( + event.getData().getData(), + DataSegmentChangeRequest.class + ); + request.go(handler, null); } } ); loadQueueCache.start(); - for (DataSegment segment : segmentToDrop) { + for (final DataSegment segment : segmentToDrop) { loadQueuePeon.dropSegment( segment, - new LoadPeonCallback() - { - @Override - public void execute() - { - segmentDroppedSignal[segmentSignalIdx.get()].countDown(); - } - } + () -> segmentDroppedSignals.get(segment.getId()).countDown() ); } - for (DataSegment segment : segmentToLoad) { + for (final DataSegment segment : segmentToLoad) { loadQueuePeon.loadSegment( segment, - new LoadPeonCallback() - { - @Override - public void execute() - { - segmentLoadedSignal[segmentSignalIdx.get()].countDown(); - } - } + () -> segmentLoadedSignals.get(segment.getId()).countDown() ); } @@ -224,8 +232,14 @@ public void execute() for (DataSegment segment : segmentToDrop) { String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString()); - 
Assert.assertTrue(timing.forWaiting().awaitLatch(dropRequestSignal[requestSignalIdx.get()])); - Assert.assertNotNull(curator.checkExists().forPath(dropRequestPath)); + Assert.assertTrue( + "Latch not counted down for " + dropRequestSignals.get(segment.getId()), + dropRequestSignals.get(segment.getId()).await(10, TimeUnit.SECONDS) + ); + Assert.assertNotNull( + "Path " + dropRequestPath + " doesn't exist", + curator.checkExists().forPath(dropRequestPath) + ); Assert.assertEquals( segment, ((SegmentChangeRequestDrop) jsonMapper.readValue( @@ -235,29 +249,14 @@ public void execute() )).getSegment() ); - if (requestSignalIdx.get() == 4) { - requestSignalIdx.set(0); - } else { - requestSignalIdx.incrementAndGet(); - } - // simulate completion of drop request by historical curator.delete().guaranteed().forPath(dropRequestPath); - Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignal[segmentSignalIdx.get()])); - - int expectedNumSegmentToDrop = 5 - segmentSignalIdx.get() - 1; - Assert.assertEquals(expectedNumSegmentToDrop, loadQueuePeon.getSegmentsToDrop().size()); - - if (segmentSignalIdx.get() == 4) { - segmentSignalIdx.set(0); - } else { - segmentSignalIdx.incrementAndGet(); - } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignals.get(segment.getId()))); } for (DataSegment segment : expectedLoadOrder) { String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString()); - Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()])); + Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignals.get(segment.getId()))); Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath)); Assert.assertEquals( segment, @@ -266,16 +265,9 @@ public void execute() .getSegment() ); - requestSignalIdx.incrementAndGet(); - // simulate completion of load request by historical curator.delete().guaranteed().forPath(loadRequestPath); - 
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal[segmentSignalIdx.get()])); - - int expectedNumSegmentToLoad = 5 - segmentSignalIdx.get() - 1; - Assert.assertEquals(1200 * expectedNumSegmentToLoad, loadQueuePeon.getLoadQueueSize()); - Assert.assertEquals(expectedNumSegmentToLoad, loadQueuePeon.getSegmentsToLoad().size()); - segmentSignalIdx.incrementAndGet(); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignals.get(segment.getId()))); } } @@ -294,7 +286,20 @@ public void testFailAssign() throws Exception Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), Execs.singleThreaded("test_load_queue_peon-%d"), // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly - new TestDruidCoordinatorConfig(null, null, null, new Duration(1), null, null, 10, null, false, false, new Duration("PT1s")) + new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration(1), + null, + null, + 10, + null, + false, + false, + new Duration("PT1s"), + Duration.millis(10) + ) ); loadQueuePeon.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java index e07f363e78a4..c979671ff847 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java @@ -19,7 +19,9 @@ package org.apache.druid.server.coordinator; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.timeline.DataSegment; +import org.joda.time.Duration; import java.util.concurrent.ConcurrentSkipListSet; @@ -29,7 +31,27 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon public LoadQueuePeonTester() { - super(null, null, null, null, null, null); + super( + null, + null, + null, + Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"), + null, + new 
TestDruidCoordinatorConfig( + null, + null, + null, + new Duration(1), + null, + null, + 10, + null, + false, + false, + new Duration("PT1s"), + Duration.millis(10) + ) + ); } @Override diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 317f2fe67118..e1b91354de70 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -46,7 +46,8 @@ public TestDruidCoordinatorConfig( String consoleStatic, boolean mergeSegments, boolean convertSegments, - Duration getLoadQueuePeonRepeatDelay + Duration getLoadQueuePeonRepeatDelay, + Duration curatorCreateZkNodesRepeatDelay ) { this.coordinatorStartDelay = coordinatorStartDelay; @@ -108,8 +109,10 @@ public String getConsoleStatic() return consoleStatic; } - @Override public Duration getLoadQueuePeonRepeatDelay() + @Override + public Duration getLoadQueuePeonRepeatDelay() { return getLoadQueuePeonRepeatDelay; } + } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java index 8593e301b972..7ee58a2a8f19 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java @@ -113,7 +113,8 @@ private void testFindIntervalForKillTask(List segmentManagerResult, In null, false, false, - Duration.ZERO + Duration.ZERO, + Duration.millis(10) ) ); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 4097e952096a..88245624093d 100644 ---
a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -46,6 +46,7 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.http.JettyHttpClientModule; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; @@ -90,7 +91,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; /** */ @@ -249,11 +250,19 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( ZkPathsConfig zkPaths ) { + boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType()); + ExecutorService callBackExec; + if (useHttpLoadQueuePeon) { + callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d"); + } else { + callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon" + + "-callbackexec--%d"); + } return new LoadQueueTaskMaster( curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), - Executors.newSingleThreadExecutor(), + callBackExec, config, httpClient, zkPaths