Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ jobs:
docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
done

- &integration_perfect_rollup_parallel_batch_index
name: "perfect rollup parallel batch index integration test"
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index'
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_kafka_index
name: "kafka index integration test"
services: *integration_test_services
Expand All @@ -314,6 +321,6 @@ jobs:
- &integration_tests
name: "other integration test"
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index'
script: *run_integration_test
after_failure: *integration_test_diags
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ public List<String> getPartitionDimensions()
@Override
public String getForceGuaranteedRollupIncompatiblityReason()
{
  // NOTE(review): the scrape merged the removed diff line
  // (`return NAME + " partitions unsupported";`) into this method, which made the
  // code below unreachable (a compile error in Java). The stale line is dropped here.

  // Single-dim range partitioning supports perfect rollup, but only when the
  // partition dimension has been explicitly specified.
  if (getPartitionDimension() == null) {
    // PARITION_DIMENSION (sic) — typo is in the constant's declared name elsewhere
    // in the class; preserved here so the reference still resolves. TODO confirm/rename.
    return PARITION_DIMENSION + " must be specified";
  }

  // Empty/sentinel reason string meaning "compatible" — presumably declared on the
  // PartitionsSpec interface; verify against the declaring type.
  return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* {@link ShardSpec} for range partitioning based on a single dimension
Expand Down Expand Up @@ -184,4 +185,26 @@ public String toString()
", partitionNum=" + partitionNum +
'}';
}

@Override
public boolean equals(Object o)
{
  // Reflexive shortcut: an instance is always equal to itself.
  if (o == this) {
    return true;
  }
  // Require the exact same runtime class (not instanceof), so instances of
  // subclasses never compare equal to a SingleDimensionShardSpec.
  if (o == null || o.getClass() != getClass()) {
    return false;
  }
  final SingleDimensionShardSpec other = (SingleDimensionShardSpec) o;
  // Compare all fields that participate in hashCode(), null-safely for the
  // reference-typed ones.
  return Objects.equals(dimension, other.dimension)
         && Objects.equals(start, other.start)
         && Objects.equals(end, other.end)
         && partitionNum == other.partitionNum;
}

@Override
public int hashCode()
{
  // Equivalent to Objects.hash(dimension, start, end, partitionNum): the same
  // 31-based accumulation over the same fields in the same order, spelled out
  // to avoid the varargs Object[] allocation. Consistent with equals().
  int result = 1;
  result = 31 * result + Objects.hashCode(dimension);
  result = 31 * result + Objects.hashCode(start);
  result = 31 * result + Objects.hashCode(end);
  result = 31 * result + Integer.hashCode(partitionNum);
  return result;
}
}
2 changes: 1 addition & 1 deletion docs/ingestion/hadoop.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ The configuration options are:
|type|Type of partitionSpec to be used.|"single_dim"|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes|
|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|no|
|maxRowsPerSegment|Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetRowsPerSegment`.|no|
|maxPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no|
|partitionDimension|The dimension to partition on. Leave blank to select a dimension automatically.|no|
|assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|no|
Expand Down
2 changes: 1 addition & 1 deletion docs/ingestion/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ This table compares the three available options:
| **Input locations** | Any [firehose](native-batch.md#firehoses). | Any [firehose](native-batch.md#firehoses). | Any Hadoop FileSystem or Druid datasource. |
| **File formats** | Text file formats (CSV, TSV, JSON). Support for binary formats is coming in a future release. | Text file formats (CSV, TSV, JSON). Support for binary formats is coming in a future release. | Any Hadoop InputFormat. |
| **[Rollup modes](#rollup)** | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig).| Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | Always perfect. |
| **Partitioning options** | Hash-based partitioning is supported when `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | Hash-based or range-based partitioning (when `forceGuaranteedRollup` = true). | Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). |

<a name="data-model"></a>

Expand Down
47 changes: 36 additions & 11 deletions docs/ingestion/native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ each sub task creates segments individually and reports them to the supervisor t

