Merged
@@ -100,6 +100,7 @@ public void setup()
null,
null,
null,
null,
null
)
);
10 changes: 10 additions & 0 deletions docs/configuration/index.md
@@ -848,6 +848,7 @@ A description of the compaction config is:
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no|
|`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec).|no|
|`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no|

An example of compaction config is:

@@ -899,6 +900,15 @@ You can optionally use the `granularitySpec` object to configure the segment gra

> Unlike manual compaction, automatic compaction does not support query granularity.

###### Automatic compaction IOConfig

Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).
The following configurations are supported for auto compaction.

|Property|Description|Default|Required|
|--------|-----------|-------|--------|
|`dropExisting`|If `true`, then the generated compaction task drops (marks unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that changing this config does not cause intervals to be compacted again.|false|no|
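
For illustration, an auto compaction config that enables this flag might look like the following sketch (the datasource name and `skipOffsetFromLatest` value are hypothetical):

```json
{
  "dataSource": "wikipedia",
  "skipOffsetFromLatest": "PT1H",
  "ioConfig": {
    "dropExisting": true
  }
}
```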

### Overlord

For general Overlord Process information, see [here](../design/overlord.md).
12 changes: 7 additions & 5 deletions docs/ingestion/compaction.md
@@ -54,7 +54,7 @@ See [Setting up a manual compaction task](#setting-up-manual-compaction) for mor
## Data handling with compaction
During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.

For compaction tasks, `dropExisting` for underlying ingestion tasks is "true". This means that Druid can drop (mark unused) all the existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations).
For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for Druid to drop (mark unused) all existing segments fully contained by the interval of the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction task interval.

If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjust the auto compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to higher priority than the ingestion task.

@@ -158,10 +158,12 @@ This task doesn't specify a `granularitySpec` so Druid retains the original segm

The compaction `ioConfig` requires specifying `inputSpec` as follows:

|Field|Description|Required|
|-----|-----------|--------|
|`type`|Task type. Should be `compact`|Yes|
|`inputSpec`|Input specification|Yes|
|Field|Description|Default|Required|
|-----|-----------|-------|--------|
|`type`|Task type. Should be `compact`|none|Yes|
|`inputSpec`|Input specification|none|Yes|
|`dropExisting`|If `true`, then the compaction task drops (marks unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction task interval.|false|no|
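
As a sketch, an `ioConfig` using an `interval` type `inputSpec` with `dropExisting` enabled could look like the following (the interval value is illustrative):

```json
"ioConfig": {
  "type": "compact",
  "inputSpec": {
    "type": "interval",
    "interval": "2020-01-01/2021-01-01"
  },
  "dropExisting": true
}
```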


There are two supported `inputSpec`s for now.

7 changes: 4 additions & 3 deletions docs/ingestion/native-batch.md
@@ -91,7 +91,8 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
- You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that
start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments.
`dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`.
`dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. WARNING: this
functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.

The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`:

@@ -219,7 +220,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no|
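
For illustration, an `ioConfig` for a parallel ingestion task that enables `dropExisting` might look like the following sketch (the `inputSource` paths are hypothetical; the flag only takes effect because `appendToExisting` is `false`, and the task's `granularitySpec` must also contain an `interval`):

```json
"ioConfig": {
  "type": "index_parallel",
  "inputSource": {
    "type": "local",
    "baseDir": "/tmp/druid-input",
    "filter": "*.json"
  },
  "inputFormat": {
    "type": "json"
  },
  "appendToExisting": false,
  "dropExisting": true
}
```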

### `tuningConfig`

@@ -747,7 +748,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no|

### `tuningConfig`

@@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.segment.indexing.IOConfig;

import javax.annotation.Nullable;
import java.util.Objects;

/**
@@ -36,11 +37,16 @@
public class CompactionIOConfig implements IOConfig
{
private final CompactionInputSpec inputSpec;
private final boolean dropExisting;

@JsonCreator
public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec)
public CompactionIOConfig(
@JsonProperty("inputSpec") CompactionInputSpec inputSpec,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
this.inputSpec = inputSpec;
this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
}

@JsonProperty
@@ -49,6 +55,12 @@ public CompactionInputSpec getInputSpec()
return inputSpec;
}

@JsonProperty
public boolean isDropExisting()
{
return dropExisting;
}

@Override
public boolean equals(Object o)
{
@@ -59,20 +71,22 @@ public boolean equals(Object o)
return false;
}
CompactionIOConfig that = (CompactionIOConfig) o;
return Objects.equals(inputSpec, that.inputSpec);
return dropExisting == that.dropExisting &&
Objects.equals(inputSpec, that.inputSpec);
}

@Override
public int hashCode()
{
return Objects.hash(inputSpec);
return Objects.hash(inputSpec, dropExisting);
}

@Override
public String toString()
{
return "CompactionIOConfig{" +
"inputSpec=" + inputSpec +
", dropExisting=" + dropExisting +
'}';
}
}
@@ -198,11 +198,11 @@ public CompactionTask(
if (ioConfig != null) {
this.ioConfig = ioConfig;
} else if (interval != null) {
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null));
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), null);
} else {
// We already checked segments is not null or empty above.
//noinspection ConstantConditions
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), null);
}

this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
@@ -383,7 +383,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
granularitySpec,
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
retryPolicyFactory
retryPolicyFactory,
ioConfig.isDropExisting()
);
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
@@ -491,7 +492,8 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory,
final RetryPolicyFactory retryPolicyFactory
final RetryPolicyFactory retryPolicyFactory,
final boolean dropExisting
) throws IOException, SegmentLoadingException
{
NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
@@ -573,7 +575,8 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
interval,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
retryPolicyFactory,
dropExisting
),
compactionTuningConfig
)
@@ -600,7 +603,8 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
segmentProvider.interval,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
retryPolicyFactory,
dropExisting
),
compactionTuningConfig
)
@@ -614,7 +618,8 @@ private static ParallelIndexIOConfig createIoConfig(
Interval interval,
CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory
RetryPolicyFactory retryPolicyFactory,
boolean dropExisting
)
{
return new ParallelIndexIOConfig(
@@ -634,7 +639,7 @@ private static ParallelIndexIOConfig createIoConfig(
),
null,
false,
true
dropExisting
);
}

@@ -1021,7 +1026,13 @@ public Builder segments(List<DataSegment> segments)

public Builder inputSpec(CompactionInputSpec inputSpec)
{
this.ioConfig = new CompactionIOConfig(inputSpec);
this.ioConfig = new CompactionIOConfig(inputSpec, null);
return this;
}

public Builder inputSpec(CompactionInputSpec inputSpec, Boolean dropExisting)
{
this.ioConfig = new CompactionIOConfig(inputSpec, dropExisting);
return this;
}

@@ -1033,7 +1033,6 @@ public IndexTuningConfig getTuningConfig()
public static class IndexIOConfig implements BatchIOConfig
{
private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
private static final boolean DEFAULT_DROP_EXISTING = false;

private final FirehoseFactory firehoseFactory;
private final InputSource inputSource;
@@ -83,7 +83,8 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"
)
),
true
),
new ClientCompactionTaskQueryTuningConfig(
null,
@@ -201,6 +202,10 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
query.getGranularitySpec().getSegmentGranularity(),
task.getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(
query.getIoConfig().isDropExisting(),
task.getIoConfig().isDropExisting()
);
Assert.assertEquals(query.getContext(), task.getContext());
}

@@ -214,7 +219,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
new RetryPolicyFactory(new RetryPolicyConfig())
);
final CompactionTask task = builder
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"))
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true)
.tuningConfig(
new ParallelIndexTuningConfig(
null,
@@ -268,7 +273,8 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"
)
),
true
),
new ClientCompactionTaskQueryTuningConfig(
100,
@@ -442,7 +442,7 @@ public void testDruidInputSourceCreateSplitsWithIndividualSplits()
}

@Test
public void testCompactionDropSegmentsOfInputInterval()
public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet()
{
runIndexTask(null, true);

@@ -458,7 +458,8 @@ public void testCompactionDropSegmentsOfInputInterval()
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
// Set the dropExisting flag to true in the IOConfig of the compaction task
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
.build();
@@ -474,6 +475,47 @@
}
}

@Test
public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet()
{
runIndexTask(null, true);

Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX));
Assert.assertEquals(3, usedSegments.size());
for (DataSegment segment : usedSegments) {
Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval()));
}

final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
.build();

final Set<DataSegment> compactedSegments = runTask(compactionTask);

usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX));
// None of the HOUR segments were dropped, since the MINUTE segments do not fully cover the 3-hour interval.
Assert.assertEquals(6, usedSegments.size());
int hourSegmentCount = 0;
int minuteSegmentCount = 0;
for (DataSegment segment : usedSegments) {
if (Granularities.MINUTE.isAligned(segment.getInterval())) {
minuteSegmentCount++;
}
if (Granularities.HOUR.isAligned(segment.getInterval())) {
hourSegmentCount++;
}
}
Assert.assertEquals(3, hourSegmentCount);
Assert.assertEquals(3, minuteSegmentCount);
}

private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting)
{
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(