From 1aa23745cd1bcc41176f64d11e1d364bd5417289 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 19 Jul 2023 17:26:30 +0530 Subject: [PATCH 01/15] Invalidate cache load status on historical as soon as it is served --- .../coordination/SegmentLoadDropHandler.java | 17 ++++-- .../druid/server/coordinator/stats/Stats.java | 4 +- .../SegmentLoadDropHandlerTest.java | 57 +++++++++++++------ 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 2ba5dc933042..636894d214fc 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -773,7 +773,7 @@ public void close() } // Future with cancel() implementation to remove it from "waitingFutures" list - private static class CustomSettableFuture extends AbstractFuture> + private class CustomSettableFuture extends AbstractFuture> { private final LinkedHashSet waitingFutures; private final Map> statusRefs; @@ -789,15 +789,20 @@ private CustomSettableFuture( public void resolve() { - synchronized (statusRefs) { + synchronized (requestStatusesLock) { if (isDone()) { return; } - List result = new ArrayList<>(statusRefs.size()); - statusRefs.forEach( - (request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get())) - ); + final List result = new ArrayList<>(statusRefs.size()); + statusRefs.forEach((request, statusRef) -> { + // Remove complete statuses from the cache + final Status status = statusRef.get(); + if (status != null && status.getState() != Status.STATE.PENDING) { + requestStatuses.invalidate(request); + } + result.add(new DataSegmentChangeRequestAndStatus(request, status)); + }); set(result); } diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 28f5c91049fb..78bca1486aba 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -50,9 +50,9 @@ public static class Segments public static final CoordinatorStat USED_BYTES = CoordinatorStat.toDebugAndEmit("usedSegmentBytes", "segment/size"); public static final CoordinatorStat UNDER_REPLICATED - = CoordinatorStat.toDebugAndEmit("underreplicated", "segment/underReplicated/count"); + = CoordinatorStat.toLogAndEmit("underreplicated", "segment/underReplicated/count", CoordinatorStat.Level.INFO); public static final CoordinatorStat UNAVAILABLE - = CoordinatorStat.toDebugAndEmit("unavailable", "segment/unavailable/count"); + = CoordinatorStat.toLogAndEmit("unavailable", "segment/unavailable/count", CoordinatorStat.Level.INFO); public static final CoordinatorStat UNNEEDED = CoordinatorStat.toDebugAndEmit("unneeded", "segment/unneeded/count"); public static final CoordinatorStat OVERSHADOWED diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index d6d4d2374df9..c734d7b8f8fd 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -562,8 +562,12 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception { final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); - Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), 
ArgumentMatchers.any())) - .thenReturn(true); + Mockito.doReturn(true).when(segmentManager).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any()); final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( jsonMapper, @@ -578,11 +582,11 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio segmentLoadDropHandler.start(); - DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); + final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - // load the segment + // Request 1: Load the segment ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); for (Runnable runnable : scheduledRunnable) { @@ -592,7 +596,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); - // drop the segment + // Request 2: Drop the segment batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1)); future = segmentLoadDropHandler.processBatch(batch); for (Runnable runnable : scheduledRunnable) { @@ -603,23 +607,36 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio scheduledRunnable.clear(); // check invocations after a load-drop sequence - Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); + Mockito.verify(segmentManager, Mockito.times(1)).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + 
Mockito.verify(segmentManager, Mockito.times(1)) + .dropSegment(ArgumentMatchers.any()); - // try to reload the segment - this should be a no-op since it might be the case that this is the first load client - // with this request, we'll forget about the success of the load request + // Request 3: Reload the segment batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); future = segmentLoadDropHandler.processBatch(batch); - Assert.assertEquals(scheduledRunnable.size(), 0); + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } result = future.get(); Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + scheduledRunnable.clear(); - // check invocations - should stay the same - Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); + // check invocations - 1 more load has happened + Mockito.verify(segmentManager, Mockito.times(2)).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Mockito.verify(segmentManager, Mockito.times(1)) + .dropSegment(ArgumentMatchers.any()); - // try to reload the segment - this time the loader will know that is a fresh request to load - // so, the segment manager will be asked to load + // Request 4: Try to reload the segment - segment is loaded again batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); future = segmentLoadDropHandler.processBatch(batch); for (Runnable runnable : scheduledRunnable) { @@ -630,8 +647,14 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio scheduledRunnable.clear(); // check invocations - the load segment counter should bump up - Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), 
ArgumentMatchers.any(), ArgumentMatchers.any()); - Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any()); + Mockito.verify(segmentManager, Mockito.times(3)).loadSegment( + ArgumentMatchers.any(), + ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + Mockito.verify(segmentManager, Mockito.times(1)) + .dropSegment(ArgumentMatchers.any()); segmentLoadDropHandler.stop(); } From 04bb4394c5b2dff2fe58858c326320ca45bf7dcc Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 19 Jul 2023 19:28:19 +0530 Subject: [PATCH 02/15] Revert extra change --- .../java/org/apache/druid/server/coordinator/stats/Stats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 78bca1486aba..28f5c91049fb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -50,9 +50,9 @@ public static class Segments public static final CoordinatorStat USED_BYTES = CoordinatorStat.toDebugAndEmit("usedSegmentBytes", "segment/size"); public static final CoordinatorStat UNDER_REPLICATED - = CoordinatorStat.toLogAndEmit("underreplicated", "segment/underReplicated/count", CoordinatorStat.Level.INFO); + = CoordinatorStat.toDebugAndEmit("underreplicated", "segment/underReplicated/count"); public static final CoordinatorStat UNAVAILABLE - = CoordinatorStat.toLogAndEmit("unavailable", "segment/unavailable/count", CoordinatorStat.Level.INFO); + = CoordinatorStat.toDebugAndEmit("unavailable", "segment/unavailable/count"); public static final CoordinatorStat UNNEEDED = CoordinatorStat.toDebugAndEmit("unneeded", "segment/unneeded/count"); public static final CoordinatorStat OVERSHADOWED From f7369df6446bbd194f2779ae0895eced9cf78a95 Mon Sep 17 00:00:00 2001 From: Kashif 
Faraz Date: Thu, 20 Jul 2023 11:51:11 +0530 Subject: [PATCH 03/15] Add javadocs, move classes --- .../DataSegmentChangeResponse.java | 105 +++++++++++ .../coordination/SegmentLoadDropHandler.java | 178 ++++++------------ .../loading/HttpLoadQueuePeon.java | 16 +- .../server/http/SegmentListerResource.java | 7 +- .../SegmentLoadDropHandlerTest.java | 36 ++-- .../loading/HttpLoadQueuePeonTest.java | 8 +- .../TestSegmentLoadingHttpClient.java | 15 +- 7 files changed, 201 insertions(+), 164 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java new file mode 100644 index 000000000000..4643dd7b2961 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java @@ -0,0 +1,105 @@ +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; + +/** + * Response of a {@link DataSegmentChangeRequest}. Contains the request itself + * and the {@link Status} of the request. 
+ */ +public class DataSegmentChangeResponse +{ + private final DataSegmentChangeRequest request; + private final Status status; + + @JsonCreator + public DataSegmentChangeResponse( + @JsonProperty("request") DataSegmentChangeRequest request, + @JsonProperty("status") Status status + ) + { + this.request = request; + this.status = status; + } + + @JsonProperty + public DataSegmentChangeRequest getRequest() + { + return request; + } + + @JsonProperty + public Status getStatus() + { + return status; + } + + @Override + public String toString() + { + return "DataSegmentChangeResponse{" + + "request=" + request + + ", status=" + status + + '}'; + } + + public enum State + { + SUCCESS, FAILED, PENDING + } + + /** + * Contains {@link State} of a {@link DataSegmentChangeRequest} and the failure + * message, if any. + */ + public static class Status + { + private final State state; + @Nullable + private final String failureCause; + + public static final Status SUCCESS = new Status(State.SUCCESS, null); + public static final Status PENDING = new Status(State.PENDING, null); + + @JsonCreator + public Status( + @JsonProperty("state") State state, + @JsonProperty("failureCause") @Nullable String failureCause + ) + { + Preconditions.checkNotNull(state, "state must be non-null"); + this.state = state; + this.failureCause = failureCause; + } + + public static Status failed(String cause) + { + return new Status(State.FAILED, cause); + } + + @JsonProperty + public State getState() + { + return state; + } + + @Nullable + @JsonProperty + public String getFailureCause() + { + return failureCause; + } + + @Override + public String toString() + { + return "Status{" + + "state=" + state + + ", failureCause='" + failureCause + '\'' + + '}'; + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 636894d214fc..62a9e4789e9b 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -19,11 +19,8 @@ package org.apache.druid.server.coordination; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -32,6 +29,7 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ServerTypeConfig; @@ -95,10 +93,33 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private volatile boolean started = false; - // Keep history of load/drop request status in a LRU cache to maintain idempotency if same request shows up - // again and to return status of a completed request. Maximum size of this cache must be significantly greater - // than number of pending load/drop requests. so that history is not lost too quickly. - private final Cache> requestStatuses; + /** + * Used to cache the status of a completed load or drop request until it has + * been served to the (Coordinator) client exactly once. + *

+ * The cache is used as follows:
+ * <ol>
+ * <li>An entry with state PENDING is added to the cache upon receiving a
+ * request to load or drop a segment.</li>
+ * <li>A duplicate request received at this point is immediately answered with PENDING.</li>
+ * <li>Once the load/drop finishes, the entry is updated to either SUCCESS or FAILED.</li>
+ * <li>A duplicate request received at this point is immediately answered with
+ * SUCCESS or FAILED and the entry is removed from the cache.</li>
+ * <li>If the first request itself finishes after the load or drop has already
+ * completed, it is answered with a SUCCESS or FAILED and the entry is removed
+ * from the cache.</li>
+ * </ol>
+ * <p>
+ * Maximum size of this cache must be significantly greater than the number of
+ * pending load/drop requests. This is generally the case because the
+ * Coordinator sends load/drop requests in small batches and does not send new
+ * requests until the previously submitted ones have either succeeded or failed.
+ * <p>
+ * The cache must be updated in a thread-safe manner so that stale statuses + * are not served. + */ + @GuardedBy("requestStatusesLock") + private final Cache> requestStatuses; private final Object requestStatusesLock = new Object(); // This is the list of unresolved futures returned to callers of processBatch(List) @@ -320,7 +341,7 @@ public Map getRowCountDistributionPerDataso @Override public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { - Status result = null; + DataSegmentChangeResponse.Status result = null; try { log.info("Loading segment %s", segment.getId()); /* @@ -349,13 +370,13 @@ each time when addSegment() is called, it has to wait for the lock in order to m throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId()); } - result = Status.SUCCESS; + result = DataSegmentChangeResponse.Status.SUCCESS; } catch (Throwable e) { log.makeAlert(e, "Failed to load segment for dataSource") .addData("segment", segment) .emit(); - result = Status.failed(e.toString()); + result = DataSegmentChangeResponse.Status.failed(e.toString()); } finally { updateRequestStatus(new SegmentChangeRequestLoad(segment), result); @@ -466,7 +487,7 @@ void removeSegment( final boolean scheduleDrop ) { - Status result = null; + DataSegmentChangeResponse.Status result = null; try { announcer.unannounceSegment(segment); segmentsToDelete.add(segment); @@ -506,13 +527,13 @@ void removeSegment( runnable.run(); } - result = Status.SUCCESS; + result = DataSegmentChangeResponse.Status.SUCCESS; } catch (Exception e) { log.makeAlert(e, "Failed to remove segment") .addData("segment", segment) .emit(); - result = Status.failed(e.getMessage()); + result = DataSegmentChangeResponse.Status.failed(e.getMessage()); } finally { updateRequestStatus(new SegmentChangeRequestDrop(segment), result); @@ -527,15 +548,15 @@ public Collection getPendingDeleteSnapshot() return ImmutableList.copyOf(segmentsToDelete); } - public ListenableFuture> 
processBatch(List changeRequests) + public ListenableFuture> processBatch(List changeRequests) { boolean isAnyRequestDone = false; - Map> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size()); + Map> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - AtomicReference status = processRequest(cr); - if (status.get().getState() != Status.STATE.PENDING) { + AtomicReference status = processRequest(cr); + if (status.get().getState() != DataSegmentChangeResponse.State.PENDING) { isAnyRequestDone = true; } statuses.put(cr, status); @@ -554,20 +575,20 @@ public ListenableFuture> processBatch(Li return future; } - private AtomicReference processRequest(DataSegmentChangeRequest changeRequest) + private AtomicReference processRequest(DataSegmentChangeRequest changeRequest) { synchronized (requestStatusesLock) { - AtomicReference status = requestStatuses.getIfPresent(changeRequest); + AtomicReference status = requestStatuses.getIfPresent(changeRequest); // If last load/drop request status is failed, here can try that again - if (status == null || status.get().getState() == Status.STATE.FAILED) { + if (status == null || status.get().getState() == DataSegmentChangeResponse.State.FAILED) { changeRequest.go( new DataSegmentChangeHandler() { @Override public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) { - requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING)); + requestStatuses.put(changeRequest, new AtomicReference<>(DataSegmentChangeResponse.Status.PENDING)); exec.submit( () -> SegmentLoadDropHandler.this.addSegment( ((SegmentChangeRequestLoad) changeRequest).getSegment(), @@ -579,7 +600,7 @@ public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) @Override public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) { - requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING)); + 
requestStatuses.put(changeRequest, new AtomicReference<>(DataSegmentChangeResponse.Status.PENDING)); SegmentLoadDropHandler.this.removeSegment( ((SegmentChangeRequestDrop) changeRequest).getSegment(), () -> resolveWaitingFutures(), @@ -589,7 +610,7 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac }, this::resolveWaitingFutures ); - } else if (status.get().getState() == Status.STATE.SUCCESS) { + } else if (status.get().getState() == DataSegmentChangeResponse.State.SUCCESS) { // SUCCESS case, we'll clear up the cached success while serving it to this client // Not doing this can lead to an incorrect response to upcoming clients for a reload requestStatuses.invalidate(changeRequest); @@ -599,13 +620,13 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac } } - private void updateRequestStatus(DataSegmentChangeRequest changeRequest, Status result) + private void updateRequestStatus(DataSegmentChangeRequest changeRequest, DataSegmentChangeResponse.Status result) { if (result == null) { - result = Status.failed("Unknown reason. Check server logs."); + result = DataSegmentChangeResponse.Status.failed("Unknown reason. 
Check server logs."); } synchronized (requestStatusesLock) { - AtomicReference statusRef = requestStatuses.getIfPresent(changeRequest); + AtomicReference statusRef = requestStatuses.getIfPresent(changeRequest); if (statusRef != null) { statusRef.set(result); } @@ -773,14 +794,14 @@ public void close() } // Future with cancel() implementation to remove it from "waitingFutures" list - private class CustomSettableFuture extends AbstractFuture> + private class CustomSettableFuture extends AbstractFuture> { private final LinkedHashSet waitingFutures; - private final Map> statusRefs; + private final Map> statusRefs; private CustomSettableFuture( LinkedHashSet waitingFutures, - Map> statusRefs + Map> statusRefs ) { this.waitingFutures = waitingFutures; @@ -789,19 +810,21 @@ private CustomSettableFuture( public void resolve() { + // Synchronize here to ensure thread-safety of (a) resolving this future + // and (b) updating the requestStatuses cache synchronized (requestStatusesLock) { if (isDone()) { return; } - final List result = new ArrayList<>(statusRefs.size()); + final List result = new ArrayList<>(statusRefs.size()); statusRefs.forEach((request, statusRef) -> { // Remove complete statuses from the cache - final Status status = statusRef.get(); - if (status != null && status.getState() != Status.STATE.PENDING) { + final DataSegmentChangeResponse.Status status = statusRef.get(); + if (status != null && status.getState() != DataSegmentChangeResponse.State.PENDING) { requestStatuses.invalidate(request); } - result.add(new DataSegmentChangeRequestAndStatus(request, status)); + result.add(new DataSegmentChangeResponse(request, status)); }); set(result); @@ -818,94 +841,5 @@ public boolean cancel(boolean interruptIfRunning) } } - public static class Status - { - public enum STATE - { - SUCCESS, FAILED, PENDING - } - - private final STATE state; - @Nullable - private final String failureCause; - - public static final Status SUCCESS = new Status(STATE.SUCCESS, null); - 
public static final Status PENDING = new Status(STATE.PENDING, null); - - @JsonCreator - Status( - @JsonProperty("state") STATE state, - @JsonProperty("failureCause") @Nullable String failureCause - ) - { - Preconditions.checkNotNull(state, "state must be non-null"); - this.state = state; - this.failureCause = failureCause; - } - - public static Status failed(String cause) - { - return new Status(STATE.FAILED, cause); - } - - @JsonProperty - public STATE getState() - { - return state; - } - - @Nullable - @JsonProperty - public String getFailureCause() - { - return failureCause; - } - - @Override - public String toString() - { - return "Status{" + - "state=" + state + - ", failureCause='" + failureCause + '\'' + - '}'; - } - } - - public static class DataSegmentChangeRequestAndStatus - { - private final DataSegmentChangeRequest request; - private final Status status; - - @JsonCreator - public DataSegmentChangeRequestAndStatus( - @JsonProperty("request") DataSegmentChangeRequest request, - @JsonProperty("status") Status status - ) - { - this.request = request; - this.status = status; - } - - @JsonProperty - public DataSegmentChangeRequest getRequest() - { - return request; - } - - @JsonProperty - public Status getStatus() - { - return status; - } - - @Override - public String toString() - { - return "DataSegmentChangeRequestAndStatus{" + - "request=" + request + - ", status=" + status + - '}'; - } - } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 92dd4714a454..edaff7517389 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -35,7 +35,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import 
org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; @@ -77,8 +77,8 @@ public class HttpLoadQueuePeon implements LoadQueuePeon { }; - public static final TypeReference> RESPONSE_ENTITY_TYPE_REF = - new TypeReference>() + public static final TypeReference> RESPONSE_ENTITY_TYPE_REF = + new TypeReference>() { }; @@ -225,7 +225,7 @@ public void onSuccess(InputStream result) log.trace("Received NO CONTENT reseponse from [%s]", serverId); } else if (HttpServletResponse.SC_OK == responseHandler.getStatus()) { try { - List statuses = + List statuses = jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF); log.trace("Server[%s] returned status response [%s].", serverId, statuses); synchronized (lock) { @@ -235,7 +235,7 @@ public void onSuccess(InputStream result) return; } - for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) { + for (DataSegmentChangeResponse e : statuses) { switch (e.getStatus().getState()) { case SUCCESS: case FAILED: @@ -300,7 +300,7 @@ private void logRequestFailure(Throwable t) } } - private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentLoadDropHandler.Status status) + private void handleResponseStatus(DataSegmentChangeRequest changeRequest, DataSegmentChangeResponse.Status status) { changeRequest.go( new DataSegmentChangeHandler() @@ -317,7 +317,7 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status); } - private void updateSuccessOrFailureInHolder(SegmentHolder holder, 
SegmentLoadDropHandler.Status status) + private void updateSuccessOrFailureInHolder(SegmentHolder holder, DataSegmentChangeResponse.Status status) { if (holder == null) { return; @@ -325,7 +325,7 @@ private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDro queuedSegments.remove(holder); activeRequestSegments.remove(holder.getSegment()); - if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) { + if (status.getState() == DataSegmentChangeResponse.State.FAILED) { onRequestFailed(holder, status.getFailureCause()); } else { onRequestCompleted(holder, RequestStatus.SUCCESS); diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index a0281b27ff2d..35e3c7b7cdc0 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -34,6 +34,7 @@ import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; import org.apache.druid.server.coordination.DataSegmentChangeRequest; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordination.SegmentLoadDropHandler; import org.apache.druid.server.coordinator.loading.HttpLoadQueuePeon; import org.apache.druid.server.http.security.StateResourceFilter; @@ -248,7 +249,7 @@ public void applyDataSegmentChangeRequests( } final ResponseContext context = createContext(req.getHeader("Accept")); - final ListenableFuture> future = + final ListenableFuture> future = loadDropRequestHandler.processBatch(changeRequestList); final AsyncContext asyncContext = req.startAsync(); @@ -284,10 +285,10 @@ public void onStartAsync(AsyncEvent event) Futures.addCallback( future, - new FutureCallback>() + new FutureCallback>() { @Override - public void onSuccess(List result) + 
public void onSuccess(List result) { try { HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index c734d7b8f8fd..ede05433d42e 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -38,8 +38,6 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus; -import org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -493,14 +491,14 @@ public void testProcessBatch() throws Exception new SegmentChangeRequestDrop(segment2) ); - ListenableFuture> future = segmentLoadDropHandler + ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); - Map expectedStatusMap = new HashMap<>(); - expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING); - expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS); - List result = future.get(); - for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus requestAndStatus : result) { + Map expectedStatusMap = new HashMap<>(); + expectedStatusMap.put(batch.get(0), DataSegmentChangeResponse.Status.PENDING); + expectedStatusMap.put(batch.get(1), DataSegmentChangeResponse.Status.SUCCESS); + List result = future.get(); + for (DataSegmentChangeResponse requestAndStatus : result) { 
Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), requestAndStatus.getStatus()); } @@ -509,7 +507,7 @@ public void testProcessBatch() throws Exception } result = segmentLoadDropHandler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get(); - Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus()); + Assert.assertEquals(DataSegmentChangeResponse.Status.SUCCESS, result.get(0).getStatus()); segmentLoadDropHandler.stop(); } @@ -539,21 +537,21 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - ListenableFuture> future = segmentLoadDropHandler + ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); for (Runnable runnable : scheduledRunnable) { runnable.run(); } - List result = future.get(); - Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState()); + List result = future.get(); + Assert.assertEquals(DataSegmentChangeResponse.State.FAILED, result.get(0).getStatus().getState()); future = segmentLoadDropHandler.processBatch(batch); for (Runnable runnable : scheduledRunnable) { runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); segmentLoadDropHandler.stop(); } @@ -587,13 +585,13 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); // Request 1: Load the segment - ListenableFuture> future = segmentLoadDropHandler + ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); for (Runnable runnable : scheduledRunnable) { runnable.run(); } - List result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + List result = future.get(); + 
Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // Request 2: Drop the segment @@ -603,7 +601,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // check invocations after a load-drop sequence @@ -623,7 +621,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // check invocations - 1 more load has happened @@ -643,7 +641,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio runnable.run(); } result = future.get(); - Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); scheduledRunnable.clear(); // check invocations - the load segment counter should bump up diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index c3d3fb0fbe8f..f898c78fb46c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -31,7 +31,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import 
org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; @@ -321,12 +321,12 @@ public ListenableFuture go( } ); - List statuses = new ArrayList<>(changeRequests.size()); + List statuses = new ArrayList<>(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { cr.go(this, null); - statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus( + statuses.add(new DataSegmentChangeResponse( cr, - SegmentLoadDropHandler.Status.SUCCESS + DataSegmentChangeResponse.Status.SUCCESS )); } return (ListenableFuture) Futures.immediateFuture( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java index 0b91e7009026..95282bbf2994 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java @@ -30,7 +30,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -126,7 +126,7 @@ private Final processRequest( /** * Processes all 
the changes in the request. */ - private List processRequest( + private List processRequest( Request request, DataSegmentChangeHandler changeHandler ) throws IOException @@ -147,21 +147,20 @@ private List processRe /** * Processes each DataSegmentChangeRequest using the handler. */ - private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus processRequest( + private DataSegmentChangeResponse processRequest( DataSegmentChangeRequest request, DataSegmentChangeHandler handler ) { - SegmentLoadDropHandler.Status status; + DataSegmentChangeResponse.Status status; try { request.go(handler, NOOP_CALLBACK); - status = SegmentLoadDropHandler.Status.SUCCESS; + status = DataSegmentChangeResponse.Status.SUCCESS; } catch (Exception e) { - status = SegmentLoadDropHandler.Status.failed(e.getMessage()); + status = DataSegmentChangeResponse.Status.failed(e.getMessage()); } - return new SegmentLoadDropHandler - .DataSegmentChangeRequestAndStatus(request, status); + return new DataSegmentChangeResponse(request, status); } } From 54ae3c5ebdf7df569c2cb176f005ecd40de3b81c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 20 Jul 2023 13:57:02 +0530 Subject: [PATCH 04/15] Add missing license to new file --- .../DataSegmentChangeResponse.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java index 4643dd7b2961..896aa1c1339c 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.server.coordination; import com.fasterxml.jackson.annotation.JsonCreator; From 66ec366b3a6e3694685548398b35fe9d41482da0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 20 Jul 2023 14:21:32 +0530 Subject: [PATCH 05/15] Acquire lock before accessing requestStatuses --- .../coordination/SegmentLoadDropHandler.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 62a9e4789e9b..ebbe4b42fa38 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -588,7 +588,9 @@ private AtomicReference processRequest(DataSeg @Override public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) { - requestStatuses.put(changeRequest, new AtomicReference<>(DataSegmentChangeResponse.Status.PENDING)); + markRequestAsPending(changeRequest); + + // Load the segment asynchronously exec.submit( () -> SegmentLoadDropHandler.this.addSegment( ((SegmentChangeRequestLoad) changeRequest).getSegment(), @@ -600,7 +602,9 @@ public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) @Override public 
void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) { - requestStatuses.put(changeRequest, new AtomicReference<>(DataSegmentChangeResponse.Status.PENDING)); + markRequestAsPending(changeRequest); + + // Drop the segment synchronously SegmentLoadDropHandler.this.removeSegment( ((SegmentChangeRequestDrop) changeRequest).getSegment(), () -> resolveWaitingFutures(), @@ -620,6 +624,13 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac } } + private void markRequestAsPending(DataSegmentChangeRequest changeRequest) + { + synchronized (requestStatusesLock) { + requestStatuses.put(changeRequest, new AtomicReference<>(DataSegmentChangeResponse.Status.PENDING)); + } + } + private void updateRequestStatus(DataSegmentChangeRequest changeRequest, DataSegmentChangeResponse.Status result) { if (result == null) { From b849b64a22586142b461370ccff24e4d9f4a8028 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Jul 2023 20:59:01 +0530 Subject: [PATCH 06/15] Invalidate response cache in SegmentLoadDropHandler --- ...penderatorDriverRealtimeIndexTaskTest.java | 2 +- .../common/task/RealtimeIndexTaskTest.java | 2 +- .../SeekableStreamIndexTaskTestBase.java | 2 +- .../apache/druid/server/SegmentManager.java | 2 +- .../DataSegmentChangeRequest.java | 3 + .../SegmentChangeRequestDrop.java | 1 + .../SegmentChangeRequestLoad.java | 1 + .../SegmentChangeRequestNoop.java | 8 + .../coordination/SegmentLoadDropHandler.java | 322 +++++++------ .../metrics/HistoricalMetricsMonitor.java | 2 +- .../druid/server/SegmentManagerTest.java | 2 +- .../SegmentLoadDropHandlerCacheTest.java | 6 +- .../SegmentLoadDropHandlerTest.java | 452 ++++++------------ .../TestDataSegmentAnnouncer.java | 18 +- .../metrics/HistoricalMetricsMonitorTest.java | 2 +- 15 files changed, 362 insertions(+), 463 deletions(-) rename {indexing-service/src/test/java/org/apache/druid/indexing/test => 
server/src/test/java/org/apache/druid/server/coordination}/TestDataSegmentAnnouncer.java (78%) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 548d5b6a05b7..5fd8daaf8011 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -72,7 +72,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; +import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.indexing.test.TestDataSegmentPusher; import org.apache.druid.jackson.DefaultObjectMapper; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index cd9163fad9ad..4f8e2dd9a68e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -59,7 +59,6 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.indexing.test.TestDataSegmentPusher; import 
org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; @@ -115,6 +114,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.DataSegment; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 79bb286f7ad8..decc27bf4db0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -75,7 +75,6 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -120,6 +119,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CompressionUtils; diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 
6ce441b2ab1e..4cb5d11f397d 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -186,7 +186,7 @@ public Set getDataSourceNames() * * @return a map of dataSources and number of segments */ - public Map getDataSourceCounts() + public Map getDataSourceToNumSegments() { return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java index 6a044aaf6bba..c6500fade7d3 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeRequest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -37,4 +38,6 @@ public interface DataSegmentChangeRequest void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCallback callback); String asString(); + + DataSegment getSegment(); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java index c4229a028806..94bb84561f89 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java @@ -45,6 +45,7 @@ public SegmentChangeRequestDrop( @JsonProperty @JsonUnwrapped + @Override public DataSegment getSegment() { return segment; diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java index 130c7b50d80c..d2e2db35fd76 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java @@ -63,6 +63,7 @@ public void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCall @JsonProperty @JsonUnwrapped + @Override public DataSegment getSegment() { return segment; diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java index 3978bcf693c4..112e03e41299 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java @@ -19,6 +19,8 @@ package org.apache.druid.server.coordination; +import org.apache.druid.timeline.DataSegment; + import javax.annotation.Nullable; /** @@ -39,4 +41,10 @@ public String asString() { return "NOOP"; } + + @Override + public DataSegment getSegment() + { + return null; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index ebbe4b42fa38..8de1a326dbe4 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -35,6 +35,7 @@ import org.apache.druid.guice.ServerTypeConfig; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import 
org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -75,10 +76,14 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler { private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class); - // Synchronizes removals from segmentsToDelete - private final Object segmentDeleteLock = new Object(); + /** + * Synchronizes removals from {@link #segmentsToDrop}. + */ + private final Object segmentDropLock = new Object(); - // Synchronizes start/stop of this object. + /** + * Synchronizes start/stop of the SegmentLoadDropHandler. + */ private final Object startStopLock = new Object(); private final ObjectMapper jsonMapper; @@ -88,7 +93,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private final SegmentManager segmentManager; private final ScheduledExecutorService exec; private final ServerTypeConfig serverTypeConfig; - private final ConcurrentSkipListSet segmentsToDelete; + private final ConcurrentSkipListSet segmentsToDrop; private final SegmentCacheManager segmentCacheManager; private volatile boolean started = false; @@ -119,11 +124,15 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler * are not served. */ @GuardedBy("requestStatusesLock") - private final Cache> requestStatuses; + private final Cache> requestStatuses; private final Object requestStatusesLock = new Object(); - // This is the list of unresolved futures returned to callers of processBatch(List) - // Threads loading/dropping segments resolve these futures as and when some segment load/drop finishes. + /** + * List of unresolved futures returned to callers of {@link #processBatch}. + * Each {@link CustomSettableFuture} corresponds to a single batch of requests. + * A future is resolved as soon as a single request in the batch completes with + * either success or failure. 
+ */ private final LinkedHashSet waitingFutures = new LinkedHashSet<>(); @Inject @@ -173,8 +182,11 @@ public SegmentLoadDropHandler( this.exec = exec; this.serverTypeConfig = serverTypeConfig; - this.segmentsToDelete = new ConcurrentSkipListSet<>(); - requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build(); + this.segmentsToDrop = new ConcurrentSkipListSet<>(); + this.requestStatuses = CacheBuilder.newBuilder() + .maximumSize(config.getStatusQueueMaxSize()) + .initialCapacity(8) + .build(); } @LifecycleStart @@ -185,7 +197,8 @@ public void start() throws IOException return; } - log.info("Starting..."); + final Stopwatch stopwatch = Stopwatch.createStarted(); + log.info("Starting SegmentLoadDropHandler..."); try { if (!config.getLocations().isEmpty()) { loadLocalCache(); @@ -200,7 +213,7 @@ public void start() throws IOException throw new RuntimeException(e); } started = true; - log.info("Started."); + log.info("Started SegmentLoadDropHandler in [%d]ms.", stopwatch.millisElapsed()); } } @@ -212,7 +225,7 @@ public void stop() return; } - log.info("Stopping..."); + log.info("Stopping SegmentLoadDropHandler..."); try { if (shouldAnnounce()) { serverAnnouncer.unannounce(); @@ -224,7 +237,7 @@ public void stop() finally { started = false; } - log.info("Stopped."); + log.info("Stopped SegmentLoadDropHandler."); } } @@ -235,7 +248,7 @@ public boolean isStarted() private void loadLocalCache() throws IOException { - final long start = System.currentTimeMillis(); + final Stopwatch stopwatch = Stopwatch.createStarted(); File baseDir = config.getInfoDir(); FileUtils.mkdirp(baseDir); @@ -277,7 +290,7 @@ private void loadLocalCache() throws IOException addSegments( cachedSegments, - () -> log.info("Cache load took %,d ms", System.currentTimeMillis() - start) + () -> log.info("Finished cache load in [%,d] ms", stopwatch.millisElapsed()) ); } @@ -293,21 +306,25 @@ private void loadSegment(DataSegment segment, 
DataSegmentChangeCallback callback * * @throws SegmentLoadingException if it fails to load the given segment */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy, @Nullable - ExecutorService loadSegmentIntoPageCacheExec) - throws SegmentLoadingException + private void loadSegment( + DataSegment segment, + DataSegmentChangeCallback callback, + boolean lazy, + @Nullable ExecutorService loadSegmentIntoPageCacheExec + ) throws SegmentLoadingException { final boolean loaded; try { - loaded = segmentManager.loadSegment(segment, - lazy, - () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false), - loadSegmentIntoPageCacheExec + loaded = segmentManager.loadSegment( + segment, + lazy, + () -> unannounceAndDropSegment(segment, DataSegmentChangeCallback.NOOP), + loadSegmentIntoPageCacheExec ); } catch (Exception e) { - removeSegment(segment, callback, false); - throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getId()); + unannounceAndDropSegment(segment, callback); + throw new SegmentLoadingException(e, "Could not load segment: %s", e.getMessage()); } if (loaded) { @@ -317,7 +334,7 @@ private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback jsonMapper.writeValue(segmentInfoCacheFile, segment); } catch (IOException e) { - removeSegment(segment, callback, false); + unannounceAndDropSegment(segment, callback); throw new SegmentLoadingException( e, "Failed to write to disk segment info cache file[%s]", @@ -340,25 +357,35 @@ public Map getRowCountDistributionPerDataso @Override public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) + { + // Load and announce the segment asynchronously + exec.submit(() -> loadAndAnnounceSegment(segment, callback)); + } + + /** + * Loads the segment synchronously, announces it and updates the status of the + * corresponding change request in the {@link #requestStatuses} cache. 
+ */ + void loadAndAnnounceSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { DataSegmentChangeResponse.Status result = null; try { - log.info("Loading segment %s", segment.getId()); + log.info("Loading segment[%s]", segment.getId()); /* The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads the segment, which makes dropping segment and downloading segment happen at the same time. */ - if (segmentsToDelete.contains(segment)) { + if (segmentsToDrop.contains(segment)) { /* Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of cost of acquiring lock by doing the "contains" check outside the synchronized block. */ - synchronized (segmentDeleteLock) { - segmentsToDelete.remove(segment); + synchronized (segmentDropLock) { + segmentsToDrop.remove(segment); } } loadSegment(segment, DataSegmentChangeCallback.NOOP, false); @@ -376,7 +403,7 @@ each time when addSegment() is called, it has to wait for the lock in order to m log.makeAlert(e, "Failed to load segment for dataSource") .addData("segment", segment) .emit(); - result = DataSegmentChangeResponse.Status.failed(e.toString()); + result = DataSegmentChangeResponse.Status.failed(e.getMessage()); } finally { updateRequestStatus(new SegmentChangeRequestLoad(segment), result); @@ -474,59 +501,51 @@ private void addSegments(Collection segments, final DataSegmentChan } } + /** + * Unannounces and drops the segment immediately. 
+ */ + @VisibleForTesting + void unannounceAndDropSegment( + DataSegment segment, + DataSegmentChangeCallback callback + ) + { + unannounceSegment(segment, callback); + segmentsToDrop.add(segment); + dropSegment(segment); + } + @Override public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { - removeSegment(segment, callback, true); + unannounceSegment(segment, callback); + + // Schedule drop of segment + segmentsToDrop.add(segment); + log.info( + "Completely removing segment[%s] in [%,d] millis.", + segment.getId(), config.getDropSegmentDelayMillis() + ); + exec.schedule( + () -> dropSegment(segment), + config.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS + ); } - @VisibleForTesting - void removeSegment( + /** + * Unannounces the segment and updates the result for the corresponding DROP request. + * A DROP request is considered successful if the unannouncement has succeeded, + * even if the segment files have not been deleted yet. + */ + private void unannounceSegment( final DataSegment segment, - @Nullable final DataSegmentChangeCallback callback, - final boolean scheduleDrop + @Nullable final DataSegmentChangeCallback callback ) { DataSegmentChangeResponse.Status result = null; try { announcer.unannounceSegment(segment); - segmentsToDelete.add(segment); - - Runnable runnable = () -> { - try { - synchronized (segmentDeleteLock) { - if (segmentsToDelete.remove(segment)) { - segmentManager.dropSegment(segment); - - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); - } - } - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to remove segment! 
Possible resource leak!") - .addData("segment", segment) - .emit(); - } - }; - - if (scheduleDrop) { - log.info( - "Completely removing [%s] in [%,d] millis", - segment.getId(), - config.getDropSegmentDelayMillis() - ); - exec.schedule( - runnable, - config.getDropSegmentDelayMillis(), - TimeUnit.MILLISECONDS - ); - } else { - runnable.run(); - } - result = DataSegmentChangeResponse.Status.SUCCESS; } catch (Exception e) { @@ -543,27 +562,56 @@ void removeSegment( } } + /** + * Drops the given segment synchronously. + */ + private void dropSegment(DataSegment segment) + { + try { + synchronized (segmentDropLock) { + if (segmentsToDrop.remove(segment)) { + segmentManager.dropSegment(segment); + + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to drop segment. Possible resource leak.") + .addData("segment", segment) + .emit(); + } + } + public Collection getPendingDeleteSnapshot() { - return ImmutableList.copyOf(segmentsToDelete); + return ImmutableList.copyOf(segmentsToDrop); } - public ListenableFuture> processBatch(List changeRequests) + /** + * + */ + public ListenableFuture> processBatch( + List changeRequests + ) { boolean isAnyRequestDone = false; - Map> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size()); + Map> statuses + = Maps.newHashMapWithExpectedSize(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - AtomicReference status = processRequest(cr); - if (status.get().getState() != DataSegmentChangeResponse.State.PENDING) { + AtomicReference status = processRequest(cr); + if (status.get().getStatus().getState() != DataSegmentChangeResponse.State.PENDING) { isAnyRequestDone = true; } statuses.put(cr, status); } - CustomSettableFuture future = new CustomSettableFuture(waitingFutures, statuses); - + 
final CustomSettableFuture future = new CustomSettableFuture(waitingFutures, statuses); if (isAnyRequestDone) { future.resolve(); } else { @@ -575,71 +623,62 @@ public ListenableFuture> processBatch(List processRequest(DataSegmentChangeRequest changeRequest) + /** + * Starts the processing of the given segment change request. + */ + private AtomicReference processRequest(DataSegmentChangeRequest changeRequest) { synchronized (requestStatusesLock) { - AtomicReference status = requestStatuses.getIfPresent(changeRequest); - - // If last load/drop request status is failed, here can try that again - if (status == null || status.get().getState() == DataSegmentChangeResponse.State.FAILED) { - changeRequest.go( - new DataSegmentChangeHandler() - { - @Override - public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - markRequestAsPending(changeRequest); - - // Load the segment asynchronously - exec.submit( - () -> SegmentLoadDropHandler.this.addSegment( - ((SegmentChangeRequestLoad) changeRequest).getSegment(), - () -> resolveWaitingFutures() - ) - ); - } - - @Override - public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) - { - markRequestAsPending(changeRequest); + final DataSegment segment = changeRequest.getSegment(); + final AtomicReference cachedResponse = requestStatuses.getIfPresent(segment); + + if (cachedResponse == null) { + // Start a fresh LOAD or DROP as there is no previous known request + markRequestAsPending(changeRequest); + changeRequest.go(this, this::resolveWaitingFutures); + return requestStatuses.getIfPresent(segment); + } else if (cachedResponse.get().getRequest().equals(changeRequest)) { + // Serve the cached response and clear it if the request has completed, + // so that we don't keep serving a stale response indefinitely + if (cachedResponse.get().getStatus().getState() != DataSegmentChangeResponse.State.PENDING) { + requestStatuses.invalidate(segment); + } + return cachedResponse; + } 
else { + // Clear the cached response as this is a different request + // TODO: what if the previous one was pending?? + requestStatuses.invalidate(segment); - // Drop the segment synchronously - SegmentLoadDropHandler.this.removeSegment( - ((SegmentChangeRequestDrop) changeRequest).getSegment(), - () -> resolveWaitingFutures(), - true - ); - } - }, - this::resolveWaitingFutures - ); - } else if (status.get().getState() == DataSegmentChangeResponse.State.SUCCESS) { - // SUCCESS case, we'll clear up the cached success while serving it to this client - // Not doing this can lead to an incorrect response to upcoming clients for a reload - requestStatuses.invalidate(changeRequest); - return status; + markRequestAsPending(changeRequest); + changeRequest.go(this, this::resolveWaitingFutures); + return requestStatuses.getIfPresent(segment); } - return requestStatuses.getIfPresent(changeRequest); } } private void markRequestAsPending(DataSegmentChangeRequest changeRequest) { synchronized (requestStatusesLock) { - requestStatuses.put(changeRequest, new AtomicReference<>(DataSegmentChangeResponse.Status.PENDING)); + DataSegmentChangeResponse pendingResponse + = new DataSegmentChangeResponse(changeRequest, DataSegmentChangeResponse.Status.PENDING); + requestStatuses.put(changeRequest.getSegment(), new AtomicReference<>(pendingResponse)); } } - private void updateRequestStatus(DataSegmentChangeRequest changeRequest, DataSegmentChangeResponse.Status result) + /** + * Updates the status for the given request only if this request is currently + * in progress and present in {@link #requestStatuses}. + */ + private void updateRequestStatus(DataSegmentChangeRequest changeRequest, DataSegmentChangeResponse.Status status) { - if (result == null) { - result = DataSegmentChangeResponse.Status.failed("Unknown reason. Check server logs."); + if (status == null) { + status = DataSegmentChangeResponse.Status.failed("Unknown reason. 
Check server logs."); } synchronized (requestStatusesLock) { - AtomicReference statusRef = requestStatuses.getIfPresent(changeRequest); - if (statusRef != null) { - statusRef.set(result); + AtomicReference statusRef + = requestStatuses.getIfPresent(changeRequest.getSegment()); + if (statusRef != null && statusRef.get().getRequest().equals(changeRequest)) { + statusRef.set(new DataSegmentChangeResponse(changeRequest, status)); } } } @@ -804,19 +843,23 @@ public void close() } } - // Future with cancel() implementation to remove it from "waitingFutures" list + /** + * Represents the future result of a single batch of segment load drop requests. + *

+ * Upon cancellation, this future removes itself from {@link #waitingFutures}. + */ private class CustomSettableFuture extends AbstractFuture> { private final LinkedHashSet waitingFutures; - private final Map> statusRefs; + private final Map> resultRefs; private CustomSettableFuture( LinkedHashSet waitingFutures, - Map> statusRefs + Map> resultRefs ) { this.waitingFutures = waitingFutures; - this.statusRefs = statusRefs; + this.resultRefs = resultRefs; } public void resolve() @@ -828,17 +871,18 @@ public void resolve() return; } - final List result = new ArrayList<>(statusRefs.size()); - statusRefs.forEach((request, statusRef) -> { + final List results = new ArrayList<>(resultRefs.size()); + resultRefs.forEach((request, reference) -> { + DataSegmentChangeResponse result = reference.get(); + results.add(result); + // Remove complete statuses from the cache - final DataSegmentChangeResponse.Status status = statusRef.get(); - if (status != null && status.getState() != DataSegmentChangeResponse.State.PENDING) { - requestStatuses.invalidate(request); + if (result != null && result.getStatus().getState() != DataSegmentChangeResponse.State.PENDING) { + requestStatuses.invalidate(request.getSegment()); } - result.add(new DataSegmentChangeResponse(request, status)); }); - set(result); + set(results); } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java index 30083cbc0911..1373f38b6e4c 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java @@ -90,7 +90,7 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.build("segment/usedPercent", usedPercent)); } - for (Map.Entry entry : segmentManager.getDataSourceCounts().entrySet()) { + for (Map.Entry entry : 
segmentManager.getDataSourceToNumSegments().entrySet()) { String dataSource = entry.getKey(); long count = entry.getValue(); final ServiceMetricEvent.Builder builder = diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 65afb8ea7abf..f092ced614b6 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -446,7 +446,7 @@ private void assertResult(List expectedExistingSegments) throws Seg } Assert.assertEquals(expectedDataSourceNames, segmentManager.getDataSourceNames()); - Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceCounts()); + Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceToNumSegments()); Assert.assertEquals(expectedDataSourceSizes, segmentManager.getDataSourceSizes()); final Map dataSources = segmentManager.getDataSources(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java index ff22f00baae1..47205c4d3a43 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java @@ -141,13 +141,13 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException // make sure adding segments beyond allowed size fails Mockito.reset(segmentAnnouncer); DataSegment newSegment = makeSegment("test", "new-segment"); - loadDropHandler.addSegment(newSegment, null); + loadDropHandler.loadAndAnnounceSegment(newSegment, null); Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegment(any()); Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegments(any()); // clearing some segment should allow for 
new segments - loadDropHandler.removeSegment(expectedSegments.get(0), null, false); - loadDropHandler.addSegment(newSegment, null); + loadDropHandler.unannounceAndDropSegment(expectedSegments.get(0), null); + loadDropHandler.loadAndAnnounceSegment(newSegment, null); Mockito.verify(segmentAnnouncer).announceSegment(newSegment); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index ede05433d42e..d11e23e2e8a5 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -22,13 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestHelper; @@ -36,8 +34,11 @@ import org.apache.druid.segment.loading.NoopSegmentCacheManager; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import 
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -46,14 +47,13 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import org.mockito.stubbing.OngoingStubbing; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -61,39 +61,26 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** */ public class SegmentLoadDropHandlerTest { - public static final int COUNT = 50; + private static final int COUNT = 50; + private static final String EXECUTOR_NAME_FORMAT = "SegmentLoadDropHandlerTest-[%d]"; private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); private SegmentLoadDropHandler segmentLoadDropHandler; - private DataSegmentAnnouncer announcer; + private TestDataSegmentAnnouncer announcer; private File infoDir; private TestStorageLocation testStorageLocation; - private AtomicInteger announceCount; - private ConcurrentSkipListSet segmentsAnnouncedByMe; private SegmentCacheManager segmentCacheManager; private Set segmentsRemovedFromCache; private SegmentManager segmentManager; - private List scheduledRunnable; private SegmentLoaderConfig segmentLoaderConfig; - private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig; - private ScheduledExecutorFactory scheduledExecutorFactory; - 
private List locations; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); + private BlockingExecutorService loadingExecutor; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -114,12 +101,10 @@ public void setUp() throws IOException throw new RuntimeException(e); } - locations = Collections.singletonList( + final List locations = Collections.singletonList( testStorageLocation.toStorageLocationConfig() ); - scheduledRunnable = new ArrayList<>(); - segmentsRemovedFromCache = new HashSet<>(); segmentCacheManager = new NoopSegmentCacheManager() { @@ -138,44 +123,7 @@ public void cleanup(DataSegment segment) }; segmentManager = new SegmentManager(new CacheTestSegmentLoader()); - segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); - announceCount = new AtomicInteger(0); - - announcer = new DataSegmentAnnouncer() - { - @Override - public void announceSegment(DataSegment segment) - { - segmentsAnnouncedByMe.add(segment); - announceCount.incrementAndGet(); - } - - @Override - public void unannounceSegment(DataSegment segment) - { - segmentsAnnouncedByMe.remove(segment); - announceCount.decrementAndGet(); - } - - @Override - public void announceSegments(Iterable segments) - { - for (DataSegment segment : segments) { - segmentsAnnouncedByMe.add(segment); - } - announceCount.addAndGet(Iterables.size(segments)); - } - - @Override - public void unannounceSegments(Iterable segments) - { - for (DataSegment segment : segments) { - segmentsAnnouncedByMe.remove(segment); - } - announceCount.addAndGet(-Iterables.size(segments)); - } - }; - + announcer = new TestDataSegmentAnnouncer(); segmentLoaderConfig = new SegmentLoaderConfig() { @@ -185,166 +133,64 @@ public File getInfoDir() return testStorageLocation.getInfoDir(); } - @Override - public int getNumLoadingThreads() - { - return 5; - } - - @Override - public int getAnnounceIntervalMillis() - { - return 50; - } - - @Override - public List getLocations() - { - return 
locations; - } - - @Override - public int getDropSegmentDelayMillis() - { - return 0; - } - }; - - noAnnouncerSegmentLoaderConfig = new SegmentLoaderConfig() - { - @Override - public File getInfoDir() - { - return testStorageLocation.getInfoDir(); - } - - @Override - public int getNumLoadingThreads() - { - return 5; - } - - @Override - public int getAnnounceIntervalMillis() - { - return 0; - } - @Override public List getLocations() { return locations; } - - @Override - public int getDropSegmentDelayMillis() - { - return 0; - } - }; - - scheduledExecutorFactory = new ScheduledExecutorFactory() - { - @Override - public ScheduledExecutorService create(int corePoolSize, String nameFormat) - { - /* - Override normal behavoir by adding the runnable to a list so that you can make sure - all the shceduled runnables are executed by explicitly calling run() on each item in the list - */ - return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat)) - { - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) - { - scheduledRunnable.add(command); - return null; - } - }; - } }; - segmentLoadDropHandler = new SegmentLoadDropHandler( - jsonMapper, - segmentLoaderConfig, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), - new ServerTypeConfig(ServerType.HISTORICAL) - ); + loadingExecutor = new BlockingExecutorService(EXECUTOR_NAME_FORMAT); + segmentLoadDropHandler = initHandler(segmentManager); } - /** - * Steps: - * 1. removeSegment() schedules a delete runnable that deletes segment files, - * 2. addSegment() succesfully loads the segment and annouces it - * 3. scheduled delete task executes and realizes it should not delete the segment files. 
- */ @Test - public void testSegmentLoading1() throws Exception + public void testLoadCancelsPendingDrop() throws Exception { segmentLoadDropHandler.start(); final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertTrue(loadingExecutor.hasPendingTasks()); - Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + segmentLoadDropHandler.loadAndAnnounceSegment(segment, DataSegmentChangeCallback.NOOP); - segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); + // Try to complete pending drop of segment + loadingExecutor.finishAllPendingTasks(); - /* - make sure the scheduled runnable that "deletes" segment files has been executed. - Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. - */ - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } - - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentsRemovedFromCache.contains(segment)); + Assert.assertTrue(announcer.isAnnounced(segment)); + Assert.assertFalse(segmentsRemovedFromCache.contains(segment)); segmentLoadDropHandler.stop(); } - /** - * Steps: - * 1. addSegment() succesfully loads the segment and annouces it - * 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files - * 3. addSegment() calls loadSegment() and annouces it again - * 4. scheduled delete task executes and realizes it should not delete the segment files. 
- */ @Test - public void testSegmentLoading2() throws Exception + public void testLoadCancelsPendingDrop2() throws Exception { segmentLoadDropHandler.start(); - final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); - - segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); + final String datasource = "test"; + final DataSegment segment = makeSegment(datasource, "1", Intervals.of("P1d/2011-04-01")); - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + segmentLoadDropHandler.loadAndAnnounceSegment(segment, DataSegmentChangeCallback.NOOP); + Assert.assertTrue(announcer.isAnnounced(segment)); + Assert.assertEquals(1, segmentManager.getDataSourceToNumSegments().get(datasource).intValue()); + // Unannounce segment and schedule a drop segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertTrue(loadingExecutor.hasPendingTasks()); - Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + segmentLoadDropHandler.loadAndAnnounceSegment(segment, DataSegmentChangeCallback.NOOP); - segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP); - - /* - make sure the scheduled runnable that "deletes" segment files has been executed. - Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in - ZkCoordinator, the scheduled runnable will not actually delete segment files. 
- */ - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + // Try to complete pending drop of segment + loadingExecutor.finishAllPendingTasks(); - Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); - Assert.assertFalse("segment files shouldn't be deleted", segmentsRemovedFromCache.contains(segment)); + // Verify that segment is still loaded + Assert.assertTrue(announcer.isAnnounced(segment)); + Assert.assertFalse(segmentsRemovedFromCache.contains(segment)); segmentLoadDropHandler.stop(); } @@ -374,14 +220,14 @@ public void testLoadCache() throws Exception } testStorageLocation.checkInfoCache(segments); - Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceToNumSegments().isEmpty()); segmentLoadDropHandler.start(); - Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); + Assert.assertFalse(segmentManager.getDataSourceToNumSegments().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); + Assert.assertEquals(11L, segmentManager.getDataSourceToNumSegments().get("test" + i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceToNumSegments().get("test_two" + i).longValue()); } - Assert.assertEquals(13 * COUNT, announceCount.get()); + Assert.assertEquals(13 * COUNT, announcer.getNumAnnouncedSegments()); segmentLoadDropHandler.stop(); for (DataSegment segment : segments) { @@ -410,41 +256,6 @@ private DataSegment makeSegment(String dataSource, String version, Interval inte @Test public void testStartStop() throws Exception { - SegmentLoadDropHandler handler = new SegmentLoadDropHandler( - jsonMapper, - new SegmentLoaderConfig() - { - @Override - public File getInfoDir() - { - return infoDir; - } - - @Override - public int getNumLoadingThreads() - { - return 5; - } - - 
@Override - public List getLocations() - { - return locations; - } - - @Override - public int getAnnounceIntervalMillis() - { - return 50; - } - }, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, - new ServerTypeConfig(ServerType.HISTORICAL) - ); - Set segments = new HashSet<>(); for (int i = 0; i < COUNT; ++i) { segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01"))); @@ -459,16 +270,16 @@ public int getAnnounceIntervalMillis() } testStorageLocation.checkInfoCache(segments); - Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceToNumSegments().isEmpty()); - handler.start(); - Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); + segmentLoadDropHandler.start(); + Assert.assertFalse(segmentManager.getDataSourceToNumSegments().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); + Assert.assertEquals(3L, segmentManager.getDataSourceToNumSegments().get("test" + i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceToNumSegments().get("test_two" + i).longValue()); } - Assert.assertEquals(5 * COUNT, announceCount.get()); - handler.stop(); + Assert.assertEquals(5 * COUNT, announcer.getNumAnnouncedSegments()); + segmentLoadDropHandler.stop(); for (DataSegment segment : segments) { testStorageLocation.deleteSegmentInfoFromCache(segment); @@ -502,9 +313,7 @@ public void testProcessBatch() throws Exception Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), requestAndStatus.getStatus()); } - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); result = segmentLoadDropHandler.processBatch(ImmutableList.of(new 
SegmentChangeRequestLoad(segment1))).get(); Assert.assertEquals(DataSegmentChangeResponse.Status.SUCCESS, result.get(0).getStatus()); @@ -516,40 +325,26 @@ public void testProcessBatch() throws Exception public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception { final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); - Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), - ArgumentMatchers.any(), ArgumentMatchers.any())) - .thenThrow(new RuntimeException("segment loading failure test")) - .thenReturn(true); - final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( - jsonMapper, - segmentLoaderConfig, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), - new ServerTypeConfig(ServerType.HISTORICAL) - ); + whenLoadSegment(segmentManager) + .thenThrow(new RuntimeException("segment loading failure test")) + .thenReturn(true); + final SegmentLoadDropHandler segmentLoadDropHandler = initHandler(segmentManager); segmentLoadDropHandler.start(); - DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); + final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); List result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.FAILED, result.get(0).getStatus().getState()); future = segmentLoadDropHandler.processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); result = future.get(); 
Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); @@ -560,23 +355,9 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception { final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); - Mockito.doReturn(true).when(segmentManager).loadSegment( - ArgumentMatchers.any(), - ArgumentMatchers.anyBoolean(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); + whenLoadSegment(segmentManager).thenReturn(true); Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any()); - final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( - jsonMapper, - noAnnouncerSegmentLoaderConfig, - announcer, - Mockito.mock(DataSegmentServerAnnouncer.class), - segmentManager, - segmentCacheManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), - new ServerTypeConfig(ServerType.HISTORICAL) - ); + final SegmentLoadDropHandler segmentLoadDropHandler = initHandler(segmentManager); segmentLoadDropHandler.start(); @@ -587,73 +368,122 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio // Request 1: Load the segment ListenableFuture> future = segmentLoadDropHandler .processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); List result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); - scheduledRunnable.clear(); // Request 2: Drop the segment batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1)); future = segmentLoadDropHandler.processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, 
result.get(0).getStatus().getState()); - scheduledRunnable.clear(); // check invocations after a load-drop sequence - Mockito.verify(segmentManager, Mockito.times(1)).loadSegment( - ArgumentMatchers.any(), - ArgumentMatchers.anyBoolean(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); + verifyLoadCalled(segmentManager, 1); Mockito.verify(segmentManager, Mockito.times(1)) .dropSegment(ArgumentMatchers.any()); // Request 3: Reload the segment batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); future = segmentLoadDropHandler.processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); - scheduledRunnable.clear(); // check invocations - 1 more load has happened - Mockito.verify(segmentManager, Mockito.times(2)).loadSegment( - ArgumentMatchers.any(), - ArgumentMatchers.anyBoolean(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - Mockito.verify(segmentManager, Mockito.times(1)) - .dropSegment(ArgumentMatchers.any()); + verifyLoadCalled(segmentManager, 2); + verifyDropCalled(segmentManager, 1); // Request 4: Try to reload the segment - segment is loaded again batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); future = segmentLoadDropHandler.processBatch(batch); - for (Runnable runnable : scheduledRunnable) { - runnable.run(); - } + loadingExecutor.finishAllPendingTasks(); result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); - scheduledRunnable.clear(); // check invocations - the load segment counter should bump up - Mockito.verify(segmentManager, Mockito.times(3)).loadSegment( + verifyLoadCalled(segmentManager, 3); + verifyDropCalled(segmentManager, 1); + + segmentLoadDropHandler.stop(); + } + + @Test + public void testLoadIsNotRetriedIfFailureIsCached() throws 
Exception + { + final DataSegment segment = makeSegment("batchtest1", "1", Intervals.of("P1D/2011-04-01")); + + final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); + final SegmentLoadDropHandler segmentLoadDropHandler = initHandler(segmentManager); + segmentLoadDropHandler.start(); + + // Send a load request to the handler + ListenableFuture> future = segmentLoadDropHandler.processBatch( + Collections.singletonList(new SegmentChangeRequestLoad(segment)) + ); + Assert.assertFalse(future.isDone()); + + // Cancel the future so that it is never resolved and the response remains cached + future.cancel(true); + + // Fail the load operation + whenLoadSegment(segmentManager).thenThrow(new ISE("segment files missing")); + loadingExecutor.finishNextPendingTask(); + + // Verify that next load request completes immediately with a failed response + future = segmentLoadDropHandler.processBatch( + Collections.singletonList(new SegmentChangeRequestLoad(segment)) + ); + Assert.assertTrue(future.isDone()); + + DataSegmentChangeResponse response = future.get().get(0); + Assert.assertTrue(response.getRequest() instanceof SegmentChangeRequestLoad); + Assert.assertEquals(DataSegmentChangeResponse.State.FAILED, response.getStatus().getState()); + Assert.assertEquals("Could not load segment: segment files missing", response.getStatus().getFailureCause()); + + segmentLoadDropHandler.stop(); + } + + private SegmentLoadDropHandler initHandler(SegmentManager manager) + { + return new SegmentLoadDropHandler( + jsonMapper, + segmentLoaderConfig, + announcer, + Mockito.mock(DataSegmentServerAnnouncer.class), + manager, + segmentCacheManager, + new WrappingScheduledExecutorService(EXECUTOR_NAME_FORMAT, loadingExecutor, false), + new ServerTypeConfig(ServerType.HISTORICAL) + ); + } + + private OngoingStubbing whenLoadSegment(SegmentManager manager) throws SegmentLoadingException + { + return Mockito.when( + manager.loadSegment( + ArgumentMatchers.any(), + 
ArgumentMatchers.anyBoolean(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ) + ); + } + + private void verifyLoadCalled(SegmentManager manager, int times) throws SegmentLoadingException + { + Mockito.verify(manager, Mockito.times(times)).loadSegment( ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any() ); - Mockito.verify(segmentManager, Mockito.times(1)) - .dropSegment(ArgumentMatchers.any()); + } - segmentLoadDropHandler.stop(); + private void verifyDropCalled(SegmentManager manager, int times) + { + Mockito.verify(manager, Mockito.times(times)).dropSegment(ArgumentMatchers.any()); } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentAnnouncer.java b/server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java similarity index 78% rename from indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentAnnouncer.java rename to server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java index 6874ae79a91a..d25b127e168c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentAnnouncer.java +++ b/server/src/test/java/org/apache/druid/server/coordination/TestDataSegmentAnnouncer.java @@ -17,17 +17,16 @@ * under the License. 
*/ -package org.apache.druid.indexing.test; +package org.apache.druid.server.coordination; import com.google.common.collect.Sets; -import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; import java.util.Set; public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer { - public Set announcedSegments = Sets.newConcurrentHashSet(); + private final Set announcedSegments = Sets.newConcurrentHashSet(); @Override public void announceSegment(DataSegment segment) @@ -57,4 +56,17 @@ public void unannounceSegments(Iterable segments) } } + public int getNumAnnouncedSegments() + { + return announcedSegments.size(); + } + + /** + * @return true if the given segment is currently present in the list of + * announced segments. + */ + public boolean isAnnounced(DataSegment segment) + { + return announcedSegments.contains(segment); + } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java index 8e658faafb97..b1f56496415b 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java @@ -88,7 +88,7 @@ public void testSimple() EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).times(2); - EasyMock.expect(segmentManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L)); + EasyMock.expect(segmentManager.getDataSourceToNumSegments()).andReturn(ImmutableMap.of(dataSource, 1L)); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); From 3b176f595d798e559afbcf8b2dbb983ddb740375 Mon Sep 17 00:00:00 
2001 From: Kashif Faraz Date: Fri, 21 Jul 2023 21:12:58 +0530 Subject: [PATCH 07/15] Remove redundant casts --- .../org/apache/druid/client/HttpServerInventoryView.java | 7 +++---- .../server/coordinator/loading/LoadQueuePeonTest.java | 9 ++++----- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index 2c30c1681325..6f5c3bdb1600 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -608,14 +608,13 @@ public void deltaSync(List changes) { for (DataSegmentChangeRequest request : changes) { if (request instanceof SegmentChangeRequestLoad) { - addSegment(((SegmentChangeRequestLoad) request).getSegment(), false); + addSegment(request.getSegment(), false); } else if (request instanceof SegmentChangeRequestDrop) { - removeSegment(((SegmentChangeRequestDrop) request).getSegment(), false); + removeSegment(request.getSegment(), false); } else { log.error( "Server[%s] gave a non load/drop dataSegmentChangeRequest[%s], Ignored.", - druidServer.getName(), - request + druidServer.getName(), request ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java index aba1c9b6be6c..cc18817f5d4a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java @@ -36,7 +36,6 @@ import org.apache.druid.server.coordination.DataSegmentChangeCallback; import org.apache.druid.server.coordination.DataSegmentChangeHandler; import org.apache.druid.server.coordination.DataSegmentChangeRequest; -import 
org.apache.druid.server.coordination.SegmentChangeRequestDrop; import org.apache.druid.server.coordination.SegmentChangeRequestLoad; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.timeline.DataSegment; @@ -235,11 +234,11 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac ); Assert.assertEquals( segment, - ((SegmentChangeRequestDrop) jsonMapper.readValue( + jsonMapper.readValue( curator.getData() .decompressed() .forPath(dropRequestPath), DataSegmentChangeRequest.class - )).getSegment() + ).getSegment() ); // simulate completion of drop request by historical @@ -253,8 +252,8 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath)); Assert.assertEquals( segment, - ((SegmentChangeRequestLoad) jsonMapper - .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class)) + jsonMapper + .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class) .getSegment() ); From a7a98af8ecf3c6c014066cc92cb15d3c4bdfadae Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Jul 2023 21:33:36 +0530 Subject: [PATCH 08/15] Fix checkstyle --- .../AppenderatorDriverRealtimeIndexTaskTest.java | 2 +- .../coordination/SegmentLoadDropHandlerTest.java | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 5fd8daaf8011..7a6df9a6986a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ 
-72,7 +72,6 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.indexing.test.TestDataSegmentPusher; import org.apache.druid.jackson.DefaultObjectMapper; @@ -125,6 +124,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestDataSegmentAnnouncer; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.DataSegment; diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index d11e23e2e8a5..01d21b468a69 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -145,12 +145,13 @@ public List getLocations() } @Test - public void testLoadCancelsPendingDrop() throws Exception + public void testLoadCancelsPendingDropOfMissingSegment() throws Exception { segmentLoadDropHandler.start(); final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); + // Schedule a drop even though the segment is not loaded yet segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); Assert.assertFalse(announcer.isAnnounced(segment)); Assert.assertTrue(loadingExecutor.hasPendingTasks()); @@ -167,7 +168,7 @@ public void testLoadCancelsPendingDrop() throws Exception } @Test - public void testLoadCancelsPendingDrop2() throws 
Exception + public void testLoadCancelsPendingDrop() throws Exception { segmentLoadDropHandler.start(); @@ -379,10 +380,9 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); - // check invocations after a load-drop sequence + // Verify that 1 load and 1 drop has happened verifyLoadCalled(segmentManager, 1); - Mockito.verify(segmentManager, Mockito.times(1)) - .dropSegment(ArgumentMatchers.any()); + verifyDropCalled(segmentManager, 1); // Request 3: Reload the segment batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); @@ -391,7 +391,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); - // check invocations - 1 more load has happened + // Verify that 1 more load has happened verifyLoadCalled(segmentManager, 2); verifyDropCalled(segmentManager, 1); @@ -402,7 +402,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); - // check invocations - the load segment counter should bump up + // Verify that 1 more load has happened verifyLoadCalled(segmentManager, 3); verifyDropCalled(segmentManager, 1); From 1e998c5eee17683698a7c4ccb50d05889fc840d2 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Jul 2023 23:03:36 +0530 Subject: [PATCH 09/15] Some tests --- .../SegmentChangeRequestNoop.java | 4 +- .../SegmentChangeRequestNoopTest.java | 60 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java index 112e03e41299..3a25f602ebe2 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestNoop.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordination; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -43,8 +44,9 @@ public String asString() } @Override + @JsonIgnore public DataSegment getSegment() { - return null; + throw new UnsupportedOperationException(); } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java new file mode 100644 index 000000000000..550a2f111000 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class SegmentChangeRequestNoopTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testSerdeOfNoopRequest() throws Exception + { + final SegmentChangeRequestNoop noopRequest = new SegmentChangeRequestNoop(); + final String json = MAPPER.writeValueAsString(noopRequest); + + Map objectMap = MAPPER.readValue( + json, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + Assert.assertEquals(1, objectMap.size()); + Assert.assertEquals("noop", objectMap.get("action")); + + DataSegmentChangeRequest deserialized = MAPPER.readValue(json, DataSegmentChangeRequest.class); + Assert.assertTrue(deserialized instanceof SegmentChangeRequestNoop); + } + + @Test + public void testGetSegmentThrowsUnsupportedException() + { + SegmentChangeRequestNoop noopRequest = new SegmentChangeRequestNoop(); + Assert.assertThrows( + UnsupportedOperationException.class, + noopRequest::getSegment + ); + } +} From 7f3d41d961d410a57453536ea6008dcff68cfbb6 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 24 Jul 2023 09:16:55 +0530 Subject: [PATCH 10/15] wip: temp changes --- .../server/coordination/SegmentLoadDropHandler.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 8de1a326dbe4..b1ae5e057a19 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -373,7 +373,7 @@ void 
loadAndAnnounceSegment(DataSegment segment, @Nullable DataSegmentChangeCall log.info("Loading segment[%s]", segment.getId()); /* The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, - and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment + and if (segmentsToDrop.remove(segment)) returns true, in which case historical will start deleting segment files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads the segment, which makes dropping segment and downloading segment happen at the same time. */ @@ -656,13 +656,12 @@ private AtomicReference processRequest(DataSegmentCha } } + @GuardedBy("requestStatusesLock") private void markRequestAsPending(DataSegmentChangeRequest changeRequest) { - synchronized (requestStatusesLock) { - DataSegmentChangeResponse pendingResponse - = new DataSegmentChangeResponse(changeRequest, DataSegmentChangeResponse.Status.PENDING); - requestStatuses.put(changeRequest.getSegment(), new AtomicReference<>(pendingResponse)); - } + DataSegmentChangeResponse pendingResponse + = new DataSegmentChangeResponse(changeRequest, DataSegmentChangeResponse.Status.PENDING); + requestStatuses.put(changeRequest.getSegment(), new AtomicReference<>(pendingResponse)); } /** From 5a485732b0637aa0f01587085d45a5408475d324 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 24 Jul 2023 19:09:51 +0530 Subject: [PATCH 11/15] Remove redundant callbacks --- .../coordination/SegmentLoadDropHandler.java | 80 ++++++++----------- .../SegmentLoadDropHandlerCacheTest.java | 6 +- .../SegmentLoadDropHandlerTest.java | 6 +- 3 files changed, 38 insertions(+), 54 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index b1ae5e057a19..1d74355c5480 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -248,7 +248,6 @@ public boolean isStarted() private void loadLocalCache() throws IOException { - final Stopwatch stopwatch = Stopwatch.createStarted(); File baseDir = config.getInfoDir(); FileUtils.mkdirp(baseDir); @@ -288,27 +287,17 @@ private void loadLocalCache() throws IOException .emit(); } - addSegments( - cachedSegments, - () -> log.info("Finished cache load in [%,d] ms", stopwatch.millisElapsed()) - ); - } - - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) - throws SegmentLoadingException - { - loadSegment(segment, callback, lazy, null); + loadCachedSegments(cachedSegments); } /** - * Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will - * throw a SegmentLoadingException + * Downloads a single segment and creates a cache file for it in the info dir. + * If the load fails at any step, {@link #cleanupFailedLoad} is called. 
* - * @throws SegmentLoadingException if it fails to load the given segment + * @throws SegmentLoadingException if the load fails */ private void loadSegment( DataSegment segment, - DataSegmentChangeCallback callback, boolean lazy, @Nullable ExecutorService loadSegmentIntoPageCacheExec ) throws SegmentLoadingException @@ -318,12 +307,12 @@ private void loadSegment( loaded = segmentManager.loadSegment( segment, lazy, - () -> unannounceAndDropSegment(segment, DataSegmentChangeCallback.NOOP), + () -> cleanupFailedLoad(segment), loadSegmentIntoPageCacheExec ); } catch (Exception e) { - unannounceAndDropSegment(segment, callback); + cleanupFailedLoad(segment); throw new SegmentLoadingException(e, "Could not load segment: %s", e.getMessage()); } @@ -334,7 +323,7 @@ private void loadSegment( jsonMapper.writeValue(segmentInfoCacheFile, segment); } catch (IOException e) { - unannounceAndDropSegment(segment, callback); + cleanupFailedLoad(segment); throw new SegmentLoadingException( e, "Failed to write to disk segment info cache file[%s]", @@ -359,14 +348,14 @@ public Map getRowCountDistributionPerDataso public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { // Load and announce the segment asynchronously - exec.submit(() -> loadAndAnnounceSegment(segment, callback)); + exec.submit(() -> loadAndAnnounceSegment(segment)); } /** * Loads the segment synchronously, announces it and updates the status of the * corresponding change request in the {@link #requestStatuses} cache. 
*/ - void loadAndAnnounceSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) + void loadAndAnnounceSegment(DataSegment segment) { DataSegmentChangeResponse.Status result = null; try { @@ -388,7 +377,7 @@ each time when addSegment() is called, it has to wait for the lock in order to m segmentsToDrop.remove(segment); } } - loadSegment(segment, DataSegmentChangeCallback.NOOP, false); + loadSegment(segment, false, null); // announce segment even if the segment file already exists. try { announcer.announceSegment(segment); @@ -407,20 +396,17 @@ each time when addSegment() is called, it has to wait for the lock in order to m } finally { updateRequestStatus(new SegmentChangeRequestLoad(segment), result); - if (null != callback) { - callback.execute(); - } + resolveWaitingFutures(); } } /** * Bulk adding segments during bootstrap - * @param segments A collection of segments to add - * @param callback Segment loading callback */ - private void addSegments(Collection segments, final DataSegmentChangeCallback callback) + private void loadCachedSegments(Collection segments) { // Start a temporary thread pool to load segments into page cache during bootstrap + final Stopwatch stopwatch = Stopwatch.createStarted(); ExecutorService loadingExecutor = null; ExecutorService loadSegmentsIntoPageCacheOnBootstrapExec = config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() != 0 ? 
@@ -443,11 +429,9 @@ private void addSegments(Collection segments, final DataSegmentChan try { log.info( "Loading segment[%d/%d][%s]", - counter.incrementAndGet(), - numSegments, - segment.getId() + counter.incrementAndGet(), numSegments, segment.getId() ); - loadSegment(segment, callback, config.isLazyLoadOnStart(), loadSegmentsIntoPageCacheOnBootstrapExec); + loadSegment(segment, config.isLazyLoadOnStart(), loadSegmentsIntoPageCacheOnBootstrapExec); try { backgroundSegmentAnnouncer.announceSegment(segment); } @@ -489,7 +473,7 @@ private void addSegments(Collection segments, final DataSegmentChan .emit(); } finally { - callback.execute(); + log.info("Finished cache load in [%,d]ms", stopwatch.millisElapsed()); if (loadingExecutor != null) { loadingExecutor.shutdownNow(); } @@ -502,15 +486,13 @@ private void addSegments(Collection segments, final DataSegmentChan } /** - * Unannounces and drops the segment immediately. + * Cleans up a failed LOAD request by completely removing the partially + * downloaded segment files and unannouncing the segment for safe measure. */ @VisibleForTesting - void unannounceAndDropSegment( - DataSegment segment, - DataSegmentChangeCallback callback - ) + void cleanupFailedLoad(DataSegment segment) { - unannounceSegment(segment, callback); + unannounceSegment(segment); segmentsToDrop.add(segment); dropSegment(segment); } @@ -518,7 +500,12 @@ void unannounceAndDropSegment( @Override public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { - unannounceSegment(segment, callback); + try { + unannounceSegment(segment); + } + finally { + resolveWaitingFutures(); + } // Schedule drop of segment segmentsToDrop.add(segment); @@ -538,10 +525,7 @@ public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallba * A DROP request is considered successful if the unannouncement has succeeded, * even if the segment files have not been deleted yet. 
*/ - private void unannounceSegment( - final DataSegment segment, - @Nullable final DataSegmentChangeCallback callback - ) + private void unannounceSegment(final DataSegment segment) { DataSegmentChangeResponse.Status result = null; try { @@ -556,9 +540,6 @@ private void unannounceSegment( } finally { updateRequestStatus(new SegmentChangeRequestDrop(segment), result); - if (null != callback) { - callback.execute(); - } } } @@ -635,7 +616,7 @@ private AtomicReference processRequest(DataSegmentCha if (cachedResponse == null) { // Start a fresh LOAD or DROP as there is no previous known request markRequestAsPending(changeRequest); - changeRequest.go(this, this::resolveWaitingFutures); + changeRequest.go(this, null); return requestStatuses.getIfPresent(segment); } else if (cachedResponse.get().getRequest().equals(changeRequest)) { // Serve the cached response and clear it if the request has completed, @@ -650,7 +631,7 @@ private AtomicReference processRequest(DataSegmentCha requestStatuses.invalidate(segment); markRequestAsPending(changeRequest); - changeRequest.go(this, this::resolveWaitingFutures); + changeRequest.go(this, null); return requestStatuses.getIfPresent(segment); } } @@ -682,6 +663,9 @@ private void updateRequestStatus(DataSegmentChangeRequest changeRequest, DataSeg } } + /** + * Resolves waiting futures after a LOAD or DROP request has completed. 
+ */ private void resolveWaitingFutures() { LinkedHashSet waitingFuturesCopy; diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java index 47205c4d3a43..7be2a739eaf7 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java @@ -141,13 +141,13 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException // make sure adding segments beyond allowed size fails Mockito.reset(segmentAnnouncer); DataSegment newSegment = makeSegment("test", "new-segment"); - loadDropHandler.loadAndAnnounceSegment(newSegment, null); + loadDropHandler.loadAndAnnounceSegment(newSegment); Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegment(any()); Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegments(any()); // clearing some segment should allow for new segments - loadDropHandler.unannounceAndDropSegment(expectedSegments.get(0), null); - loadDropHandler.loadAndAnnounceSegment(newSegment, null); + loadDropHandler.cleanupFailedLoad(expectedSegments.get(0)); + loadDropHandler.loadAndAnnounceSegment(newSegment); Mockito.verify(segmentAnnouncer).announceSegment(newSegment); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 01d21b468a69..57292368af1b 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -156,7 +156,7 @@ public void testLoadCancelsPendingDropOfMissingSegment() throws Exception Assert.assertFalse(announcer.isAnnounced(segment)); 
Assert.assertTrue(loadingExecutor.hasPendingTasks()); - segmentLoadDropHandler.loadAndAnnounceSegment(segment, DataSegmentChangeCallback.NOOP); + segmentLoadDropHandler.loadAndAnnounceSegment(segment); // Try to complete pending drop of segment loadingExecutor.finishAllPendingTasks(); @@ -175,7 +175,7 @@ public void testLoadCancelsPendingDrop() throws Exception final String datasource = "test"; final DataSegment segment = makeSegment(datasource, "1", Intervals.of("P1d/2011-04-01")); - segmentLoadDropHandler.loadAndAnnounceSegment(segment, DataSegmentChangeCallback.NOOP); + segmentLoadDropHandler.loadAndAnnounceSegment(segment); Assert.assertTrue(announcer.isAnnounced(segment)); Assert.assertEquals(1, segmentManager.getDataSourceToNumSegments().get(datasource).intValue()); @@ -184,7 +184,7 @@ public void testLoadCancelsPendingDrop() throws Exception Assert.assertFalse(announcer.isAnnounced(segment)); Assert.assertTrue(loadingExecutor.hasPendingTasks()); - segmentLoadDropHandler.loadAndAnnounceSegment(segment, DataSegmentChangeCallback.NOOP); + segmentLoadDropHandler.loadAndAnnounceSegment(segment); // Try to complete pending drop of segment loadingExecutor.finishAllPendingTasks(); From 491ad1b8269ff58cc091b31e652bca2e10c18727 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 24 Jul 2023 19:15:33 +0530 Subject: [PATCH 12/15] Remove extra changes --- .../apache/druid/server/SegmentManager.java | 2 +- .../metrics/HistoricalMetricsMonitor.java | 2 +- .../druid/server/SegmentManagerTest.java | 2 +- .../SegmentChangeRequestNoopTest.java | 2 +- .../SegmentLoadDropHandlerTest.java | 18 +++++++++--------- .../metrics/HistoricalMetricsMonitorTest.java | 2 +- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 4cb5d11f397d..6ce441b2ab1e 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ 
b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -186,7 +186,7 @@ public Set getDataSourceNames() * * @return a map of dataSources and number of segments */ - public Map getDataSourceToNumSegments() + public Map getDataSourceCounts() { return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java index 1373f38b6e4c..30083cbc0911 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java @@ -90,7 +90,7 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.build("segment/usedPercent", usedPercent)); } - for (Map.Entry entry : segmentManager.getDataSourceToNumSegments().entrySet()) { + for (Map.Entry entry : segmentManager.getDataSourceCounts().entrySet()) { String dataSource = entry.getKey(); long count = entry.getValue(); final ServiceMetricEvent.Builder builder = diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index f092ced614b6..65afb8ea7abf 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -446,7 +446,7 @@ private void assertResult(List expectedExistingSegments) throws Seg } Assert.assertEquals(expectedDataSourceNames, segmentManager.getDataSourceNames()); - Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceToNumSegments()); + Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceCounts()); Assert.assertEquals(expectedDataSourceSizes, segmentManager.getDataSourceSizes()); final Map dataSources = segmentManager.getDataSources(); diff 
--git a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java index 550a2f111000..010dab1bedf5 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestNoopTest.java @@ -32,7 +32,7 @@ public class SegmentChangeRequestNoopTest private static final ObjectMapper MAPPER = new DefaultObjectMapper(); @Test - public void testSerdeOfNoopRequest() throws Exception + public void testSerde() throws Exception { final SegmentChangeRequestNoop noopRequest = new SegmentChangeRequestNoop(); final String json = MAPPER.writeValueAsString(noopRequest); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 57292368af1b..7685cfb4f28f 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -177,7 +177,7 @@ public void testLoadCancelsPendingDrop() throws Exception segmentLoadDropHandler.loadAndAnnounceSegment(segment); Assert.assertTrue(announcer.isAnnounced(segment)); - Assert.assertEquals(1, segmentManager.getDataSourceToNumSegments().get(datasource).intValue()); + Assert.assertEquals(1, segmentManager.getDataSourceCounts().get(datasource).intValue()); // Unannounce segment and schedule a drop segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); @@ -221,12 +221,12 @@ public void testLoadCache() throws Exception } testStorageLocation.checkInfoCache(segments); - Assert.assertTrue(segmentManager.getDataSourceToNumSegments().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); 
segmentLoadDropHandler.start(); - Assert.assertFalse(segmentManager.getDataSourceToNumSegments().isEmpty()); + Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(11L, segmentManager.getDataSourceToNumSegments().get("test" + i).longValue()); - Assert.assertEquals(2L, segmentManager.getDataSourceToNumSegments().get("test_two" + i).longValue()); + Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(13 * COUNT, announcer.getNumAnnouncedSegments()); segmentLoadDropHandler.stop(); @@ -271,13 +271,13 @@ public void testStartStop() throws Exception } testStorageLocation.checkInfoCache(segments); - Assert.assertTrue(segmentManager.getDataSourceToNumSegments().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); segmentLoadDropHandler.start(); - Assert.assertFalse(segmentManager.getDataSourceToNumSegments().isEmpty()); + Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(3L, segmentManager.getDataSourceToNumSegments().get("test" + i).longValue()); - Assert.assertEquals(2L, segmentManager.getDataSourceToNumSegments().get("test_two" + i).longValue()); + Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(5 * COUNT, announcer.getNumAnnouncedSegments()); segmentLoadDropHandler.stop(); diff --git a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java index b1f56496415b..8e658faafb97 100644 --- 
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java @@ -88,7 +88,7 @@ public void testSimple() EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).times(2); - EasyMock.expect(segmentManager.getDataSourceToNumSegments()).andReturn(ImmutableMap.of(dataSource, 1L)); + EasyMock.expect(segmentManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L)); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); From a24f9f6915288df9f4e480d53afe19162c7a2443 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 25 Jul 2023 13:31:19 +0530 Subject: [PATCH 13/15] Handle concurrent load and drop of the same segment --- .../DataSegmentChangeResponse.java | 13 ++ .../coordination/SegmentLoadDropHandler.java | 133 ++++++++++----- .../loading/CacheTestSegmentLoader.java | 18 +- .../SegmentLoadDropHandlerCacheTest.java | 44 ++--- .../SegmentLoadDropHandlerTest.java | 157 +++++++++++------- .../simulate/BlockingExecutorService.java | 5 + 6 files changed, 247 insertions(+), 123 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java index 896aa1c1339c..50d622928705 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordination; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import 
com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; @@ -56,6 +57,18 @@ public Status getStatus() return status; } + @JsonIgnore + public boolean isComplete() + { + return getStatus().getState() != State.PENDING; + } + + @JsonIgnore + public boolean isLoadRequest() + { + return request instanceof SegmentChangeRequestLoad; + } + @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 1d74355c5480..22aac5cfb4fc 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -77,7 +77,25 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class); /** - * Synchronizes removals from {@link #segmentsToDrop}. + * Synchronizes addition and removal from {@link #segmentsToDrop} to ensure that + * concurrent Load and Drop of the same segment do not result in an invalid state. + *

+ * For a given segment, the latest request handled by {@link #processRequest} + * is the one which is honored. + *

+ * Possible cases: + *

    + *
  1. + * Load after Drop: + *
      + *
    • Drop started before Load is queued: Load must wait for Drop to finish.
    • + *
    • Drop started after Load is queued: There is nothing to do, as the Drop would not be processed anyway because the segment is not present in {@link #segmentsToDrop}.
    • + *
    + *
  2. + *
  3. Drop after Load: the Load must exit as soon as it realizes that the segment is now marked for Drop and should not be loaded or announced anymore.
  4. + *
*/ private final Object segmentDropLock = new Object(); @@ -110,13 +128,15 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler *
  • Once the load/drop finishes, the entry is updated to either SUCCESS or FAILED.
  • *
  • A duplicate request received at this point is immediately answered with SUCCESS or FAILED and the entry is removed from the cache.
  • - *
  • If the first request itself finishes after the load or drop has already + *
  • If the original request itself finishes after the load or drop has already completed, it is answered with SUCCESS or FAILED and the entry is removed from the cache.
  • + *
  • If a request of a different type (e.g. a Load after a Drop) is received, the entry is removed from the cache and the previous request is abandoned.
  • * *

    * Maximum size of this cache must be significantly greater than the number of - * pending load/drop requests. This is generally the case because the + * pending load/drop requests. This is generally already the case because the * Coordinator sends load/drop requests in small batches and does not send new * requests until the previously submitted ones have either succeeded or failed. *

    @@ -347,6 +367,11 @@ public Map getRowCountDistributionPerDataso @Override public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { + // Unmark the segment for drop + synchronized (segmentDropLock) { + segmentsToDrop.remove(segment); + } + // Load and announce the segment asynchronously exec.submit(() -> loadAndAnnounceSegment(segment)); } @@ -355,35 +380,32 @@ public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback * Loads the segment synchronously, announces it and updates the status of the * corresponding change request in the {@link #requestStatuses} cache. */ - void loadAndAnnounceSegment(DataSegment segment) + private void loadAndAnnounceSegment(DataSegment segment) { DataSegmentChangeResponse.Status result = null; try { log.info("Loading segment[%s]", segment.getId()); - /* - The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, - and if (segmentsToDrop.remove(segment)) returns true, in which case historical will start deleting segment - files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads - the segment, which makes dropping segment and downloading segment happen at the same time. - */ - if (segmentsToDrop.contains(segment)) { - /* - Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, - each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make - things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of - cost of acquiring lock by doing the "contains" check outside the synchronized block. - */ - synchronized (segmentDropLock) { - segmentsToDrop.remove(segment); + + // Do not start with Load if there is a Drop in progress. This prevents a Drop + // that has come before Load of the same segment from causing partial success. 
+ synchronized (segmentDropLock) { + if (!shouldLoadSegment(segment)) { + return; } } - loadSegment(segment, false, null); - // announce segment even if the segment file already exists. - try { - announcer.announceSegment(segment); + + // Do not load segment if it has already been marked for drop + if (shouldLoadSegment(segment)) { + loadSegment(segment, false, null); + } else { + return; } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId()); + + // Do not announce segment if it has already been marked for drop + if (shouldLoadSegment(segment)) { + announceSegment(segment); + } else { + return; } result = DataSegmentChangeResponse.Status.SUCCESS; @@ -400,6 +422,20 @@ each time when addSegment() is called, it has to wait for the lock in order to m } } + /** + * Announces the given segment, regardless of whether the segment files already + * existed or have been freshly downloaded. + */ + private void announceSegment(DataSegment segment) throws SegmentLoadingException + { + try { + announcer.announceSegment(segment); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId()); + } + } + /** * Bulk adding segments during bootstrap */ @@ -493,22 +529,24 @@ private void loadCachedSegments(Collection segments) void cleanupFailedLoad(DataSegment segment) { unannounceSegment(segment); - segmentsToDrop.add(segment); + synchronized (segmentDropLock) { + segmentsToDrop.add(segment); + } dropSegment(segment); } @Override public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { - try { - unannounceSegment(segment); - } - finally { - resolveWaitingFutures(); + // Mark the segment for drop + synchronized (segmentDropLock) { + segmentsToDrop.add(segment); } + unannounceSegment(segment); + resolveWaitingFutures(); + // Schedule drop of segment - segmentsToDrop.add(segment); log.info( "Completely removing segment[%s] in 
[%,d] millis.", segment.getId(), config.getDropSegmentDelayMillis() @@ -573,7 +611,11 @@ public Collection getPendingDeleteSnapshot() } /** + * Processes a batch of segment load/drop requests. * + * @return Future of List of results of each change request in the batch. + * This future completes as soon as any pending request is completed by + * this {@code SegmentLoadDropHandler}. */ public ListenableFuture> processBatch( List changeRequests @@ -586,13 +628,13 @@ public ListenableFuture> processBatch( for (DataSegmentChangeRequest cr : changeRequests) { AtomicReference status = processRequest(cr); - if (status.get().getStatus().getState() != DataSegmentChangeResponse.State.PENDING) { + if (status.get().isComplete()) { isAnyRequestDone = true; } statuses.put(cr, status); } - final CustomSettableFuture future = new CustomSettableFuture(waitingFutures, statuses); + final CustomSettableFuture future = new CustomSettableFuture(statuses); if (isAnyRequestDone) { future.resolve(); } else { @@ -621,13 +663,12 @@ private AtomicReference processRequest(DataSegmentCha } else if (cachedResponse.get().getRequest().equals(changeRequest)) { // Serve the cached response and clear it if the request has completed, // so that we don't keep serving a stale response indefinitely - if (cachedResponse.get().getStatus().getState() != DataSegmentChangeResponse.State.PENDING) { + if (cachedResponse.get().isComplete()) { requestStatuses.invalidate(segment); } return cachedResponse; } else { // Clear the cached response as this is a different request - // TODO: what if the previous one was pending?? 
requestStatuses.invalidate(segment); markRequestAsPending(changeRequest); @@ -645,6 +686,21 @@ private void markRequestAsPending(DataSegmentChangeRequest changeRequest) requestStatuses.put(changeRequest.getSegment(), new AtomicReference<>(pendingResponse)); } + /** + * Returns true only if there is a Load request in {@link #requestStatuses} for + * this segment, and the segment is not present in {@link #segmentsToDrop}. + */ + private boolean shouldLoadSegment(DataSegment segment) { + if (segmentsToDrop.contains(segment)) { + return false; + } + + synchronized (requestStatusesLock) { + AtomicReference response = requestStatuses.getIfPresent(segment); + return response != null && response.get().isLoadRequest(); + } + } + /** * Updates the status for the given request only if this request is currently * in progress and present in {@link #requestStatuses}. @@ -833,15 +889,12 @@ public void close() */ private class CustomSettableFuture extends AbstractFuture> { - private final LinkedHashSet waitingFutures; private final Map> resultRefs; private CustomSettableFuture( - LinkedHashSet waitingFutures, Map> resultRefs ) { - this.waitingFutures = waitingFutures; this.resultRefs = resultRefs; } @@ -860,7 +913,7 @@ public void resolve() results.add(result); // Remove complete statuses from the cache - if (result != null && result.getStatus().getState() != DataSegmentChangeResponse.State.PENDING) { + if (result != null && result.isComplete()) { requestStatuses.invalidate(request.getSegment()); } }); diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index f80688276c3e..3269425eea30 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -39,6 +39,8 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import 
java.util.HashSet; +import java.util.Set; import java.util.concurrent.ExecutorService; /** @@ -46,6 +48,9 @@ public class CacheTestSegmentLoader implements SegmentLoader { + private final Set removedSegments = new HashSet<>(); + private final Set loadedSegments = new HashSet<>(); + @Override public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { @@ -175,12 +180,23 @@ public void close() @Override public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec) { - + loadedSegments.add(segment); } @Override public void cleanup(DataSegment segment) { + removedSegments.add(segment); + } + + public Set getLoadedSegments() + { + return loadedSegments; + } + public Set getRemovedSegments() + { + return removedSegments; } + } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java index 7be2a739eaf7..403df656f200 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java @@ -43,6 +43,8 @@ import org.apache.druid.segment.loading.SegmentLocalCacheManager; import org.apache.druid.segment.loading.SegmentizerFactory; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -51,7 +53,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.io.File; @@ -60,12 
+61,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import static org.mockito.ArgumentMatchers.any; - /** * This class includes tests that cover the storage location layer as well. */ @@ -76,9 +74,9 @@ public class SegmentLoadDropHandlerCacheTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); private SegmentLoadDropHandler loadDropHandler; + private BlockingExecutorService loadingExecutor; private TestStorageLocation storageLoc; - private ObjectMapper objectMapper; - private DataSegmentAnnouncer segmentAnnouncer; + private TestDataSegmentAnnouncer segmentAnnouncer; @Before public void setup() throws IOException @@ -87,7 +85,7 @@ public void setup() throws IOException SegmentLoaderConfig config = new SegmentLoaderConfig() .withLocations(Collections.singletonList(storageLoc.toStorageLocationConfig(MAX_SIZE, null))) .withInfoDir(storageLoc.getInfoDir()); - objectMapper = TestHelper.makeJsonMapper(); + final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); objectMapper.registerSubtypes(TestLoadSpec.class); objectMapper.registerSubtypes(TestSegmentizerFactory.class); SegmentCacheManager cacheManager = new SegmentLocalCacheManager(config, objectMapper); @@ -96,7 +94,8 @@ public void setup() throws IOException TestIndex.INDEX_IO, objectMapper )); - segmentAnnouncer = Mockito.mock(DataSegmentAnnouncer.class); + segmentAnnouncer = new TestDataSegmentAnnouncer(); + loadingExecutor = new BlockingExecutorService("test-LoadDropHandler"); loadDropHandler = new SegmentLoadDropHandler( objectMapper, config, @@ -104,6 +103,7 @@ public void setup() throws IOException Mockito.mock(DataSegmentServerAnnouncer.class), segmentManager, cacheManager, + new WrappingScheduledExecutorService("test-LoadDropHandler", loadingExecutor, false), new ServerTypeConfig(ServerType.HISTORICAL) ); EmittingLogger.registerEmitter(new 
NoopServiceEmitter()); @@ -127,28 +127,28 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException expectedSegments.add(segment); } - // Start the load drop handler loadDropHandler.start(); // Verify the expected announcements - ArgumentCaptor> argCaptor = ArgumentCaptor.forClass(Iterable.class); - Mockito.verify(segmentAnnouncer).announceSegments(argCaptor.capture()); - List announcedSegments = new ArrayList<>(); - argCaptor.getValue().forEach(announcedSegments::add); - announcedSegments.sort(Comparator.comparing(DataSegment::getVersion)); - Assert.assertEquals(expectedSegments, announcedSegments); + Assert.assertEquals(expectedSegments.size(), segmentAnnouncer.getNumAnnouncedSegments()); + for (DataSegment segment : expectedSegments) { + Assert.assertTrue(segmentAnnouncer.isAnnounced(segment)); + } // make sure adding segments beyond allowed size fails - Mockito.reset(segmentAnnouncer); - DataSegment newSegment = makeSegment("test", "new-segment"); - loadDropHandler.loadAndAnnounceSegment(newSegment); - Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegment(any()); - Mockito.verify(segmentAnnouncer, Mockito.never()).announceSegments(any()); + final DataSegment newSegment = makeSegment("test", "new-segment"); + final List requestBatch = Collections.singletonList( + new SegmentChangeRequestLoad(newSegment) + ); + loadDropHandler.processBatch(requestBatch); + loadingExecutor.finishAllPendingTasks(); + Assert.assertFalse(segmentAnnouncer.isAnnounced(newSegment)); // clearing some segment should allow for new segments loadDropHandler.cleanupFailedLoad(expectedSegments.get(0)); - loadDropHandler.loadAndAnnounceSegment(newSegment); - Mockito.verify(segmentAnnouncer).announceSegment(newSegment); + loadDropHandler.processBatch(requestBatch); + loadingExecutor.finishAllPendingTasks(); + Assert.assertTrue(segmentAnnouncer.isAnnounced(newSegment)); } private DataSegment makeSegment(String dataSource, String name) diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 7685cfb4f28f..27fe829a3732 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -74,10 +74,10 @@ public class SegmentLoadDropHandlerTest private SegmentLoadDropHandler segmentLoadDropHandler; private TestDataSegmentAnnouncer announcer; + private CacheTestSegmentLoader segmentLoader; + private File infoDir; private TestStorageLocation testStorageLocation; - private SegmentCacheManager segmentCacheManager; - private Set segmentsRemovedFromCache; private SegmentManager segmentManager; private SegmentLoaderConfig segmentLoaderConfig; private BlockingExecutorService loadingExecutor; @@ -105,24 +105,8 @@ public void setUp() throws IOException testStorageLocation.toStorageLocationConfig() ); - segmentsRemovedFromCache = new HashSet<>(); - segmentCacheManager = new NoopSegmentCacheManager() - { - @Override - public boolean isSegmentCached(DataSegment segment) - { - Map loadSpec = segment.getLoadSpec(); - return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); - } - - @Override - public void cleanup(DataSegment segment) - { - segmentsRemovedFromCache.add(segment); - } - }; - - segmentManager = new SegmentManager(new CacheTestSegmentLoader()); + segmentLoader = new CacheTestSegmentLoader(); + segmentManager = new SegmentManager(segmentLoader); announcer = new TestDataSegmentAnnouncer(); segmentLoaderConfig = new SegmentLoaderConfig() @@ -151,18 +135,23 @@ public void testLoadCancelsPendingDropOfMissingSegment() throws Exception final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); - // Schedule a drop even though the segment is not loaded yet - segmentLoadDropHandler.removeSegment(segment, 
DataSegmentChangeCallback.NOOP); + // Queue a drop even though the segment is not loaded yet + segmentLoadDropHandler.processBatch(dropRequest(segment)); Assert.assertFalse(announcer.isAnnounced(segment)); - Assert.assertTrue(loadingExecutor.hasPendingTasks()); + Assert.assertEquals(1, loadingExecutor.numPendingTasks()); - segmentLoadDropHandler.loadAndAnnounceSegment(segment); + // Queue a load of the segment + segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(2, loadingExecutor.numPendingTasks()); - // Try to complete pending drop of segment + // Try to complete both the pending drop and load loadingExecutor.finishAllPendingTasks(); + // Verify that the segment is loaded and the drop never happens Assert.assertTrue(announcer.isAnnounced(segment)); - Assert.assertFalse(segmentsRemovedFromCache.contains(segment)); + Assert.assertTrue(segmentLoader.getLoadedSegments().contains(segment)); + Assert.assertFalse(segmentLoader.getRemovedSegments().contains(segment)); segmentLoadDropHandler.stop(); } @@ -175,23 +164,58 @@ public void testLoadCancelsPendingDrop() throws Exception final String datasource = "test"; final DataSegment segment = makeSegment(datasource, "1", Intervals.of("P1d/2011-04-01")); - segmentLoadDropHandler.loadAndAnnounceSegment(segment); + // Load the segment + segmentLoadDropHandler.processBatch(loadRequest(segment)); + loadingExecutor.finishNextPendingTask(); Assert.assertTrue(announcer.isAnnounced(segment)); Assert.assertEquals(1, segmentManager.getDataSourceCounts().get(datasource).intValue()); - // Unannounce segment and schedule a drop - segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP); + // Queue a drop of the segment but do not process it + segmentLoadDropHandler.processBatch(dropRequest(segment)); Assert.assertFalse(announcer.isAnnounced(segment)); - Assert.assertTrue(loadingExecutor.hasPendingTasks()); + Assert.assertEquals(1, 
loadingExecutor.numPendingTasks()); - segmentLoadDropHandler.loadAndAnnounceSegment(segment); + // Queue a load of the segment + segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(2, loadingExecutor.numPendingTasks()); - // Try to complete pending drop of segment + // Try to complete both the pending drop and load loadingExecutor.finishAllPendingTasks(); - // Verify that segment is still loaded + // Verify that the segment is loaded and the drop never happens Assert.assertTrue(announcer.isAnnounced(segment)); - Assert.assertFalse(segmentsRemovedFromCache.contains(segment)); + Assert.assertTrue(segmentLoader.getLoadedSegments().contains(segment)); + Assert.assertFalse(segmentLoader.getRemovedSegments().contains(segment)); + + segmentLoadDropHandler.stop(); + } + + @Test + public void testDropCancelsPendingLoad() throws IOException + { + segmentLoadDropHandler.start(); + + final String datasource = "test"; + final DataSegment segment = makeSegment(datasource, "1", Intervals.of("P1d/2011-04-01")); + + // Queue a load of the segment but do not process it + segmentLoadDropHandler.processBatch(loadRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(1, loadingExecutor.numPendingTasks()); + + // Queue a drop of the segment + segmentLoadDropHandler.processBatch(dropRequest(segment)); + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertEquals(2, loadingExecutor.numPendingTasks()); + + // Try to complete the drop first and then the load + loadingExecutor.finishAllPendingTasks(); + + // Verify that the segment is unannounced and the load never happens + Assert.assertFalse(announcer.isAnnounced(segment)); + Assert.assertFalse(segmentLoader.getLoadedSegments().contains(segment)); + Assert.assertTrue(segmentLoader.getRemovedSegments().contains(segment)); segmentLoadDropHandler.stop(); } @@ -303,21 +327,26 @@ public void testProcessBatch() 
throws Exception new SegmentChangeRequestDrop(segment2) ); - ListenableFuture> future = segmentLoadDropHandler - .processBatch(batch); + ListenableFuture> future + = segmentLoadDropHandler.processBatch(batch); Map expectedStatusMap = new HashMap<>(); expectedStatusMap.put(batch.get(0), DataSegmentChangeResponse.Status.PENDING); expectedStatusMap.put(batch.get(1), DataSegmentChangeResponse.Status.SUCCESS); List result = future.get(); - for (DataSegmentChangeResponse requestAndStatus : result) { - Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), requestAndStatus.getStatus()); + for (DataSegmentChangeResponse response : result) { + Assert.assertEquals( + expectedStatusMap.get(response.getRequest()), + response.getStatus() + ); } loadingExecutor.finishAllPendingTasks(); - result = segmentLoadDropHandler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get(); + result = segmentLoadDropHandler.processBatch(loadRequest(segment1)).get(); Assert.assertEquals(DataSegmentChangeResponse.Status.SUCCESS, result.get(0).getStatus()); + Assert.assertTrue(segmentLoader.getLoadedSegments().contains(segment1)); + Assert.assertTrue(segmentLoader.getRemovedSegments().contains(segment2)); segmentLoadDropHandler.stop(); } @@ -335,16 +364,14 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); - List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - - ListenableFuture> future = segmentLoadDropHandler - .processBatch(batch); + ListenableFuture> future + = segmentLoadDropHandler.processBatch(loadRequest(segment1)); loadingExecutor.finishAllPendingTasks(); List result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.FAILED, result.get(0).getStatus().getState()); - future = segmentLoadDropHandler.processBatch(batch); + future = segmentLoadDropHandler.processBatch(loadRequest(segment1)); 
loadingExecutor.finishAllPendingTasks(); result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); @@ -364,18 +391,15 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); - List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - // Request 1: Load the segment - ListenableFuture> future = segmentLoadDropHandler - .processBatch(batch); + ListenableFuture> future + = segmentLoadDropHandler.processBatch(loadRequest(segment1)); loadingExecutor.finishAllPendingTasks(); List result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); // Request 2: Drop the segment - batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1)); - future = segmentLoadDropHandler.processBatch(batch); + future = segmentLoadDropHandler.processBatch(dropRequest(segment1)); loadingExecutor.finishAllPendingTasks(); result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); @@ -385,8 +409,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio verifyDropCalled(segmentManager, 1); // Request 3: Reload the segment - batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - future = segmentLoadDropHandler.processBatch(batch); + future = segmentLoadDropHandler.processBatch(loadRequest(segment1)); loadingExecutor.finishAllPendingTasks(); result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); @@ -396,8 +419,7 @@ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exceptio verifyDropCalled(segmentManager, 1); // Request 4: Try to reload the segment - segment is loaded again - batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - 
future = segmentLoadDropHandler.processBatch(batch); + future = segmentLoadDropHandler.processBatch(loadRequest(segment1)); loadingExecutor.finishAllPendingTasks(); result = future.get(); Assert.assertEquals(DataSegmentChangeResponse.State.SUCCESS, result.get(0).getStatus().getState()); @@ -419,9 +441,8 @@ public void testLoadIsNotRetriedIfFailureIsCached() throws Exception segmentLoadDropHandler.start(); // Send a load request to the handler - ListenableFuture> future = segmentLoadDropHandler.processBatch( - Collections.singletonList(new SegmentChangeRequestLoad(segment)) - ); + ListenableFuture> future + = segmentLoadDropHandler.processBatch(loadRequest(segment)); Assert.assertFalse(future.isDone()); // Cancel the future so that it is never resolved and the response remains cached @@ -432,9 +453,7 @@ public void testLoadIsNotRetriedIfFailureIsCached() throws Exception loadingExecutor.finishNextPendingTask(); // Verify that next load request completes immediately with a failed response - future = segmentLoadDropHandler.processBatch( - Collections.singletonList(new SegmentChangeRequestLoad(segment)) - ); + future = segmentLoadDropHandler.processBatch(loadRequest(segment)); Assert.assertTrue(future.isDone()); DataSegmentChangeResponse response = future.get().get(0); @@ -447,6 +466,15 @@ public void testLoadIsNotRetriedIfFailureIsCached() throws Exception private SegmentLoadDropHandler initHandler(SegmentManager manager) { + SegmentCacheManager segmentCacheManager = new NoopSegmentCacheManager() + { + @Override + public boolean isSegmentCached(DataSegment segment) + { + Map loadSpec = segment.getLoadSpec(); + return new File(MapUtils.getString(loadSpec, "cacheDir")).exists(); + } + }; return new SegmentLoadDropHandler( jsonMapper, segmentLoaderConfig, @@ -486,4 +514,13 @@ private void verifyDropCalled(SegmentManager manager, int times) Mockito.verify(manager, Mockito.times(times)).dropSegment(ArgumentMatchers.any()); } + private static List loadRequest(DataSegment 
segment) + { + return Collections.singletonList(new SegmentChangeRequestLoad(segment)); + } + + private static List dropRequest(DataSegment segment) + { + return Collections.singletonList(new SegmentChangeRequestDrop(segment)); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java index 111d88bf5500..309eb34ac308 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java @@ -61,6 +61,11 @@ public boolean hasPendingTasks() return !taskQueue.isEmpty(); } + public int numPendingTasks() + { + return taskQueue.size(); + } + /** * Executes the next pending task on the calling thread itself. */ From c3fe4cf58c993cb1950582643102d34c6f43a9f5 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 25 Jul 2023 13:45:22 +0530 Subject: [PATCH 14/15] Fix checkstyle --- .../druid/server/coordination/SegmentLoadDropHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 22aac5cfb4fc..8fb1141a57bf 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -690,7 +690,8 @@ private void markRequestAsPending(DataSegmentChangeRequest changeRequest) * Returns true only if there is a Load request in {@link #requestStatuses} for * this segment, and the segment is not present in {@link #segmentsToDrop}. 
*/ - private boolean shouldLoadSegment(DataSegment segment) { + private boolean shouldLoadSegment(DataSegment segment) + { if (segmentsToDrop.contains(segment)) { return false; } From 6bc061c19bcf6075bcdd215e76207acc267d1931 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 25 Jul 2023 16:53:50 +0530 Subject: [PATCH 15/15] Add dummy use of parameter to fix intellij inspections --- .../coordinator/simulate/CoordinatorSimulationBuilder.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 43f7bd9872ab..300d96679bf7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.concurrent.DirectExecutorService; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.metrics.MetricsVerifier; @@ -73,6 +74,8 @@ */ public class CoordinatorSimulationBuilder { + private static final Logger log = new Logger(CoordinatorSimulationBuilder.class); + private static final long DEFAULT_COORDINATOR_PERIOD = 100L; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper() .setInjectableValues( @@ -562,6 +565,7 @@ private ExecutorFactory(boolean directExecution) @Override public ScheduledExecutorService create(int corePoolSize, String nameFormat) { + log.trace("Dummy use of corePoolSize[%d] to avoid intellij inspection failures", corePoolSize); boolean 
isCoordinatorRunner = COORDINATOR_RUNNER.equals(nameFormat); // Coordinator running executor must always be blocked