diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md
index 8ddb30c12590..27e74d1bc3f8 100644
--- a/docs/content/configuration/historical.md
+++ b/docs/content/configuration/historical.md
@@ -36,7 +36,8 @@ The historical node uses several of the global configs in [Configuration](../con
 |`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|30000 (30 seconds)|
 |`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
 |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
-|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|1|
+|`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from deep storage.|1|
+|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads|
 
 ### Query Configs
diff --git a/java-util/src/main/java/io/druid/java/util/common/Pair.java b/java-util/src/main/java/io/druid/java/util/common/Pair.java
index 456b41970bd3..62142622c70c 100644
--- a/java-util/src/main/java/io/druid/java/util/common/Pair.java
+++ b/java-util/src/main/java/io/druid/java/util/common/Pair.java
@@ -34,9 +34,13 @@ public static <T1, T2> Pair<T1, T2> of(T1 lhs, T2 rhs)
   }
 
   public final T1 lhs;
+  public final T2 rhs;
 
-  public Pair(T1 lhs, T2 rhs)
+  public Pair(
+      T1 lhs,
+      T2 rhs
+  )
   {
     this.lhs = lhs;
     this.rhs = rhs;
diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
index 1851befe277e..af36ec9563cd 100644
--- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
@@ -20,6 +20,7 @@ package io.druid.client;
 
 import com.google.common.collect.ImmutableMap;
+import com.metamx.common.StringUtils;
 import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.server.coordination.ServerType;
 import io.druid.timeline.DataSegment;
@@ -108,6 +109,15 @@ public Map<String, DataSegment> getSegments()
     return segments;
   }
 
+  public String getURL()
+  {
+    if (metadata.getHostAndTlsPort() != null) {
+      return StringUtils.safeFormat("https://%s", metadata.getHostAndTlsPort());
+    } else {
+      return StringUtils.safeFormat("http://%s", metadata.getHostAndPort());
+    }
+  }
+
   @Override
   public String toString()
   {
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index a0d66c1f402e..87131bf103dd 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -31,8 +31,6 @@
  */
 public class SegmentLoaderConfig
 {
-  private static final int DEFAULT_NUM_BOOTSTRAP_THREADS = 1;
-
   @JsonProperty
   @NotEmpty
   private List<StorageLocationConfig> locations = null;
@@ -46,12 +44,18 @@ public class SegmentLoaderConfig
   @JsonProperty("announceIntervalMillis")
   private int announceIntervalMillis = 0; // do not background announce
 
+  @JsonProperty("numLoadingThreads")
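+  /*
+   * Editor's sketch (illustrative, not part of this patch): with the two knobs
+   * below, deep-storage downloads and startup bootstrap loading are tuned
+   * independently, and numBootstrapThreads falls back to numLoadingThreads when
+   * unset (see getNumBootstrapThreads() further down). A hypothetical
+   * runtime.properties might read:
+   *
+   *   druid.segmentCache.numLoadingThreads=4
+   *   # leave numBootstrapThreads unset to reuse the loading-thread count
+   *   #druid.segmentCache.numBootstrapThreads=8
+   */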
+ private int numLoadingThreads = 1; + @JsonProperty("numBootstrapThreads") private Integer numBootstrapThreads = null; @JsonProperty private File infoDir = null; + @JsonProperty + private int statusQueueMaxSize = 100; + public List getLocations() { return locations; @@ -72,9 +76,14 @@ public int getAnnounceIntervalMillis() return announceIntervalMillis; } + public int getNumLoadingThreads() + { + return numLoadingThreads; + } + public int getNumBootstrapThreads() { - return numBootstrapThreads == null ? DEFAULT_NUM_BOOTSTRAP_THREADS : numBootstrapThreads; + return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads; } public File getInfoDir() @@ -90,6 +99,11 @@ public File getInfoDir() return infoDir; } + public int getStatusQueueMaxSize() + { + return statusQueueMaxSize; + } + public SegmentLoaderConfig withLocations(List locations) { SegmentLoaderConfig retVal = new SegmentLoaderConfig(); diff --git a/server/src/main/java/io/druid/server/coordination/DataSegmentChangeCallback.java b/server/src/main/java/io/druid/server/coordination/DataSegmentChangeCallback.java index a21369976b5f..82dabc097ab1 100644 --- a/server/src/main/java/io/druid/server/coordination/DataSegmentChangeCallback.java +++ b/server/src/main/java/io/druid/server/coordination/DataSegmentChangeCallback.java @@ -24,4 +24,6 @@ public interface DataSegmentChangeCallback { public void execute(); + + DataSegmentChangeCallback NOOP = () -> {}; } diff --git a/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestDrop.java b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestDrop.java index 5cb7c18d3a67..86cc31e401b0 100644 --- a/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestDrop.java +++ b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestDrop.java @@ -25,6 +25,8 @@ import io.druid.java.util.common.StringUtils; import io.druid.timeline.DataSegment; +import java.util.Objects; + /** */ public class SegmentChangeRequestDrop implements DataSegmentChangeRequest @@ -58,6 +60,25 @@ public String asString() return StringUtils.format("DROP: %s", segment.getIdentifier()); } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentChangeRequestDrop that = (SegmentChangeRequestDrop) o; + return Objects.equals(segment, that.segment); + } + + @Override + public int hashCode() + { + return Objects.hash(segment); + } + @Override public String toString() { diff --git a/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestLoad.java index 9ad6f667f6a8..24a73f63a94a 100644 --- a/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestLoad.java +++ b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestLoad.java @@ -25,6 +25,8 @@ import io.druid.java.util.common.StringUtils; import io.druid.timeline.DataSegment; +import java.util.Objects; + /** */ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest @@ -58,6 +60,25 @@ public String asString() return StringUtils.format("LOAD: %s", segment.getIdentifier()); } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentChangeRequestLoad that = (SegmentChangeRequestLoad) o; + return Objects.equals(segment, that.segment); + } + + @Override + public 
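+  /*
+   * Editor's sketch (hypothetical usage, not part of this patch): the
+   * equals()/hashCode() pair added to the load/drop requests lets them key
+   * hash-based containers, which is what makes retried requests idempotent in
+   * SegmentLoadDropHandler below, e.g.:
+   *
+   *   Cache<DataSegmentChangeRequest, AtomicReference<Status>> statuses = ...;
+   *   // a re-sent request for the same segment hits the same cache entry
+   *   statuses.getIfPresent(new SegmentChangeRequestLoad(segment));
+   */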
int hashCode() + { + return Objects.hash(segment); + } + @Override public String toString() { diff --git a/server/src/main/java/io/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/io/druid/server/coordination/SegmentLoadDropHandler.java new file mode 100644 index 000000000000..01bf0cdb8ed5 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/SegmentLoadDropHandler.java @@ -0,0 +1,833 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.guice.ManageLifecycle; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.server.SegmentManager; +import io.druid.timeline.DataSegment; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +@ManageLifecycle +public class SegmentLoadDropHandler implements DataSegmentChangeHandler +{ + private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class); + + private final Object lock = new 
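+  /*
+   * Editor's note: this new class consolidates the segment load/drop logic that
+   * previously lived in ZkCoordinator (see the deletions further down), so the
+   * same handler can back both the ZooKeeper load queue and the new HTTP batch
+   * API exposed via processBatch().
+   */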
Object();
+
+  private final ObjectMapper jsonMapper;
+  private final SegmentLoaderConfig config;
+  private final DataSegmentAnnouncer announcer;
+  private final DataSegmentServerAnnouncer serverAnnouncer;
+  private final SegmentManager segmentManager;
+  private final ScheduledExecutorService exec;
+  private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
+
+  private volatile boolean started = false;
+
+  // Keep a history of load/drop request statuses in an LRU cache to maintain idempotency if the same request
+  // shows up again, and to return the status of a completed request. The maximum size of this cache must be
+  // significantly greater than the number of pending load/drop requests, so that history is not lost too quickly.
+  private final Cache<DataSegmentChangeRequest, AtomicReference<Status>> requestStatuses;
+  private final Object requestStatusesLock = new Object();
+
+  // This is the list of unresolved futures returned to callers of processBatch(List<DataSegmentChangeRequest>).
+  // Threads loading/dropping segments resolve these futures as and when some segment load/drop finishes.
+  private final LinkedHashSet<CustomSettableFuture> waitingFutures = new LinkedHashSet<>();
+
+  @Inject
+  public SegmentLoadDropHandler(
+      ObjectMapper jsonMapper,
+      SegmentLoaderConfig config,
+      DataSegmentAnnouncer announcer,
+      DataSegmentServerAnnouncer serverAnnouncer,
+      SegmentManager segmentManager
+  )
+  {
+    this(jsonMapper, config, announcer, serverAnnouncer, segmentManager,
+         Executors.newScheduledThreadPool(
+             config.getNumLoadingThreads(),
+             Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+         )
+    );
+  }
+
+  @VisibleForTesting
+  SegmentLoadDropHandler(
+      ObjectMapper jsonMapper,
+      SegmentLoaderConfig config,
+      DataSegmentAnnouncer announcer,
+      DataSegmentServerAnnouncer serverAnnouncer,
+      SegmentManager segmentManager,
+      ScheduledExecutorService exec
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.config = config;
+    this.announcer = announcer;
+    this.serverAnnouncer = serverAnnouncer;
+    this.segmentManager = segmentManager;
+
+    this.exec = exec;
+    this.segmentsToDelete = new ConcurrentSkipListSet<>();
+
+    requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
+  }
+
+  @LifecycleStart
+  public void start() throws IOException
+  {
+    synchronized (lock) {
+      if (started) {
+        return;
+      }
+
+      log.info("Starting...");
+      try {
+        loadLocalCache();
+        serverAnnouncer.announce();
+      }
+      catch (Exception e) {
+        Throwables.propagateIfPossible(e, IOException.class);
+        throw Throwables.propagate(e);
+      }
+      started = true;
+      log.info("Started.");
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    synchronized (lock) {
+      if (!started) {
+        return;
+      }
+
+      log.info("Stopping...");
+      try {
+        serverAnnouncer.unannounce();
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+      finally {
+        started = false;
+      }
+      log.info("Stopped.");
+    }
+  }
+
+  public boolean isStarted()
+  {
+    return started;
+  }
+
+  private void loadLocalCache()
+  {
+    final long start = System.currentTimeMillis();
+    File baseDir = config.getInfoDir();
+    if (!baseDir.isDirectory()) {
+      if (baseDir.exists()) {
+        throw new ISE("[%s] exists but is not a directory.", baseDir);
+      } else if (!baseDir.mkdirs()) {
+        throw new ISE("Failed to create directory[%s].", baseDir);
+      }
+    }
+
+    List<DataSegment> cachedSegments = Lists.newArrayList();
+    File[] segmentsToLoad = baseDir.listFiles();
+    int ignored = 0;
+    for (int i = 0; i < segmentsToLoad.length; i++) {
+      File file = segmentsToLoad[i];
+      log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file);
+      try {
+        final DataSegment
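+        /*
+         * Editor's sketch (assumed file layout): each entry under
+         * druid.segmentCache.infoDir is a file named by segment identifier whose
+         * body is the segment's JSON metadata, e.g.
+         *
+         *   ${infoDir}/wikipedia_2016-06-27T00:00:00.000Z_... -> {"dataSource":"wikipedia",...}
+         *
+         * which is why a cache entry deserializes directly into a DataSegment here.
+         */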
segment = jsonMapper.readValue(file, DataSegment.class); + + if (!segment.getIdentifier().equals(file.getName())) { + log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getIdentifier()); + ignored++; + } else if (segmentManager.isSegmentCached(segment)) { + cachedSegments.add(segment); + } else { + log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier()); + + File segmentInfoCacheFile = new File(baseDir, segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to load segment from segmentInfo file") + .addData("file", file) + .emit(); + } + } + + if (ignored > 0) { + log.makeAlert("Ignored misnamed segment cache files on startup.") + .addData("numIgnored", ignored) + .emit(); + } + + addSegments( + cachedSegments, + new DataSegmentChangeCallback() + { + @Override + public void execute() + { + log.info("Cache load took %,d ms", System.currentTimeMillis() - start); + } + } + ); + } + + /** + * Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will + * throw a SegmentLoadingException + * + * @throws SegmentLoadingException + */ + private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + { + final boolean loaded; + try { + loaded = segmentManager.loadSegment(segment); + } + catch (Exception e) { + removeSegment(segment, callback, false); + throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); + } + + if (loaded) { + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.exists()) { + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + removeSegment(segment, callback, false); + throw new SegmentLoadingException( + e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile + ); + } + } + } + } + + @Override + public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) + { + Status result = null; + try { + log.info("Loading segment %s", segment.getIdentifier()); + /* + The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, + and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment + files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads + the segment, which makes dropping segment and downloading segment happen at the same time. + */ + if (segmentsToDelete.contains(segment)) { + /* + Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, + each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make + things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of + cost of acquiring lock by doing the "contains" check outside the synchronized block. 
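+      Editor's sketch of the interleaving this guards against (assumed timeline):
+        T1 (scheduled drop): segmentsToDelete.remove(segment) returns true, starts deleting files
+        T2 (addSegment):     contains(segment) raced to false, begins downloading the same segment
+      Removing the segment inside the synchronized block before loading ensures a
+      pending delete is cancelled before the download starts.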
+ */ + synchronized (lock) { + segmentsToDelete.remove(segment); + } + } + loadSegment(segment, DataSegmentChangeCallback.NOOP); + try { + announcer.announceSegment(segment); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); + } + + result = Status.SUCCESS; + } + catch (Exception e) { + log.makeAlert(e, "Failed to load segment for dataSource") + .addData("segment", segment) + .emit(); + result = Status.failed(e.getMessage()); + } + finally { + updateRequestStatus(new SegmentChangeRequestLoad(segment), result); + callback.execute(); + } + } + + private void addSegments(Collection segments, final DataSegmentChangeCallback callback) + { + ExecutorService loadingExecutor = null; + try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = + new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { + + backgroundSegmentAnnouncer.startAnnouncing(); + + loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "Segment-Load-Startup-%s"); + + final int numSegments = segments.size(); + final CountDownLatch latch = new CountDownLatch(numSegments); + final AtomicInteger counter = new AtomicInteger(0); + final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>(); + for (final DataSegment segment : segments) { + loadingExecutor.submit( + new Runnable() + { + @Override + public void run() + { + try { + log.info( + "Loading segment[%d/%d][%s]", + counter.incrementAndGet(), + numSegments, + segment.getIdentifier() + ); + loadSegment(segment, callback); + try { + backgroundSegmentAnnouncer.announceSegment(segment); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + } + catch (SegmentLoadingException e) { + log.error(e, "[%s] failed to load", segment.getIdentifier()); + failedSegments.add(segment); + } + finally { + latch.countDown(); + } + } + } + ); + } + + try { + latch.await(); + + if (failedSegments.size() > 0) { + log.makeAlert("%,d errors seen while loading segments", failedSegments.size()) + .addData("failedSegments", failedSegments); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.makeAlert(e, "LoadingInterrupted"); + } + + backgroundSegmentAnnouncer.finishAnnouncing(); + } + catch (SegmentLoadingException e) { + log.makeAlert(e, "Failed to load segments -- likely problem with announcing.") + .addData("numSegments", segments.size()) + .emit(); + } + finally { + callback.execute(); + if (loadingExecutor != null) { + loadingExecutor.shutdownNow(); + } + } + } + + @Override + public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) + { + removeSegment(segment, callback, true); + } + + private void removeSegment( + final DataSegment segment, + final DataSegmentChangeCallback callback, + final boolean scheduleDrop + ) + { + Status result = null; + try { + announcer.unannounceSegment(segment); + segmentsToDelete.add(segment); + + Runnable runnable = new Runnable() + { + @Override + public void run() + { + try { + synchronized (lock) { + if (segmentsToDelete.remove(segment)) { + segmentManager.dropSegment(segment); + + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to remove segment! 
Possible resource leak!")
+           .addData("segment", segment)
+           .emit();
+        }
+      }
+    };
+
+    if (scheduleDrop) {
+      log.info(
+          "Completely removing [%s] in [%,d] millis",
+          segment.getIdentifier(),
+          config.getDropSegmentDelayMillis()
+      );
+      exec.schedule(
+          runnable,
+          config.getDropSegmentDelayMillis(),
+          TimeUnit.MILLISECONDS
+      );
+    } else {
+      runnable.run();
+    }
+
+    result = Status.SUCCESS;
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to remove segment")
+         .addData("segment", segment)
+         .emit();
+      result = Status.failed(e.getMessage());
+    }
+    finally {
+      updateRequestStatus(new SegmentChangeRequestDrop(segment), result);
+      callback.execute();
+    }
+  }
+
+  public Collection<DataSegment> getPendingDeleteSnapshot()
+  {
+    return ImmutableList.copyOf(segmentsToDelete);
+  }
+
+  public ListenableFuture<List<DataSegmentChangeRequestAndStatus>> processBatch(List<DataSegmentChangeRequest> changeRequests)
+  {
+    boolean isAnyRequestDone = false;
+
+    Map<DataSegmentChangeRequest, AtomicReference<Status>> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());
+
+    for (DataSegmentChangeRequest cr : changeRequests) {
+      AtomicReference<Status> status = processRequest(cr);
+      if (status.get().getState() != Status.STATE.PENDING) {
+        isAnyRequestDone = true;
+      }
+      statuses.put(cr, status);
+    }
+
+    CustomSettableFuture future = new CustomSettableFuture(waitingFutures, statuses);
+
+    if (isAnyRequestDone) {
+      future.resolve();
+    } else {
+      synchronized (waitingFutures) {
+        waitingFutures.add(future);
+      }
+    }
+
+    return future;
+  }
+
+  private AtomicReference<Status> processRequest(DataSegmentChangeRequest changeRequest)
+  {
+    synchronized (requestStatusesLock) {
+      if (requestStatuses.getIfPresent(changeRequest) == null) {
+        changeRequest.go(
+            new DataSegmentChangeHandler()
+            {
+              @Override
+              public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
+              {
+                requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING));
+                exec.submit(
+                    () -> SegmentLoadDropHandler.this.addSegment(
+                        ((SegmentChangeRequestLoad) changeRequest).getSegment(),
+                        () -> resolveWaitingFutures()
+                    )
+                );
+              }
+
+              @Override
+              public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
+              {
+                requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING));
+                SegmentLoadDropHandler.this.removeSegment(
+                    ((SegmentChangeRequestDrop) changeRequest).getSegment(),
+                    () -> resolveWaitingFutures(),
+                    true
+                );
+              }
+            },
+            () -> resolveWaitingFutures()
+        );
+      }
+      return requestStatuses.getIfPresent(changeRequest);
+    }
+  }
+
+  private void updateRequestStatus(DataSegmentChangeRequest changeRequest, Status result)
+  {
+    if (result == null) {
+      result = Status.failed("Unknown reason. 
Check server logs."); + } + synchronized (requestStatusesLock) { + AtomicReference statusRef = requestStatuses.getIfPresent(changeRequest); + if (statusRef != null) { + statusRef.set(result); + } + } + } + + private void resolveWaitingFutures() + { + LinkedHashSet waitingFuturesCopy = new LinkedHashSet<>(); + synchronized (waitingFutures) { + waitingFuturesCopy.addAll(waitingFutures); + waitingFutures.clear(); + } + for (CustomSettableFuture future : waitingFuturesCopy) { + future.resolve(); + } + } + + private static class BackgroundSegmentAnnouncer implements AutoCloseable + { + private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); + + private final int intervalMillis; + private final DataSegmentAnnouncer announcer; + private final ScheduledExecutorService exec; + private final LinkedBlockingQueue queue; + private final SettableFuture doneAnnouncing; + + private final Object lock = new Object(); + + private volatile boolean finished = false; + private volatile ScheduledFuture startedAnnouncing = null; + private volatile ScheduledFuture nextAnnoucement = null; + + public BackgroundSegmentAnnouncer( + DataSegmentAnnouncer announcer, + ScheduledExecutorService exec, + int intervalMillis + ) + { + this.announcer = announcer; + this.exec = exec; + this.intervalMillis = intervalMillis; + this.queue = Queues.newLinkedBlockingQueue(); + this.doneAnnouncing = SettableFuture.create(); + } + + public void announceSegment(final DataSegment segment) throws InterruptedException + { + if (finished) { + throw new ISE("Announce segment called after finishAnnouncing"); + } + queue.put(segment); + } + + public void startAnnouncing() + { + if (intervalMillis <= 0) { + return; + } + + log.info("Starting background segment announcing task"); + + // schedule background announcing task + nextAnnoucement = startedAnnouncing = exec.schedule( + new Runnable() + { + @Override + public void run() + { + synchronized (lock) { + try { + if (!(finished && queue.isEmpty())) { + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + try { + announcer.announceSegments(segments); + nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + } + catch (IOException e) { + doneAnnouncing.setException( + new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) + ); + } + } else { + doneAnnouncing.set(true); + } + } + catch (Exception e) { + doneAnnouncing.setException(e); + } + } + } + }, + intervalMillis, + TimeUnit.MILLISECONDS + ); + } + + public void finishAnnouncing() throws SegmentLoadingException + { + synchronized (lock) { + finished = true; + // announce any remaining segments + try { + final List segments = Lists.newLinkedList(); + queue.drainTo(segments); + announcer.announceSegments(segments); + } + catch (Exception e) { + throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); + } + + // get any exception that may have been thrown in background announcing + try { + // check in case intervalMillis is <= 0 + if (startedAnnouncing != null) { + startedAnnouncing.cancel(false); + } + // - if the task is waiting on the lock, then the queue will be empty by the time it runs + // - if the task just released it, then the lock ensures any exception is set in doneAnnouncing + if (doneAnnouncing.isDone()) { + doneAnnouncing.get(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); + } + catch (ExecutionException 
e) { + throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed"); + } + } + log.info("Completed background segment announcing"); + } + + @Override + public void close() + { + // stop background scheduling + synchronized (lock) { + finished = true; + if (nextAnnoucement != null) { + nextAnnoucement.cancel(false); + } + } + } + } + + // Future with cancel() implementation to remove it from "waitingFutures" list + private static class CustomSettableFuture extends AbstractFuture> + { + private final LinkedHashSet waitingFutures; + private final Map> statusRefs; + + private CustomSettableFuture( + LinkedHashSet waitingFutures, + Map> statusRefs + ) + { + this.waitingFutures = waitingFutures; + this.statusRefs = statusRefs; + } + + public void resolve() + { + synchronized (statusRefs) { + if (isDone()) { + return; + } + + List result = new ArrayList<>(statusRefs.size()); + statusRefs.forEach( + (request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get())) + ); + + super.set(result); + } + } + + @Override + public boolean setException(Throwable throwable) + { + return super.setException(throwable); + } + + @Override + public boolean cancel(boolean interruptIfRunning) + { + synchronized (waitingFutures) { + waitingFutures.remove(this); + } + return true; + } + } + + public static class Status + { + public enum STATE + { + SUCCESS, FAILED, PENDING + } + + private final STATE state; + private final String failureCause; + + public final static Status SUCCESS = new Status(STATE.SUCCESS, null); + public final static Status PENDING = new Status(STATE.PENDING, null); + + @JsonCreator + Status( + @JsonProperty("state") STATE state, + @JsonProperty("failureCause") String failureCause + ) + { + Preconditions.checkNotNull(state, "state must be non-null"); + this.state = state; + this.failureCause = failureCause; + } + + public static Status failed(String cause) + { + return new Status(STATE.FAILED, cause); + } + + @JsonProperty + public STATE getState() + { + return state; + } + + @JsonProperty + public String getFailureCause() + { + return failureCause; + } + + @Override + public String toString() + { + return "Status{" + + "state=" + state + + ", failureCause='" + failureCause + '\'' + + '}'; + } + } + + public static class DataSegmentChangeRequestAndStatus + { + private final DataSegmentChangeRequest request; + private final Status status; + + @JsonCreator + public DataSegmentChangeRequestAndStatus( + @JsonProperty("request") DataSegmentChangeRequest request, + @JsonProperty("status") Status status + ) + { + this.request = request; + this.status = status; + } + + @JsonProperty + public DataSegmentChangeRequest getRequest() + { + return request; + } + + @JsonProperty + public Status getStatus() + { + return status; + } + + @Override + public String toString() + { + return "DataSegmentChangeRequestAndStatus{" + + "request=" + request + + ", status=" + status + + '}'; + } + } +} + diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 5747cd46fc9e..d181a1477129 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -21,22 +21,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import 
com.google.common.collect.Queues; -import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.concurrent.Execs; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; -import io.druid.segment.loading.SegmentLoaderConfig; -import io.druid.segment.loading.SegmentLoadingException; -import io.druid.server.SegmentManager; import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -44,68 +34,39 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; -import java.io.File; import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** */ -public class ZkCoordinator implements DataSegmentChangeHandler +public class ZkCoordinator { private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class); private final Object lock = new Object(); + private final DataSegmentChangeHandler dataSegmentChangeHandler; private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPaths; - private final SegmentLoaderConfig config; private final DruidServerMetadata me; private final CuratorFramework curator; - private final DataSegmentAnnouncer announcer; - private final DataSegmentServerAnnouncer serverAnnouncer; - private final SegmentManager segmentManager; - private final ScheduledExecutorService exec; - private final ConcurrentSkipListSet segmentsToDelete; - private volatile PathChildrenCache loadQueueCache; private volatile boolean started = false; @Inject public ZkCoordinator( + SegmentLoadDropHandler loadDropHandler, ObjectMapper jsonMapper, - SegmentLoaderConfig config, ZkPathsConfig zkPaths, DruidServerMetadata me, - DataSegmentAnnouncer announcer, - DataSegmentServerAnnouncer serverAnnouncer, - CuratorFramework curator, - SegmentManager segmentManager, - ScheduledExecutorFactory factory + CuratorFramework curator ) { + this.dataSegmentChangeHandler = loadDropHandler; this.jsonMapper = jsonMapper; this.zkPaths = zkPaths; - this.config = config; this.me = me; this.curator = curator; - this.announcer = announcer; - this.serverAnnouncer = serverAnnouncer; - this.segmentManager = segmentManager; - - this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); - this.segmentsToDelete = new ConcurrentSkipListSet<>(); } @LifecycleStart @@ -127,7 +88,7 @@ public void start() throws IOException loadQueueLocation, true, true, - Execs.singleThreaded("ZkCoordinator-%s") + Execs.singleThreaded("ZkCoordinator") ); try { @@ -135,9 +96,6 @@ public void start() throws IOException 
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); - loadLocalCache(); - serverAnnouncer.announce(); - loadQueueCache.getListenable().addListener( new PathChildrenCacheListener() { @@ -156,7 +114,7 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th try { request.go( - getDataSegmentChangeHandler(), + dataSegmentChangeHandler, new DataSegmentChangeCallback() { boolean hasRun = false; @@ -231,7 +189,6 @@ public void stop() try { loadQueueCache.close(); - serverAnnouncer.unannounce(); } catch (Exception e) { throw Throwables.propagate(e); @@ -247,400 +204,4 @@ public boolean isStarted() { return started; } - - public void loadLocalCache() - { - final long start = System.currentTimeMillis(); - File baseDir = config.getInfoDir(); - if (!baseDir.exists() && !config.getInfoDir().mkdirs()) { - return; - } - - List cachedSegments = Lists.newArrayList(); - File[] segmentsToLoad = baseDir.listFiles(); - int ignored = 0; - for (int i = 0; i < segmentsToLoad.length; i++) { - File file = segmentsToLoad[i]; - log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file); - try { - final DataSegment segment = jsonMapper.readValue(file, DataSegment.class); - - if (!segment.getIdentifier().equals(file.getName())) { - log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getIdentifier()); - ignored++; - } else if (segmentManager.isSegmentCached(segment)) { - cachedSegments.add(segment); - } else { - log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier()); - - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); - } - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to load segment from segmentInfo file") - .addData("file", file) - .emit(); - } - } - - if (ignored > 0) { - log.makeAlert("Ignored misnamed segment cache files on startup.") - .addData("numIgnored", ignored) - .emit(); - } - - addSegments( - cachedSegments, - new DataSegmentChangeCallback() - { - @Override - public void execute() - { - log.info("Cache load took %,d ms", System.currentTimeMillis() - start); - } - } - ); - } - - public DataSegmentChangeHandler getDataSegmentChangeHandler() - { - return ZkCoordinator.this; - } - - /** - * Load a single segment. If the segment is loaded successfully, this function simply returns. 
Otherwise it will - * throw a SegmentLoadingException - * - * @throws SegmentLoadingException - */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException - { - final boolean loaded; - try { - loaded = segmentManager.loadSegment(segment); - } - catch (Exception e) { - removeSegment(segment, callback); - throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); - } - - if (loaded) { - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - removeSegment(segment, callback); - throw new SegmentLoadingException( - e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile - ); - } - } - } - } - - @Override - public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - try { - log.info("Loading segment %s", segment.getIdentifier()); - /* - The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, - and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment - files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads - the segment, which makes dropping segment and downloading segment happen at the same time. - */ - if (segmentsToDelete.contains(segment)) { - /* - Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, - each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make - things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of - cost of acquiring lock by doing the "contains" check outside the synchronized block. 
- */ - synchronized (lock) { - segmentsToDelete.remove(segment); - } - } - loadSegment(segment, callback); - try { - announcer.announceSegment(segment); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); - } - } - catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segment for dataSource") - .addData("segment", segment) - .emit(); - } - finally { - callback.execute(); - } - } - - private void addSegments(Collection segments, final DataSegmentChangeCallback callback) - { - ExecutorService loadingExecutor = null; - try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = - new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { - - backgroundSegmentAnnouncer.startAnnouncing(); - - loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s"); - - final int numSegments = segments.size(); - final CountDownLatch latch = new CountDownLatch(numSegments); - final AtomicInteger counter = new AtomicInteger(0); - final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>(); - for (final DataSegment segment : segments) { - loadingExecutor.submit( - new Runnable() - { - @Override - public void run() - { - try { - log.info( - "Loading segment[%d/%d][%s]", - counter.incrementAndGet(), - numSegments, - segment.getIdentifier() - ); - loadSegment(segment, callback); - try { - backgroundSegmentAnnouncer.announceSegment(segment); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } - } - catch (SegmentLoadingException e) { - log.error(e, "[%s] failed to load", segment.getIdentifier()); - failedSegments.add(segment); - } - finally { - latch.countDown(); - } - } - } - ); - } - - try { - latch.await(); - - if (failedSegments.size() > 0) { - log.makeAlert("%,d errors seen while loading segments", failedSegments.size()) - .addData("failedSegments", failedSegments); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.makeAlert(e, "LoadingInterrupted"); - } - - backgroundSegmentAnnouncer.finishAnnouncing(); - } - catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segments -- likely problem with announcing.") - .addData("numSegments", segments.size()) - .emit(); - } - finally { - callback.execute(); - if (loadingExecutor != null) { - loadingExecutor.shutdownNow(); - } - } - } - - - @Override - public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback) - { - try { - announcer.unannounceSegment(segment); - segmentsToDelete.add(segment); - - log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis()); - exec.schedule( - new Runnable() - { - @Override - public void run() - { - try { - synchronized (lock) { - if (segmentsToDelete.remove(segment)) { - segmentManager.dropSegment(segment); - - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); - } - } - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to remove segment! 
Possible resource leak!") - .addData("segment", segment) - .emit(); - } - } - }, - config.getDropSegmentDelayMillis(), - TimeUnit.MILLISECONDS - ); - } - catch (Exception e) { - log.makeAlert(e, "Failed to remove segment") - .addData("segment", segment) - .emit(); - } - finally { - callback.execute(); - } - } - - public Collection getPendingDeleteSnapshot() - { - return ImmutableList.copyOf(segmentsToDelete); - } - - private static class BackgroundSegmentAnnouncer implements AutoCloseable - { - private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); - - private final int intervalMillis; - private final DataSegmentAnnouncer announcer; - private final ScheduledExecutorService exec; - private final LinkedBlockingQueue queue; - private final SettableFuture doneAnnouncing; - - private final Object lock = new Object(); - - private volatile boolean finished = false; - private volatile ScheduledFuture startedAnnouncing = null; - private volatile ScheduledFuture nextAnnoucement = null; - - public BackgroundSegmentAnnouncer( - DataSegmentAnnouncer announcer, - ScheduledExecutorService exec, - int intervalMillis - ) - { - this.announcer = announcer; - this.exec = exec; - this.intervalMillis = intervalMillis; - this.queue = Queues.newLinkedBlockingQueue(); - this.doneAnnouncing = SettableFuture.create(); - } - - public void announceSegment(final DataSegment segment) throws InterruptedException - { - if (finished) { - throw new ISE("Announce segment called after finishAnnouncing"); - } - queue.put(segment); - } - - public void startAnnouncing() - { - if (intervalMillis <= 0) { - return; - } - - log.info("Starting background segment announcing task"); - - // schedule background announcing task - nextAnnoucement = startedAnnouncing = exec.schedule( - new Runnable() - { - @Override - public void run() - { - synchronized (lock) { - try { - if (!(finished && queue.isEmpty())) { - final List segments = Lists.newLinkedList(); - queue.drainTo(segments); - try { - announcer.announceSegments(segments); - nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); - } - catch (IOException e) { - doneAnnouncing.setException( - new SegmentLoadingException(e, "Failed to announce segments[%s]", segments) - ); - } - } else { - doneAnnouncing.set(true); - } - } - catch (Exception e) { - doneAnnouncing.setException(e); - } - } - } - }, - intervalMillis, - TimeUnit.MILLISECONDS - ); - } - - public void finishAnnouncing() throws SegmentLoadingException - { - synchronized (lock) { - finished = true; - // announce any remaining segments - try { - final List segments = Lists.newLinkedList(); - queue.drainTo(segments); - announcer.announceSegments(segments); - } - catch (Exception e) { - throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); - } - - // get any exception that may have been thrown in background announcing - try { - // check in case intervalMillis is <= 0 - if (startedAnnouncing != null) { - startedAnnouncing.cancel(false); - } - // - if the task is waiting on the lock, then the queue will be empty by the time it runs - // - if the task just released it, then the lock ensures any exception is set in doneAnnouncing - if (doneAnnouncing.isDone()) { - doneAnnouncing.get(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } - catch (ExecutionException e) { - throw new SegmentLoadingException(e.getCause(), "Background Announcing Task 
Failed"); - } - } - log.info("Completed background segment announcing"); - } - - @Override - public void close() - { - // stop background scheduling - synchronized (lock) { - finished = true; - if (nextAnnoucement != null) { - nextAnnoucement.cancel(false); - } - } - } - } } diff --git a/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java new file mode 100644 index 000000000000..8ab64a466d3c --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -0,0 +1,509 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.metamx.emitter.EmittingLogger; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.ScheduledExecutors; +import io.druid.server.coordination.DataSegmentChangeRequest; +import io.druid.server.coordination.SegmentChangeRequestDrop; +import io.druid.server.coordination.SegmentChangeRequestLoad; +import io.druid.server.coordination.SegmentChangeRequestNoop; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.Stat; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + */ +public class CuratorLoadQueuePeon extends LoadQueuePeon +{ + private static final EmittingLogger log = new EmittingLogger(CuratorLoadQueuePeon.class); + private static final int DROP = 0; + private static final int LOAD = 1; + + private static void executeCallbacks(List callbacks) + { + for (LoadPeonCallback callback : callbacks) { + if (callback != null) { + callback.execute(); + } + } + } + + private final CuratorFramework curator; + private final String basePath; + private final ObjectMapper jsonMapper; + private final ScheduledExecutorService processingExecutor; + private final ExecutorService callBackExecutor; + private final DruidCoordinatorConfig config; + + private final AtomicLong queuedSize 
= new AtomicLong(0); + private final AtomicInteger failedAssignCount = new AtomicInteger(0); + + private final ConcurrentSkipListMap segmentsToLoad = new ConcurrentSkipListMap<>( + DruidCoordinator.SEGMENT_COMPARATOR + ); + private final ConcurrentSkipListMap segmentsToDrop = new ConcurrentSkipListMap<>( + DruidCoordinator.SEGMENT_COMPARATOR + ); + private final ConcurrentSkipListSet segmentsMarkedToDrop = new ConcurrentSkipListSet<>( + DruidCoordinator.SEGMENT_COMPARATOR + ); + + private final Object lock = new Object(); + + private volatile SegmentHolder currentlyProcessing = null; + private boolean stopped = false; + + CuratorLoadQueuePeon( + CuratorFramework curator, + String basePath, + ObjectMapper jsonMapper, + ScheduledExecutorService processingExecutor, + ExecutorService callbackExecutor, + DruidCoordinatorConfig config + ) + { + this.curator = curator; + this.basePath = basePath; + this.jsonMapper = jsonMapper; + this.callBackExecutor = callbackExecutor; + this.processingExecutor = processingExecutor; + this.config = config; + } + + @JsonProperty + @Override + public Set getSegmentsToLoad() + { + return segmentsToLoad.keySet(); + } + + @JsonProperty + @Override + public Set getSegmentsToDrop() + { + return segmentsToDrop.keySet(); + } + + @JsonProperty + @Override + public Set getSegmentsMarkedToDrop() + { + return segmentsMarkedToDrop; + } + + @Override + public long getLoadQueueSize() + { + return queuedSize.get(); + } + + @Override + public int getAndResetFailedAssignCount() + { + return failedAssignCount.getAndSet(0); + } + + @Override + public int getNumberOfSegmentsInQueue() + { + return segmentsToLoad.size(); + } + + @Override + public void loadSegment( + final DataSegment segment, + final LoadPeonCallback callback + ) + { + synchronized (lock) { + if ((currentlyProcessing != null) && + currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) { + if (callback != null) { + currentlyProcessing.addCallback(callback); + } + return; + } + } + + synchronized (lock) { + final SegmentHolder existingHolder = segmentsToLoad.get(segment); + if (existingHolder != null) { + if ((callback != null)) { + existingHolder.addCallback(callback); + } + return; + } + } + + log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); + queuedSize.addAndGet(segment.getSize()); + segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback))); + } + + @Override + public void dropSegment( + final DataSegment segment, + final LoadPeonCallback callback + ) + { + synchronized (lock) { + if ((currentlyProcessing != null) && + currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) { + if (callback != null) { + currentlyProcessing.addCallback(callback); + } + return; + } + } + + synchronized (lock) { + final SegmentHolder existingHolder = segmentsToDrop.get(segment); + if (existingHolder != null) { + if (callback != null) { + existingHolder.addCallback(callback); + } + return; + } + } + + log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); + segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback))); + } + + @Override + public void markSegmentToDrop(DataSegment dataSegment) + { + segmentsMarkedToDrop.add(dataSegment); + } + + @Override + public void unmarkSegmentToDrop(DataSegment dataSegment) + { + segmentsMarkedToDrop.remove(dataSegment); + } + + private void processSegmentChangeRequest() + { + if (currentlyProcessing != 
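+    /*
+     * Editor's note (protocol as implemented below): the curator peon handles one
+     * request at a time -- it writes a single ephemeral znode under the server's
+     * load queue path, waits for the historical to act on it and delete it (see
+     * entryRemoved()), then picks the next holder, preferring drops over loads.
+     */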
null) { + log.debug( + "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].", + basePath, + currentlyProcessing.getSegmentIdentifier() + ); + + return; + } + + if (!segmentsToDrop.isEmpty()) { + currentlyProcessing = segmentsToDrop.firstEntry().getValue(); + log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + } else if (!segmentsToLoad.isEmpty()) { + currentlyProcessing = segmentsToLoad.firstEntry().getValue(); + log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + } else { + return; + } + + try { + if (currentlyProcessing == null) { + if (!stopped) { + log.makeAlert("Crazy race condition! server[%s]", basePath) + .emit(); + } + actionCompleted(); + return; + } + + log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier()); + final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest()); + curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); + + processingExecutor.schedule( + new Runnable() + { + @Override + public void run() + { + try { + if (curator.checkExists().forPath(path) != null) { + failAssign(new ISE("%s was never removed! Failing this operation!", path)); + } + } + catch (Exception e) { + failAssign(e); + } + } + }, + config.getLoadTimeoutDelay().getMillis(), + TimeUnit.MILLISECONDS + ); + + final Stat stat = curator.checkExists().usingWatcher( + new CuratorWatcher() + { + @Override + public void process(WatchedEvent watchedEvent) throws Exception + { + switch (watchedEvent.getType()) { + case NodeDeleted: + entryRemoved(watchedEvent.getPath()); + break; + default: + // do nothing + } + } + } + ).forPath(path); + + if (stat == null) { + final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop()); + + // Create a node and then delete it to remove the registered watcher. This is a work-around for + // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event + // that happens for that node. If no events happen, the watcher stays registered foreverz. + // Couple that with the fact that you cannot set a watcher when you create a node, but what we + // want is to create a node and then watch for it to get deleted. The solution is that you *can* + // set a watcher when you check to see if it exists so, we first create the node and then set a + // watcher on its existence. However, if already does not exist by the time the existence check + // returns, then the watcher that was set will never fire (nobody will ever create the node + // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause + // that watcher to fire and delete it right away. + // + // We do not create the existence watcher first, because then it will fire when we create the + // node and we'll have the same race when trying to refresh that watcher. 
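+          /*
+           * Editor's sketch of the workaround sequence described above (assumed ordering):
+           *   1. create(path)               ephemeral change-request node
+           *   2. checkExists + watcher      stat == null: node already handled and deleted
+           *   3. create(path, noopPayload)  fires the otherwise-orphaned watcher
+           *   4. entryRemoved(path)         treat the original request as completed
+           */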
+        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
+
+        entryRemoved(path);
+      }
+    }
+    catch (Exception e) {
+      failAssign(e);
+    }
+  }
+
+  private void actionCompleted()
+  {
+    if (currentlyProcessing != null) {
+      switch (currentlyProcessing.getType()) {
+        case LOAD:
+          segmentsToLoad.remove(currentlyProcessing.getSegment());
+          queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
+          break;
+        case DROP:
+          segmentsToDrop.remove(currentlyProcessing.getSegment());
+          break;
+        default:
+          throw new UnsupportedOperationException();
+      }
+
+      final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
+      currentlyProcessing = null;
+      callBackExecutor.execute(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              executeCallbacks(callbacks);
+            }
+          }
+      );
+    }
+  }
+
+  @Override
+  public void start()
+  {
+    ScheduledExecutors.scheduleAtFixedRate(
+        processingExecutor,
+        config.getLoadQueuePeonRepeatDelay(),
+        config.getLoadQueuePeonRepeatDelay(),
+        new Callable<ScheduledExecutors.Signal>()
+        {
+          @Override
+          public ScheduledExecutors.Signal call()
+          {
+            processSegmentChangeRequest();
+
+            if (stopped) {
+              return ScheduledExecutors.Signal.STOP;
+            } else {
+              return ScheduledExecutors.Signal.REPEAT;
+            }
+          }
+        }
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    synchronized (lock) {
+      if (currentlyProcessing != null) {
+        executeCallbacks(currentlyProcessing.getCallbacks());
+        currentlyProcessing = null;
+      }
+
+      if (!segmentsToDrop.isEmpty()) {
+        for (SegmentHolder holder : segmentsToDrop.values()) {
+          executeCallbacks(holder.getCallbacks());
+        }
+      }
+      segmentsToDrop.clear();
+
+      if (!segmentsToLoad.isEmpty()) {
+        for (SegmentHolder holder : segmentsToLoad.values()) {
+          executeCallbacks(holder.getCallbacks());
+        }
+      }
+      segmentsToLoad.clear();
+
+      queuedSize.set(0L);
+      failedAssignCount.set(0);
+      stopped = true;
+    }
+  }
+
+  private void entryRemoved(String path)
+  {
+    synchronized (lock) {
+      if (currentlyProcessing == null) {
+        log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path);
+        return;
+      }
+      if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentIdentifier())) {
+        log.warn(
+            "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
+            basePath, path, currentlyProcessing
+        );
+        return;
+      }
+      actionCompleted();
+      log.info("Server[%s] done processing [%s]", basePath, path);
+    }
+  }
+
+  private void failAssign(Exception e)
+  {
+    synchronized (lock) {
+      log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing);
+      failedAssignCount.getAndIncrement();
+      // Act like it was completed so that the coordinator gives it to someone else
+      actionCompleted();
+    }
+  }
+
+  private static class SegmentHolder
+  {
+    private final DataSegment segment;
+    private final DataSegmentChangeRequest changeRequest;
+    private final int type;
+    private final List<LoadPeonCallback> callbacks = Lists.newArrayList();
+
+    private SegmentHolder(
+        DataSegment segment,
+        int type,
+        Collection<LoadPeonCallback> callbacks
+    )
+    {
+      this.segment = segment;
+      this.type = type;
+      this.changeRequest = (type == LOAD)
+                           ? new SegmentChangeRequestLoad(segment)
+                           : new SegmentChangeRequestDrop(segment);
+      this.callbacks.addAll(callbacks);
+    }
+
+    public DataSegment getSegment()
+    {
+      return segment;
+    }
+
+    public int getType()
+    {
+      return type;
+    }
+
+    public String getSegmentIdentifier()
+    {
+      return segment.getIdentifier();
+    }
+
+    public long getSegmentSize()
+    {
+      return segment.getSize();
+    }
+
+    public void addCallbacks(Collection<LoadPeonCallback> newCallbacks)
+    {
+      synchronized (callbacks) {
+        callbacks.addAll(newCallbacks);
+      }
+    }
+
+    public void addCallback(LoadPeonCallback newCallback)
+    {
+      synchronized (callbacks) {
+        callbacks.add(newCallback);
+      }
+    }
+
+    public List<LoadPeonCallback> getCallbacks()
+    {
+      synchronized (callbacks) {
+        return callbacks;
+      }
+    }
+
+    public DataSegmentChangeRequest getChangeRequest()
+    {
+      return changeRequest;
+    }
+
+    @Override
+    public String toString()
+    {
+      return changeRequest.toString();
+    }
+  }
+}
diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
index 95d3d0cd1a71..fd51086ee11c 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
@@ -735,10 +735,9 @@ public ImmutableDruidServer apply(DruidServer input)
     final DruidCluster cluster = new DruidCluster();
     for (ImmutableDruidServer server : servers) {
       if (!loadManagementPeons.containsKey(server.getName())) {
-        String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
-        LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
+        LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
         loadQueuePeon.start();
-        log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);
+        log.info("Created LoadQueuePeon for server[%s].", server.getName());
 
         loadManagementPeons.put(server.getName(), loadQueuePeon);
       }
diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java
index 1271af0ec751..ba834620013d 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -86,4 +86,28 @@ public Duration getLoadQueuePeonRepeatDelay()
   {
     return Duration.millis(50);
   }
+
+  @Config("druid.coordinator.loadqueuepeon.type")
+  public String getLoadQueuePeonType()
+  {
+    return "curator";
+  }
+
+  @Config("druid.coordinator.loadqueuepeon.http.repeatDelay")
+  public Duration getHttpLoadQueuePeonRepeatDelay()
+  {
+    return Duration.millis(60000);
+  }
+
+  @Config("druid.coordinator.loadqueuepeon.http.hostTimeout")
+  public Duration getHttpLoadQueuePeonHostTimeout()
+  {
+    return Duration.millis(300000);
+  }
+
+  @Config("druid.coordinator.loadqueuepeon.http.batchSize")
+  public int getHttpLoadQueuePeonBatchSize()
+  {
+    return 1;
+  }
 }
diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
new file mode 100644
index 000000000000..1c9f7fca7f67
--- /dev/null
+++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.coordinator;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.metamx.common.RE;
+import com.metamx.common.StringUtils;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.io.AppendableByteArrayInputStream;
+import com.metamx.http.client.response.ClientResponse;
+import com.metamx.http.client.response.InputStreamResponseHandler;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.ScheduledExecutors;
+import io.druid.server.coordination.DataSegmentChangeCallback;
+import io.druid.server.coordination.DataSegmentChangeHandler;
+import io.druid.server.coordination.DataSegmentChangeRequest;
+import io.druid.server.coordination.SegmentChangeRequestDrop;
+import io.druid.server.coordination.SegmentChangeRequestLoad;
+import io.druid.server.coordination.SegmentLoadDropHandler;
+import io.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.joda.time.Duration;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ */
+public class HttpLoadQueuePeon extends LoadQueuePeon
+{
+  public static final TypeReference REQUEST_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeRequest>>()
+  {
+  };
+
+  public static final TypeReference RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
+  {
+  };
+
+  private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class);
+
+  private final AtomicLong queuedSize = new AtomicLong(0);
+  private final AtomicInteger failedAssignCount = new AtomicInteger(0);
+
+  private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
+      DruidCoordinator.SEGMENT_COMPARATOR
+  );
+  private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
+      DruidCoordinator.SEGMENT_COMPARATOR
+  );
+  private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
+      DruidCoordinator.SEGMENT_COMPARATOR
+  );
+
+  private final ScheduledExecutorService processingExecutor;
+
+  private volatile boolean stopped = false;
+
+  private final Object lock = new Object();
+
+  private final DruidCoordinatorConfig config;
+
+  private final ObjectMapper jsonMapper;
+  private final HttpClient httpClient;
+  private final URL changeRequestURL;
+  private final String serverId;
+
+  private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
+  private final ExecutorService callBackExecutor;
+
+  private final ObjectWriter requestBodyWriter;
+
+  public HttpLoadQueuePeon(
+      String baseUrl,
+      ObjectMapper jsonMapper,
+      HttpClient httpClient,
+      DruidCoordinatorConfig config,
+      ScheduledExecutorService processingExecutor,
+      ExecutorService callBackExecutor
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.requestBodyWriter = jsonMapper.writerWithType(REQUEST_ENTITY_TYPE_REF);
+    this.httpClient = httpClient;
+    this.config = config;
+    this.processingExecutor = processingExecutor;
+    this.callBackExecutor = callBackExecutor;
+
+    this.serverId = baseUrl;
+    try {
+      this.changeRequestURL = new URL(
+          new URL(baseUrl),
+          StringUtils.safeFormat(
+              "druid-internal/v1/segments/changeRequests?timeout=%d",
+              config.getHttpLoadQueuePeonHostTimeout().getMillis()
+          )
+      );
+    }
+    catch (MalformedURLException ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  private void doSegmentManagement()
+  {
+    if (stopped || !mainLoopInProgress.compareAndSet(false, true)) {
+      log.debug("[%s]Ignoring tick. Either in-progress already or stopped.", serverId);
+      return;
+    }
+
+    int batchSize = config.getHttpLoadQueuePeonBatchSize();
+
+    List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
+
+    synchronized (lock) {
+      Iterator<Map.Entry<DataSegment, SegmentHolder>> iter = Iterators.concat(
+          segmentsToDrop.entrySet().iterator(),
+          segmentsToLoad.entrySet().iterator()
+      );
+
+      while (batchSize > 0 && iter.hasNext()) {
+        batchSize--;
+        Map.Entry<DataSegment, SegmentHolder> entry = iter.next();
+        if (entry.getValue().hasTimedOut()) {
+          entry.getValue().requestFailed("timed out");
+          iter.remove();
+        } else {
+          newRequests.add(entry.getValue().getChangeRequest());
+        }
+      }
+    }
+
+    if (newRequests.size() == 0) {
+      mainLoopInProgress.set(false);
+      return;
+    }
+
+    try {
+      log.debug("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      ListenableFuture<InputStream> future = httpClient.go(
+          new Request(HttpMethod.POST, changeRequestURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
+              .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
+              .setContent(requestBodyWriter.writeValueAsBytes(newRequests)),
+          responseHandler,
+          new Duration(config.getHttpLoadQueuePeonHostTimeout().getMillis() + 5000)
+      );
+
+      Futures.addCallback(
+          future,
+          new FutureCallback<InputStream>()
+          {
+            @Override
+            public void onSuccess(InputStream result)
+            {
+              boolean scheduleNextRunImmediately = true;
+              try {
+                if (responseHandler.status == HttpServletResponse.SC_NO_CONTENT) {
+                  log.debug("Received NO CONTENT response from [%s]", serverId);
+                } else if (HttpServletResponse.SC_OK == responseHandler.status) {
+                  try {
+                    List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = jsonMapper.readValue(
+                        result, RESPONSE_ENTITY_TYPE_REF
+                    );
+
+                    synchronized (lock) {
+                      if (stopped) {
+                        return;
+                      }
+
+                      for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) {
+                        switch (e.getStatus().getState()) {
+                          case SUCCESS:
+                          case FAILED:
+                            handleResponseStatus(e.getRequest(), e.getStatus());
+                            break;
+                          case PENDING:
+                            log.debug("[%s]Segment request [%s] is pending.", serverId, e.getRequest());
+                            break;
+                          default:
+                            scheduleNextRunImmediately = false;
+                            log.error("WTF! Server[%s] returned unknown state in status[%s].", serverId, e.getStatus());
+                        }
+                      }
+                    }
+                  }
+                  catch (Exception ex) {
+                    scheduleNextRunImmediately = false;
+                    logRequestFailure(ex);
+                  }
+                } else {
+                  scheduleNextRunImmediately = false;
+                  logRequestFailure(new RE("Unexpected Response Status."));
+                }
+              }
+              finally {
+                mainLoopInProgress.set(false);
+
+                if (scheduleNextRunImmediately) {
+                  processingExecutor.execute(HttpLoadQueuePeon.this::doSegmentManagement);
+                }
+              }
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              try {
+                logRequestFailure(t);
+              }
+              finally {
+                mainLoopInProgress.set(false);
+              }
+            }
+
+            private void logRequestFailure(Throwable t)
+            {
+              log.error(
+                  t,
+                  "Request[%s] Failed with status[%s]. Reason[%s].",
+                  changeRequestURL,
+                  responseHandler.status,
+                  responseHandler.description
+              );
+            }
+          },
+          processingExecutor
+      );
+    }
+    catch (Throwable th) {
+      log.error(th, "Error sending load/drop request to [%s].", serverId);
+      mainLoopInProgress.set(false);
+    }
+  }
+
+  private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentLoadDropHandler.Status status)
+  {
+    changeRequest.go(
+        new DataSegmentChangeHandler()
+        {
+          @Override
+          public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
+          {
+            updateSuccessOrFailureInHolder(segmentsToLoad.remove(segment), status);
+          }
+
+          @Override
+          public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
+          {
+            updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status);
+          }
+
+          private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDropHandler.Status status)
+          {
+            if (holder == null) {
+              return;
+            }
+
+            if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) {
+              holder.requestFailed(status.getFailureCause());
+            } else {
+              holder.requestSucceeded();
+            }
+          }
+        }, null
+    );
+  }
+
+  @Override
+  public void start()
+  {
+    synchronized (lock) {
+      if (stopped) {
+        throw new ISE("Can't start.");
+      }
+
+      ScheduledExecutors.scheduleAtFixedRate(
+          processingExecutor,
+          new Duration(config.getHttpLoadQueuePeonRepeatDelay()),
+          new Callable<ScheduledExecutors.Signal>()
+          {
+            @Override
+            public ScheduledExecutors.Signal call()
+            {
+              if (!stopped) {
+                doSegmentManagement();
+              }
+
+              if (stopped) {
+                return ScheduledExecutors.Signal.STOP;
+              } else {
+                return ScheduledExecutors.Signal.REPEAT;
+              }
+            }
+          }
+      );
+    }
+  }
+
+  @Override
+  public void stop()
+  {
+    synchronized (lock) {
+      if (stopped) {
+        return;
+      }
+
+      stopped = true;
+
+      for (SegmentHolder holder : segmentsToDrop.values()) {
+        holder.requestSucceeded();
+      }
+
+      for (SegmentHolder holder : segmentsToLoad.values()) {
+        holder.requestSucceeded();
+      }
+
+      segmentsToDrop.clear();
+      segmentsToLoad.clear();
+      queuedSize.set(0L);
+      failedAssignCount.set(0);
+    }
+  }
+
+  @Override
+  public void loadSegment(DataSegment segment, LoadPeonCallback callback)
+  {
+    synchronized (lock) {
+      SegmentHolder holder = segmentsToLoad.get(segment);
+
+      if (holder == null) {
+        log.info("Server[%s] to load segment[%s] queued.", serverId, segment.getIdentifier());
+        segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback));
+        processingExecutor.execute(this::doSegmentManagement);
+      } else {
+        holder.addCallback(callback);
+      }
+    }
+  }
+
+  @Override
+  public void dropSegment(DataSegment segment, LoadPeonCallback callback)
+  {
+    synchronized (lock) {
+      SegmentHolder holder = segmentsToDrop.get(segment);
+
+      if (holder == null) {
+        log.info("Server[%s] to drop segment[%s] queued.", serverId, segment.getIdentifier());
+        segmentsToDrop.put(segment, new DropSegmentHolder(segment, callback));
+        processingExecutor.execute(this::doSegmentManagement);
+      } else {
+        holder.addCallback(callback);
+      }
+    }
+  }
+
+  @Override
+  public Set<DataSegment> getSegmentsToLoad()
+  {
+    return Collections.unmodifiableSet(segmentsToLoad.keySet());
+  }
+
+  @Override
+  public Set<DataSegment> getSegmentsToDrop()
+  {
+    return Collections.unmodifiableSet(segmentsToDrop.keySet());
+  }
+
+  @Override
+  public long getLoadQueueSize()
+  {
+    return queuedSize.get();
+  }
+
+  @Override
+  public int getAndResetFailedAssignCount()
+  {
+    return failedAssignCount.getAndSet(0);
+  }
+
+  @Override
+  public void markSegmentToDrop(DataSegment dataSegment)
+  {
+    segmentsMarkedToDrop.add(dataSegment);
+  }
+
+  @Override
+  public void unmarkSegmentToDrop(DataSegment dataSegment)
+  {
+    segmentsMarkedToDrop.remove(dataSegment);
+  }
+
+  @Override
+  public int getNumberOfSegmentsInQueue()
+  {
+    return segmentsToLoad.size();
+  }
+
+  @Override
+  public Set<DataSegment> getSegmentsMarkedToDrop()
+  {
+    return Collections.unmodifiableSet(segmentsMarkedToDrop);
+  }
+
+  private abstract class SegmentHolder
+  {
+    private final DataSegment segment;
+    private final DataSegmentChangeRequest changeRequest;
+    private final List<LoadPeonCallback> callbacks = Lists.newArrayList();
+
+    // Time when this request was first sent to the target server.
+    private volatile long scheduleTime = -1;
+
+    private SegmentHolder(
+        DataSegment segment,
+        DataSegmentChangeRequest changeRequest,
+        LoadPeonCallback callback
+    )
+    {
+      this.segment = segment;
+      this.changeRequest = changeRequest;
+
+      if (callback != null) {
+        this.callbacks.add(callback);
+      }
+    }
+
+    public void addCallback(LoadPeonCallback newCallback)
+    {
+      synchronized (callbacks) {
+        if (newCallback != null) {
+          callbacks.add(newCallback);
+        }
+      }
+    }
+
+    public DataSegment getSegment()
+    {
+      return segment;
+    }
+
+    public DataSegmentChangeRequest getChangeRequest()
+    {
+      return changeRequest;
+    }
+
+    public boolean hasTimedOut()
+    {
+      if (scheduleTime < 0) {
+        scheduleTime = System.currentTimeMillis();
+        return false;
+      } else if (System.currentTimeMillis() - scheduleTime > config.getLoadTimeoutDelay().getMillis()) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    public void requestSucceeded()
+    {
+      callBackExecutor.execute(() -> {
+        for (LoadPeonCallback callback : callbacks) {
+          if (callback != null) {
+            callback.execute();
+          }
+        }
+      });
+    }
+
+    public void requestFailed(String failureCause)
+    {
+      log.error(
+          "Server[%s] Failed segment[%s] request[%s] with cause [%s].",
+          serverId,
+          segment.getIdentifier(),
+          changeRequest.getClass().getSimpleName(),
+          failureCause
+      );
+
+      failedAssignCount.getAndIncrement();
+
+      requestSucceeded();
+    }
+
+    @Override
+    public String toString()
+    {
+      return changeRequest.toString();
+    }
+  }
+
+  private class LoadSegmentHolder extends SegmentHolder
+  {
+    public LoadSegmentHolder(DataSegment segment, LoadPeonCallback callback)
+    {
+      super(segment, new SegmentChangeRequestLoad(segment), callback);
+      queuedSize.addAndGet(segment.getSize());
+    }
+
+    @Override
+    public void requestSucceeded()
+    {
+      queuedSize.addAndGet(-getSegment().getSize());
+      super.requestSucceeded();
+    }
+  }
+
+  private class DropSegmentHolder extends SegmentHolder
+  {
+    public DropSegmentHolder(DataSegment segment, LoadPeonCallback callback)
+    {
+      super(segment, new SegmentChangeRequestDrop(segment), callback);
+    }
+  }
+
+  private static class BytesAccumulatingResponseHandler extends InputStreamResponseHandler
+  {
+    private int status;
+    private String description;
+
+    @Override
+    public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
+    {
+      status = response.getStatus().getCode();
+      description = response.getStatus().getReasonPhrase();
+      return ClientResponse.unfinished(super.handleResponse(response).getObj());
+    }
+  }
+}
diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
index 81eab183bdcf..658be22e4f34 100644
--- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
@@ -19,479 +19,37 @@
 package io.druid.server.coordinator;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.metamx.emitter.EmittingLogger;
-import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.concurrent.ScheduledExecutors;
-import io.druid.server.coordination.DataSegmentChangeRequest;
-import io.druid.server.coordination.SegmentChangeRequestDrop;
-import io.druid.server.coordination.SegmentChangeRequestLoad;
-import io.druid.server.coordination.SegmentChangeRequestNoop;
 import io.druid.timeline.DataSegment;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.data.Stat;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
+ * This interface exists only to support configurable load queue management via curator or http. Once HttpLoadQueuePeon
+ * has been verified enough in production, CuratorLoadQueuePeon and this interface will be removed.
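+ *
+ * Which implementation is used is decided by LoadQueueTaskMaster from the
+ * "druid.coordinator.loadqueuepeon.type" config introduced in this patch: "curator" (the default)
+ * builds a CuratorLoadQueuePeon, "http" builds an HttpLoadQueuePeon.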
  */
-public class LoadQueuePeon
+@Deprecated
+public abstract class LoadQueuePeon
 {
-  private static final EmittingLogger log = new EmittingLogger(LoadQueuePeon.class);
-  private static final int DROP = 0;
-  private static final int LOAD = 1;
+  public abstract void start();
+  public abstract void stop();
 
-  private static void executeCallbacks(List<LoadPeonCallback> callbacks)
-  {
-    for (LoadPeonCallback callback : callbacks) {
-      if (callback != null) {
-        callback.execute();
-      }
-    }
-  }
+  public abstract Set<DataSegment> getSegmentsToLoad();
 
-  private final CuratorFramework curator;
-  private final String basePath;
-  private final ObjectMapper jsonMapper;
-  private final ScheduledExecutorService processingExecutor;
-  private final ExecutorService callBackExecutor;
-  private final DruidCoordinatorConfig config;
+  public abstract Set<DataSegment> getSegmentsToDrop();
 
-  private final AtomicLong queuedSize = new AtomicLong(0);
-  private final AtomicInteger failedAssignCount = new AtomicInteger(0);
+  public abstract void unmarkSegmentToDrop(DataSegment segmentToLoad);
 
-  private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
-  );
-  private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
-  );
-  private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
-  );
-  private final Object lock = new Object();
+  public abstract void markSegmentToDrop(DataSegment segmentToLoad);
 
-  private volatile SegmentHolder currentlyProcessing = null;
-  private boolean stopped = false;
+  public abstract void loadSegment(DataSegment segment, LoadPeonCallback callback);
+  public abstract void dropSegment(DataSegment segment, LoadPeonCallback callback);
 
-  LoadQueuePeon(
-      CuratorFramework curator,
-      String basePath,
-      ObjectMapper jsonMapper,
-      ScheduledExecutorService processingExecutor,
-      ExecutorService callbackExecutor,
-      DruidCoordinatorConfig config
-  )
-  {
-    this.curator = curator;
-    this.basePath = basePath;
-    this.jsonMapper = jsonMapper;
-    this.callBackExecutor = callbackExecutor;
-    this.processingExecutor = processingExecutor;
-    this.config = config;
-  }
+  public abstract long getLoadQueueSize();
 
-  @JsonProperty
-  public Set<DataSegment> getSegmentsToLoad()
-  {
-    return segmentsToLoad.keySet();
-  }
+  public abstract int getAndResetFailedAssignCount();
 
-  @JsonProperty
-  public Set<DataSegment> getSegmentsToDrop()
-  {
-    return segmentsToDrop.keySet();
-  }
+  public abstract int getNumberOfSegmentsInQueue();
+  public abstract Set<DataSegment> getSegmentsMarkedToDrop();
 
-  @JsonProperty
-  public Set<DataSegment> getSegmentsMarkedToDrop()
-  {
-    return segmentsMarkedToDrop;
-  }
-
-  public long getLoadQueueSize()
-  {
-    return queuedSize.get();
-  }
-
-  public int getAndResetFailedAssignCount()
-  {
-    return failedAssignCount.getAndSet(0);
-  }
-
-  public int getNumberOfSegmentsInQueue()
-  {
-    return segmentsToLoad.size();
-  }
-
-  public void loadSegment(
-      final DataSegment segment,
-      final LoadPeonCallback callback
-  )
-  {
-    synchronized (lock) {
-      if ((currentlyProcessing != null) &&
-          currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
-        if (callback != null) {
-          currentlyProcessing.addCallback(callback);
-        }
-        return;
-      }
-    }
-
-    synchronized (lock) {
-      final SegmentHolder existingHolder = segmentsToLoad.get(segment);
-      if (existingHolder != null) {
-        if ((callback != null)) {
-          existingHolder.addCallback(callback);
-        }
-        return;
-      }
-    }
-
-    log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
-    queuedSize.addAndGet(segment.getSize());
-    segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback)));
-  }
-
-  public void dropSegment(
-      final DataSegment segment,
-      final LoadPeonCallback callback
-  )
-  {
-    synchronized (lock) {
-      if ((currentlyProcessing != null) &&
-          currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
-        if (callback != null) {
-          currentlyProcessing.addCallback(callback);
-        }
-        return;
-      }
-    }
-
-    synchronized (lock) {
-      final SegmentHolder existingHolder = segmentsToDrop.get(segment);
-      if (existingHolder != null) {
-        if (callback != null) {
-          existingHolder.addCallback(callback);
-        }
-        return;
-      }
-    }
-
-    log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
-    segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
-  }
-
-  public void markSegmentToDrop(DataSegment dataSegment)
-  {
-    segmentsMarkedToDrop.add(dataSegment);
-  }
-
-  public void unmarkSegmentToDrop(DataSegment dataSegment)
-  {
-    segmentsMarkedToDrop.remove(dataSegment);
-  }
-
-  private void processSegmentChangeRequest()
-  {
-    if (currentlyProcessing != null) {
-      log.debug(
-          "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
-          basePath,
-          currentlyProcessing.getSegmentIdentifier()
-      );
-
-      return;
-    }
-
-    if (!segmentsToDrop.isEmpty()) {
-      currentlyProcessing = segmentsToDrop.firstEntry().getValue();
-      log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
-    } else if (!segmentsToLoad.isEmpty()) {
-      currentlyProcessing = segmentsToLoad.firstEntry().getValue();
-      log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
-    } else {
-      return;
-    }
-
-    try {
-      if (currentlyProcessing == null) {
-        if (!stopped) {
-          log.makeAlert("Crazy race condition! server[%s]", basePath)
-             .emit();
-        }
-        actionCompleted();
-        return;
-      }
-
-      log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
-      final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
-      final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
-      curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
-
-      processingExecutor.schedule(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              try {
-                if (curator.checkExists().forPath(path) != null) {
-                  failAssign(new ISE("%s was never removed! Failing this operation!", path));
-                }
-              }
-              catch (Exception e) {
-                failAssign(e);
-              }
-            }
-          },
-          config.getLoadTimeoutDelay().getMillis(),
-          TimeUnit.MILLISECONDS
-      );
-
-      final Stat stat = curator.checkExists().usingWatcher(
-          new CuratorWatcher()
-          {
-            @Override
-            public void process(WatchedEvent watchedEvent) throws Exception
-            {
-              switch (watchedEvent.getType()) {
-                case NodeDeleted:
-                  entryRemoved(watchedEvent.getPath());
-                  break;
-                default:
-                  // do nothing
-              }
-            }
-          }
-      ).forPath(path);
-
-      if (stat == null) {
-        final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
-
-        // Create a node and then delete it to remove the registered watcher. This is a work-around for
-        // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
-        // that happens for that node. If no events happen, the watcher stays registered foreverz.
-        // Couple that with the fact that you cannot set a watcher when you create a node, but what we
-        // want is to create a node and then watch for it to get deleted. The solution is that you *can*
-        // set a watcher when you check to see if it exists so, we first create the node and then set a
-        // watcher on its existence. However, if already does not exist by the time the existence check
-        // returns, then the watcher that was set will never fire (nobody will ever create the node
-        // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
-        // that watcher to fire and delete it right away.
-        //
-        // We do not create the existence watcher first, because then it will fire when we create the
-        // node and we'll have the same race when trying to refresh that watcher.
-        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
-
-        entryRemoved(path);
-      }
-    }
-    catch (Exception e) {
-      failAssign(e);
-    }
-  }
-
-  private void actionCompleted()
-  {
-    if (currentlyProcessing != null) {
-      switch (currentlyProcessing.getType()) {
-        case LOAD:
-          segmentsToLoad.remove(currentlyProcessing.getSegment());
-          queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
-          break;
-        case DROP:
-          segmentsToDrop.remove(currentlyProcessing.getSegment());
-          break;
-        default:
-          throw new UnsupportedOperationException();
-      }
-
-      final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
-      currentlyProcessing = null;
-      callBackExecutor.execute(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              executeCallbacks(callbacks);
-            }
-          }
-      );
-    }
-  }
-
-  public void start()
-  {
-    ScheduledExecutors.scheduleAtFixedRate(
-        processingExecutor,
-        config.getLoadQueuePeonRepeatDelay(),
-        config.getLoadQueuePeonRepeatDelay(),
-        new Callable<ScheduledExecutors.Signal>()
-        {
-          @Override
-          public ScheduledExecutors.Signal call()
-          {
-            processSegmentChangeRequest();
-
-            if (stopped) {
-              return ScheduledExecutors.Signal.STOP;
-            } else {
-              return ScheduledExecutors.Signal.REPEAT;
-            }
-          }
-        }
-    );
-  }
-
-  public void stop()
-  {
-    synchronized (lock) {
-      if (currentlyProcessing != null) {
-        executeCallbacks(currentlyProcessing.getCallbacks());
-        currentlyProcessing = null;
-      }
-
-      if (!segmentsToDrop.isEmpty()) {
-        for (SegmentHolder holder : segmentsToDrop.values()) {
-          executeCallbacks(holder.getCallbacks());
-        }
-      }
-      segmentsToDrop.clear();
-
-      if (!segmentsToLoad.isEmpty()) {
-        for (SegmentHolder holder : segmentsToLoad.values()) {
-          executeCallbacks(holder.getCallbacks());
-        }
-      }
-      segmentsToLoad.clear();
-
-      queuedSize.set(0L);
-      failedAssignCount.set(0);
-      stopped = true;
-    }
-  }
-
-  private void entryRemoved(String path)
-  {
-    synchronized (lock) {
-      if (currentlyProcessing == null) {
-        log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path);
-        return;
-      }
-      if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentIdentifier())) {
-        log.warn(
-            "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
-            basePath, path, currentlyProcessing
-        );
-        return;
-      }
-      actionCompleted();
-      log.info("Server[%s] done processing [%s]", basePath, path);
-    }
-  }
-
-  private void failAssign(Exception e)
-  {
-    synchronized (lock) {
-      log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing);
-      failedAssignCount.getAndIncrement();
-      // Act like it was completed so that the coordinator gives it to someone else
-      actionCompleted();
-    }
-  }
-
-  private static class SegmentHolder
-  {
-    private final DataSegment segment;
-    private final DataSegmentChangeRequest changeRequest;
-    private final int type;
-    private final List<LoadPeonCallback> callbacks = Lists.newArrayList();
-
-    private SegmentHolder(
-        DataSegment segment,
-        int type,
-        Collection<LoadPeonCallback> callbacks
-    )
-    {
-      this.segment = segment;
-      this.type = type;
-      this.changeRequest = (type == LOAD)
-                           ? new SegmentChangeRequestLoad(segment)
-                           : new SegmentChangeRequestDrop(segment);
-      this.callbacks.addAll(callbacks);
-    }
-
-    public DataSegment getSegment()
-    {
-      return segment;
-    }
-
-    public int getType()
-    {
-      return type;
-    }
-
-    public String getSegmentIdentifier()
-    {
-      return segment.getIdentifier();
-    }
-
-    public long getSegmentSize()
-    {
-      return segment.getSize();
-    }
-
-    public void addCallbacks(Collection<LoadPeonCallback> newCallbacks)
-    {
-      synchronized (callbacks) {
-        callbacks.addAll(newCallbacks);
-      }
-    }
-
-    public void addCallback(LoadPeonCallback newCallback)
-    {
-      synchronized (callbacks) {
-        callbacks.add(newCallback);
-      }
-    }
-
-    public List<LoadPeonCallback> getCallbacks()
-    {
-      synchronized (callbacks) {
-        return callbacks;
-      }
-    }
-
-    public DataSegmentChangeRequest getChangeRequest()
-    {
-      return changeRequest;
-    }
-
-    @Override
-    public String toString()
-    {
-      return changeRequest.toString();
-    }
-  }
 }
diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java
index cdad8c7adc21..c24a98a2ff2f 100644
--- a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java
+++ b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java
@@ -21,7 +21,13 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
+import com.metamx.http.client.HttpClient;
+import io.druid.client.ImmutableDruidServer;
+import io.druid.guice.annotations.Global;
+import io.druid.guice.annotations.Json;
+import io.druid.server.initialization.ZkPathsConfig;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,14 +42,18 @@ public class LoadQueueTaskMaster
   private final ScheduledExecutorService peonExec;
   private final ExecutorService callbackExec;
   private final DruidCoordinatorConfig config;
+  private final HttpClient httpClient;
+  private final ZkPathsConfig zkPaths;
 
   @Inject
   public LoadQueueTaskMaster(
       CuratorFramework curator,
-      ObjectMapper jsonMapper,
+      @Json ObjectMapper jsonMapper,
       ScheduledExecutorService peonExec,
       ExecutorService callbackExec,
-      DruidCoordinatorConfig config
+      DruidCoordinatorConfig config,
+      @Global HttpClient httpClient,
+      ZkPathsConfig zkPaths
   )
   {
     this.curator = curator;
@@ -51,10 +61,23 @@ public LoadQueueTaskMaster(
     this.peonExec = peonExec;
     this.callbackExec = callbackExec;
     this.config = config;
+    this.httpClient = httpClient;
+    this.zkPaths = zkPaths;
   }
 
-  public LoadQueuePeon giveMePeon(String basePath)
+  public LoadQueuePeon giveMePeon(ImmutableDruidServer server)
   {
-    return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config);
+    if ("http".equalsIgnoreCase(config.getLoadQueuePeonType())) {
+      return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec);
+    } else {
+      return new CuratorLoadQueuePeon(
+          curator,
+          ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName()),
+          jsonMapper,
+          peonExec,
+          callbackExec,
+          config
+      );
+    }
   }
 }
diff --git a/server/src/main/java/io/druid/server/http/SegmentListerResource.java b/server/src/main/java/io/druid/server/http/SegmentListerResource.java
index c24ec4c60a1c..600806cc7e13 100644
--- a/server/src/main/java/io/druid/server/http/SegmentListerResource.java
+++ b/server/src/main/java/io/druid/server/http/SegmentListerResource.java
@@ -30,10 +30,12 @@
 import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
 import io.druid.server.coordination.BatchDataSegmentAnnouncer;
+import io.druid.server.coordination.DataSegmentChangeRequest;
 import io.druid.server.coordination.SegmentChangeRequestHistory;
 import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
+import io.druid.server.coordination.SegmentLoadDropHandler;
+import io.druid.server.coordinator.HttpLoadQueuePeon;
 import io.druid.server.http.security.StateResourceFilter;
-import io.druid.server.security.AuthConfig;
 
 import javax.annotation.Nullable;
 import javax.servlet.AsyncContext;
@@ -43,14 +45,17 @@
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
+import java.util.List;
 
 /**
+ * Endpoints exposed here are to be used only for Druid-internal management of segments by Coordinators, Brokers, etc.
  */
 @Path("/druid-internal/v1/segments/")
 @ResourceFilters(StateResourceFilter.class)
@@ -60,21 +65,21 @@ public class SegmentListerResource
   protected final ObjectMapper jsonMapper;
   protected final ObjectMapper smileMapper;
-  protected final AuthConfig authConfig;
   private final BatchDataSegmentAnnouncer announcer;
+  private final SegmentLoadDropHandler loadDropRequestHandler;
 
   @Inject
   public SegmentListerResource(
       @Json ObjectMapper jsonMapper,
       @Smile ObjectMapper smileMapper,
-      AuthConfig authConfig,
-      @Nullable BatchDataSegmentAnnouncer announcer
+      @Nullable BatchDataSegmentAnnouncer announcer,
+      @Nullable SegmentLoadDropHandler loadDropRequestHandler
   )
   {
     this.jsonMapper = jsonMapper;
     this.smileMapper = smileMapper;
-    this.authConfig = authConfig;
     this.announcer = announcer;
+    this.loadDropRequestHandler = loadDropRequestHandler;
   }
 
   /**
@@ -203,6 +208,117 @@ public void onFailure(Throwable th)
     asyncContext.setTimeout(timeout);
   }
 
+  /**
+   * This endpoint is used by HttpLoadQueuePeon to assign a batch of segment load/drop requests. It makes the
+   * client wait until one of the following events occurs. Note that this is implemented using async IO, so no
+   * Jetty threads are held while waiting.
+   *
+   * (1) The given timeout elapses.
+   * (2) Some load/drop request completes.
+   *
+   * It returns a map of "load/drop request -> SUCCESS/FAILED/PENDING status" for each request in the batch.
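+   *
+   * For illustration only (bodies abbreviated; the exact JSON is whatever jackson produces for
+   * DataSegmentChangeRequest and DataSegmentChangeRequestAndStatus), an exchange might look like:
+   *
+   *   POST /druid-internal/v1/segments/changeRequests?timeout=300000
+   *   [{"action": "load", "segment": {...}}, {"action": "drop", "segment": {...}}]
+   *
+   *   HTTP 200
+   *   [{"request": {"action": "load", ...}, "status": {"state": "SUCCESS"}},
+   *    {"request": {"action": "drop", ...}, "status": {"state": "PENDING"}}]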
+   */
+  @POST
+  @Path("/changeRequests")
+  @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  public void applyDataSegmentChangeRequests(
+      @QueryParam("timeout") long timeout,
+      List<DataSegmentChangeRequest> changeRequestList,
+      @Context final HttpServletRequest req
+  ) throws IOException
+  {
+    if (loadDropRequestHandler == null) {
+      sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "load/drop handler is not available.");
+      return;
+    }
+
+    if (timeout <= 0) {
+      sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "timeout must be positive.");
+      return;
+    }
+
+    if (changeRequestList == null || changeRequestList.isEmpty()) {
+      sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "No change requests provided.");
+      return;
+    }
+
+    final ResponseContext context = createContext(req.getHeader("Accept"));
+    final ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future = loadDropRequestHandler
+        .processBatch(changeRequestList);
+
+    final AsyncContext asyncContext = req.startAsync();
+
+    asyncContext.addListener(
+        new AsyncListener()
+        {
+          @Override
+          public void onComplete(AsyncEvent event) throws IOException
+          {
+          }
+
+          @Override
+          public void onTimeout(AsyncEvent event) throws IOException
+          {
+            // HTTP 204 NO_CONTENT is sent to the client.
+            future.cancel(true);
+            event.getAsyncContext().complete();
+          }
+
+          @Override
+          public void onError(AsyncEvent event) throws IOException
+          {
+          }
+
+          @Override
+          public void onStartAsync(AsyncEvent event) throws IOException
+          {
+          }
+        }
+    );
+
+    Futures.addCallback(
+        future,
+        new FutureCallback<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
+        {
+          @Override
+          public void onSuccess(List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result)
+          {
+            try {
+              HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+              response.setStatus(HttpServletResponse.SC_OK);
+              context.inputMapper.writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
+                                 .writeValue(asyncContext.getResponse().getOutputStream(), result);
+              asyncContext.complete();
+            }
+            catch (Exception ex) {
+              log.debug(ex, "Request timed out or closed already.");
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable th)
+          {
+            try {
+              HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
+              if (th instanceof IllegalArgumentException) {
+                response.sendError(HttpServletResponse.SC_BAD_REQUEST, th.getMessage());
+              } else {
+                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, th.getMessage());
+              }
+              asyncContext.complete();
+            }
+            catch (Exception ex) {
+              log.debug(ex, "Request timed out or closed already.");
+            }
+          }
+        }
+    );
+
+    asyncContext.setTimeout(timeout);
+  }
+
   private void sendErrorResponse(HttpServletRequest req, int code, String error) throws IOException
   {
     AsyncContext asyncContext = req.startAsync();
diff --git a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java
index 078210485ee2..3efa94f976bc 100644
--- a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java
+++ b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java
@@ -308,6 +308,7 @@ static Server makeAndInitializeServer(
       @Override
       public void start() throws Exception
       {
+        log.info("Starting Jetty Server...");
         server.start();
         if (node.isEnableTlsPort()) {
           // Perform validation
@@ -336,6 +337,7 @@ public void start() throws Exception
       public void stop()
       {
         try {
+          log.info("Stopping Jetty Server...");
          server.stop();
        }
        catch (Exception e) {
diff --git a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java
index f6bd6e2e6059..3d775984a135 100644
--- a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java
+++ b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java
@@ -26,7 +26,7 @@
 import io.druid.client.DruidServerConfig;
 import io.druid.query.DruidMetrics;
 import io.druid.server.SegmentManager;
-import io.druid.server.coordination.ZkCoordinator;
+import io.druid.server.coordination.SegmentLoadDropHandler;
 import io.druid.timeline.DataSegment;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
@@ -37,18 +37,18 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
 {
   private final DruidServerConfig serverConfig;
   private final SegmentManager segmentManager;
-  private final ZkCoordinator zkCoordinator;
+  private final SegmentLoadDropHandler segmentLoadDropMgr;
 
   @Inject
   public HistoricalMetricsMonitor(
       DruidServerConfig serverConfig,
       SegmentManager segmentManager,
-      ZkCoordinator zkCoordinator
+      SegmentLoadDropHandler segmentLoadDropMgr
   )
   {
     this.serverConfig = serverConfig;
     this.segmentManager = segmentManager;
-    this.zkCoordinator = zkCoordinator;
+    this.segmentLoadDropMgr = segmentLoadDropMgr;
   }
 
   @Override
@@ -58,7 +58,7 @@ public boolean doMonitor(ServiceEmitter emitter)
     final Object2LongOpenHashMap<String> pendingDeleteSizes = new Object2LongOpenHashMap<>();
 
-    for (DataSegment segment : zkCoordinator.getPendingDeleteSnapshot()) {
+    for (DataSegment segment : segmentLoadDropMgr.getPendingDeleteSnapshot()) {
       pendingDeleteSizes.addTo(segment.getDataSource(), segment.getSize());
     }
 
diff --git a/server/src/test/java/io/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/io/druid/server/coordination/SegmentLoadDropHandlerTest.java
new file mode 100644
index 000000000000..122c83bf07cd
--- /dev/null
+++ b/server/src/test/java/io/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.coordination;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.IndexIO;
+import io.druid.segment.loading.CacheTestSegmentLoader;
+import io.druid.segment.loading.SegmentLoaderConfig;
+import io.druid.server.SegmentManager;
+import io.druid.server.initialization.ZkPathsConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ */
+public class SegmentLoadDropHandlerTest
+{
+  public static final int COUNT = 50;
+
+  private static final Logger log = new Logger(SegmentLoadDropHandlerTest.class);
+
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+  private final DruidServerMetadata me = new DruidServerMetadata(
+      "dummyServer",
+      "dummyHost",
+      null,
+      0,
+      ServerType.HISTORICAL,
+      "normal",
+      0
+  );
+
+  private SegmentLoadDropHandler segmentLoadDropHandler;
+  private DataSegmentAnnouncer announcer;
+  private File infoDir;
+  private AtomicInteger announceCount;
+  private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
+  private CacheTestSegmentLoader segmentLoader;
+  private SegmentManager segmentManager;
+  private List<Runnable> scheduledRunnable;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    try {
+      infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
+      infoDir.mkdirs();
+      for (File file : infoDir.listFiles()) {
+        file.delete();
+      }
+      log.info("Creating tmp test files in [%s]", infoDir);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    scheduledRunnable = Lists.newArrayList();
+
+    segmentLoader = new CacheTestSegmentLoader();
+    segmentManager = new SegmentManager(segmentLoader);
+    final ZkPathsConfig zkPaths = new ZkPathsConfig()
+    {
+      @Override
+      public String getBase()
+      {
+        return "/druid";
+      }
+    };
+
+    segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
+    announceCount = new AtomicInteger(0);
+
+    announcer = new DataSegmentAnnouncer()
+    {
+      @Override
+      public void announceSegment(DataSegment segment) throws IOException
+      {
+        segmentsAnnouncedByMe.add(segment);
+        announceCount.incrementAndGet();
+      }
+
+      @Override
+      public void unannounceSegment(DataSegment segment) throws IOException
+      {
+        segmentsAnnouncedByMe.remove(segment);
+        announceCount.decrementAndGet();
+      }
+
+      @Override
+      public void announceSegments(Iterable<DataSegment> segments) throws IOException
+      {
+        for (DataSegment segment : segments) {
+          segmentsAnnouncedByMe.add(segment);
+        }
+        announceCount.addAndGet(Iterables.size(segments));
+      }
+
+      @Override
+      public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+      {
+        for (DataSegment segment : segments) {
+          segmentsAnnouncedByMe.remove(segment);
+        }
+        announceCount.addAndGet(-Iterables.size(segments));
+      }
+    };
+
+    segmentLoadDropHandler = new SegmentLoadDropHandler(
+        jsonMapper,
+        new SegmentLoaderConfig()
+        {
+          @Override
+          public File getInfoDir()
+          {
+            return infoDir;
+          }
+
+          @Override
+          public int getNumLoadingThreads()
+          {
+            return 5;
+          }
+
+          @Override
+          public int getAnnounceIntervalMillis()
+          {
+            return 50;
+          }
+
+          @Override
+          public int getDropSegmentDelayMillis()
+          {
+            return 0;
+          }
+        },
+        announcer,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
+        segmentManager,
+        new ScheduledExecutorFactory()
+        {
+          @Override
+          public ScheduledExecutorService create(int corePoolSize, String nameFormat)
+          {
+            /*
+               Override normal behavior by adding the runnable to a list so that you can make sure
+               all the scheduled runnables are executed by explicitly calling run() on each item in the list
+             */
+            return new ScheduledThreadPoolExecutor(
+                corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
+            )
+            {
+              @Override
+              public ScheduledFuture<?> schedule(
+                  Runnable command, long delay, TimeUnit unit
+              )
+              {
+                scheduledRunnable.add(command);
+                return null;
+              }
+            };
+          }
+        }.create(5, "SegmentLoadDropHandlerTest-[%d]")
+    );
+  }
+
+  /**
+   * Steps:
+   * 1. removeSegment() schedules a delete runnable that deletes segment files,
+   * 2. addSegment() successfully loads the segment and announces it
+   * 3. scheduled delete task executes and realizes it should not delete the segment files.
+   */
+  @Test
+  public void testSegmentLoading1() throws Exception
+  {
+    segmentLoadDropHandler.start();
+
+    final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
+
+    segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP);
+
+    Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
+
+    segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP);
+
+    /*
+       make sure the scheduled runnable that "deletes" segment files has been executed.
+       Because another addSegment() call is executed, which removes the segment from the segmentsToDelete field in
+       SegmentLoadDropHandler, the scheduled runnable will not actually delete segment files.
+     */
+    for (Runnable runnable : scheduledRunnable) {
+      runnable.run();
+    }
+
+    Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
+    Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
+
+    segmentLoadDropHandler.stop();
+  }
+
+  /**
+   * Steps:
+   * 1. addSegment() successfully loads the segment and announces it
+   * 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files
+   * 3. addSegment() calls loadSegment() and announces it again
+   * 4. scheduled delete task executes and realizes it should not delete the segment files.
+   */
+  @Test
+  public void testSegmentLoading2() throws Exception
+  {
+    segmentLoadDropHandler.start();
+
+    final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
+
+    segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP);
+
+    Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
+
+    segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP);
+
+    Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
+
+    segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP);
+
+    /*
+       make sure the scheduled runnable that "deletes" segment files has been executed.
+       Because another addSegment() call is executed, which removes the segment from the segmentsToDelete field in
+       SegmentLoadDropHandler, the scheduled runnable will not actually delete segment files.
+     */
+    for (Runnable runnable : scheduledRunnable) {
+      runnable.run();
+    }
+
+    Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
+    Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
+
+    segmentLoadDropHandler.stop();
+  }
+
+  @Test
+  public void testLoadCache() throws Exception
+  {
+    List<DataSegment> segments = Lists.newLinkedList();
+    for (int i = 0; i < COUNT; ++i) {
+      segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-03")));
+      segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-04")));
+      segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-05")));
+      segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T01")));
+      segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T02")));
+      segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T03")));
+      segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T05")));
+      segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T06")));
+      segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
+    }
+    Collections.sort(segments);
+
+    for (DataSegment segment : segments) {
+      writeSegmentToCache(segment);
+    }
+
+    checkCache(segments);
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+    segmentLoadDropHandler.start();
+    Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
+    for (int i = 0; i < COUNT; ++i) {
+      Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
+      Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+    }
+    Assert.assertEquals(13 * COUNT, announceCount.get());
+    segmentLoadDropHandler.stop();
+
+    for (DataSegment segment : segments) {
+      deleteSegmentFromCache(segment);
+    }
+
+    Assert.assertEquals(0, infoDir.listFiles().length);
+    Assert.assertTrue(infoDir.delete());
+  }
+
+  private DataSegment makeSegment(String dataSource, String version, Interval interval)
+  {
+    return new DataSegment(
+        dataSource,
+        interval,
+        version,
+        ImmutableMap.<String, Object>of("version", version, "interval", interval, "cacheDir", infoDir),
+        Arrays.asList("dim1", "dim2", "dim3"),
+        Arrays.asList("metric1", "metric2"),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        123L
+    );
+  }
+
+  private void writeSegmentToCache(final DataSegment segment) throws IOException
+  {
+    if (!infoDir.exists()) {
+      infoDir.mkdir();
+    }
+
+    File segmentInfoCacheFile = new File(
+        infoDir,
+        segment.getIdentifier()
+    );
+    try {
+      jsonMapper.writeValue(segmentInfoCacheFile, segment);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    Assert.assertTrue(segmentInfoCacheFile.exists());
+  }
+
+  private void deleteSegmentFromCache(final DataSegment segment) throws IOException
+  {
+    File segmentInfoCacheFile = new File(
+        infoDir,
+        segment.getIdentifier()
+    );
+    if (segmentInfoCacheFile.exists()) {
+      segmentInfoCacheFile.delete();
+    }
+
+    Assert.assertTrue(!segmentInfoCacheFile.exists());
+  }
+
+  private void checkCache(List<DataSegment> segments) throws IOException
+  {
+    Assert.assertTrue(infoDir.exists());
+    File[] files = infoDir.listFiles();
+
+    List<File> sortedFiles = Lists.newArrayList(files);
+    Collections.sort(sortedFiles);
+
+    Assert.assertEquals(segments.size(), sortedFiles.size());
+    for (int i = 0; i < sortedFiles.size(); i++) {
+      DataSegment segment = jsonMapper.readValue(sortedFiles.get(i), DataSegment.class);
+      Assert.assertEquals(segments.get(i), segment);
+    }
+  }
+
+  @Test
+  public void testStartStop() throws Exception
+  {
+    SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
+        jsonMapper,
+        new SegmentLoaderConfig()
+        {
+          @Override
+          public File getInfoDir()
+          {
+            return infoDir;
+          }
+
+          @Override
+          public int getNumLoadingThreads()
+          {
+            return 5;
+          }
+
+          @Override
+          public int getAnnounceIntervalMillis()
+          {
+            return 50;
+          }
+        },
+        announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager
+    );
+
+    List<DataSegment> segments = Lists.newLinkedList();
+    for (int i = 0; i < COUNT; ++i) {
+      segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
+    }
+    Collections.sort(segments);
+
+    for (DataSegment segment : segments) {
+      writeSegmentToCache(segment);
+    }
+
+    checkCache(segments);
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    handler.start();
+    Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
+    for (int i = 0; i < COUNT; ++i) {
+      Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
+      Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+    }
+    Assert.assertEquals(5 * COUNT, announceCount.get());
+    handler.stop();
+
+    for (DataSegment segment : segments) {
+      deleteSegmentFromCache(segment);
+    }
+
+    Assert.assertEquals(0, infoDir.listFiles().length);
+    Assert.assertTrue(infoDir.delete());
+  }
+
+  @Test(timeout = 1000L)
+  public void testProcessBatch() throws Exception
+  {
+    segmentLoadDropHandler.start();
+
+    DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
+    DataSegment segment2 = makeSegment("batchtest2", "1", Intervals.of("P1d/2011-04-01"));
+
+    List<DataSegmentChangeRequest> batch = ImmutableList.of(
+        new SegmentChangeRequestLoad(segment1),
+        new SegmentChangeRequestDrop(segment2)
+    );
+
+    ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
+        .processBatch(batch);
+
+    List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result = future.get();
+    Assert.assertEquals(SegmentLoadDropHandler.Status.PENDING, result.get(0).getStatus());
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus()); + + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + + result = segmentLoadDropHandler.processBatch(batch).get(); + Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus()); + Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus()); + + + for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : segmentLoadDropHandler.processBatch(batch).get()) { + Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, e.getStatus()); + } + + segmentLoadDropHandler.stop(); + } +} diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index fbd407265280..ce106ff4fdac 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -21,63 +21,33 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.Binder; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Module; -import io.druid.client.cache.CacheConfig; -import io.druid.client.cache.LocalCacheProvider; -import io.druid.concurrent.Execs; +import com.metamx.emitter.EmittingLogger; +import io.druid.TestUtil; import io.druid.curator.CuratorTestBase; -import io.druid.curator.announcement.Announcer; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Intervals; -import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; -import io.druid.java.util.common.concurrent.ScheduledExecutors; -import io.druid.java.util.common.lifecycle.Lifecycle; -import io.druid.java.util.common.logger.Logger; -import io.druid.query.NoopQueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; -import io.druid.segment.loading.CacheTestSegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.server.SegmentManager; -import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; -import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; -import org.joda.time.Interval; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.io.IOException; import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** */ public class ZkCoordinatorTest extends CuratorTestBase { - public static final int COUNT = 50; - - private static final Logger log = new Logger(ZkCoordinatorTest.class); - private 
final ObjectMapper jsonMapper = new DefaultObjectMapper(); private final DruidServerMetadata me = new DruidServerMetadata( "dummyServer", @@ -88,16 +58,15 @@ public class ZkCoordinatorTest extends CuratorTestBase "normal", 0 ); - + private final ZkPathsConfig zkPaths = new ZkPathsConfig() + { + @Override + public String getBase() + { + return "/druid"; + } + }; private ZkCoordinator zkCoordinator; - private ServerManager serverManager; - private DataSegmentAnnouncer announcer; - private File infoDir; - private AtomicInteger announceCount; - private ConcurrentSkipListSet segmentsAnnouncedByMe; - private CacheTestSegmentLoader segmentLoader; - private SegmentManager segmentManager; - private List scheduledRunnable; @Before public void setUp() throws Exception @@ -105,451 +74,90 @@ public void setUp() throws Exception setupServerAndCurator(); curator.start(); curator.blockUntilConnected(); - try { - infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest"); - infoDir.mkdirs(); - for (File file : infoDir.listFiles()) { - file.delete(); - } - log.info("Creating tmp test files in [%s]", infoDir); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - scheduledRunnable = Lists.newArrayList(); + } - segmentLoader = new CacheTestSegmentLoader(); - segmentManager = new SegmentManager(segmentLoader); + @After + public void tearDown() throws Exception + { + tearDownServerAndCurator(); + } - serverManager = new ServerManager( - new NoopQueryRunnerFactoryConglomerate(), - new NoopServiceEmitter(), - MoreExecutors.sameThreadExecutor(), - MoreExecutors.sameThreadExecutor(), - new DefaultObjectMapper(), - new LocalCacheProvider().get(), - new CacheConfig(), - segmentManager + @Test(timeout = 5000L) + public void testLoadDrop() throws Exception + { + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + DataSegment segment = new DataSegment( + "test", + Intervals.of("P1d/2011-04-02"), + "v0", + ImmutableMap.of("version", "v0", "interval", Intervals.of("P1d/2011-04-02"), "cacheDir", "/no"), + Arrays.asList("dim1", "dim2", "dim3"), + Arrays.asList("metric1", "metric2"), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 123L ); - final ZkPathsConfig zkPaths = new ZkPathsConfig() - { - @Override - public String getBase() - { - return "/druid"; - } - }; - - segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); - announceCount = new AtomicInteger(0); + CountDownLatch loadLatch = new CountDownLatch(1); + CountDownLatch dropLatch = new CountDownLatch(1); - announcer = new DataSegmentAnnouncer() + SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( + TestUtil.MAPPER, + new SegmentLoaderConfig(), + EasyMock.createNiceMock(DataSegmentAnnouncer.class), + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), + EasyMock.createNiceMock(SegmentManager.class), + EasyMock.createNiceMock(ScheduledExecutorService.class) + ) { - private final DataSegmentAnnouncer delegate = new BatchDataSegmentAnnouncer( - me, - new BatchDataSegmentAnnouncerConfig(), - zkPaths, - new Announcer(curator, Execs.singleThreaded("blah")), - jsonMapper - ); - - @Override - public void announceSegment(DataSegment segment) throws IOException - { - segmentsAnnouncedByMe.add(segment); - announceCount.incrementAndGet(); - delegate.announceSegment(segment); - } - - @Override - public void unannounceSegment(DataSegment segment) throws IOException - { - segmentsAnnouncedByMe.remove(segment); - announceCount.decrementAndGet(); - delegate.unannounceSegment(segment); - 
} - @Override - public void announceSegments(Iterable segments) throws IOException + public void addSegment(DataSegment s, DataSegmentChangeCallback callback) { - for (DataSegment segment : segments) { - segmentsAnnouncedByMe.add(segment); + if (segment.getIdentifier().equals(s.getIdentifier())) { + loadLatch.countDown(); + callback.execute(); } - announceCount.addAndGet(Iterables.size(segments)); - delegate.announceSegments(segments); } @Override - public void unannounceSegments(Iterable segments) throws IOException + public void removeSegment(DataSegment s, DataSegmentChangeCallback callback) { - for (DataSegment segment : segments) { - segmentsAnnouncedByMe.remove(segment); + if (segment.getIdentifier().equals(s.getIdentifier())) { + dropLatch.countDown(); + callback.execute(); } - announceCount.addAndGet(-Iterables.size(segments)); - delegate.unannounceSegments(segments); } }; zkCoordinator = new ZkCoordinator( + segmentLoadDropHandler, jsonMapper, - new SegmentLoaderConfig() - { - @Override - public File getInfoDir() - { - return infoDir; - } - - @Override - public int getAnnounceIntervalMillis() - { - return 50; - } - - @Override - public int getDropSegmentDelayMillis() - { - return 0; - } - }, zkPaths, me, - announcer, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), - curator, - segmentManager, - new ScheduledExecutorFactory() - { - @Override - public ScheduledExecutorService create(int corePoolSize, String nameFormat) - { - /* - Override normal behavoir by adding the runnable to a list so that you can make sure - all the shceduled runnables are executed by explicitly calling run() on each item in the list - */ - return new ScheduledThreadPoolExecutor( - corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build() - ) - { - @Override - public ScheduledFuture schedule( - Runnable command, long delay, TimeUnit unit - ) - { - scheduledRunnable.add(command); - return null; - } - }; - } - } + curator ); - } - - @After - public void tearDown() throws Exception - { - tearDownServerAndCurator(); - } - - /** - * Steps: - * 1. removeSegment() schedules a delete runnable that deletes segment files, - * 2. addSegment() succesfully loads the segment and annouces it - * 3. scheduled delete task executes and realizes it should not delete the segment files. - */ - @Test - public void testSegmentLoading1() throws Exception - { zkCoordinator.start(); - final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); + String segmentZkPath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName(), segment.getIdentifier()); - zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() - { - @Override - public void execute() - { - // do nothing - } - }); + curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( + segmentZkPath, jsonMapper.writeValueAsBytes(new SegmentChangeRequestLoad(segment))); - Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + loadLatch.await(); - zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() - { - @Override - public void execute() - { - // do nothing - } - }); - - /* - make sure the scheduled runnable that "deletes" segment files has been executed. - Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. 
- */ - for (Runnable runnable : scheduledRunnable) { - runnable.run(); + while (curator.checkExists().forPath(segmentZkPath) != null) { + Thread.sleep(100); } - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); - - zkCoordinator.stop(); - } - - /** - * Steps: - * 1. addSegment() succesfully loads the segment and annouces it - * 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files - * 3. addSegment() calls loadSegment() and annouces it again - * 4. scheduled delete task executes and realizes it should not delete the segment files. - */ - @Test - public void testSegmentLoading2() throws Exception - { - zkCoordinator.start(); - - final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); + curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( + segmentZkPath, jsonMapper.writeValueAsBytes(new SegmentChangeRequestDrop(segment))); - zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() - { - @Override - public void execute() - { - // do nothing - } - }); - - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + dropLatch.await(); - zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() - { - @Override - public void execute() - { - // do nothing - } - }); - - Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); - - zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() - { - @Override - public void execute() - { - // do nothing - } - }); - - /* - make sure the scheduled runnable that "deletes" segment files has been executed. - Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. 
- */ - for (Runnable runnable : scheduledRunnable) { - runnable.run(); + while (curator.checkExists().forPath(segmentZkPath) != null) { + Thread.sleep(100); } - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); - zkCoordinator.stop(); } - - @Test - public void testLoadCache() throws Exception - { - List segments = Lists.newLinkedList(); - for (int i = 0; i < COUNT; ++i) { - segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); - segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02"))); - segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02"))); - segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-03"))); - segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-04"))); - segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-05"))); - segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T01"))); - segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T02"))); - segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T03"))); - segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T05"))); - segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T06"))); - segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01"))); - segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02"))); - } - Collections.sort(segments); - - for (DataSegment segment : segments) { - writeSegmentToCache(segment); - } - - checkCache(segments); - Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); - zkCoordinator.start(); - Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); - for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); - } - Assert.assertEquals(13 * COUNT, announceCount.get()); - zkCoordinator.stop(); - - for (DataSegment segment : segments) { - deleteSegmentFromCache(segment); - } - - Assert.assertEquals(0, infoDir.listFiles().length); - Assert.assertTrue(infoDir.delete()); - } - - private DataSegment makeSegment(String dataSource, String version, Interval interval) - { - return new DataSegment( - dataSource, - interval, - version, - ImmutableMap.of("version", version, "interval", interval, "cacheDir", infoDir), - Arrays.asList("dim1", "dim2", "dim3"), - Arrays.asList("metric1", "metric2"), - NoneShardSpec.instance(), - IndexIO.CURRENT_VERSION_ID, - 123L - ); - } - - private void writeSegmentToCache(final DataSegment segment) throws IOException - { - if (!infoDir.exists()) { - infoDir.mkdir(); - } - - File segmentInfoCacheFile = new File( - infoDir, - segment.getIdentifier() - ); - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - Assert.assertTrue(segmentInfoCacheFile.exists()); - } - - private void deleteSegmentFromCache(final DataSegment segment) throws IOException - { - File segmentInfoCacheFile = new File( - infoDir, - segment.getIdentifier() - ); - if (segmentInfoCacheFile.exists()) { - segmentInfoCacheFile.delete(); - } - - Assert.assertTrue(!segmentInfoCacheFile.exists()); - } - - private void checkCache(List segments) throws IOException - { - Assert.assertTrue(infoDir.exists()); - 
File[] files = infoDir.listFiles(); - - List sortedFiles = Lists.newArrayList(files); - Collections.sort(sortedFiles); - - Assert.assertEquals(segments.size(), sortedFiles.size()); - for (int i = 0; i < sortedFiles.size(); i++) { - DataSegment segment = jsonMapper.readValue(sortedFiles.get(i), DataSegment.class); - Assert.assertEquals(segments.get(i), segment); - } - } - - @Test - public void testInjector() throws Exception - { - Injector injector = Guice.createInjector( - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bind(ObjectMapper.class).toInstance(jsonMapper); - binder.bind(SegmentLoaderConfig.class).toInstance( - new SegmentLoaderConfig() - { - @Override - public File getInfoDir() - { - return infoDir; - } - - @Override - public int getAnnounceIntervalMillis() - { - return 50; - } - } - ); - binder.bind(ZkPathsConfig.class).toInstance( - new ZkPathsConfig() - { - @Override - public String getBase() - { - return "/druid"; - } - } - ); - binder.bind(DruidServerMetadata.class) - .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", null, 0, ServerType.HISTORICAL, "normal", 0)); - binder.bind(DataSegmentAnnouncer.class).toInstance(announcer); - binder.bind(DataSegmentServerAnnouncer.class).toInstance(EasyMock.createNiceMock(DataSegmentServerAnnouncer.class)); - binder.bind(CuratorFramework.class).toInstance(curator); - binder.bind(ServerManager.class).toInstance(serverManager); - binder.bind(SegmentManager.class).toInstance(segmentManager); - binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle())); - } - - } - ); - - ZkCoordinator zkCoordinator = injector.getInstance(ZkCoordinator.class); - - List segments = Lists.newLinkedList(); - for (int i = 0; i < COUNT; ++i) { - segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); - segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02"))); - segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02"))); - segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01"))); - segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02"))); - } - Collections.sort(segments); - - for (DataSegment segment : segments) { - writeSegmentToCache(segment); - } - - checkCache(segments); - Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); - - zkCoordinator.start(); - Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); - for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); - } - Assert.assertEquals(5 * COUNT, announceCount.get()); - zkCoordinator.stop(); - - for (DataSegment segment : segments) { - deleteSegmentFromCache(segment); - } - - Assert.assertEquals(0, infoDir.listFiles().length); - Assert.assertTrue(infoDir.delete()); - } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 5157dba8040a..c2009ddfeb54 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -141,7 +141,7 @@ public void setUp() throws Exception true, Execs.singleThreaded("coordinator_test_path_children_cache-%d") ); - loadQueuePeon = new LoadQueuePeon( + loadQueuePeon = new 
CuratorLoadQueuePeon( curator, LOADPATH, objectMapper, diff --git a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java new file mode 100644 index 000000000000..b9adbb89a017 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.HttpResponseHandler; +import io.druid.TestUtil; +import io.druid.concurrent.Execs; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.RE; +import io.druid.server.coordination.DataSegmentChangeRequest; +import io.druid.server.coordination.SegmentLoadDropHandler; +import io.druid.timeline.DataSegment; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.joda.time.Duration; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class HttpLoadQueuePeonTest +{ + @Test(timeout = 10000) + public void testSimple() throws Exception + { + final DataSegment segment1 = new DataSegment( + "test1", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment2 = new DataSegment( + "test2", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment3 = new DataSegment( + "test3", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment4 = new DataSegment( + "test4", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon( + "http://dummy:4000", + TestUtil.MAPPER, + new TestHttpClient(), + new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, 
false, Duration.ZERO) { + @Override + public int getHttpLoadQueuePeonBatchSize() + { + return 2; + } + }, + Executors.newScheduledThreadPool( + 2, + Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s") + ), + Execs.singleThreaded("HttpLoadQueuePeonTest") + ); + + httpLoadQueuePeon.start(); + + Map<String, CountDownLatch> latches = ImmutableMap.of( + segment1.getIdentifier(), new CountDownLatch(1), + segment2.getIdentifier(), new CountDownLatch(1), + segment3.getIdentifier(), new CountDownLatch(1), + segment4.getIdentifier(), new CountDownLatch(1) + ); + + httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback() + { + @Override + public void execute() + { + latches.get(segment1.getIdentifier()).countDown(); + } + }); + + httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback() + { + @Override + public void execute() + { + latches.get(segment2.getIdentifier()).countDown(); + } + }); + + httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback() + { + @Override + public void execute() + { + latches.get(segment3.getIdentifier()).countDown(); + } + }); + + httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback() + { + @Override + public void execute() + { + latches.get(segment4.getIdentifier()).countDown(); + } + }); + + latches.get(segment1.getIdentifier()).await(); + latches.get(segment2.getIdentifier()).await(); + latches.get(segment3.getIdentifier()).await(); + latches.get(segment4.getIdentifier()).await(); + + httpLoadQueuePeon.stop(); + } + + private static class TestDruidNodeDiscovery implements DruidNodeDiscovery + { + Listener listener; + + @Override + public Collection<DiscoveryDruidNode> getAllNodes() + { + throw new UnsupportedOperationException("Not Implemented."); + } + + @Override + public void registerListener(Listener listener) + { + listener.nodesAdded(ImmutableList.of()); + this.listener = listener; + } + } + + private static class TestHttpClient implements HttpClient + { + AtomicInteger requestNum = new AtomicInteger(0); + + @Override + public <Intermediate, Final> ListenableFuture<Final> go( + Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler + ) + { + throw new UnsupportedOperationException("Not Implemented."); + } + + @Override + public <Intermediate, Final> ListenableFuture<Final> go( + Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, Duration duration + ) + { + HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + httpResponse.setContent(ChannelBuffers.buffer(0)); + httpResponseHandler.handleResponse(httpResponse); + try { + List<DataSegmentChangeRequest> changeRequests = TestUtil.MAPPER.readValue( + request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {} + ); + + List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size()); + for (DataSegmentChangeRequest cr : changeRequests) { + statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS)); + } + return (ListenableFuture) Futures.immediateFuture(new ByteArrayInputStream(TestUtil.MAPPER.writerWithType( + HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF).writeValueAsBytes(statuses))); + } + catch (Exception ex) { + throw new RE(ex, "Unexpected exception."); + } + } + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java index 108e1ac5c974..e5ce7d391783 100644 --- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java @@ -82,7 +82,7 @@ public void testMultipleLoadDropSegments() throws Exception final AtomicInteger 
requestSignalIdx = new AtomicInteger(0); final AtomicInteger segmentSignalIdx = new AtomicInteger(0); - loadQueuePeon = new LoadQueuePeon( + loadQueuePeon = new CuratorLoadQueuePeon( curator, LOAD_QUEUE_PATH, jsonMapper, @@ -289,7 +289,7 @@ public void testFailAssign() throws Exception final CountDownLatch loadRequestSignal = new CountDownLatch(1); final CountDownLatch segmentLoadedSignal = new CountDownLatch(1); - loadQueuePeon = new LoadQueuePeon( + loadQueuePeon = new CuratorLoadQueuePeon( curator, LOAD_QUEUE_PATH, jsonMapper, diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java index f394ad04b2ea..7b3f2808cab9 100644 --- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentSkipListSet; -public class LoadQueuePeonTester extends LoadQueuePeon +public class LoadQueuePeonTester extends CuratorLoadQueuePeon { private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new ConcurrentSkipListSet<DataSegment>(); diff --git a/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java b/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java index f97440bdad56..652600a7d9ba 100644 --- a/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java +++ b/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java @@ -29,7 +29,7 @@ import io.druid.client.DruidServerConfig; import io.druid.java.util.common.Intervals; import io.druid.server.SegmentManager; -import io.druid.server.coordination.ZkCoordinator; +import io.druid.server.coordination.SegmentLoadDropHandler; import io.druid.timeline.DataSegment; import org.easymock.Capture; import org.easymock.CaptureType; @@ -48,7 +48,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport { private DruidServerConfig druidServerConfig; private SegmentManager segmentManager; - private ZkCoordinator zkCoordinator; + private SegmentLoadDropHandler segmentLoadDropMgr; private ServiceEmitter serviceEmitter; @Before @@ -56,7 +56,7 @@ public void setUp() { druidServerConfig = EasyMock.createStrictMock(DruidServerConfig.class); segmentManager = EasyMock.createStrictMock(SegmentManager.class); - zkCoordinator = EasyMock.createStrictMock(ZkCoordinator.class); + segmentLoadDropMgr = EasyMock.createStrictMock(SegmentLoadDropHandler.class); serviceEmitter = EasyMock.createStrictMock(ServiceEmitter.class); } @@ -81,7 +81,7 @@ public void testSimple() final String tier = "tier"; EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).once(); - EasyMock.expect(zkCoordinator.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once(); + EasyMock.expect(segmentLoadDropMgr.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once(); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size)); @@ -95,16 +95,16 @@ public void testSimple() final HistoricalMetricsMonitor monitor = new HistoricalMetricsMonitor( druidServerConfig, segmentManager, - zkCoordinator + segmentLoadDropMgr ); final Capture<ServiceEventBuilder<ServiceMetricEvent>> eventCapture = EasyMock.newCapture(CaptureType.ALL); serviceEmitter.emit(EasyMock.capture(eventCapture)); 
EasyMock.expectLastCall().times(5); - EasyMock.replay(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter); + EasyMock.replay(druidServerConfig, segmentManager, segmentLoadDropMgr, serviceEmitter); monitor.doMonitor(serviceEmitter); - EasyMock.verify(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter); + EasyMock.verify(druidServerConfig, segmentManager, segmentLoadDropMgr, serviceEmitter); final String host = "host"; final String service = "service"; diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 3aff7ddf15e7..f2107d54c621 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -28,6 +28,7 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; +import com.metamx.http.client.HttpClient; import io.airlift.airline.Command; import io.druid.audit.AuditManager; import io.druid.client.CoordinatorServerView; @@ -42,6 +43,7 @@ import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.CoordinatorIndexingServiceHelper; +import io.druid.guice.annotations.Global; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; import io.druid.java.util.common.logger.Logger; import io.druid.metadata.MetadataRuleManager; @@ -74,6 +76,7 @@ import io.druid.server.http.RulesResource; import io.druid.server.http.ServersResource; import io.druid.server.http.TiersResource; +import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.lookup.cache.LookupCoordinatorManager; import io.druid.server.lookup.cache.LookupCoordinatorManagerConfig; @@ -221,12 +224,14 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, - DruidCoordinatorConfig config + DruidCoordinatorConfig config, + @Global HttpClient httpClient, + ZkPathsConfig zkPaths ) { return new LoadQueueTaskMaster( curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), - Executors.newSingleThreadExecutor(), config + Executors.newSingleThreadExecutor(), config, httpClient, zkPaths ); } } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 0ebef587ca37..a205dfab01b3 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -51,6 +51,7 @@ import io.druid.guice.QueryableModule; import io.druid.guice.QueryablePeonModule; import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskToolboxFactory; @@ -87,6 +88,7 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.coordination.BatchDataSegmentAnnouncer; import io.druid.server.coordination.ServerType; import io.druid.server.http.SegmentListerResource; import io.druid.server.initialization.jetty.ChatHandlerServerModule; @@ -94,6 +96,7 @@ import io.druid.server.metrics.DataSourceTaskIdHolder; import org.eclipse.jetty.server.Server; +import 
javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -266,6 +269,21 @@ public String getTaskIDFromTask(final Task task) { return task.getId(); } + + @Provides + public SegmentListerResource getSegmentListerResource( + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + @Nullable BatchDataSegmentAnnouncer announcer + ) + { + return new SegmentListerResource( + jsonMapper, + smileMapper, + announcer, + null + ); + } }, new QueryablePeonModule(), new IndexingServiceFirehoseModule(),