Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ public void setup()
dataSource,
new DataSourceCompactionConfig(
dataSource,
false,
0,
targetCompactionSizeBytes,
targetCompactionSizeBytes,
null,
null,
null,
Expand Down
6 changes: 4 additions & 2 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,11 @@ A description of the compaction config is:
|Property|Description|Required|
|--------|-----------|--------|
|`dataSource`|dataSource name to be compacted.|yes|
|`keepSegmentGranularity`|Set [keepSegmentGranularity](../ingestion/compaction.html) to true for compactionTask.|no (default = true)|
|`taskPriority`|[Priority](../ingestion/tasks.html#task-priorities) of compact task.|no (default = 25)|
|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 838860800)|
|`numTargetCompactionSegments`|Max number of segments to compact together.|no (default = 150)|
|`inputSegmentSizeBytes`|Total input segment size of a compactionTask.|no (default = 419430400)|
|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 419430400)|
|`maxNumSegmentsToCompact`|Max number of segments to compact together.|no (default = 150)|
|`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
|`tuningConfig`|Tuning config for compact tasks. See below [Compact Task TuningConfig](#compact-task-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.html#task-context) for compact tasks.|no|
Expand Down
11 changes: 7 additions & 4 deletions docs/content/ingestion/compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"dataSource": <task_datasource>,
"interval": <interval to specify segments to be merged>,
"dimensions" <custom dimensionsSpec>,
"keepSegmentGranularity": <true or false>,
"targetCompactionSizeBytes": <target size of compacted segments>
"tuningConfig" <index task tuningConfig>,
"context": <task context>
}
Expand All @@ -22,9 +24,11 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|-----|-----------|--------|
|`type`|Task type. Should be `compact`|Yes|
|`id`|Task id|No|
|`dataSource`|dataSource name to be compacted|Yes|
|`interval`|interval of segments to be compacted|Yes|
|`dimensions`|custom dimensionsSpec. compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
|`dataSource`|DataSource name to be compacted|Yes|
|`interval`|Interval of segments to be compacted|Yes|
|`dimensions`|Custom dimensionsSpec. The compaction task will use this dimensionsSpec if it exists, instead of generating one. See below for more details.|No|
|`keepSegmentGranularity`|If set to true, compactionTask will keep the time chunk boundaries and merge segments only if they fall into the same time chunk.|No (default = true)|
|`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be used with `targetPartitionSize`, `maxTotalRows`, and `numShards` in tuningConfig.|No|
|`tuningConfig`|[Index task tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No|
|`context`|[Task context](../ingestion/locking-and-priority.html#task-context)|No|

Expand Down Expand Up @@ -62,4 +66,3 @@ your own ordering and types, you can specify a custom `dimensionsSpec` in the co
- Roll-up: the output segment is rolled up only when `rollup` is set for all input segments.
See [Roll-up](../ingestion/index.html#rollup) for more details.
You can check that your segments are rolled up or not by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes).
- Partitioning: The compaction task is a special form of native batch indexing task, so it always uses hash-based partitioning on the full set of dimensions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public CompactionTask(
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
this.partitionConfigurationManager = new PartitionConfigurationManager(this.targetCompactionSizeBytes, tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public static Interval of(String interval)
return new Interval(interval, ISOChronology.getInstanceUTC());
}

/**
 * Creates an {@link Interval} in the UTC chronology from a format string and its arguments.
 * The interval string is rendered first and then parsed by {@link #of(String)}.
 *
 * @param format     format string understood by {@code StringUtils.format}
 * @param formatArgs arguments substituted into the format string
 * @return the parsed interval
 */
public static Interval of(String format, Object... formatArgs)
{
  final String interval = StringUtils.format(format, formatArgs);
  return of(interval);
}

public static boolean isEmpty(Interval interval)
{
return interval.getStart().equals(interval.getEnd());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ClientCompactQuery
private final String dataSource;
private final List<DataSegment> segments;
private final boolean keepSegmentGranularity;
private final Long targetCompactionSizeBytes;
private final ClientCompactQueryTuningConfig tuningConfig;
private final Map<String, Object> context;

Expand All @@ -39,13 +40,15 @@ public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("targetCompactionSizeBytes") Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("context") Map<String, Object> context
)
{
this.dataSource = dataSource;
this.segments = segments;
this.keepSegmentGranularity = keepSegmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.context = context;
}
Expand Down Expand Up @@ -74,6 +77,12 @@ public boolean isKeepSegmentGranularity()
return keepSegmentGranularity;
}

/**
 * Target size in bytes for segments produced by compaction; {@code null} when it was not set
 * in the query (the boxed type permits absence).
 */
@JsonProperty
public Long getTargetCompactionSizeBytes()
{
  return this.targetCompactionSizeBytes;
}

@JsonProperty
public ClientCompactQueryTuningConfig getTuningConfig()
{
Expand All @@ -90,10 +99,12 @@ public Map<String, Object> getContext()
public String toString()
{
return "ClientCompactQuery{" +
"dataSource=" + dataSource + "'" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", keepSegmentGranularity=" + keepSegmentGranularity +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
", contexts=" + context +
"}";
", context=" + context +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void killSegments(String dataSource, Interval interval)
public String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
Expand All @@ -107,7 +108,16 @@ public String compactSegments(
context = context == null ? new HashMap<>() : context;
context.put("priority", compactionTaskPriority);

return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context));
return runTask(
new ClientCompactQuery(
dataSource,
segments,
keepSegmentGranularity,
targetCompactionSizeBytes,
tuningConfig,
context
)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface IndexingServiceClient
String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,46 +35,58 @@ public class DataSourceCompactionConfig

// should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150;
private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
private static final long DEFAULT_INPUT_SEGMENT_SIZE_BYTES = 400 * 1024 * 1024;
private static final int DEFAULT_NUM_INPUT_SEGMENTS = 150;
private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D");

private final String dataSource;
private final boolean keepSegmentGranularity;
private final int taskPriority;
private final long inputSegmentSizeBytes;
private final long targetCompactionSizeBytes;
// The number of compaction segments is limited because the byte size of a serialized task spec is limited by
// The number of input segments is limited because the byte size of a serialized task spec is limited by
// RemoteTaskRunnerConfig.maxZnodeBytes.
private final int numTargetCompactionSegments;
private final int maxNumSegmentsToCompact;
private final Period skipOffsetFromLatest;
private final ClientCompactQueryTuningConfig tuningConfig;
private final Map<String, Object> taskContext;

@JsonCreator
public DataSourceCompactionConfig(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("keepSegmentGranularity") Boolean keepSegmentGranularity,
@JsonProperty("taskPriority") @Nullable Integer taskPriority,
@JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("numTargetCompactionSegments") @Nullable Integer numTargetCompactionSegments,
@JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.taskPriority = taskPriority == null ?
DEFAULT_COMPACTION_TASK_PRIORITY :
taskPriority;
this.targetCompactionSizeBytes = targetCompactionSizeBytes == null ?
DEFAULT_TARGET_COMPACTION_SIZE_BYTES :
targetCompactionSizeBytes;
this.numTargetCompactionSegments = numTargetCompactionSegments == null ?
DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS :
numTargetCompactionSegments;
this.keepSegmentGranularity = keepSegmentGranularity == null
? DEFAULT_KEEP_SEGMENT_GRANULARITY
: keepSegmentGranularity;
this.taskPriority = taskPriority == null
? DEFAULT_COMPACTION_TASK_PRIORITY
: taskPriority;
this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
: inputSegmentSizeBytes;
this.targetCompactionSizeBytes = targetCompactionSizeBytes == null
? DEFAULT_TARGET_COMPACTION_SIZE_BYTES
: targetCompactionSizeBytes;
this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
? DEFAULT_NUM_INPUT_SEGMENTS
: maxNumSegmentsToCompact;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
this.taskContext = taskContext;

Preconditions.checkArgument(
this.numTargetCompactionSegments > 1,
this.maxNumSegmentsToCompact > 1,
"numTargetCompactionSegments should be larger than 1"
);
}
Expand All @@ -85,22 +97,34 @@ public String getDataSource()
return dataSource;
}

/**
 * Whether compaction tasks should keep existing time-chunk boundaries and only merge
 * segments that fall into the same time chunk.
 */
@JsonProperty
public boolean isKeepSegmentGranularity()
{
  return this.keepSegmentGranularity;
}

/**
 * Priority assigned to compaction tasks issued for this dataSource.
 */
@JsonProperty
public int getTaskPriority()
{
  return this.taskPriority;
}

@JsonProperty
public long getTargetCompactionSizeBytes()
public long getInputSegmentSizeBytes()
{
return targetCompactionSizeBytes;
return inputSegmentSizeBytes;
}

/**
 * Maximum number of segments a single compaction task may compact together.
 * Limited because the serialized task spec size is bounded by RemoteTaskRunnerConfig.maxZnodeBytes.
 */
@JsonProperty
public int getMaxNumSegmentsToCompact()
{
  return this.maxNumSegmentsToCompact;
}

@JsonProperty
public int getNumTargetCompactionSegments()
public long getTargetCompactionSizeBytes()
{
return numTargetCompactionSegments;
return targetCompactionSizeBytes;
}

@JsonProperty
Expand Down Expand Up @@ -140,15 +164,23 @@ public boolean equals(Object o)
return false;
}

if (keepSegmentGranularity != that.keepSegmentGranularity) {
return false;
}

if (taskPriority != that.taskPriority) {
return false;
}

if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) {
if (inputSegmentSizeBytes != that.inputSegmentSizeBytes) {
return false;
}

if (numTargetCompactionSegments != that.numTargetCompactionSegments) {
if (maxNumSegmentsToCompact != that.maxNumSegmentsToCompact) {
return false;
}

if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) {
return false;
}

Expand All @@ -168,9 +200,11 @@ public int hashCode()
{
return Objects.hash(
dataSource,
keepSegmentGranularity,
taskPriority,
inputSegmentSizeBytes,
maxNumSegmentsToCompact,
targetCompactionSizeBytes,
numTargetCompactionSegments,
skipOffsetFromLatest,
tuningConfig,
taskContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ private CoordinatorStats doRun(
// find segments to be compacted.
final String taskId = indexingServiceClient.compactSegments(
segmentsToCompact,
false,
config.isKeepSegmentGranularity(),
config.getTargetCompactionSizeBytes(),
config.getTaskPriority(),
config.getTuningConfig(),
config.getTaskContext()
Expand Down
Loading