diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 548d5b6a05b7..7a6df9a6986a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -72,7 +72,6 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.indexing.test.TestDataSegmentPusher; import org.apache.druid.jackson.DefaultObjectMapper; @@ -125,6 +124,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.DataSegment; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index cd9163fad9ad..4f8e2dd9a68e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -59,7 +59,6 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.indexing.test.TestDataSegmentPusher; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; @@ -115,6 +114,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.DataSegment; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 79bb286f7ad8..decc27bf4db0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -75,7 +75,6 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; 
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -120,6 +119,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CompressionUtils; diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index 2c30c1681325..6f5c3bdb1600 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -608,14 +608,13 @@ public void deltaSync(List changes) { for (DataSegmentChangeRequest request : changes) { if (request instanceof SegmentChangeRequestLoad) { - addSegment(((SegmentChangeRequestLoad) request).getSegment(), false); + addSegment(request.getSegment(), false); } else if (request instanceof SegmentChangeRequestDrop) { - removeSegment(((SegmentChangeRequestDrop) request).getSegment(), false); + removeSegment(request.getSegment(), false); } else { log.error( "Server[%s] gave a non load/drop dataSegmentChangeRequest[%s], Ignored.", - druidServer.getName(), - request + druidServer.getName(), request ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java index 6a044aaf6bba..c6500fade7d3 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -37,4 +38,6 @@ public interface DataSegmentChangeRequest void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCallback callback); String asString(); + + DataSegment getSegment(); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java new file mode 100644 index 000000000000..50d622928705 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; + +/** + * Response of a {@link DataSegmentChangeRequest}. Contains the request itself + * and the {@link Status} of the request. + */ +public class DataSegmentChangeResponse +{ + private final DataSegmentChangeRequest request; + private final Status status; + + @JsonCreator + public DataSegmentChangeResponse( + @JsonProperty("request") DataSegmentChangeRequest request, + @JsonProperty("status") Status status + ) + { + this.request = request; + this.status = status; + } + + @JsonProperty + public DataSegmentChangeRequest getRequest() + { + return request; + } + + @JsonProperty + public Status getStatus() + { + return status; + } + + @JsonIgnore + public boolean isComplete() + { + return getStatus().getState() != State.PENDING; + } + + @JsonIgnore + public boolean isLoadRequest() + { + return request instanceof SegmentChangeRequestLoad; + } + + @Override + public String toString() + { + return "DataSegmentChangeResponse{" + + "request=" + request + + ", status=" + status + + '}'; + } + + public enum State + { + SUCCESS, FAILED, PENDING + } + + /** + * Contains {@link State} of a {@link DataSegmentChangeRequest} and the failure + * message, if any. + */ + public static class Status + { + private final State state; + @Nullable + private final String failureCause; + + public static final Status SUCCESS = new Status(State.SUCCESS, null); + public static final Status PENDING = new Status(State.PENDING, null); + + @JsonCreator + public Status( + @JsonProperty("state") State state, + @JsonProperty("failureCause") @Nullable String failureCause + ) + { + Preconditions.checkNotNull(state, "state must be non-null"); + this.state = state; + this.failureCause = failureCause; + } + + public static Status failed(String cause) + { + return new Status(State.FAILED, cause); + } + + @JsonProperty + public State getState() + { + return state; + } + + @Nullable + @JsonProperty + public String getFailureCause() + { + return failureCause; + } + + @Override + public String toString() + { + return "Status{" + + "state=" + state + + ", failureCause='" + failureCause + '\'' + + '}'; + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java index c4229a028806..94bb84561f89 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java @@ -45,6 +45,7 @@ public SegmentChangeRequestDrop( @JsonProperty @JsonUnwrapped + @Override public DataSegment getSegment() { return segment; diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java index 130c7b50d80c..d2e2db35fd76 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java @@ -63,6 +63,7 @@ public void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCall @JsonProperty @JsonUnwrapped + @Override public DataSegment 
getSegment() { return segment; diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java index 3978bcf693c4..3a25f602ebe2 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java @@ -19,6 +19,9 @@ package org.apache.druid.server.coordination; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.druid.timeline.DataSegment; + import javax.annotation.Nullable; /** @@ -39,4 +42,11 @@ public String asString() { return "NOOP"; } + + @Override + @JsonIgnore + public DataSegment getSegment() + { + throw new UnsupportedOperationException(); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 2ba5dc933042..8fb1141a57bf 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -19,11 +19,8 @@ package org.apache.druid.server.coordination; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -32,11 +29,13 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -77,10 +76,32 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler { private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class); - // Synchronizes removals from segmentsToDelete - private final Object segmentDeleteLock = new Object(); + /** + * Synchronizes addition and removal from {@link #segmentsToDrop} to ensure that + * concurrent Load and Drop of the same segment do not result in an invalid state. + *
+ * For a given segment, the latest request handled by {@link #processRequest}
+ * is the one which is honored.
+ * <p>
+ * Possible cases:
+ * <ol>
+ * <li>Load after Drop:
+ * <ul>
+ * <li>Drop started before the Load is queued: the Load must wait for the Drop to finish.</li>
+ * <li>Drop started after the Load is queued: there is nothing to do, as the Drop would not be
+ * processed anyway because the segment is no longer present in {@link #segmentsToDrop}.</li>
+ * </ul>
+ * </li>
+ * <li>Drop after Load: the Load must exit as soon as it realizes that the
+ * segment is now marked for Drop, and should not load or announce it anymore.</li>
+ * </ol>
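+ * <p>
+ * A minimal sketch of the hand-off under this lock, condensed from
+ * {@link #addSegment}, {@link #removeSegment} and {@link #dropSegment} below
+ * (interleaving shown is hypothetical):
+ * <pre>{@code
+ * // Drop marks the segment first, then unannounces it and schedules the delete:
+ * synchronized (segmentDropLock) {
+ *   segmentsToDrop.add(segment);
+ * }
+ * // A Load arriving afterwards unmarks it, so the scheduled dropSegment()
+ * // finds nothing to remove and becomes a no-op:
+ * synchronized (segmentDropLock) {
+ *   segmentsToDrop.remove(segment);
+ * }
+ * }</pre>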
+ */ + private final Object segmentDropLock = new Object(); - // Synchronizes start/stop of this object. + /** + * Synchronizes start/stop of the SegmentLoadDropHandler. + */ private final Object startStopLock = new Object(); private final ObjectMapper jsonMapper; @@ -90,19 +111,48 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private final SegmentManager segmentManager; private final ScheduledExecutorService exec; private final ServerTypeConfig serverTypeConfig; - private final ConcurrentSkipListSet segmentsToDelete; + private final ConcurrentSkipListSet segmentsToDrop; private final SegmentCacheManager segmentCacheManager; private volatile boolean started = false; - // Keep history of load/drop request status in a LRU cache to maintain idempotency if same request shows up - // again and to return status of a completed request. Maximum size of this cache must be significantly greater - // than number of pending load/drop requests. so that history is not lost too quickly. - private final Cache> requestStatuses; + /** + * Used to cache the status of a completed load or drop request until it has + * been served to the (Coordinator) client exactly once. + *
+ * The cache is used as follows:
+ * <ol>
+ * <li>An entry with state PENDING is added to the cache upon receiving a
+ * request to load or drop a segment.</li>
+ * <li>A duplicate request received at this point is immediately answered with PENDING.</li>
+ * <li>Once the load/drop finishes, the entry is updated to either SUCCESS or FAILED.</li>
+ * <li>A duplicate request received at this point is immediately answered with
+ * SUCCESS or FAILED and the entry is removed from the cache.</li>
+ * <li>If the original request itself finishes after the load or drop has already
+ * completed, it is answered with SUCCESS or FAILED and the entry is removed
+ * from the cache.</li>
+ * <li>If a request of a different type (e.g. load after drop) is received,
+ * the entry is removed from the cache and the previous request is abandoned.</li>
+ * </ol>
+ * <p>
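+ * For example, a client polling the same Load request twice would observe
+ * the following (schematic sequence, not code from this patch):
+ * <pre>{@code
+ * processBatch(batch)  // first poll: answered with PENDING, entry cached
+ * // ... load completes in the background, entry updated to SUCCESS ...
+ * processBatch(batch)  // second poll: answered with SUCCESS, entry invalidated
+ * }</pre>
+ * <p>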
+ * Maximum size of this cache must be significantly greater than the number of + * pending load/drop requests. This is generally already the case because the + * Coordinator sends load/drop requests in small batches and does not send new + * requests until the previously submitted ones have either succeeded or failed. + *
+ * The cache must be updated in a thread-safe manner so that stale statuses + * are not served. + */ + @GuardedBy("requestStatusesLock") + private final Cache> requestStatuses; private final Object requestStatusesLock = new Object(); - // This is the list of unresolved futures returned to callers of processBatch(List) - // Threads loading/dropping segments resolve these futures as and when some segment load/drop finishes. + /** + * List of unresolved futures returned to callers of {@link #processBatch}. + * Each {@link CustomSettableFuture} corresponds to a single batch of requests. + * A future is resolved as soon as a single request in the batch completes with + * either success or failure. + */ private final LinkedHashSet waitingFutures = new LinkedHashSet<>(); @Inject @@ -152,8 +202,11 @@ public SegmentLoadDropHandler( this.exec = exec; this.serverTypeConfig = serverTypeConfig; - this.segmentsToDelete = new ConcurrentSkipListSet<>(); - requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build(); + this.segmentsToDrop = new ConcurrentSkipListSet<>(); + this.requestStatuses = CacheBuilder.newBuilder() + .maximumSize(config.getStatusQueueMaxSize()) + .initialCapacity(8) + .build(); } @LifecycleStart @@ -164,7 +217,8 @@ public void start() throws IOException return; } - log.info("Starting..."); + final Stopwatch stopwatch = Stopwatch.createStarted(); + log.info("Starting SegmentLoadDropHandler..."); try { if (!config.getLocations().isEmpty()) { loadLocalCache(); @@ -179,7 +233,7 @@ public void start() throws IOException throw new RuntimeException(e); } started = true; - log.info("Started."); + log.info("Started SegmentLoadDropHandler in [%d]ms.", stopwatch.millisElapsed()); } } @@ -191,7 +245,7 @@ public void stop() return; } - log.info("Stopping..."); + log.info("Stopping SegmentLoadDropHandler..."); try { if (shouldAnnounce()) { serverAnnouncer.unannounce(); @@ -203,7 +257,7 @@ public void stop() finally { started = false; } - log.info("Stopped."); + log.info("Stopped SegmentLoadDropHandler."); } } @@ -214,7 +268,6 @@ public boolean isStarted() private void loadLocalCache() throws IOException { - final long start = System.currentTimeMillis(); File baseDir = config.getInfoDir(); FileUtils.mkdirp(baseDir); @@ -254,39 +307,33 @@ private void loadLocalCache() throws IOException .emit(); } - addSegments( - cachedSegments, - () -> log.info("Cache load took %,d ms", System.currentTimeMillis() - start) - ); - } - - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) - throws SegmentLoadingException - { - loadSegment(segment, callback, lazy, null); + loadCachedSegments(cachedSegments); } /** - * Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will - * throw a SegmentLoadingException + * Downloads a single segment and creates a cache file for it in the info dir. + * If the load fails at any step, {@link #cleanupFailedLoad} is called. 
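+ * <p>
+ * The cache file written to the info dir is what {@link #loadLocalCache} reads
+ * back on startup, allowing the segment to be re-loaded and re-announced after
+ * a restart.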
* - * @throws SegmentLoadingException if it fails to load the given segment + * @throws SegmentLoadingException if the load fails */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy, @Nullable - ExecutorService loadSegmentIntoPageCacheExec) - throws SegmentLoadingException + private void loadSegment( + DataSegment segment, + boolean lazy, + @Nullable ExecutorService loadSegmentIntoPageCacheExec + ) throws SegmentLoadingException { final boolean loaded; try { - loaded = segmentManager.loadSegment(segment, - lazy, - () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false), - loadSegmentIntoPageCacheExec + loaded = segmentManager.loadSegment( + segment, + lazy, + () -> cleanupFailedLoad(segment), + loadSegmentIntoPageCacheExec ); } catch (Exception e) { - removeSegment(segment, callback, false); - throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getId()); + cleanupFailedLoad(segment); + throw new SegmentLoadingException(e, "Could not load segment: %s", e.getMessage()); } if (loaded) { @@ -296,7 +343,7 @@ private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback jsonMapper.writeValue(segmentInfoCacheFile, segment); } catch (IOException e) { - removeSegment(segment, callback, false); + cleanupFailedLoad(segment); throw new SegmentLoadingException( e, "Failed to write to disk segment info cache file[%s]", @@ -320,59 +367,82 @@ public Map getRowCountDistributionPerDataso @Override public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { - Status result = null; + // Unmark the segment for drop + synchronized (segmentDropLock) { + segmentsToDrop.remove(segment); + } + + // Load and announce the segment asynchronously + exec.submit(() -> loadAndAnnounceSegment(segment)); + } + + /** + * Loads the segment synchronously, announces it and updates the status of the + * corresponding change request in the {@link #requestStatuses} cache. + */ + private void loadAndAnnounceSegment(DataSegment segment) + { + DataSegmentChangeResponse.Status result = null; try { - log.info("Loading segment %s", segment.getId()); - /* - The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, - and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment - files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads - the segment, which makes dropping segment and downloading segment happen at the same time. - */ - if (segmentsToDelete.contains(segment)) { - /* - Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, - each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make - things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of - cost of acquiring lock by doing the "contains" check outside the synchronized block. - */ - synchronized (segmentDeleteLock) { - segmentsToDelete.remove(segment); + log.info("Loading segment[%s]", segment.getId()); + + // Do not start with Load if there is a Drop in progress. This prevents a Drop + // that has come before Load of the same segment from causing partial success. 
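+ // (Case 1 of the segmentDropLock contract: if dropSegment() currently holds
+ // the lock, this Load blocks here until the Drop has finished.)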
+ synchronized (segmentDropLock) { + if (!shouldLoadSegment(segment)) { + return; } } - loadSegment(segment, DataSegmentChangeCallback.NOOP, false); - // announce segment even if the segment file already exists. - try { - announcer.announceSegment(segment); + + // Do not load segment if it has already been marked for drop + if (shouldLoadSegment(segment)) { + loadSegment(segment, false, null); + } else { + return; } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId()); + + // Do not announce segment if it has already been marked for drop + if (shouldLoadSegment(segment)) { + announceSegment(segment); + } else { + return; } - result = Status.SUCCESS; + result = DataSegmentChangeResponse.Status.SUCCESS; } catch (Throwable e) { log.makeAlert(e, "Failed to load segment for dataSource") .addData("segment", segment) .emit(); - result = Status.failed(e.toString()); + result = DataSegmentChangeResponse.Status.failed(e.getMessage()); } finally { updateRequestStatus(new SegmentChangeRequestLoad(segment), result); - if (null != callback) { - callback.execute(); - } + resolveWaitingFutures(); + } + } + + /** + * Announces the given segment, regardless of whether the segment files already + * existed or have been freshly downloaded. + */ + private void announceSegment(DataSegment segment) throws SegmentLoadingException + { + try { + announcer.announceSegment(segment); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId()); } } /** * Bulk adding segments during bootstrap - * @param segments A collection of segments to add - * @param callback Segment loading callback */ - private void addSegments(Collection segments, final DataSegmentChangeCallback callback) + private void loadCachedSegments(Collection segments) { // Start a temporary thread pool to load segments into page cache during bootstrap + final Stopwatch stopwatch = Stopwatch.createStarted(); ExecutorService loadingExecutor = null; ExecutorService loadSegmentsIntoPageCacheOnBootstrapExec = config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() != 0 ? @@ -395,11 +465,9 @@ private void addSegments(Collection segments, final DataSegmentChan try { log.info( "Loading segment[%d/%d][%s]", - counter.incrementAndGet(), - numSegments, - segment.getId() + counter.incrementAndGet(), numSegments, segment.getId() ); - loadSegment(segment, callback, config.isLazyLoadOnStart(), loadSegmentsIntoPageCacheOnBootstrapExec); + loadSegment(segment, config.isLazyLoadOnStart(), loadSegmentsIntoPageCacheOnBootstrapExec); try { backgroundSegmentAnnouncer.announceSegment(segment); } @@ -441,7 +509,7 @@ private void addSegments(Collection segments, final DataSegmentChan .emit(); } finally { - callback.execute(); + log.info("Finished cache load in [%,d]ms", stopwatch.millisElapsed()); if (loadingExecutor != null) { loadingExecutor.shutdownNow(); } @@ -453,96 +521,120 @@ private void addSegments(Collection segments, final DataSegmentChan } } + /** + * Cleans up a failed LOAD request by completely removing the partially + * downloaded segment files and unannouncing the segment for safe measure. 
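+ * <p>
+ * The segment is first marked in {@link #segmentsToDrop}, so that a concurrent
+ * Load of the same segment observes the mark and backs off, before
+ * {@link #dropSegment} is invoked.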
+ */ + @VisibleForTesting + void cleanupFailedLoad(DataSegment segment) + { + unannounceSegment(segment); + synchronized (segmentDropLock) { + segmentsToDrop.add(segment); + } + dropSegment(segment); + } + @Override public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { - removeSegment(segment, callback, true); + // Mark the segment for drop + synchronized (segmentDropLock) { + segmentsToDrop.add(segment); + } + + unannounceSegment(segment); + resolveWaitingFutures(); + + // Schedule drop of segment + log.info( + "Completely removing segment[%s] in [%,d] millis.", + segment.getId(), config.getDropSegmentDelayMillis() + ); + exec.schedule( + () -> dropSegment(segment), + config.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS + ); } - @VisibleForTesting - void removeSegment( - final DataSegment segment, - @Nullable final DataSegmentChangeCallback callback, - final boolean scheduleDrop - ) + /** + * Unannounces the segment and updates the result for the corresponding DROP request. + * A DROP request is considered successful if the unannouncement has succeeded, + * even if the segment files have not been deleted yet. + */ + private void unannounceSegment(final DataSegment segment) { - Status result = null; + DataSegmentChangeResponse.Status result = null; try { announcer.unannounceSegment(segment); - segmentsToDelete.add(segment); - - Runnable runnable = () -> { - try { - synchronized (segmentDeleteLock) { - if (segmentsToDelete.remove(segment)) { - segmentManager.dropSegment(segment); - - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); - } - } - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to remove segment! Possible resource leak!") - .addData("segment", segment) - .emit(); - } - }; - - if (scheduleDrop) { - log.info( - "Completely removing [%s] in [%,d] millis", - segment.getId(), - config.getDropSegmentDelayMillis() - ); - exec.schedule( - runnable, - config.getDropSegmentDelayMillis(), - TimeUnit.MILLISECONDS - ); - } else { - runnable.run(); - } - - result = Status.SUCCESS; + result = DataSegmentChangeResponse.Status.SUCCESS; } catch (Exception e) { log.makeAlert(e, "Failed to remove segment") .addData("segment", segment) .emit(); - result = Status.failed(e.getMessage()); + result = DataSegmentChangeResponse.Status.failed(e.getMessage()); } finally { updateRequestStatus(new SegmentChangeRequestDrop(segment), result); - if (null != callback) { - callback.execute(); + } + } + + /** + * Drops the given segment synchronously. + */ + private void dropSegment(DataSegment segment) + { + try { + synchronized (segmentDropLock) { + if (segmentsToDrop.remove(segment)) { + segmentManager.dropSegment(segment); + + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } } } + catch (Exception e) { + log.makeAlert(e, "Failed to drop segment. Possible resource leak.") + .addData("segment", segment) + .emit(); + } } public Collection getPendingDeleteSnapshot() { - return ImmutableList.copyOf(segmentsToDelete); + return ImmutableList.copyOf(segmentsToDrop); } - public ListenableFuture> processBatch(List changeRequests) + /** + * Processes a batch of segment load/drop requests. 
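+ * <p>
+ * A minimal usage sketch (hypothetical caller; {@code handler},
+ * {@code segmentToLoad} and {@code segmentToDrop} are assumed names):
+ * <pre>{@code
+ * List<DataSegmentChangeRequest> batch = ImmutableList.of(
+ *     new SegmentChangeRequestLoad(segmentToLoad),
+ *     new SegmentChangeRequestDrop(segmentToDrop)
+ * );
+ * List<DataSegmentChangeResponse> results = handler.processBatch(batch).get();
+ * }</pre>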
+ * + * @return Future of List of results of each change request in the batch. + * This future completes as soon as any pending request is completed by + * this {@code SegmentLoadDropHandler}. + */ + public ListenableFuture> processBatch( + List changeRequests + ) { boolean isAnyRequestDone = false; - Map> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size()); + Map> statuses + = Maps.newHashMapWithExpectedSize(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - AtomicReference status = processRequest(cr); - if (status.get().getState() != Status.STATE.PENDING) { + AtomicReference status = processRequest(cr); + if (status.get().isComplete()) { isAnyRequestDone = true; } statuses.put(cr, status); } - CustomSettableFuture future = new CustomSettableFuture(waitingFutures, statuses); - + final CustomSettableFuture future = new CustomSettableFuture(statuses); if (isAnyRequestDone) { future.resolve(); } else { @@ -554,64 +646,83 @@ public ListenableFuture> processBatch(Li return future; } - private AtomicReference processRequest(DataSegmentChangeRequest changeRequest) + /** + * Starts the processing of the given segment change request. + */ + private AtomicReference processRequest(DataSegmentChangeRequest changeRequest) { synchronized (requestStatusesLock) { - AtomicReference status = requestStatuses.getIfPresent(changeRequest); - - // If last load/drop request status is failed, here can try that again - if (status == null || status.get().getState() == Status.STATE.FAILED) { - changeRequest.go( - new DataSegmentChangeHandler() - { - @Override - public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING)); - exec.submit( - () -> SegmentLoadDropHandler.this.addSegment( - ((SegmentChangeRequestLoad) changeRequest).getSegment(), - () -> resolveWaitingFutures() - ) - ); - } + final DataSegment segment = changeRequest.getSegment(); + final AtomicReference cachedResponse = requestStatuses.getIfPresent(segment); + + if (cachedResponse == null) { + // Start a fresh LOAD or DROP as there is no previous known request + markRequestAsPending(changeRequest); + changeRequest.go(this, null); + return requestStatuses.getIfPresent(segment); + } else if (cachedResponse.get().getRequest().equals(changeRequest)) { + // Serve the cached response and clear it if the request has completed, + // so that we don't keep serving a stale response indefinitely + if (cachedResponse.get().isComplete()) { + requestStatuses.invalidate(segment); + } + return cachedResponse; + } else { + // Clear the cached response as this is a different request + requestStatuses.invalidate(segment); - @Override - public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING)); - SegmentLoadDropHandler.this.removeSegment( - ((SegmentChangeRequestDrop) changeRequest).getSegment(), - () -> resolveWaitingFutures(), - true - ); - } - }, - this::resolveWaitingFutures - ); - } else if (status.get().getState() == Status.STATE.SUCCESS) { - // SUCCESS case, we'll clear up the cached success while serving it to this client - // Not doing this can lead to an incorrect response to upcoming clients for a reload - requestStatuses.invalidate(changeRequest); - return status; + markRequestAsPending(changeRequest); + changeRequest.go(this, null); + return requestStatuses.getIfPresent(segment); } - return 
requestStatuses.getIfPresent(changeRequest); } } - private void updateRequestStatus(DataSegmentChangeRequest changeRequest, Status result) + @GuardedBy("requestStatusesLock") + private void markRequestAsPending(DataSegmentChangeRequest changeRequest) { - if (result == null) { - result = Status.failed("Unknown reason. Check server logs."); + DataSegmentChangeResponse pendingResponse + = new DataSegmentChangeResponse(changeRequest, DataSegmentChangeResponse.Status.PENDING); + requestStatuses.put(changeRequest.getSegment(), new AtomicReference<>(pendingResponse)); + } + + /** + * Returns true only if there is a Load request in {@link #requestStatuses} for + * this segment, and the segment is not present in {@link #segmentsToDrop}. + */ + private boolean shouldLoadSegment(DataSegment segment) + { + if (segmentsToDrop.contains(segment)) { + return false; + } + + synchronized (requestStatusesLock) { + AtomicReference response = requestStatuses.getIfPresent(segment); + return response != null && response.get().isLoadRequest(); + } + } + + /** + * Updates the status for the given request only if this request is currently + * in progress and present in {@link #requestStatuses}. + */ + private void updateRequestStatus(DataSegmentChangeRequest changeRequest, DataSegmentChangeResponse.Status status) + { + if (status == null) { + status = DataSegmentChangeResponse.Status.failed("Unknown reason. Check server logs."); } synchronized (requestStatusesLock) { - AtomicReference statusRef = requestStatuses.getIfPresent(changeRequest); - if (statusRef != null) { - statusRef.set(result); + AtomicReference statusRef + = requestStatuses.getIfPresent(changeRequest.getSegment()); + if (statusRef != null && statusRef.get().getRequest().equals(changeRequest)) { + statusRef.set(new DataSegmentChangeResponse(changeRequest, status)); } } } + /** + * Resolves waiting futures after a LOAD or DROP request has completed. + */ private void resolveWaitingFutures() { LinkedHashSet waitingFuturesCopy; @@ -772,34 +883,43 @@ public void close() } } - // Future with cancel() implementation to remove it from "waitingFutures" list - private static class CustomSettableFuture extends AbstractFuture> + /** + * Represents the future result of a single batch of segment load drop requests. + *
+ * Upon cancellation, this future removes itself from {@link #waitingFutures}. + */ + private class CustomSettableFuture extends AbstractFuture> { - private final LinkedHashSet waitingFutures; - private final Map> statusRefs; + private final Map> resultRefs; private CustomSettableFuture( - LinkedHashSet waitingFutures, - Map> statusRefs + Map> resultRefs ) { - this.waitingFutures = waitingFutures; - this.statusRefs = statusRefs; + this.resultRefs = resultRefs; } public void resolve() { - synchronized (statusRefs) { + // Synchronize here to ensure thread-safety of (a) resolving this future + // and (b) updating the requestStatuses cache + synchronized (requestStatusesLock) { if (isDone()) { return; } - List result = new ArrayList<>(statusRefs.size()); - statusRefs.forEach( - (request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get())) - ); + final List results = new ArrayList<>(resultRefs.size()); + resultRefs.forEach((request, reference) -> { + DataSegmentChangeResponse result = reference.get(); + results.add(result); - set(result); + // Remove complete statuses from the cache + if (result != null && result.isComplete()) { + requestStatuses.invalidate(request.getSegment()); + } + }); + + set(results); } } @@ -813,94 +933,5 @@ public boolean cancel(boolean interruptIfRunning) } } - public static class Status - { - public enum STATE - { - SUCCESS, FAILED, PENDING - } - - private final STATE state; - @Nullable - private final String failureCause; - - public static final Status SUCCESS = new Status(STATE.SUCCESS, null); - public static final Status PENDING = new Status(STATE.PENDING, null); - - @JsonCreator - Status( - @JsonProperty("state") STATE state, - @JsonProperty("failureCause") @Nullable String failureCause - ) - { - Preconditions.checkNotNull(state, "state must be non-null"); - this.state = state; - this.failureCause = failureCause; - } - - public static Status failed(String cause) - { - return new Status(STATE.FAILED, cause); - } - - @JsonProperty - public STATE getState() - { - return state; - } - - @Nullable - @JsonProperty - public String getFailureCause() - { - return failureCause; - } - - @Override - public String toString() - { - return "Status{" + - "state=" + state + - ", failureCause='" + failureCause + '\'' + - '}'; - } - } - - public static class DataSegmentChangeRequestAndStatus - { - private final DataSegmentChangeRequest request; - private final Status status; - - @JsonCreator - public DataSegmentChangeRequestAndStatus( - @JsonProperty("request") DataSegmentChangeRequest request, - @JsonProperty("status") Status status - ) - { - this.request = request; - this.status = status; - } - - @JsonProperty - public DataSegmentChangeRequest getRequest() - { - return request; - } - - @JsonProperty - public Status getStatus() - { - return status; - } - - @Override - public String toString() - { - return "DataSegmentChangeRequestAndStatus{" + - "request=" + request + - ", status=" + status + - '}'; - } - } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 92dd4714a454..edaff7517389 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -35,7 +35,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import 
org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; @@ -77,8 +77,8 @@ public class HttpLoadQueuePeon implements LoadQueuePeon { }; - public static final TypeReference> RESPONSE_ENTITY_TYPE_REF = - new TypeReference>() + public static final TypeReference> RESPONSE_ENTITY_TYPE_REF = + new TypeReference>() { }; @@ -225,7 +225,7 @@ public void onSuccess(InputStream result) log.trace("Received NO CONTENT reseponse from [%s]", serverId); } else if (HttpServletResponse.SC_OK == responseHandler.getStatus()) { try { - List statuses = + List statuses = jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF); log.trace("Server[%s] returned status response [%s].", serverId, statuses); synchronized (lock) { @@ -235,7 +235,7 @@ public void onSuccess(InputStream result) return; } - for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) { + for (DataSegmentChangeResponse e : statuses) { switch (e.getStatus().getState()) { case SUCCESS: case FAILED: @@ -300,7 +300,7 @@ private void logRequestFailure(Throwable t) } } - private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentLoadDropHandler.Status status) + private void handleResponseStatus(DataSegmentChangeRequest changeRequest, DataSegmentChangeResponse.Status status) { changeRequest.go( new DataSegmentChangeHandler() @@ -317,7 +317,7 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status); } - private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDropHandler.Status status) + private void updateSuccessOrFailureInHolder(SegmentHolder holder, DataSegmentChangeResponse.Status status) { if (holder == null) { return; @@ -325,7 +325,7 @@ private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDro queuedSegments.remove(holder); activeRequestSegments.remove(holder.getSegment()); - if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) { + if (status.getState() == DataSegmentChangeResponse.State.FAILED) { onRequestFailed(holder, status.getFailureCause()); } else { onRequestCompleted(holder, RequestStatus.SUCCESS); diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index a0281b27ff2d..35e3c7b7cdc0 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -34,6 +34,7 @@ import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; import org.apache.druid.server.coordination.DataSegmentChangeRequest; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordination.SegmentLoadDropHandler; import org.apache.druid.server.coordinator.loading.HttpLoadQueuePeon; import org.apache.druid.server.http.security.StateResourceFilter; @@ -248,7 +249,7 @@ public void applyDataSegmentChangeRequests( } 
final ResponseContext context = createContext(req.getHeader("Accept")); - final ListenableFuture> future = + final ListenableFuture> future = loadDropRequestHandler.processBatch(changeRequestList); final AsyncContext asyncContext = req.startAsync(); @@ -284,10 +285,10 @@ public void onStartAsync(AsyncEvent event) Futures.addCallback( future, - new FutureCallback>() + new FutureCallback>() { @Override - public void onSuccess(List result) + public void onSuccess(List result) { try { HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index f80688276c3e..3269425eea30 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -39,6 +39,8 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ExecutorService; /** @@ -46,6 +48,9 @@ public class CacheTestSegmentLoader implements SegmentLoader { + private final Set removedSegments = new HashSet<>(); + private final Set loadedSegments = new HashSet<>(); + @Override public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { @@ -175,12 +180,23 @@ public void close() @Override public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec) { - + loadedSegments.add(segment); } @Override public void cleanup(DataSegment segment) { + removedSegments.add(segment); + } + + public Set getLoadedSegments() + { + return loadedSegments; + } + public Set getRemovedSegments() + { + return removedSegments; } + } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java new file mode 100644 index 000000000000..010dab1bedf5 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class SegmentChangeRequestNoopTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testSerde() throws Exception + { + final SegmentChangeRequestNoop noopRequest = new SegmentChangeRequestNoop(); + final String json = MAPPER.writeValueAsString(noopRequest); + + Map objectMap = MAPPER.readValue( + json, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + Assert.assertEquals(1, objectMap.size()); + Assert.assertEquals("noop", objectMap.get("action")); + + DataSegmentChangeRequest deserialized = MAPPER.readValue(json, DataSegmentChangeRequest.class); + Assert.assertTrue(deserialized instanceof SegmentChangeRequestNoop); + } + + @Test + public void testGetSegmentThrowsUnsupportedException() + { + SegmentChangeRequestNoop noopRequest = new SegmentChangeRequestNoop(); + Assert.assertThrows( + UnsupportedOperationException.class, + noopRequest::getSegment + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java index ff22f00baae1..403df656f200 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java @@ -43,6 +43,8 @@ import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.SegmentizerFactory; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -51,7 +53,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.io.File; @@ -60,12 +61,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import static org.mockito.ArgumentMatchers.any; - /** * This class includes tests that cover the storage location layer as well. 
*/ @@ -76,9 +74,9 @@ public class SegmentLoadDropHandlerCacheTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); private SegmentLoadDropHandler loadDropHandler; + private BlockingExecutorService loadingExecutor; private TestStorageLocation storageLoc; - private ObjectMapper objectMapper; - private DataSegmentAnnouncer segmentAnnouncer; + private TestDataSegmentAnnouncer segmentAnnouncer; @Before public void setup() throws IOException @@ -87,7 +85,7 @@ public void setup() throws IOException SegmentLoaderConfig config = new SegmentLoaderConfig() .withLocations(Collections.singletonList(storageLoc.toStorageLocationConfig(MAX_SIZE, null))) .withInfoDir(storageLoc.getInfoDir()); - objectMapper = TestHelper.makeJsonMapper(); + final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); objectMapper.registerSubtypes(TestLoadSpec.class); objectMapper.registerSubtypes(TestSegmentizerFactory.class); SegmentCacheManager cacheManager = new SegmentLocalCacheManager(config, objectMapper); @@ -96,7 +94,8 @@ public void setup() throws IOException TestIndex.INDEX_IO, objectMapper )); - segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class); + segmentAnnouncer = new TestDataSegmentAnnouncer(); + loadingExecutor = new BlockingExecutorService("test-LoadDropHandler"); loadDropHandler = new SegmentLoadDropHandler( objectMapper, config, @@ -104,6 +103,7 @@ public void setup() throws IOException Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, cacheManager, + new WrappingScheduledExecutorService("test-LoadDropHandler", loadingExecutor, false), new ServerTypeConfig(ServerType.HISTORICAL) ); EmittingLogger.registerEmitter(new NoopServiceEmitter()); @@ -127,28 +127,28 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException expectedSegments.add(segment); } - // Start the load drop handler loadDropHandler.start(); // Verify the expected announcements - ArgumentCaptor> argCaptor = ArgumentCaptor.forClass(Iterable.class); - Mockito.verify(segmentAnnouncer).announceSegments(argCaptor.capture()); - List announcedSegments = new ArrayList<>(); - argCaptor.getValue().forEach(announcedSegments::add); - announcedSegments.sort(Comparator.comparing(DataSegment::getVersion)); - Assert.assertEquals(expectedSegments, announcedSegments); + Assert.assertEquals(expectedSegments.size(), segmentAnnouncer.getNumAnnouncedSegments()); + for (DataSegment segment : expectedSegments) { + Assert.assertTrue(segmentAnnouncer.isAnnounced(segment)); + } // make sure adding segments beyond allowed size fails - Mockito.reset(segmentAnnouncer); - DataSegment newSegment = makeSegment("test", "new-segment"); - loadDropHandler.addSegment(newSegment, null); - Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegment(any()); - Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegments(any()); + final DataSegment newSegment = makeSegment("test", "new-segment"); + final List requestBatch = Collections.singletonList( + new SegmentChangeRequestLoad(newSegment) + ); + loadDropHandler.processBatch(requestBatch); + loadingExecutor.finishAllPendingTasks(); + Assert.assertFalse(segmentAnnouncer.isAnnounced(newSegment)); // clearing some segment should allow for new segments - loadDropHandler.removeSegment(expectedSegments.get(0), null, false); - loadDropHandler.addSegment(newSegment, null); - Mockito.verify(segmentAnnouncer).announceSegment(newSegment); + loadDropHandler.cleanupFailedLoad(expectedSegments.get(0)); + loadDropHandler.processBatch(requestBatch); + 
loadingExecutor.finishAllPendingTasks(); + Assert.assertTrue(segmentAnnouncer.isAnnounced(newSegment)); } private DataSegment makeSegment(String dataSource, String name) diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index d6d4d2374df9..27fe829a3732 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -22,13 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; @@ -36,10 +34,11 @@ import org.apache.druid.segment.loading.NoopSegmentCacheManager; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus; -import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -48,14 +47,13 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import org.mockito.stubbing.OngoingStubbing; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -63,39 +61,26 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** */ public class SegmentLoadDropHandlerTest { - public static final int COUNT = 50; + private static final int COUNT = 50; + private static final String EXECUTOR_NAME_FORMAT = "SegmentLoadDropHandlerTest-[%d]"; private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); private SegmentLoadDropHandler segmentLoadDropHandler; - private DataSegmentAnnouncer announcer; + private TestDataSegmentAnnouncer announcer; + private CacheTestSegmentLoader segmentLoader; + 
private File infoDir; private TestStorageLocation testStorageLocation; - private AtomicInteger announceCount; - private ConcurrentSkipListSet segmentsAnnouncedByMe; - private SegmentCacheManager segmentCacheManager; - private Set segmentsRemovedFromCache; private SegmentManager segmentManager; - private List scheduledRunnable; private SegmentLoaderConfig segmentLoaderConfig; - private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig; - private ScheduledExecutorFactory scheduledExecutorFactory; - private List locations; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); + private BlockingExecutorService loadingExecutor; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -116,68 +101,13 @@ public void setUp() throws IOException throw new RuntimeException(e); } - locations = Collections.singletonList( + final List locations = Collections.singletonList( testStorageLocation.toStorageLocationConfig() ); - scheduledRunnable = new ArrayList<>(); - - segmentsRemovedFromCache = new HashSet<>(); - segmentCacheManager = new NoopSegmentCacheManager() - { - @Override - public boolean isSegmentCached(DataSegment segment) - { - Map loadSpec = segment.getLoadSpec(); - return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); - } - - @Override - public void cleanup(DataSegment segment) - { - segmentsRemovedFromCache.add(segment); - } - }; - - segmentManager = new SegmentManager(new CacheTestSegmentLoader()); - segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); - announceCount = new AtomicInteger(0); - - announcer = new DataSegmentAnnouncer() - { - @Override - public void announceSegment(DataSegment segment) - { - segmentsAnnouncedByMe.add(segment); - announceCount.incrementAndGet(); - } - - @Override - public void unannounceSegment(DataSegment segment) - { - segmentsAnnouncedByMe.remove(segment); - announceCount.decrementAndGet(); - } - - @Override - public void announceSegments(Iterable segments) - { - for (DataSegment segment : segments) { - segmentsAnnouncedByMe.add(segment); - } - announceCount.addAndGet(Iterables.size(segments)); - } - - @Override - public void unannounceSegments(Iterable segments) - { - for (DataSegment segment : segments) { - segmentsAnnouncedByMe.remove(segment); - } - announceCount.addAndGet(-Iterables.size(segments)); - } - }; - + segmentLoader = new CacheTestSegmentLoader(); + segmentManager = new SegmentManager(segmentLoader); + announcer = new TestDataSegmentAnnouncer(); segmentLoaderConfig = new SegmentLoaderConfig() { @@ -187,166 +117,105 @@ public File getInfoDir() return testStorageLocation.getInfoDir(); } - @Override - public int getNumLoadingThreads() - { - return 5; - } - - @Override - public int getAnnounceIntervalMillis() - { - return 50; - } - @Override public List getLocations() { return locations; } - - @Override - public int getDropSegmentDelayMillis() - { - return 0; - } }; - noAnnouncerSegmentLoaderConfig = new SegmentLoaderConfig() - { - @Override - public File getInfoDir() - { - return testStorageLocation.getInfoDir(); - } + loadingExecutor = new BlockingExecutorService(EXECUTOR_NAME_FORMAT); + segmentLoadDropHandler = initHandler(segmentManager); + } - @Override - public int getNumLoadingThreads() - { - return 5; - } + @Test + public void testLoadCancelsPendingDropOfMissingSegment() throws Exception + { + segmentLoadDropHandler.start(); - @Override - public int getAnnounceIntervalMillis() - { - return 0; - } + final DataSegment segment = makeSegment("test", "1", 
Intervals.of("P1d/2011-04-01")); - @Override - public List getLocations() - { - return locations; - } + // Queue a drop even though the segment is not loaded yet + segmentLoadDropHandler.processBatch(dropRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(1, loadingExecutor.numPendingTasks()); - @Override - public int getDropSegmentDelayMillis() - { - return 0; - } - }; + // Queue a load of the segment + segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(2, loadingExecutor.numPendingTasks()); - scheduledExecutorFactory = new ScheduledExecutorFactory() - { - @Override - public ScheduledExecutorService create(int corePoolSize, String nameFormat) - { - /* - Override normal behavoir by adding the runnable to a list so that you can make sure - all the shceduled runnables are executed by explicitly calling run() on each item in the list - */ - return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat)) - { - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) - { - scheduledRunnable.add(command); - return null; - } - }; - } - }; + // Try to complete both the pending drop and load + loadingExecutor.finishAllPendingTasks(); - segmentLoadDropHandler = new SegmentLoadDropHandler( - jsonMapper, - segmentLoaderConfig, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), - new ServerTypeConfig(ServerType.HISTORICAL) - ); + // Verify that the segment is loaded and the drop never happens + Assert.assertTrue(announcer.isAnnounced(segment)); + Assert.assertTrue(segmentLoader.getLoadedSegments().contains(segment)); + Assert.assertFalse(segmentLoader.getRemovedSegments().contains(segment)); + + segmentLoadDropHandler.stop(); } - /** - * Steps: - * 1. removeSegment() schedules a delete runnable that deletes segment files, - * 2. addSegment() succesfully loads the segment and annouces it - * 3. scheduled delete task executes and realizes it should not delete the segment files. 
- */ @Test - public void testSegmentLoading1() throws Exception + public void testLoadCancelsPendingDrop() throws Exception { segmentLoadDropHandler.start(); - final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); + final String datasource = "test"; + final DataSegment segment = makeSegment(datasource, "1", Intervals.of("P1d/2011-04-01")); - segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); + // Load the segment + segmentLoadDropHandler.processBatch(loadRequest(segment)); + loadingExecutor.finishNextPendingTask(); + Assert.assertTrue(announcer.isAnnounced(segment)); + Assert.assertEquals(1, segmentManager.getDataSourceCounts().get(datasource).intValue()); - Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + // Queue a drop of the segment but do not process it + segmentLoadDropHandler.processBatch(dropRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(1, loadingExecutor.numPendingTasks()); - segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); + // Queue a load of the segment + segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(2, loadingExecutor.numPendingTasks()); - /* - make sure the scheduled runnable that "deletes" segment files has been executed. - Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. - */ - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + // Try to complete both the pending drop and load + loadingExecutor.finishAllPendingTasks(); - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentsRemovedFromCache.contains(segment)); + // Verify that the segment is loaded and the drop never happens + Assert.assertTrue(announcer.isAnnounced(segment)); + Assert.assertTrue(segmentLoader.getLoadedSegments().contains(segment)); + Assert.assertFalse(segmentLoader.getRemovedSegments().contains(segment)); segmentLoadDropHandler.stop(); } - /** - * Steps: - * 1. addSegment() successfully loads the segment and announces it - * 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files - * 3. addSegment() calls loadSegment() and announces it again - * 4. scheduled delete task executes and realizes it should not delete the segment files.
- */ @Test - public void testSegmentLoading2() throws Exception + public void testDropCancelsPendingLoad() throws IOException { segmentLoadDropHandler.start(); - final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); - - segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); - - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + final String datasource = "test"; + final DataSegment segment = makeSegment(datasource, "1", Intervals.of("P1d/2011-04-01")); - segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); + // Queue a load of the segment but do not process it + segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(1, loadingExecutor.numPendingTasks()); - Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + // Queue a drop of the segment + segmentLoadDropHandler.processBatch(dropRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(2, loadingExecutor.numPendingTasks()); - segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); + // Try to complete the drop first and then the load + loadingExecutor.finishAllPendingTasks(); - /* - make sure the scheduled runnable that "deletes" segment files has been executed. - Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. - */ - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } - - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentsRemovedFromCache.contains(segment)); + // Verify that the segment is unannounced and the load never happens + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertFalse(segmentLoader.getLoadedSegments().contains(segment)); + Assert.assertTrue(segmentLoader.getRemovedSegments().contains(segment)); segmentLoadDropHandler.stop(); } @@ -378,12 +247,12 @@ public void testLoadCache() throws Exception testStorageLocation.checkInfoCache(segments); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); segmentLoadDropHandler.start(); - Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); + Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(13 * COUNT, announceCount.get()); + Assert.assertEquals(13 * COUNT, announcer.getNumAnnouncedSegments()); segmentLoadDropHandler.stop(); for (DataSegment segment : segments) { @@ -412,41 +281,6 @@ private DataSegment makeSegment(String dataSource, String version, Interval inte @Test public void testStartStop() throws Exception { - SegmentLoadDropHandler handler = new SegmentLoadDropHandler( - jsonMapper, - new SegmentLoaderConfig() - { - @Override - public File getInfoDir() - { - return infoDir; - } - - @Override - public int getNumLoadingThreads() - { - return 5; - } - - @Override - public List<StorageLocationConfig> getLocations() - { - return locations; - } - - @Override - public int getAnnounceIntervalMillis() - { - return 50; - } - }, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, - new
ServerTypeConfig(ServerType.HISTORICAL) - ); - Set<DataSegment> segments = new HashSet<>(); for (int i = 0; i < COUNT; ++i) { segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); @@ -463,14 +297,14 @@ public int getAnnounceIntervalMillis() testStorageLocation.checkInfoCache(segments); Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); - handler.start(); - Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); + segmentLoadDropHandler.start(); + Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } - Assert.assertEquals(5 * COUNT, announceCount.get()); - handler.stop(); + Assert.assertEquals(5 * COUNT, announcer.getNumAnnouncedSegments()); + segmentLoadDropHandler.stop(); for (DataSegment segment : segments) { testStorageLocation.deleteSegmentInfoFromCache(segment); @@ -493,23 +327,26 @@ public void testProcessBatch() throws Exception new SegmentChangeRequestDrop(segment2) ); - ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler - .processBatch(batch); - - Map<DataSegmentChangeRequest, SegmentLoadDropHandler.Status> expectedStatusMap = new HashMap<>(); - expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING); - expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS); - List<DataSegmentChangeRequestAndStatus> result = future.get(); - for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus requestAndStatus : result) { - Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), requestAndStatus.getStatus()); + ListenableFuture<List<DataSegmentChangeResponse>> future + = segmentLoadDropHandler.processBatch(batch); + + Map<DataSegmentChangeRequest, DataSegmentChangeResponse.Status> expectedStatusMap = new HashMap<>(); + expectedStatusMap.put(batch.get(0), DataSegmentChangeResponse.Status.PENDING); + expectedStatusMap.put(batch.get(1), DataSegmentChangeResponse.Status.SUCCESS); + List<DataSegmentChangeResponse> result = future.get(); + for (DataSegmentChangeResponse response : result) { + Assert.assertEquals( + expectedStatusMap.get(response.getRequest()), + response.getStatus() + ); } - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); - result = segmentLoadDropHandler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get(); - Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus()); + result = segmentLoadDropHandler.processBatch(loadRequest(segment1)).get(); + Assert.assertEquals(DataSegmentChangeResponse.Status.SUCCESS, result.get(0).getStatus()); + Assert.assertTrue(segmentLoader.getLoadedSegments().contains(segment1)); + Assert.assertTrue(segmentLoader.getRemovedSegments().contains(segment2)); segmentLoadDropHandler.stop(); } @@ -518,42 +355,26 @@ public void testProcessBatch() throws Exception public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception { final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); - Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), - ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenThrow(new RuntimeException("segment loading failure test")) - .thenReturn(true); - final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( - jsonMapper, - segmentLoaderConfig, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, -
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), - new ServerTypeConfig(ServerType.HISTORICAL) - ); + whenLoadSegment(segmentManager) + .thenThrow(new RuntimeException("segment loading failure test")) + .thenReturn(true); + final SegmentLoadDropHandler segmentLoadDropHandler = initHandler(segmentManager); segmentLoadDropHandler.start(); - DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); - - List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); + final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); - ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler - .processBatch(batch); + ListenableFuture<List<DataSegmentChangeResponse>> future + = segmentLoadDropHandler.processBatch(loadRequest(segment1)); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } - List<DataSegmentChangeRequestAndStatus> result = future.get(); - Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState()); + loadingExecutor.finishAllPendingTasks(); + List<DataSegmentChangeResponse> result = future.get(); + Assert.assertEquals(DataSegmentChangeResponse.State.FAILED, result.get(0).getStatus().getState()); - future = segmentLoadDropHandler.processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + future = segmentLoadDropHandler.processBatch(loadRequest(segment1)); + loadingExecutor.finishAllPendingTasks(); result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); segmentLoadDropHandler.stop(); } @@ -562,77 +383,144 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception { final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); - Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenReturn(true); + whenLoadSegment(segmentManager).thenReturn(true); Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any()); - final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( - jsonMapper, - noAnnouncerSegmentLoaderConfig, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), - new ServerTypeConfig(ServerType.HISTORICAL) - ); + final SegmentLoadDropHandler segmentLoadDropHandler = initHandler(segmentManager); segmentLoadDropHandler.start(); - DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); + final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); - List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); + // Request 1: Load the segment + ListenableFuture<List<DataSegmentChangeResponse>> future + = segmentLoadDropHandler.processBatch(loadRequest(segment1)); + loadingExecutor.finishAllPendingTasks(); + List<DataSegmentChangeResponse> result = future.get(); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); - // load the segment - ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler - .processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } - List<DataSegmentChangeRequestAndStatus> result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); - scheduledRunnable.clear(); - - // drop the segment - batch =
ImmutableList.of(new SegmentChangeRequestDrop(segment1)); - future = segmentLoadDropHandler.processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + // Request 2: Drop the segment + future = segmentLoadDropHandler.processBatch(dropRequest(segment1)); + loadingExecutor.finishAllPendingTasks(); result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); - scheduledRunnable.clear(); - - // check invocations after a load-drop sequence - Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); - - // try to reload the segment - this should be a no-op since it might be the case that this is the first load client - // with this request, we'll forget about the success of the load request - batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - future = segmentLoadDropHandler.processBatch(batch); - Assert.assertEquals(scheduledRunnable.size(), 0); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); + + // Verify that 1 load and 1 drop have happened + verifyLoadCalled(segmentManager, 1); + verifyDropCalled(segmentManager, 1); + + // Request 3: Reload the segment + future = segmentLoadDropHandler.processBatch(loadRequest(segment1)); + loadingExecutor.finishAllPendingTasks(); result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); - - // check invocations - should stay the same - Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); - - // try to reload the segment - this time the loader will know that is a fresh request to load - // so, the segment manager will be asked to load - batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - future = segmentLoadDropHandler.processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); + + // Verify that 1 more load has happened + verifyLoadCalled(segmentManager, 2); + verifyDropCalled(segmentManager, 1); + + // Request 4: Try to reload the segment - segment is loaded again + future = segmentLoadDropHandler.processBatch(loadRequest(segment1)); + loadingExecutor.finishAllPendingTasks(); result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); - scheduledRunnable.clear(); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); + + // Verify that 1 more load has happened + verifyLoadCalled(segmentManager, 3); + verifyDropCalled(segmentManager, 1); + + segmentLoadDropHandler.stop(); + } - // check invocations - the load segment counter should bump up - Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); + @Test + public void testLoadIsNotRetriedIfFailureIsCached() throws Exception + { + final DataSegment segment = makeSegment("batchtest1", "1",
Intervals.of("P1D/2011-04-01")); + + final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); + final SegmentLoadDropHandler segmentLoadDropHandler = initHandler(segmentManager); + segmentLoadDropHandler.start(); + + // Send a load request to the handler + ListenableFuture> future + = segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertFalse(future.isDone()); + + // Cancel the future so that it is never resolved and the response remains cached + future.cancel(true); + + // Fail the load operation + whenLoadSegment(segmentManager).thenThrow(new ISE("segment files missing")); + loadingExecutor.finishNextPendingTask(); + + // Verify that next load request completes immediately with a failed response + future = segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertTrue(future.isDone()); + + DataSegmentChangeResponse response = future.get().get(0); + Assert.assertTrue(response.getRequest() instanceof SegmentChangeRequestLoad); + Assert.assertEquals(DataSegmentChangeResponse.State.FAILED, response.getStatus().getState()); + Assert.assertEquals("Could not load segment: segment files missing", response.getStatus().getFailureCause()); segmentLoadDropHandler.stop(); } + + private SegmentLoadDropHandler initHandler(SegmentManager manager) + { + SegmentCacheManager segmentCacheManager = new NoopSegmentCacheManager() + { + @Override + public boolean isSegmentCached(DataSegment segment) + { + Map loadSpec = segment.getLoadSpec(); + return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); + } + }; + return new SegmentLoadDropHandler( + jsonMapper, + segmentLoaderConfig, + announcer, + Mockito.mock(DataSegmentServerAnnouncer.class), + manager, + segmentCacheManager, + new WrappingScheduledExecutorService(EXECUTOR_NAME_FORMAT, loadingExecutor, false), + new ServerTypeConfig(ServerType.HISTORICAL) + ); + } + + private OngoingStubbing whenLoadSegment(SegmentManager manager) throws SegmentLoadingException + { + return Mockito.when( + manager.loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ) + ); + } + + private void verifyLoadCalled(SegmentManager manager, int times) throws SegmentLoadingException + { + Mockito.verify(manager, Mockito.times(times)).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + } + + private void verifyDropCalled(SegmentManager manager, int times) + { + Mockito.verify(manager, Mockito.times(times)).dropSegment(ArgumentMatchers.any()); + } + + private static List loadRequest(DataSegment segment) + { + return Collections.singletonList(new SegmentChangeRequestLoad(segment)); + } + + private static List dropRequest(DataSegment segment) + { + return Collections.singletonList(new SegmentChangeRequestDrop(segment)); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentAnnouncer.java b/server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java similarity index 78% rename from indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentAnnouncer.java rename to server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java index 6874ae79a91a..d25b127e168c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentAnnouncer.java +++ b/server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java @@ -17,17 
+17,16 @@ * under the License. */ -package org.apache.druid.indexing.test; +package org.apache.druid.server.coordination; import com.google.common.collect.Sets; -import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; import java.util.Set; public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer { - public Set<DataSegment> announcedSegments = Sets.newConcurrentHashSet(); + private final Set<DataSegment> announcedSegments = Sets.newConcurrentHashSet(); @Override public void announceSegment(DataSegment segment) @@ -57,4 +56,17 @@ public void unannounceSegments(Iterable<DataSegment> segments) } + public int getNumAnnouncedSegments() + { + return announcedSegments.size(); + } + + /** + * @return true if the given segment is currently present in the list of + * announced segments. + */ + public boolean isAnnounced(DataSegment segment) + { + return announcedSegments.contains(segment); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index c3d3fb0fbe8f..f898c78fb46c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -31,7 +31,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; @@ -321,12 +321,12 @@ public ListenableFuture go( } ); - List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size()); + List<DataSegmentChangeResponse> statuses = new ArrayList<>(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { cr.go(this, null); - statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus( + statuses.add(new DataSegmentChangeResponse( cr, - SegmentLoadDropHandler.Status.SUCCESS + DataSegmentChangeResponse.Status.SUCCESS )); } return (ListenableFuture) Futures.immediateFuture( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java index aba1c9b6be6c..cc18817f5d4a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java @@ -36,7 +36,6 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentChangeRequestDrop; import org.apache.druid.server.coordination.SegmentChangeRequestLoad; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.timeline.DataSegment; @@ -235,11 +234,11 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac ); Assert.assertEquals( segment, - ((SegmentChangeRequestDrop) jsonMapper.readValue( +
jsonMapper.readValue( curator.getData() .decompressed() .forPath(dropRequestPath), DataSegmentChangeRequest.class - )).getSegment() + ).getSegment() ); // simulate completion of drop request by historical @@ -253,8 +252,8 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath)); Assert.assertEquals( segment, - ((SegmentChangeRequestLoad) jsonMapper - .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class)) + jsonMapper + .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class) .getSegment() ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java index 111d88bf5500..309eb34ac308 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java @@ -61,6 +61,11 @@ public boolean hasPendingTasks() return !taskQueue.isEmpty(); } + public int numPendingTasks() + { + return taskQueue.size(); + } + /** * Executes the next pending task on the calling thread itself. */ diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 43f7bd9872ab..300d96679bf7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.concurrent.DirectExecutorService; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.metrics.MetricsVerifier; @@ -73,6 +74,8 @@ */ public class CoordinatorSimulationBuilder { + private static final Logger log = new Logger(CoordinatorSimulationBuilder.class); + private static final long DEFAULT_COORDINATOR_PERIOD = 100L; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper() .setInjectableValues( @@ -562,6 +565,7 @@ private ExecutorFactory(boolean directExecution) @Override public ScheduledExecutorService create(int corePoolSize, String nameFormat) { + log.trace("Dummy use of corePoolSize[%d] to avoid intellij inspection failures", corePoolSize); boolean isCoordinatorRunner = COORDINATOR_RUNNER.equals(nameFormat); // Coordinator running executor must always be blocked diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java index 0b91e7009026..95282bbf2994 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java @@ -30,7 +30,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import 
org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -126,7 +126,7 @@ private Final processRequest( /** * Processes all the changes in the request. */ - private List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> processRequest( + private List<DataSegmentChangeResponse> processRequest( Request request, DataSegmentChangeHandler changeHandler ) throws IOException @@ -147,21 +147,20 @@ private List processRe /** * Processes each DataSegmentChangeRequest using the handler. */ - private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus processRequest( + private DataSegmentChangeResponse processRequest( DataSegmentChangeRequest request, DataSegmentChangeHandler handler ) { - SegmentLoadDropHandler.Status status; + DataSegmentChangeResponse.Status status; try { request.go(handler, NOOP_CALLBACK); - status = SegmentLoadDropHandler.Status.SUCCESS; + status = DataSegmentChangeResponse.Status.SUCCESS; } catch (Exception e) { - status = SegmentLoadDropHandler.Status.failed(e.getMessage()); + status = DataSegmentChangeResponse.Status.failed(e.getMessage()); } - return new SegmentLoadDropHandler - .DataSegmentChangeRequestAndStatus(request, status); + return new DataSegmentChangeResponse(request, status); } }
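
For reference, the pattern this patch converges on can be shown outside the diff: each DataSegmentChangeRequest is dispatched through go() and paired with a Status in a top-level DataSegmentChangeResponse, and getSegment() is now declared on the request interface itself, so callers no longer need per-subtype casts. The sketch below is illustrative only and is not part of the patch; the ChangeRequestProcessor class is hypothetical, while DataSegmentChangeRequest, DataSegmentChangeHandler, and DataSegmentChangeResponse are the types introduced or modified above.

import java.util.ArrayList;
import java.util.List;

import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DataSegmentChangeResponse;

/**
 * Hypothetical harness mirroring how TestSegmentLoadingHttpClient pairs each
 * change request with a status after this patch.
 */
public class ChangeRequestProcessor
{
  private final DataSegmentChangeHandler handler;

  public ChangeRequestProcessor(DataSegmentChangeHandler handler)
  {
    this.handler = handler;
  }

  public List<DataSegmentChangeResponse> process(List<DataSegmentChangeRequest> requests)
  {
    final List<DataSegmentChangeResponse> responses = new ArrayList<>(requests.size());
    for (DataSegmentChangeRequest request : requests) {
      DataSegmentChangeResponse.Status status;
      try {
        // go() dispatches to addSegment()/removeSegment() on the handler;
        // the callback parameter is @Nullable, so null is acceptable here.
        request.go(handler, null);
        status = DataSegmentChangeResponse.Status.SUCCESS;
      }
      catch (Exception e) {
        // failed() records the failure cause, as asserted in the tests above
        status = DataSegmentChangeResponse.Status.failed(e.getMessage());
      }
      // getSegment() is defined on the interface itself now, so the target
      // segment can be inspected or logged without any instanceof casts.
      responses.add(new DataSegmentChangeResponse(request, status));
    }
    return responses;
  }
}

Because DataSegmentChangeResponse is a top-level class rather than a nested type of SegmentLoadDropHandler, test clients and load-queue peons can share it without depending on the handler implementation.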