Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/ingestion/batch-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ The configuration options are:
|type|Type of partitionSpec to be used.|"hashed"|
|targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards|
|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards; ignored when targetPartitionSize is set.|no|

#### Single-dimension partitioning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public boolean run()
new HashBasedNumberedShardSpec(
i,
numberOfShards,
null,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that for the "hashed" partition spec, partitionDimensions is completely ignored if "targetPartitionSize" was set. From the documentation, it appears it should have been considered.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc for it.

HadoopDruidIndexerConfig.JSON_MAPPER
),
shardCount++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ public boolean run()
for (int i = 0; i < shardsPerInterval; i++) {
specs.add(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, shardsPerInterval, HadoopDruidIndexerConfig.JSON_MAPPER),
new HashBasedNumberedShardSpec(
i,
shardsPerInterval,
config.getPartitionsSpec().getPartitionDimensions(),
HadoopDruidIndexerConfig.JSON_MAPPER
),
shardCount++
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,51 @@
package io.druid.indexer.partitions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;

import javax.annotation.Nullable;
import java.util.List;

public class HashedPartitionsSpec extends AbstractPartitionsSpec
{
private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();

public static HashedPartitionsSpec makeDefaultHashedPartitionsSpec()
{
return new HashedPartitionsSpec(null, null, null, null);
return new HashedPartitionsSpec(null, null, null, null, null);
}

@JsonIgnore
private final List<String> partitionDimensions;

@JsonCreator
public HashedPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
@JsonProperty("numShards") @Nullable Integer numShards
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
}

@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DetermineHashedPartitionsJob(config);
}

@Override
@JsonProperty
public List<String> getPartitionDimensions()
{
return partitionDimensions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;

import java.util.List;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
Expand All @@ -51,4 +53,6 @@ public interface PartitionsSpec
@JsonProperty
public int getNumShards();

@JsonProperty
public List<String> getPartitionDimensions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.indexer.DeterminePartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;

import javax.annotation.Nullable;
import java.util.List;

public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
{
Expand Down Expand Up @@ -57,4 +59,11 @@ public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DeterminePartitionsJob(config);
}

@Override
@JsonProperty
public List<String> getPartitionDimensions()
{
  // NOTE(review): a reviewer asked why this does not return the configured
  // "partitionDimension". Confirm whether single-dimension specs should expose
  // it here rather than always reporting an empty list; for now the behavior
  // is unchanged and an empty list is returned.
  return ImmutableList.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object
INTERVAL_FULL.getStart(),
ImmutableList.of(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(0, 1, HadoopDruidIndexerConfig.JSON_MAPPER),
new HashBasedNumberedShardSpec(0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),
0
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public DetermineHashedPartitionsJobTest(String dataFilePath, long targetPartitio
new HadoopTuningConfig(
tmpDir.getAbsolutePath(),
null,
new HashedPartitionsSpec(targetPartitionSize, null, true, null),
new HashedPartitionsSpec(targetPartitionSize, null, true, null, null),
null,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testHashedBucketSelection()
List<HadoopyShardSpec> specs = Lists.newArrayList();
final int partitionCount = 10;
for (int i = 0; i < partitionCount; i++) {
specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()), i));
}

HadoopIngestionSpec spec = new HadoopIngestionSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ private List<ShardSpec> constructShardSpecFromShardInfo(String partitionType, Ob
List<ShardSpec> specs = Lists.newArrayList();
if (partitionType.equals("hashed")) {
for (Integer[] shardInfo : (Integer[][]) shardInfoForEachShard) {
specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], HadoopDruidIndexerConfig.JSON_MAPPER));
specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER));
}
} else if (partitionType.equals("single")) {
int partitionNum = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -68,6 +69,12 @@ public void testHashedPartitionsSpec() throws Exception
150
);

