25 changes: 22 additions & 3 deletions api/src/main/java/io/druid/data/input/impl/DimensionSchema.java
@@ -29,6 +29,8 @@
import io.druid.guice.annotations.PublicApi;
import io.druid.java.util.common.StringUtils;

+import java.util.Objects;
+
/**
*/
@PublicApi
@@ -116,7 +118,7 @@ public static MultiValueHandling ofDefault()
  protected DimensionSchema(String name, MultiValueHandling multiValueHandling)
  {
    this.name = Preconditions.checkNotNull(name, "Dimension name cannot be null.");
-    this.multiValueHandling = multiValueHandling;
+    this.multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
  }

  @JsonProperty
@@ -149,13 +151,30 @@ public boolean equals(Object o)

    DimensionSchema that = (DimensionSchema) o;

-    return name.equals(that.name);
+    if (!name.equals(that.name)) {
+      return false;
+    }
+
+    if (!getValueType().equals(that.getValueType())) {
+      return false;
+    }
+
+    return Objects.equals(multiValueHandling, that.multiValueHandling);
  }

  @Override
  public int hashCode()
  {
-    return name.hashCode();
+    return Objects.hash(name, getValueType(), multiValueHandling);
  }

  @Override
  public String toString()
  {
    return "DimensionSchema{" +
           "name='" + name + "'" +
           ", valueType='" + getValueType() + "'" +
           ", multiValueHandling='" + getMultiValueHandling() + "'" +
           "}";
  }
}
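Taken together, the constructor now falls back to `MultiValueHandling.ofDefault()` when none is given, and two schemas are equal only if name, value type, and multi-value handling all match. A minimal JUnit sketch of the strengthened `equals` contract, assuming `StringDimensionSchema` and `FloatDimensionSchema` are concrete subclasses with single-argument name constructors (an assumption about this module):

```java
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import org.junit.Assert;
import org.junit.Test;

public class DimensionSchemaEqualsSketch
{
  @Test
  public void testEqualsConsidersValueType()
  {
    // Same name but different value types: equal under the old
    // name-only check, not equal under the new one.
    Assert.assertNotEquals(new StringDimensionSchema("dim"), new FloatDimensionSchema("dim"));
    // Same name and value type still compare equal.
    Assert.assertEquals(new StringDimensionSchema("dim"), new StringDimensionSchema("dim"));
  }
}
```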
@@ -43,6 +43,7 @@ public InputRow parse(InputRow input)
    return input;
  }

+  @JsonProperty
  @Override
  public ParseSpec getParseSpec()
  {
@@ -21,7 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.junit.Test;

/**
2 changes: 1 addition & 1 deletion docs/content/ingestion/firehose.md
@@ -79,7 +79,7 @@ A sample ingest firehose spec is shown below -
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
-|filter| See [Filters](../querying/filters.html)|yes|
+|filter| See [Filters](../querying/filters.html)|no|
> **Contributor:** Good catch.

#### CombiningFirehose

98 changes: 80 additions & 18 deletions docs/content/ingestion/tasks.md
@@ -104,7 +104,7 @@ Tasks can have different default priorities depending on their types. Here are a
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
-|Merge/Append task|25|
+|Merge/Append/Compaction task|25|
|Other tasks|0|

You can override the task priority by setting your priority in the task context like below.
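A minimal sketch of such a context (the priority value is illustrative):

```json
{
  "context" : {
    "priority" : 100
  }
}
```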
@@ -184,19 +184,6 @@ On the contrary, in the incremental publishing mode, segments are incrementally

To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig.

-### Task Context
-
-The task context is used for various task configuration parameters. The following parameters apply to all tasks.
-
-|property|default|description|
-|--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).|
-
-<div class="note caution">
-When a task acquires a lock, it sends a request via HTTP and awaits until it receives a response containing the lock acquisition result.
-As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords.
-</div>

Segment Merging Tasks
---------------------

@@ -210,7 +197,8 @@ Append tasks append a list of segments together into a single segment (one after
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>,
"aggregations": <optional list of aggregators>
"aggregations": <optional list of aggregators>,
"context": <task context>
}
```

@@ -228,7 +216,8 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"segments": <JSON list of DataSegment objects to merge>
"segments": <JSON list of DataSegment objects to merge>,
"context": <task context>
}
```

@@ -245,10 +234,67 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"interval": <DataSegment objects in this interval are going to be merged>
"interval": <DataSegment objects in this interval are going to be merged>,
"context": <task context>
}
```

### Compaction Task

Compaction tasks merge all segments of the given interval. The syntax is:

```json
{
    "type": "compact",
    "id": <task_id>,
    "dataSource": <task_datasource>,
    "interval": <interval to specify segments to be merged>,
    "dimensions": <custom dimensionsSpec>,
    "tuningConfig": <index task tuningConfig>,
    "context": <task context>
}
```

|Field|Description|Required|
|-----|-----------|--------|
|`type`|Task type. Should be `compact`.|Yes|
|`id`|Task id|No|
|`dataSource`|dataSource name to be compacted|Yes|
|`interval`|interval of segments to be compacted|Yes|
|`dimensions`|Custom dimensionsSpec. The compaction task will use this dimensionsSpec, if present, instead of generating one. See below for more details.|No|
|`tuningConfig`|[Index task tuningConfig](#tuningconfig)|No|
|`context`|[Task context](#task-context)|No|

An example compaction task is:

```json
{
"type" : "compact",
"dataSource" : "wikipedia",
"interval" : "2017-01-01/2018-01-01"
}
```

This compaction task merges _all segments_ of the interval `2017-01-01/2018-01-01` into a _single segment_.
To merge each day's worth of data into a separate segment, you can submit multiple `compact` tasks, one for each day. They will run in parallel.

A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters.
For example, its `firehose` is always the [ingestSegmentSpec](./firehose.html), and `dimensionsSpec` and `metricsSpec`
include all dimensions and metrics of the input segments by default.

Unless all input segments have the same metadata, the output segment can have metadata different from that of the input segments.

- Dimensions: since Druid supports schema change, dimensions can be different across segments even if they are part of the same dataSource.
If the input segments have different dimensions, the output segment includes all dimensions of the input segments.
However, even if the input segments have the same set of dimensions, the dimension order or the data types of dimensions can differ. For example, the data type of some dimensions can be
changed from `string` to primitive types, or the order of dimensions can be changed for better locality (see [Partitioning](batch-ingestion.html#partitioning-specification)).
In this case, the dimensions of more recent segments precede those of older segments in terms of data types and ordering,
because more recent segments are more likely to have the desired order and data types. If you want to use
your own ordering and types, you can specify a custom `dimensionsSpec` in the compaction task spec, as in the sketch after this list.
- Roll-up: the output segment is rolled up only when `rollup` is set for all input segments.
See [Roll-up](../design/index.html#roll-up) for more details.
You can check whether your segments are rolled up by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes).
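For reference, here is a sketch of the compaction task above with a custom `dimensionsSpec`; the dimension names are illustrative, and the nested `dimensions` list assumes the standard dimensionsSpec layout:

```json
{
  "type" : "compact",
  "dataSource" : "wikipedia",
  "interval" : "2017-01-01/2018-01-01",
  "dimensions" : {
    "dimensions" : ["page", "language", "user"]
  }
}
```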

Segment Destroying Tasks
------------------------

@@ -261,7 +307,8 @@ Kill tasks delete all information about a segment and remove it from deep stora
"type": "kill",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_segments_in_this_interval_will_die!>
"interval" : <all_segments_in_this_interval_will_die!>,
"context": <task context>
}
```

@@ -342,6 +389,21 @@ These tasks start, sleep for a time and are used only for testing. The available
}
```

Task Context
------------

The task context is used for various task configuration parameters. The following parameters apply to all task types.

|property|default|description|
|--------|-------|-----------|
|taskLockTimeout|300000|Task lock timeout in milliseconds. For more details, see the [Locking](#locking) section below.|
|priority|Varies by task type. See [Task Priority](#task-priority).|Task priority|

<div class="note caution">
When a task acquires a lock, it sends a request via HTTP and awaits until it receives a response containing the lock acquisition result.
As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords.
</div>
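For example, a compaction task that sets both parameters in its context might look like the following sketch (the values are illustrative):

```json
{
  "type" : "compact",
  "dataSource" : "wikipedia",
  "interval" : "2017-01-01/2018-01-01",
  "context" : {
    "taskLockTimeout" : 600000,
    "priority" : 100
  }
}
```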

Locking
-------

@@ -190,7 +190,7 @@ public List<AggregatorFactory> getRequiredColumns()
  @Override
  public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
  {
-    if (Objects.equals(getName(), other.getName()) && this.getClass() == other.getClass()) {
+    if (Objects.equals(getName(), other.getName()) && other instanceof VarianceAggregatorFactory) {
      return getCombiningFactory();
    } else {
      throw new AggregatorFactoryNotMergeableException(this, other);
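The new condition relaxes the exact-class check so that `other` may be any subclass of `VarianceAggregatorFactory`. A minimal, self-contained sketch of the difference between the two checks, using hypothetical classes:

```java
public class InstanceofSketch
{
  static class Base {}
  static class Sub extends Base {}

  public static void main(String[] args)
  {
    Object sub = new Sub();
    // Exact-class comparison rejects subclasses.
    System.out.println(sub.getClass() == Base.class); // false
    // instanceof accepts the class itself and any subclass.
    System.out.println(sub instanceof Base);          // true
  }
}
```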
@@ -33,7 +33,9 @@
import io.druid.query.QueryRunner;
import org.joda.time.Interval;

+import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@@ -80,15 +82,27 @@ protected AbstractTask(
    this.context = context;
  }

-  public static String makeId(String id, final String typeName, String dataSource, Interval interval)
+  static String getOrMakeId(String id, final String typeName, String dataSource)
  {
-    return id != null ? id : joinId(
-        typeName,
-        dataSource,
-        interval.getStart(),
-        interval.getEnd(),
-        DateTimes.nowUtc().toString()
-    );
+    return getOrMakeId(id, typeName, dataSource, null);
  }
+
+  static String getOrMakeId(String id, final String typeName, String dataSource, @Nullable Interval interval)
+  {
+    if (id != null) {
+      return id;
+    }
+
+    final List<Object> objects = new ArrayList<>();
+    objects.add(typeName);
+    objects.add(dataSource);
+    if (interval != null) {
+      objects.add(interval.getStart());
+      objects.add(interval.getEnd());
+    }
+    objects.add(DateTimes.nowUtc().toString());
+
+    return joinId(objects);
+  }

  @JsonProperty
@@ -167,7 +181,12 @@ public String toString()
   *
   * @return string of joined objects
   */
-  public static String joinId(Object... objects)
+  static String joinId(List<Object> objects)
+  {
+    return ID_JOINER.join(objects);
+  }
+
+  static String joinId(Object... objects)
  {
    return ID_JOINER.join(objects);
  }
@@ -202,7 +221,7 @@ public int hashCode()
    return id.hashCode();
  }

-  protected List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
+  static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
  {
    return client.submit(new LockListAction());
  }
@@ -47,7 +47,7 @@ public ArchiveTask(
  )
  {
    super(
-        makeId(id, "archive", dataSource, interval),
+        getOrMakeId(id, "archive", dataSource, interval),
        dataSource,
        interval,
        context
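Given the ArchiveTask call above, here is a self-contained sketch of the id that `getOrMakeId` would generate when no id is supplied, assuming `ID_JOINER` joins the parts with underscores (an assumption about AbstractTask; the timestamps are illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class TaskIdSketch
{
  public static void main(String[] args)
  {
    // Parts in the order getOrMakeId() adds them: type name, dataSource,
    // the interval endpoints (when an interval is given), then the
    // creation timestamp.
    List<String> parts = Arrays.asList(
        "archive",
        "wikipedia",
        "2017-01-01T00:00:00.000Z",
        "2017-02-01T00:00:00.000Z",
        "2017-06-01T12:00:00.000Z"
    );
    // Prints e.g. archive_wikipedia_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_2017-06-01T12:00:00.000Z
    System.out.println(String.join("_", parts));
  }
}
```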