From 4c84c11c119888af0cab2587074593a3bd3f4d56 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Fri, 24 Nov 2023 21:33:55 +0530 Subject: [PATCH 1/5] Fixing failing compaction/parallel index jobs during upgrade due to new actions not available on the overlord. --- .../MaterializedViewSupervisor.java | 4 ++- .../RetrieveSegmentsToReplaceAction.java | 28 ++++++++----------- .../indexing/common/config/TaskConfig.java | 27 +++++++++++++++--- .../indexing/input/DruidInputSource.java | 4 +-- .../ConcurrentReplaceAndAppendTest.java | 2 +- ...TestIndexerMetadataStorageCoordinator.java | 2 +- .../IndexerMetadataStorageCoordinator.java | 6 ++-- .../IndexerSQLMetadataStorageCoordinator.java | 14 ++++++---- ...exerSQLMetadataStorageCoordinatorTest.java | 16 +++++------ 9 files changed, 61 insertions(+), 42 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index af5c0fbe95a1..333868780157 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -371,7 +372,8 @@ Pair, Map>> checkSegment // Pair max(created_date), interval -> list> Pair, Map>> baseSegmentsSnapshot = getMaxCreateDateAndBaseSegments( - metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY) + metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), + Collections.singletonList(Intervals.ETERNITY)) ); // baseSegments are used to create HadoopIndexTask Map> baseSegments = baseSegmentsSnapshot.rhs; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java index 78e6ada5c1e2..f6effedbd37e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java @@ -38,6 +38,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -67,16 +68,16 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction intervals; @JsonCreator public RetrieveSegmentsToReplaceAction( @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("intervals") List intervals ) { this.dataSource = dataSource; - this.interval = interval; + this.intervals = intervals; } @JsonProperty @@ -86,9 +87,9 @@ public String getDataSource() } @JsonProperty - public Interval getInterval() + public List getIntervals() { - return interval; + return intervals; } @Override @@ -128,7 +129,7 @@ public Collection perform(Task task, TaskActionToolbox toolbox) Map>> intervalToCreatedToSegments = new HashMap<>(); for (Pair segmentAndCreatedDate : - toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) { + toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { final DataSegment segment = segmentAndCreatedDate.lhs; final String created = segmentAndCreatedDate.rhs; intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) @@ -165,7 +166,7 @@ public Collection perform(Task task, TaskActionToolbox toolbox) private Collection retrieveAllVisibleSegments(TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE); + .retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE); } @Override @@ -185,25 +186,20 @@ public boolean equals(Object o) } RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o; - - if (!dataSource.equals(that.dataSource)) { - return false; - } - return interval.equals(that.interval); + return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals); } @Override public int hashCode() { - return Objects.hash(dataSource, interval); + return Objects.hash(dataSource, intervals); } - @Override public String toString() { - return getClass().getSimpleName() + "{" + + return "RetrieveSegmentsToReplaceAction{" + "dataSource='" + dataSource + '\'' + - ", interval=" + interval + + ", intervals=" + intervals + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index db48d6f07f7d..1253f59dc456 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -79,6 +79,7 @@ public enum BatchProcessingMode private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M"); private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true; private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1; + private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE = false; @JsonProperty private final String baseDir; @@ -125,6 +126,9 @@ public enum BatchProcessingMode @JsonProperty private final long tmpStorageBytesPerTask; + @JsonProperty + private final boolean enableConcurrentAppendAndReplace; + @JsonCreator public TaskConfig( @JsonProperty("baseDir") String baseDir, @@ -142,7 +146,8 @@ public TaskConfig( @JsonProperty("batchProcessingMode") String batchProcessingMode, @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns, @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush, - @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask + @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask, + @JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean enableConcurrentAppendAndReplace ) { this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir")); @@ -193,6 +198,10 @@ public TaskConfig( this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS); this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK); + this.enableConcurrentAppendAndReplace = Configs.valueOrDefault( + enableConcurrentAppendAndReplace, + DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE + ); } private TaskConfig( @@ -210,7 +219,8 @@ private TaskConfig( BatchProcessingMode batchProcessingMode, boolean storeEmptyColumns, boolean encapsulatedTask, - long tmpStorageBytesPerTask + long tmpStorageBytesPerTask, + boolean enableConcurrentAppendAndReplace ) { this.baseDir = baseDir; @@ -228,6 +238,7 @@ private TaskConfig( this.storeEmptyColumns = storeEmptyColumns; this.encapsulatedTask = encapsulatedTask; this.tmpStorageBytesPerTask = tmpStorageBytesPerTask; + this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace; } @JsonProperty @@ -344,6 +355,12 @@ public long getTmpStorageBytesPerTask() return tmpStorageBytesPerTask; } + @JsonProperty + public boolean isConcurrentAppendAndReplaceEnabled() + { + return enableConcurrentAppendAndReplace; + } + private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { @@ -370,7 +387,8 @@ public TaskConfig withBaseTaskDir(File baseTaskDir) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + enableConcurrentAppendAndReplace ); } @@ -391,7 +409,8 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + enableConcurrentAppendAndReplace ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 2e548e181f40..85617728e5e6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -546,7 +546,7 @@ public static List> getTimelineForInte Preconditions.checkNotNull(interval); final Collection usedSegments; - if (toolbox == null) { + if (toolbox == null || !toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) { usedSegments = FutureUtils.getUnchecked( coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)), true @@ -554,7 +554,7 @@ public static List> getTimelineForInte } else { try { usedSegments = toolbox.getTaskActionClient() - .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval)); + .submit(new RetrieveSegmentsToReplaceAction(dataSource, Collections.singletonList(interval))); } catch (IOException e) { LOG.error(e, "Error retrieving the used segments for interval[%s].", interval); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 1c4b6809c387..7f83a8f02332 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -957,7 +957,7 @@ private void verifyInputSegments(Task task, Interval interval, DataSegment... ex Collection allUsedSegments = taskActionClient.submit( new RetrieveSegmentsToReplaceAction( WIKI, - interval + Collections.singletonList(interval) ) ); Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 143a74c72cbc..bca79a559af7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -89,7 +89,7 @@ public List retrieveAllUsedSegments(String dataSource, Segments vis } @Override - public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval) + public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals) { return ImmutableList.of(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 34a55574dce9..dc9dc28bbb62 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -84,7 +84,7 @@ default Collection retrieveUsedSegmentsForInterval( /** * * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the - * given data source and interval from the metadata store. + * given data source and List from the metadata store. * * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility" * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link @@ -92,11 +92,11 @@ default Collection retrieveUsedSegmentsForInterval( * if needed. * * @param dataSource The data source to query - * @param interval The interval to query + * @param intervals The list of interval to query * * @return The DataSegments and the related created_date of segments */ - Collection> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval); + Collection> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals); /** * Retrieve all published segments which may include any data in the given intervals and are marked as used from the diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 612f712c1bb2..f00fe6989e41 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -174,21 +174,23 @@ private Collection doRetrieveUsedSegments( } @Override - public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval) + public List> retrieveUsedSegmentsAndCreatedDates(String dataSource, List intervals) { StringBuilder queryBuilder = new StringBuilder( "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true" ); - final List intervals = new ArrayList<>(); - // Do not need an interval condition if the interval is ETERNITY - if (!Intervals.isEternity(interval)) { - intervals.add(interval); + boolean intervalsAreEternity = false; + for (Interval interval : intervals) { + if (Intervals.isEternity(interval)) { + intervalsAreEternity = true; + break; + } } SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode( queryBuilder, - intervals, + intervalsAreEternity ? Collections.emptyList() : intervals, SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS, connector ); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 5d76296d67bd..8e2e7eb747fb 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2870,28 +2870,28 @@ public void testRetrieveUsedSegmentsAndCreatedDates() insertUsedSegments(ImmutableSet.of(defaultSegment)); List> resultForIntervalOnTheLeft = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2001"))); Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty()); List> resultForIntervalOnTheRight = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("3000/3001"))); Assert.assertTrue(resultForIntervalOnTheRight.isEmpty()); List> resultForExactInterval = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval())); Assert.assertEquals(1, resultForExactInterval.size()); Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs); List> resultForIntervalWithLeftOverlap = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2015-01-02"))); Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap); List> resultForIntervalWithRightOverlap = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000")); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2015-01-01/3000"))); Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap); List> resultForEternity = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.ETERNITY)); Assert.assertEquals(resultForExactInterval, resultForEternity); } @@ -2902,11 +2902,11 @@ public void testRetrieveUsedSegmentsAndCreatedDatesFetchesEternityForAnyInterval insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment)); List> resultForRandomInterval = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval())); Assert.assertEquals(3, resultForRandomInterval.size()); List> resultForEternity = - coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval()); + coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(eternitySegment.getInterval())); Assert.assertEquals(3, resultForEternity.size()); } From ca12664621e2ecc5b1da7c17e9f5a331b2bf144a Mon Sep 17 00:00:00 2001 From: cryptoe Date: Fri, 24 Nov 2023 21:52:56 +0530 Subject: [PATCH 2/5] Fixing build --- .../common/config/TaskConfigBuilder.java | 17 ++++++++++++++++- .../indexing/overlord/TaskLifecycleTest.java | 1 + 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index af920ebbeb73..acbd7ecffa16 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -42,6 +42,8 @@ public class TaskConfigBuilder private boolean enableTaskLevelLogPush; private Long tmpStorageBytesPerTask; + private Boolean enableConcurrentAppendAndReplace; + public TaskConfigBuilder setBaseDir(String baseDir) { this.baseDir = baseDir; @@ -132,6 +134,18 @@ public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask) return this; } + public TaskConfigBuilder enableConcurrentAppendAndReplace() + { + this.enableConcurrentAppendAndReplace = true; + return this; + } + + public TaskConfigBuilder disableConcurrentAppendAndReplace() + { + this.enableConcurrentAppendAndReplace = false; + return this; + } + public TaskConfig build() { return new TaskConfig( @@ -149,7 +163,8 @@ public TaskConfig build() batchProcessingMode, storeEmptyColumns, enableTaskLevelLogPush, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + enableConcurrentAppendAndReplace ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 572364a56a28..627c161863b6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -615,6 +615,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( .setDefaultRowFlushBoundary(50000) .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()) .setTmpStorageBytesPerTask(-1L) + .enableConcurrentAppendAndReplace() .build(); return new TaskToolboxFactory( From 429e421145bc57ac5dd1766d0bdd7abe9fdb4751 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Fri, 24 Nov 2023 21:54:00 +0530 Subject: [PATCH 3/5] Removing extra space. --- .../apache/druid/indexing/common/config/TaskConfigBuilder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index acbd7ecffa16..8b488fff8093 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -41,7 +41,6 @@ public class TaskConfigBuilder private Boolean storeEmptyColumns; private boolean enableTaskLevelLogPush; private Long tmpStorageBytesPerTask; - private Boolean enableConcurrentAppendAndReplace; public TaskConfigBuilder setBaseDir(String baseDir) From 4ca8cab55b57916e7a459a48721e2fada7b418ff Mon Sep 17 00:00:00 2001 From: cryptoe Date: Sat, 25 Nov 2023 08:08:58 +0530 Subject: [PATCH 4/5] Fixing json getter. --- .../org/apache/druid/indexing/common/config/TaskConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 1253f59dc456..3352735b9e0e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -355,7 +355,7 @@ public long getTmpStorageBytesPerTask() return tmpStorageBytesPerTask; } - @JsonProperty + @JsonProperty("enableConcurrentAppendAndReplace") public boolean isConcurrentAppendAndReplaceEnabled() { return enableConcurrentAppendAndReplace; From 982e68fda9425695212eafa7cc3d8494be6e26a8 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Sat, 25 Nov 2023 10:27:47 +0530 Subject: [PATCH 5/5] Review comments. --- .../common/actions/RetrieveSegmentsToReplaceAction.java | 3 --- .../overlord/IndexerMetadataStorageCoordinator.java | 2 +- .../metadata/IndexerSQLMetadataStorageCoordinator.java | 6 +++--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java index f6effedbd37e..7fec3369a824 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common.actions; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.indexing.common.task.Task; @@ -64,10 +63,8 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction intervals; @JsonCreator diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index dc9dc28bbb62..3cea2e6dd581 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -84,7 +84,7 @@ default Collection retrieveUsedSegmentsForInterval( /** * * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the - * given data source and List from the metadata store. + * given data source and list of intervals from the metadata store. * * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility" * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index f00fe6989e41..62f55f96c475 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -180,17 +180,17 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true" ); - boolean intervalsAreEternity = false; + boolean hasEternityInterval = false; for (Interval interval : intervals) { if (Intervals.isEternity(interval)) { - intervalsAreEternity = true; + hasEternityInterval = true; break; } } SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode( queryBuilder, - intervalsAreEternity ? Collections.emptyList() : intervals, + hasEternityInterval ? Collections.emptyList() : intervals, SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS, connector );