Assert.assertEquals(
"getPartitionDimensions",
partitionsSpec.getPartitionDimensions(),
ImmutableList.of()
);

Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
}
}
Expand Down Expand Up @@ -114,6 +121,12 @@ public void testHashedPartitionsSpecShardCount() throws Exception
2
);

Assert.assertEquals(
"getPartitionDimensions",
partitionsSpec.getPartitionDimensions(),
ImmutableList.of()
);

Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
if (numShards > 0) {
shardSpecs = Lists.newArrayList();
for (int i = 0; i < numShards; i++) {
shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper));
}
} else {
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
Expand Down Expand Up @@ -304,7 +304,7 @@ private List<ShardSpec> determinePartitions(
shardSpecs.add(new NoneShardSpec());
} else {
for (int i = 0; i < numberOfShards; ++i) {
shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, jsonMapper));
shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, null, jsonMapper));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,48 @@

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;

import javax.annotation.Nullable;
import java.util.List;

public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
private static final HashFunction hashFunction = Hashing.murmur3_32();
private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();

private final ObjectMapper jsonMapper;
@JsonIgnore
private final List<String> partitionDimensions;

/**
 * Deserialization/construction entry point.
 *
 * @param partitionNum        index of this shard within the interval
 * @param partitions          total number of shards in the interval
 * @param partitionDimensions dimensions to hash on; null/empty means hash on
 *                            the full group key (all dimensions)
 * @param jsonMapper          mapper used to serialize the group key for hashing
 */
@JsonCreator
public HashBasedNumberedShardSpec(
    @JsonProperty("partitionNum") int partitionNum,
    @JsonProperty("partitions") int partitions,
    @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
    @JacksonInject ObjectMapper jsonMapper
)
{
  super(partitionNum, partitions);
  this.jsonMapper = jsonMapper;
  // Defensive copy so later mutation of the caller's list cannot change
  // this shard spec's partitioning behavior.
  this.partitionDimensions = partitionDimensions == null
                             ? DEFAULT_PARTITION_DIMENSIONS
                             : ImmutableList.copyOf(partitionDimensions);
}

// Dimensions used to build the hash key; empty means all dimensions are used.
// Serialized here (rather than on the field) so the property appears once.
@JsonProperty("partitionDimensions")
public List<String> getPartitionDimensions()
{
return partitionDimensions;
}

@Override
Expand All @@ -56,7 +73,7 @@ public boolean isInChunk(long timestamp, InputRow inputRow)

protected int hash(long timestamp, InputRow inputRow)
{
final List<Object> groupKey = Rows.toGroupKey(timestamp, inputRow);
final List<Object> groupKey = getGroupKey(timestamp, inputRow);
try {
return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
}
Expand All @@ -65,12 +82,29 @@ protected int hash(long timestamp, InputRow inputRow)
}
}

// Builds the list of values that is serialized and hashed to pick a shard.
// When no partitionDimensions are configured, falls back to the full group
// key (timestamp + all dimensions) via Rows.toGroupKey; otherwise only the
// configured dimensions' values are used (timestamp excluded).
// NOTE: Lists.transform returns a lazy view over partitionDimensions; it is
// evaluated when the caller serializes the returned list.
List<Object> getGroupKey(final long timestamp, final InputRow inputRow)
{
if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow);
} else {
return Lists.transform(partitionDimensions, new Function<String, Object>()
{
@Override
public Object apply(final String dim)
{
// One entry per configured dimension: that dimension's value(s) in the row.
return inputRow.getDimension(dim);
}
});
}
}

@Override
public String toString()
{
  // Renders the spec's identifying fields for logs and error messages.
  final StringBuilder sb = new StringBuilder("HashBasedNumberedShardSpec{");
  sb.append("partitionNum=").append(getPartitionNum());
  sb.append(", partitions=").append(getPartitions());
  sb.append(", partitionDimensions=").append(getPartitionDimensions());
  sb.append('}');
  return sb.toString();
}

Expand Down
Loading