Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/ingestion/data-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,25 @@ A data deletion tutorial is available at [Tutorial: Deleting data](../tutorials/

## Kill Task

Kill tasks delete all information about a segment and removes it from deep storage. Segments to kill must be unused (used==0) in the Druid segment table. The available grammar is:
The kill task deletes all information about segments and removes them from deep storage. Segments to kill must be unused (used==0) in the Druid segment table.

The available grammar is:

```json
{
"type": "kill",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_segments_in_this_interval_will_die!>,
"markAsUnused": <true|false>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems worth to mention what the default is.

"context": <task context>
}
```

If `markAsUnused` is set to true (the default is false), the kill task first marks any used segments within the specified interval as unused, and then deletes all unused segments within that interval.

**WARNING!** The kill task permanently removes all information about the affected segments from the metadata store and deep storage. These segments cannot be recovered after the kill task runs; this operation cannot be undone.

## Retention

Druid supports retention rules, which are used to define intervals of time where data should be preserved, and intervals where data should be discarded.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.joda.time.Interval;

import java.util.Objects;

/**
 * Task action that marks as unused all segments of a datasource that fall within a given
 * interval, by delegating to
 * {@code IndexerMetadataStorageCoordinator#markSegmentsAsUnusedWithinInterval}.
 * The action's result is the number of segments that were marked unused.
 * <p>
 * This action is audited ({@link #isAudited()} returns true), so each submission is
 * recorded in the task action audit log.
 */
public class MarkSegmentsAsUnusedAction implements TaskAction<Integer>
{
  @JsonIgnore
  private final String dataSource;

  @JsonIgnore
  private final Interval interval;

  /**
   * @param dataSource datasource whose segments are to be marked unused; required
   * @param interval   interval whose segments are to be marked unused; required
   */
  @JsonCreator
  public MarkSegmentsAsUnusedAction(
      @JsonProperty("dataSource") String dataSource,
      @JsonProperty("interval") Interval interval
  )
  {
    // Fail fast on malformed payloads instead of throwing an NPE later in perform().
    this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
    this.interval = Objects.requireNonNull(interval, "interval");
  }

  @JsonProperty
  public String getDataSource()
  {
    return dataSource;
  }

  @JsonProperty
  public Interval getInterval()
  {
    return interval;
  }

  @Override
  public TypeReference<Integer> getReturnTypeReference()
  {
    return new TypeReference<Integer>()
    {
    };
  }

  /**
   * Marks the matching segments as unused in the metadata store.
   *
   * @return the number of segments marked unused
   */
  @Override
  public Integer perform(Task task, TaskActionToolbox toolbox)
  {
    return toolbox.getIndexerMetadataStorageCoordinator()
                  .markSegmentsAsUnusedWithinInterval(dataSource, interval);
  }

  @Override
  public boolean isAudited()
  {
    return true;
  }

  @Override
  public String toString()
  {
    // Shown in the task action audit log, so keep it informative.
    return "MarkSegmentsAsUnusedAction{" +
           "dataSource='" + dataSource + '\'' +
           ", interval=" + interval +
           '}';
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
@JsonSubTypes.Type(name = SegmentAllocateAction.TYPE, value = SegmentAllocateAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand All @@ -50,13 +52,17 @@
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

private final boolean markAsUnused;

@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") Boolean markAsUnused
)
{
super(
Expand All @@ -65,6 +71,13 @@ public KillUnusedSegmentsTask(
interval,
context
);
this.markAsUnused = markAsUnused != null && markAsUnused;
}

@JsonProperty
public boolean isMarkAsUnused()
{
return markAsUnused;
}

@Override
Expand All @@ -77,6 +90,14 @@ public String getType()
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient());

if (markAsUnused) {
int numMarked = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
);
LOG.info("Marked %d segments as unused.", numMarked);
}

// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
final ClientKillUnusedSegmentsTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D")
Intervals.of("2020-01-01/P1D"),
true
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
}

@Test
Expand All @@ -66,7 +68,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
null,
"datasource",
Intervals.of("2020-01-01/P1D"),
null
null,
true
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
Expand All @@ -76,5 +79,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.getId(), taskQuery.getId());
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,58 @@ public void testKill() throws Exception
);

