From f783b026c549935f3e1102cd6b13fbb267f874bf Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Oct 2017 11:38:17 -0700 Subject: [PATCH 1/2] Fix havingSpec on complex aggregators. - Uses the technique from #4883 on DimFilterHavingSpec too. - Also uses Transformers from #4890, necessitating a move of that and other related classes from druid-server to druid-processing. They probably make more sense there anyway. - Adds a SQL query test. Fixes #4957. --- .../indexing/kafka/KafkaIndexTaskTest.java | 4 +- .../hadoop/DatasourceIngestionSpec.java | 2 +- .../indexer/BatchDeltaIngestionTest.java | 2 +- .../indexer/HadoopDruidIndexerMapperTest.java | 4 +- .../IngestSegmentFirehoseFactory.java | 2 +- .../indexing/common/task/IndexTaskTest.java | 4 +- .../common/task/RealtimeIndexTaskTest.java | 4 +- .../IngestSegmentFirehoseFactoryTest.java | 4 +- ...estSegmentFirehoseFactoryTimelineTest.java | 2 +- .../groupby/having/DimFilterHavingSpec.java | 178 ++++++++++++++++-- .../transform}/ExpressionTransform.java | 2 +- .../druid/segment/transform}/RowFunction.java | 2 +- .../druid/segment/transform}/Transform.java | 2 +- .../segment/transform}/TransformSpec.java | 19 +- .../druid/segment/transform}/Transformer.java | 7 +- .../TransformingInputRowParser.java | 2 +- .../TransformingStringInputRowParser.java | 2 +- .../query/groupby/GroupByQueryRunnerTest.java | 9 +- .../having/DimFilterHavingSpecTest.java | 8 +- .../io/druid/segment/indexing/DataSchema.java | 1 + .../firehose/IngestSegmentFirehose.java | 4 +- .../segment/indexing/DataSchemaTest.java | 2 + .../segment/indexing/TransformSpecTest.java | 2 + .../firehose/IngestSegmentFirehoseTest.java | 2 +- .../io/druid/sql/calcite/rel/DruidQuery.java | 2 +- .../druid/sql/calcite/CalciteQueryTest.java | 116 +++++++++++- 26 files changed, 331 insertions(+), 57 deletions(-) rename {server/src/main/java/io/druid/segment/indexing => processing/src/main/java/io/druid/segment/transform}/ExpressionTransform.java (98%) rename {server/src/main/java/io/druid/segment/indexing => processing/src/main/java/io/druid/segment/transform}/RowFunction.java (96%) rename {server/src/main/java/io/druid/segment/indexing => processing/src/main/java/io/druid/segment/transform}/Transform.java (98%) rename {server/src/main/java/io/druid/segment/indexing => processing/src/main/java/io/druid/segment/transform}/TransformSpec.java (89%) rename {server/src/main/java/io/druid/segment/indexing => processing/src/main/java/io/druid/segment/transform}/Transformer.java (95%) rename {server/src/main/java/io/druid/segment/indexing => processing/src/main/java/io/druid/segment/transform}/TransformingInputRowParser.java (98%) rename {server/src/main/java/io/druid/segment/indexing => processing/src/main/java/io/druid/segment/transform}/TransformingStringInputRowParser.java (98%) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index af0574da8a90..e7c94624ff34 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -112,8 +112,8 @@ import io.druid.segment.TestHelper; import io.druid.segment.column.DictionaryEncodedColumn; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.ExpressionTransform; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPusher; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java index b55bdf4aa7c9..a69e880875c1 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java @@ -26,7 +26,7 @@ import io.druid.java.util.common.JodaUtils; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.filter.DimFilter; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.TransformSpec; import io.druid.timeline.DataSegment; import org.joda.time.Interval; diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index e1107ff06c7f..5ba57ac90d91 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -47,7 +47,7 @@ import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.StorageAdapter; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.realtime.firehose.IngestSegmentFirehose; diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerMapperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerMapperTest.java index cf33fb83e19b..268fae619817 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerMapperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerMapperTest.java @@ -36,8 +36,8 @@ import io.druid.query.filter.SelectorDimFilter; import io.druid.segment.TestHelper; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.ExpressionTransform; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 18427125f9e3..7df5e0833a5c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -42,7 +42,7 @@ import io.druid.query.filter.DimFilter; import io.druid.segment.IndexIO; import io.druid.segment.QueryableIndexStorageAdapter; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.segment.realtime.firehose.WindowedStorageAdapter; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index e0606a8f8518..f9c85918b9fa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -57,8 +57,8 @@ import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.ExpressionTransform; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index a5b3c95a1df5..832fa0eb6362 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -103,10 +103,10 @@ import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.TestHelper; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.ExpressionTransform; +import io.druid.segment.transform.ExpressionTransform; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index bdda4aae5ca6..9d6e19f5266a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -71,8 +71,8 @@ import io.druid.segment.column.Column; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.indexing.ExpressionTransform; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 58d21ee3aa8f..79ff70523e17 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -59,7 +59,7 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.TransformSpec; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; diff --git a/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java index 6051b96aac05..d0b8b72b6a13 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java @@ -23,29 +23,44 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.common.guava.SettableSupplier; +import io.druid.data.input.InputRow; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.filter.DimFilter; -import io.druid.query.filter.ValueMatcher; -import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.segment.column.ValueType; +import io.druid.segment.transform.RowFunction; +import io.druid.segment.transform.Transform; +import io.druid.segment.transform.TransformSpec; +import io.druid.segment.transform.Transformer; +import org.joda.time.DateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; public class DimFilterHavingSpec extends BaseHavingSpec { + private static final boolean DEFAULT_FINALIZE = true; + private final DimFilter dimFilter; - private final SettableSupplier rowSupplier; + private final boolean finalize; - private ValueMatcher valueMatcher; + private Map rowSignature = new HashMap<>(); + private Map aggregators = new HashMap<>(); + private Transformer transformer = null; private int evalCount; @JsonCreator public DimFilterHavingSpec( - @JsonProperty("filter") final DimFilter dimFilter + @JsonProperty("filter") final DimFilter dimFilter, + @JsonProperty("finalize") final Boolean finalize ) { this.dimFilter = Preconditions.checkNotNull(dimFilter, "filter"); - this.rowSupplier = new SettableSupplier<>(); + this.finalize = finalize == null ? DEFAULT_FINALIZE : finalize; } @JsonProperty("filter") @@ -54,11 +69,22 @@ public DimFilter getDimFilter() return dimFilter; } + @JsonProperty + public boolean isFinalize() + { + return finalize; + } + @Override public void setRowSignature(Map rowSignature) { - this.valueMatcher = dimFilter.toFilter() - .makeMatcher(RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature)); + this.rowSignature = rowSignature; + } + + @Override + public void setAggregators(final Map aggregators) + { + this.aggregators = aggregators; } @Override @@ -66,17 +92,23 @@ public boolean eval(final Row row) { int oldEvalCount = evalCount; evalCount++; - rowSupplier.set(row); - final boolean retVal = valueMatcher.matches(); + + if (transformer == null) { + transformer = createTransformer(dimFilter, rowSignature, aggregators, finalize); + } + + final boolean retVal = transformer.transform(new RowAsInputRow(row)) != null; + if (evalCount != oldEvalCount + 1) { // Oops, someone was using this from two different threads, bad caller. throw new IllegalStateException("concurrent 'eval' calls not permitted!"); } + return retVal; } @Override - public boolean equals(Object o) + public boolean equals(final Object o) { if (this == o) { return true; @@ -84,16 +116,15 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - - DimFilterHavingSpec that = (DimFilterHavingSpec) o; - - return dimFilter.equals(that.dimFilter); + final DimFilterHavingSpec that = (DimFilterHavingSpec) o; + return finalize == that.finalize && + Objects.equals(dimFilter, that.dimFilter); } @Override public int hashCode() { - return dimFilter.hashCode(); + return Objects.hash(dimFilter, finalize); } @Override @@ -101,6 +132,121 @@ public String toString() { return "DimFilterHavingSpec{" + "dimFilter=" + dimFilter + + ", finalize=" + finalize + '}'; } + + private static Transformer createTransformer( + final DimFilter filter, + final Map rowSignature, + final Map aggregators, + final boolean finalize + ) + { + final List transforms = new ArrayList<>(); + + if (finalize) { + for (AggregatorFactory aggregator : aggregators.values()) { + final String name = aggregator.getName(); + + transforms.add( + new Transform() + { + @Override + public String getName() + { + return name; + } + + @Override + public RowFunction getRowFunction() + { + return row -> aggregator.finalizeComputation(row.getRaw(name)); + } + } + ); + } + } + + return new TransformSpec(filter, transforms).toTransformer(rowSignature); + } + + private static class RowAsInputRow implements InputRow + { + private final Row row; + + public RowAsInputRow(final Row row) + { + this.row = row; + } + + @Override + public List getDimensions() + { + return Collections.emptyList(); + } + + @Override + public long getTimestampFromEpoch() + { + return row.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return row.getTimestamp(); + } + + @Override + public List getDimension(final String dimension) + { + return row.getDimension(dimension); + } + + @Override + public Object getRaw(final String dimension) + { + return row.getRaw(dimension); + } + + @Override + public Number getMetric(final String metric) + { + return row.getMetric(metric); + } + + @Override + public int compareTo(final Row o) + { + return row.compareTo(o); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final RowAsInputRow that = (RowAsInputRow) o; + return Objects.equals(row, that.row); + } + + @Override + public int hashCode() + { + return Objects.hash(row); + } + + @Override + public String toString() + { + return "RowAsInputRow{" + + "row=" + row + + '}'; + } + } } diff --git a/server/src/main/java/io/druid/segment/indexing/ExpressionTransform.java b/processing/src/main/java/io/druid/segment/transform/ExpressionTransform.java similarity index 98% rename from server/src/main/java/io/druid/segment/indexing/ExpressionTransform.java rename to processing/src/main/java/io/druid/segment/transform/ExpressionTransform.java index 7369a2fce8c5..d9a35b5c17e3 100644 --- a/server/src/main/java/io/druid/segment/indexing/ExpressionTransform.java +++ b/processing/src/main/java/io/druid/segment/transform/ExpressionTransform.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.segment.indexing; +package io.druid.segment.transform; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; diff --git a/server/src/main/java/io/druid/segment/indexing/RowFunction.java b/processing/src/main/java/io/druid/segment/transform/RowFunction.java similarity index 96% rename from server/src/main/java/io/druid/segment/indexing/RowFunction.java rename to processing/src/main/java/io/druid/segment/transform/RowFunction.java index c9dabd744c57..6065ae94004b 100644 --- a/server/src/main/java/io/druid/segment/indexing/RowFunction.java +++ b/processing/src/main/java/io/druid/segment/transform/RowFunction.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.segment.indexing; +package io.druid.segment.transform; import io.druid.data.input.Row; diff --git a/server/src/main/java/io/druid/segment/indexing/Transform.java b/processing/src/main/java/io/druid/segment/transform/Transform.java similarity index 98% rename from server/src/main/java/io/druid/segment/indexing/Transform.java rename to processing/src/main/java/io/druid/segment/transform/Transform.java index 9c8253f066b8..fe3b043d5197 100644 --- a/server/src/main/java/io/druid/segment/indexing/Transform.java +++ b/processing/src/main/java/io/druid/segment/transform/Transform.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.segment.indexing; +package io.druid.segment.transform; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; diff --git a/server/src/main/java/io/druid/segment/indexing/TransformSpec.java b/processing/src/main/java/io/druid/segment/transform/TransformSpec.java similarity index 89% rename from server/src/main/java/io/druid/segment/indexing/TransformSpec.java rename to processing/src/main/java/io/druid/segment/transform/TransformSpec.java index ff441834a07d..706b022c7f24 100644 --- a/server/src/main/java/io/druid/segment/indexing/TransformSpec.java +++ b/processing/src/main/java/io/druid/segment/transform/TransformSpec.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.segment.indexing; +package io.druid.segment.transform; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -26,10 +26,12 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.java.util.common.ISE; import io.druid.query.filter.DimFilter; +import io.druid.segment.column.ValueType; import javax.annotation.Nullable; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -111,9 +113,22 @@ public InputRowParser decorate(final InputRowParser parser) } } + /** + * Create a {@link Transformer} from this TransformSpec, when the rows to be transformed do not have a known + * signature. + */ public Transformer toTransformer() { - return new Transformer(this); + return new Transformer(this, null); + } + + /** + * Create a {@link Transformer} from this TransformSpec, taking advantage of the known signature of the rows + * to be transformed. + */ + public Transformer toTransformer(@Nullable final Map rowSignature) + { + return new Transformer(this, rowSignature); } @Override diff --git a/server/src/main/java/io/druid/segment/indexing/Transformer.java b/processing/src/main/java/io/druid/segment/transform/Transformer.java similarity index 95% rename from server/src/main/java/io/druid/segment/indexing/Transformer.java rename to processing/src/main/java/io/druid/segment/transform/Transformer.java index 8a18f898c43e..5f475b56e363 100644 --- a/server/src/main/java/io/druid/segment/indexing/Transformer.java +++ b/processing/src/main/java/io/druid/segment/transform/Transformer.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.segment.indexing; +package io.druid.segment.transform; import io.druid.data.input.InputRow; import io.druid.data.input.Row; @@ -26,6 +26,7 @@ import io.druid.query.filter.ValueMatcher; import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.segment.column.Column; +import io.druid.segment.column.ValueType; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -43,7 +44,7 @@ public class Transformer private final ThreadLocal rowSupplierForValueMatcher = new ThreadLocal<>(); private final ValueMatcher valueMatcher; - Transformer(final TransformSpec transformSpec) + Transformer(final TransformSpec transformSpec, final Map rowSignature) { for (final Transform transform : transformSpec.getTransforms()) { transforms.put(transform.getName(), transform.getRowFunction()); @@ -54,7 +55,7 @@ public class Transformer .makeMatcher( RowBasedColumnSelectorFactory.create( rowSupplierForValueMatcher, - null + rowSignature ) ); } else { diff --git a/server/src/main/java/io/druid/segment/indexing/TransformingInputRowParser.java b/processing/src/main/java/io/druid/segment/transform/TransformingInputRowParser.java similarity index 98% rename from server/src/main/java/io/druid/segment/indexing/TransformingInputRowParser.java rename to processing/src/main/java/io/druid/segment/transform/TransformingInputRowParser.java index 96151b585f6e..9c5883e80a6a 100644 --- a/server/src/main/java/io/druid/segment/indexing/TransformingInputRowParser.java +++ b/processing/src/main/java/io/druid/segment/transform/TransformingInputRowParser.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.segment.indexing; +package io.druid.segment.transform; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; diff --git a/server/src/main/java/io/druid/segment/indexing/TransformingStringInputRowParser.java b/processing/src/main/java/io/druid/segment/transform/TransformingStringInputRowParser.java similarity index 98% rename from server/src/main/java/io/druid/segment/indexing/TransformingStringInputRowParser.java rename to processing/src/main/java/io/druid/segment/transform/TransformingStringInputRowParser.java index 8a241969136c..20e0430aab5a 100644 --- a/server/src/main/java/io/druid/segment/indexing/TransformingStringInputRowParser.java +++ b/processing/src/main/java/io/druid/segment/transform/TransformingStringInputRowParser.java @@ -17,7 +17,7 @@ * under the License. */ -package io.druid.segment.indexing; +package io.druid.segment.transform; import io.druid.data.input.InputRow; import io.druid.data.input.impl.ParseSpec; diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index de649fa5662c..9e633285b200 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3878,7 +3878,8 @@ public void testDimFilterHavingSpec() ), new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null) ) - ) + ), + null ); GroupByQuery.Builder builder = GroupByQuery @@ -3924,7 +3925,8 @@ public void testDimFilterHavingSpecWithExtractionFns() new BoundDimFilter("rows", "12", null, true, false, null, extractionFn2, StringComparators.NUMERIC), new SelectorDimFilter("idx", "super-217", extractionFn) ) - ) + ), + null ); GroupByQuery.Builder builder = GroupByQuery @@ -7955,7 +7957,8 @@ public void testGroupByWithHavingSpecOnLongAndFloat() StringComparators.NUMERIC ) ) - ) + ), + null ) ) .setGranularity(QueryRunnerTestHelper.allGran) diff --git a/processing/src/test/java/io/druid/query/groupby/having/DimFilterHavingSpecTest.java b/processing/src/test/java/io/druid/query/groupby/having/DimFilterHavingSpecTest.java index 103b5e5b1448..677ebb5c0ae4 100644 --- a/processing/src/test/java/io/druid/query/groupby/having/DimFilterHavingSpecTest.java +++ b/processing/src/test/java/io/druid/query/groupby/having/DimFilterHavingSpecTest.java @@ -48,7 +48,7 @@ public class DimFilterHavingSpecTest @Test public void testSimple() { - final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "bar", null)); + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "bar", null), null); havingSpec.setRowSignature(null); Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.of("foo", "bar")))); @@ -58,7 +58,7 @@ public void testSimple() @Test public void testRowSignature() { - final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null)); + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), null); havingSpec.setRowSignature(ImmutableMap.of("foo", ValueType.LONG)); Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.of("foo", 1L)))); @@ -70,7 +70,7 @@ public void testRowSignature() public void testConcurrentUsage() throws Exception { final ExecutorService exec = Executors.newFixedThreadPool(2); - final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null)); + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), null); final List> futures = new ArrayList<>(); for (int i = 0; i < 2; i++) { @@ -114,7 +114,7 @@ public void run() @Test public void testSerde() throws Exception { - final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null)); + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), false); final ObjectMapper objectMapper = new DefaultObjectMapper(); Assert.assertEquals( havingSpec, diff --git a/server/src/main/java/io/druid/segment/indexing/DataSchema.java b/server/src/main/java/io/druid/segment/indexing/DataSchema.java index 5dd5815c78c6..c1feaac05b6d 100644 --- a/server/src/main/java/io/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/io/druid/segment/indexing/DataSchema.java @@ -34,6 +34,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.transform.TransformSpec; import java.util.Arrays; import java.util.HashSet; diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java index 5d91f4e5ed70..de23fdd40c64 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -43,8 +43,8 @@ import io.druid.segment.column.Column; import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.Filters; -import io.druid.segment.indexing.TransformSpec; -import io.druid.segment.indexing.Transformer; +import io.druid.segment.transform.TransformSpec; +import io.druid.segment.transform.Transformer; import io.druid.utils.Runnables; import javax.annotation.Nullable; diff --git a/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java index 730a27ba8e17..e464c241a81c 100644 --- a/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java +++ b/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java @@ -41,6 +41,8 @@ import io.druid.query.filter.SelectorDimFilter; import io.druid.segment.TestHelper; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Rule; diff --git a/server/src/test/java/io/druid/segment/indexing/TransformSpecTest.java b/server/src/test/java/io/druid/segment/indexing/TransformSpecTest.java index 72bd851ae971..5ddc716c76cf 100644 --- a/server/src/test/java/io/druid/segment/indexing/TransformSpecTest.java +++ b/server/src/test/java/io/druid/segment/indexing/TransformSpecTest.java @@ -33,6 +33,8 @@ import io.druid.query.filter.AndDimFilter; import io.druid.query.filter.SelectorDimFilter; import io.druid.segment.TestHelper; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; import org.junit.Assert; import org.junit.Test; diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index 67ca7508d7ef..51bd505ea93f 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -47,7 +47,7 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; -import io.druid.segment.indexing.TransformSpec; +import io.druid.segment.transform.TransformSpec; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java index c4bb35ce8ab9..b4f2c14caa91 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java @@ -804,7 +804,7 @@ public GroupByQuery toGroupByQuery() grouping.getDimensionSpecs(), grouping.getAggregatorFactories(), grouping.getPostAggregators(), - grouping.getHavingFilter() != null ? new DimFilterHavingSpec(grouping.getHavingFilter()) : null, + grouping.getHavingFilter() != null ? new DimFilterHavingSpec(grouping.getHavingFilter(), true) : null, limitSpec, ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) ); diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index c2ec6b2e1d42..d10d81296ee1 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -1058,7 +1058,7 @@ public void testHavingOnGrandTotal() throws Exception .setInterval(QSS(Filtration.eternity())) .setGranularity(Granularities.ALL) .setAggregatorSpecs(AGGS(new DoubleSumAggregatorFactory("a0", "m1"))) - .setHavingSpec(new DimFilterHavingSpec(NUMERIC_SELECTOR("a0", "21", null))) + .setHavingSpec(HAVING(NUMERIC_SELECTOR("a0", "21", null))) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), @@ -1081,7 +1081,7 @@ public void testHavingOnDoubleSum() throws Exception .setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0"))) .setAggregatorSpecs(AGGS(new DoubleSumAggregatorFactory("a0", "m1"))) .setHavingSpec( - new DimFilterHavingSpec( + HAVING( new BoundDimFilter( "a0", "1", @@ -1107,6 +1107,105 @@ public void testHavingOnDoubleSum() throws Exception ); } + @Test + public void testHavingOnApproximateCountDistinct() throws Exception + { + testQuery( + "SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(DIMS(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs( + AGGS( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec("m1", "m1", ValueType.FLOAT) + ), + false, + true + ) + ) + ) + .setHavingSpec( + HAVING( + BOUND( + "a0", + "1", + null, + true, + false, + null, + StringComparators.NUMERIC + ) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", 3L}, + new Object[]{"a", 2L} + ) + ); + } + + @Test + public void testHavingOnExactCountDistinct() throws Exception + { + testQuery( + PLANNER_CONFIG_NO_HLL, + "SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1", + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + DIMS( + new DefaultDimensionSpec("dim2", "d0", ValueType.STRING), + new DefaultDimensionSpec("m1", "d1", ValueType.FLOAT) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(DIMS(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))) + .setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0"))) + .setHavingSpec( + HAVING( + BOUND( + "a0", + "1", + null, + true, + false, + null, + StringComparators.NUMERIC + ) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", 3L}, + new Object[]{"a", 2L} + ) + ); + } + @Test public void testHavingOnFloatSum() throws Exception { @@ -1122,7 +1221,7 @@ public void testHavingOnFloatSum() throws Exception .setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0"))) .setAggregatorSpecs(AGGS(new DoubleSumAggregatorFactory("a0", "m1"))) .setHavingSpec( - new DimFilterHavingSpec( + HAVING( new BoundDimFilter( "a0", "1", @@ -1202,7 +1301,7 @@ public void testHavingOnRatio() throws Exception .setPostAggregatorSpecs(ImmutableList.of( EXPRESSION_POST_AGG("p0", "(\"a0\" / \"a1\")") )) - .setHavingSpec(new DimFilterHavingSpec(EXPRESSION_FILTER("((\"a0\" / \"a1\") == 1)"))) + .setHavingSpec(HAVING(EXPRESSION_FILTER("((\"a0\" / \"a1\") == 1)"))) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), @@ -4437,7 +4536,7 @@ public void testGroupByLimitPushDownWithHavingOnLong() throws Exception 4 ) ) - .setHavingSpec(new DimFilterHavingSpec(NUMERIC_SELECTOR("a0", "1", null))) + .setHavingSpec(HAVING(NUMERIC_SELECTOR("a0", "1", null))) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), @@ -5847,7 +5946,7 @@ public void testUsingSubqueryAsFilterOnTwoColumns() throws Exception new DefaultDimensionSpec("dim2", "d1") )) .setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0"))) - .setHavingSpec(new DimFilterHavingSpec(NUMERIC_SELECTOR("a0", "1", null))) + .setHavingSpec(HAVING(NUMERIC_SELECTOR("a0", "1", null))) .setContext(QUERY_CONTEXT_DEFAULT) .build(), newScanQueryBuilder() @@ -6223,6 +6322,11 @@ private static List AGGS(final AggregatorFactory... aggregato return Arrays.asList(aggregators); } + private static DimFilterHavingSpec HAVING(final DimFilter filter) + { + return new DimFilterHavingSpec(filter, true); + } + private static ExpressionVirtualColumn EXPRESSION_VIRTUAL_COLUMN( final String name, final String expression, From d8a6a68d6e7f984ed4053405e3790ffaefcfb787 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 31 Oct 2017 07:39:33 -0700 Subject: [PATCH 2/2] Remove unused import. --- .../java/io/druid/query/groupby/having/DimFilterHavingSpec.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java index d0b8b72b6a13..8b4e556f7b73 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import io.druid.common.guava.SettableSupplier; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.query.aggregation.AggregatorFactory;