Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow;

import java.util.List;
Expand Down Expand Up @@ -71,19 +71,11 @@ public int getPartitionNum()
@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{

return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
return shardSpecs.get(0);
}
};
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}

@Override
public Map<String, Range<String>> getDomain()
public Map<String, RangeSet<String>> getDomain()
{
return ImmutableMap.of();
}
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/io/druid/timeline/partition/ShardSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow;

import java.util.List;
Expand Down Expand Up @@ -50,5 +50,5 @@ public interface ShardSpec
*
* @return map of dimensions to its possible range. Dimensions with unknown possible range are not mapped
*/
Map<String, Range<String>> getDomain();
Map<String, RangeSet<String>> getDomain();
}
4 changes: 2 additions & 2 deletions api/src/test/java/io/druid/timeline/DataSegmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import io.druid.TestObjectMapper;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -82,7 +82,7 @@ public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
}

@Override
public Map<String, Range<String>> getDomain()
public Map<String, RangeSet<String>> getDomain()
{
return ImmutableMap.of();
}
Expand Down
18 changes: 15 additions & 3 deletions processing/src/main/java/io/druid/query/filter/DimFilterUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T> input, Fu
boolean include = true;

if (dimFilter != null && shard != null) {
Map<String, Range<String>> domain = shard.getDomain();
for (Map.Entry<String, Range<String>> entry : domain.entrySet()) {
Map<String, RangeSet<String>> domain = shard.getDomain();
for (Map.Entry<String, RangeSet<String>> entry : domain.entrySet()) {
String dimension = entry.getKey();
Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
.computeIfAbsent(dimension, d -> Optional.fromNullable(dimFilter.getDimensionRangeSet(d)));
if (optFilterRangeSet.isPresent() && optFilterRangeSet.get().subRangeSet(entry.getValue()).isEmpty()) {

if (optFilterRangeSet.isPresent() && hasEmptyIntersection(optFilterRangeSet.get(), entry.getValue())) {
include = false;
}
}
Expand All @@ -139,4 +140,15 @@ public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T> input, Fu
}
return retSet;
}

private static boolean hasEmptyIntersection(RangeSet<String> r1, RangeSet<String> r2)
{
for (Range<String> range : r2.asRanges()) {
if (!r1.subRangeSet(range).isEmpty()) {
return false;
}
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import io.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.junit.Assert;
Expand Down Expand Up @@ -113,8 +114,10 @@ private static RangeSet<String> rangeSet(List<Range<String>> ranges)
private static ShardSpec shardSpec(String dimension, Range<String> range)
{
ShardSpec shard = EasyMock.createMock(ShardSpec.class);
RangeSet<String> rangeSet = TreeRangeSet.create();
rangeSet.add(range);
EasyMock.expect(shard.getDomain())
.andReturn(ImmutableMap.of(dimension, range))
.andReturn(ImmutableMap.of(dimension, rangeSet))
.anyTimes();
return shard;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.data.input.InputRow;
Expand Down Expand Up @@ -90,14 +89,10 @@ List<Object> getGroupKey(final long timestamp, final InputRow inputRow)
if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow);
} else {
return Lists.transform(partitionDimensions, new Function<String, Object>()
{
@Override
public Object apply(final String dim)
{
return inputRow.getDimension(dim);
}
});
return Lists.transform(
partitionDimensions,
dim -> inputRow.getDimension(dim)
);
}
}

Expand All @@ -114,19 +109,14 @@ public String toString()
@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
int index = Math.abs(hash(timestamp, row) % getPartitions());
return shardSpecs.get(index);
}
return (long timestamp, InputRow row) -> {
int index = Math.abs(hash(timestamp, row) % getPartitions());
return shardSpecs.get(index);
};
}

@Override
public Map<String, Range<String>> getDomain()
public Map<String, RangeSet<String>> getDomain()
{
return ImmutableMap.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow;

import java.util.List;
Expand Down Expand Up @@ -54,18 +54,11 @@ public int getPartitionNum()
@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
return shardSpecs.get(0);
}
};
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}

@Override
public Map<String, Range<String>> getDomain()
public Map<String, RangeSet<String>> getDomain()
{
return ImmutableMap.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow;

import java.util.List;
Expand Down Expand Up @@ -67,18 +67,11 @@ public int getPartitionNum()
@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
return shardSpecs.get(0);
}
};
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}

@Override
public Map<String, Range<String>> getDomain()
public Map<String, RangeSet<String>> getDomain()
{
return ImmutableMap.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;

import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;

Expand Down Expand Up @@ -102,35 +104,30 @@ public int getPartitionNum()
@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
for (ShardSpec spec : shardSpecs) {
if (spec.isInChunk(timestamp, row)) {
return spec;
}
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (spec.isInChunk(timestamp, row)) {
return spec;
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}

@Override
public Map<String, Range<String>> getDomain()
public Map<String, RangeSet<String>> getDomain()
{
Range<String> range;
RangeSet<String> rangeSet = TreeRangeSet.create();
if (start == null && end == null) {
range = Range.all();
rangeSet.add(Range.all());
} else if (start == null) {
range = Range.atMost(end);
rangeSet.add(Range.atMost(end));
} else if (end == null) {
range = Range.atLeast(start);
rangeSet.add(Range.atLeast(start));
} else {
range = Range.closed(start, end);
rangeSet.add(Range.closed(start, end));
}
return ImmutableMap.of(dimension, range);
return ImmutableMap.of(dimension, rangeSet);
}

public void setPartitionNum(int partitionNum)
Expand Down