diff --git a/distribution/pom.xml b/distribution/pom.xml index 19669d2d496e..207dccd7580d 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -93,6 +93,8 @@ -c io.druid.extensions:druid-s3-extensions -c + io.druid.extensions:druid-stats + -c io.druid.extensions:mysql-metadata-storage -c io.druid.extensions:postgresql-metadata-storage diff --git a/docs/content/development/extensions-core/stats.md b/docs/content/development/extensions-core/stats.md new file mode 100644 index 000000000000..747a75686dcf --- /dev/null +++ b/docs/content/development/extensions-core/stats.md @@ -0,0 +1,152 @@ +--- +layout: doc_page +--- + +# Stats aggregator + +Includes stat-related aggregators, including variance and standard deviations, etc. Make sure to [include](../../operations/including-extensions.html) `druid-stats` as an extension. + +## Variance aggregator + +Algorithm of the aggregator is the same with that of apache hive. This is the description in GenericUDAFVariance in hive. + +Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in +"Algorithms for computing the sample variance: analysis and recommendations" +The American Statistician, 37 (1983) pp. 242--247. + +variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2) + +where: - variance is sum[x-avg^2] (this is actually n times the variance) +and is updated at every step. - n is the count of elements in chunk1 - m is +the count of elements in chunk2 - t1 = sum of elements in chunk1, t2 = +sum of elements in chunk2. + +This algorithm was proven to be numerically stable by J.L. Barlow in +"Error analysis of a pairwise summation algorithm to compute sample variance" +Numer. Math, 58 (1991) pp. 583--590 + +### Pre-aggregating variance at ingestion time + +To use this feature, an "variance" aggregator must be included at indexing time. +The ingestion aggregator can only apply to numeric values. If you use "variance" +then any input rows missing the value will be considered to have a value of 0. + +User can specify expected input type as one of "float", "long", "variance" for ingestion, which is by default "float". + +```json +{ + "type" : "variance", + "name" : , + "fieldName" : , + "inputType" : , + "estimator" : +} +``` + +To query for results, "variance" aggregator with "variance" input type or simply a "varianceFold" aggregator must be included in the query. + +```json +{ + "type" : "varianceFold", + "name" : , + "fieldName" : , + "estimator" : +} +``` + +|Property |Description |Default | +|-------------------------|------------------------------|----------------------------------| +|`estimator`|Set "population" to get variance_pop rather than variance_sample, which is default.|null| + + +### Standard Deviation post-aggregator + +To acquire standard deviation from variance, user can use "stddev" post aggregator. + +```json +{ + "type": "stddev", + "name": "", + "fieldName": "", + "estimator": +} +``` + +## Query Examples: + +### Timeseries Query + +```json +{ + "queryType": "timeseries", + "dataSource": "testing", + "granularity": "day", + "aggregations": [ + { + "type": "variance", + "name": "index_var", + "fieldName": "index_var" + } + ], + "intervals": [ + "2016-03-01T00:00:00.000/2013-03-20T00:00:00.000" + ] +} +``` + +### TopN Query + +```json +{ + "queryType": "topN", + "dataSource": "testing", + "dimensions": ["alias"], + "threshold": 5, + "granularity": "all", + "aggregations": [ + { + "type": "variance", + "name": "index_var", + "fieldName": "index" + } + ], + "postAggregations": [ + { + "type": "stddev", + "name": "index_stddev", + "fieldName": "index_var" + } + ], + "intervals": [ + "2016-03-06T00:00:00/2016-03-06T23:59:59" + ] +} +``` + +### GroupBy Query + +```json +{ + "queryType": "groupBy", + "dataSource": "testing", + "dimensions": ["alias"], + "granularity": "all", + "aggregations": [ + { + "type": "variance", + "name": "index_var", + "fieldName": "index" + } + ], + "postAggregations": [ + { + "type": "stddev", + "name": "index_stddev", + "fieldName": "index_var" + } + ], + "intervals": [ + "2016-03-06T00:00:00/2016-03-06T23:59:59" + ] +} +``` diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index e95c397149dd..1699e2d46716 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -30,6 +30,7 @@ Core extensions are maintained by Druid committers. |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)| |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)| |druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)| +|druid-stats|Statistics related module including variance and standard deviation.|[link](../development/extensions-core/stats.html)| |mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)| |postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)| diff --git a/extensions-core/stats/pom.xml b/extensions-core/stats/pom.xml new file mode 100644 index 000000000000..cb9379cbcb6e --- /dev/null +++ b/extensions-core/stats/pom.xml @@ -0,0 +1,64 @@ + + + + + 4.0.0 + + io.druid.extensions + druid-stats + druid-stats + druid-stats + + + io.druid + druid + 0.9.2-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-processing + ${project.parent.version} + provided + + + + + io.druid + druid-processing + ${project.parent.version} + test + test-jar + + + junit + junit + test + + + org.easymock + easymock + test + + + + diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java new file mode 100644 index 000000000000..cc136f5f9dfa --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/stats/DruidStatsModule.java @@ -0,0 +1,58 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.stats; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; +import io.druid.query.aggregation.variance.StandardDeviationPostAggregator; +import io.druid.query.aggregation.variance.VarianceAggregatorFactory; +import io.druid.query.aggregation.variance.VarianceFoldingAggregatorFactory; +import io.druid.query.aggregation.variance.VarianceSerde; +import io.druid.segment.serde.ComplexMetrics; + +import java.util.List; + +/** + */ +public class DruidStatsModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule().registerSubtypes( + VarianceAggregatorFactory.class, + VarianceFoldingAggregatorFactory.class, + StandardDeviationPostAggregator.class + ) + ); + } + + @Override + public void configure(Binder binder) + { + if (ComplexMetrics.getSerdeForType("variance") == null) { + ComplexMetrics.registerSerde("variance", new VarianceSerde()); + } + } +} diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java new file mode 100644 index 000000000000..2bdcb0da9c91 --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java @@ -0,0 +1,104 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.post.ArithmeticPostAggregator; + +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +/** + */ +@JsonTypeName("stddev") +public class StandardDeviationPostAggregator implements PostAggregator +{ + protected final String name; + protected final String fieldName; + protected final String estimator; + + protected final boolean isVariancePop; + + @JsonCreator + public StandardDeviationPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("estimator") String estimator + ) + { + this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName is null"); + this.name = Preconditions.checkNotNull(name, "name is null"); + this.estimator = estimator; + this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator); + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Comparator getComparator() + { + return ArithmeticPostAggregator.DEFAULT_COMPARATOR; + } + + @Override + public Object compute(Map combinedAggregators) + { + return Math.sqrt(((VarianceAggregatorCollector) combinedAggregators.get(fieldName)).getVariance(isVariancePop)); + } + + @Override + @JsonProperty("name") + public String getName() + { + return name; + } + + @JsonProperty("fieldName") + public String getFieldName() + { + return fieldName; + } + + @JsonProperty("estimator") + public String getEstimator() + { + return estimator; + } + + @Override + public String toString() + { + return "StandardDeviationPostAggregator{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + ", isVariancePop='" + isVariancePop + '\'' + + '}'; + } +} diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java new file mode 100644 index 000000000000..2553322d2fda --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregator.java @@ -0,0 +1,125 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + */ +public abstract class VarianceAggregator implements Aggregator +{ + protected final String name; + + protected final VarianceAggregatorCollector holder = new VarianceAggregatorCollector(); + + public VarianceAggregator(String name) + { + this.name = name; + } + + @Override + public void reset() + { + holder.reset(); + } + + @Override + public Object get() + { + return holder; + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("VarianceAggregator does not support getFloat()"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("VarianceAggregator does not support getLong()"); + } + + public static final class FloatVarianceAggregator extends VarianceAggregator + { + private final FloatColumnSelector selector; + + public FloatVarianceAggregator(String name, FloatColumnSelector selector) + { + super(name); + this.selector = selector; + } + + @Override + public void aggregate() + { + holder.add(selector.get()); + } + } + + public static final class LongVarianceAggregator extends VarianceAggregator + { + private final LongColumnSelector selector; + + public LongVarianceAggregator(String name, LongColumnSelector selector) + { + super(name); + this.selector = selector; + } + + @Override + public void aggregate() + { + holder.add(selector.get()); + } + } + + public static final class ObjectVarianceAggregator extends VarianceAggregator + { + private final ObjectColumnSelector selector; + + public ObjectVarianceAggregator(String name, ObjectColumnSelector selector) + { + super(name); + this.selector = selector; + } + + @Override + public void aggregate() + { + VarianceAggregatorCollector.combineValues(holder, selector.get()); + } + } +} diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java new file mode 100644 index 000000000000..4ab6bc25a970 --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java @@ -0,0 +1,249 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Longs; + +import java.nio.ByteBuffer; +import java.util.Comparator; + +/** + * + * Algorithm used here is copied from apache hive. This is description in GenericUDAFVariance + * + * Evaluate the variance using the algorithm described by Chan, Golub, and LeVeque in + * "Algorithms for computing the sample variance: analysis and recommendations" + * The American Statistician, 37 (1983) pp. 242--247. + * + * variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2) + * + * where: - variance is sum[x-avg^2] (this is actually n times the variance) + * and is updated at every step. - n is the count of elements in chunk1 - m is + * the count of elements in chunk2 - t1 = sum of elements in chunk1, t2 = + * sum of elements in chunk2. + * + * This algorithm was proven to be numerically stable by J.L. Barlow in + * "Error analysis of a pairwise summation algorithm to compute sample variance" + * Numer. Math, 58 (1991) pp. 583--590 + */ +public class VarianceAggregatorCollector +{ + public static boolean isVariancePop(String estimator) { + return estimator != null && estimator.equalsIgnoreCase("population"); + } + + public static VarianceAggregatorCollector from(ByteBuffer buffer) + { + return new VarianceAggregatorCollector(buffer.getLong(), buffer.getDouble(), buffer.getDouble()); + } + + public static final Comparator COMPARATOR = new Comparator() + { + @Override + public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2) + { + int compare = Longs.compare(o1.count, o2.count); + if (compare == 0) { + compare = Doubles.compare(o1.sum, o2.sum); + if (compare == 0) { + compare = Doubles.compare(o1.nvariance, o2.nvariance); + } + } + return compare; + } + }; + + static Object combineValues(Object lhs, Object rhs) + { + final VarianceAggregatorCollector holder1 = (VarianceAggregatorCollector) lhs; + final VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) rhs; + + if (holder2.count == 0) { + return holder1; + } + if (holder1.count == 0) { + holder1.nvariance = holder2.nvariance; + holder1.count = holder2.count; + holder1.sum = holder2.sum; + return holder1; + } + + final double ratio = holder1.count / (double) holder2.count; + final double t = holder1.sum / ratio - holder2.sum; + + holder1.nvariance += holder2.nvariance + (ratio / (holder1.count + holder2.count) * t * t); + holder1.count += holder2.count; + holder1.sum += holder2.sum; + + return holder1; + } + + static int getMaxIntermediateSize() + { + return Longs.BYTES + Doubles.BYTES + Doubles.BYTES; + } + + long count; // number of elements + double sum; // sum of elements + double nvariance; // sum[x-avg^2] (this is actually n times of the variance) + + public VarianceAggregatorCollector() + { + this(0, 0, 0); + } + + public void reset() + { + count = 0; + sum = 0; + nvariance = 0; + } + + public VarianceAggregatorCollector(long count, double sum, double nvariance) + { + this.count = count; + this.sum = sum; + this.nvariance = nvariance; + } + + public VarianceAggregatorCollector add(float v) + { + count++; + sum += v; + if (count > 1) { + double t = count * v - sum; + nvariance += (t * t) / ((double) count * (count - 1)); + } + return this; + } + + public VarianceAggregatorCollector add(long v) + { + count++; + sum += v; + if (count > 1) { + double t = count * v - sum; + nvariance += (t * t) / ((double) count * (count - 1)); + } + return this; + } + + public double getVariance(boolean variancePop) + { + if (count == 0) { + // in SQL standard, we should return null for zero elements. But druid there should not be such a case + throw new IllegalStateException("should not be empty holder"); + } else if (count == 1) { + return 0d; + } else { + return variancePop ? nvariance / count : nvariance / (count - 1); + } + } + + @JsonValue + public byte[] toByteArray() + { + final ByteBuffer buffer = toByteBuffer(); + buffer.flip(); + byte[] theBytes = new byte[buffer.remaining()]; + buffer.get(theBytes); + + return theBytes; + } + + public ByteBuffer toByteBuffer() + { + return ByteBuffer.allocate(Longs.BYTES + Doubles.BYTES + Doubles.BYTES) + .putLong(count) + .putDouble(sum) + .putDouble(nvariance); + } + + @VisibleForTesting + boolean equalsWithEpsilon(VarianceAggregatorCollector o, double epsilon) + { + if (this == o) { + return true; + } + + if (count != o.count) { + return false; + } + if (Math.abs(sum - o.sum) > epsilon) { + return false; + } + if (Math.abs(nvariance - o.nvariance) > epsilon) { + return false; + } + + return true; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + VarianceAggregatorCollector that = (VarianceAggregatorCollector) o; + + if (count != that.count) { + return false; + } + if (Double.compare(that.sum, sum) != 0) { + return false; + } + if (Double.compare(that.nvariance, nvariance) != 0) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result; + long temp; + result = (int) (count ^ (count >>> 32)); + temp = Double.doubleToLongBits(sum); + result = 31 * result + (int) (temp ^ (temp >>> 32)); + temp = Double.doubleToLongBits(nvariance); + result = 31 * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public String toString() + { + return "VarianceHolder{" + + "count=" + count + + ", sum=" + sum + + ", nvariance=" + nvariance + + '}'; + } +} diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java new file mode 100644 index 000000000000..d5b85718f348 --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java @@ -0,0 +1,294 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.metamx.common.IAE; +import com.metamx.common.StringUtils; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import io.druid.query.aggregation.Aggregators; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ObjectColumnSelector; +import org.apache.commons.codec.binary.Base64; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + */ +@JsonTypeName("variance") +public class VarianceAggregatorFactory extends AggregatorFactory +{ + protected static final byte CACHE_TYPE_ID = 16; + + protected final String fieldName; + protected final String name; + protected final String estimator; + private final String inputType; + + protected final boolean isVariancePop; + + @JsonCreator + public VarianceAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("estimator") String estimator, + @JsonProperty("inputType") String inputType + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + this.name = name; + this.fieldName = fieldName; + this.estimator = estimator; + this.isVariancePop = VarianceAggregatorCollector.isVariancePop(estimator); + this.inputType = inputType == null ? "float" : inputType; + } + + public VarianceAggregatorFactory(String name, String fieldName) + { + this(name, fieldName, null, null); + } + + @Override + public String getTypeName() + { + return "variance"; + } + + @Override + public int getMaxIntermediateSize() + { + return VarianceAggregatorCollector.getMaxIntermediateSize(); + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + if (selector == null) { + return Aggregators.noopAggregator(); + } + + if ("float".equalsIgnoreCase(inputType)) { + return new VarianceAggregator.FloatVarianceAggregator( + name, + metricFactory.makeFloatColumnSelector(fieldName) + ); + } else if ("long".equalsIgnoreCase(inputType)) { + return new VarianceAggregator.LongVarianceAggregator( + name, + metricFactory.makeLongColumnSelector(fieldName) + ); + } else if ("variance".equalsIgnoreCase(inputType)) { + return new VarianceAggregator.ObjectVarianceAggregator(name, selector); + } + throw new IAE( + "Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + if (selector == null) { + return Aggregators.noopBufferAggregator(); + } + if ("float".equalsIgnoreCase(inputType)) { + return new VarianceBufferAggregator.FloatVarianceAggregator( + name, + metricFactory.makeFloatColumnSelector(fieldName) + ); + } else if ("long".equalsIgnoreCase(inputType)) { + return new VarianceBufferAggregator.LongVarianceAggregator( + name, + metricFactory.makeLongColumnSelector(fieldName) + ); + } else if ("variance".equalsIgnoreCase(inputType)) { + return new VarianceBufferAggregator.ObjectVarianceAggregator(name, selector); + } + throw new IAE( + "Incompatible type for metric[%s], expected a float, long or variance, got a %s", fieldName, inputType + ); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new VarianceFoldingAggregatorFactory(name, name, estimator); + } + + @Override + public List getRequiredColumns() + { + return Arrays.asList(new VarianceAggregatorFactory(fieldName, fieldName, estimator, inputType)); + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (Objects.equals(getName(), other.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + public Comparator getComparator() + { + return VarianceAggregatorCollector.COMPARATOR; + } + + @Override + public Object getAggregatorStartValue() + { + return new VarianceAggregatorCollector(); + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return VarianceAggregatorCollector.combineValues(lhs, rhs); + } + + @Override + public Object finalizeComputation(Object object) + { + return ((VarianceAggregatorCollector) object).getVariance(isVariancePop); + } + + @Override + public Object deserialize(Object object) + { + if (object instanceof byte[]) { + return VarianceAggregatorCollector.from(ByteBuffer.wrap((byte[]) object)); + } else if (object instanceof ByteBuffer) { + return VarianceAggregatorCollector.from((ByteBuffer) object); + } else if (object instanceof String) { + return VarianceAggregatorCollector.from( + ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object))) + ); + } + return object; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getEstimator() + { + return estimator; + } + + @JsonProperty + public String getInputType() + { + return inputType; + } + + @Override + public List requiredFields() + { + return Arrays.asList(fieldName); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); + byte[] inputTypeBytes = StringUtils.toUtf8(inputType); + return ByteBuffer.allocate(2 + fieldNameBytes.length + 1 + inputTypeBytes.length) + .put(CACHE_TYPE_ID) + .put(isVariancePop ? (byte) 1 : 0) + .put(fieldNameBytes) + .put((byte) 0xFF) + .put(inputTypeBytes) + .array(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "fieldName='" + fieldName + '\'' + + ", name='" + name + '\'' + + ", isVariancePop='" + isVariancePop + '\'' + + ", inputType='" + inputType + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + VarianceAggregatorFactory that = (VarianceAggregatorFactory) o; + + if (!Objects.equals(name, that.name)) { + return false; + } + if (!Objects.equals(isVariancePop, that.isVariancePop)) { + return false; + } + if (!Objects.equals(inputType, that.inputType)) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = fieldName.hashCode(); + result = 31 * result + Objects.hashCode(name); + result = 31 * result + Objects.hashCode(isVariancePop); + result = 31 * result + Objects.hashCode(inputType); + return result; + } +} diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java new file mode 100644 index 000000000000..77017de52312 --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceBufferAggregator.java @@ -0,0 +1,171 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Longs; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +import java.nio.ByteBuffer; + +/** + */ +public abstract class VarianceBufferAggregator implements BufferAggregator +{ + private static final int COUNT_OFFSET = 0; + private static final int SUM_OFFSET = Longs.BYTES; + private static final int NVARIANCE_OFFSET = SUM_OFFSET + Doubles.BYTES; + + protected final String name; + + public VarianceBufferAggregator(String name) + { + this.name = name; + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + buf.putLong(position + COUNT_OFFSET, 0) + .putDouble(position + SUM_OFFSET, 0) + .putDouble(position + NVARIANCE_OFFSET, 0); + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + VarianceAggregatorCollector holder = new VarianceAggregatorCollector(); + holder.count = buf.getLong(position); + holder.sum = buf.getDouble(position + SUM_OFFSET); + holder.nvariance = buf.getDouble(position + NVARIANCE_OFFSET); + return holder; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("VarianceBufferAggregator does not support getFloat()"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("VarianceBufferAggregator does not support getFloat()"); + } + + @Override + public void close() + { + } + + public static final class FloatVarianceAggregator extends VarianceBufferAggregator + { + private final FloatColumnSelector selector; + + public FloatVarianceAggregator(String name, FloatColumnSelector selector) + { + super(name); + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + float v = selector.get(); + long count = buf.getLong(position + COUNT_OFFSET) + 1; + double sum = buf.getDouble(position + SUM_OFFSET) + v; + buf.putLong(position, count); + buf.putDouble(position + SUM_OFFSET, sum); + if (count > 1) { + double t = count * v - sum; + double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1)); + buf.putDouble(position + NVARIANCE_OFFSET, variance); + } + } + } + + public static final class LongVarianceAggregator extends VarianceBufferAggregator + { + private final LongColumnSelector selector; + + public LongVarianceAggregator(String name, LongColumnSelector selector) + { + super(name); + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + long v = selector.get(); + long count = buf.getLong(position + COUNT_OFFSET) + 1; + double sum = buf.getDouble(position + SUM_OFFSET) + v; + buf.putLong(position, count); + buf.putDouble(position + SUM_OFFSET, sum); + if (count > 1) { + double t = count * v - sum; + double variance = buf.getDouble(position + NVARIANCE_OFFSET) + (t * t) / ((double) count * (count - 1)); + buf.putDouble(position + NVARIANCE_OFFSET, variance); + } + } + } + + public static final class ObjectVarianceAggregator extends VarianceBufferAggregator + { + private final ObjectColumnSelector selector; + + public ObjectVarianceAggregator(String name, ObjectColumnSelector selector) + { + super(name); + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) selector.get(); + + long count = buf.getLong(position + COUNT_OFFSET); + if (count == 0) { + buf.putLong(position, holder2.count); + buf.putDouble(position + SUM_OFFSET, holder2.sum); + buf.putDouble(position + NVARIANCE_OFFSET, holder2.nvariance); + return; + } + + double sum = buf.getDouble(position + SUM_OFFSET); + double nvariance = buf.getDouble(position + NVARIANCE_OFFSET); + + final double ratio = count / (double) holder2.count; + final double t = sum / ratio - holder2.sum; + + nvariance += holder2.nvariance + (ratio / (count + holder2.count) * t * t); + count += holder2.count; + sum += holder2.sum; + + buf.putLong(position, count); + buf.putDouble(position + SUM_OFFSET, sum); + buf.putDouble(position + NVARIANCE_OFFSET, nvariance); + } + } +} diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java new file mode 100644 index 000000000000..a1134184b4c1 --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceFoldingAggregatorFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + */ +@JsonTypeName("varianceFold") +public class VarianceFoldingAggregatorFactory extends VarianceAggregatorFactory +{ + public VarianceFoldingAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("estimator") String estimator + ) + { + super(name, fieldName, estimator, "variance"); + } +} diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java new file mode 100644 index 000000000000..4bb83ada71ae --- /dev/null +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java @@ -0,0 +1,121 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.google.common.collect.Ordering; +import io.druid.data.input.InputRow; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.ObjectStrategy; +import io.druid.segment.serde.ComplexColumnPartSupplier; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + */ +public class VarianceSerde extends ComplexMetricSerde +{ + private static final Ordering comparator = + Ordering.from(VarianceAggregatorCollector.COMPARATOR).nullsFirst(); + + @Override + public String getTypeName() + { + return "variance"; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return VarianceAggregatorCollector.class; + } + + @Override + public VarianceAggregatorCollector extractValue(InputRow inputRow, String metricName) + { + Object rawValue = inputRow.getRaw(metricName); + + if (rawValue instanceof VarianceAggregatorCollector) { + return (VarianceAggregatorCollector) rawValue; + } + VarianceAggregatorCollector collector = new VarianceAggregatorCollector(); + + List dimValues = inputRow.getDimension(metricName); + if (dimValues != null && dimValues.size() > 0) { + for (String dimValue : dimValues) { + float value = Float.parseFloat(dimValue); + collector.add(value); + } + } + return collector; + } + }; + } + + @Override + public void deserializeColumn( + ByteBuffer byteBuffer, ColumnBuilder columnBuilder + ) + { + final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy()); + columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return new ObjectStrategy() + { + @Override + public Class getClazz() + { + return VarianceAggregatorCollector.class; + } + + @Override + public VarianceAggregatorCollector fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); + return VarianceAggregatorCollector.from(readOnlyBuffer); + } + + @Override + public byte[] toBytes(VarianceAggregatorCollector collector) + { + return collector == null ? new byte[]{} : collector.toByteArray(); + } + + @Override + public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o2) + { + return comparator.compare(o1, o2); + } + }; + } +} diff --git a/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..823ccffd31d2 --- /dev/null +++ b/extensions-core/stats/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.query.aggregation.stats.DruidStatsModule diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java new file mode 100644 index 000000000000..89d92179770b --- /dev/null +++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorCollectorTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.google.common.collect.Lists; +import com.metamx.common.Pair; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +public class VarianceAggregatorCollectorTest +{ + private static final float[] market_upfront = new float[]{ + 800.0f, 800.0f, 826.0602f, 1564.6177f, 1006.4021f, 869.64374f, 809.04175f, 1458.4027f, 852.4375f, 879.9881f, + 950.1468f, 712.7746f, 846.2675f, 682.8855f, 1109.875f, 594.3817f, 870.1159f, 677.511f, 1410.2781f, 1219.4321f, + 979.306f, 1224.5016f, 1215.5898f, 716.6092f, 1301.0233f, 786.3633f, 989.9315f, 1609.0967f, 1023.2952f, 1367.6381f, + 1627.598f, 810.8894f, 1685.5001f, 545.9906f, 1870.061f, 555.476f, 1643.3408f, 943.4972f, 1667.4978f, 913.5611f, + 1218.5619f, 1273.7074f, 888.70526f, 1113.1141f, 864.5689f, 1308.582f, 785.07886f, 1363.6149f, 787.1253f, + 826.0392f, 1107.2438f, 872.6257f, 1188.3693f, 911.9568f, 794.0988f, 1299.0933f, 1212.9283f, 901.3273f, 723.5143f, + 1061.9734f, 602.97955f, 879.4061f, 724.2625f, 862.93134f, 1133.1351f, 948.65796f, 807.6017f, 914.525f, 1553.3485f, + 1208.4567f, 679.6193f, 645.1777f, 1120.0887f, 1649.5333f, 1433.3988f, 1598.1793f, 1192.5631f, 1022.85455f, + 1228.5024f, 1298.4158f, 1345.9644f, 1291.898f, 1306.4957f, 1287.7667f, 1631.5844f, 578.79596f, 1017.5732f, + 782.0135f, 829.91626f, 1862.7379f, 873.3065f, 1427.0167f, 1430.2573f, 1101.9182f, 1166.1411f, 1004.94086f, + 740.1837f, 865.7779f, 901.30756f, 691.9589f, 1674.3317f, 975.57794f, 1360.6948f, 755.89935f, 771.34845f, + 869.30835f, 1095.6376f, 906.3738f, 988.8938f, 835.76263f, 776.70294f, 875.6834f, 1070.8363f, 835.46124f, + 715.5161f, 755.64655f, 771.1005f, 764.50806f, 736.40924f, 884.8373f, 918.72284f, 893.98505f, 832.8749f, + 850.995f, 767.9733f, 848.3399f, 878.6838f, 906.1019f, 1403.8302f, 936.4296f, 846.2884f, 856.4901f, 1032.2576f, + 954.7542f, 1031.99f, 907.02155f, 1110.789f, 843.95215f, 1362.6506f, 884.8015f, 1684.2688f, 873.65204f, 855.7177f, + 996.56415f, 1061.6786f, 962.2358f, 1019.8985f, 1056.4193f, 1198.7231f, 1108.1361f, 1289.0095f, + 1069.4318f, 1001.13403f, 1030.4995f, 1734.2749f, 1063.2012f, 1447.3412f, 1234.2476f, 1144.3424f, 1049.7385f, + 811.9913f, 768.4231f, 1151.0692f, 877.0794f, 1146.4231f, 902.6157f, 1355.8434f, 897.39343f, 1260.1431f, 762.8625f, + 935.168f, 782.10785f, 996.2054f, 767.69214f, 1031.7415f, 775.9656f, 1374.9684f, 853.163f, 1456.6118f, 811.92523f, + 989.0328f, 744.7446f, 1166.4012f, 753.105f, 962.7312f, 780.272f + }; + + private static final float[] market_total_market = new float[]{ + 1000.0f, 1000.0f, 1040.9456f, 1689.0128f, 1049.142f, 1073.4766f, 1007.36554f, 1545.7089f, 1016.9652f, 1077.6127f, + 1075.0896f, 953.9954f, 1022.7833f, 937.06195f, 1156.7448f, 849.8775f, 1066.208f, 904.34064f, 1240.5255f, + 1343.2325f, 1088.9431f, 1349.2544f, 1102.8667f, 939.2441f, 1109.8754f, 997.99457f, 1037.4495f, 1686.4197f, + 1074.007f, 1486.2013f, 1300.3022f, 1021.3345f, 1314.6195f, 792.32605f, 1233.4489f, 805.9301f, 1184.9207f, + 1127.231f, 1203.4656f, 1100.9048f, 1097.2112f, 1410.793f, 1033.4012f, 1283.166f, 1025.6333f, 1331.861f, + 1039.5005f, 1332.4684f, 1011.20544f, 1029.9952f, 1047.2129f, 1057.08f, 1064.9727f, 1082.7277f, 971.0508f, + 1320.6383f, 1070.1655f, 1089.6478f, 980.3866f, 1179.6959f, 959.2362f, 1092.417f, 987.0674f, 1103.4583f, + 1091.2231f, 1199.6074f, 1044.3843f, 1183.2408f, 1289.0973f, 1360.0325f, 993.59125f, 1021.07117f, 1105.3834f, + 1601.8295f, 1200.5272f, 1600.7233f, 1317.4584f, 1304.3262f, 1544.1082f, 1488.7378f, 1224.8271f, 1421.6487f, + 1251.9062f, 1414.619f, 1350.1754f, 970.7283f, 1057.4272f, 1073.9673f, 996.4337f, 1743.9218f, 1044.5629f, + 1474.5911f, 1159.2788f, 1292.5428f, 1124.2014f, 1243.354f, 1051.809f, 1143.0784f, 1097.4907f, 1010.3703f, + 1326.8291f, 1179.8038f, 1281.6012f, 994.73126f, 1081.6504f, 1103.2397f, 1177.8584f, 1152.5477f, 1117.954f, + 1084.3325f, 1029.8025f, 1121.3854f, 1244.85f, 1077.2794f, 1098.5432f, 998.65076f, 1088.8076f, 1008.74554f, + 998.75397f, 1129.7233f, 1075.243f, 1141.5884f, 1037.3811f, 1099.1973f, 981.5773f, 1092.942f, 1072.2394f, + 1154.4156f, 1311.1786f, 1176.6052f, 1107.2202f, 1102.699f, 1285.0901f, 1217.5475f, 1283.957f, 1178.8302f, + 1301.7781f, 1119.2472f, 1403.3389f, 1156.6019f, 1429.5802f, 1137.8423f, 1124.9352f, 1256.4998f, 1217.8774f, + 1247.8909f, 1185.71f, 1345.7817f, 1250.1667f, 1390.754f, 1224.1162f, 1361.0802f, 1190.9337f, 1310.7971f, + 1466.2094f, 1366.4476f, 1314.8397f, 1522.0437f, 1193.5563f, 1321.375f, 1055.7837f, 1021.6387f, 1197.0084f, + 1131.532f, 1192.1443f, 1154.2896f, 1272.6771f, 1141.5146f, 1190.8961f, 1009.36316f, 1006.9138f, 1032.5999f, + 1137.3857f, 1030.0756f, 1005.25305f, 1030.0947f, 1112.7948f, 1113.3575f, 1153.9747f, 1069.6409f, 1016.13745f, + 994.9023f, 1032.1543f, 999.5864f, 994.75275f, 1029.057f + }; + + @Test + public void testVariance() + { + Random random = new Random(); + for (float[] values : Arrays.asList(market_upfront, market_total_market)) { + double sum = 0; + for (float f : values) { + sum += f; + } + final double mean = sum / values.length; + double temp = 0; + for (float f : values) { + temp += Math.pow(f - mean, 2); + } + + final double variance_pop = temp / values.length; + final double variance_sample = temp / (values.length - 1); + + VarianceAggregatorCollector holder = new VarianceAggregatorCollector(); + for (float f : values) { + holder.add(f); + } + Assert.assertEquals(holder.getVariance(true), variance_pop, 0.001); + Assert.assertEquals(holder.getVariance(false), variance_sample, 0.001); + + for (int mergeOn : new int[] {2, 3, 5, 9}) { + List holders1 = Lists.newArrayListWithCapacity(mergeOn); + List> holders2 = Lists.newArrayListWithCapacity(mergeOn); + + FloatHandOver valueHandOver = new FloatHandOver(); + for (int i = 0; i < mergeOn; i++) { + holders1.add(new VarianceAggregatorCollector()); + holders2.add(Pair.of( + new VarianceBufferAggregator.FloatVarianceAggregator("XX", valueHandOver), + ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize()) + )); + } + for (float f : values) { + valueHandOver.v = f; + int index = random.nextInt(mergeOn); + holders1.get(index).add(f); + holders2.get(index).lhs.aggregate(holders2.get(index).rhs, 0); + } + VarianceAggregatorCollector holder1 = holders1.get(0); + for (int i = 1; i < mergeOn; i++) { + holder1 = (VarianceAggregatorCollector) VarianceAggregatorCollector.combineValues(holder1, holders1.get(i)); + } + ObjectHandOver collectHandOver = new ObjectHandOver(); + ByteBuffer buffer = ByteBuffer.allocate(VarianceAggregatorCollector.getMaxIntermediateSize()); + VarianceBufferAggregator.ObjectVarianceAggregator merger = new VarianceBufferAggregator.ObjectVarianceAggregator("xxx", collectHandOver); + for (int i = 0; i < mergeOn; i++) { + collectHandOver.v = holders2.get(i).lhs.get(holders2.get(i).rhs, 0); + merger.aggregate(buffer, 0); + } + VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) merger.get(buffer, 0); + Assert.assertEquals(holder2.getVariance(true), variance_pop, 0.01); + Assert.assertEquals(holder2.getVariance(false), variance_sample, 0.01); + } + } + } + + private static class FloatHandOver implements FloatColumnSelector + { + float v; + + @Override + public float get() + { + return v; + } + } + + private static class ObjectHandOver implements ObjectColumnSelector + { + Object v; + + @Override + public Class classOfObject() + { + return v == null ? Object.class : v.getClass(); + } + + @Override + public Object get() + { + return v; + } + } +} diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java new file mode 100644 index 000000000000..9beb980a14a5 --- /dev/null +++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceAggregatorTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.TestFloatColumnSelector; +import io.druid.query.aggregation.TestObjectColumnSelector; +import io.druid.segment.ColumnSelectorFactory; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +/** + */ +public class VarianceAggregatorTest +{ + private VarianceAggregatorFactory aggFactory; + private ColumnSelectorFactory colSelectorFactory; + private TestFloatColumnSelector selector; + + private final float[] values = {1.1f, 2.7f, 3.5f, 1.3f}; + private final double[] variances_pop = new double[values.length]; // calculated + private final double[] variances_samp = new double[values.length]; // calculated + + public VarianceAggregatorTest() throws Exception + { + String aggSpecJson = "{\"type\": \"variance\", \"name\": \"billy\", \"fieldName\": \"nilly\"}"; + aggFactory = new DefaultObjectMapper().readValue(aggSpecJson, VarianceAggregatorFactory.class); + double sum = 0; + for (int i = 0; i < values.length; i++) { + sum += values[i]; + if (i > 0) { + double mean = sum / (i + 1); + double temp = 0; + for (int j = 0; j <= i; j++) { + temp += Math.pow(values[j] - mean, 2); + } + variances_pop[i] = temp / (i + 1); + variances_samp[i] = temp / i; + } + } + } + + @Before + public void setup() + { + selector = new TestFloatColumnSelector(values); + colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("nilly")).andReturn(new TestObjectColumnSelector(0.0f)); + EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(selector); + EasyMock.replay(colSelectorFactory); + } + + @Test + public void testDoubleVarianceAggregator() + { + VarianceAggregator agg = (VarianceAggregator) aggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d); + aggregate(selector, agg); + assertValues((VarianceAggregatorCollector) agg.get(), 1, 1.1d, 0d); + aggregate(selector, agg); + assertValues((VarianceAggregatorCollector) agg.get(), 2, 3.8d, 1.28d); + aggregate(selector, agg); + assertValues((VarianceAggregatorCollector) agg.get(), 3, 7.3d, 2.9866d); + aggregate(selector, agg); + assertValues((VarianceAggregatorCollector) agg.get(), 4, 8.6d, 3.95d); + + agg.reset(); + assertValues((VarianceAggregatorCollector) agg.get(), 0, 0d, 0d); + } + + private void assertValues(VarianceAggregatorCollector holder, long count, double sum, double nvariance) + { + Assert.assertEquals(count, holder.count); + Assert.assertEquals(sum, holder.sum, 0.0001); + Assert.assertEquals(nvariance, holder.nvariance, 0.0001); + if (count == 0) { + try { + holder.getVariance(false); + Assert.fail("Should throw ISE"); + } + catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("should not be empty holder")); + } + } else { + Assert.assertEquals(holder.getVariance(true), variances_pop[(int) count - 1], 0.0001); + Assert.assertEquals(holder.getVariance(false), variances_samp[(int) count - 1], 0.0001); + } + } + + @Test + public void testDoubleVarianceBufferAggregator() + { + VarianceBufferAggregator agg = (VarianceBufferAggregator) aggFactory.factorizeBuffered( + colSelectorFactory + ); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[aggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 0, 0d, 0d); + aggregate(selector, agg, buffer, 0); + assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 1, 1.1d, 0d); + aggregate(selector, agg, buffer, 0); + assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 2, 3.8d, 1.28d); + aggregate(selector, agg, buffer, 0); + assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 3, 7.3d, 2.9866d); + aggregate(selector, agg, buffer, 0); + assertValues((VarianceAggregatorCollector) agg.get(buffer, 0), 4, 8.6d, 3.95d); + } + + @Test + public void testCombine() + { + VarianceAggregatorCollector holder1 = new VarianceAggregatorCollector().add(1.1f).add(2.7f); + VarianceAggregatorCollector holder2 = new VarianceAggregatorCollector().add(3.5f).add(1.3f); + VarianceAggregatorCollector expected = new VarianceAggregatorCollector(4, 8.6d, 3.95d); + Assert.assertTrue(expected.equalsWithEpsilon((VarianceAggregatorCollector) aggFactory.combine(holder1, holder2), 0.00001)); + } + + @Test + public void testEqualsAndHashCode() throws Exception + { + VarianceAggregatorFactory one = new VarianceAggregatorFactory("name1", "fieldName1"); + VarianceAggregatorFactory oneMore = new VarianceAggregatorFactory("name1", "fieldName1"); + VarianceAggregatorFactory two = new VarianceAggregatorFactory("name2", "fieldName2"); + + Assert.assertEquals(one.hashCode(), oneMore.hashCode()); + + Assert.assertTrue(one.equals(oneMore)); + Assert.assertFalse(one.equals(two)); + } + + private void aggregate(TestFloatColumnSelector selector, VarianceAggregator agg) + { + agg.aggregate(); + selector.increment(); + } + + private void aggregate( + TestFloatColumnSelector selector, + VarianceBufferAggregator agg, + ByteBuffer buff, + int position + ) + { + agg.aggregate(buff, position); + selector.increment(); + } +} diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java new file mode 100644 index 000000000000..a45901bf7422 --- /dev/null +++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceGroupByQueryTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.data.input.Row; +import io.druid.granularity.PeriodGranularity; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryRunnerFactory; +import io.druid.query.groupby.GroupByQueryRunnerTest; +import io.druid.query.groupby.GroupByQueryRunnerTestHelper; +import io.druid.query.groupby.having.GreaterThanHavingSpec; +import io.druid.query.groupby.having.HavingSpec; +import io.druid.query.groupby.having.OrHavingSpec; +import io.druid.query.groupby.orderby.DefaultLimitSpec; +import io.druid.query.groupby.orderby.OrderByColumnSpec; +import io.druid.segment.TestHelper; +import org.joda.time.Period; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +/** + */ +@RunWith(Parameterized.class) +public class VarianceGroupByQueryTest +{ + private final GroupByQueryConfig config; + private final QueryRunner runner; + private final GroupByQueryRunnerFactory factory; + + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + return GroupByQueryRunnerTest.constructorFeeder(); + } + + public VarianceGroupByQueryTest(GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner) + { + this.config = config; + this.factory = factory; + this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.>of(runner)); + } + + @Test + public void testGroupByVarianceOnly() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs(Arrays.asList(VarianceTestHelper.indexVarianceAggr)) + .setPostAggregatorSpecs(Arrays.asList(VarianceTestHelper.stddevOfIndexPostAggr)) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + VarianceTestHelper.RowBuilder builder = + new VarianceTestHelper.RowBuilder(new String[]{"alias", "index_stddev", "index_var"}); + + List expectedResults = builder + .add("2011-04-01", "automotive", 0d, 0d) + .add("2011-04-01", "business", 0d, 0d) + .add("2011-04-01", "entertainment", 0d, 0d) + .add("2011-04-01", "health", 0d, 0d) + .add("2011-04-01", "mezzanine", 737.0179286322613d, 543195.4271253889d) + .add("2011-04-01", "news", 0d, 0d) + .add("2011-04-01", "premium", 726.6322593583996d, 527994.4403402924d) + .add("2011-04-01", "technology", 0d, 0d) + .add("2011-04-01", "travel", 0d, 0d) + + .add("2011-04-02", "automotive", 0d, 0d) + .add("2011-04-02", "business", 0d, 0d) + .add("2011-04-02", "entertainment", 0d, 0d) + .add("2011-04-02", "health", 0d, 0d) + .add("2011-04-02", "mezzanine", 611.3420766546617d, 373739.13468843425d) + .add("2011-04-02", "news", 0d, 0d) + .add("2011-04-02", "premium", 621.3898134843073d, 386125.30030206224d) + .add("2011-04-02", "technology", 0d, 0d) + .add("2011-04-02", "travel", 0d, 0d) + .build(); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testGroupBy() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + VarianceTestHelper.rowsCount, + VarianceTestHelper.indexVarianceAggr, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setPostAggregatorSpecs( + Arrays.asList(VarianceTestHelper.stddevOfIndexPostAggr) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + VarianceTestHelper.RowBuilder builder = + new VarianceTestHelper.RowBuilder(new String[]{"alias", "rows", "idx", "index_stddev", "index_var"}); + + List expectedResults = builder + .add("2011-04-01", "automotive", 1L, 135L, 0d, 0d) + .add("2011-04-01", "business", 1L, 118L, 0d, 0d) + .add("2011-04-01", "entertainment", 1L, 158L, 0d, 0d) + .add("2011-04-01", "health", 1L, 120L, 0d, 0d) + .add("2011-04-01", "mezzanine", 3L, 2870L, 737.0179286322613d, 543195.4271253889d) + .add("2011-04-01", "news", 1L, 121L, 0d, 0d) + .add("2011-04-01", "premium", 3L, 2900L, 726.6322593583996d, 527994.4403402924d) + .add("2011-04-01", "technology", 1L, 78L, 0d, 0d) + .add("2011-04-01", "travel", 1L, 119L, 0d, 0d) + + .add("2011-04-02", "automotive", 1L, 147L, 0d, 0d) + .add("2011-04-02", "business", 1L, 112L, 0d, 0d) + .add("2011-04-02", "entertainment", 1L, 166L, 0d, 0d) + .add("2011-04-02", "health", 1L, 113L, 0d, 0d) + .add("2011-04-02", "mezzanine", 3L, 2447L, 611.3420766546617d, 373739.13468843425d) + .add("2011-04-02", "news", 1L, 114L, 0d, 0d) + .add("2011-04-02", "premium", 3L, 2505L, 621.3898134843073d, 386125.30030206224d) + .add("2011-04-02", "technology", 1L, 97L, 0d, 0d) + .add("2011-04-02", "travel", 1L, 126L, 0d, 0d) + .build(); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testPostAggHavingSpec() + { + VarianceTestHelper.RowBuilder expect = new VarianceTestHelper.RowBuilder( + new String[]{"alias", "rows", "index", "index_var", "index_stddev"} + ); + + List expectedResults = expect + .add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847) + .add("2011-04-01", "mezzanine", 6L, 4420L, 254083.76447001836, 504.06722217380724) + .add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(VarianceTestHelper.dataSource) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + VarianceTestHelper.rowsCount, + VarianceTestHelper.indexLongSum, + VarianceTestHelper.indexVarianceAggr + ) + ) + .setPostAggregatorSpecs(ImmutableList.of(VarianceTestHelper.stddevOfIndexPostAggr)) + .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) + .setHavingSpec( + new OrHavingSpec( + ImmutableList.of( + new GreaterThanHavingSpec(VarianceTestHelper.stddevOfIndexMetric, 15L) // 3 rows + ) + ) + ) + .build(); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + + query = query.withLimitSpec( + new DefaultLimitSpec( + Arrays.asList( + OrderByColumnSpec.asc( + VarianceTestHelper.stddevOfIndexMetric + ) + ), 2 + ) + ); + + expectedResults = expect + .add("2011-04-01", "automotive", 2L, 269L, 299.0009819048282, 17.29164485827847) + .add("2011-04-01", "premium", 6L, 4416L, 252279.2020389339, 502.27403082275106) + .build(); + + results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } +} diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java new file mode 100644 index 000000000000..47f5b0008dfe --- /dev/null +++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceSerdeTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import io.druid.segment.data.ObjectStrategy; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Random; + +public class VarianceSerdeTest +{ + @Test + public void testSerde() + { + Random r = new Random(); + VarianceAggregatorCollector holder = new VarianceAggregatorCollector(); + ObjectStrategy strategy = new VarianceSerde().getObjectStrategy(); + Assert.assertEquals(VarianceAggregatorCollector.class, strategy.getClazz()); + + for (int i = 0; i < 100; i++) { + byte[] array = strategy.toBytes(holder); + Assert.assertArrayEquals(array, holder.toByteArray()); + Assert.assertEquals(holder, strategy.fromByteBuffer(ByteBuffer.wrap(array), array.length)); + holder.add(r.nextFloat()); + } + } +} diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java new file mode 100644 index 000000000000..3799d03d06db --- /dev/null +++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTestHelper.java @@ -0,0 +1,106 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.stats.DruidStatsModule; +import org.joda.time.DateTime; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + */ +public class VarianceTestHelper extends QueryRunnerTestHelper +{ + static { + DruidStatsModule module = new DruidStatsModule(); + module.configure(null); + } + + public static final String indexVarianceMetric = "index_var"; + + public static final VarianceAggregatorFactory indexVarianceAggr = new VarianceAggregatorFactory( + indexVarianceMetric, + indexMetric + ); + + public static final String stddevOfIndexMetric = "index_stddev"; + + public static final PostAggregator stddevOfIndexPostAggr = new StandardDeviationPostAggregator( + stddevOfIndexMetric, + indexVarianceMetric, + null + ); + + public static final List commonPlusVarAggregators = Arrays.asList( + rowsCount, + indexDoubleSum, + qualityUniques, + indexVarianceAggr + ); + + public static class RowBuilder + { + private final String[] names; + private final List rows = Lists.newArrayList(); + + public RowBuilder(String[] names) + { + this.names = names; + } + + public RowBuilder add(final String timestamp, Object... values) + { + rows.add(build(timestamp, values)); + return this; + } + + public List build() + { + try { + return Lists.newArrayList(rows); + } + finally { + rows.clear(); + } + } + + public Row build(final String timestamp, Object... values) + { + Preconditions.checkArgument(names.length == values.length); + + Map theVals = Maps.newHashMap(); + for (int i = 0; i < values.length; i++) { + theVals.put(names[i], values[i]); + } + DateTime ts = new DateTime(timestamp); + return new MapBasedRow(ts, theVals); + } + } +} diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java new file mode 100644 index 000000000000..8080c089a6e1 --- /dev/null +++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTimeseriesQueryTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequences; +import io.druid.query.Druids; +import io.druid.query.QueryRunner; +import io.druid.query.Result; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryRunnerTest; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.TestHelper; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +@RunWith(Parameterized.class) +public class VarianceTimeseriesQueryTest +{ + @Parameterized.Parameters(name="{0}:descending={1}") + public static Iterable constructorFeeder() throws IOException + { + return TimeseriesQueryRunnerTest.constructorFeeder(); + } + + private final QueryRunner runner; + private final boolean descending; + + public VarianceTimeseriesQueryTest(QueryRunner runner, boolean descending) + { + this.runner = runner; + this.descending = descending; + } + + @Test + public void testTimeseriesWithNullFilterOnNonExistentDimension() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(VarianceTestHelper.dataSource) + .granularity(VarianceTestHelper.dayGran) + .filters("bobby", null) + .intervals(VarianceTestHelper.firstToThird) + .aggregators(VarianceTestHelper.commonPlusVarAggregators) + .postAggregators( + Arrays.asList( + VarianceTestHelper.addRowsIndexConstant, + VarianceTestHelper.stddevOfIndexPostAggr + ) + ) + .descending(descending) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + VarianceTestHelper.of( + "rows", 13L, + "index", 6626.151596069336, + "addRowsIndexConstant", 6640.151596069336, + "uniques", VarianceTestHelper.UNIQUES_9, + "index_var", descending ? 368885.6897238851 : 368885.689155086, + "index_stddev", descending ? 607.3596049490657 : 607.35960448081 + ) + ) + ), + new Result<>( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + VarianceTestHelper.of( + "rows", 13L, + "index", 5833.2095947265625, + "addRowsIndexConstant", 5847.2095947265625, + "uniques", VarianceTestHelper.UNIQUES_9, + "index_var", descending ? 259061.6037088883 : 259061.60216419376, + "index_stddev", descending ? 508.9809463122252 : 508.98094479478675 + ) + ) + ) + ); + + Iterable> results = Sequences.toList( + runner.run(query, new HashMap()), + Lists.>newArrayList() + ); + assertExpectedResults(expectedResults, results); + } + + private void assertExpectedResults(Iterable> expectedResults, Iterable> results) + { + if (descending) { + expectedResults = TestHelper.revert(expectedResults); + } + TestHelper.assertExpectedResults(expectedResults, results); + } +} diff --git a/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java new file mode 100644 index 000000000000..eb2665d37935 --- /dev/null +++ b/extensions-core/stats/src/test/java/io/druid/query/aggregation/variance/VarianceTopNQueryTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.variance; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.topn.TopNQuery; +import io.druid.query.topn.TopNQueryBuilder; +import io.druid.query.topn.TopNQueryConfig; +import io.druid.query.topn.TopNQueryQueryToolChest; +import io.druid.query.topn.TopNQueryRunnerTest; +import io.druid.query.topn.TopNResultValue; +import io.druid.segment.TestHelper; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class VarianceTopNQueryTest +{ + @Parameterized.Parameters + public static Iterable constructorFeeder() throws IOException + { + return TopNQueryRunnerTest.constructorFeeder(); + } + + private final QueryRunner runner; + + public VarianceTopNQueryTest( + QueryRunner runner + ) + { + this.runner = runner; + } + + @Test + public void testFullOnTopNOverUniques() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.marketDimension) + .metric(QueryRunnerTestHelper.uniqueMetric) + .threshold(3) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + VarianceTestHelper.commonPlusVarAggregators, + Lists.newArrayList( + new DoubleMaxAggregatorFactory("maxIndex", "index"), + new DoubleMinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put("market", "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) + .put("addRowsIndexConstant", 96444.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .put("index_var", 439.3851694586573D) + .build(), + ImmutableMap.builder() + .put("market", "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .put("index_var", 27679.900887366413D) + .build(), + ImmutableMap.builder() + .put("market", "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .put("index_var", 79699.9780741607D) + .build() + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + private Sequence> assertExpectedResults( + Iterable> expectedResults, + TopNQuery query + ) + { + final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest( + new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + final QueryRunner> mergeRunner = chest.mergeResults(runner); + final Sequence> retval = mergeRunner.run(query, ImmutableMap.of()); + TestHelper.assertExpectedResults(expectedResults, retval); + return retval; + } + +} diff --git a/pom.xml b/pom.xml index e01684a45cf3..aa80de846455 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ extensions-core/datasketches extensions-core/hdfs-storage extensions-core/histogram + extensions-core/stats extensions-core/kafka-eight extensions-core/kafka-extraction-namespace extensions-core/kafka-indexing-service diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java index def4115fda8f..7fd4cb17333e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java @@ -37,7 +37,7 @@ */ public class ArithmeticPostAggregator implements PostAggregator { - private static final Comparator DEFAULT_COMPARATOR = new Comparator() + public static final Comparator DEFAULT_COMPARATOR = new Comparator() { @Override public int compare(Object o, Object o1) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index c76274e74992..b209dee8662d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -386,6 +386,22 @@ public GroupByQuery withDimensionSpecs(final List dimensionSpecs) ); } + public GroupByQuery withLimitSpec(final LimitSpec limitSpec) + { + return new GroupByQuery( + getDataSource(), + getQuerySegmentSpec(), + getDimFilter(), + getGranularity(), + getDimensions(), + getAggregatorSpecs(), + getPostAggregatorSpecs(), + getHavingSpec(), + limitSpec, + getContext() + ); + } + public static class Builder { private DataSource dataSource; diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 45cef452b27d..cec4baf9d30f 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -773,6 +773,22 @@ public String getMetricType(String metric) return metricDesc != null ? metricDesc.getType() : null; } + public Class getMetricClass(String metric) + { + MetricDesc metricDesc = metricDescs.get(metric); + switch (metricDesc.getCapabilities().getType()) { + case COMPLEX: + return ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz(); + case FLOAT: + return Float.TYPE; + case LONG: + return Long.TYPE; + case STRING: + return String.class; + } + return null; + } + public Interval getInterval() { return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis())); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index f749bf9554ab..9b6f96b7717c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -21,13 +21,11 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; -import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; -import com.metamx.common.UOE; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; @@ -57,8 +55,6 @@ import io.druid.segment.data.ListIndexed; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.filter.Filters; -import io.druid.segment.serde.ComplexMetricSerde; -import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -75,7 +71,6 @@ */ public class IncrementalIndexStorageAdapter implements StorageAdapter { - private static final Splitter SPLITTER = Splitter.on(","); private static final NullDimensionSelector NULL_DIMENSION_SELECTOR = new NullDimensionSelector(); private final IncrementalIndex index; @@ -308,7 +303,7 @@ public void reset() } if (Thread.interrupted()) { - throw new QueryInterruptedException( new InterruptedException()); + throw new QueryInterruptedException(new InterruptedException()); } boolean foundMatched = false; @@ -533,14 +528,13 @@ public Long get() final Integer metricIndexInt = index.getMetricIndex(column); if (metricIndexInt != null) { final int metricIndex = metricIndexInt; - - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(column)); + final Class classOfObject = index.getMetricClass(column); return new ObjectColumnSelector() { @Override public Class classOfObject() { - return serde.getObjectStrategy().getClazz(); + return classOfObject; } @Override diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index c3b4b3d02dcf..6198951afce7 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; @@ -100,6 +101,7 @@ public TableDataSource apply(@Nullable String input) public static final QueryGranularity dayGran = QueryGranularities.DAY; public static final QueryGranularity allGran = QueryGranularities.ALL; + public static final String timeDimension = "__time"; public static final String marketDimension = "market"; public static final String qualityDimension = "quality"; public static final String placementDimension = "placement"; @@ -119,9 +121,9 @@ public TableDataSource apply(@Nullable String input) public static String dependentPostAggMetric = "dependentPostAgg"; public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); - public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); - public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", "__time"); - public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); + public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", indexMetric); + public static final LongSumAggregatorFactory __timeLongSum = new LongSumAggregatorFactory("sumtime", timeDimension); + public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", indexMetric); public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }"; public static final String JS_RESET_0 = "function reset() { return 0; }"; public static final JavaScriptAggregatorFactory jsIndexSumIfPlacementishA = new JavaScriptAggregatorFactory( @@ -515,4 +517,13 @@ public Sequence run(Query query, Map responseContext) } }; } + + public static Map of(Object... keyvalues) + { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < keyvalues.length; i += 2) { + builder.put(String.valueOf(keyvalues[i]), keyvalues[i + 1]); + } + return builder.build(); + } } diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java index a7bc02563805..20ae203b9af2 100644 --- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java @@ -695,7 +695,7 @@ private static void verify( if (acHolder.getEvent().get(ex.getKey()) instanceof Double) { actVal = ((Double) actVal).floatValue(); } - Assert.assertEquals(ex.getValue(), actVal); + Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal); } }