diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index af5c0fbe95a1..333868780157 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -371,7 +372,8 @@ Pair, Map>> checkSegment // Pair max(created_date), interval -> list> Pair, Map>> baseSegmentsSnapshot = getMaxCreateDateAndBaseSegments( - metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY) + metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), + Collections.singletonList(Intervals.ETERNITY)) ); // baseSegments are used to create HadoopIndexTask Map> baseSegments = baseSegmentsSnapshot.rhs; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java index 78e6ada5c1e2..7fec3369a824 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common.actions; import com.fasterxml.jackson.annotation.JsonCreator; -import 
com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.indexing.common.task.Task; @@ -38,6 +37,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -63,20 +63,18 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction intervals; @JsonCreator public RetrieveSegmentsToReplaceAction( @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("intervals") List intervals ) { this.dataSource = dataSource; - this.interval = interval; + this.intervals = intervals; } @JsonProperty @@ -86,9 +84,9 @@ public String getDataSource() } @JsonProperty - public Interval getInterval() + public List getIntervals() { - return interval; + return intervals; } @Override @@ -128,7 +126,7 @@ public Collection perform(Task task, TaskActionToolbox toolbox) Map>> intervalToCreatedToSegments = new HashMap<>(); for (Pair segmentAndCreatedDate : - toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) { + toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { final DataSegment segment = segmentAndCreatedDate.lhs; final String created = segmentAndCreatedDate.rhs; intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) @@ -165,7 +163,7 @@ public Collection perform(Task task, TaskActionToolbox toolbox) private Collection retrieveAllVisibleSegments(TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE); + .retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE); } @Override @@ -185,25 +183,20 @@ public boolean equals(Object o) 
} RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o; - - if (!dataSource.equals(that.dataSource)) { - return false; - } - return interval.equals(that.interval); + return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals); } @Override public int hashCode() { - return Objects.hash(dataSource, interval); + return Objects.hash(dataSource, intervals); } - @Override public String toString() { - return getClass().getSimpleName() + "{" + + return "RetrieveSegmentsToReplaceAction{" + "dataSource='" + dataSource + '\'' + - ", interval=" + interval + + ", intervals=" + intervals + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index db48d6f07f7d..3352735b9e0e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -79,6 +79,7 @@ public enum BatchProcessingMode private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M"); private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true; private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1; + private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE = false; @JsonProperty private final String baseDir; @@ -125,6 +126,9 @@ public enum BatchProcessingMode @JsonProperty private final long tmpStorageBytesPerTask; + @JsonProperty + private final boolean enableConcurrentAppendAndReplace; + @JsonCreator public TaskConfig( @JsonProperty("baseDir") String baseDir, @@ -142,7 +146,8 @@ public TaskConfig( @JsonProperty("batchProcessingMode") String batchProcessingMode, @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns, @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush, - 
@JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask + @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask, + @JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean enableConcurrentAppendAndReplace ) { this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir")); @@ -193,6 +198,10 @@ public TaskConfig( this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS); this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK); + this.enableConcurrentAppendAndReplace = Configs.valueOrDefault( + enableConcurrentAppendAndReplace, + DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE + ); } private TaskConfig( @@ -210,7 +219,8 @@ private TaskConfig( BatchProcessingMode batchProcessingMode, boolean storeEmptyColumns, boolean encapsulatedTask, - long tmpStorageBytesPerTask + long tmpStorageBytesPerTask, + boolean enableConcurrentAppendAndReplace ) { this.baseDir = baseDir; @@ -228,6 +238,7 @@ private TaskConfig( this.storeEmptyColumns = storeEmptyColumns; this.encapsulatedTask = encapsulatedTask; this.tmpStorageBytesPerTask = tmpStorageBytesPerTask; + this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace; } @JsonProperty @@ -344,6 +355,12 @@ public long getTmpStorageBytesPerTask() return tmpStorageBytesPerTask; } + @JsonProperty("enableConcurrentAppendAndReplace") + public boolean isConcurrentAppendAndReplaceEnabled() + { + return enableConcurrentAppendAndReplace; + } + private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { @@ -370,7 +387,8 @@ public TaskConfig withBaseTaskDir(File baseTaskDir) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + enableConcurrentAppendAndReplace ); } @@ -391,7 +409,8 @@ public TaskConfig withTmpStorageBytesPerTask(long 
tmpStorageBytesPerTask) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + enableConcurrentAppendAndReplace ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 2e548e181f40..85617728e5e6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -546,7 +546,7 @@ public static List> getTimelineForInte Preconditions.checkNotNull(interval); final Collection usedSegments; - if (toolbox == null) { + if (toolbox == null || !toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) { usedSegments = FutureUtils.getUnchecked( coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)), true @@ -554,7 +554,7 @@ public static List> getTimelineForInte } else { try { usedSegments = toolbox.getTaskActionClient() - .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval)); + .submit(new RetrieveSegmentsToReplaceAction(dataSource, Collections.singletonList(interval))); } catch (IOException e) { LOG.error(e, "Error retrieving the used segments for interval[%s].", interval); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index af920ebbeb73..8b488fff8093 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -41,6 +41,7 @@ public class TaskConfigBuilder private Boolean storeEmptyColumns; private boolean enableTaskLevelLogPush; private Long tmpStorageBytesPerTask; + private Boolean 
enableConcurrentAppendAndReplace; public TaskConfigBuilder setBaseDir(String baseDir) { @@ -132,6 +133,18 @@ public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask) return this; } + public TaskConfigBuilder enableConcurrentAppendAndReplace() + { + this.enableConcurrentAppendAndReplace = true; + return this; + } + + public TaskConfigBuilder disableConcurrentAppendAndReplace() + { + this.enableConcurrentAppendAndReplace = false; + return this; + } + public TaskConfig build() { return new TaskConfig( @@ -149,7 +162,8 @@ public TaskConfig build() batchProcessingMode, storeEmptyColumns, enableTaskLevelLogPush, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + enableConcurrentAppendAndReplace ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 1c4b6809c387..7f83a8f02332 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -957,7 +957,7 @@ private void verifyInputSegments(Task task, Interval interval, DataSegment... 
ex Collection allUsedSegments = taskActionClient.submit( new RetrieveSegmentsToReplaceAction( WIKI, - interval + Collections.singletonList(interval) ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 572364a56a28..627c161863b6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -615,6 +615,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( .setDefaultRowFlushBoundary(50000) .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()) .setTmpStorageBytesPerTask(-1L) + .enableConcurrentAppendAndReplace() .build(); return new TaskToolboxFactory( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 143a74c72cbc..bca79a559af7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -89,7 +89,7 @@ public List retrieveAllUsedSegments(String dataSource, Segments vis } @Override - public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval) + public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals) { return ImmutableList.of(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 34a55574dce9..3cea2e6dd581 100644 --- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -84,7 +84,7 @@ default Collection retrieveUsedSegmentsForInterval( /** * * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the - * given data source and interval from the metadata store. + * given data source and list of intervals from the metadata store. * * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility" * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link * if needed. * * @param dataSource The data source to query - * @param interval The interval to query + * @param intervals The list of intervals to query * * @return The DataSegments and the related created_date of segments */ - Collection> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval); + Collection> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals); /** * Retrieve all published segments which may include any data in the given intervals and are marked as used from the diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 612f712c1bb2..62f55f96c475 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -174,21 +174,23 @@ private Collection doRetrieveUsedSegments( } @Override - public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval) + public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, List
intervals) { StringBuilder queryBuilder = new StringBuilder( "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true" ); - final List intervals = new ArrayList<>(); - // Do not need an interval condition if the interval is ETERNITY - if (!Intervals.isEternity(interval)) { - intervals.add(interval); + boolean hasEternityInterval = false; + for (Interval interval : intervals) { + if (Intervals.isEternity(interval)) { + hasEternityInterval = true; + break; + } } SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode( queryBuilder, - intervals, + hasEternityInterval ? Collections.emptyList() : intervals, SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS, connector ); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 5d76296d67bd..8e2e7eb747fb 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2870,28 +2870,28 @@ public void testRetrieveUsedSegmentsAndCreatedDates() insertUsedSegments(ImmutableSet.of(defaultSegment)); List> resultForIntervalOnTheLeft = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2001"))); Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty()); List> resultForIntervalOnTheRight = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("3000/3001"))); Assert.assertTrue(resultForIntervalOnTheRight.isEmpty()); List> resultForExactInterval = - 
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval())); Assert.assertEquals(1, resultForExactInterval.size()); Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs); List> resultForIntervalWithLeftOverlap = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2015-01-02"))); Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap); List> resultForIntervalWithRightOverlap = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2015-01-01/3000"))); Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap); List> resultForEternity = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.ETERNITY)); Assert.assertEquals(resultForExactInterval, resultForEternity); } @@ -2902,11 +2902,11 @@ public void testRetrieveUsedSegmentsAndCreatedDatesFetchesEternityForAnyInterval insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment)); List> resultForRandomInterval = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval())); Assert.assertEquals(3, 
resultForRandomInterval.size()); List> resultForEternity = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(eternitySegment.getInterval())); Assert.assertEquals(3, resultForEternity.size()); }