final KillUnusedSegmentsTask task =
new KillUnusedSegmentsTask(null, DATA_SOURCE, Intervals.of("2019-03-01/2019-04-01"), null);
new KillUnusedSegmentsTask(
null,
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
false
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());

final List<DataSegment> unusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));

Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments);
Assertions.assertThat(
getMetadataStorageCoordinator()
.retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), Segments.ONLY_VISIBLE)
).containsExactlyInAnyOrder(
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
}


@Test
public void testKillWithMarkUnused() throws Exception
{
final String version = DateTimes.nowUtc().toString();
final Set<DataSegment> segments = ImmutableSet.of(
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);

Assert.assertEquals(segments, announced);

Assert.assertTrue(
getSegmentsMetadataManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
)
);

final KillUnusedSegmentsTask task =
new KillUnusedSegmentsTask(
null,
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
true
);
Comment thread
zachjsh marked this conversation as resolved.

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,13 @@ public DataSegment apply(String input)
}

final Task killUnusedSegmentsTask =
new KillUnusedSegmentsTask(null, "test_kill_task", Intervals.of("2011-04-01/P4D"), null);
new KillUnusedSegmentsTask(
null,
"test_kill_task",
Intervals.of("2011-04-01/P4D"),
null,
false
);

final TaskStatus status = runTask(killUnusedSegmentsTask);
Assert.assertEquals(taskLocation, status.getLocation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, In
}
}

// No-op test stub: reports that zero segments were marked unused, regardless of input.
@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
{
return 0;
}

@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
private final String id;
private final String dataSource;
private final Interval interval;
private final Boolean markAsUnused;

/**
 * @param id           task id; required — a null id fails fast here
 * @param dataSource   datasource whose unused segments should be killed
 * @param interval     interval whose unused segments should be killed
 * @param markAsUnused whether to first mark used segments in the interval as unused;
 *                     nullable (a null value is treated as false by the kill task)
 */
@JsonCreator
public ClientKillUnusedSegmentsTaskQuery(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("markAsUnused") Boolean markAsUnused
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.dataSource = dataSource;
this.interval = interval;
this.markAsUnused = markAsUnused;
}

@JsonProperty
Expand Down Expand Up @@ -78,6 +81,12 @@ public Interval getInterval()
return interval;
}

// Nullable: may be null for payloads that predate this field; the kill task
// treats null as false.
@JsonProperty
public Boolean getMarkAsUnused()
{
return markAsUnused;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -88,14 +97,15 @@ public boolean equals(Object o)
return false;
}
ClientKillUnusedSegmentsTaskQuery that = (ClientKillUnusedSegmentsTaskQuery) o;
return Objects.equals(id, that.id) &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(interval, that.interval);
return Objects.equals(id, that.id)
&& Objects.equals(dataSource, that.dataSource)
&& Objects.equals(interval, that.interval)
&& Objects.equals(markAsUnused, that.markAsUnused);
}

@Override
public int hashCode()
{
// Must stay consistent with equals(): hash the same fields, in the same order.
return Objects.hash(id, dataSource, interval, markAsUnused);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public HttpIndexingServiceClient(
public void killUnusedSegments(String idPrefix, String dataSource, Interval interval)
Comment thread
zachjsh marked this conversation as resolved.
{
final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval);
final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false);
runTask(taskId, taskQuery);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ Collection<DataSegment> retrieveUsedSegmentsForIntervals(
*/
List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval);

/**
 * Marks as unused all used segments of the given datasource whose data lies entirely
 * within the given interval. Segments whose data extends beyond the interval are
 * left untouched.
 *
 * @param dataSource the datasource the segments belong to
 * @param interval   only segments fully contained in this interval are marked unused
 *
 * @return the number of segments marked as unused
 */
int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval);

/**
* Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments
* with identifiers already in the metadata storage will not be added).
Expand Down
Loading