Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.druid.data.input.impl.InputRowParser;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.stream.Stream;

Expand Down Expand Up @@ -52,13 +53,13 @@ default boolean isSplittable()
* lazily so that the listing overhead could be amortized.
*/
@JsonIgnore
Stream<InputSplit<S>> getSplits() throws IOException;
Stream<InputSplit<S>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException;

/**
* Returns number of splits returned by {@link #getSplits()}.
* Returns number of splits returned by {@link #getSplits}.
*/
@JsonIgnore
int getNumSplits() throws IOException;
int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException;

/**
* Returns the same {@link FiniteFirehoseFactory} but with the given {@link InputSplit}. The returned
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* {@link SplitHintSpec} for IngestSegmentFirehoseFactory.
*/
public class SegmentsSplitHintSpec implements SplitHintSpec
{
public static final String TYPE = "segments";

private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 500 * 1024 * 1024;

/**
* Maximum number of bytes of input segments to process in a single task.
* If a single segment is larger than this number, it will be processed by itself in a single task.
*/
private final long maxInputSegmentBytesPerTask;

@JsonCreator
public SegmentsSplitHintSpec(
@JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask
)
{
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to handle -1 as null for new specs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter was added in #7048 and it doesn't count -1 as null. IMO, handling -1 as null is a legacy behavior and new parameters shouldn't do that.

? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
: maxInputSegmentBytesPerTask;
}

@JsonProperty
public long getMaxInputSegmentBytesPerTask()
{
return maxInputSegmentBytesPerTask;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentsSplitHintSpec that = (SegmentsSplitHintSpec) o;
return maxInputSegmentBytesPerTask == that.maxInputSegmentBytesPerTask;
}

@Override
public int hashCode()
{
return Objects.hash(maxInputSegmentBytesPerTask);
}

@Override
public String toString()
{
return "SegmentsSplitHintSpec{" +
"maxInputSegmentBytesPerTask=" + maxInputSegmentBytesPerTask +
'}';
}
}
41 changes: 41 additions & 0 deletions core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

/**
* In native parallel indexing, the supervisor task partitions input data into splits and assigns each of them
* to a single sub task. How to create splits could mainly depend on the input file format, but sometimes druid users
* want to give some hints to control the amount of data each sub task will read. SplitHintSpec can be used for this
* purpose. Implementations can ignore the given hint.
*
* @see FiniteFirehoseFactory#getSplits(SplitHintSpec)
* @see FiniteFirehoseFactory#getNumSplits(SplitHintSpec)
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = SegmentsSplitHintSpec.TYPE, value = SegmentsSplitHintSpec.class)
})
public interface SplitHintSpec
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.java.util.common.logger.Logger;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -102,14 +104,14 @@ public List<T> getObjects()
}

@Override
public Stream<InputSplit<T>> getSplits() throws IOException
public Stream<InputSplit<T>> getSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
{
initializeObjectsIfNeeded();
return getObjects().stream().map(InputSplit::new);
}

@Override
public int getNumSplits() throws IOException
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) throws IOException
{
initializeObjectsIfNeeded();
return getObjects().size();
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -813,9 +813,11 @@ If you see this problem, it's recommended to set `skipOffsetFromLatest` to some
|`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1000000)|
|`maxBytesInMemory`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (1/6 of max JVM memory)|
|`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 20000000)|
|`splitHintSpec`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = null)|
|`indexSpec`|See [IndexSpec](../ingestion/index.md#indexspec)|no|
|`maxPendingPersists`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up))|
|`pushTimeout`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 0)|
|`maxNumConcurrentSubTasks`|See [tuningConfig for indexTask](../ingestion/native-batch.md#tuningconfig)|no (default = 1)|

### Overlord

Expand Down
7 changes: 4 additions & 3 deletions docs/ingestion/data-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"id": <task_id>,
"dataSource": <task_datasource>,
"ioConfig": <IO config>,
"dimensions" <custom dimensionsSpec>,
"dimensionsSpec" <custom dimensionsSpec>,
"metricsSpec" <custom metricsSpec>,
"segmentGranularity": <segment granularity after compaction>,
"tuningConfig" <index task tuningConfig>,
"tuningConfig" <parallel indexing task tuningConfig>,
"context": <task context>
}
```
Expand All @@ -116,7 +117,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
|`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
|`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No|
|`tuningConfig`|[Index task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No|
|`tuningConfig`|[Parallel indexing task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No|
|`context`|[Task context](../ingestion/tasks.md#context)|No|


Expand Down
21 changes: 19 additions & 2 deletions docs/ingestion/native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of firehose. See [SplitHintSpec](#splithintspec) for more details.|null|no|
|partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` if `forceGuaranteedRollup` = true|no|
|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no|
|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no|
Expand All @@ -212,14 +213,30 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
|maxNumConcurrentSubTasks|Maximum number of tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxNumConcurrentSubTasks|Maximum number of sub tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](#capacity-planning) for more details.|1|no|
|maxRetry|Maximum number of retries on task failures.|3|no|
|maxNumSegmentsToMerge|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only `forceGuaranteedRollup` is set.|100|no|
|totalNumMergeTasks|Total number of tasks to merge segments in the second phase when `forceGuaranteedRollup` is set.|10|no|
|taskStatusCheckPeriodMs|Polling period in milliseconds to check running task statuses.|1000|no|
|chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
|chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no|

### `splitHintSpec`

`SplitHintSpec` is used to give a hint when the supervisor task creates input splits.
Note that each sub task processes a single input split. You can control the amount of data each sub task will read during the first phase.

Currently only one splitHintSpec, i.e., `segments`, is available.

#### `SegmentsSplitHintSpec`

`SegmentsSplitHintSpec` is used only for `IngestSegmentFirehose`.

|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `segments`.|none|yes|
|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks).|150MB|no|

### `partitionsSpec`

PartitionsSpec is to describe the secondary partitioning method.
Expand Down Expand Up @@ -785,7 +802,7 @@ This firehose will accept any type of parser, but will only utilize the list of
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](../querying/filters.md)|no|
|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
|maxInputSegmentBytesPerTask|Deprecated. Use [SegmentsSplitHintSpec](#segmentssplithintspec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|

<a name="sql-firehose"></a>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testWithSplit() throws IOException
5
);
final List<FiniteFirehoseFactory<StringInputRowParser, URI>> subFactories = factory
.getSplits()
.getSplits(null)
.map(factory::withSplit)
.sorted(Comparator.comparing(eachFactory -> {
final StaticS3FirehoseFactory staticS3FirehoseFactory = (StaticS3FirehoseFactory) eachFactory;
Expand Down
Loading