If `forceGuaranteedRollup` = true, it's executed in two phases with data shuffle which is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
In the first phase, each sub task partitions input data based on `segmentGranularity` (primary partition key) in `granularitySpec`
and `partitionDimensions` (secondary partition key) in `partitionsSpec`. The partitioned data is served by
and `partitionDimension` or `partitionDimensions` (secondary partition key) in `partitionsSpec`. The partitioned data is served by
the [middleManager](../design/middlemanager.md) or the [indexer](../design/indexer.md)
where the first phase tasks ran. In the second phase, each sub task fetches
partitioned data from MiddleManagers or indexers and merges them to create the final segments.
Expand Down Expand Up @@ -205,13 +205,13 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no|
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of firehose. See [SplitHintSpec](#splithintspec) for more details.|null|no|
|partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no|
|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](index.md#indexspec)|null|no|
|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](index.md#indexspec) for possible values.|same as indexSpec|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.md#rollup). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, `intervals` in `granularitySpec` must be set and `hashed` or `single_dim` must be used for `partitionsSpec`. This flag cannot be used with `appendToExisting` of IOConfig. For more details, see the below __Segment pushing modes__ section.|false|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
Expand Down Expand Up @@ -241,18 +241,43 @@ Currently only one splitHintSpec, i.e., `segments`, is available.

### `partitionsSpec`

PartitionsSpec is used to describe the secondary partitioning method.
You should use different partitionsSpec depending on the [rollup mode](../ingestion/index.md#rollup) you want.
For perfect rollup, you should use either `hashed` (partitioning based on the hash of dimensions in each row) or
`single_dim` (based on ranges of a single dimension). For best-effort rollup, you should use `dynamic`.

The three `partitionsSpec` types have different pros and cons:
- `dynamic`: Fastest ingestion speed. Guarantees a well-balanced distribution in segment size. Only best-effort rollup.
- `hashed`: Moderate ingestion speed. Creates a well-balanced distribution in segment size. Allows perfect rollup.
- `single_dim`: Slowest ingestion speed. Segment sizes may be skewed depending on the partition key, but the broker can
use the partition information to efficiently prune segments early to speed up queries. Allows perfect rollup.

#### Hash-based partitioning

|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `hashed`|none|yes|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|5000000 (if `numShards` is not set)|either this or `numShards`|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `targetRowsPerSegment` is set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|null|no|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `targetRowsPerSegment` is set.|null|yes|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|

#### Single-dimension range partitioning

> Single-dimension range partitioning currently requires the
> [druid-datasketches](../development/extensions-core/datasketches-extension.md)
> extension to be [loaded from the classpath](../development/extensions.md#loading-extensions-from-the-classpath).

> Because single-range partitioning makes two passes over the input, the index task may fail if the input changes
> in between the two passes.

|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `single_dim`|none|yes|
|partitionDimension|The dimension to partition on. Only rows with a single dimension value will be included.|none|yes|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|none|either this or `maxRowsPerSegment`|
|maxRowsPerSegment|Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetRowsPerSegment`.|none|either this or `targetRowsPerSegment`|
|assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|false|no|

#### Dynamic partitioning

|property|description|default|required?|
|--------|-----------|-------|---------|
Expand Down Expand Up @@ -943,4 +968,4 @@ A spec that applies a filter and reads a subset of the original datasource's col
}
```

This spec above will only return the `page`, `user` dimensions and `added` metric. Only rows where `page` = `Druid` will be returned.
7 changes: 0 additions & 7 deletions extensions-core/datasketches/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,10 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<properties>
<datasketches.core.version>1.1.0-incubating</datasketches.core.version>
<datasketches.memory.version>1.2.0-incubating</datasketches.memory.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>${datasketches.core.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
Expand All @@ -54,7 +48,6 @@
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-memory</artifactId>
<version>${datasketches.memory.version}</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
Expand Down
20 changes: 20 additions & 0 deletions indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,26 @@
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<!-- Used in native parallel batch indexing to determine distribution of dimension values -->
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<!-- Used in native parallel batch indexing to determine distribution of dimension values -->
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-memory</artifactId>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.task.IndexTask.ShardSpecs;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
Expand All @@ -43,8 +44,9 @@
* Allocates all necessary segments locally at the beginning and reuses them.
*
* @see HashPartitionCachingLocalSegmentAllocator
* @see RangePartitionCachingLocalSegmentAllocator
*/
class CachingLocalSegmentAllocator implements IndexTaskSegmentAllocator
class CachingLocalSegmentAllocatorHelper implements IndexTaskSegmentAllocator
{
private final String taskId;
private final Map<String, SegmentIdWithShardSpec> sequenceNameToSegmentId;
Expand All @@ -55,27 +57,30 @@ interface IntervalToSegmentIdsCreator
{
/**
* @param versionFinder Returns the version for the specified interval
*
* @return Information for segment preallocation
*/
Map<Interval, List<SegmentIdWithShardSpec>> create(Function<Interval, String> versionFinder);
}

CachingLocalSegmentAllocator(
CachingLocalSegmentAllocatorHelper(
TaskToolbox toolbox,
String taskId,
String supervisorTaskId,
IntervalToSegmentIdsCreator intervalToSegmentIdsCreator
) throws IOException
{
this.taskId = taskId;
this.sequenceNameToSegmentId = new HashMap<>();

final Map<Interval, String> intervalToVersion = toolbox.getTaskActionClient()
.submit(new LockListAction())
.stream()
.collect(Collectors.toMap(
TaskLock::getInterval,
TaskLock::getVersion
));
final Map<Interval, String> intervalToVersion =
toolbox.getTaskActionClient()
.submit(new SurrogateAction<>(supervisorTaskId, new LockListAction()))
.stream()
.collect(Collectors.toMap(
TaskLock::getInterval,
TaskLock::getVersion
));
Function<Interval, String> versionFinder = interval -> findVersion(intervalToVersion, interval);

final Map<Interval, List<SegmentIdWithShardSpec>> intervalToIds = intervalToSegmentIdsCreator.create(versionFinder);
Expand Down
Loading