Merged
10 changes: 6 additions & 4 deletions docs/data-management/delete.md
@@ -95,9 +95,10 @@ The available grammar is:
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>,
"batchSize": <optional_batch size>,
"limit": <the maximum number of segments to delete>
"context": <task_context>,
"batchSize": <optional_batch_size>,
"limit": <optional_maximum_number_of_segments_to_delete>,
"maxUsedStatusLastUpdatedTime": <optional_maximum_timestamp_when_segments_were_marked_as_unused>
}
```

@@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further explained below:
| Parameter | Default | Explanation |
|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
| `limit` | null - no limit | Maximum number of segments for the kill task to delete.|
| `limit` | null (no limit) | Maximum number of segments for the kill task to delete.|
| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of when they were marked as unused.|
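
The way `batchSize`, `limit`, and `maxUsedStatusLastUpdatedTime` interact can be illustrated with a small, self-contained sketch. This is a simplified model and not the code Druid runs: `KillSelectionSketch`, `UnusedSegment`, `select`, and `toBatches` are hypothetical names, and the sketch assumes that "lie in the specified `interval`" means the segment is fully contained in it.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of kill-task segment selection; not Druid's implementation.
class KillSelectionSketch {

    /** Hypothetical stand-in for the metadata of one unused segment. */
    static final class UnusedSegment {
        final String id;
        final Instant start;
        final Instant end;
        final Instant markedUnusedAt;

        UnusedSegment(String id, Instant start, Instant end, Instant markedUnusedAt) {
            this.id = id;
            this.start = start;
            this.end = end;
            this.markedUnusedAt = markedUnusedAt;
        }
    }

    /**
     * Picks the segments a kill task would consider. A null limit means "no limit";
     * a null maxUsedStatusLastUpdatedTime means "no cutoff".
     */
    static List<UnusedSegment> select(
            List<UnusedSegment> unused,
            Instant intervalStart,
            Instant intervalEnd,
            Integer limit,
            Instant maxUsedStatusLastUpdatedTime) {
        List<UnusedSegment> selected = new ArrayList<>();
        for (UnusedSegment s : unused) {
            if (limit != null && selected.size() >= limit) {
                break;
            }
            // Assumption: the segment must be fully contained in the task interval.
            boolean inInterval = !s.start.isBefore(intervalStart) && !s.end.isAfter(intervalEnd);
            // "No later than" the cutoff, so a segment marked exactly at the cutoff is included.
            boolean beforeCutoff = maxUsedStatusLastUpdatedTime == null
                    || !s.markedUnusedAt.isAfter(maxUsedStatusLastUpdatedTime);
            if (inInterval && beforeCutoff) {
                selected.add(s);
            }
        }
        return selected;
    }

    /** Splits the selected segments into batches of at most batchSize elements. */
    static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be a positive integer");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-01-01T00:00:00Z");
        Instant end = Instant.parse("2020-01-02T00:00:00Z");
        Instant cutoff = Instant.parse("2020-02-01T00:00:00Z");

        List<UnusedSegment> unused = new ArrayList<>();
        unused.add(new UnusedSegment("old", start, end, cutoff.minusSeconds(60)));
        unused.add(new UnusedSegment("recent", start, end, cutoff.plusSeconds(60)));

        List<UnusedSegment> selected = select(unused, start, end, null, cutoff);
        System.out.println("selected=" + selected.size()
                + ", batches=" + toBatches(selected, 100).size());
    }
}
```

In this model, leaving both `limit` and `maxUsedStatusLastUpdatedTime` as `null` reproduces the default behavior described in the table: every unused segment in the interval is selected, and `batchSize` only controls how the work is chunked.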


**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
11 changes: 7 additions & 4 deletions docs/operations/clean-metadata-store.md
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines, in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations), how long after creation segments must be retained before they become eligible for deletion.
- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed.
- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.
- `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task.
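
As a rough illustration of how `durationToRetain`, `ignoreDurationToRetain`, and `bufferPeriod` could combine, the sketch below models the eligibility check with plain `java.time` types. It is not the Coordinator's implementation: `KillEligibilitySketch`, `isEligible`, and the `retentionReference` parameter are hypothetical, and which timestamp the retention period is measured from is an assumption left to the caller.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of combining the retention and buffer settings; not Druid's implementation.
class KillEligibilitySketch {

    /**
     * @param now                    current time
     * @param retentionReference     timestamp the retention period is measured from (assumption)
     * @param markedUnusedAt         when the segment was marked as unused
     * @param durationToRetain       models druid.coordinator.kill.durationToRetain
     * @param ignoreDurationToRetain models druid.coordinator.kill.ignoreDurationToRetain
     * @param bufferPeriod           models druid.coordinator.kill.bufferPeriod
     */
    static boolean isEligible(
            Instant now,
            Instant retentionReference,
            Instant markedUnusedAt,
            Duration durationToRetain,
            boolean ignoreDurationToRetain,
            Duration bufferPeriod) {
        // The retention check is skipped entirely when the override is enabled.
        boolean pastRetention = ignoreDurationToRetain
                || !retentionReference.isAfter(now.minus(durationToRetain));
        // The segment must have been unused for at least the buffer period.
        boolean pastBuffer = !markedUnusedAt.isAfter(now.minus(bufferPeriod));
        return pastRetention && pastBuffer;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-20T00:00:00Z");
        Duration retain = Duration.parse("P4D");
        Duration buffer = Duration.parse("P7D");

        // Marked unused only three days ago, so still inside the seven-day buffer.
        boolean eligible = isEligible(
                now,
                now.minus(Duration.ofDays(10)),
                now.minus(Duration.ofDays(3)),
                retain,
                false,
                buffer);
        System.out.println("eligible=" + eligible);
    }
}
```

The point of keeping the two checks separate is that the buffer period still protects recently unused segments even when `ignoreDurationToRetain` disables the retention check.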

### Audit records
@@ -189,22 +191,23 @@ druid.coordinator.kill.datasource.on=false
<a name="example"></a>
## Example configuration for automated metadata cleanup

Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old. The exception is for audit logs, which you need to retain for 30 days:
Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old, after a seven-day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days:

```properties
...
# Schedule the metadata management store task for every hour:
druid.coordinator.period.metadataStoreManagementPeriod=P1H
druid.coordinator.period.metadataStoreManagementPeriod=PT1H

# Set a kill task to poll every day to delete Segment records and segments
# in deep storage > 4 days old. When druid.coordinator.kill.on is set to true,
# Set a kill task to poll every day to delete segment records and segments
# in deep storage > 4 days old after a 7-day buffer period. When druid.coordinator.kill.on is set to true,
# you can set killDataSourceWhitelist in the dynamic configuration to limit
# the datasources that can be killed.
# Required also for automated cleanup of rules and compaction configuration.

druid.coordinator.kill.on=true
druid.coordinator.kill.period=P1D
druid.coordinator.kill.durationToRetain=P4D
druid.coordinator.kill.bufferPeriod=P7D
druid.coordinator.kill.maxSegments=1000

# Poll every day to delete audit records > 30 days old
@@ -25,6 +25,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
@@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
@JsonIgnore
private final Integer limit;

@JsonIgnore
private final DateTime maxUsedStatusLastUpdatedTime;

@JsonCreator
public RetrieveUnusedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("limit") @Nullable Integer limit
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
this.dataSource = dataSource;
this.interval = interval;
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}

@JsonProperty
@@ -73,6 +79,13 @@ public Integer getLimit()
return limit;
}

@Nullable
@JsonProperty
public DateTime getMaxUsedStatusLastUpdatedTime()
{
return maxUsedStatusLastUpdatedTime;
}

@Override
public TypeReference<List<DataSegment>> getReturnTypeReference()
{
@@ -83,7 +96,7 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
.retrieveUnusedSegmentsForInterval(dataSource, interval, limit, maxUsedStatusLastUpdatedTime);
}

@Override
@@ -99,6 +112,7 @@ public String toString()
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", limit=" + limit +
", maxUsedStatusLastUpdatedTime=" + maxUsedStatusLastUpdatedTime +
'}';
}
}
@@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
@@ -63,9 +63,10 @@
/**
* The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link
* ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields.
* ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
* <p>
* The field {@link #isMarkAsUnused()} is now deprecated.
* </p>
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
@@ -95,6 +96,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
*/
@Nullable private final Integer limit;

/**
* The maximum used status last updated time. Any segments with
* {@code used_status_last_updated} no later than this time will be included in the kill task.
*/
@Nullable private final DateTime maxUsedStatusLastUpdatedTime;

@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@@ -103,7 +110,8 @@ public KillUnusedSegmentsTask(
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("limit") @Nullable Integer limit
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
super(
@@ -115,15 +123,16 @@ public KillUnusedSegmentsTask(
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
if (this.batchSize <= 0) {
throw InvalidInput.exception("batchSize[%d] must be a positive integer.", limit);
throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
}
if (limit != null && limit <= 0) {
throw InvalidInput.exception("Limit[%d] must be a positive integer.", limit);
throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
}
if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
throw InvalidInput.exception("Limit cannot be provided when markAsUnused is enabled.");
throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
}
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}

/**
@@ -155,6 +164,13 @@ public Integer getLimit()
return limit;
}

@Nullable
@JsonProperty
public DateTime getMaxUsedStatusLastUpdatedTime()
{
return maxUsedStatusLastUpdatedTime;
}

@Override
public String getType()
{
@@ -180,7 +196,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
);
LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.",
numSegmentsMarkedAsUnused, getDataSource(), getInterval());
} else {
numSegmentsMarkedAsUnused = 0;
}
@@ -190,9 +207,13 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
@Nullable Integer numTotalBatches = getNumTotalBatches();
List<DataSegment> unusedSegments;
LOG.info(
"Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
"Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments "
+ "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
getDataSource(),
getInterval(),
batchSize,
limit,
maxUsedStatusLastUpdatedTime,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);

@@ -217,7 +238,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
.submit(
new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime)
);

// Fetch locks each time as a revokal could have occurred in between batches
final NavigableMap<DateTime, List<TaskLock>> taskLockMap
@@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
@@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
@@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.actions;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -103,7 +105,23 @@ public void testRetrieveUsedSegmentsAction()
@Test
public void testRetrieveUnusedSegmentsAction()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.MIN);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(ImmutableSet.of(), resultSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.nowUtc());
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
@@ -24,6 +24,7 @@
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Before;
@@ -53,7 +54,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
Intervals.of("2020-01-01/P1D"),
false,
99,
5
5,
DateTimes.nowUtc()
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
@@ -63,7 +65,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());

Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime());
}

@Test
@@ -75,6 +77,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
Intervals.of("2020-01-01/P1D"),
true,
null,
null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -85,6 +88,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
}

@Test
@@ -97,6 +101,7 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
null,
true,
99,
null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
@@ -110,5 +115,33 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(task.getLimit());
Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime());
}

@Test
public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegmentsTaskQuery() throws IOException
{
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
null,
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
null,
99,
100,
DateTimes.nowUtc()
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
json,
ClientTaskQuery.class
);
Assert.assertEquals(task.getId(), taskQuery.getId());
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertNull(taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime());
}
}