From 1c4072d9ac5462d0f1afeae01ee7cfebc22d79db Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 2 Feb 2016 14:42:07 +0530 Subject: [PATCH 1/4] Abstract out LoadQueuePeon to make it extensible --- .../server/coordinator/LoadQueuePeon.java | 189 +++++------------- .../coordinator/LoadQueueTaskMaster.java | 2 +- .../server/coordinator/ZkLoadQueuePeon.java | 139 +++++++++++++ .../coordinator/DruidCoordinatorTest.java | 2 +- .../server/coordinator/LoadQueuePeonTest.java | 4 +- .../coordinator/LoadQueuePeonTester.java | 8 +- 6 files changed, 205 insertions(+), 139 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index ae049e8a68b3..e5b2d2e091ed 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -20,38 +20,26 @@ package io.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; -import com.metamx.common.ISE; -import com.metamx.common.guava.Comparators; import com.metamx.emitter.EmittingLogger; import io.druid.server.coordination.DataSegmentChangeRequest; import io.druid.server.coordination.SegmentChangeRequestDrop; import io.druid.server.coordination.SegmentChangeRequestLoad; -import io.druid.server.coordination.SegmentChangeRequestNoop; import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.data.Stat; import java.util.Arrays; import java.util.Collection; -import java.util.Comparator; import java.util.List; 
import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** */ -public class LoadQueuePeon +public abstract class LoadQueuePeon { private static final EmittingLogger log = new EmittingLogger(LoadQueuePeon.class); private static final int DROP = 0; @@ -66,12 +54,10 @@ private static void executeCallbacks(List callbacks) } } - private final CuratorFramework curator; - private final String basePath; - private final ObjectMapper jsonMapper; - private final ScheduledExecutorService zkWritingExecutor; + + private final ScheduledExecutorService processingExecutor; private final ExecutorService callBackExecutor; - private final DruidCoordinatorConfig config; + private final String peonId; private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicInteger failedAssignCount = new AtomicInteger(0); @@ -89,20 +75,14 @@ private static void executeCallbacks(List callbacks) private boolean stopped = false; LoadQueuePeon( - CuratorFramework curator, - String basePath, - ObjectMapper jsonMapper, - ScheduledExecutorService zkWritingExecutor, - ExecutorService callbackExecutor, - DruidCoordinatorConfig config + String peonId, + ScheduledExecutorService processingExecutor, + ExecutorService callbackExecutor ) { - this.curator = curator; - this.basePath = basePath; - this.jsonMapper = jsonMapper; + this.peonId = peonId; this.callBackExecutor = callbackExecutor; - this.zkWritingExecutor = zkWritingExecutor; - this.config = config; + this.processingExecutor = processingExecutor; } @JsonProperty @@ -152,7 +132,7 @@ public void loadSegment( } } - log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); + log.info("Asking server peon[%s] to load segment[%s]", peonId, segment.getIdentifier()); 
queuedSize.addAndGet(segment.getSize()); segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback))); doNext(); @@ -183,7 +163,7 @@ public void dropSegment( } } - log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); + log.info("Asking server peon[%s] to drop segment[%s]", peonId, segment.getIdentifier()); segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback))); doNext(); } @@ -194,95 +174,32 @@ private void doNext() if (currentlyProcessing == null) { if (!segmentsToDrop.isEmpty()) { currentlyProcessing = segmentsToDrop.firstEntry().getValue(); - log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + log.info("Server[%s] dropping [%s]", peonId, currentlyProcessing.getSegmentIdentifier()); } else if (!segmentsToLoad.isEmpty()) { currentlyProcessing = segmentsToLoad.firstEntry().getValue(); - log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + log.info("Server[%s] loading [%s]", peonId, currentlyProcessing.getSegmentIdentifier()); } else { return; } - zkWritingExecutor.execute( + processingExecutor.execute( new Runnable() { @Override public void run() { synchronized (lock) { - try { - // expected when the coordinator looses leadership and LoadQueuePeon is stopped. - if (currentlyProcessing == null) { - if(!stopped) { - log.makeAlert("Crazy race condition! 
server[%s]", basePath) - .emit(); - } - actionCompleted(); - doNext(); - return; - } - log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier()); - final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier()); - final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest()); - curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - - zkWritingExecutor.schedule( - new Runnable() - { - @Override - public void run() - { - try { - if (curator.checkExists().forPath(path) != null) { - failAssign(new ISE("%s was never removed! Failing this operation!", path)); - } - } - catch (Exception e) { - failAssign(e); - } - } - }, - config.getLoadTimeoutDelay().getMillis(), - TimeUnit.MILLISECONDS - ); - - final Stat stat = curator.checkExists().usingWatcher( - new CuratorWatcher() - { - @Override - public void process(WatchedEvent watchedEvent) throws Exception - { - switch (watchedEvent.getType()) { - case NodeDeleted: - entryRemoved(watchedEvent.getPath()); - } - } - } - ).forPath(path); - - if (stat == null) { - final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop()); - - // Create a node and then delete it to remove the registered watcher. This is a work-around for - // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event - // that happens for that node. If no events happen, the watcher stays registered foreverz. - // Couple that with the fact that you cannot set a watcher when you create a node, but what we - // want is to create a node and then watch for it to get deleted. The solution is that you *can* - // set a watcher when you check to see if it exists so, we first create the node and then set a - // watcher on its existence. 
However, if already does not exist by the time the existence check - // returns, then the watcher that was set will never fire (nobody will ever create the node - // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause - // that watcher to fire and delete it right away. - // - // We do not create the existence watcher first, because then it will fire when we create the - // node and we'll have the same race when trying to refresh that watcher. - curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload); - - entryRemoved(path); + // expected when the coordinator looses leadership and LoadQueuePeon is stopped. + if (currentlyProcessing == null) { + if (!stopped) { + log.makeAlert("Crazy race condition! server[%s]", peonId) + .emit(); } + doNext(); + return; } - catch (Exception e) { - failAssign(e); - } + log.info("Server[%s] processing segment[%s]", peonId, currentlyProcessing.getSegmentIdentifier()); + processHolder(currentlyProcessing); } } } @@ -290,16 +207,39 @@ public void process(WatchedEvent watchedEvent) throws Exception } else { log.info( "Server[%s] skipping doNext() because something is currently loading[%s].", - basePath, + peonId, currentlyProcessing.getSegmentIdentifier() ); } } } - private void actionCompleted() + /** + * Processes the segmentHolder asynchronously. Completion of the processing action is notified + * via calling {@link #actionCompleted(SegmentHolder)} method. Any exception during processing + * is notified via {@link #failAssign(SegmentHolder, Exception)} method. + * NOTE: This method is always invoked using the processingExecutor. + * + * @param holder segment holder to be processed. 
+ */ + abstract void processHolder(SegmentHolder holder); + + void actionCompleted(SegmentHolder holder) { - if (currentlyProcessing != null) { + synchronized (lock) { + if (currentlyProcessing == null) { + log.warn("Server[%s] completed processing[%s] even though it wasn't processing!?", peonId, holder); + return; + } + if (currentlyProcessing != holder) { + log.warn( + "Server[%s] completed processing[%s] while it was processing[%s]!?", + peonId, + holder, + currentlyProcessing + ); + return; + } switch (currentlyProcessing.getType()) { case LOAD: segmentsToLoad.remove(currentlyProcessing.getSegment()); @@ -324,7 +264,9 @@ public void run() } } ); + log.info("Server[%s] done processing [%s]", peonId, holder); } + doNext(); } public void stop() @@ -355,39 +297,18 @@ public void stop() } } - private void entryRemoved(String path) - { - synchronized (lock) { - if (currentlyProcessing == null) { - log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path); - return; - } - if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentIdentifier())) { - log.warn( - "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]", - basePath, path, currentlyProcessing - ); - return; - } - actionCompleted(); - log.info("Server[%s] done processing [%s]", basePath, path); - } - - doNext(); - } - - private void failAssign(Exception e) + void failAssign(SegmentHolder holder, Exception e) { synchronized (lock) { - log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing); + log.error(e, "Server[%s], throwable caught when submitting [%s].", peonId, currentlyProcessing); failedAssignCount.getAndIncrement(); // Act like it was completed so that the coordinator gives it to someone else - actionCompleted(); + actionCompleted(holder); doNext(); } } - private static class SegmentHolder + static class SegmentHolder { private final DataSegment segment; private final 
DataSegmentChangeRequest changeRequest; diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java index cdad8c7adc21..12e0cd098de6 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java @@ -55,6 +55,6 @@ public LoadQueueTaskMaster( public LoadQueuePeon giveMePeon(String basePath) { - return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config); + return new ZkLoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config); } } diff --git a/server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java new file mode 100644 index 000000000000..9c91c739187f --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java @@ -0,0 +1,139 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.common.ISE; +import com.metamx.emitter.EmittingLogger; +import io.druid.server.coordination.SegmentChangeRequestNoop; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.Stat; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ZkLoadQueuePeon extends LoadQueuePeon +{ + private static final EmittingLogger log = new EmittingLogger(ZkLoadQueuePeon.class); + + private final CuratorFramework curator; + private final String basePath; + private final ObjectMapper jsonMapper; + private final ScheduledExecutorService processingExecutor; + private final DruidCoordinatorConfig config; + + ZkLoadQueuePeon( + CuratorFramework curator, + String basePath, + ObjectMapper jsonMapper, + ScheduledExecutorService processingExecutor, + ExecutorService callbackExecutor, + DruidCoordinatorConfig config + ) + { + super(basePath, processingExecutor, callbackExecutor); + this.curator = curator; + this.basePath = basePath; + this.jsonMapper = jsonMapper; + this.processingExecutor = processingExecutor; + this.config = config; + } + + @Override + void processHolder(final SegmentHolder holder) + { + + try { + final String path = ZKPaths.makePath(basePath, holder.getSegmentIdentifier()); + final byte[] payload = jsonMapper.writeValueAsBytes(holder.getChangeRequest()); + curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); + + processingExecutor.schedule( + new Runnable() + { + @Override + public void run() + { + try { + if (curator.checkExists().forPath(path) != null) { + failAssign(holder, new ISE("%s was never removed! 
Failing this operation!", path)); + } + } + catch (Exception e) { + failAssign(holder, e); + } + } + }, + config.getLoadTimeoutDelay().getMillis(), + TimeUnit.MILLISECONDS + ); + + final Stat stat = curator.checkExists().usingWatcher( + new CuratorWatcher() + { + @Override + public void process(WatchedEvent watchedEvent) throws Exception + { + switch (watchedEvent.getType()) { + case NodeDeleted: + if (!ZKPaths.getNodeFromPath(path).equals(holder.getSegmentIdentifier())) { + log.warn( + "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]", + basePath, path, holder + ); + return; + } + actionCompleted(holder); + } + } + } + ).forPath(path); + + if (stat == null) { + final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop()); + + // Create a node and then delete it to remove the registered watcher. This is a work-around for + // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event + // that happens for that node. If no events happen, the watcher stays registered forever. + // Couple that with the fact that you cannot set a watcher when you create a node, but what we + // want is to create a node and then watch for it to get deleted. The solution is that you *can* + // set a watcher when you check to see if it exists so, we first create the node and then set a + // watcher on its existence. However, if it already does not exist by the time the existence check + // returns, then the watcher that was set will never fire (nobody will ever create the node + // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause + // that watcher to fire and delete it right away. + // + // We do not create the existence watcher first, because then it will fire when we create the + // node and we'll have the same race when trying to refresh that watcher. 
+ curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload); + actionCompleted(holder); + } + + } + catch (Exception e) { + failAssign(holder, e); + } + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 129d7d7eb195..b8a0add0d0bd 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -131,7 +131,7 @@ public void setUp() throws Exception false ); pathChildrenCache = new PathChildrenCache(curator, LOADPATH, true, true, Execs.singleThreaded("coordinator_test_path_children_cache-%d")); - loadQueuePeon = new LoadQueuePeon( + loadQueuePeon = new ZkLoadQueuePeon( curator, LOADPATH, objectMapper, diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java index 5e3f56a41af8..9fa906db9f2a 100644 --- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java @@ -82,7 +82,7 @@ public void testMultipleLoadDropSegments() throws Exception final AtomicInteger requestSignalIdx = new AtomicInteger(0); final AtomicInteger segmentSignalIdx = new AtomicInteger(0); - loadQueuePeon = new LoadQueuePeon( + loadQueuePeon = new ZkLoadQueuePeon( curator, LOAD_QUEUE_PATH, jsonMapper, @@ -287,7 +287,7 @@ public void testFailAssign() throws Exception final CountDownLatch loadRequestSignal = new CountDownLatch(1); final CountDownLatch segmentLoadedSignal = new CountDownLatch(1); - loadQueuePeon = new LoadQueuePeon( + loadQueuePeon = new ZkLoadQueuePeon( curator, LOAD_QUEUE_PATH, jsonMapper, diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java 
index bb920a36198d..e85d44fada55 100644 --- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java @@ -29,7 +29,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon public LoadQueuePeonTester() { - super(null, null, null, null, null, null); + super(null, null, null); } @Override @@ -41,6 +41,12 @@ public void loadSegment( segmentsToLoad.add(segment); } + @Override + void processHolder(SegmentHolder holder) + { + // Nothing to do + } + public ConcurrentSkipListSet getSegmentsToLoad() { return segmentsToLoad; From 26c217bb8a76bbcc50e6fa211f21e247d60909a7 Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 2 Feb 2016 15:37:37 +0530 Subject: [PATCH 2/4] Separate out ZKCoordination and SegmentManager --- .../server/coordination/SegmentManager.java | 515 ++++++++++++++++++ .../server/coordination/ZkCoordinator.java | 404 +------------- .../druid/server/http/HistoricalResource.java | 9 +- ...natorTest.java => SegmentManagerTest.java} | 48 +- .../main/java/io/druid/cli/CliHistorical.java | 4 + 5 files changed, 552 insertions(+), 428 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordination/SegmentManager.java rename server/src/test/java/io/druid/server/coordination/{ZkCoordinatorTest.java => SegmentManagerTest.java} (93%) diff --git a/server/src/main/java/io/druid/server/coordination/SegmentManager.java b/server/src/main/java/io/druid/server/coordination/SegmentManager.java new file mode 100644 index 000000000000..0cc9b3602d8f --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/SegmentManager.java @@ -0,0 +1,515 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentSkipListSet; +import 
java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +@Singleton +public class SegmentManager implements DataSegmentChangeHandler +{ + private static final EmittingLogger log = new EmittingLogger(SegmentManager.class); + + private final Object lock = new Object(); + + private final ObjectMapper jsonMapper; + private final SegmentLoaderConfig config; + private final DruidServerMetadata me; + private final DataSegmentAnnouncer announcer; + private final ServerManager serverManager; + private final ScheduledExecutorService exec; + private final ConcurrentSkipListSet segmentsToDelete; + + + private volatile boolean started = false; + + @Inject + public SegmentManager( + ObjectMapper jsonMapper, + SegmentLoaderConfig config, + DruidServerMetadata me, + DataSegmentAnnouncer announcer, + ServerManager serverManager, + ScheduledExecutorFactory factory + ) + { + this.jsonMapper = jsonMapper; + this.config = config; + this.me = me; + this.announcer = announcer; + this.serverManager = serverManager; + + this.exec = factory.create(1, "SegmentManager-Exec--%d"); + this.segmentsToDelete = new ConcurrentSkipListSet<>(); + } + + @LifecycleStart + public void start() throws IOException + { + synchronized (lock) { + if (started) { + return; + } + + log.info("Starting SegmentManager for server[%s]", me.getName()); + loadLocalCache(); + started = true; + } + } + + @LifecycleStop + public void stop() + { + log.info("Stopping SegmentManager for [%s]", me); + synchronized (lock) { + if (!started) { + return; + } + started = false; + } + } + + public boolean isStarted() + { + return started; + } + + public void 
loadLocalCache() + { + final long start = System.currentTimeMillis(); + File baseDir = config.getInfoDir(); + if (!baseDir.exists() && !config.getInfoDir().mkdirs()) { + return; + } + + List cachedSegments = Lists.newArrayList(); + File[] segmentsToLoad = baseDir.listFiles(); + for (int i = 0; i < segmentsToLoad.length; i++) { + File file = segmentsToLoad[i]; + log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file); + try { + DataSegment segment = jsonMapper.readValue(file, DataSegment.class); + if (serverManager.isSegmentCached(segment)) { + cachedSegments.add(segment); + } else { + log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier()); + + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to load segment from segmentInfo file") + .addData("file", file) + .emit(); + } + } + + addSegments( + cachedSegments, + new DataSegmentChangeCallback() + { + @Override + public void execute() + { + log.info("Cache load took %,d ms", System.currentTimeMillis() - start); + } + } + ); + } + + public DataSegmentChangeHandler getDataSegmentChangeHandler() + { + return SegmentManager.this; + } + + /** + * Load a single segment. If the segment is loaded successfully, this function simply returns. 
Otherwise it will + * throw a SegmentLoadingException + * + * @throws SegmentLoadingException + */ + private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + { + final boolean loaded; + try { + loaded = serverManager.loadSegment(segment); + } + catch (Exception e) { + removeSegment(segment, callback); + throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); + } + + if (loaded) { + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.exists()) { + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + removeSegment(segment, callback); + throw new SegmentLoadingException( + e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile + ); + } + } + } + } + + @Override + public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) + { + try { + log.info("Loading segment %s", segment.getIdentifier()); + /* + The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, + and if(segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment + files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads + the segment, which makes dropping segment and downloading segment happen at the same time. + */ + if (segmentsToDelete.contains(segment)) { + /* + Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, + each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make + things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of + cost of acquiring lock by doing the "contains" check outside the synchronized block. 
+ */ + synchronized (lock) { + segmentsToDelete.remove(segment); + } + } + loadSegment(segment, callback); + if (!announcer.isAnnounced(segment)) { + try { + announcer.announceSegment(segment); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); + } + } + } + catch (SegmentLoadingException e) { + log.makeAlert(e, "Failed to load segment for dataSource") + .addData("segment", segment) + .emit(); + } + finally { + callback.execute(); + } + } + + private void addSegments(Collection segments, final DataSegmentChangeCallback callback) + { + ExecutorService loadingExecutor = null; + try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = + new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { + + backgroundSegmentAnnouncer.startAnnouncing(); + + loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "SegmentManager-loading-%s"); + + final int numSegments = segments.size(); + final CountDownLatch latch = new CountDownLatch(numSegments); + final AtomicInteger counter = new AtomicInteger(0); + final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>(); + for (final DataSegment segment : segments) { + loadingExecutor.submit( + new Runnable() + { + @Override + public void run() + { + try { + log.info( + "Loading segment[%d/%d][%s]", + counter.getAndIncrement(), + numSegments, + segment.getIdentifier() + ); + loadSegment(segment, callback); + if (!announcer.isAnnounced(segment)) { + try { + backgroundSegmentAnnouncer.announceSegment(segment); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + } + } + catch (SegmentLoadingException e) { + log.error(e, "[%s] failed to load", segment.getIdentifier()); + failedSegments.add(segment); + } + finally { + latch.countDown(); + } + } + } + ); + } + + try { + latch.await(); + + if 
(failedSegments.size() > 0) { + log.makeAlert("%,d errors seen while loading segments", failedSegments.size()) + .addData("failedSegments", failedSegments); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.makeAlert(e, "LoadingInterrupted"); + } + + backgroundSegmentAnnouncer.finishAnnouncing(); + } + catch (SegmentLoadingException e) { + log.makeAlert(e, "Failed to load segments -- likely problem with announcing.") + .addData("numSegments", segments.size()) + .emit(); + } + finally { + callback.execute(); + if (loadingExecutor != null) { + loadingExecutor.shutdownNow(); + } + } + } + + + @Override + public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback) + { + try { + announcer.unannounceSegment(segment); + segmentsToDelete.add(segment); + + log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis()); + exec.schedule( + new Runnable() + { + @Override + public void run() + { + try { + synchronized (lock) { + if (segmentsToDelete.remove(segment)) { + serverManager.dropSegment(segment); + + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to remove segment! 
Possible resource leak!") + .addData("segment", segment) + .emit(); + } + } + }, + config.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS + ); + } + catch (Exception e) { + log.makeAlert(e, "Failed to remove segment") + .addData("segment", segment) + .emit(); + } + finally { + callback.execute(); + } + } + + private static class BackgroundSegmentAnnouncer implements AutoCloseable + { + private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); + + private final int intervalMillis; + private final DataSegmentAnnouncer announcer; + private final ScheduledExecutorService exec; + private final LinkedBlockingQueue queue; + private final SettableFuture doneAnnouncing; + + private final Object lock = new Object(); + + private volatile boolean finished = false; + private volatile ScheduledFuture startedAnnouncing = null; + private volatile ScheduledFuture nextAnnoucement = null; + + public BackgroundSegmentAnnouncer( + DataSegmentAnnouncer announcer, + ScheduledExecutorService exec, + int intervalMillis + ) + { + this.announcer = announcer; + this.exec = exec; + this.intervalMillis = intervalMillis; + this.queue = Queues.newLinkedBlockingQueue(); + this.doneAnnouncing = SettableFuture.create(); + } + + public void announceSegment(final DataSegment segment) throws InterruptedException + { + if (finished) { + throw new ISE("Announce segment called after finishAnnouncing"); + } + queue.put(segment); + } + + public void startAnnouncing() + { + if (intervalMillis <= 0) { + return; + } + + log.info("Starting background segment announcing task"); + + // schedule background announcing task + nextAnnoucement = startedAnnouncing = exec.schedule( + new Runnable() + { + @Override + public void run() + { + synchronized (lock) { + try { + if (!(finished && queue.isEmpty())) { + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + try { + announcer.announceSegments(segments); + nextAnnoucement = exec.schedule(this, 
intervalMillis, TimeUnit.MILLISECONDS); + } + catch (IOException e) { + doneAnnouncing.setException( + new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) + ); + } + } else { + doneAnnouncing.set(true); + } + } + catch (Exception e) { + doneAnnouncing.setException(e); + } + } + } + }, + intervalMillis, + TimeUnit.MILLISECONDS + ); + } + + public void finishAnnouncing() throws SegmentLoadingException + { + synchronized (lock) { + finished = true; + // announce any remaining segments + try { + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + announcer.announceSegments(segments); + } + catch (Exception e) { + throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); + } + + // get any exception that may have been thrown in background annoucing + try { + // check in case intervalMillis is <= 0 + if (startedAnnouncing != null) { + startedAnnouncing.cancel(false); + } + // - if the task is waiting on the lock, then the queue will be empty by the time it runs + // - if the task just released it, then the lock ensures any exception is set in doneAnnouncing + if (doneAnnouncing.isDone()) { + doneAnnouncing.get(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + catch (ExecutionException e) { + throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed"); + } + } + log.info("Completed background segment announcing"); + } + + @Override + public void close() + { + // stop background scheduling + synchronized (lock) { + finished = true; + if (nextAnnoucement != null) { + nextAnnoucement.cancel(false); + } + } + } + } +} diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 77df75097c37..3f131101d093 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java 
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -59,7 +59,7 @@ /** */ -public class ZkCoordinator implements DataSegmentChangeHandler +public class ZkCoordinator { private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class); @@ -70,11 +70,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final SegmentLoaderConfig config; private final DruidServerMetadata me; private final CuratorFramework curator; - private final DataSegmentAnnouncer announcer; - private final ServerManager serverManager; - private final ScheduledExecutorService exec; - private final ConcurrentSkipListSet segmentsToDelete; - + private final DataSegmentChangeHandler segmentChangeHandler; private volatile PathChildrenCache loadQueueCache; private volatile boolean started = false; @@ -85,10 +81,8 @@ public ZkCoordinator( SegmentLoaderConfig config, ZkPathsConfig zkPaths, DruidServerMetadata me, - DataSegmentAnnouncer announcer, CuratorFramework curator, - ServerManager serverManager, - ScheduledExecutorFactory factory + DataSegmentChangeHandler segmentChangeHandler ) { this.jsonMapper = jsonMapper; @@ -96,11 +90,7 @@ public ZkCoordinator( this.config = config; this.me = me; this.curator = curator; - this.announcer = announcer; - this.serverManager = serverManager; - - this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); - this.segmentsToDelete = new ConcurrentSkipListSet<>(); + this.segmentChangeHandler = segmentChangeHandler; } @LifecycleStart @@ -130,8 +120,6 @@ public void start() throws IOException curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); - loadLocalCache(); - loadQueueCache.getListenable().addListener( new PathChildrenCacheListener() { @@ -150,7 +138,7 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th try { request.go( - 
getDataSegmentChangeHandler(), + segmentChangeHandler, new DataSegmentChangeCallback() { boolean hasRun = false; @@ -241,387 +229,5 @@ public boolean isStarted() return started; } - public void loadLocalCache() - { - final long start = System.currentTimeMillis(); - File baseDir = config.getInfoDir(); - if (!baseDir.exists() && !config.getInfoDir().mkdirs()) { - return; - } - - List cachedSegments = Lists.newArrayList(); - File[] segmentsToLoad = baseDir.listFiles(); - for (int i = 0; i < segmentsToLoad.length; i++) { - File file = segmentsToLoad[i]; - log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file); - try { - DataSegment segment = jsonMapper.readValue(file, DataSegment.class); - if (serverManager.isSegmentCached(segment)) { - cachedSegments.add(segment); - } else { - log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier()); - - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); - } - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to load segment from segmentInfo file") - .addData("file", file) - .emit(); - } - } - - addSegments( - cachedSegments, - new DataSegmentChangeCallback() - { - @Override - public void execute() - { - log.info("Cache load took %,d ms", System.currentTimeMillis() - start); - } - } - ); - } - - public DataSegmentChangeHandler getDataSegmentChangeHandler() - { - return ZkCoordinator.this; - } - - /** - * Load a single segment. If the segment is loaded succesfully, this function simply returns. 
Otherwise it will - * throw a SegmentLoadingException - * - * @throws SegmentLoadingException - */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException - { - final boolean loaded; - try { - loaded = serverManager.loadSegment(segment); - } - catch (Exception e) { - removeSegment(segment, callback); - throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); - } - - if (loaded) { - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - removeSegment(segment, callback); - throw new SegmentLoadingException( - e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile - ); - } - } - } - } - - @Override - public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - try { - log.info("Loading segment %s", segment.getIdentifier()); - /* - The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, - and if(segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment - files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads - the segment, which makes dropping segment and downloading segment happen at the same time. - */ - if (segmentsToDelete.contains(segment)) { - /* - Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, - each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make - things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of - cost of acquiring lock by doing the "contains" check outside the synchronized block. 
- */ - synchronized (lock) { - segmentsToDelete.remove(segment); - } - } - loadSegment(segment, callback); - if (!announcer.isAnnounced(segment)) { - try { - announcer.announceSegment(segment); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); - } - } - } - catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segment for dataSource") - .addData("segment", segment) - .emit(); - } - finally { - callback.execute(); - } - } - - private void addSegments(Collection segments, final DataSegmentChangeCallback callback) - { - ExecutorService loadingExecutor = null; - try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = - new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { - - backgroundSegmentAnnouncer.startAnnouncing(); - - loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s"); - - final int numSegments = segments.size(); - final CountDownLatch latch = new CountDownLatch(numSegments); - final AtomicInteger counter = new AtomicInteger(0); - final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>(); - for (final DataSegment segment : segments) { - loadingExecutor.submit( - new Runnable() - { - @Override - public void run() - { - try { - log.info( - "Loading segment[%d/%d][%s]", - counter.getAndIncrement(), - numSegments, - segment.getIdentifier() - ); - loadSegment(segment, callback); - if (!announcer.isAnnounced(segment)) { - try { - backgroundSegmentAnnouncer.announceSegment(segment); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } - } - } - catch (SegmentLoadingException e) { - log.error(e, "[%s] failed to load", segment.getIdentifier()); - failedSegments.add(segment); - } - finally { - latch.countDown(); - } - } - } - ); - } - - try { - latch.await(); - - if 
(failedSegments.size() > 0) { - log.makeAlert("%,d errors seen while loading segments", failedSegments.size()) - .addData("failedSegments", failedSegments); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.makeAlert(e, "LoadingInterrupted"); - } - backgroundSegmentAnnouncer.finishAnnouncing(); - } - catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segments -- likely problem with announcing.") - .addData("numSegments", segments.size()) - .emit(); - } - finally { - callback.execute(); - if (loadingExecutor != null) { - loadingExecutor.shutdownNow(); - } - } - } - - - @Override - public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback) - { - try { - announcer.unannounceSegment(segment); - segmentsToDelete.add(segment); - - log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis()); - exec.schedule( - new Runnable() - { - @Override - public void run() - { - try { - synchronized (lock) { - if (segmentsToDelete.remove(segment)) { - serverManager.dropSegment(segment); - - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); - } - } - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to remove segment! 
Possible resource leak!") - .addData("segment", segment) - .emit(); - } - } - }, - config.getDropSegmentDelayMillis(), - TimeUnit.MILLISECONDS - ); - } - catch (Exception e) { - log.makeAlert(e, "Failed to remove segment") - .addData("segment", segment) - .emit(); - } - finally { - callback.execute(); - } - } - - private static class BackgroundSegmentAnnouncer implements AutoCloseable - { - private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); - - private final int intervalMillis; - private final DataSegmentAnnouncer announcer; - private final ScheduledExecutorService exec; - private final LinkedBlockingQueue queue; - private final SettableFuture doneAnnouncing; - - private final Object lock = new Object(); - - private volatile boolean finished = false; - private volatile ScheduledFuture startedAnnouncing = null; - private volatile ScheduledFuture nextAnnoucement = null; - - public BackgroundSegmentAnnouncer( - DataSegmentAnnouncer announcer, - ScheduledExecutorService exec, - int intervalMillis - ) - { - this.announcer = announcer; - this.exec = exec; - this.intervalMillis = intervalMillis; - this.queue = Queues.newLinkedBlockingQueue(); - this.doneAnnouncing = SettableFuture.create(); - } - - public void announceSegment(final DataSegment segment) throws InterruptedException - { - if (finished) { - throw new ISE("Announce segment called after finishAnnouncing"); - } - queue.put(segment); - } - - public void startAnnouncing() - { - if (intervalMillis <= 0) { - return; - } - - log.info("Starting background segment announcing task"); - - // schedule background announcing task - nextAnnoucement = startedAnnouncing = exec.schedule( - new Runnable() - { - @Override - public void run() - { - synchronized (lock) { - try { - if (!(finished && queue.isEmpty())) { - final List segments = Lists.newLinkedList(); - queue.drainTo(segments); - try { - announcer.announceSegments(segments); - nextAnnoucement = exec.schedule(this, 
intervalMillis, TimeUnit.MILLISECONDS); - } - catch (IOException e) { - doneAnnouncing.setException( - new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) - ); - } - } else { - doneAnnouncing.set(true); - } - } - catch (Exception e) { - doneAnnouncing.setException(e); - } - } - } - }, - intervalMillis, - TimeUnit.MILLISECONDS - ); - } - - public void finishAnnouncing() throws SegmentLoadingException - { - synchronized (lock) { - finished = true; - // announce any remaining segments - try { - final List segments = Lists.newLinkedList(); - queue.drainTo(segments); - announcer.announceSegments(segments); - } - catch (Exception e) { - throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); - } - - // get any exception that may have been thrown in background annoucing - try { - // check in case intervalMillis is <= 0 - if (startedAnnouncing != null) { - startedAnnouncing.cancel(false); - } - // - if the task is waiting on the lock, then the queue will be empty by the time it runs - // - if the task just released it, then the lock ensures any exception is set in doneAnnouncing - if (doneAnnouncing.isDone()) { - doneAnnouncing.get(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } - catch (ExecutionException e) { - throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed"); - } - } - log.info("Completed background segment announcing"); - } - - @Override - public void close() - { - // stop background scheduling - synchronized (lock) { - finished = true; - if (nextAnnoucement != null) { - nextAnnoucement.cancel(false); - } - } - } - } } diff --git a/server/src/main/java/io/druid/server/http/HistoricalResource.java b/server/src/main/java/io/druid/server/http/HistoricalResource.java index 4680cf29c6c5..a9935d4025de 100644 --- a/server/src/main/java/io/druid/server/http/HistoricalResource.java +++ 
b/server/src/main/java/io/druid/server/http/HistoricalResource.java @@ -20,6 +20,7 @@ package io.druid.server.http; import com.google.common.collect.ImmutableMap; +import io.druid.server.coordination.SegmentManager; import io.druid.server.coordination.ZkCoordinator; import javax.inject.Inject; @@ -32,14 +33,14 @@ @Path("/druid/historical/v1") public class HistoricalResource { - private final ZkCoordinator coordinator; + private final SegmentManager segmentManager; @Inject public HistoricalResource( - ZkCoordinator coordinator + SegmentManager segmentManager ) { - this.coordinator = coordinator; + this.segmentManager = segmentManager; } @GET @@ -47,6 +48,6 @@ public HistoricalResource( @Produces(MediaType.APPLICATION_JSON) public Response getLoadStatus() { - return Response.ok(ImmutableMap.of("cacheInitialized", coordinator.isStarted())).build(); + return Response.ok(ImmutableMap.of("cacheInitialized", segmentManager.isStarted())).build(); } } diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/SegmentManagerTest.java similarity index 93% rename from server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java rename to server/src/test/java/io/druid/server/coordination/SegmentManagerTest.java index 04dcdebf3c1b..dfbcdc8c5bed 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/SegmentManagerTest.java @@ -69,11 +69,11 @@ /** */ -public class ZkCoordinatorTest extends CuratorTestBase +public class SegmentManagerTest extends CuratorTestBase { public static final int COUNT = 50; - private static final Logger log = new Logger(ZkCoordinatorTest.class); + private static final Logger log = new Logger(SegmentManagerTest.class); private final ObjectMapper jsonMapper = new DefaultObjectMapper(); private final DruidServerMetadata me = new DruidServerMetadata( @@ -85,7 +85,7 @@ public class 
ZkCoordinatorTest extends CuratorTestBase 0 ); - private ZkCoordinator zkCoordinator; + private SegmentManager segmentManager; private ServerManager serverManager; private DataSegmentAnnouncer announcer; private File infoDir; @@ -101,7 +101,7 @@ public void setUp() throws Exception curator.start(); curator.blockUntilConnected(); try { - infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest"); + infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "SegmentManagerTest"); infoDir.mkdirs(); for (File file : infoDir.listFiles()) { file.delete(); @@ -191,7 +191,7 @@ public boolean isAnnounced(DataSegment segment) } }; - zkCoordinator = new ZkCoordinator( + segmentManager = new SegmentManager( jsonMapper, new SegmentLoaderConfig() { @@ -219,10 +219,8 @@ public int getDropSegmentDelayMillis() return 0; } }, - zkPaths, me, announcer, - curator, serverManager, new ScheduledExecutorFactory() { @@ -247,8 +245,8 @@ public ScheduledFuture schedule( } }; } - } - ); + }); + } @After @@ -266,11 +264,11 @@ public void tearDown() throws Exception @Test public void testSegmentLoading1() throws Exception { - zkCoordinator.start(); + segmentManager.start(); final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01")); - zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() + segmentManager.removeSegment(segment, new DataSegmentChangeCallback() { @Override public void execute() @@ -281,7 +279,7 @@ public void execute() Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); - zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + segmentManager.addSegment(segment, new DataSegmentChangeCallback() { @Override public void execute() @@ -293,7 +291,7 @@ public void execute() /* make sure the scheduled runnable that "deletes" segment files has been executed. 
Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. + SegmentManager, the scheduled runnable will not actually delete segment files. */ for (Runnable runnable : scheduledRunnable) { runnable.run(); @@ -302,7 +300,7 @@ Because another addSegment() call is executed, which removes the segment from se Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); - zkCoordinator.stop(); + segmentManager.stop(); } /** @@ -315,11 +313,11 @@ Because another addSegment() call is executed, which removes the segment from se @Test public void testSegmentLoading2() throws Exception { - zkCoordinator.start(); + segmentManager.start(); final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01")); - zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + segmentManager.addSegment(segment, new DataSegmentChangeCallback() { @Override public void execute() @@ -330,7 +328,7 @@ public void execute() Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() + segmentManager.removeSegment(segment, new DataSegmentChangeCallback() { @Override public void execute() @@ -341,7 +339,7 @@ public void execute() Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); - zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + segmentManager.addSegment(segment, new DataSegmentChangeCallback() { @Override public void execute() @@ -353,7 +351,7 @@ public void execute() /* make sure the scheduled runnable that "deletes" segment files has been executed. Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. 
+ SegmentManager, the scheduled runnable will not actually delete segment files. */ for (Runnable runnable : scheduledRunnable) { runnable.run(); @@ -362,7 +360,7 @@ Because another addSegment() call is executed, which removes the segment from se Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); - zkCoordinator.stop(); + segmentManager.stop(); } @Test @@ -392,14 +390,14 @@ public void testLoadCache() throws Exception checkCache(segments); Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); - zkCoordinator.start(); + segmentManager.start(); Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue()); Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(13 * COUNT, announceCount.get()); - zkCoordinator.stop(); + segmentManager.stop(); for (DataSegment segment : segments) { deleteSegmentFromCache(segment); @@ -525,7 +523,7 @@ public String getBase() } ); - ZkCoordinator zkCoordinator = injector.getInstance(ZkCoordinator.class); + SegmentManager segmentManager = injector.getInstance(SegmentManager.class); List segments = Lists.newLinkedList(); for (int i = 0; i < COUNT; ++i) { @@ -544,14 +542,14 @@ public String getBase() checkCache(segments); Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); - zkCoordinator.start(); + segmentManager.start(); Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { Assert.assertEquals(3L, serverManager.getDataSourceCounts().get("test" + i).longValue()); Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(5 * COUNT, announceCount.get()); - zkCoordinator.stop(); + segmentManager.stop(); for 
(DataSegment segment : segments) { deleteSegmentFromCache(segment); diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 56c2025ebecb..a5de3c9f5e0d 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -36,6 +36,8 @@ import io.druid.guice.NodeTypeConfig; import io.druid.query.QuerySegmentWalker; import io.druid.server.QueryResource; +import io.druid.server.coordination.DataSegmentChangeHandler; +import io.druid.server.coordination.SegmentManager; import io.druid.server.coordination.ServerManager; import io.druid.server.coordination.ZkCoordinator; import io.druid.server.http.HistoricalResource; @@ -75,6 +77,8 @@ public void configure(Binder binder) // register Server before binding ZkCoordinator to ensure HTTP endpoints are available immediately LifecycleModule.register(binder, Server.class); binder.bind(ServerManager.class).in(LazySingleton.class); + binder.bind(DataSegmentChangeHandler.class).to(SegmentManager.class).in(ManageLifecycle.class); + binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class); From 8819adc38d1afd0368070bf4d04915296d5a2a45 Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 2 Feb 2016 16:53:05 +0530 Subject: [PATCH 3/4] Add Http Endpoints for requests processing and pending request polling --- .../coordinator/DruidCoordinatorConfig.java | 6 ++ .../druid/server/http/HistoricalResource.java | 60 ++++++++++++++++++- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java index 529bd06aba0f..74b199d6a81f 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java +++ 
b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java @@ -75,6 +75,12 @@ public Duration getLoadTimeoutDelay() return new Duration(15 * 60 * 1000); } + @Config("druid.coordinator.load.status.poll.duration") + public Duration getLoadStatusPollDuration() + { + return new Duration(1000); + } + @Config("druid.coordinator.console.static") public String getConsoleStatic() { diff --git a/server/src/main/java/io/druid/server/http/HistoricalResource.java b/server/src/main/java/io/druid/server/http/HistoricalResource.java index a9935d4025de..c56c5295e8c9 100644 --- a/server/src/main/java/io/druid/server/http/HistoricalResource.java +++ b/server/src/main/java/io/druid/server/http/HistoricalResource.java @@ -19,28 +19,48 @@ package io.druid.server.http; +import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity; +import com.google.api.client.util.Lists; import com.google.common.collect.ImmutableMap; +import io.druid.concurrent.Execs; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.server.coordination.DataSegmentChangeCallback; +import io.druid.server.coordination.DataSegmentChangeRequest; import io.druid.server.coordination.SegmentManager; import io.druid.server.coordination.ZkCoordinator; import javax.inject.Inject; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.util.List; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; @Path("/druid/historical/v1") public class HistoricalResource { private final SegmentManager segmentManager; + private final ExecutorService exec; + private final List pendingRequests = new CopyOnWriteArrayList<>(); @Inject public HistoricalResource( - SegmentManager segmentManager + SegmentManager segmentManager, + 
SegmentLoaderConfig config ) { this.segmentManager = segmentManager; + exec = Execs.multiThreaded( + config.getNumLoadingThreads(), + "SegmentChangeRequestProcessing-%s" + ); + } @GET @@ -50,4 +70,42 @@ public Response getLoadStatus() { return Response.ok(ImmutableMap.of("cacheInitialized", segmentManager.isStarted())).build(); } + + @POST + @Path("/processRequest") + public Response processSegmentChangeRequest(final DataSegmentChangeRequest request) + { + try { + boolean newRequest = pendingRequests.add(request); + if (newRequest) { + exec.submit(new Runnable() + { + @Override + public void run() + { + request.go(segmentManager, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + pendingRequests.remove(request); + } + }); + } + }); + } + return Response.ok().build(); + } + catch (Exception e) { + return Response.serverError().build(); + } + } + + @GET + @Path("/pendingRequests") + public Response pendingRequests() + { + return Response.ok(pendingRequests).build(); + } + } From 1b55a8ad170f564f1026695554115140016430c3 Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 2 Feb 2016 18:01:46 +0530 Subject: [PATCH 4/4] Http Load Queue Peon initial imply fix initialization --- .../server/coordinator/DruidCoordinator.java | 27 +-- .../coordinator/DruidCoordinatorConfig.java | 2 +- .../server/coordinator/HttpLoadQueuePeon.java | 201 ++++++++++++++++++ .../coordinator/HttpLoadQueueTaskMaster.java | 63 ++++++ .../coordinator/LoadQueueTaskMaster.java | 30 +-- .../server/coordinator/ZkLoadQueuePeon.java | 1 - .../coordinator/ZkLoadQueueTaskMaster.java | 67 ++++++ .../coordinator/DruidCoordinatorTest.java | 16 -- .../java/io/druid/cli/CliCoordinator.java | 59 ++++- 9 files changed, 398 insertions(+), 68 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java create mode 100644 server/src/main/java/io/druid/server/coordinator/HttpLoadQueueTaskMaster.java create mode 100644 
server/src/main/java/io/druid/server/coordinator/ZkLoadQueueTaskMaster.java diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 748a8522a6b4..4e85b3f28f12 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -348,7 +348,7 @@ public String getCurrentLeader() public void moveSegment( ImmutableDruidServer fromServer, - ImmutableDruidServer toServer, + final ImmutableDruidServer toServer, String segmentName, final LoadPeonCallback callback ) @@ -384,17 +384,6 @@ public void moveSegment( ); } - final String toLoadQueueSegPath = ZKPaths.makePath( - ZKPaths.makePath( - zkPaths.getLoadQueuePath(), - toServer.getName() - ), segmentName - ); - final String toServedSegPath = ZKPaths.makePath( - ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), toServer.getName()), - segmentName - ); - loadPeon.loadSegment( segment, new LoadPeonCallback() @@ -403,9 +392,12 @@ public void moveSegment( public void execute() { try { - if (curator.checkExists().forPath(toServedSegPath) != null && - curator.checkExists().forPath(toLoadQueueSegPath) == null && - !dropPeon.getSegmentsToDrop().contains(segment)) { + DruidServer toLoadServer = serverInventoryView.getInventoryValue(toServer.getName()); + + if (toLoadServer != null // toLoadServer is still present + && toLoadServer.getSegment(segment.getIdentifier()) != null // verify segment loaded on toLoadServer + && !dropPeon.getSegmentsToDrop().contains(segment) // segment already in dropQueue + ) { dropPeon.dropSegment(segment, callback); } else if (callback != null) { callback.execute(); @@ -840,9 +832,8 @@ public ImmutableDruidServer apply(DruidServer input) final DruidCluster cluster = new DruidCluster(); for (ImmutableDruidServer server : servers) { if 
(!loadManagementPeons.containsKey(server.getName())) { - String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName()); - LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath); - log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath); + LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server); + log.info("Creating LoadQueuePeon for server[%s]", server.getName()); loadManagementPeons.put(server.getName(), loadQueuePeon); } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java index 74b199d6a81f..b3f2d3f52c04 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java @@ -75,7 +75,7 @@ public Duration getLoadTimeoutDelay() return new Duration(15 * 60 * 1000); } - @Config("druid.coordinator.load.status.poll.duration") + @Config("druid.coordinator.load.status.pollDuration") public Duration getLoadStatusPollDuration() { return new Duration(1000); diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java new file mode 100644 index 000000000000..9a69ec6cc3bf --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java @@ -0,0 +1,201 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.base.Predicate; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import com.metamx.emitter.EmittingLogger; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.server.coordination.DataSegmentChangeRequest; +import io.druid.server.coordination.SegmentChangeRequestNoop; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.Stat; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nullable; +import javax.ws.rs.core.MediaType; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HttpLoadQueuePeon extends LoadQueuePeon +{ + private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class); + + private final
HttpClient httpClient; + private final String baseURL; + private final ObjectMapper jsonMapper; + private final ScheduledExecutorService processingExecutor; + private final DruidCoordinatorConfig config; + private final StatusResponseHandler responseHandler; + private final Map<DataSegmentChangeRequest, Runnable> pendingRequests = Maps.newConcurrentMap(); + + HttpLoadQueuePeon( + final HttpClient httpClient, + String baseURL, + ObjectMapper jsonMapper, + ScheduledExecutorService processingExecutor, + ExecutorService callbackExecutor, + DruidCoordinatorConfig config + ) + { + super(baseURL, processingExecutor, callbackExecutor); + this.httpClient = httpClient; + this.baseURL = baseURL; + this.jsonMapper = jsonMapper; + this.processingExecutor = processingExecutor; + this.config = config; + this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); + processingExecutor.scheduleAtFixedRate( + new Runnable() + { + @Override + public void run() + { + try { + final List<DataSegmentChangeRequest> remotePendingRequests = getRemotePendingRequests(); + Map<DataSegmentChangeRequest, Runnable> completedRequests = Maps.newHashMap(Maps.filterKeys( + pendingRequests, + new Predicate<DataSegmentChangeRequest>() + { + @Override + public boolean apply( + DataSegmentChangeRequest request + ) + { + return !remotePendingRequests.contains(request); + } + } + )); + for (Map.Entry<DataSegmentChangeRequest, Runnable> completedRequest : completedRequests.entrySet()) { + pendingRequests.remove(completedRequest.getKey()); // deregister first so a throwing callback is not re-run on every poll + completedRequest.getValue().run(); + } + } + catch (Exception e) { + log.error(e, "Exception while fetching remote requests, Will retry again.."); + } + } + }, + config.getLoadStatusPollDuration().getMillis(), + config.getLoadStatusPollDuration().getMillis(), + TimeUnit.MILLISECONDS + ); + + } + + private List<DataSegmentChangeRequest> getRemotePendingRequests() throws Exception + { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, new URL(String.format("%s/pendingRequests", baseURL)) + ), + responseHandler + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while
fetching pending Requests on url[%s] status[%s] content[%s]", + baseURL, + response.getStatus(), + response.getContent() + ); + } + return jsonMapper.readValue( + response.getContent(), + new TypeReference<List<DataSegmentChangeRequest>>() + { + } + ); + } + + + @Override + void processHolder(final SegmentHolder holder) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.POST, new URL(String.format("%s/processRequest", baseURL)) + ).setContent( + MediaType.APPLICATION_JSON, + jsonMapper.writeValueAsBytes(holder.getChangeRequest()) + ), + responseHandler + ).get(); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while processing SegmentChangeRequest on url[%s] status[%s] content[%s]", + baseURL, + response.getStatus(), + response.getContent() + ); + } + // Register the request as pending; the poller runs this callback once the server no longer reports it. + pendingRequests.put(holder.getChangeRequest(), new Runnable() + { + @Override + public void run() + { + actionCompleted(holder); + } + }); + + // Schedule timeout check. + processingExecutor.schedule( + new Runnable() + { + @Override + public void run() + { + if (pendingRequests.remove(holder.getChangeRequest()) != null) { + failAssign(holder, new ISE("%s was never processed! Failing this operation!", holder)); + } + } + }, + config.getLoadTimeoutDelay().getMillis(), + TimeUnit.MILLISECONDS + ); + + + } + catch (Exception e) { + failAssign(holder, e); + } + } + + +} diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueueTaskMaster.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueueTaskMaster.java new file mode 100644 index 000000000000..2a2dab3b60b1 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueueTaskMaster.java @@ -0,0 +1,63 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import com.metamx.http.client.HttpClient; +import io.druid.client.ImmutableDruidServer; +import org.apache.curator.framework.CuratorFramework; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +/** + * Provides HttpLoadQueuePeons; each peon is given the base URL http://{host}/druid/historical/v1 built from the server's host + */ +public class HttpLoadQueueTaskMaster implements LoadQueueTaskMaster +{ + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + private final ScheduledExecutorService peonExec; + private final ExecutorService callbackExec; + private final DruidCoordinatorConfig config; + + @Inject + public HttpLoadQueueTaskMaster( + HttpClient httpClient, + ObjectMapper jsonMapper, + ScheduledExecutorService peonExec, + ExecutorService callbackExec, + DruidCoordinatorConfig config + ) + { + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + this.peonExec = peonExec; + this.callbackExec = callbackExec; + this.config = config; + } + + public LoadQueuePeon giveMePeon(ImmutableDruidServer druidServer) + { + String baseUrl = String.format("http://%s/druid/historical/v1", druidServer.getHost()); + return new
HttpLoadQueuePeon(httpClient,baseUrl, jsonMapper, peonExec, callbackExec, config); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java index 12e0cd098de6..2da3ea9649f9 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; +import io.druid.client.ImmutableDruidServer; import org.apache.curator.framework.CuratorFramework; import java.util.concurrent.ExecutorService; @@ -29,32 +30,7 @@ /** * Provides LoadQueuePeons */ -public class LoadQueueTaskMaster +public interface LoadQueueTaskMaster { - private final CuratorFramework curator; - private final ObjectMapper jsonMapper; - private final ScheduledExecutorService peonExec; - private final ExecutorService callbackExec; - private final DruidCoordinatorConfig config; - - @Inject - public LoadQueueTaskMaster( - CuratorFramework curator, - ObjectMapper jsonMapper, - ScheduledExecutorService peonExec, - ExecutorService callbackExec, - DruidCoordinatorConfig config - ) - { - this.curator = curator; - this.jsonMapper = jsonMapper; - this.peonExec = peonExec; - this.callbackExec = callbackExec; - this.config = config; - } - - public LoadQueuePeon giveMePeon(String basePath) - { - return new ZkLoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config); - } + public LoadQueuePeon giveMePeon(ImmutableDruidServer server); // creates the LoadQueuePeon for the given server } diff --git a/server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java index 9c91c739187f..5c7a6fff907b 100644 --- a/server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/ZkLoadQueuePeon.java @@ -64,7 +64,6 @@
public class ZkLoadQueuePeon extends LoadQueuePeon @Override void processHolder(final SegmentHolder holder) { - try { final String path = ZKPaths.makePath(basePath, holder.getSegmentIdentifier()); final byte[] payload = jsonMapper.writeValueAsBytes(holder.getChangeRequest()); diff --git a/server/src/main/java/io/druid/server/coordinator/ZkLoadQueueTaskMaster.java b/server/src/main/java/io/druid/server/coordinator/ZkLoadQueueTaskMaster.java new file mode 100644 index 000000000000..0879059c44f3 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/ZkLoadQueueTaskMaster.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +/** + * Provides ZkLoadQueuePeons; each peon's base path is the configured load queue path joined with the server's name + */ +public class ZkLoadQueueTaskMaster implements LoadQueueTaskMaster +{ + private final CuratorFramework curator; + private final ObjectMapper jsonMapper; + private final ScheduledExecutorService peonExec; + private final ExecutorService callbackExec; + private final DruidCoordinatorConfig config; + private final ZkPathsConfig zkPaths; + + @Inject + public ZkLoadQueueTaskMaster( + CuratorFramework curator, + ObjectMapper jsonMapper, + ScheduledExecutorService peonExec, + ExecutorService callbackExec, + DruidCoordinatorConfig config, + ZkPathsConfig zkPaths + ) + { + this.curator = curator; + this.jsonMapper = jsonMapper; + this.peonExec = peonExec; + this.callbackExec = callbackExec; + this.config = config; + this.zkPaths = zkPaths; + } + + public LoadQueuePeon giveMePeon(ImmutableDruidServer server) + { + String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName()); + return new ZkLoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index b8a0add0d0bd..5313a6a94337 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -232,22 +232,6 @@ public void testMoveSegment() throws Exception loadManagementPeons.put("from", loadQueuePeon); loadManagementPeons.put("to",
loadQueuePeon); - EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn( - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return ""; - } - - @Override - public String getInventoryPath() - { - return ""; - } - } - ); EasyMock.replay(serverInventoryView); coordinator.moveSegment( diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 85dc77f47580..db020fcb36cd 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -22,11 +22,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.multibindings.MapBinder; import com.google.inject.name.Names; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.logger.Logger; +import com.metamx.http.client.HttpClient; import io.airlift.airline.Command; import io.druid.audit.AuditManager; import io.druid.client.CoordinatorServerView; @@ -37,6 +40,8 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.PolyBind; +import io.druid.guice.annotations.Global; import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataRuleManagerConfig; import io.druid.metadata.MetadataRuleManagerProvider; @@ -48,7 +53,9 @@ import io.druid.server.audit.AuditManagerProvider; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; +import io.druid.server.coordinator.HttpLoadQueueTaskMaster; import io.druid.server.coordinator.LoadQueueTaskMaster; +import io.druid.server.coordinator.ZkLoadQueueTaskMaster; import
io.druid.server.http.CoordinatorDynamicConfigsResource; import io.druid.server.http.CoordinatorRedirectInfo; import io.druid.server.http.CoordinatorResource; @@ -60,6 +67,7 @@ import io.druid.server.http.RulesResource; import io.druid.server.http.ServersResource; import io.druid.server.http.TiersResource; +import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.router.TieredBrokerConfig; import org.apache.curator.framework.CuratorFramework; @@ -92,11 +100,27 @@ protected List getModules() @Override public void configure(Binder binder) { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to(TieredBrokerConfig.DEFAULT_COORDINATOR_SERVICE_NAME); + binder.bindConstant() + .annotatedWith(Names.named("serviceName")) + .to(TieredBrokerConfig.DEFAULT_COORDINATOR_SERVICE_NAME); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8081); ConfigProvider.bind(binder, DruidCoordinatorConfig.class); + PolyBind.createChoice( + binder, + "druid.coordinator.load.peon.type", + Key.get(LoadQueueTaskMaster.class), + Key.get(ZkLoadQueueTaskMaster.class) // default stays on ZooKeeper, the only implementation before this change; "http" is opt-in + ); + final MapBinder<String, LoadQueueTaskMaster> biddy = PolyBind.optionBinder( + binder, + Key.get(LoadQueueTaskMaster.class) + ); + biddy.addBinding("zk").to(ZkLoadQueueTaskMaster.class); + biddy.addBinding("http").to(HttpLoadQueueTaskMaster.class); + + binder.bind(MetadataStorage.class) .toProvider(MetadataStorageProvider.class) .in(ManageLifecycle.class); @@ -104,6 +128,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class); + binder.bind(RedirectFilter.class).in(LazySingleton.class); binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class); @@ -146,19 +171,43 @@ public void configure(Binder binder) @Provides @LazySingleton - public
LoadQueueTaskMaster getLoadQueueTaskMaster( + public ZkLoadQueueTaskMaster getZkLoadQueueTaskMaster( CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, + DruidCoordinatorConfig config, + ZkPathsConfig zkPathsConfig + ) + { + return new ZkLoadQueueTaskMaster( + curator, + jsonMapper, + factory.create(1, "Master-PeonExec--%d"), + Executors.newSingleThreadExecutor(), + config, + zkPathsConfig + ); + } + + @Provides + @LazySingleton + public HttpLoadQueueTaskMaster getHttpLoadQueueTaskMaster( + @Global HttpClient httpClient, + ObjectMapper jsonMapper, + ScheduledExecutorFactory factory, DruidCoordinatorConfig config ) { - return new LoadQueueTaskMaster( - curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), - Executors.newSingleThreadExecutor(), config + return new HttpLoadQueueTaskMaster( + httpClient, + jsonMapper, + factory.create(1, "Master-PeonExec--%d"), + Executors.newSingleThreadExecutor(), + config ); } } + ); } }