From 37c8a438b6d6c69ae1c12c054ebd172c575799c0 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 17 Jan 2024 11:43:11 +0530 Subject: [PATCH 1/9] Verify action segmentListUsed with retrieveSegmentsToReplace --- .../appenderator/ActionBasedUsedSegmentChecker.java | 5 ++--- .../indexing/common/task/AbstractBatchIndexTask.java | 5 ++--- .../druid/indexing/common/task/CompactionTask.java | 7 +++---- .../common/task/batch/parallel/TombstoneHelper.java | 9 +++------ .../OverlordActionBasedUsedSegmentsRetriever.java | 4 ++-- .../ActionBasedUsedSegmentCheckerTest.java | 11 ++++------- 6 files changed, 16 insertions(+), 25 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index 984058895e3c..b4d1944ce681 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -20,9 +20,8 @@ package org.apache.druid.indexing.appenderator; import com.google.common.collect.Iterables; -import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -66,7 +65,7 @@ public Set findUsedSegments(Set segmentIds) ); final Collection usedSegmentsForIntervals = taskActionClient - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE)); + .submit(new RetrieveSegmentsToReplaceAction(dataSource, intervals)); for (DataSegment segment : usedSegmentsForIntervals) { if (segmentIdsInDataSource.contains(segment.getId())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 94110e167e3e..98124fce3fa3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -35,7 +35,7 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction; @@ -48,7 +48,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.InputRowSchemas; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; @@ -655,7 +654,7 @@ protected static List findInputSegments( { return ImmutableList.copyOf( actionClient.submit( - new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE) + new RetrieveSegmentsToReplaceAction(dataSource, intervalsToRead) ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 1a0e5c971fda..dc370b133f6a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -52,7 +52,7 @@ import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; @@ -60,7 +60,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -428,7 +427,7 @@ public List findSegmentsToLock(TaskActionClient taskActionClient, L throws IOException { return ImmutableList.copyOf( - taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), null, intervals, Segments.ONLY_VISIBLE)) + taskActionClient.submit(new RetrieveSegmentsToReplaceAction(getDataSource(), intervals)) ); } @@ -1163,7 +1162,7 @@ static class SegmentProvider List findSegments(TaskActionClient actionClient) throws IOException { return new ArrayList<>( - actionClient.submit(new RetrieveUsedSegmentsAction(dataSource, interval, null, Segments.ONLY_VISIBLE)) + actionClient.submit(new RetrieveSegmentsToReplaceAction(dataSource, ImmutableList.of(interval))) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 6b3c684e7947..3e320cc17685 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -23,10 +23,9 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -307,11 +306,9 @@ private List getExistingNonEmptyIntervalsOfDatasource( List condensedInputIntervals = JodaUtils.condenseIntervals(inputIntervals); if (!condensedInputIntervals.isEmpty()) { Collection usedSegmentsInInputInterval = - taskActionClient.submit(new RetrieveUsedSegmentsAction( + taskActionClient.submit(new RetrieveSegmentsToReplaceAction( dataSource, - null, - condensedInputIntervals, - Segments.ONLY_VISIBLE + condensedInputIntervals )); for (DataSegment usedSegment : usedSegmentsInInputInterval) { for (Interval condensedInputInterval : condensedInputIntervals) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java index 73bc411fb01d..d9c71b0f8004 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java @@ -23,7 +23,7 @@ import com.google.inject.Inject; import org.apache.druid.indexer.path.UsedSegmentsRetriever; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -53,6 +53,6 @@ public Collection retrieveUsedSegmentsForIntervals( { return toolbox .getTaskActionClient() - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, visibility)); + .submit(new RetrieveSegmentsToReplaceAction(dataSource, intervals)); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java index 35a28be6cbdf..ae4c4fa61636 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java @@ -21,9 +21,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -44,7 +43,7 @@ public void testBasic() throws IOException final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class); EasyMock.expect( taskActionClient.submit( - new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"), null, Segments.ONLY_VISIBLE) + new RetrieveSegmentsToReplaceAction("bar", ImmutableList.of(Intervals.of("2002/P1D"))) ) ).andReturn( ImmutableList.of( @@ -66,11 +65,9 @@ public void testBasic() throws IOException ); EasyMock.expect( taskActionClient.submit( - new RetrieveUsedSegmentsAction( + new RetrieveSegmentsToReplaceAction( "foo", - null, - ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")), - Segments.ONLY_VISIBLE + ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")) ) ) ).andReturn( From 80fc8721de328bbd329f17fa80ac7103a6d63d9f Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 17 Jan 2024 14:14:14 +0530 Subject: [PATCH 2/9] Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction --- .../apache/druid/msq/exec/ControllerImpl.java | 10 +- .../ActionBasedUsedSegmentChecker.java | 5 +- .../actions/RetrieveUsedSegmentsAction.java | 109 +++++++++++++++++- .../common/task/AbstractBatchIndexTask.java | 5 +- .../indexing/common/task/CompactionTask.java | 9 +- .../common/task/KillUnusedSegmentsTask.java | 3 +- .../task/batch/parallel/TombstoneHelper.java | 10 +- ...rlordActionBasedUsedSegmentsRetriever.java | 4 +- .../indexing/input/DruidInputSource.java | 11 +- .../ActionBasedUsedSegmentCheckerTest.java | 12 +- .../actions/RetrieveSegmentsActionsTest.java | 2 +- .../RetrieveUsedSegmentsActionSerdeTest.java | 27 ++++- .../ConcurrentReplaceAndAppendTest.java | 11 +- 13 files changed, 185 insertions(+), 33 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 9a1dd089cfc5..c9aab90f6848 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -69,7 +69,7 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.LockReleaseAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; @@ -79,6 +79,7 @@ import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -1233,9 +1234,12 @@ private DataSegmentTimelineView makeDataSegmentTimelineView() // any segment created after the lock was acquired for its interval will not be considered. final Collection publishedUsedSegments; try { - publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction( + publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction( dataSource, - intervals + null, + intervals, + Segments.ONLY_VISIBLE, + true )); } catch (IOException e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index b4d1944ce681..bc348402ea1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -20,8 +20,9 @@ package org.apache.druid.indexing.appenderator; import com.google.common.collect.Iterables; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -65,7 +66,7 @@ public Set findUsedSegments(Set segmentIds) ); final Collection usedSegmentsForIntervals = taskActionClient - .submit(new RetrieveSegmentsToReplaceAction(dataSource, intervals)); + .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE, true)); for (DataSegment segment : usedSegmentsForIntervals) { if (segmentIdsInDataSource.contains(segment.getId())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java index fab8c4846894..9d8a73a34fa0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java @@ -26,15 +26,27 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; +import org.apache.druid.timeline.SegmentTimeline; import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; /** * This TaskAction returns a collection of segments which have data within the specified intervals and are marked as @@ -49,6 +61,8 @@ */ public class RetrieveUsedSegmentsAction implements TaskAction> { + private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class); + @JsonIgnore private final String dataSource; @@ -58,6 +72,9 @@ public class RetrieveUsedSegmentsAction implements TaskAction intervals, // When JSON object is deserialized, this parameter is optional for backward compatibility. // Otherwise, it shouldn't be considered optional. - @JsonProperty("visibility") @Nullable Segments visibility + @JsonProperty("visibility") @Nullable Segments visibility, + @Nullable @JsonProperty("replace") Boolean replace ) { this.dataSource = dataSource; @@ -85,6 +103,8 @@ public RetrieveUsedSegmentsAction( // Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE; + + this.replace = replace != null ? replace : false; } @JsonProperty @@ -105,6 +125,12 @@ public Segments getVisibility() return visibility; } + @JsonProperty + public boolean getReplace() + { + return replace; + } + @Override public TypeReference> getReturnTypeReference() { @@ -113,11 +139,84 @@ public TypeReference> getReturnTypeReference() @Override public Collection perform(Task task, TaskActionToolbox toolbox) + { + if (!replace) { + return retrieveUsedSegments(toolbox); + } + + // The DruidInputSource can be used to read from one datasource and write to another. + // In such a case, the race condition described in the class-level docs cannot occur, + // and the action can simply fetch all visible segments for the datasource and interval + if (!task.getDataSource().equals(dataSource)) { + return retrieveUsedSegments(toolbox); + } + + final String supervisorId; + if (task instanceof AbstractBatchSubtask) { + supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId(); + } else { + supervisorId = task.getId(); + } + + final Set replaceLocksForTask = toolbox + .getTaskLockbox() + .getAllReplaceLocksForDatasource(task.getDataSource()) + .stream() + .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId())) + .collect(Collectors.toSet()); + + // If there are no replace locks for the task, simply fetch all visible segments for the interval + if (replaceLocksForTask.isEmpty()) { + return retrieveUsedSegments(toolbox); + } + + Map>> intervalToCreatedToSegments = new HashMap<>(); + for (Pair segmentAndCreatedDate : + toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { + final DataSegment segment = segmentAndCreatedDate.lhs; + final String created = segmentAndCreatedDate.rhs; + intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) + .computeIfAbsent(created, c -> new HashSet<>()) + .add(segment); + } + + Set allSegmentsToBeReplaced = new HashSet<>(); + for (final Map.Entry>> entry : intervalToCreatedToSegments.entrySet()) { + final Interval segmentInterval = entry.getKey(); + String lockVersion = null; + for (ReplaceTaskLock replaceLock : replaceLocksForTask) { + if (replaceLock.getInterval().contains(segmentInterval)) { + lockVersion = replaceLock.getVersion(); + } + } + final Map> createdToSegmentsMap = entry.getValue(); + for (Map.Entry> createdAndSegments : createdToSegmentsMap.entrySet()) { + if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) { + allSegmentsToBeReplaced.addAll(createdAndSegments.getValue()); + } else { + for (DataSegment segment : createdAndSegments.getValue()) { + log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]", + segment.getId(), createdAndSegments.getKey(), lockVersion); + } + } + } + } + + if (visibility == Segments.ONLY_VISIBLE) { + return SegmentTimeline.forSegments(allSegmentsToBeReplaced) + .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); + } else { + return allSegmentsToBeReplaced; + } + } + + private Collection retrieveUsedSegments(TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() .retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility); } + @Override public boolean isAudited() { @@ -142,13 +241,16 @@ public boolean equals(Object o) if (!intervals.equals(that.intervals)) { return false; } - return visibility.equals(that.visibility); + if (!visibility.equals(that.visibility)) { + return false; + } + return replace == that.replace; } @Override public int hashCode() { - return Objects.hash(dataSource, intervals, visibility); + return Objects.hash(dataSource, intervals, visibility, replace); } @Override @@ -158,6 +260,7 @@ public String toString() "dataSource='" + dataSource + '\'' + ", intervals=" + intervals + ", visibility=" + visibility + + ", replace=" + replace + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 98124fce3fa3..52461c4f309f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -35,7 +35,7 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction; @@ -48,6 +48,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.InputRowSchemas; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; @@ -654,7 +655,7 @@ protected static List findInputSegments( { return ImmutableList.copyOf( actionClient.submit( - new RetrieveSegmentsToReplaceAction(dataSource, intervalsToRead) + new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE, true) ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index dc370b133f6a..5a9c8fdbee90 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -52,7 +52,7 @@ import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; @@ -60,6 +60,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -427,7 +428,7 @@ public List findSegmentsToLock(TaskActionClient taskActionClient, L throws IOException { return ImmutableList.copyOf( - taskActionClient.submit(new RetrieveSegmentsToReplaceAction(getDataSource(), intervals)) + taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), null, intervals, Segments.ONLY_VISIBLE, true)) ); } @@ -1162,7 +1163,9 @@ static class SegmentProvider List findSegments(TaskActionClient actionClient) throws IOException { return new ArrayList<>( - actionClient.submit(new RetrieveSegmentsToReplaceAction(dataSource, ImmutableList.of(interval))) + actionClient.submit( + new RetrieveUsedSegmentsAction(dataSource, null, ImmutableList.of(interval), Segments.ONLY_VISIBLE, true) + ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 54fae94684fd..c06240b3208e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -200,7 +200,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception getDataSource(), null, ImmutableList.of(getInterval()), - Segments.INCLUDING_OVERSHADOWED + Segments.INCLUDING_OVERSHADOWED, + false ); // Fetch the load specs of all segments overlapping with the unused segment intervals final Set> usedSegmentLoadSpecs = diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 3e320cc17685..39acfa24a89f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -23,9 +23,10 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -306,9 +307,12 @@ private List getExistingNonEmptyIntervalsOfDatasource( List condensedInputIntervals = JodaUtils.condenseIntervals(inputIntervals); if (!condensedInputIntervals.isEmpty()) { Collection usedSegmentsInInputInterval = - taskActionClient.submit(new RetrieveSegmentsToReplaceAction( + taskActionClient.submit(new RetrieveUsedSegmentsAction( dataSource, - condensedInputIntervals + null, + condensedInputIntervals, + Segments.ONLY_VISIBLE, + true )); for (DataSegment usedSegment : usedSegmentsInInputInterval) { for (Interval condensedInputInterval : condensedInputIntervals) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java index d9c71b0f8004..01388b8c54ee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java @@ -23,7 +23,7 @@ import com.google.inject.Inject; import org.apache.druid.indexer.path.UsedSegmentsRetriever; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -53,6 +53,6 @@ public Collection retrieveUsedSegmentsForIntervals( { return toolbox .getTaskActionClient() - .submit(new RetrieveSegmentsToReplaceAction(dataSource, intervals)); + .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE, true)); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 85617728e5e6..2bb9b891e693 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -49,9 +49,10 @@ import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.WindowedSegmentId; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -554,7 +555,13 @@ public static List> getTimelineForInte } else { try { usedSegments = toolbox.getTaskActionClient() - .submit(new RetrieveSegmentsToReplaceAction(dataSource, Collections.singletonList(interval))); + .submit(new RetrieveUsedSegmentsAction( + dataSource, + null, + Collections.singletonList(interval), + Segments.ONLY_VISIBLE, + true + )); } catch (IOException e) { LOG.error(e, "Error retrieving the used segments for interval[%s].", interval); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java index ae4c4fa61636..21cb27b8cfc9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java @@ -21,8 +21,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -43,7 +44,7 @@ public void testBasic() throws IOException final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class); EasyMock.expect( taskActionClient.submit( - new RetrieveSegmentsToReplaceAction("bar", ImmutableList.of(Intervals.of("2002/P1D"))) + new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"), null, Segments.ONLY_VISIBLE, true) ) ).andReturn( ImmutableList.of( @@ -65,9 +66,12 @@ public void testBasic() throws IOException ); EasyMock.expect( taskActionClient.submit( - new RetrieveSegmentsToReplaceAction( + new RetrieveUsedSegmentsAction( "foo", - ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")) + null, + ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")), + Segments.ONLY_VISIBLE, + true ) ) ).andReturn( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 24d2f0a90431..f6c24b1d32bd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -96,7 +96,7 @@ private static DataSegment createSegment(Interval interval, String version) public void testRetrieveUsedSegmentsAction() { final RetrieveUsedSegmentsAction action = - new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null, Segments.ONLY_VISIBLE); + new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null, Segments.ONLY_VISIBLE, null); final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); Assert.assertEquals(expectedUsedSegments, resultSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java index 0876092cac11..55d57c7d6e7c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java @@ -42,7 +42,7 @@ public void testSingleIntervalSerde() throws Exception Interval interval = Intervals.of("2014/2015"); RetrieveUsedSegmentsAction expected = - new RetrieveUsedSegmentsAction("dataSource", interval, null, Segments.ONLY_VISIBLE); + new RetrieveUsedSegmentsAction("dataSource", interval, null, Segments.ONLY_VISIBLE, false); RetrieveUsedSegmentsAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), RetrieveUsedSegmentsAction.class); @@ -58,7 +58,8 @@ public void testMultiIntervalSerde() throws Exception "dataSource", null, intervals, - Segments.ONLY_VISIBLE + Segments.ONLY_VISIBLE, + false ); RetrieveUsedSegmentsAction actual = @@ -74,8 +75,28 @@ public void testOldJsonDeserialization() throws Exception RetrieveUsedSegmentsAction actual = (RetrieveUsedSegmentsAction) MAPPER.readValue(jsonStr, TaskAction.class); Assert.assertEquals( - new RetrieveUsedSegmentsAction("test", Intervals.of("2014/2015"), null, Segments.ONLY_VISIBLE), + new RetrieveUsedSegmentsAction("test", Intervals.of("2014/2015"), null, Segments.ONLY_VISIBLE, false), actual ); } + + @Test + public void testWithReplace() throws Exception + { + List intervals = ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")); + RetrieveUsedSegmentsAction expected = new RetrieveUsedSegmentsAction( + "dataSource", + null, + intervals, + Segments.ONLY_VISIBLE, + true + ); + + RetrieveUsedSegmentsAction actual = + MAPPER.readValue(MAPPER.writeValueAsString(expected), RetrieveUsedSegmentsAction.class); + Assert.assertEquals(intervals, actual.getIntervals()); + Assert.assertTrue(actual.getReplace()); + Assert.assertEquals(expected, actual); + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 7f83a8f02332..a2ee82e05c09 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -27,7 +27,6 @@ import org.apache.druid.indexing.common.TaskStorageDirTracker; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; @@ -940,7 +939,8 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment. WIKI, null, ImmutableList.of(interval), - visibility + visibility, + null ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); @@ -955,9 +955,12 @@ private void verifyInputSegments(Task task, Interval interval, DataSegment... ex try { final TaskActionClient taskActionClient = taskActionClientFactory.create(task); Collection allUsedSegments = taskActionClient.submit( - new RetrieveSegmentsToReplaceAction( + new RetrieveUsedSegmentsAction( WIKI, - Collections.singletonList(interval) + null, + Collections.singletonList(interval), + Segments.ONLY_VISIBLE, + true ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); From 9a048cce652bdda742d69819d457deca4ab27eef Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 17 Jan 2024 18:39:41 +0530 Subject: [PATCH 3/9] Clean up unneeded config --- .../apache/druid/msq/exec/ControllerImpl.java | 10 +++---- .../actions/RetrieveUsedSegmentsAction.java | 2 +- .../indexing/common/config/TaskConfig.java | 27 +++---------------- .../indexing/input/DruidInputSource.java | 2 +- 4 files changed, 9 insertions(+), 32 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index c9aab90f6848..9a1dd089cfc5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -69,7 +69,7 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.LockReleaseAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; -import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; @@ -79,7 +79,6 @@ import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -1234,12 +1233,9 @@ private DataSegmentTimelineView makeDataSegmentTimelineView() // any segment created after the lock was acquired for its interval will not be considered. final Collection publishedUsedSegments; try { - publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction( + publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction( dataSource, - null, - intervals, - Segments.ONLY_VISIBLE, - true + intervals )); } catch (IOException e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java index 9d8a73a34fa0..55132a064df6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java @@ -83,7 +83,7 @@ public RetrieveUsedSegmentsAction( // When JSON object is deserialized, this parameter is optional for backward compatibility. // Otherwise, it shouldn't be considered optional. @JsonProperty("visibility") @Nullable Segments visibility, - @Nullable @JsonProperty("replace") Boolean replace + @JsonProperty("replace") @Nullable Boolean replace ) { this.dataSource = dataSource; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 3352735b9e0e..db48d6f07f7d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -79,7 +79,6 @@ public enum BatchProcessingMode private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M"); private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true; private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1; - private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE = false; @JsonProperty private final String baseDir; @@ -126,9 +125,6 @@ public enum BatchProcessingMode @JsonProperty private final long tmpStorageBytesPerTask; - @JsonProperty - private final boolean enableConcurrentAppendAndReplace; - @JsonCreator public TaskConfig( @JsonProperty("baseDir") String baseDir, @@ -146,8 +142,7 @@ public TaskConfig( @JsonProperty("batchProcessingMode") String batchProcessingMode, @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns, @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush, - @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask, - @JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean enableConcurrentAppendAndReplace + @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask ) { this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir")); @@ -198,10 +193,6 @@ public TaskConfig( this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS); this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK); - this.enableConcurrentAppendAndReplace = Configs.valueOrDefault( - enableConcurrentAppendAndReplace, - DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE - ); } private TaskConfig( @@ -219,8 +210,7 @@ private TaskConfig( BatchProcessingMode batchProcessingMode, boolean storeEmptyColumns, boolean encapsulatedTask, - long tmpStorageBytesPerTask, - boolean enableConcurrentAppendAndReplace + long tmpStorageBytesPerTask ) { this.baseDir = baseDir; @@ -238,7 +228,6 @@ private TaskConfig( this.storeEmptyColumns = storeEmptyColumns; this.encapsulatedTask = encapsulatedTask; this.tmpStorageBytesPerTask = tmpStorageBytesPerTask; - this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace; } @JsonProperty @@ -355,12 +344,6 @@ public long getTmpStorageBytesPerTask() return tmpStorageBytesPerTask; } - @JsonProperty("enableConcurrentAppendAndReplace") - public boolean isConcurrentAppendAndReplaceEnabled() - { - return enableConcurrentAppendAndReplace; - } - private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { @@ -387,8 +370,7 @@ public TaskConfig withBaseTaskDir(File baseTaskDir) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask, - enableConcurrentAppendAndReplace + tmpStorageBytesPerTask ); } @@ -409,8 +391,7 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask, - enableConcurrentAppendAndReplace + tmpStorageBytesPerTask ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 2bb9b891e693..4d38143b724b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -547,7 +547,7 @@ public static List> getTimelineForInte Preconditions.checkNotNull(interval); final Collection usedSegments; - if (toolbox == null || !toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) { + if (toolbox == null) { usedSegments = FutureUtils.getUnchecked( coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)), true From 50a1a5d193a9c8e57fc522c4f3117c30f3b32110 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 17 Jan 2024 19:06:29 +0530 Subject: [PATCH 4/9] Fix compilation --- .../common/config/TaskConfigBuilder.java | 16 +--------------- .../indexing/overlord/TaskLifecycleTest.java | 1 - 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index 8b488fff8093..af920ebbeb73 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -41,7 +41,6 @@ public class TaskConfigBuilder private Boolean storeEmptyColumns; private boolean enableTaskLevelLogPush; private Long tmpStorageBytesPerTask; - private Boolean enableConcurrentAppendAndReplace; public TaskConfigBuilder setBaseDir(String baseDir) { @@ -133,18 +132,6 @@ public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask) return this; } - public TaskConfigBuilder enableConcurrentAppendAndReplace() - { - this.enableConcurrentAppendAndReplace = true; - return this; - } - - public TaskConfigBuilder disableConcurrentAppendAndReplace() - { - this.enableConcurrentAppendAndReplace = false; - return this; - } - public TaskConfig build() { return new TaskConfig( @@ -162,8 +149,7 @@ public TaskConfig build() batchProcessingMode, storeEmptyColumns, enableTaskLevelLogPush, - tmpStorageBytesPerTask, - enableConcurrentAppendAndReplace + tmpStorageBytesPerTask ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 4f0aacd1cec3..94d21c144dc1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -616,7 +616,6 @@ private TaskToolboxFactory setUpTaskToolboxFactory( .setDefaultRowFlushBoundary(50000) .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()) .setTmpStorageBytesPerTask(-1L) - .enableConcurrentAppendAndReplace() .build(); return new TaskToolboxFactory( From d079367c2862d8c16382371e6cd64999f1c63777 Mon Sep 17 00:00:00 2001 From: Amatya Date: Sat, 20 Jan 2024 10:52:19 +0530 Subject: [PATCH 5/9] Address feedback --- .../apache/druid/msq/exec/ControllerImpl.java | 18 +- .../msq/test/MSQTestTaskActionClient.java | 10 - .../ActionBasedUsedSegmentChecker.java | 2 +- .../RetrieveSegmentsToReplaceAction.java | 202 ------------------ .../actions/RetrieveUsedSegmentsAction.java | 39 ++-- .../indexing/common/actions/TaskAction.java | 1 - .../common/task/AbstractBatchIndexTask.java | 2 +- .../indexing/common/task/CompactionTask.java | 4 +- .../common/task/KillUnusedSegmentsTask.java | 3 +- .../task/batch/parallel/TombstoneHelper.java | 3 +- ...rlordActionBasedUsedSegmentsRetriever.java | 2 +- .../indexing/input/DruidInputSource.java | 3 +- .../ActionBasedUsedSegmentCheckerTest.java | 5 +- .../actions/RetrieveSegmentsActionsTest.java | 2 +- .../RetrieveUsedSegmentsActionSerdeTest.java | 27 +-- .../ConcurrentReplaceAndAppendTest.java | 6 +- 16 files changed, 41 insertions(+), 288 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 9a1dd089cfc5..371344719342 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -69,7 +69,7 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.LockReleaseAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; @@ -79,6 +79,7 @@ import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -1233,10 +1234,17 @@ private DataSegmentTimelineView makeDataSegmentTimelineView() // any segment created after the lock was acquired for its interval will not be considered. final Collection publishedUsedSegments; try { - publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction( - dataSource, - intervals - )); + // Additional check as the task action does not accept empty intervals + if (!intervals.isEmpty()) { + publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction( + dataSource, + null, + intervals, + Segments.ONLY_VISIBLE + )); + } else { + publishedUsedSegments = Collections.emptySet(); + } } catch (IOException e) { throw new MSQException(e, UnknownFault.forException(e)); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java index 5192aafccdc5..3ab3aaf8c25d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java @@ -27,7 +27,6 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; @@ -41,7 +40,6 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; -import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; @@ -128,14 +126,6 @@ public RetType submit(TaskAction taskAction) .build() ).collect(Collectors.toSet()); } - } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) { - String dataSource = ((RetrieveSegmentsToReplaceAction) taskAction).getDataSource(); - return (RetType) injector.getInstance(SpecificSegmentsQuerySegmentWalker.class) - .getSegments() - .stream() - .filter(dataSegment -> dataSegment.getDataSource() - .equals(dataSource)) - .collect(Collectors.toSet()); } else if (taskAction instanceof SegmentTransactionalInsertAction) { final Set segments = ((SegmentTransactionalInsertAction) taskAction).getSegments(); publishedSegments.addAll(segments); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index bc348402ea1d..984058895e3c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -66,7 +66,7 @@ public Set findUsedSegments(Set segmentIds) ); final Collection usedSegmentsForIntervals = taskActionClient - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE, true)); + .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE)); for (DataSegment segment : usedSegmentsForIntervals) { if (segmentIdsInDataSource.contains(segment.getId())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java deleted file mode 100644 index 7fec3369a824..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask; -import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.ReplaceTaskLock; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.Partitions; -import org.apache.druid.timeline.SegmentTimeline; -import org.joda.time.Interval; - -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * This action exists in addition to retrieveUsedSegmentsAction because that action suffers - * from a race condition described by the following sequence of events: - * - * -Segments S1, S2, S3 exist - * -Compact acquires a replace lock - * -A concurrent appending job publishes a segment S4 which needs to be upgraded to the replace lock's version - * -Compact task processes S1-S4 to create new segments - * -Compact task publishes new segments and carries S4 forward to the new version - * - * This can lead to the data in S4 being duplicated - * - * This TaskAction returns a collection of segments which have data within the specified interval and are marked as - * used, and have been created before a REPLACE lock, if any, was acquired. - * This ensures that a consistent set of segments is returned each time this action is called - */ -public class RetrieveSegmentsToReplaceAction implements TaskAction> -{ - private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class); - - private final String dataSource; - - private final List intervals; - - @JsonCreator - public RetrieveSegmentsToReplaceAction( - @JsonProperty("dataSource") String dataSource, - @JsonProperty("intervals") List intervals - ) - { - this.dataSource = dataSource; - this.intervals = intervals; - } - - @JsonProperty - public String getDataSource() - { - return dataSource; - } - - @JsonProperty - public List getIntervals() - { - return intervals; - } - - @Override - public TypeReference> getReturnTypeReference() - { - return new TypeReference>() {}; - } - - @Override - public Collection perform(Task task, TaskActionToolbox toolbox) - { - // The DruidInputSource can be used to read from one datasource and write to another. - // In such a case, the race condition described in the class-level docs cannot occur, - // and the action can simply fetch all visible segments for the datasource and interval - if (!task.getDataSource().equals(dataSource)) { - return retrieveAllVisibleSegments(toolbox); - } - - final String supervisorId; - if (task instanceof AbstractBatchSubtask) { - supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId(); - } else { - supervisorId = task.getId(); - } - - final Set replaceLocksForTask = toolbox - .getTaskLockbox() - .getAllReplaceLocksForDatasource(task.getDataSource()) - .stream() - .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId())) - .collect(Collectors.toSet()); - - // If there are no replace locks for the task, simply fetch all visible segments for the interval - if (replaceLocksForTask.isEmpty()) { - return retrieveAllVisibleSegments(toolbox); - } - - Map>> intervalToCreatedToSegments = new HashMap<>(); - for (Pair segmentAndCreatedDate : - toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { - final DataSegment segment = segmentAndCreatedDate.lhs; - final String created = segmentAndCreatedDate.rhs; - intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) - .computeIfAbsent(created, c -> new HashSet<>()) - .add(segment); - } - - Set allSegmentsToBeReplaced = new HashSet<>(); - for (final Map.Entry>> entry : intervalToCreatedToSegments.entrySet()) { - final Interval segmentInterval = entry.getKey(); - String lockVersion = null; - for (ReplaceTaskLock replaceLock : replaceLocksForTask) { - if (replaceLock.getInterval().contains(segmentInterval)) { - lockVersion = replaceLock.getVersion(); - } - } - final Map> createdToSegmentsMap = entry.getValue(); - for (Map.Entry> createdAndSegments : createdToSegmentsMap.entrySet()) { - if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) { - allSegmentsToBeReplaced.addAll(createdAndSegments.getValue()); - } else { - for (DataSegment segment : createdAndSegments.getValue()) { - log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]", - segment.getId(), createdAndSegments.getKey(), lockVersion); - } - } - } - } - - return SegmentTimeline.forSegments(allSegmentsToBeReplaced) - .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); - } - - private Collection retrieveAllVisibleSegments(TaskActionToolbox toolbox) - { - return toolbox.getIndexerMetadataStorageCoordinator() - .retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE); - } - - @Override - public boolean isAudited() - { - return false; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o; - return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals); - } - - @Override - public int hashCode() - { - return Objects.hash(dataSource, intervals); - } - @Override - public String toString() - { - return "RetrieveSegmentsToReplaceAction{" + - "dataSource='" + dataSource + '\'' + - ", intervals=" + intervals + - '}'; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java index 55132a064df6..3c3cd87bb1e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java @@ -51,17 +51,21 @@ /** * This TaskAction returns a collection of segments which have data within the specified intervals and are marked as * used. + * If the task holds REPLACE locks and the datasource being read is also the one being replaced, + * fetch only those segments for the interval that were created before its REPLACE lock's version. + * This change is needed to ensure that the input set of segments is always consistent for a replacing task + * when concurrent appending tasks append segments. * * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in * the collection only once. * - * @implNote This action doesn't produce a {@link java.util.Set} because it's implemented via {@link + * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns - * a collection. Producing a {@link java.util.Set} would require an unnecessary copy of segments collection. + * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection. */ public class RetrieveUsedSegmentsAction implements TaskAction> { - private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class); + private static final Logger log = new Logger(RetrieveUsedSegmentsAction.class); @JsonIgnore private final String dataSource; @@ -72,9 +76,6 @@ public class RetrieveUsedSegmentsAction implements TaskAction intervals, // When JSON object is deserialized, this parameter is optional for backward compatibility. // Otherwise, it shouldn't be considered optional. - @JsonProperty("visibility") @Nullable Segments visibility, - @JsonProperty("replace") @Nullable Boolean replace + @JsonProperty("visibility") @Nullable Segments visibility ) { this.dataSource = dataSource; @@ -103,8 +103,6 @@ public RetrieveUsedSegmentsAction( // Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE; - - this.replace = replace != null ? replace : false; } @JsonProperty @@ -125,12 +123,6 @@ public Segments getVisibility() return visibility; } - @JsonProperty - public boolean getReplace() - { - return replace; - } - @Override public TypeReference> getReturnTypeReference() { @@ -140,13 +132,10 @@ public TypeReference> getReturnTypeReference() @Override public Collection perform(Task task, TaskActionToolbox toolbox) { - if (!replace) { - return retrieveUsedSegments(toolbox); - } - // The DruidInputSource can be used to read from one datasource and write to another. // In such a case, the race condition described in the class-level docs cannot occur, - // and the action can simply fetch all visible segments for the datasource and interval + // and the action can simply fetch all visible segments for the datasource and interval. + // Similarly, an MSQ replace could read from a different datasource. if (!task.getDataSource().equals(dataSource)) { return retrieveUsedSegments(toolbox); } @@ -241,16 +230,13 @@ public boolean equals(Object o) if (!intervals.equals(that.intervals)) { return false; } - if (!visibility.equals(that.visibility)) { - return false; - } - return replace == that.replace; + return visibility.equals(that.visibility); } @Override public int hashCode() { - return Objects.hash(dataSource, intervals, visibility, replace); + return Objects.hash(dataSource, intervals, visibility); } @Override @@ -260,7 +246,6 @@ public String toString() "dataSource='" + dataSource + '\'' + ", intervals=" + intervals + ", visibility=" + visibility + - ", replace=" + replace + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index e251626f8690..171d53b9cdd6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -38,7 +38,6 @@ @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class), @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class), @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class), - @JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = RetrieveSegmentsToReplaceAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 52461c4f309f..94110e167e3e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -655,7 +655,7 @@ protected static List findInputSegments( { return ImmutableList.copyOf( actionClient.submit( - new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE, true) + new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE) ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 5a9c8fdbee90..febbd4d8efd4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -428,7 +428,7 @@ public List findSegmentsToLock(TaskActionClient taskActionClient, L throws IOException { return ImmutableList.copyOf( - taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), null, intervals, Segments.ONLY_VISIBLE, true)) + taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), null, intervals, Segments.ONLY_VISIBLE)) ); } @@ -1164,7 +1164,7 @@ List findSegments(TaskActionClient actionClient) throws IOException { return new ArrayList<>( actionClient.submit( - new RetrieveUsedSegmentsAction(dataSource, null, ImmutableList.of(interval), Segments.ONLY_VISIBLE, true) + new RetrieveUsedSegmentsAction(dataSource, null, ImmutableList.of(interval), Segments.ONLY_VISIBLE) ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 635b347b76af..cbf4a84ba790 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -221,8 +221,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception getDataSource(), null, ImmutableList.of(getInterval()), - Segments.INCLUDING_OVERSHADOWED, - false + Segments.INCLUDING_OVERSHADOWED ); // Fetch the load specs of all segments overlapping with the unused segment intervals final Set> usedSegmentLoadSpecs = diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 39acfa24a89f..6b3c684e7947 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -311,8 +311,7 @@ private List getExistingNonEmptyIntervalsOfDatasource( dataSource, null, condensedInputIntervals, - Segments.ONLY_VISIBLE, - true + Segments.ONLY_VISIBLE )); for (DataSegment usedSegment : usedSegmentsInInputInterval) { for (Interval condensedInputInterval : condensedInputIntervals) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java index 01388b8c54ee..e1a48d3f557d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java @@ -53,6 +53,6 @@ public Collection retrieveUsedSegmentsForIntervals( { return toolbox .getTaskActionClient() - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE, true)); + .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE)); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 4d38143b724b..d99f2a45ee96 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -559,8 +559,7 @@ public static List> getTimelineForInte dataSource, null, Collections.singletonList(interval), - Segments.ONLY_VISIBLE, - true + Segments.ONLY_VISIBLE )); } catch (IOException e) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java index 21cb27b8cfc9..35a28be6cbdf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java @@ -44,7 +44,7 @@ public void testBasic() throws IOException final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class); EasyMock.expect( taskActionClient.submit( - new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"), null, Segments.ONLY_VISIBLE, true) + new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"), null, Segments.ONLY_VISIBLE) ) ).andReturn( ImmutableList.of( @@ -70,8 +70,7 @@ public void testBasic() throws IOException "foo", null, ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")), - Segments.ONLY_VISIBLE, - true + Segments.ONLY_VISIBLE ) ) ).andReturn( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index cab3d9842f10..0fbfb1733a81 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -98,7 +98,7 @@ private static DataSegment createSegment(Interval interval, String version) public void testRetrieveUsedSegmentsAction() { final RetrieveUsedSegmentsAction action = - new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null, Segments.ONLY_VISIBLE, null); + new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null, Segments.ONLY_VISIBLE); final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); Assert.assertEquals(expectedUsedSegments, resultSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java index 55d57c7d6e7c..0876092cac11 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java @@ -42,7 +42,7 @@ public void testSingleIntervalSerde() throws Exception Interval interval = Intervals.of("2014/2015"); RetrieveUsedSegmentsAction expected = - new RetrieveUsedSegmentsAction("dataSource", interval, null, Segments.ONLY_VISIBLE, false); + new RetrieveUsedSegmentsAction("dataSource", interval, null, Segments.ONLY_VISIBLE); RetrieveUsedSegmentsAction actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), RetrieveUsedSegmentsAction.class); @@ -58,8 +58,7 @@ public void testMultiIntervalSerde() throws Exception "dataSource", null, intervals, - Segments.ONLY_VISIBLE, - false + Segments.ONLY_VISIBLE ); RetrieveUsedSegmentsAction actual = @@ -75,28 +74,8 @@ public void testOldJsonDeserialization() throws Exception RetrieveUsedSegmentsAction actual = (RetrieveUsedSegmentsAction) MAPPER.readValue(jsonStr, TaskAction.class); Assert.assertEquals( - new RetrieveUsedSegmentsAction("test", Intervals.of("2014/2015"), null, Segments.ONLY_VISIBLE, false), + new RetrieveUsedSegmentsAction("test", Intervals.of("2014/2015"), null, Segments.ONLY_VISIBLE), actual ); } - - @Test - public void testWithReplace() throws Exception - { - List intervals = ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")); - RetrieveUsedSegmentsAction expected = new RetrieveUsedSegmentsAction( - "dataSource", - null, - intervals, - Segments.ONLY_VISIBLE, - true - ); - - RetrieveUsedSegmentsAction actual = - MAPPER.readValue(MAPPER.writeValueAsString(expected), RetrieveUsedSegmentsAction.class); - Assert.assertEquals(intervals, actual.getIntervals()); - Assert.assertTrue(actual.getReplace()); - Assert.assertEquals(expected, actual); - } - } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index a2ee82e05c09..913ee3dd8d8f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -939,8 +939,7 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment. WIKI, null, ImmutableList.of(interval), - visibility, - null + visibility ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); @@ -959,8 +958,7 @@ private void verifyInputSegments(Task task, Interval interval, DataSegment... ex WIKI, null, Collections.singletonList(interval), - Segments.ONLY_VISIBLE, - true + Segments.ONLY_VISIBLE ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); From 369f0fd9139c155e4887a8539be70ff4ae30aeec Mon Sep 17 00:00:00 2001 From: Amatya Date: Sat, 20 Jan 2024 11:04:14 +0530 Subject: [PATCH 6/9] Revert accidental change --- .../hadoop/OverlordActionBasedUsedSegmentsRetriever.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java index e1a48d3f557d..73bc411fb01d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java @@ -53,6 +53,6 @@ public Collection retrieveUsedSegmentsForIntervals( { return toolbox .getTaskActionClient() - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE)); + .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, visibility)); } } From e78bf51a780463247def4c8b68433af743baacfa Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 22 Jan 2024 11:07:48 +0530 Subject: [PATCH 7/9] Address feedback --- .../apache/druid/msq/exec/ControllerImpl.java | 11 +++----- .../msq/test/MSQTestTaskActionClient.java | 28 +++++-------------- .../ActionBasedUsedSegmentChecker.java | 3 +- .../actions/RetrieveUsedSegmentsAction.java | 26 +++++++++-------- .../common/task/AbstractBatchIndexTask.java | 3 +- .../indexing/common/task/CompactionTask.java | 5 ++-- .../task/batch/parallel/TombstoneHelper.java | 5 +--- .../indexing/input/DruidInputSource.java | 5 +--- .../ActionBasedUsedSegmentCheckerTest.java | 7 ++--- .../actions/RetrieveSegmentsActionsTest.java | 3 +- .../RetrieveUsedSegmentsActionSerdeTest.java | 4 +-- .../ConcurrentReplaceAndAppendTest.java | 8 ++---- 12 files changed, 38 insertions(+), 70 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 371344719342..f31f66c5ef3a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -79,7 +79,6 @@ import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -1235,15 +1234,13 @@ private DataSegmentTimelineView makeDataSegmentTimelineView() final Collection publishedUsedSegments; try { // Additional check as the task action does not accept empty intervals - if (!intervals.isEmpty()) { + if (intervals.isEmpty()) { + publishedUsedSegments = Collections.emptySet(); + } else { publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction( dataSource, - null, - intervals, - Segments.ONLY_VISIBLE + intervals )); - } else { - publishedUsedSegments = Collections.emptySet(); } } catch (IOException e) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java index 3ab3aaf8c25d..fd0452cce7ca 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TimeChunkLock; @@ -40,13 +38,12 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -58,10 +55,6 @@ public class MSQTestTaskActionClient implements TaskActionClient public static final String VERSION = "test"; private final ObjectMapper mapper; private final ConcurrentHashMap segmentIdPartitionIdMap = new ConcurrentHashMap<>(); - private final Map> usedIntervals = ImmutableMap.of( - "foo", ImmutableList.of(Intervals.of("2001-01-01/2001-01-04"), Intervals.of("2000-01-01/2000-01-04")), - "foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D")) - ); private final Set publishedSegments = new HashSet<>(); private final Injector injector; @@ -113,19 +106,12 @@ public RetType submit(TaskAction taskAction) )); } else if (taskAction instanceof RetrieveUsedSegmentsAction) { String dataSource = ((RetrieveUsedSegmentsAction) taskAction).getDataSource(); - if (!usedIntervals.containsKey(dataSource)) { - return (RetType) ImmutableSet.of(); - } else { - return (RetType) usedIntervals.get(dataSource) - .stream() - .map(interval -> DataSegment.builder() - .dataSource(dataSource) - .interval(interval) - .version(VERSION) - .size(1) - .build() - ).collect(Collectors.toSet()); - } + return (RetType) injector.getInstance(SpecificSegmentsQuerySegmentWalker.class) + .getSegments() + .stream() + .filter(dataSegment -> dataSegment.getDataSource() + .equals(dataSource)) + .collect(Collectors.toSet()); } else if (taskAction instanceof SegmentTransactionalInsertAction) { final Set segments = ((SegmentTransactionalInsertAction) taskAction).getSegments(); publishedSegments.addAll(segments); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index 984058895e3c..3a33bc80d68f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -22,7 +22,6 @@ import com.google.common.collect.Iterables; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -66,7 +65,7 @@ public Set findUsedSegments(Set segmentIds) ); final Collection usedSegmentsForIntervals = taskActionClient - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE)); + .submit(new RetrieveUsedSegmentsAction(dataSource, intervals)); for (DataSegment segment : usedSegmentsForIntervals) { if (segmentIdsInDataSource.contains(segment.getId())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java index 3c3cd87bb1e9..29986eeba555 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java @@ -51,10 +51,10 @@ /** * This TaskAction returns a collection of segments which have data within the specified intervals and are marked as * used. - * If the task holds REPLACE locks and the datasource being read is also the one being replaced, - * fetch only those segments for the interval that were created before its REPLACE lock's version. - * This change is needed to ensure that the input set of segments is always consistent for a replacing task - * when concurrent appending tasks append segments. + * If the task holds REPLACE locks and is writing back to the same datasource, + * only segments that were created before the REPLACE lock was acquired are returned for an interval. + * This ensures that the input set of segments for this replace task remains consistent + * even when new data is appended by other concurrent tasks. * * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in * the collection only once. @@ -105,6 +105,11 @@ public RetrieveUsedSegmentsAction( this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE; } + public RetrieveUsedSegmentsAction(String dataSource, Collection intervals) + { + this(dataSource, null, intervals, Segments.ONLY_VISIBLE); + } + @JsonProperty public String getDataSource() { @@ -132,10 +137,9 @@ public TypeReference> getReturnTypeReference() @Override public Collection perform(Task task, TaskActionToolbox toolbox) { - // The DruidInputSource can be used to read from one datasource and write to another. - // In such a case, the race condition described in the class-level docs cannot occur, - // and the action can simply fetch all visible segments for the datasource and interval. - // Similarly, an MSQ replace could read from a different datasource. + // When fetching segments for a datasource other than the one this task is writing to, + // just return all segments with the needed visibility. + // This is because we can't ensure that the set of returned segments is consistent throughout the task's lifecycle if (!task.getDataSource().equals(dataSource)) { return retrieveUsedSegments(toolbox); } @@ -163,9 +167,9 @@ public Collection perform(Task task, TaskActionToolbox toolbox) for (Pair segmentAndCreatedDate : toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { final DataSegment segment = segmentAndCreatedDate.lhs; - final String created = segmentAndCreatedDate.rhs; + final String createdDate = segmentAndCreatedDate.rhs; intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) - .computeIfAbsent(created, c -> new HashSet<>()) + .computeIfAbsent(createdDate, c -> new HashSet<>()) .add(segment); } @@ -176,6 +180,7 @@ public Collection perform(Task task, TaskActionToolbox toolbox) for (ReplaceTaskLock replaceLock : replaceLocksForTask) { if (replaceLock.getInterval().contains(segmentInterval)) { lockVersion = replaceLock.getVersion(); + break; } } final Map> createdToSegmentsMap = entry.getValue(); @@ -205,7 +210,6 @@ private Collection retrieveUsedSegments(TaskActionToolbox toolbox) .retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility); } - @Override public boolean isAudited() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 94110e167e3e..4a76e688fb7a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -48,7 +48,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.InputRowSchemas; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; @@ -655,7 +654,7 @@ protected static List findInputSegments( { return ImmutableList.copyOf( actionClient.submit( - new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE) + new RetrieveUsedSegmentsAction(dataSource, intervalsToRead) ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index febbd4d8efd4..6959de2809de 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -60,7 +60,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -428,7 +427,7 @@ public List findSegmentsToLock(TaskActionClient taskActionClient, L throws IOException { return ImmutableList.copyOf( - taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), null, intervals, Segments.ONLY_VISIBLE)) + taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), intervals)) ); } @@ -1164,7 +1163,7 @@ List findSegments(TaskActionClient actionClient) throws IOException { return new ArrayList<>( actionClient.submit( - new RetrieveUsedSegmentsAction(dataSource, null, ImmutableList.of(interval), Segments.ONLY_VISIBLE) + new RetrieveUsedSegmentsAction(dataSource, ImmutableList.of(interval)) ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 6b3c684e7947..15ba6788307e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -26,7 +26,6 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -309,9 +308,7 @@ private List getExistingNonEmptyIntervalsOfDatasource( Collection usedSegmentsInInputInterval = taskActionClient.submit(new RetrieveUsedSegmentsAction( dataSource, - null, - condensedInputIntervals, - Segments.ONLY_VISIBLE + condensedInputIntervals )); for (DataSegment usedSegment : usedSegmentsInInputInterval) { for (Interval condensedInputInterval : condensedInputIntervals) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index d99f2a45ee96..890a7c313fa4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -52,7 +52,6 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.WindowedSegmentId; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -557,9 +556,7 @@ public static List> getTimelineForInte usedSegments = toolbox.getTaskActionClient() .submit(new RetrieveUsedSegmentsAction( dataSource, - null, - Collections.singletonList(interval), - Segments.ONLY_VISIBLE + Collections.singletonList(interval) )); } catch (IOException e) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java index 35a28be6cbdf..c339a103b2d3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -44,7 +43,7 @@ public void testBasic() throws IOException final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class); EasyMock.expect( taskActionClient.submit( - new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"), null, Segments.ONLY_VISIBLE) + new RetrieveUsedSegmentsAction("bar", ImmutableList.of(Intervals.of("2002/P1D"))) ) ).andReturn( ImmutableList.of( @@ -68,9 +67,7 @@ public void testBasic() throws IOException taskActionClient.submit( new RetrieveUsedSegmentsAction( "foo", - null, - ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")), - Segments.ONLY_VISIBLE + ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")) ) ) ).andReturn( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 0fbfb1733a81..2dee594aad68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; @@ -98,7 +97,7 @@ private static DataSegment createSegment(Interval interval, String version) public void testRetrieveUsedSegmentsAction() { final RetrieveUsedSegmentsAction action = - new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null, Segments.ONLY_VISIBLE); + new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL)); final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); Assert.assertEquals(expectedUsedSegments, resultSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java index 0876092cac11..99675fd57bb1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java @@ -56,9 +56,7 @@ public void testMultiIntervalSerde() throws Exception List intervals = ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")); RetrieveUsedSegmentsAction expected = new RetrieveUsedSegmentsAction( "dataSource", - null, - intervals, - Segments.ONLY_VISIBLE + intervals ); RetrieveUsedSegmentsAction actual = diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 913ee3dd8d8f..94af13d69dd8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -937,9 +937,7 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment. Collection allUsedSegments = dummyTaskActionClient.submit( new RetrieveUsedSegmentsAction( WIKI, - null, - ImmutableList.of(interval), - visibility + ImmutableList.of(interval) ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); @@ -956,9 +954,7 @@ private void verifyInputSegments(Task task, Interval interval, DataSegment... ex Collection allUsedSegments = taskActionClient.submit( new RetrieveUsedSegmentsAction( WIKI, - null, - Collections.singletonList(interval), - Segments.ONLY_VISIBLE + Collections.singletonList(interval) ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); From e75c8312698110afe25cf3f8828753f2a4723f65 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 23 Jan 2024 12:02:36 +0530 Subject: [PATCH 8/9] Fix concurrent tests --- .../task/concurrent/ConcurrentReplaceAndAppendTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 94af13d69dd8..3fda953a4541 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -937,7 +937,9 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment. Collection allUsedSegments = dummyTaskActionClient.submit( new RetrieveUsedSegmentsAction( WIKI, - ImmutableList.of(interval) + null, + ImmutableList.of(interval), + visibility ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); From 09ab5a04e59e105a23474c6a8602b52bb6551541 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 24 Jan 2024 15:24:03 +0530 Subject: [PATCH 9/9] Try to fix MSQ tests --- .../apache/druid/msq/exec/MSQReplaceTest.java | 139 +++++++++++++++++- 1 file changed, 138 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index f8100dc8a8f3..ea7adc866ee0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -38,6 +38,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; +import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Test; import org.junit.runner.RunWith; @@ -98,6 +99,28 @@ public void testReplaceOnFooWithAll() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doCallRealMethod() + .doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.ETERNITY)) + )); + testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL " + "SELECT __time, m1 " + "FROM foo " @@ -418,6 +441,28 @@ public void testReplaceSegmentsRepartitionTable() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doCallRealMethod() + .doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.ETERNITY)) + )); + + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE ALL " + "SELECT __time, m1 " @@ -479,6 +524,20 @@ public void testReplaceWithWhereClause() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-03-01' " + "SELECT __time, m1 " @@ -538,6 +597,28 @@ public void testReplaceWhereClauseLargerThanData() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2002-01-01"))) + )); + + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2002-01-01' " + "SELECT __time, m1 " @@ -625,6 +706,19 @@ public void testReplaceTimeChunks() .add("m1", ColumnType.FLOAT) .build(); + final DataSegment existingDataSegment = DataSegment.builder() + .dataSource("foo") + .interval(Intervals.of("2000-01-01/2000-01-04")) + .version(MSQTestTaskActionClient.VERSION) + .size(1) + .build(); + Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-03-01'" + "SELECT __time, m1 " @@ -659,6 +753,26 @@ public void testReplaceTimeChunksLargerThanData() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2002-01-01'" + "SELECT __time, m1 " @@ -939,6 +1053,26 @@ public void testReplaceUnnestSegmentWithTimeFilter() .add("d", ColumnType.STRING) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("1999/2002"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'" + "SELECT __time, d " @@ -1003,7 +1137,10 @@ public void testReplaceTombstonesOverPartiallyOverlappingSegments() Mockito.doReturn(ImmutableSet.of(existingDataSegment)) .when(testTaskActionClient) - .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo1"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002"))) + )); List expectedResults; if (NullHandling.sqlCompatible()) {