diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md index e50d2625e536..239523d401be 100644 --- a/docs/ingestion/compaction.md +++ b/docs/ingestion/compaction.md @@ -52,7 +52,9 @@ In cases where you require more control over compaction, you can manually submit See [Setting up a manual compaction task](#setting-up-manual-compaction) for more about manual compaction tasks. ## Data handling with compaction -During compaction, Druid overwrites the original set of segments with the compacted set. During compaction Druid locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes. +During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or to add or remove dimensions. This means that the only changes to query results should be the result of intentional, not automatic, changes. + +For compaction tasks, `dropExisting` for the underlying ingestion tasks is set to `true`. This means that Druid can drop (mark unused) all existing segments that fall fully within the interval of the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks, you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjust the auto-compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to a higher priority than the ingestion task.
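As a rough illustration of these mitigations, the sketch below shows an auto-compaction configuration entry that pushes the auto-compaction starting point back from the current time and raises the compaction task priority. The datasource name and values are placeholders only; the available keys are described in [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration).

```json
{
  "dataSource": "wikipedia",
  "taskPriority": 60,
  "skipOffsetFromLatest": "P1D"
}
```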
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index dece5bf260c4..3b0d2a9b254f 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -74,7 +74,7 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x - [`static-cloudfiles`](../development/extensions-contrib/cloudfiles.md#firehose) -You may want to consider the below things: +### Implementation considerations - You may want to control the amount of input data each worker task processes. This can be controlled using different configurations depending on the phase in parallel ingestion (see [`partitionsSpec`](#partitionsspec) for more details). @@ -89,7 +89,33 @@ You may want to consider the below things: data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have no data written by this task, they will be left alone. If any existing segments partially overlap with the `granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible. - +- You can set the `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that + start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments. + `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. + + The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`: + + - Example 1: Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and YEAR segmentGranularity. You want to + overwrite the whole interval of 2020-01-01 to 2021-01-01 with new data using the finer segmentGranularity of MONTH. + If the replacement data does not have a record within every month from 2020-01-01 to 2021-01-01, + Druid cannot drop the original YEAR segment even if it does include all of the replacement data. Set `dropExisting` to true in this case to drop + the original segment at YEAR `segmentGranularity` since you no longer need it. + - Example 2: Consider the case where you want to re-ingest or overwrite a datasource and the new data does not contain some time intervals that exist + in the datasource. For example, a datasource contains the following data at MONTH segmentGranularity: + January: 1 record + February: 10 records + March: 10 records + You want to re-ingest and overwrite with new data as follows: + January: 0 records + February: 10 records + March: 9 records + Unless you set `dropExisting` to true, the result after ingestion with overwrite using the same MONTH segmentGranularity would be: + January: 1 record + February: 10 records + March: 9 records + This is incorrect since the new data has 0 records for January. Set `dropExisting` to true to drop the original + segment for January, which is no longer needed since the newly ingested data has no records for January. + ### Task syntax A sample task is shown below: @@ -193,6 +219,7 @@ that range if there's some stray data with unexpected timestamps. |type|The task type, this should always be `index_parallel`.|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no| +|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|
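To make the relationship between these properties concrete, a minimal `ioConfig` sketch for an `index_parallel` task with `dropExisting` enabled is shown below. The input source, format, and paths are placeholders only; note that the `interval` that `dropExisting` depends on is specified in the `granularitySpec` of the `dataSchema`, not in the `ioConfig` itself.

```json
{
  "type": "index_parallel",
  "inputSource": {
    "type": "local",
    "baseDir": "/tmp/data",
    "filter": "*.json"
  },
  "inputFormat": {
    "type": "json"
  },
  "appendToExisting": false,
  "dropExisting": true
}
```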
### `tuningConfig` @@ -538,7 +565,8 @@ An example of the result is "l_comment" ] }, - "appendToExisting": false + "appendToExisting": false, + "dropExisting": false }, "tuningConfig": { "type": "index_parallel", @@ -719,6 +747,7 @@ that range if there's some stray data with unexpected timestamps. |type|The task type, this should always be "index".|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no| +|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no| ### `tuningConfig` diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index a3a15de99d29..1e0df0e13231 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -54,9 +54,22 @@ */ public class SegmentTransactionalInsertAction implements TaskAction { + /** + * Set of segments that were fully overshadowed by the new segments, {@link SegmentTransactionalInsertAction#segments} + */ @Nullable private final Set segmentsToBeOverwritten; + /** + * Set of segments to be inserted into metadata storage + */ private final Set segments; + /** + * Set of segments to be dropped (marked unused) when the new segments, {@link SegmentTransactionalInsertAction#segments}, + * are inserted into metadata storage. 
+ */ + @Nullable + private final Set segmentsToBeDropped; + @Nullable private final DataSourceMetadata startMetadata; @Nullable @@ -66,10 +79,11 @@ public class SegmentTransactionalInsertAction implements TaskAction segmentsToBeOverwritten, + @Nullable Set segmentsToBeDropped, Set segmentsToPublish ) { - return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null); + return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null); } public static SegmentTransactionalInsertAction appendAction( @@ -78,7 +92,7 @@ public static SegmentTransactionalInsertAction appendAction( @Nullable DataSourceMetadata endMetadata ) { - return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null); + return new SegmentTransactionalInsertAction(null, null, segments, startMetadata, endMetadata, null); } public static SegmentTransactionalInsertAction commitMetadataOnlyAction( @@ -87,12 +101,13 @@ public static SegmentTransactionalInsertAction commitMetadataOnlyAction( DataSourceMetadata endMetadata ) { - return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource); + return new SegmentTransactionalInsertAction(null, null, null, startMetadata, endMetadata, dataSource); } @JsonCreator private SegmentTransactionalInsertAction( @JsonProperty("segmentsToBeOverwritten") @Nullable Set segmentsToBeOverwritten, + @JsonProperty("segmentsToBeDropped") @Nullable Set segmentsToBeDropped, @JsonProperty("segments") @Nullable Set segments, @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata, @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata, @@ -100,6 +115,7 @@ private SegmentTransactionalInsertAction( ) { this.segmentsToBeOverwritten = segmentsToBeOverwritten; + this.segmentsToBeDropped = segmentsToBeDropped; this.segments = segments == null ? 
ImmutableSet.of() : ImmutableSet.copyOf(segments); this.startMetadata = startMetadata; this.endMetadata = endMetadata; @@ -113,6 +129,13 @@ public Set getSegmentsToBeOverwritten() return segmentsToBeOverwritten; } + @JsonProperty + @Nullable + public Set getSegmentsToBeDropped() + { + return segmentsToBeDropped; + } + @JsonProperty public Set getSegments() { @@ -176,6 +199,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) if (segmentsToBeOverwritten != null) { allSegments.addAll(segmentsToBeOverwritten); } + if (segmentsToBeDropped != null) { + allSegments.addAll(segmentsToBeDropped); + } + TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments); if (segmentsToBeOverwritten != null && !segmentsToBeOverwritten.isEmpty()) { @@ -194,6 +221,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) .onValidLocks( () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments( segments, + segmentsToBeDropped, startMetadata, endMetadata ) @@ -305,7 +333,8 @@ public String toString() ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + ", startMetadata=" + startMetadata + ", endMetadata=" + endMetadata + - ", dataSource=" + dataSource + + ", dataSource='" + dataSource + '\'' + + ", segmentsToBeDropped=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 72f150226644..a51a3c248dbe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -67,6 +67,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -481,6 +482,28 @@ public static Function, Set> compactionStateAnnota } } + public static Set getUsedSegmentsWithinInterval( + TaskToolbox toolbox, + String dataSource, + List intervals + ) throws IOException + { + Set segmentsFoundForDrop = new HashSet<>(); + List condensedIntervals = JodaUtils.condenseIntervals(intervals); + if (!intervals.isEmpty()) { + Collection usedSegment = toolbox.getTaskActionClient().submit(new RetrieveUsedSegmentsAction(dataSource, null, condensedIntervals, Segments.ONLY_VISIBLE)); + for (DataSegment segment : usedSegment) { + for (Interval interval : condensedIntervals) { + if (interval.contains(segment.getInterval())) { + segmentsFoundForDrop.add(segment); + break; + } + } + } + } + return segmentsFoundForDrop; + } + @Nullable static Granularity findGranularityFromSegments(List segments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 064d2caa3350..3e5bd4f09ff8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -325,11 +325,17 @@ public TaskStatus run(final TaskToolbox toolbox) int sequenceNumber = 0; String sequenceName = makeSequenceName(getId(), sequenceNumber); - 
final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptySegments, segments, commitMetadata) -> { - if (mustBeNullOrEmptySegments != null && !mustBeNullOrEmptySegments.isEmpty()) { + final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, mustBeNullOrEmptyDropSegments, segments, commitMetadata) -> { + if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) { throw new ISE( "Stream ingestion task unexpectedly attempted to overwrite segments: %s", - SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptySegments) + SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments) + ); + } + if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) { + throw new ISE( + "Stream ingestion task unexpectedly attempted to drop segments: %s", + SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments) ); } final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 6f69811f3e58..ed4bf7eb2b3a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -633,7 +633,8 @@ private static ParallelIndexIOConfig createIoConfig( toolbox.getConfig() ), null, - false + false, + true ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 48b05732e33b..bb1ee7714633 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -854,9 +854,14 @@ private TaskStatus generateAndPublishSegments( throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType()); } - final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> + Set segmentsFoundForDrop = null; + if (ingestionSchema.getIOConfig().isDropExisting()) { + segmentsFoundForDrop = getUsedSegmentsWithinInterval(toolbox, getDataSource(), ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()); + } + + final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient() - .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)); + .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish)); String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null); if (effectiveId == null) { @@ -910,7 +915,7 @@ private TaskStatus generateAndPublishSegments( // Probably we can publish atomicUpdateGroup along with segments. 
final SegmentsAndCommitMetadata published = - awaitPublish(driver.publishAll(inputSegments, publisher, annotateFunction), pushTimeout); + awaitPublish(driver.publishAll(inputSegments, segmentsFoundForDrop, publisher, annotateFunction), pushTimeout); appenderator.close(); ingestionState = IngestionState.COMPLETED; @@ -1028,18 +1033,21 @@ public IndexTuningConfig getTuningConfig() public static class IndexIOConfig implements BatchIOConfig { private static final boolean DEFAULT_APPEND_TO_EXISTING = false; + private static final boolean DEFAULT_DROP_EXISTING = false; private final FirehoseFactory firehoseFactory; private final InputSource inputSource; private final InputFormat inputFormat; private final boolean appendToExisting; + private final boolean dropExisting; @JsonCreator public IndexIOConfig( @Deprecated @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory, @JsonProperty("inputSource") @Nullable InputSource inputSource, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, - @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting + @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting, + @JsonProperty("dropExisting") @Nullable Boolean dropExisting ) { Checks.checkOneNotNullOrEmpty( @@ -1052,13 +1060,18 @@ public IndexIOConfig( this.inputSource = inputSource; this.inputFormat = inputFormat; this.appendToExisting = appendToExisting == null ? DEFAULT_APPEND_TO_EXISTING : appendToExisting; + this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting; + if (this.dropExisting && this.appendToExisting) { + throw new IAE("Cannot both drop existing segments and append to existing segments. " + + "Either dropExisting or appendToExisting should be set to false"); + } } // old constructor for backward compatibility @Deprecated - public IndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting) + public IndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting, @Nullable Boolean dropExisting) { - this(firehoseFactory, null, null, appendToExisting); + this(firehoseFactory, null, null, appendToExisting, dropExisting); } @Nullable @@ -1113,6 +1126,13 @@ public boolean isAppendToExisting() { return appendToExisting; } + + @Override + @JsonProperty + public boolean isDropExisting() + { + return dropExisting; + } } public static class IndexTuningConfig implements AppenderatorConfig diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java index ac4169d9f670..b06539137455 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java @@ -97,7 +97,8 @@ final SubTaskSpec newTaskSpec(InputSplit split) firehoseFactory, inputSource, ingestionSchema.getIOConfig().getInputFormat(), - ingestionSchema.getIOConfig().isAppendToExisting() + ingestionSchema.getIOConfig().isAppendToExisting(), + ingestionSchema.getIOConfig().isDropExisting() ), ingestionSchema.getTuningConfig() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java index 860e81673c54..9790a3817c4b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java @@ -40,16 +40,17 @@ public ParallelIndexIOConfig( @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory, @JsonProperty("inputSource") @Nullable InputSource inputSource, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, - @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting + @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting, + @JsonProperty("dropExisting") @Nullable Boolean dropExisting ) { - super(firehoseFactory, inputSource, inputFormat, appendToExisting); + super(firehoseFactory, inputSource, inputFormat, appendToExisting, dropExisting); } // old constructor for backward compatibility @Deprecated public ParallelIndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting) { - this(firehoseFactory, null, null, appendToExisting); + this(firehoseFactory, null, null, appendToExisting, null); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index af4352b5b737..03927ad08bd1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -932,12 +932,18 @@ private void publishSegments( ingestionSchema.getTuningConfig(), ingestionSchema.getDataSchema().getGranularitySpec() ); - final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> + + Set segmentsFoundForDrop = null; + if (ingestionSchema.getIOConfig().isDropExisting()) { + segmentsFoundForDrop = getUsedSegmentsWithinInterval(toolbox, getDataSource(), ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()); + } + + final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit( - SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish) + SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish) ); final boolean published = newSegments.isEmpty() - || publisher.publishSegments(oldSegments, newSegments, annotateFunction, null) + || publisher.publishSegments(oldSegments, segmentsFoundForDrop, newSegments, annotateFunction, null) .isSuccess(); if (published) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java index 0e60ae1266ef..17987665a9c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java @@ -119,7 +119,8 @@ 
SubTaskSpec newTaskSpec(InputSplit split) firehoseFactory, inputSource, ingestionSchema.getIOConfig().getInputFormat(), - ingestionSchema.getIOConfig().isAppendToExisting() + ingestionSchema.getIOConfig().isAppendToExisting(), + ingestionSchema.getIOConfig().isDropExisting() ), ingestionSchema.getTuningConfig() ), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index b7cff3130ba0..6a786523d4b4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -336,15 +336,22 @@ public SequenceMetadataTransactionalSegmentPublisher( @Override public SegmentPublishResult publishAnnotatedSegments( - @Nullable Set mustBeNullOrEmptySegments, + @Nullable Set mustBeNullOrEmptyOverwriteSegments, + @Nullable Set mustBeNullOrEmptyDropSegments, Set segmentsToPush, @Nullable Object commitMetadata ) throws IOException { - if (mustBeNullOrEmptySegments != null && !mustBeNullOrEmptySegments.isEmpty()) { + if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) { throw new ISE( "Stream ingestion task unexpectedly attempted to overwrite segments: %s", - SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptySegments) + SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments) + ); + } + if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) { + throw new ISE( + "Stream ingestion task unexpectedly attempted to drop segments: %s", + SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments) ); } final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata"); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index b9eb14c01c0c..5f50e4abf532 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -97,7 +97,7 @@ private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interv } @Test - public void testTransactional() throws Exception + public void testTransactionalUpdateDataSourceMetadata() throws Exception { final Task task = NoopTask.create(); actionTestKit.getTaskLockbox().add(task); @@ -135,7 +135,40 @@ public void testTransactional() throws Exception } @Test - public void testFailTransactional() throws Exception + public void testTransactionalDropSegments() throws Exception + { + final Task task = NoopTask.create(); + actionTestKit.getTaskLockbox().add(task); + acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); + + SegmentPublishResult result1 = SegmentTransactionalInsertAction.overwriteAction( + null, + null, + ImmutableSet.of(SEGMENT1) + ).perform( + task, + actionTestKit.getTaskActionToolbox() + ); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1); + + SegmentPublishResult result2 = SegmentTransactionalInsertAction.overwriteAction( + null, + ImmutableSet.of(SEGMENT1), + ImmutableSet.of(SEGMENT2) + ).perform( + task, + 
actionTestKit.getTaskActionToolbox() + ); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2); + + Assertions.assertThat( + actionTestKit.getMetadataStorageCoordinator() + .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE) + ).containsExactlyInAnyOrder(SEGMENT2); + } + + @Test + public void testFailTransactionalUpdateDataSourceMetadata() throws Exception { final Task task = NoopTask.create(); actionTestKit.getTaskLockbox().add(task); @@ -153,11 +186,32 @@ public void testFailTransactional() throws Exception Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result); } + @Test + public void testFailTransactionalDropSegment() throws Exception + { + final Task task = NoopTask.create(); + actionTestKit.getTaskLockbox().add(task); + acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); + + SegmentPublishResult result = SegmentTransactionalInsertAction.overwriteAction( + null, + // SEGMENT1 does not exist, hence will fail to drop + ImmutableSet.of(SEGMENT1), + ImmutableSet.of(SEGMENT2) + ).perform( + task, + actionTestKit.getTaskActionToolbox() + ); + + Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result); + } + @Test public void testFailBadVersion() throws Exception { final Task task = NoopTask.create(); final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.overwriteAction( + null, null, ImmutableSet.of(SEGMENT3) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index df7311206b0f..ea2c854c3e24 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -84,7 +84,14 @@ public void before() objectMapper, metadataStorageTablesConfig, testDerbyConnector - ); + ) + { + @Override + public int getSqlMetadataMaxRetry() + { + return 2; + } + }; taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); segmentsMetadataManager = new SqlSegmentsMetadataManager( objectMapper, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index f6b09cbcff84..abc504b308d3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1486,11 +1486,12 @@ public Set announceHistoricalSegments(Set segments) th @Override public SegmentPublishResult announceHistoricalSegments( Set segments, + Set segmentsToDrop, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata ) throws IOException { - SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata); + SegmentPublishResult result = super.announceHistoricalSegments(segments, segmentsToDrop, startMetadata, endMetadata); Assert.assertFalse( "Segment latch not initialized, did you forget to call expectPublishSegments?", diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 2e23fcaef8ef..4693f3ad485c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.io.Files; +import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.impl.CsvInputFormat; @@ -76,6 +77,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -439,6 +441,39 @@ public void testDruidInputSourceCreateSplitsWithIndividualSplits() Assert.assertEquals(segmentIdsFromCoordinator, segmentIdsFromSplits); } + @Test + public void testCompactionDropSegmentsOfInputInterval() + { + runIndexTask(null, true); + + Collection usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX)); + Assert.assertEquals(3, usedSegments.size()); + for (DataSegment segment : usedSegments) { + Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval())); + } + + final Builder builder = new Builder( + DATA_SOURCE, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY + ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) + .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null)) + .build(); + + final Set compactedSegments = runTask(compactionTask); + + usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX)); + // All the HOUR segments got dropped even if we do not have all MINUTES segments fully covering the 3 HOURS interval. + // In fact, we only have 3 minutes of data out of the 3 hours interval. 
+ Assert.assertEquals(3, usedSegments.size()); + for (DataSegment segment : usedSegments) { + Assert.assertTrue(Granularities.MINUTE.isAligned(segment.getInterval())); + } + } + private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting) { ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( @@ -451,7 +486,8 @@ private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appen false, 0 ), - appendToExisting + appendToExisting, + null ); ParallelIndexTuningConfig tuningConfig = newTuningConfig(partitionsSpec, 2, !appendToExisting); ParallelIndexSupervisorTask indexTask = new ParallelIndexSupervisorTask( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index c835397f7bcd..5eadc388db0f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -482,6 +482,7 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc null ), IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true), + false, false ), null @@ -808,12 +809,27 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva return; } + // This creates HOUR segments with intervals of + // - 2014-01-01T00:00:00/2014-01-01T01:00:00 + // - 2014-01-01T01:00:00/2014-01-01T02:00:00 + // - 2014-01-01T02:00:00/2014-01-01T03:00:00 runIndexTask(); - final Set expectedSegments = new HashSet<>( + final Interval compactionPartialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00"); + + // Segments that did not belong in the compaction interval are expected unchanged + final Set expectedSegments = new HashSet<>(); + expectedSegments.addAll( getStorageCoordinator().retrieveUsedSegmentsForIntervals( DATA_SOURCE, - Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Collections.singletonList(Intervals.of("2014-01-01T02:00:00/2014-01-01T03:00:00")), + Segments.ONLY_VISIBLE + ) + ); + expectedSegments.addAll( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01T00:00:00/2014-01-01T01:00:00")), Segments.ONLY_VISIBLE ) ); @@ -824,15 +840,17 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva RETRY_POLICY_FACTORY ); - final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00"); final CompactionTask partialCompactionTask = builder - .interval(partialInterval) + .interval(compactionPartialInterval) .segmentGranularity(Granularities.MINUTE) .build(); final Pair> partialCompactionResult = runTask(partialCompactionTask); Assert.assertTrue(partialCompactionResult.lhs.isSuccess()); - // All segments in the previous expectedSegments should still appear as they have larger segment granularity. + + // New segments that was compacted are expected. However, old segments of the compacted interval should be drop + // regardless of the new segments fully overshadow the old segments or not. 
Hence, we do not expect old segments + // of the 2014-01-01T01:00:00/2014-01-01T02:00:00 interval post-compaction expectedSegments.addAll(partialCompactionResult.rhs); final Set segmentsAfterPartialCompaction = new HashSet<>( @@ -865,12 +883,23 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva ); Assert.assertEquals(3, segmentsAfterFullCompaction.size()); - for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) { - Assert.assertEquals( - Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)), - segmentsAfterFullCompaction.get(i).getInterval() - ); - } + // Full Compaction with null segmentGranularity meaning that the original segmentGrnaularity is perserved + // For the intervals, 2014-01-01T00:00:00.000Z/2014-01-01T01:00:00.000Z and 2014-01-01T02:00:00.000Z/2014-01-01T03:00:00.000Z + // the original segmentGranularity is HOUR from the initial ingestion. + // For the interval, 2014-01-01T01:00:00.000Z/2014-01-01T01:01:00.000Z, the original segmentGranularity is + // MINUTE from the partial compaction done earlier. + Assert.assertEquals( + Intervals.of("2014-01-01T00:00:00.000Z/2014-01-01T01:00:00.000Z"), + segmentsAfterFullCompaction.get(0).getInterval() + ); + Assert.assertEquals( + Intervals.of("2014-01-01T01:00:00.000Z/2014-01-01T01:01:00.000Z"), + segmentsAfterFullCompaction.get(1).getInterval() + ); + Assert.assertEquals( + Intervals.of("2014-01-01T02:00:00.000Z/2014-01-01T03:00:00.000Z"), + segmentsAfterFullCompaction.get(2).getInterval() + ); } @Test @@ -1050,6 +1079,7 @@ public void testRunRegularIndexTaskWithIngestSegmentFirehose() throws Exception segmentLoaderFactory, RETRY_POLICY_FACTORY ), + false, false ), IndexTaskTest.createTuningConfig(5000000, null, null, Long.MAX_VALUE, null, false, true) @@ -1126,7 +1156,8 @@ private Pair> runIndexTask( null ), IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true), - appendToExisting + appendToExisting, + false ), null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java index af67f9908f51..6af2e194c1f3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexIngestionSpecTest.java @@ -60,6 +60,7 @@ public void testParserAndInputFormat() null, new NoopInputSource(), new NoopInputFormat(), + null, null ), null @@ -84,6 +85,7 @@ public void testParserAndInputSource() null, new NoopInputSource(), null, + null, null ), null @@ -110,6 +112,7 @@ public void testFirehoseAndInputSource() new NoopFirehoseFactory(), new NoopInputSource(), null, + null, null ), null @@ -134,6 +137,7 @@ public void testFirehoseAndInputFormat() new NoopFirehoseFactory(), null, new NoopInputFormat(), + null, null ), null diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index a7ccd4753999..3364c422440a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -53,6 +53,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import 
org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -125,6 +126,7 @@ public class IndexTaskTest extends IngestionTestBase @Rule public ExpectedException expectedException = ExpectedException.none(); + private static final String DATASOURCE = "test"; private static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); private static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec( DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")) @@ -220,6 +222,7 @@ public void testDeterminePartitions() throws Exception null, null, createTuningConfigWithMaxRowsPerSegment(2, true), + false, false ), null @@ -231,7 +234,7 @@ public void testDeterminePartitions() throws Exception Assert.assertEquals(2, segments.size()); - Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(0).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); @@ -241,7 +244,7 @@ public void testDeterminePartitions() throws Exception ((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction() ); - Assert.assertEquals("test", segments.get(1).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(1).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); @@ -306,6 +309,7 @@ public void testTransformSpec() throws Exception transformSpec, null, tuningConfig, + false, false ); } else { @@ -316,6 +320,7 @@ public void testTransformSpec() throws Exception transformSpec, null, tuningConfig, + false, false ); } @@ -383,7 +388,7 @@ public void testTransformSpec() throws Exception Assert.assertEquals(ImmutableList.of("anotherfoo", "arrayfoo"), transforms.get(0).get("dimtarray2")); Assert.assertEquals(ImmutableList.of("6.0", "7.0"), transforms.get(0).get("dimtnum_array")); - Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(0).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); @@ -414,6 +419,7 @@ public void testWithArbitraryGranularity() throws Exception ), null, createTuningConfigWithMaxRowsPerSegment(10, true), + false, false ), null @@ -449,6 +455,7 @@ public void testIntervalBucketing() throws Exception ), null, createTuningConfigWithMaxRowsPerSegment(50, true), + false, false ), null @@ -480,6 +487,7 @@ public void testNumShardsProvided() throws Exception null, null, createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null), true), + false, false ), null @@ -489,7 +497,7 @@ public void testNumShardsProvided() throws Exception Assert.assertEquals(1, segments.size()); - Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(DATASOURCE, 
segments.get(0).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); @@ -522,6 +530,7 @@ public void testNumShardsAndHashPartitionFunctionProvided() throws Exception createTuningConfigWithPartitionsSpec( new HashedPartitionsSpec(null, 1, null, HashPartitionFunction.MURMUR3_32_ABS), true ), + false, false ), null @@ -531,7 +540,7 @@ public void testNumShardsAndHashPartitionFunctionProvided() throws Exception Assert.assertEquals(1, segments.size()); - Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(0).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); @@ -562,6 +571,7 @@ public void testNumShardsAndPartitionDimensionsProvided() throws Exception null, null, createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true), + false, false ), null @@ -572,7 +582,7 @@ public void testNumShardsAndPartitionDimensionsProvided() throws Exception Assert.assertEquals(2, segments.size()); for (DataSegment segment : segments) { - Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(DATASOURCE, segment.getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec(); @@ -634,7 +644,8 @@ public void testWriteNewSegmentsWithAppendToExistingWithLinearPartitioningSucces null, null, createTuningConfigWithMaxRowsPerSegment(2, false), - true + true, + false ), null ); @@ -646,12 +657,12 @@ public void testWriteNewSegmentsWithAppendToExistingWithLinearPartitioningSucces Assert.assertEquals(2, taskRunner.getTaskActionClient().getActionCount(SegmentAllocateAction.class)); Assert.assertEquals(2, segments.size()); - Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(0).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); - Assert.assertEquals("test", segments.get(1).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(1).getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); Assert.assertEquals(NumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); @@ -682,6 +693,7 @@ public void testIntervalNotSpecified() throws Exception ), null, createTuningConfigWithMaxRowsPerSegment(2, true), + false, false ), null @@ -691,17 +703,17 @@ public void testIntervalNotSpecified() throws Exception Assert.assertEquals(3, segments.size()); - Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(0).getDataSource()); Assert.assertEquals(Intervals.of("2014-01-01T00/PT1H"), segments.get(0).getInterval()); 
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); - Assert.assertEquals("test", segments.get(1).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(1).getDataSource()); Assert.assertEquals(Intervals.of("2014-01-01T01/PT1H"), segments.get(1).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(1).getShardSpec().getPartitionNum()); - Assert.assertEquals("test", segments.get(2).getDataSource()); + Assert.assertEquals(DATASOURCE, segments.get(2).getDataSource()); Assert.assertEquals(Intervals.of("2014-01-01T02/PT1H"), segments.get(2).getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(2).getShardSpec().getClass()); Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); @@ -730,6 +742,7 @@ public void testCSVFileWithHeader() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -742,6 +755,7 @@ public void testCSVFileWithHeader() throws Exception null, null, tuningConfig, + false, false ); } @@ -786,6 +800,7 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -798,6 +813,7 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception null, null, tuningConfig, + false, false ); } @@ -849,6 +865,7 @@ public void testWithSmallMaxTotalRows() throws Exception ), null, createTuningConfig(2, 2, null, 2L, null, false, true), + false, false ), null @@ -863,7 +880,7 @@ public void testWithSmallMaxTotalRows() throws Exception final Interval expectedInterval = Intervals.of(StringUtils.format("2014-01-01T0%d/PT1H", (i / 2))); final int expectedPartitionNum = i % 2; - Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(DATASOURCE, segment.getDataSource()); Assert.assertEquals(expectedInterval, segment.getInterval()); Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum()); @@ -892,6 +909,7 @@ public void testPerfectRollup() throws Exception ), null, createTuningConfig(3, 2, null, 2L, null, true, true), + false, false ), null @@ -905,7 +923,7 @@ public void testPerfectRollup() throws Exception final DataSegment segment = segments.get(i); final Interval expectedInterval = Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); - Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(DATASOURCE, segment.getDataSource()); Assert.assertEquals(expectedInterval, segment.getInterval()); Assert.assertTrue(segment.getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class)); Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); @@ -934,6 +952,7 @@ public void testBestEffortRollup() throws Exception ), null, createTuningConfig(3, 2, null, 2L, null, false, true), + false, false ), null @@ -947,7 +966,7 @@ public void testBestEffortRollup() throws Exception for (int i = 0; i < 5; i++) { final DataSegment segment = segments.get(i); - Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(DATASOURCE, segment.getDataSource()); Assert.assertEquals(expectedInterval, segment.getInterval()); Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); @@ -1000,6 
+1019,7 @@ public void testIgnoreParseException() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -1010,6 +1030,7 @@ public void testIgnoreParseException() throws Exception null, null, tuningConfig, + false, false ); } @@ -1056,6 +1077,7 @@ public void testReportParseException() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -1066,6 +1088,7 @@ public void testReportParseException() throws Exception null, null, tuningConfig, + false, false ); } @@ -1159,6 +1182,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -1169,6 +1193,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, null, tuningConfig, + false, false ); } @@ -1291,6 +1316,7 @@ public void testMultipleParseExceptionsFailure() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -1301,6 +1327,7 @@ public void testMultipleParseExceptionsFailure() throws Exception null, null, tuningConfig, + false, false ); } @@ -1414,6 +1441,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, null, tuningConfig, + false, false ); } else { @@ -1424,6 +1452,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, null, tuningConfig, + false, false ); } @@ -1513,6 +1542,7 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -1523,6 +1553,7 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception null, null, tuningConfig, + false, false ); } @@ -1585,6 +1616,7 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception null, null, tuningConfig, + false, false ); } else { @@ -1595,6 +1627,7 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception null, null, tuningConfig, + false, false ); } @@ -1646,6 +1679,7 @@ public void testOverwriteWithSameSegmentGranularity() throws Exception ), null, createTuningConfig(3, 2, null, 2L, null, false, true), + false, false ), null @@ -1658,7 +1692,7 @@ public void testOverwriteWithSameSegmentGranularity() throws Exception final Interval expectedInterval = Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); for (int j = 0; j < 5; j++) { final DataSegment segment = segments.get(j); - Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(DATASOURCE, segment.getDataSource()); Assert.assertEquals(expectedInterval, segment.getInterval()); if (i == 0) { Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); @@ -1710,6 +1744,7 @@ public void testOverwriteWithDifferentSegmentGranularity() throws Exception ), null, createTuningConfig(3, 2, null, 2L, null, false, true), + false, false ), null @@ -1724,7 +1759,7 @@ public void testOverwriteWithDifferentSegmentGranularity() throws Exception : Intervals.of("2014-01-01/2014-02-01"); for (int j = 0; j < 5; j++) { final DataSegment segment = segments.get(j); - Assert.assertEquals("test", segment.getDataSource()); + Assert.assertEquals(DATASOURCE, segment.getDataSource()); Assert.assertEquals(expectedInterval, segment.getInterval()); Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(j, segment.getShardSpec().getPartitionNum()); @@ -1745,6 +1780,7 @@ public void testIndexTaskWithSingleDimPartitionsSpecThrowingException() throws E null, null, createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false), 
true), + false, false ), null @@ -1756,6 +1792,363 @@ public void testIndexTaskWithSingleDimPartitionsSpecThrowingException() throws E task.isReady(createActionClient(task)); } + @Test + public void testOldSegmentNotDropWhenDropFlagFalse() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,1\n"); + writer.write("2014-01-01T02:00:30Z,c,1\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.YEAR, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + false + ), + null + ); + + // Ingest data with YEAR segment granularity + List segments = runTask(indexTask).rhs; + + Assert.assertEquals(1, segments.size()); + Set usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); + for (DataSegment segment : usedSegmentsBeforeOverwrite) { + Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); + } + + indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.MINUTE, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + false + ), + null + ); + + // Ingest data with overwrite and MINUTE segment granularity + segments = runTask(indexTask).rhs; + + Assert.assertEquals(3, segments.size()); + Set usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(4, usedSegmentsBeforeAfterOverwrite.size()); + int yearSegmentFound = 0; + int minuteSegmentFound = 0; + for (DataSegment segment : usedSegmentsBeforeAfterOverwrite) { + // Used segments after overwrite will contain 1 old segment with YEAR segmentGranularity (from first ingestion) + // and 3 new segments with MINUTE segmentGranularity (from second ingestion) + if (usedSegmentsBeforeOverwrite.contains(segment)) { + Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); + yearSegmentFound++; + } else { + Assert.assertTrue(Granularities.MINUTE.isAligned(segment.getInterval())); + minuteSegmentFound++; + } + } + Assert.assertEquals(1, yearSegmentFound); + Assert.assertEquals(3, minuteSegmentFound); + } + + @Test + public void testOldSegmentNotDropWhenDropFlagTrueSinceIngestionIntervalDoesNotContainsOldSegment() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,1\n"); + writer.write("2014-01-01T02:00:30Z,c,1\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.YEAR, + 
Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + false + ), + null + ); + + // Ingest data with YEAR segment granularity + List segments = runTask(indexTask).rhs; + + Assert.assertEquals(1, segments.size()); + Set usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); + for (DataSegment segment : usedSegmentsBeforeOverwrite) { + Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); + } + + indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.MINUTE, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + true + ), + null + ); + + // Ingest data with overwrite and MINUTE segment granularity + segments = runTask(indexTask).rhs; + + Assert.assertEquals(3, segments.size()); + Set usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(4, usedSegmentsBeforeAfterOverwrite.size()); + int yearSegmentFound = 0; + int minuteSegmentFound = 0; + for (DataSegment segment : usedSegmentsBeforeAfterOverwrite) { + // Used segments after overwrite will contain 1 old segment with YEAR segmentGranularity (from first ingestion) + // and 3 new segments with MINUTE segmentGranularity (from second ingestion) + if (usedSegmentsBeforeOverwrite.contains(segment)) { + Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); + yearSegmentFound++; + } else { + Assert.assertTrue(Granularities.MINUTE.isAligned(segment.getInterval())); + minuteSegmentFound++; + } + } + Assert.assertEquals(1, yearSegmentFound); + Assert.assertEquals(3, minuteSegmentFound); + } + + @Test + public void testOldSegmentDropWhenDropFlagTrueAndIngestionIntervalContainsOldSegment() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,1\n"); + writer.write("2014-01-01T02:00:30Z,c,1\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.YEAR, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + false + ), + null + ); + + // Ingest data with YEAR segment granularity + List segments = runTask(indexTask).rhs; + + Assert.assertEquals(1, segments.size()); + Set usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); + for (DataSegment segment : usedSegmentsBeforeOverwrite) { + Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); + } + + indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + 
jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.MINUTE, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2015-01-01")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + true + ), + null + ); + + // Ingest data with overwrite and MINUTE segment granularity + segments = runTask(indexTask).rhs; + + Assert.assertEquals(3, segments.size()); + Set usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(3, usedSegmentsBeforeAfterOverwrite.size()); + for (DataSegment segment : usedSegmentsBeforeAfterOverwrite) { + // Used segments after overwrite and drop will contain only the + // 3 new segments with MINUTE segmentGranularity (from second ingestion) + if (usedSegmentsBeforeOverwrite.contains(segment)) { + Assert.fail(); + } else { + Assert.assertTrue(Granularities.MINUTE.isAligned(segment.getInterval())); + } + } + } + + @Test + public void testOldSegmentNotDropWhenDropFlagTrueAndIngestionIntervalEmpty() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("2014-01-01T00:00:10Z,a,1\n"); + writer.write("2014-01-01T01:00:20Z,b,1\n"); + writer.write("2014-01-01T02:00:30Z,c,1\n"); + } + + IndexTask indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.YEAR, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + false + ), + null + ); + + // Ingest data with YEAR segment granularity + List segments = runTask(indexTask).rhs; + + Assert.assertEquals(1, segments.size()); + Set usedSegmentsBeforeOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size()); + for (DataSegment segment : usedSegmentsBeforeOverwrite) { + Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); + } + + indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.MINUTE, + Granularities.MINUTE, + null + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + false, + true + ), + null + ); + + // Ingest data with overwrite and MINUTE segment granularity + segments = runTask(indexTask).rhs; + + Assert.assertEquals(3, segments.size()); + Set usedSegmentsBeforeAfterOverwrite = Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, Intervals.ETERNITY, true).get()); + Assert.assertEquals(4, usedSegmentsBeforeAfterOverwrite.size()); + int yearSegmentFound = 0; + int minuteSegmentFound = 0; + for (DataSegment segment : usedSegmentsBeforeAfterOverwrite) { + // Used segments after overwrite will contain 1 old segment with YEAR segmentGranularity (from first ingestion) + // and 3 new segments with MINUTE segmentGranularity (from second ingestion) + if (usedSegmentsBeforeOverwrite.contains(segment)) { + Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval())); + yearSegmentFound++; + } else { + 
Assert.assertTrue(Granularities.MINUTE.isAligned(segment.getInterval())); + minuteSegmentFound++; + } + } + Assert.assertEquals(1, yearSegmentFound); + Assert.assertEquals(3, minuteSegmentFound); + } + + @Test + public void testErrorWhenDropFlagTrueAndOverwriteFalse() throws Exception + { + expectedException.expect(IAE.class); + expectedException.expectMessage( + "Cannot both drop existing segments and append to existing segments. Either dropExisting or appendToExisting should be set to false" + ); + new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + temporaryFolder.newFolder(), + new UniformGranularitySpec( + Granularities.MINUTE, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) + ), + null, + createTuningConfigWithMaxRowsPerSegment(10, true), + true, + true + ), + null + ); + } + public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status) { // full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message @@ -1861,7 +2254,8 @@ private IndexIngestionSpec createDefaultIngestionSpec( @Nullable GranularitySpec granularitySpec, @Nullable TransformSpec transformSpec, IndexTuningConfig tuningConfig, - boolean appendToExisting + boolean appendToExisting, + Boolean dropExisting ) { if (useInputFormatApi) { @@ -1874,7 +2268,8 @@ private IndexIngestionSpec createDefaultIngestionSpec( transformSpec, granularitySpec, tuningConfig, - appendToExisting + appendToExisting, + dropExisting ); } else { return createIngestionSpec( @@ -1884,7 +2279,8 @@ private IndexIngestionSpec createDefaultIngestionSpec( transformSpec, granularitySpec, tuningConfig, - appendToExisting + appendToExisting, + dropExisting ); } } @@ -1896,7 +2292,8 @@ static IndexIngestionSpec createIngestionSpec( @Nullable TransformSpec transformSpec, @Nullable GranularitySpec granularitySpec, IndexTuningConfig tuningConfig, - boolean appendToExisting + boolean appendToExisting, + Boolean dropExisting ) { return createIngestionSpec( @@ -1909,7 +2306,8 @@ static IndexIngestionSpec createIngestionSpec( transformSpec, granularitySpec, tuningConfig, - appendToExisting + appendToExisting, + dropExisting ); } @@ -1922,7 +2320,8 @@ static IndexIngestionSpec createIngestionSpec( @Nullable TransformSpec transformSpec, @Nullable GranularitySpec granularitySpec, IndexTuningConfig tuningConfig, - boolean appendToExisting + boolean appendToExisting, + Boolean dropExisting ) { return createIngestionSpec( @@ -1935,7 +2334,8 @@ static IndexIngestionSpec createIngestionSpec( transformSpec, granularitySpec, tuningConfig, - appendToExisting + appendToExisting, + dropExisting ); } @@ -1949,14 +2349,15 @@ private static IndexIngestionSpec createIngestionSpec( @Nullable TransformSpec transformSpec, @Nullable GranularitySpec granularitySpec, IndexTuningConfig tuningConfig, - boolean appendToExisting + boolean appendToExisting, + Boolean dropExisting ) { if (inputFormat != null) { Preconditions.checkArgument(parseSpec == null, "Can't use parseSpec"); return new IndexIngestionSpec( new DataSchema( - "test", + DATASOURCE, Preconditions.checkNotNull(timestampSpec, "timestampSpec"), Preconditions.checkNotNull(dimensionsSpec, "dimensionsSpec"), new AggregatorFactory[]{ @@ -1973,14 +2374,15 @@ private static IndexIngestionSpec createIngestionSpec( null, new LocalInputSource(baseDir, "druid*"), inputFormat, - appendToExisting + appendToExisting, + dropExisting ), tuningConfig ); } else { return new IndexIngestionSpec( new 
DataSchema( - "test", + DATASOURCE, objectMapper.convertValue( new StringInputRowParser( parseSpec != null ? parseSpec : DEFAULT_PARSE_SPEC, @@ -2005,7 +2407,8 @@ private static IndexIngestionSpec createIngestionSpec( "druid*", null ), - appendToExisting + appendToExisting, + dropExisting ), tuningConfig ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 59bc8335c70b..4bbd34b3c41b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -245,7 +245,7 @@ public void testIndexTaskSerde() throws Exception ), null ), - new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true), + new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true, false), new IndexTuningConfig( null, null, @@ -328,7 +328,7 @@ public void testIndexTaskwithResourceSerde() throws Exception ), null ), - new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true), + new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true, false), new IndexTuningConfig( null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index d06d17c39309..5b2662400831 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -198,7 +198,8 @@ private ParallelIndexSupervisorTask newTask( null, new LocalInputSource(inputDir, filter), inputFormat, - appendToExisting + appendToExisting, + null ); ingestionSpec = new ParallelIndexIngestionSpec( new DataSchema( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 162c86ec52e4..843ca4cca3cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -77,7 +77,8 @@ public void testStopGracefully() throws Exception // Sub tasks would run forever new TestInputSource(Pair.of(new TestInput(Integer.MAX_VALUE, TaskState.SUCCESS), 4)), new NoopInputFormat(), - false + false, + null ) ); getIndexingServiceClient().runTask(task.getId(), task); @@ -109,7 +110,8 @@ public void testSubTaskFail() throws Exception Pair.of(new TestInput(Integer.MAX_VALUE, TaskState.FAILED), 3) ), new NoopInputFormat(), - false + false, + null ) ); final TaskActionClient actionClient = createActionClient(task); @@ -319,7 +321,8 @@ SinglePhaseSubTaskSpec newTaskSpec(InputSplit split) null, baseInputSource.withSplit(split), getIngestionSchema().getIOConfig().getInputFormat(), - getIngestionSchema().getIOConfig().isAppendToExisting() + 
getIngestionSchema().getIOConfig().isAppendToExisting(), + getIngestionSchema().getIOConfig().isDropExisting() ), getIngestionSchema().getTuningConfig() ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index f4b1746ee27f..224b034a0faa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -123,7 +123,8 @@ public void testAPIs() throws Exception null, new TestInputSource(IntStream.range(0, NUM_SUB_TASKS).boxed().collect(Collectors.toList())), new NoopInputFormat(), - false + false, + null ) ); getIndexingServiceClient().runTask(task.getId(), task); @@ -548,7 +549,8 @@ SinglePhaseSubTaskSpec newTaskSpec(InputSplit split) null, baseInputSource.withSplit(split), getIngestionSchema().getIOConfig().getInputFormat(), - getIngestionSchema().getIOConfig().isAppendToExisting() + getIngestionSchema().getIOConfig().isAppendToExisting(), + null ), getIngestionSchema().getTuningConfig() ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index 0d61e6cca5bb..949654871d0b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -198,7 +198,8 @@ private static class ParallelIndexIngestionSpecBuilder null, new LocalInputSource(new File("tmp"), "test_*"), new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, null, false, 0), - false + false, + null ); // For dataSchema.granularitySpec diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index b4ee01394126..b29b6cf8e06d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -190,7 +190,8 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA null, new InlineInputSource("test"), new JsonInputFormat(null, null, null), - appendToExisting + appendToExisting, + null ); final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 0e6460dad752..6a15eeb1e79a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -219,7 +219,7 @@ static ParallelIndexIngestionSpec createIngestionSpec( DataSchema dataSchema ) { - ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(null, inputSource, inputFormat, false); + ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(null, inputSource, inputFormat, false, false); return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 42c24ee24dda..f08579607564 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -468,7 +468,8 @@ private ParallelIndexSupervisorTask newTask( null, new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource), DEFAULT_INPUT_FORMAT, - appendToExisting + appendToExisting, + null ), tuningConfig ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java index 8fd9049a0762..c28d43c3858b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java @@ -55,6 +55,7 @@ public class SinglePhaseSubTaskSpecTest null, new LocalInputSource(new File("baseDir"), "filter"), new JsonInputFormat(null, null, null), + null, null ), null diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index d39905e91814..8f220870c474 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -742,7 +742,7 @@ public void testIndexTask() throws Exception ), null ), - new IndexIOConfig(null, new MockInputSource(), new NoopInputFormat(), false), + new IndexIOConfig(null, new MockInputSource(), new NoopInputFormat(), false, false), new IndexTuningConfig( null, 10000, @@ -825,7 +825,7 @@ public void testIndexTaskFailure() throws Exception null, mapper ), - new IndexIOConfig(null, new MockExceptionInputSource(), new NoopInputFormat(), false), + new IndexIOConfig(null, new MockExceptionInputSource(), new NoopInputFormat(), false, false), new IndexTuningConfig( null, 10000, @@ -1253,7 +1253,7 @@ public void testResumeTasks() throws Exception ), null ), - new IndexIOConfig(null, new MockInputSource(), new NoopInputFormat(), false), + new IndexIOConfig(null, new MockInputSource(), new NoopInputFormat(), false, false), new IndexTuningConfig( null, 10000, @@ -1363,7 +1363,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception ), null ), - new IndexIOConfig(null, new MockInputSource(), new NoopInputFormat(), false), + new IndexIOConfig(null, new MockInputSource(), new NoopInputFormat(), 
false, false), new IndexTuningConfig( null, 10000, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java new file mode 100644 index 000000000000..537b6c4b6fab --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Set; + +@RunWith(MockitoJUnitRunner.class) +public class SequenceMetadataTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Mock + private SeekableStreamIndexTaskRunner mockSeekableStreamIndexTaskRunner; + + @Mock + private SeekableStreamEndSequenceNumbers mockSeekableStreamEndSequenceNumbers; + + @Mock + private TaskActionClient mockTaskActionClient; + + @Mock + private TaskToolbox mockTaskToolbox; + + @Test + public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty() throws Exception + { + DataSegment dataSegment = DataSegment.builder() + .dataSource("foo") + .interval(Intervals.of("2001/P1D")) + .shardSpec(new LinearShardSpec(1)) + .version("b") + .size(0) + .build(); + + Set notNullNotEmptySegment = ImmutableSet.of(dataSegment); + SequenceMetadata sequenceMetadata = new SequenceMetadata<>( + 1, + "test", + ImmutableMap.of(), + ImmutableMap.of(), + true, + ImmutableSet.of() + ); + TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true); + + expectedException.expect(ISE.class); + expectedException.expectMessage( + "Stream ingestion task unexpectedly attempted to overwrite segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment) + ); + + 
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, null, ImmutableSet.of(), null); + } + + @Test + public void testPublishAnnotatedSegmentsThrowExceptionIfDropSegmentsNotNullAndNotEmpty() throws Exception + { + DataSegment dataSegment = DataSegment.builder() + .dataSource("foo") + .interval(Intervals.of("2001/P1D")) + .shardSpec(new LinearShardSpec(1)) + .version("b") + .size(0) + .build(); + + Set notNullNotEmptySegment = ImmutableSet.of(dataSegment); + SequenceMetadata sequenceMetadata = new SequenceMetadata<>( + 1, + "test", + ImmutableMap.of(), + ImmutableMap.of(), + true, + ImmutableSet.of() + ); + TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true); + + expectedException.expect(ISE.class); + expectedException.expectMessage( + "Stream ingestion task unexpectedly attempted to drop segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment) + ); + + transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableSet.of(), null); + } + + @Test + public void testPublishAnnotatedSegmentsSucceedIfDropSegmentsAndOverwriteSegmentsNullAndEmpty() throws Exception + { + Mockito.when(mockSeekableStreamIndexTaskRunner.deserializePartitionsFromMetadata(ArgumentMatchers.anyObject(), ArgumentMatchers.anyObject())).thenReturn(mockSeekableStreamEndSequenceNumbers); + Mockito.when(mockSeekableStreamEndSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(mockTaskToolbox.getTaskActionClient()).thenReturn(mockTaskActionClient); + DataSegment dataSegment = DataSegment.builder() + .dataSource("foo") + .interval(Intervals.of("2001/P1D")) + .shardSpec(new LinearShardSpec(1)) + .version("b") + .size(0) + .build(); + + Set notNullNotEmptySegment = ImmutableSet.of(dataSegment); + SequenceMetadata sequenceMetadata = new SequenceMetadata<>( + 1, + "test", + ImmutableMap.of(), + ImmutableMap.of(), + true, + ImmutableSet.of() + ); + TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, false); + + transactionalSegmentPublisher.publishAnnotatedSegments(null, null, notNullNotEmptySegment, ImmutableMap.of()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index bca3f7881e51..decaff87f875 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -120,6 +120,7 @@ public Set announceHistoricalSegments(Set segments) @Override public SegmentPublishResult announceHistoricalSegments( Set segments, + Set segmentsToDrop, DataSourceMetadata oldCommitMetadata, DataSourceMetadata newCommitMetadata ) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index e2b141f0f395..cad6e2a73f79 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -318,19 +318,17 @@ 
public void testAutoCompactionDutyWithSegmentGranularity() throws Exception LOG.info("Auto compaction test with DAY segment granularity"); - // The earlier segment with YEAR granularity is still 'used' as it’s not fully overshaowed. - // This is because we only have newer version on 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02. - // The version for the YEAR segment is still the latest for 2013-01-01 to 2013-08-31 and 2013-09-02 to 2014-01-01. - // Hence, all three segments are available and the expected intervals are combined from the DAY and YEAR segment granularities - // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 2013-01-01 to 2014-01-01) + // The earlier segment with YEAR granularity will be dropped post-compaction + // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02. + expectedIntervalAfterCompaction = new ArrayList<>(); for (String interval : intervalsBeforeCompaction) { for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { expectedIntervalAfterCompaction.add(newinterval.toString()); } } - forceTriggerAutoCompaction(3); + forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); - verifySegmentsCompacted(3, 1000); + verifySegmentsCompacted(2, 1000); checkCompactionIntervals(expectedIntervalAfterCompaction); } } @@ -478,23 +476,17 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity expectedIntervalAfterCompaction = new ArrayList<>(); - // We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from before the compaction - for (String interval : intervalsBeforeCompaction) { - for (Interval newinterval : Granularities.YEAR.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { - expectedIntervalAfterCompaction.add(newinterval.toString()); - } - } - // one segments with interval of 2013-09-01/2013-10-01 (compacted with MONTH) + // The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be dropped + // We will only have one segments with interval of 2013-09-01/2013-10-01 (compacted with MONTH) // and one segments with interval of 2013-10-01/2013-11-01 (compacted with MONTH) for (String interval : intervalsBeforeCompaction) { for (Interval newinterval : Granularities.MONTH.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { expectedIntervalAfterCompaction.add(newinterval.toString()); } } - - forceTriggerAutoCompaction(3); + forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); - verifySegmentsCompacted(3, MAX_ROWS_PER_SEGMENT_COMPACTED); + verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(expectedIntervalAfterCompaction); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java index 82578d1e3625..8203d03e68c9 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java @@ -75,6 +75,11 @@ public 
void doIndexTest(InputFormatDetails inputFormatDetails, @Nonnull Map().put("type", INPUT_FORMAT_DETAILS.getInputFormatType()) + .build(); + final Function sqlInputSourcePropsTransform = spec -> { + try { + spec = StringUtils.replace( + spec, + "%%PARTITIONS_SPEC%%", + jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null)) + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_FILTER%%", + fileFilter + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_BASE_DIR%%", + "/resources/data/batch_index" + INPUT_FORMAT_DETAILS.getFolderSuffix() + ); + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT%%", + jsonMapper.writeValueAsString(inputFormatMap) + ); + spec = StringUtils.replace( + spec, + "%%APPEND_TO_EXISTING%%", + jsonMapper.writeValueAsString(false) + ); + spec = StringUtils.replace( + spec, + "%%DROP_EXISTING%%", + jsonMapper.writeValueAsString(dropExisting) + ); + spec = StringUtils.replace( + spec, + "%%FORCE_GUARANTEED_ROLLUP%%", + jsonMapper.writeValueAsString(false) + ); + return spec; + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + doIndexTest( + indexDatasource, + INDEX_TASK, + sqlInputSourcePropsTransform, + null, + false, + false, + true + ); + } +} diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_queries_only_data3.json b/integration-tests/src/test/resources/indexer/wikipedia_index_queries_only_data3.json new file mode 100644 index 000000000000..5838c247d74d --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_queries_only_data3.json @@ -0,0 +1,198 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults":[ + { + "timestamp" : "2013-09-01T03:32:45.000Z", + "result" : { + "minTime" : "2013-09-01T03:32:45.000Z", + "maxTime" : "2013-09-01T12:41:27.000Z" + } + } + ] + }, + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[] + }, + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-09-01T00:00/2013-09-02T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-09-01T00:00:00.000Z", + "result" : { + "quantilesSketch":4, + "approxCountTheta":4.0, + "approxCountHLL":4 + } + } + ] 
+ }, + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"added", + "name":"added_count" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults":[] + }, + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"added", + "name":"added_count" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-09-01T00:00/2013-09-02T00:00" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-09-01T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 9050.0, + "page" : "Crimson Typhoon", + "added_count" : 905, + "rows" : 1 + } + } ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json index 46de14ce41c2..ee0fd73021a8 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json @@ -72,6 +72,7 @@ "baseDir": "%%INPUT_SOURCE_BASE_DIR%%" }, "appendToExisting": %%APPEND_TO_EXISTING%%, + "dropExisting": %%DROP_EXISTING%%, "inputFormat": %%INPUT_FORMAT%% }, "tuningConfig": { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java index 23579a0c8a4c..6cad3c12649b 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java @@ -26,7 +26,7 @@ /** * Commit metadata for a dataSource. Used by - * {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)} + * {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, Set, DataSourceMetadata, DataSourceMetadata)} * to provide metadata transactions for segment inserts. * * Two metadata instances can be added together, and any conflicts are resolved in favor of the right-hand side. 
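A minimal sketch of how a caller can use the announceHistoricalSegments overload introduced in the next hunks; the wrapper class, method name, and segment sets below are hypothetical and not part of this patch, only the overload's call shape is taken from the hunks that follow.

import java.io.IOException;
import java.util.Set;

import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.timeline.DataSegment;

// Hypothetical helper (illustration only): publish new segments and, in the same
// metadata-store transaction, mark the segments in segmentsToDrop as unused.
class DropExistingPublishSketch
{
  SegmentPublishResult publishAndDrop(
      IndexerMetadataStorageCoordinator coordinator,
      Set<DataSegment> newSegments,
      Set<DataSegment> segmentsToDrop
  ) throws IOException
  {
    // Passing null for startMetadata and endMetadata skips the dataSource-metadata
    // compare-and-swap, as batch ingestion does; the insert of newSegments and the
    // mark-unused of segmentsToDrop still commit or roll back together.
    return coordinator.announceHistoricalSegments(newSegments, segmentsToDrop, null, null);
  }
}

Elsewhere in this patch, SequenceMetadataTest covers the opposite constraint: a stream ingestion task that passes a non-empty overwrite or drop set fails with an ISE.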
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index fdbdb7a94439..3a1d2578c5a0 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -207,13 +207,17 @@ SegmentIdWithShardSpec allocatePendingSegment( * If startMetadata and endMetadata are set, this insertion will be atomic with a compare-and-swap on dataSource * commit metadata. * - * @param segments set of segments to add, must all be from the same dataSource - * @param startMetadata dataSource metadata pre-insert must match this startMetadata according to - * {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will - * not involve a metadata transaction - * @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with - * {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not - * involve a metadata transaction + * If segmentsToDrop is not null and not empty, this insertion will be atomic with an insert-and-drop on inserting + * {@code segments} and dropping {@code segmentsToDrop} + * + * @param segments set of segments to add, must all be from the same dataSource + * @param segmentsToDrop set of segments to drop, must all be from the same dataSource + * @param startMetadata dataSource metadata pre-insert must match this startMetadata according to + * {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will + * not involve a metadata transaction + * @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with + * {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not + * involve a metadata transaction * * @return segment publish result indicating transaction success or failure, and set of segments actually published. * This method must only return a failure code if it is sure that the transaction did not happen.
If it is not sure, @@ -224,6 +228,7 @@ SegmentIdWithShardSpec allocatePendingSegment( */ SegmentPublishResult announceHistoricalSegments( Set segments, + Set segmentsToDrop, @Nullable DataSourceMetadata startMetadata, @Nullable DataSourceMetadata endMetadata ) throws IOException; diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 0345b6e08c51..05fcb563dad4 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; @@ -58,6 +59,7 @@ import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; +import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.PreparedBatch; @@ -108,7 +110,7 @@ public IndexerSQLMetadataStorageCoordinator( this.connector = connector; } - enum DataSourceMetadataUpdateResult + enum DataStoreMetadataUpdateResult { SUCCESS, FAILURE, @@ -325,7 +327,7 @@ private Query> createUsedSegmentsSqlQueryForIntervals( @Override public Set announceHistoricalSegments(final Set segments) throws IOException { - final SegmentPublishResult result = announceHistoricalSegments(segments, null, null); + final SegmentPublishResult result = announceHistoricalSegments(segments, null, null, null); // Metadata transaction cannot fail because we are not trying to do one. if (!result.isSuccess()) { @@ -338,6 +340,7 @@ public Set announceHistoricalSegments(final Set segmen @Override public SegmentPublishResult announceHistoricalSegments( final Set segments, + final Set segmentsToDrop, @Nullable final DataSourceMetadata startMetadata, @Nullable final DataSourceMetadata endMetadata ) throws IOException @@ -383,21 +386,40 @@ public SegmentPublishResult inTransaction( definitelyNotUpdated.set(false); if (startMetadata != null) { - final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( handle, dataSource, startMetadata, endMetadata ); - if (result != DataSourceMetadataUpdateResult.SUCCESS) { + if (result != DataStoreMetadataUpdateResult.SUCCESS) { // Metadata was definitely not updated. transactionStatus.setRollbackOnly(); definitelyNotUpdated.set(true); - if (result == DataSourceMetadataUpdateResult.FAILURE) { + if (result == DataStoreMetadataUpdateResult.FAILURE) { throw new RuntimeException("Aborting transaction!"); - } else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) { + } else if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) { + throw new RetryTransactionException("Aborting transaction!"); + } + } + } + + if (segmentsToDrop != null && !segmentsToDrop.isEmpty()) { + final DataStoreMetadataUpdateResult result = dropSegmentsWithHandle( + handle, + segmentsToDrop, + dataSource + ); + if (result != DataStoreMetadataUpdateResult.SUCCESS) { + // Metadata store was definitely not updated. 
+ transactionStatus.setRollbackOnly(); + definitelyNotUpdated.set(true); + + if (result == DataStoreMetadataUpdateResult.FAILURE) { + throw new RuntimeException("Aborting transaction!"); + } else if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) { throw new RetryTransactionException("Aborting transaction!"); } } @@ -409,7 +431,7 @@ public SegmentPublishResult inTransaction( } }, 3, - SQLMetadataConnector.DEFAULT_MAX_TRIES + getSqlMetadataMaxRetry() ); } catch (CallbackFailedException e) { @@ -454,21 +476,21 @@ public SegmentPublishResult inTransaction( // Set definitelyNotUpdated back to false upon retrying. definitelyNotUpdated.set(false); - final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( handle, dataSource, startMetadata, endMetadata ); - if (result != DataSourceMetadataUpdateResult.SUCCESS) { + if (result != DataStoreMetadataUpdateResult.SUCCESS) { // Metadata was definitely not updated. transactionStatus.setRollbackOnly(); definitelyNotUpdated.set(true); - if (result == DataSourceMetadataUpdateResult.FAILURE) { + if (result == DataStoreMetadataUpdateResult.FAILURE) { throw new RuntimeException("Aborting transaction!"); - } else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) { + } else if (result == DataStoreMetadataUpdateResult.TRY_AGAIN) { throw new RetryTransactionException("Aborting transaction!"); } } @@ -477,7 +499,7 @@ public SegmentPublishResult inTransaction( } }, 3, - SQLMetadataConnector.DEFAULT_MAX_TRIES + getSqlMetadataMaxRetry() ); } catch (CallbackFailedException e) { @@ -490,6 +512,12 @@ public SegmentPublishResult inTransaction( } } + @VisibleForTesting + public int getSqlMetadataMaxRetry() + { + return SQLMetadataConnector.DEFAULT_MAX_TRIES; + } + @Override public SegmentIdWithShardSpec allocatePendingSegment( final String dataSource, @@ -1087,12 +1115,12 @@ private Set segmentExistsBatch(final Handle handle, final Set INSERT can fail due to races; callers must be prepared to retry. final int numRows = handle.createStatement( @@ -1165,7 +1193,7 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( .bind("commit_metadata_sha1", newCommitMetadataSha1) .execute(); - retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN; + retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : DataStoreMetadataUpdateResult.TRY_AGAIN; } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = handle.createStatement( @@ -1183,10 +1211,10 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( .bind("new_commit_metadata_sha1", newCommitMetadataSha1) .execute(); - retVal = numRows == 1 ? DataSourceMetadataUpdateResult.SUCCESS : DataSourceMetadataUpdateResult.TRY_AGAIN; + retVal = numRows == 1 ? DataStoreMetadataUpdateResult.SUCCESS : DataStoreMetadataUpdateResult.TRY_AGAIN; } - if (retVal == DataSourceMetadataUpdateResult.SUCCESS) { + if (retVal == DataStoreMetadataUpdateResult.SUCCESS) { log.info("Updated metadata from[%s] to[%s].", oldCommitMetadataFromDb, newCommitMetadata); } else { log.info("Not updating metadata, compare-and-swap failure."); @@ -1195,6 +1223,64 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle( return retVal; } + /** + * Mark segments as unused in a transaction.
This method is idempotent in that if + * the segments were already marked unused, it still returns SUCCESS. + * + * @param handle database handle + * @param segmentsToDrop segments to mark as unused + * @param dataSource druid dataSource + * + * @return SUCCESS if the segments were marked unused, FAILURE or + * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help + * {@link #announceHistoricalSegments(Set, Set, DataSourceMetadata, DataSourceMetadata)} + * achieve its own guarantee. + * + * @throws RuntimeException if state is unknown after this call + */ + protected DataStoreMetadataUpdateResult dropSegmentsWithHandle( + final Handle handle, + final Set segmentsToDrop, + final String dataSource + ) + { + Preconditions.checkNotNull(dataSource, "dataSource"); + Preconditions.checkNotNull(segmentsToDrop, "segmentsToDrop"); + + if (segmentsToDrop.isEmpty()) { + return DataStoreMetadataUpdateResult.SUCCESS; + } + + if (segmentsToDrop.stream().anyMatch(segment -> !dataSource.equals(segment.getDataSource()))) { + // All segments to drop must belong to the same datasource + log.error( + "Not dropping segments, as not all segments belong to the datasource[%s].", + dataSource + ); + return DataStoreMetadataUpdateResult.FAILURE; + } + final List segmentIdList = segmentsToDrop.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList()); + Batch batch = handle.createBatch(); + segmentIdList.forEach(segmentId -> batch.add( + StringUtils.format( + "UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s'", + dbTables.getSegmentsTable(), + dataSource, + segmentId + ) + )); + final int[] segmentChanges = batch.execute(); + int numChangedSegments = SqlSegmentsMetadataManager.computeNumChangedSegments(segmentIdList, segmentChanges); + if (numChangedSegments != segmentsToDrop.size()) { + log.warn("Failed to drop segments metadata update as numChangedSegments[%s] segmentsToDropSize[%s]", + numChangedSegments, + segmentsToDrop.size() + ); + return DataStoreMetadataUpdateResult.TRY_AGAIN; + } + return DataStoreMetadataUpdateResult.SUCCESS; + } + @Override public boolean deleteDataSourceMetadata(final String dataSource) { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 68c333a1a4f6..0ea264dc8912 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -883,7 +883,7 @@ private boolean markSegmentAsUnusedInDatabase(String segmentId) return numUpdatedRows > 0; } - private static int computeNumChangedSegments(List segmentIds, int[] segmentChanges) + static int computeNumChangedSegments(List segmentIds, int[] segmentChanges) { int numChangedSegments = 0; for (int i = 0; i < segmentChanges.length; i++) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java index f964055bed23..b05b38145ae8 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/BatchIOConfig.java @@ -32,4 +32,6 @@ public interface BatchIOConfig extends IOConfig InputFormat getInputFormat(); boolean isAppendToExisting(); + + boolean isDropExisting(); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 92b9f3cdd00d..c0e5347dee61 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -558,6 +558,7 @@ ListenableFuture dropInBackground(SegmentsAndCommitMe */ ListenableFuture publishInBackground( @Nullable Set segmentsToBeOverwritten, + @Nullable Set segmentsToBeDropped, SegmentsAndCommitMetadata segmentsAndCommitMetadata, TransactionalSegmentPublisher publisher, java.util.function.Function, Set> outputSegmentsAnnotateFunction @@ -593,6 +594,7 @@ ListenableFuture publishInBackground( final ImmutableSet ourSegments = ImmutableSet.copyOf(segmentsAndCommitMetadata.getSegments()); final SegmentPublishResult publishResult = publisher.publishSegments( segmentsToBeOverwritten, + segmentsToBeDropped, ourSegments, outputSegmentsAnnotateFunction, callerMetadata diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index 9993486bc578..ca369447a657 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -191,12 +191,14 @@ private SegmentsAndCommitMetadata pushAndClear( * Publish all segments. * * @param segmentsToBeOverwritten segments which can be overwritten by new segments published by the given publisher + * @param segmentsToBeDropped segments which will be dropped and marked unused * @param publisher segment publisher * * @return a {@link ListenableFuture} for the publish task */ public ListenableFuture publishAll( @Nullable final Set segmentsToBeOverwritten, + @Nullable final Set segmentsToBeDropped, final TransactionalSegmentPublisher publisher, final Function, Set> outputSegmentsAnnotateFunction ) @@ -208,6 +210,7 @@ public ListenableFuture publishAll( return publishInBackground( segmentsToBeOverwritten, + segmentsToBeDropped, new SegmentsAndCommitMetadata( snapshot .values() diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index ea466f6de109..f510df37ac6a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -278,6 +278,7 @@ public ListenableFuture publish( // version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs pushInBackground(wrapCommitter(committer), theSegments, true), (AsyncFunction) sam -> publishInBackground( + null, null, sam, publisher, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java index 2ffb4dd572a3..a71e4cdb6d36 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java @@ -41,12 +41,14 @@ public interface 
TransactionalSegmentPublisher */ SegmentPublishResult publishAnnotatedSegments( @Nullable Set segmentsToBeOverwritten, + @Nullable Set segmentsToDrop, Set segmentsToPublish, @Nullable Object commitMetadata ) throws IOException; default SegmentPublishResult publishSegments( @Nullable Set segmentsToBeOverwritten, + @Nullable Set segmentsToDrop, Set segmentsToPublish, Function, Set> outputSegmentsAnnotateFunction, @Nullable Object commitMetadata @@ -56,6 +58,7 @@ default SegmentPublishResult publishSegments( .andThen(SegmentPublisherHelper::annotateShardSpec); return publishAnnotatedSegments( segmentsToBeOverwritten, + segmentsToDrop, annotateFunction.apply(segmentsToPublish), commitMetadata ); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index ac0611f5e99c..924ad92bdcd4 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.TestHelper; @@ -54,11 +55,13 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.StringMapper; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -70,6 +73,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest { + private static final int MAX_SQL_MEATADATA_RETRY_FOR_TEST = 2; + @Rule public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); @@ -187,8 +192,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest 100 ); + private final DataSegment existingSegment1 = new DataSegment( + "fooDataSource", + Intervals.of("1994-01-01T00Z/1994-01-02T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new NumberedShardSpec(1, 1), + 9, + 100 + ); + + private final DataSegment existingSegment2 = new DataSegment( + "fooDataSource", + Intervals.of("1994-01-02T00Z/1994-01-03T00Z"), + "zversion", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new NumberedShardSpec(1, 1), + 9, + 100 + ); + private final Set SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2); private final AtomicLong metadataUpdateCounter = new AtomicLong(); + private final AtomicLong segmentTableDropUpdateCounter = new AtomicLong(); + private IndexerSQLMetadataStorageCoordinator coordinator; private TestDerbyConnector derbyConnector; @@ -202,6 +233,7 @@ public void setUp() derbyConnector.createSegmentTable(); derbyConnector.createPendingSegmentsTable(); metadataUpdateCounter.set(0); + segmentTableDropUpdateCounter.set(0); coordinator = new IndexerSQLMetadataStorageCoordinator( mapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), @@ -209,7 +241,7 @@ public void setUp() ) { @Override - protected 
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index ac0611f5e99c..924ad92bdcd4 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -29,6 +29,7 @@
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.TestHelper;
@@ -54,11 +55,13 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.StringMapper;

 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -70,6 +73,8 @@
 public class IndexerSQLMetadataStorageCoordinatorTest
 {
+  private static final int MAX_SQL_MEATADATA_RETRY_FOR_TEST = 2;
+
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
@@ -187,8 +192,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest
       100
   );

+  private final DataSegment existingSegment1 = new DataSegment(
+      "fooDataSource",
+      Intervals.of("1994-01-01T00Z/1994-01-02T00Z"),
+      "zversion",
+      ImmutableMap.of(),
+      ImmutableList.of("dim1"),
+      ImmutableList.of("m1"),
+      new NumberedShardSpec(1, 1),
+      9,
+      100
+  );
+
+  private final DataSegment existingSegment2 = new DataSegment(
+      "fooDataSource",
+      Intervals.of("1994-01-02T00Z/1994-01-03T00Z"),
+      "zversion",
+      ImmutableMap.of(),
+      ImmutableList.of("dim1"),
+      ImmutableList.of("m1"),
+      new NumberedShardSpec(1, 1),
+      9,
+      100
+  );
+
   private final Set SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2);
   private final AtomicLong metadataUpdateCounter = new AtomicLong();
+  private final AtomicLong segmentTableDropUpdateCounter = new AtomicLong();
+
   private IndexerSQLMetadataStorageCoordinator coordinator;
   private TestDerbyConnector derbyConnector;
@@ -202,6 +233,7 @@ public void setUp()
     derbyConnector.createSegmentTable();
     derbyConnector.createPendingSegmentsTable();
     metadataUpdateCounter.set(0);
+    segmentTableDropUpdateCounter.set(0);
     coordinator = new IndexerSQLMetadataStorageCoordinator(
         mapper,
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
@@ -209,7 +241,7 @@ public void setUp()
     )
     {
       @Override
-      protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
+      protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
          Handle handle,
          String dataSource,
          DataSourceMetadata startMetadata,
@@ -220,6 +252,24 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
         metadataUpdateCounter.getAndIncrement();
         return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
       }
+
+      @Override
+      protected DataStoreMetadataUpdateResult dropSegmentsWithHandle(
+          final Handle handle,
+          final Set segmentsToDrop,
+          final String dataSource
+      )
+      {
+        // Count number of times this method is called.
+        segmentTableDropUpdateCounter.getAndIncrement();
+        return super.dropSegmentsWithHandle(handle, segmentsToDrop, dataSource);
+      }
+
+      @Override
+      public int getSqlMetadataMaxRetry()
+      {
+        return MAX_SQL_MEATADATA_RETRY_FOR_TEST;
+      }
     };
   }
@@ -263,6 +313,47 @@ public List withHandle(Handle handle)
     );
   }

+  private Boolean insertUsedSegments(Set dataSegments)
+  {
+    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    return derbyConnector.retryWithHandle(
+        new HandleCallback()
+        {
+          @Override
+          public Boolean withHandle(Handle handle) throws Exception
+          {
+            PreparedBatch preparedBatch = handle.prepareBatch(
+                StringUtils.format(
+                    "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                    table,
+                    derbyConnector.getQuoteString()
+                )
+            );
+            for (DataSegment segment : dataSegments) {
+              preparedBatch.add()
+                           .bind("id", segment.getId().toString())
+                           .bind("dataSource", segment.getDataSource())
+                           .bind("created_date", DateTimes.nowUtc().toString())
+                           .bind("start", segment.getInterval().getStart().toString())
+                           .bind("end", segment.getInterval().getEnd().toString())
+                           .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
+                           .bind("version", segment.getVersion())
+                           .bind("used", true)
+                           .bind("payload", mapper.writeValueAsBytes(segment));
+            }
+
+            final int[] affectedRows = preparedBatch.execute();
+            final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
+            if (!succeeded) {
+              throw new ISE("Failed to publish segments to DB");
+            }
+            return true;
+          }
+        }
+    );
+  }
+
   @Test
   public void testSimpleAnnounce() throws IOException
   {
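The overridden `getSqlMetadataMaxRetry()` matters because the coordinator re-runs the whole publish transaction when a handle-level step reports `TRY_AGAIN`, but gives up immediately on `FAILURE`. The sketch below only illustrates that contract as the tests exercise it; the method name and loop are hypothetical, not the actual `IndexerSQLMetadataStorageCoordinator` code:

```java
// Illustrative sketch of the retry contract the tests rely on; not the real implementation.
// SUCCESS commits; FAILURE aborts immediately (e.g. segments to drop span several datasources);
// TRY_AGAIN re-runs the transaction up to getSqlMetadataMaxRetry() times (e.g. a segment to drop
// was not found in the used-segments table).
SegmentPublishResult announceWithRetry(int maxRetry, Supplier<DataStoreMetadataUpdateResult> transaction)
{
  for (int attempt = 0; attempt < maxRetry; attempt++) {
    final DataStoreMetadataUpdateResult result = transaction.get();
    if (result == DataStoreMetadataUpdateResult.SUCCESS) {
      return SegmentPublishResult.ok(ImmutableSet.of());
    }
    if (result == DataStoreMetadataUpdateResult.FAILURE) {
      break;
    }
    // TRY_AGAIN: fall through and retry
  }
  return SegmentPublishResult.fail("Aborting transaction!");
}
```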
@@ -360,6 +451,7 @@ public void testTransactionalAnnounceSuccess() throws IOException
     // Insert first segment.
     final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "bar"))
     );
@@ -378,6 +470,7 @@ public void testTransactionalAnnounceSuccess() throws IOException
     // Insert second segment.
     final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment2),
+        ImmutableSet.of(),
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
@@ -415,7 +508,7 @@ public void testTransactionalAnnounceRetryAndSuccess() throws IOException
     )
     {
       @Override
-      protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
+      protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
           Handle handle,
           String dataSource,
           DataSourceMetadata startMetadata,
@@ -424,7 +517,7 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
       {
         metadataUpdateCounter.getAndIncrement();
         if (attemptCounter.getAndIncrement() == 0) {
-          return DataSourceMetadataUpdateResult.TRY_AGAIN;
+          return DataStoreMetadataUpdateResult.TRY_AGAIN;
         } else {
           return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
         }
@@ -434,6 +527,7 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
     // Insert first segment.
     final SegmentPublishResult result1 = failOnceCoordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "bar"))
     );
@@ -455,6 +549,7 @@ protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
     // Insert second segment.
     final SegmentPublishResult result2 = failOnceCoordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment2),
+        ImmutableSet.of(),
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
@@ -485,6 +580,7 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException
   {
     final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
@@ -494,11 +590,116 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException
     Assert.assertEquals(1, metadataUpdateCounter.get());
   }

+  @Test
+  public void testTransactionalAnnounceFailSegmentDropFailWithoutRetry() throws IOException
+  {
+    insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2));
+
+    Assert.assertEquals(
+        ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()),
+        retrieveUsedSegmentIds()
+    );
+
+    DataSegment dataSegmentBar = DataSegment.builder()
+                                            .dataSource("bar")
+                                            .interval(Intervals.of("2001/P1D"))
+                                            .shardSpec(new LinearShardSpec(1))
+                                            .version("b")
+                                            .size(0)
+                                            .build();
+    Set dropSegments = ImmutableSet.of(existingSegment1, existingSegment2, dataSegmentBar);
+
+    final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
+        SEGMENTS,
+        dropSegments,
+        null,
+        null
+    );
+    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1);
+
+    // Should only be tried once, since dropSegmentsWithHandle returns FAILURE (not TRY_AGAIN) when the set of
+    // segments to drop spans more than one datasource.
+    Assert.assertEquals(1, segmentTableDropUpdateCounter.get());
+
+    Assert.assertEquals(
+        ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()),
+        retrieveUsedSegmentIds()
+    );
+  }
+
+  @Test
+  public void testTransactionalAnnounceSucceedWithSegmentDrop() throws IOException
+  {
+    insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2));
+
+    Assert.assertEquals(
+        ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()),
+        retrieveUsedSegmentIds()
+    );
+
+    final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
+        SEGMENTS,
+        ImmutableSet.of(existingSegment1, existingSegment2),
+        null,
+        null
+    );
+
+    Assert.assertEquals(SegmentPublishResult.ok(SEGMENTS), result1);
+
+    for (DataSegment segment : SEGMENTS) {
+      Assert.assertArrayEquals(
+          mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
+          derbyConnector.lookup(
+              derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
+              "id",
+              "payload",
+              segment.getId().toString()
+          )
+      );
+    }
+
+    Assert.assertEquals(
+        ImmutableList.of(defaultSegment.getId().toString(), defaultSegment2.getId().toString()),
+        retrieveUsedSegmentIds()
+    );
+  }
+
+  @Test
+  public void testTransactionalAnnounceFailSegmentDropFailWithRetry() throws IOException
+  {
+    insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2));
+
+    Assert.assertEquals(
+        ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()),
+        retrieveUsedSegmentIds()
+    );
+
+    DataSegment nonExistingSegment = defaultSegment4;
+
+    Set dropSegments = ImmutableSet.of(existingSegment1, nonExistingSegment);
+
+    final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
+        SEGMENTS,
+        dropSegments,
+        null,
+        null
+    );
+    Assert.assertEquals(SegmentPublishResult.fail("org.apache.druid.metadata.RetryTransactionException: Aborting transaction!"), result1);
+
+    Assert.assertEquals(MAX_SQL_MEATADATA_RETRY_FOR_TEST, segmentTableDropUpdateCounter.get());
+
+    Assert.assertEquals(
+        ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()),
+        retrieveUsedSegmentIds()
+    );
+  }
+
   @Test
   public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException
   {
     final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
@@ -506,6 +707,7 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException
     final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment2),
+        ImmutableSet.of(),
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
@@ -520,6 +722,7 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep
   {
     final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
@@ -527,6 +730,7 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep
     final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment2),
+        ImmutableSet.of(),
         new ObjectMetadata(ImmutableMap.of("foo", "qux")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
@@ -833,6 +1037,7 @@ public void testDeleteDataSourceMetadata() throws IOException
   {
     coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment),
+        ImmutableSet.of(),
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "bar"))
     );
@@ -1238,4 +1443,32 @@ public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCor
     );
     Assert.assertNull(id);
   }
+
+  @Test
+  public void testDropSegmentsWithHandleForSegmentThatExist()
+  {
+    try (Handle handle = derbyConnector.getDBI().open()) {
+      Assert.assertTrue(insertUsedSegments(ImmutableSet.of(defaultSegment)));
+      List usedSegments = retrieveUsedSegmentIds();
+      Assert.assertEquals(1, usedSegments.size());
+      Assert.assertEquals(defaultSegment.getId().toString(), usedSegments.get(0));
+
+      // Try to drop the segment
+      IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(handle, ImmutableSet.of(defaultSegment), defaultSegment.getDataSource());
+
+      Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.SUCCESS, result);
+      usedSegments = retrieveUsedSegmentIds();
+      Assert.assertEquals(0, usedSegments.size());
+    }
+  }
+
+  @Test
+  public void testDropSegmentsWithHandleForSegmentThatDoesNotExist()
+  {
+    try (Handle handle = derbyConnector.getDBI().open()) {
+      // Try to drop a segment that was never inserted as a used segment
+      IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle(handle, ImmutableSet.of(defaultSegment), defaultSegment.getDataSource());
+      Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.TRY_AGAIN, result);
+    }
+  }
 }
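These two tests exercise `dropSegmentsWithHandle` directly: dropping a segment that exists succeeds and flips it to unused, while dropping one that was never inserted yields `TRY_AGAIN`. Conceptually the drop step only needs to mark the matching rows unused; the rough sketch below is an assumption for illustration (table and column names follow the `insertUsedSegments` helper above) and is not the actual Druid implementation:

```java
// Rough conceptual sketch of marking segments unused via a JDBI handle. The real
// dropSegmentsWithHandle lives in IndexerSQLMetadataStorageCoordinator and also rejects
// drop sets that span more than one datasource.
int markSegmentsUnused(Handle handle, String segmentsTable, Set<DataSegment> segmentsToDrop, String dataSource)
{
  int updatedRows = 0;
  for (DataSegment segment : segmentsToDrop) {
    updatedRows += handle
        .createStatement(
            StringUtils.format(
                "UPDATE %s SET used = false WHERE id = :id AND dataSource = :dataSource",
                segmentsTable
            )
        )
        .bind("id", segment.getId().toString())
        .bind("dataSource", dataSource)
        .execute();
  }
  // A caller can treat "fewer rows updated than segments requested" as TRY_AGAIN or FAILURE.
  return updatedRows;
}
```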
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 57aecb534c99..024bfa053913 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -122,7 +122,7 @@ public void testSimple() throws Exception
     checkSegmentStates(2, SegmentState.PUSHED_AND_DROPPED);

     final SegmentsAndCommitMetadata published =
-        driver.publishAll(null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
+        driver.publishAll(null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);

     Assert.assertEquals(
         ImmutableSet.of(
@@ -156,7 +156,7 @@ public void testIncrementalPush() throws Exception
     }

     final SegmentsAndCommitMetadata published =
-        driver.publishAll(null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
+        driver.publishAll(null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);

     Assert.assertEquals(
         ImmutableSet.of(
@@ -197,6 +197,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
+    return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 87fc54b42a3a..a3614afea30f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -366,13 +366,13 @@ private Set asIdentifiers(Iterable segments
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
+    return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) ->
         SegmentPublishResult.ok(Collections.emptySet());
   }

   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
   {
-    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> {
+    return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> {
       final RuntimeException exception = new RuntimeException("test");
       if (failWithException) {
         throw exception;
diff --git a/website/.spelling b/website/.spelling
index 39ec30bc4a13..9cc2b65667ac 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1041,6 +1041,7 @@ chatHandlerTimeout
 connectorConfig
 countryName
 dataSchema's
+dropExisting
 foldCase
 forceGuaranteedRollup
 httpAuthenticationPassword