From 896bd21fa0e91369009a6e92f0fe38128c511e4a Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Tue, 3 Apr 2018 16:19:35 -0700 Subject: [PATCH 1/2] Allow getDomain to return disjointed intervals --- .../timeline/partition/NoneShardSpec.java | 14 +++-------- .../druid/timeline/partition/ShardSpec.java | 4 +-- .../io/druid/timeline/DataSegmentTest.java | 4 +-- .../io/druid/query/filter/DimFilterUtils.java | 18 ++++++++++--- .../query/filter/DimFilterUtilsTest.java | 5 +++- .../partition/HashBasedNumberedShardSpec.java | 24 ++++++------------ .../timeline/partition/LinearShardSpec.java | 13 +++------- .../timeline/partition/NumberedShardSpec.java | 13 +++------- .../partition/SingleDimensionShardSpec.java | 25 ++++++++----------- 9 files changed, 50 insertions(+), 70 deletions(-) diff --git a/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java b/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java index 1fcadfb42d23..d20c3b5458c2 100644 --- a/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java +++ b/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; import io.druid.data.input.InputRow; import java.util.List; @@ -71,19 +71,11 @@ public int getPartitionNum() @Override public ShardSpecLookup getLookup(final List shardSpecs) { - - return new ShardSpecLookup() - { - @Override - public ShardSpec getShardSpec(long timestamp, InputRow row) - { - return shardSpecs.get(0); - } - }; + return (long timestamp, InputRow row) -> shardSpecs.get(0); } @Override - public Map> getDomain() + public Map> getDomain() { return ImmutableMap.of(); } diff --git a/api/src/main/java/io/druid/timeline/partition/ShardSpec.java b/api/src/main/java/io/druid/timeline/partition/ShardSpec.java index 5461544c609c..c691c3959dbb 100644 --- a/api/src/main/java/io/druid/timeline/partition/ShardSpec.java +++ b/api/src/main/java/io/druid/timeline/partition/ShardSpec.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; import io.druid.data.input.InputRow; import java.util.List; @@ -50,5 +50,5 @@ public interface ShardSpec * * @return map of dimensions to its possible range. Dimensions with unknown possible range are not mapped */ - Map> getDomain(); + Map> getDomain(); } diff --git a/api/src/test/java/io/druid/timeline/DataSegmentTest.java b/api/src/test/java/io/druid/timeline/DataSegmentTest.java index bda7947dd2b2..f2743d24b0a2 100644 --- a/api/src/test/java/io/druid/timeline/DataSegmentTest.java +++ b/api/src/test/java/io/druid/timeline/DataSegmentTest.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; import com.google.common.collect.Sets; import io.druid.TestObjectMapper; import io.druid.data.input.InputRow; @@ -82,7 +82,7 @@ public ShardSpecLookup getLookup(List shardSpecs) } @Override - public Map> getDomain() + public Map> getDomain() { return ImmutableMap.of(); } diff --git a/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java b/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java index 42ee9eeba77a..25c991662ed2 100644 --- a/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java +++ b/processing/src/main/java/io/druid/query/filter/DimFilterUtils.java @@ -122,12 +122,13 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu boolean include = true; if (dimFilter != null && shard != null) { - Map> domain = shard.getDomain(); - for (Map.Entry> entry : domain.entrySet()) { + Map> domain = shard.getDomain(); + for (Map.Entry> entry : domain.entrySet()) { String dimension = entry.getKey(); Optional> optFilterRangeSet = dimensionRangeCache .computeIfAbsent(dimension, d -> Optional.fromNullable(dimFilter.getDimensionRangeSet(d))); - if (optFilterRangeSet.isPresent() && optFilterRangeSet.get().subRangeSet(entry.getValue()).isEmpty()) { + + if (optFilterRangeSet.isPresent() && hasEmptyIntersection(optFilterRangeSet.get(), entry.getValue())) { include = false; } } @@ -139,4 +140,15 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu } return retSet; } + + private static boolean hasEmptyIntersection(RangeSet r1, RangeSet r2) + { + for (Range range : r2.asRanges()) { + if (!r1.subRangeSet(range).isEmpty()) { + return false; + } + } + + return true; + } } diff --git a/processing/src/test/java/io/druid/query/filter/DimFilterUtilsTest.java b/processing/src/test/java/io/druid/query/filter/DimFilterUtilsTest.java index 421aa47bae3c..480062510eaf 100644 --- a/processing/src/test/java/io/druid/query/filter/DimFilterUtilsTest.java +++ b/processing/src/test/java/io/druid/query/filter/DimFilterUtilsTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import com.google.common.collect.Sets; +import com.google.common.collect.TreeRangeSet; import io.druid.timeline.partition.ShardSpec; import org.easymock.EasyMock; import org.junit.Assert; @@ -113,8 +114,10 @@ private static RangeSet rangeSet(List> ranges) private static ShardSpec shardSpec(String dimension, Range range) { ShardSpec shard = EasyMock.createMock(ShardSpec.class); + RangeSet rangeSet = TreeRangeSet.create(); + rangeSet.add(range); EasyMock.expect(shard.getDomain()) - .andReturn(ImmutableMap.of(dimension, range)) + .andReturn(ImmutableMap.of(dimension, rangeSet)) .anyTimes(); return shard; } diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index b06ba203ff94..fc523992a36d 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -25,12 +25,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.druid.data.input.InputRow; @@ -90,14 +89,10 @@ List getGroupKey(final long timestamp, final InputRow inputRow) if (partitionDimensions.isEmpty()) { return Rows.toGroupKey(timestamp, inputRow); } else { - return Lists.transform(partitionDimensions, new Function() - { - @Override - public Object apply(final String dim) - { - return inputRow.getDimension(dim); - } - }); + return Lists.transform( + partitionDimensions, + dim -> inputRow.getDimension(dim) + ); } } @@ -114,19 +109,14 @@ public String toString() @Override public ShardSpecLookup getLookup(final List shardSpecs) { - return new ShardSpecLookup() - { - @Override - public ShardSpec getShardSpec(long timestamp, InputRow row) - { + return (long timestamp, InputRow row) -> { int index = Math.abs(hash(timestamp, row) % getPartitions()); return shardSpecs.get(index); - } }; } @Override - public Map> getDomain() + public Map> getDomain() { return ImmutableMap.of(); } diff --git a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java index 000f035921d8..b2358eb707db 100644 --- a/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/LinearShardSpec.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; import io.druid.data.input.InputRow; import java.util.List; @@ -54,18 +54,11 @@ public int getPartitionNum() @Override public ShardSpecLookup getLookup(final List shardSpecs) { - return new ShardSpecLookup() - { - @Override - public ShardSpec getShardSpec(long timestamp, InputRow row) - { - return shardSpecs.get(0); - } - }; + return (long timestamp, InputRow row) -> shardSpecs.get(0); } @Override - public Map> getDomain() + public Map> getDomain() { return ImmutableMap.of(); } diff --git a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java index 673bed3cf6a5..ea955e31f8a3 100644 --- a/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/NumberedShardSpec.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; import io.druid.data.input.InputRow; import java.util.List; @@ -67,18 +67,11 @@ public int getPartitionNum() @Override public ShardSpecLookup getLookup(final List shardSpecs) { - return new ShardSpecLookup() - { - @Override - public ShardSpec getShardSpec(long timestamp, InputRow row) - { - return shardSpecs.get(0); - } - }; + return (long timestamp, InputRow row) -> shardSpecs.get(0); } @Override - public Map> getDomain() + public Map> getDomain() { return ImmutableMap.of(); } diff --git a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java index 8cbefe40f8a1..4638b8f2fd21 100644 --- a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; @@ -102,35 +104,30 @@ public int getPartitionNum() @Override public ShardSpecLookup getLookup(final List shardSpecs) { - return new ShardSpecLookup() - { - @Override - public ShardSpec getShardSpec(long timestamp, InputRow row) - { + return (long timestamp, InputRow row) -> { for (ShardSpec spec : shardSpecs) { if (spec.isInChunk(timestamp, row)) { return spec; } } throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); - } - }; + }; } @Override - public Map> getDomain() + public Map> getDomain() { - Range range; + RangeSet rangeSet = TreeRangeSet.create(); if (start == null && end == null) { - range = Range.all(); + rangeSet.add(Range.all()); } else if (start == null) { - range = Range.atMost(end); + rangeSet.add(Range.atMost(end)); } else if (end == null) { - range = Range.atLeast(start); + rangeSet.add(Range.atLeast(start)); } else { - range = Range.closed(start, end); + rangeSet.add(Range.closed(start, end)); } - return ImmutableMap.of(dimension, range); + return ImmutableMap.of(dimension, rangeSet); } public void setPartitionNum(int partitionNum) From b1acaa20a66bdaf23d6e5d03bbd804e5e18ca3f0 Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Tue, 3 Apr 2018 16:50:10 -0700 Subject: [PATCH 2/2] Indentation issues --- .../partition/HashBasedNumberedShardSpec.java | 4 ++-- .../timeline/partition/SingleDimensionShardSpec.java | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java index fc523992a36d..2f1d32e7964a 100644 --- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -110,8 +110,8 @@ public String toString() public ShardSpecLookup getLookup(final List shardSpecs) { return (long timestamp, InputRow row) -> { - int index = Math.abs(hash(timestamp, row) % getPartitions()); - return shardSpecs.get(index); + int index = Math.abs(hash(timestamp, row) % getPartitions()); + return shardSpecs.get(index); }; } diff --git a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java index 4638b8f2fd21..aa5527429f71 100644 --- a/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/server/src/main/java/io/druid/timeline/partition/SingleDimensionShardSpec.java @@ -105,13 +105,13 @@ public int getPartitionNum() public ShardSpecLookup getLookup(final List shardSpecs) { return (long timestamp, InputRow row) -> { - for (ShardSpec spec : shardSpecs) { - if (spec.isInChunk(timestamp, row)) { - return spec; - } + for (ShardSpec spec : shardSpecs) { + if (spec.isInChunk(timestamp, row)) { + return spec; } - throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); - }; + } + throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); + }; } @Override