Merged

Commits (23)
8b7826d
Early publishing segments in the middle of data ingestion
jihoonson May 2, 2017
bfdbac9
Remove unnecessary logs
jihoonson May 2, 2017
3ee09d2
Address comments
jihoonson May 7, 2017
f898776
Refactoring the patch according to #4292 and address comments
jihoonson Jun 6, 2017
95d659e
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jun 6, 2017
be31858
Set the total shard number of NumberedShardSpec to 0
jihoonson Jun 8, 2017
800401e
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jun 10, 2017
f5737ab
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jun 14, 2017
784eff7
refactoring
jihoonson Jun 16, 2017
d8ecc48
Address comments
jihoonson Jun 20, 2017
b27e7f6
Fix tests
jihoonson Jun 20, 2017
06ff062
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jun 27, 2017
70cf3af
Address comments
jihoonson Jun 27, 2017
05dd24b
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jun 28, 2017
3df551c
Fix sync problem of committer and retry push only
jihoonson Jun 30, 2017
ac196d9
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jun 30, 2017
8ae9f62
Fix doc
jihoonson Jun 30, 2017
f85f17d
Fix build failure
jihoonson Jun 30, 2017
dc5e86f
Address comments
jihoonson Jul 8, 2017
90ec78f
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jul 8, 2017
248f524
Fix compilation failure
jihoonson Jul 8, 2017
4a2d651
Merge branch 'master' of https://github.com/druid-io/druid into segme…
jihoonson Jul 10, 2017
1a3e856
Fix transient test failure
jihoonson Jul 10, 2017
docs/content/design/index.md (29 additions, 20 deletions)
@@ -32,6 +32,30 @@
They each represent an axis of the data that we’ve chosen to slice across.
Metrics are usually numeric values, and computations include operations such as count, sum, and mean.
Also known as measures in standard OLAP terminology.

## Sharding the Data

Druid shards are called `segments`, and Druid always shards data by time first. In our compacted data set, we can create two segments, one for each hour of data.

For example:

Segment `sampleData_2011-01-01T01:00:00.000Z_2011-01-01T02:00:00.000Z_v1_0` contains

2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70
2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18


Segment `sampleData_2011-01-01T02:00:00.000Z_2011-01-01T03:00:00.000Z_v1_0` contains

2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31
2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01

Segments are self-contained containers for the time interval of data they hold. Segment data is stored in a compressed, column-oriented format, along with the indexes for those columns. Druid queries only understand how to scan segments.

Segments are uniquely identified by a datasource, an interval, a version, and an optional partition number.
Examining our example segments, the segments are named following the convention `dataSource_interval_version_partitionNumber`. For instance, `sampleData_2011-01-01T01:00:00.000Z_2011-01-01T02:00:00.000Z_v1_0` breaks down into the datasource `sampleData`, the hour-long interval starting at 2011-01-01T01:00:00.000Z, the version `v1`, and the partition number `0`.

## Roll-up

The individual events in our example data set are not very interesting because there may be trillions of such events.
@@ -56,30 +80,15 @@
This storage reduction does come at a cost; as we roll up data, we lose the ability to query individual events.
The rollup granularity is the minimum granularity you will be able to explore data at, and events are floored to this granularity.
Hence, Druid ingestion specs define this granularity as the `queryGranularity` of the data. The lowest supported `queryGranularity` is millisecond. For example, with a `queryGranularity` of `HOUR`, an event with timestamp 2011-01-01T01:23:45Z is floored to 2011-01-01T01:00:00Z.

### Roll-up modes
Contributor:

This new doc section is great.

Contributor (Author):

Thanks :)


Druid supports two roll-up modes: _perfect roll-up_ and _best-effort roll-up_. In the perfect roll-up mode, Druid guarantees that input data are perfectly aggregated at ingestion time. In the best-effort roll-up mode, input data might not be perfectly aggregated, so there can be multiple segments holding rows which, under perfect roll-up, would have been aggregated into the same segment, since they have the same dimension values and their timestamps fall into the same interval.

The perfect roll-up mode requires an additional preprocessing step to determine intervals and shardSpecs before the actual data ingestion if they are not specified in the ingestionSpec. This preprocessing step usually scans the entire input data, which might increase the ingestion time. The [Hadoop indexing task](./ingestion/batch-ingestion.html) always runs in this perfect roll-up mode.

In contrast, the best-effort roll-up mode doesn't require any preprocessing step, but the size of the ingested data might be larger than with perfect roll-up. All types of [streaming indexing (i.e., the realtime index task, the Kafka indexing service, ...)](./ingestion/stream-ingestion.html) run in this mode.

Finally, the [native index task](./ingestion/tasks.html) supports both modes; you can choose whichever fits your application.
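For example, a minimal tuningConfig sketch for the native index task that opts into the perfect roll-up mode via the `forceGuaranteedRollup` flag (see the [ingestion docs](./ingestion/tasks.html) for the full set of options):

```json
{
  "type": "index",
  "forceGuaranteedRollup": true
}
```

Leaving `forceGuaranteedRollup` at its default of `false` keeps the best-effort roll-up mode.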

## Indexing the Data

docs/content/ingestion/tasks.md (12 additions, 0 deletions)
@@ -114,10 +114,12 @@
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
|type|The task type, this should always be "index".|none|yes|
|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no|
|maxTotalRows|Total number of rows in segments waiting to be published. Used to determine when an intermediate publish should occur.|150000|no|
Contributor:

Can you add maybe another section to the docs about the task's publishing behavior? I think it'd be useful to have one place where the user can see what publishing modes are supported (early vs. no early) and what configuration parameters affect the publishing behavior.

Contributor (Author):

Sure, I added more documents.

|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists); see the worked example after this table.|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|forceGuaranteedRollup|Forces guaranteeing [perfect rollup](./design/index.html). Perfect rollup optimizes the total size of generated segments and query time, at the cost of increased indexing time. This flag cannot be used with either `appendToExisting` of IOConfig or `forceExtendableShardSpecs`. For more details, see the __Segment publishing modes__ section below.|false|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|publishTimeout|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|0|no|
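
To make the heap formula above concrete: with the defaults of `maxRowsInMemory` = 75000 and `maxPendingPersists` = 0, maximum heap usage for indexing scales with 75000 * (2 + 0) = 150000 rows held in memory.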

@@ -148,6 +150,16 @@
For Roaring bitmaps:
|type|String|Must be `roaring`.|yes|
|compressRunOnSerialization|Boolean|Use a run-length encoding where it is estimated to be more space efficient.|no (default == `true`)|
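
As an example, a minimal bitmap object for an [IndexSpec](#indexspec) selecting Roaring bitmaps (a sketch built from the two fields in the table above):

```json
{
  "type": "roaring",
  "compressRunOnSerialization": true
}
```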

#### Segment publishing modes

While ingesting data, the Index task creates segments from the input data and publishes them. The Index task supports two segment publishing modes, i.e., _bulk publishing mode_ and _incremental publishing mode_, corresponding to [perfect rollup and best-effort rollup](./design/index.html), respectively.

In the bulk publishing mode, every segment is published at the very end of the index task. Until then, created segments are stored in memory and on the local storage of the node running the index task. As a result, this mode can run up against the node's storage capacity and is not recommended for production use.

In contrast, in the incremental publishing mode, segments can be published in the middle of the index task. More precisely, the index task collects data and stores created segments in memory and on the disks of the node running the task until the total number of collected rows exceeds `maxTotalRows`. At that point, the index task immediately publishes all segments created so far, cleans up the published segments, and continues to ingest the remaining data.

To enable the bulk publishing mode, `forceGuaranteedRollup` should be set in the tuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of the tuningConfig or `appendToExisting` of the IOConfig.
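
For illustration, a minimal tuningConfig sketch that stays in the incremental publishing mode and spells out the publishing-related defaults from the table above:

```json
{
  "type": "index",
  "forceGuaranteedRollup": false,
  "maxTotalRows": 150000,
  "publishTimeout": 0
}
```

Setting `forceGuaranteedRollup` to `true` instead switches the task to the bulk publishing mode, where segments are published only at the very end of the task.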

Segment Merging Tasks
---------------------

@@ -596,7 +596,7 @@ public String apply(DataSegment input)
}

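// Unannounce the data segment server for this task before returning a successful status.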
toolbox.getDataSegmentServerAnnouncer().unannounce();

return success();
}

@@ -19,12 +19,12 @@

package io.druid.indexing.appenderator;

+import io.druid.data.input.InputRow;
import io.druid.indexing.common.actions.SegmentAllocateAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
-import org.joda.time.DateTime;

import java.io.IOException;

@@ -44,15 +44,15 @@ public ActionBasedSegmentAllocator(

@Override
public SegmentIdentifier allocate(
-final DateTime timestamp,
+final InputRow row,
final String sequenceName,
final String previousSegmentId
) throws IOException
{
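// Allocate a segment via a SegmentAllocateAction; the segment timestamp is now
// derived from the input row itself (row.getTimestamp()) rather than passed in separately.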
return taskActionClient.submit(
new SegmentAllocateAction(
dataSchema.getDataSource(),
-timestamp,
+row.getTimestamp(),
dataSchema.getGranularitySpec().getQueryGranularity(),
dataSchema.getGranularitySpec().getSegmentGranularity(),
sequenceName,