From 0c10ea20c34eab31bce375afccc86983874ba0fd Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 9 Aug 2017 20:33:42 +0300 Subject: [PATCH 1/5] Add MetricCombiners --- .../DistinctCountAggregatorFactory.java | 9 +++ .../TimestampAggregatorFactory.java | 9 ++- .../theta/SketchAggregatorFactory.java | 47 +++++++++++++ .../datasketches/theta/SketchHolder.java | 4 +- .../histogram/ApproximateHistogram.java | 8 ++- .../ApproximateHistogramAggregator.java | 2 +- ...ApproximateHistogramAggregatorFactory.java | 46 ++++++++++++- .../variance/VarianceAggregatorCollector.java | 43 +++++++----- .../variance/VarianceAggregatorFactory.java | 54 +++++++++++++-- .../io/druid/hll/HyperLogLogCollector.java | 3 +- .../druid/query/aggregation/Aggregator.java | 8 +-- .../query/aggregation/AggregatorFactory.java | 35 ++++++---- .../aggregation/CountAggregatorFactory.java | 6 ++ .../DoubleMaxAggregatorFactory.java | 6 ++ .../aggregation/DoubleMaxMetricCombiner.java | 45 +++++++++++++ .../aggregation/DoubleMetricCombiner.java | 35 ++++++++++ .../DoubleMinAggregatorFactory.java | 6 ++ .../aggregation/DoubleMinMetricCombiner.java | 45 +++++++++++++ .../DoubleSumAggregatorFactory.java | 6 ++ .../aggregation/DoubleSumMetricCombiner.java | 45 +++++++++++++ .../FilteredAggregatorFactory.java | 6 ++ .../FloatMaxAggregatorFactory.java | 6 ++ .../FloatMinAggregatorFactory.java | 6 ++ .../FloatSumAggregatorFactory.java | 6 ++ .../io/druid/query/aggregation/Histogram.java | 22 ++++++ .../aggregation/HistogramAggregator.java | 2 +- .../HistogramAggregatorFactory.java | 47 +++++++++++++ .../JavaScriptAggregatorFactory.java | 28 ++++++++ .../aggregation/LongMaxAggregatorFactory.java | 28 ++++++++ .../query/aggregation/LongMetricCombiner.java | 35 ++++++++++ .../aggregation/LongMinAggregatorFactory.java | 28 ++++++++ .../aggregation/LongSumAggregatorFactory.java | 6 ++ .../aggregation/LongSumMetricCombiner.java | 45 +++++++++++++ .../query/aggregation/MetricCombiner.java | 65 ++++++++++++++++++ .../aggregation/ObjectMetricCombiner.java | 29 ++++++++ .../CardinalityAggregatorFactory.java | 7 ++ .../HyperLogLogCollectorMetricCombiner.java | 67 +++++++++++++++++++ .../first/DoubleFirstAggregatorFactory.java | 8 +++ .../first/FloatFirstAggregatorFactory.java | 8 +++ .../first/LongFirstAggregatorFactory.java | 8 +++ .../HyperUniquesAggregatorFactory.java | 8 +++ .../last/DoubleLastAggregatorFactory.java | 8 +++ .../last/FloatLastAggregatorFactory.java | 8 +++ .../last/LongLastAggregatorFactory.java | 7 ++ .../druid/segment/ObjectColumnSelector.java | 21 +++++- 45 files changed, 916 insertions(+), 55 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/aggregation/DoubleMaxMetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/DoubleMetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/DoubleMinMetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/DoubleSumMetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/LongMetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/LongSumMetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/MetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/ObjectMetricCombiner.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorMetricCombiner.java diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java index a6578b9de2e9..4030e996b873 100644 --- a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java +++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java @@ -29,6 +29,8 @@ import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.LongSumMetricCombiner; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; @@ -119,6 +121,13 @@ public Object combine(Object lhs, Object rhs) return ((Number) lhs).longValue() + ((Number) rhs).longValue(); } + @Override + public MetricCombiner makeMetricCombiner() + { + // This is likely wrong as well as combine(), see https://github.com/druid-io/druid/pull/2602#issuecomment-321224202 + return new LongSumMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java index 4466aa92a51b..587ab6b73745 100644 --- a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java @@ -21,8 +21,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.primitives.Longs; -import io.druid.java.util.common.StringUtils; import io.druid.data.input.impl.TimestampSpec; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.UOE; import io.druid.segment.ColumnSelectorFactory; import org.joda.time.DateTime; @@ -85,6 +86,12 @@ public Object combine(Object lhs, Object rhs) return TimestampAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + throw new UOE("[%s] is not supported during ingestion for rollup", getClass().getSimpleName()); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index f09241409926..9bdca1a8bffa 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -22,15 +22,21 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; +import com.yahoo.sketches.Family; import com.yahoo.sketches.Util; import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; import io.druid.java.util.common.StringUtils; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.ObjectMetricCombiner; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Comparator; @@ -98,6 +104,47 @@ public Object combine(Object lhs, Object rhs) return SketchHolder.combine(lhs, rhs, size); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new ObjectMetricCombiner() + { + private final Union union = (Union) SetOperation.builder().build(size, Family.UNION); + private final SketchHolder combined = SketchHolder.of(union); + + @Override + public void reset(ColumnValueSelector selector) + { + union.reset(); + combine(selector); + } + + @Override + public void combine(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + SketchHolder other = ((ObjectColumnSelector) selector).get(); + // SketchAggregatorFactory.combine() delegates to SketchHolder.combine() and it doesn't check for nulls, so we + // neither. + other.updateUnion(union); + combined.invalidateCache(); + } + + @Override + public Class classOfObject() + { + return SketchHolder.class; + } + + @Nullable + @Override + public SketchHolder get() + { + return combined; + } + }; + } + @Override @JsonProperty public String getName() diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java index 891d39754047..33a1779b9e03 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java @@ -171,7 +171,7 @@ public SketchEstimateWithErrorBounds getEstimateWithErrorBounds(int errorBoundsS return result; } - public static Object combine(Object o1, Object o2, int nomEntries) + public static SketchHolder combine(Object o1, Object o2, int nomEntries) { SketchHolder holder1 = (SketchHolder) o1; SketchHolder holder2 = (SketchHolder) o2; @@ -194,7 +194,7 @@ public static Object combine(Object o1, Object o2, int nomEntries) } } - private void invalidateCache() + void invalidateCache() { cachedEstimate = null; cachedSketch = null; diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java index 494b18681ec3..618451272139 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java @@ -534,9 +534,11 @@ public ApproximateHistogram foldFast(ApproximateHistogram h, float[] mergedPosit */ public ApproximateHistogram copy(ApproximateHistogram h) { - this.size = h.size; - this.positions = new float[size]; - this.bins = new long[size]; + if (h.size > this.size) { + this.size = h.size; + this.positions = new float[size]; + this.bins = new long[size]; + } System.arraycopy(h.positions, 0, this.positions, 0, h.binCount); System.arraycopy(h.bins, 0, this.bins, 0, h.binCount); diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java index 3601aa5da8a5..fcc4951d19ee 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java @@ -36,7 +36,7 @@ public int compare(Object o, Object o1) } }; - static Object combineHistograms(Object lhs, Object rhs) + static ApproximateHistogram combineHistograms(Object lhs, Object rhs) { return ((ApproximateHistogram) lhs).foldFast((ApproximateHistogram) rhs); } diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java index 7bfa9f05c109..4641b04e6c81 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java @@ -26,16 +26,20 @@ import com.google.common.base.Preconditions; import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; - import io.druid.java.util.common.StringUtils; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.ObjectMetricCombiner; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -111,6 +115,46 @@ public Object combine(Object lhs, Object rhs) return ApproximateHistogramAggregator.combineHistograms(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + // ApproximateHistogramAggregatorFactory.combine() delegates to ApproximateHistogramAggregator.combineHistograms() + // and it doesn't check for nulls, so this MetricCombiner neither. + return new ObjectMetricCombiner() + { + private final ApproximateHistogram combined = new ApproximateHistogram(); + + @Override + public void reset(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + ApproximateHistogram first = ((ObjectColumnSelector) selector).get(); + combined.copy(first); + } + + @Override + public void combine(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + ApproximateHistogram other = ((ObjectColumnSelector) selector).get(); + combined.foldFast(other); + } + + @Override + public Class classOfObject() + { + return ApproximateHistogram.class; + } + + @Nullable + @Override + public ApproximateHistogram get() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java index 3c388ad8c3a0..3d048cf835a6 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java @@ -74,29 +74,29 @@ public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o } }; - static Object combineValues(Object lhs, Object rhs) + void fold(VarianceAggregatorCollector other) { - final VarianceAggregatorCollector holder1 = (VarianceAggregatorCollector) lhs; - final VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) rhs; - - if (holder2.count == 0) { - return holder1; + if (other.count == 0) { + return; } - if (holder1.count == 0) { - holder1.nvariance = holder2.nvariance; - holder1.count = holder2.count; - holder1.sum = holder2.sum; - return holder1; + if (this.count == 0) { + this.nvariance = other.nvariance; + this.count = other.count; + this.sum = other.sum; + return; } + final double ratio = this.count / (double) other.count; + final double t = this.sum / ratio - other.sum; - final double ratio = holder1.count / (double) holder2.count; - final double t = holder1.sum / ratio - holder2.sum; - - holder1.nvariance += holder2.nvariance + (ratio / (holder1.count + holder2.count) * t * t); - holder1.count += holder2.count; - holder1.sum += holder2.sum; + this.nvariance += other.nvariance + (ratio / (this.count + other.count) * t * t); + this.count += other.count; + this.sum += other.sum; + } - return holder1; + static Object combineValues(Object lhs, Object rhs) + { + ((VarianceAggregatorCollector) lhs).fold((VarianceAggregatorCollector) rhs); + return lhs; } static int getMaxIntermediateSize() @@ -120,6 +120,13 @@ public void reset() nvariance = 0; } + void copyFrom(VarianceAggregatorCollector other) + { + this.count = other.count; + this.sum = other.sum; + this.nvariance = other.nvariance; + } + public VarianceAggregatorCollector(long count, double sum, double nvariance) { this.count = count; diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java index 4af691543840..fa41ea1599c9 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java @@ -30,9 +30,12 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; +import io.druid.query.aggregation.ObjectMetricCombiner; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; @@ -135,6 +138,51 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) ); } + @Override + public Object combine(Object lhs, Object rhs) + { + return VarianceAggregatorCollector.combineValues(lhs, rhs); + } + + @Override + public MetricCombiner makeMetricCombiner() + { + // VarianceAggregatorFactory.combine() delegates to VarianceAggregatorCollector.combineValues() and it doesn't check + // for nulls, so this MetricCombiner neither. + return new ObjectMetricCombiner() + { + private final VarianceAggregatorCollector combined = new VarianceAggregatorCollector(); + + @Override + public void reset(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + VarianceAggregatorCollector first = ((ObjectColumnSelector) selector).get(); + combined.copyFrom(first); + } + + @Override + public void combine(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + VarianceAggregatorCollector other = ((ObjectColumnSelector) selector).get(); + combined.fold(other); + } + + @Override + public Class classOfObject() + { + return VarianceAggregatorCollector.class; + } + + @Override + public VarianceAggregatorCollector get() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { @@ -163,12 +211,6 @@ public Comparator getComparator() return VarianceAggregatorCollector.COMPARATOR; } - @Override - public Object combine(Object lhs, Object rhs) - { - return VarianceAggregatorCollector.combineValues(lhs, rhs); - } - @Override public Object finalizeComputation(Object object) { diff --git a/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java b/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java index 3b2a283c0052..51f6770ece50 100644 --- a/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java +++ b/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; +import javax.annotation.Nullable; import java.nio.ByteBuffer; /** @@ -358,7 +359,7 @@ public void add(short bucket, byte positionOf1) } } - public HyperLogLogCollector fold(HyperLogLogCollector other) + public HyperLogLogCollector fold(@Nullable HyperLogLogCollector other) { if (other == null || other.storageBuffer.remaining() == 0) { return this; diff --git a/processing/src/main/java/io/druid/query/aggregation/Aggregator.java b/processing/src/main/java/io/druid/query/aggregation/Aggregator.java index fb4689fe009a..847fdcb67838 100644 --- a/processing/src/main/java/io/druid/query/aggregation/Aggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/Aggregator.java @@ -27,12 +27,8 @@ * it can use to get at the next bit of data. * * Thus, an Aggregator can be thought of as a closure over some other thing that is stateful and changes between calls - * to aggregate(). This is currently (as of this documentation) implemented through the use of Offset and - * FloatColumnSelector objects. The Aggregator has a handle on a FloatColumnSelector object which has a handle on an Offset. - * QueryableIndex has both the Aggregators and the Offset object and iterates through the Offset calling the aggregate() - * method on the Aggregators for each applicable row. - * - * This interface is old and going away. It is being replaced by BufferAggregator + * to aggregate(). This is currently (as of this documentation) implemented through the use of {@link + * io.druid.segment.ColumnValueSelector} objects. */ public interface Aggregator extends Closeable { diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java index 388ad9114277..900415d50bfa 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java @@ -24,20 +24,16 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.ColumnSelectorFactory; +import javax.annotation.Nullable; import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** - * Processing related interface - * - * An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory. - * - * This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects - * without making any assumptions about how they are pulling values out of the base data. That is, the data is - * provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how - * the data is actually stored and accessed. + * AggregatorFactory is a strategy (in the terms of Design Patterns) that represents column aggregation, e. g. min, + * max, sum of metric columns, or cardinality of dimension columns (see {@link + * io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}). */ public abstract class AggregatorFactory implements Cacheable { @@ -50,10 +46,10 @@ public abstract class AggregatorFactory implements Cacheable public abstract Comparator getComparator(); /** - * A method that knows how to combine the outputs of the getIntermediate() method from the Aggregators - * produced via factorize(). Note, even though this is called combine, this method's contract *does* - * allow for mutation of the input objects. Thus, any use of lhs or rhs after calling this method is - * highly discouraged. + * A method that knows how to combine the outputs of {@link Aggregator#get} produced via {@link #factorize} or {@link + * BufferAggregator#get} produced via {@link #factorizeBuffered}. Note, even though this method is called "combine", + * this method's contract *does* allow for mutation of the input objects. Thus, any use of lhs or rhs after calling + * this method is highly discouraged. * * @param lhs The left hand side of the combine * @param rhs The right hand side of the combine @@ -62,6 +58,20 @@ public abstract class AggregatorFactory implements Cacheable */ public abstract Object combine(Object lhs, Object rhs); + /** + * Creates a MetricCombiner to combine rollup aggregation results from serveral "rows" of different indexes during + * index merging. MetricCombiner implements the same logic as {@link #combine}, with the difference that it uses + * {@link io.druid.segment.ColumnValueSelector} and it's subinterfaces to get inputs and implements {@code + * ColumnValueSelector} to provide output. + * + * @see MetricCombiner + * @see io.druid.segment.IndexMerger + */ + public MetricCombiner makeMetricCombiner() + { + throw new UOE("[%s] does not implement makeMetricCombiner()", this.getClass().getName()); + } + /** * Returns an AggregatorFactory that can be used to combine the output of aggregators from this factory. This * generally amounts to simply creating a new factory that is the same as the current except with its input @@ -134,6 +144,7 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre * * @return merged AggregatorFactory[] or Null if merging is not possible. */ + @Nullable public static AggregatorFactory[] mergeAggregators(List aggregatorsList) { if (aggregatorsList == null || aggregatorsList.isEmpty()) { diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java index ba7677cbcd86..3e023e122737 100644 --- a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java @@ -70,6 +70,12 @@ public Object combine(Object lhs, Object rhs) return CountAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new LongSumMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java index 85267eb622a6..2a3b3ebe545e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java @@ -70,6 +70,12 @@ public Object combine(Object lhs, Object rhs) return DoubleMaxAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new DoubleMaxMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxMetricCombiner.java new file mode 100644 index 000000000000..7a4850f3f54f --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxMetricCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +final class DoubleMaxMetricCombiner extends DoubleMetricCombiner +{ + private double max; + + @Override + public void reset(ColumnValueSelector selector) + { + max = selector.getDouble(); + } + + @Override + public void combine(ColumnValueSelector selector) + { + max = Math.max(max, selector.getDouble()); + } + + @Override + public double getDouble() + { + return max; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMetricCombiner.java new file mode 100644 index 000000000000..7a113a7f4b26 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMetricCombiner.java @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.DoubleColumnSelector; + +/** + * Specialization of {@link MetricCombiner} for primitive double metrics. + */ +public abstract class DoubleMetricCombiner implements MetricCombiner, DoubleColumnSelector +{ + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Usually MetricCombiner has nothing to inspect + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java index ab5e4aaf2a28..68110f0b3c6d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java @@ -76,6 +76,12 @@ public Object combine(Object lhs, Object rhs) return DoubleMinAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new DoubleMinMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinMetricCombiner.java new file mode 100644 index 000000000000..f5d7d45cfb15 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinMetricCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +final class DoubleMinMetricCombiner extends DoubleMetricCombiner +{ + private double min; + + @Override + public void reset(ColumnValueSelector selector) + { + min = selector.getDouble(); + } + + @Override + public void combine(ColumnValueSelector selector) + { + min = Math.min(min, selector.getDouble()); + } + + @Override + public double getDouble() + { + return min; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java index ac66d1bbdfba..32fa9dac4c17 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java @@ -70,6 +70,12 @@ public Object combine(Object lhs, Object rhs) return DoubleSumAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new DoubleSumMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumMetricCombiner.java new file mode 100644 index 000000000000..b3bb5a83cddb --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumMetricCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +final class DoubleSumMetricCombiner extends DoubleMetricCombiner +{ + private double sum; + + @Override + public void reset(ColumnValueSelector selector) + { + sum = selector.getDouble(); + } + + @Override + public void combine(ColumnValueSelector selector) + { + sum += selector.getDouble(); + } + + @Override + public double getDouble() + { + return sum; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java index 166421fc05e8..de5c4b1206c9 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java @@ -79,6 +79,12 @@ public Object combine(Object lhs, Object rhs) return delegate.combine(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return delegate.makeMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java index d4bb95234f13..065f121fe927 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java @@ -69,6 +69,12 @@ public Object combine(Object lhs, Object rhs) return FloatMaxAggregator.combineValues(finalizeComputation(lhs), finalizeComputation(rhs)); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new DoubleMaxMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java index c520c1da933d..2028f0a31055 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java @@ -69,6 +69,12 @@ public Object combine(Object lhs, Object rhs) return FloatMinAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new DoubleMinMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java index ad6206c8d6fd..591b7db8e71b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java @@ -69,6 +69,12 @@ public Object combine(Object lhs, Object rhs) return FloatSumAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new DoubleSumMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/Histogram.java b/processing/src/main/java/io/druid/query/aggregation/Histogram.java index 8fa7188e75e9..dc8ac1234ec4 100644 --- a/processing/src/main/java/io/druid/query/aggregation/Histogram.java +++ b/processing/src/main/java/io/druid/query/aggregation/Histogram.java @@ -58,6 +58,28 @@ public Histogram(float[] breaks, long[] bins, float min, float max) } } + public Histogram(Histogram other) + { + this.breaks = other.breaks; + this.bins = other.bins.clone(); + this.min = other.min; + this.max = other.max; + this.count = other.count; + } + + public void copyFrom(Histogram other) + { + this.breaks = other.breaks; + if (this.bins.length == other.bins.length) { + System.arraycopy(other.bins, 0, this.bins, 0, this.bins.length); + } else { + this.bins = other.bins.clone(); + } + this.min = other.min; + this.max = other.max; + this.count = other.count; + } + public void offer(float d) { if (d > max) { diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java index 665bf8c9d877..344899d7607c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java @@ -35,7 +35,7 @@ public int compare(Object o, Object o1) } }; - static Object combineHistograms(Object lhs, Object rhs) + static Histogram combineHistograms(Object lhs, Object rhs) { return ((Histogram) lhs).fold((Histogram) rhs); } diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java index 80eade4092ae..ca08bd0aec3f 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java @@ -27,8 +27,11 @@ import com.google.common.primitives.Longs; import io.druid.java.util.common.StringUtils; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; @@ -88,6 +91,50 @@ public Object combine(Object lhs, Object rhs) return HistogramAggregator.combineHistograms(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + // HistogramAggregatorFactory.combine() delegates to HistogramAggregator.combineHistograms() and it doesn't check + // for nulls, so this MetricCombiner neither. + return new ObjectMetricCombiner() + { + private Histogram combined; + + @Override + public void reset(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + Histogram first = ((ObjectColumnSelector) selector).get(); + if (combined == null) { + combined = new Histogram(first); + } else { + combined.copyFrom(first); + } + } + + @Override + public void combine(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + Histogram other = ((ObjectColumnSelector) selector).get(); + combined.fold(other); + } + + @Override + public Class classOfObject() + { + return Histogram.class; + } + + @Nullable + @Override + public Histogram get() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index bfd4647fad42..598a2804a8cd 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -32,6 +32,7 @@ import io.druid.java.util.common.StringUtils; import io.druid.js.JavaScriptConfig; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; import org.mozilla.javascript.Context; import org.mozilla.javascript.ContextAction; @@ -140,6 +141,33 @@ public Object combine(Object lhs, Object rhs) return getCompiledScript().combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new DoubleMetricCombiner() + { + private double combined; + + @Override + public void reset(ColumnValueSelector selector) + { + combined = selector.getDouble(); + } + + @Override + public void combine(ColumnValueSelector selector) + { + combined = getCompiledScript().combine(combined, selector.getDouble()); + } + + @Override + public double getDouble() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java index 66b07c288e1d..6b4ed37959e3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java @@ -28,6 +28,7 @@ import io.druid.math.expr.ExprMacroTable; import io.druid.math.expr.Parser; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.LongColumnSelector; import java.nio.ByteBuffer; @@ -100,6 +101,33 @@ public Object combine(Object lhs, Object rhs) return LongMaxAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new LongMetricCombiner() + { + private long max; + + @Override + public void reset(ColumnValueSelector selector) + { + max = selector.getLong(); + } + + @Override + public void combine(ColumnValueSelector selector) + { + max = Math.max(max, selector.getLong()); + } + + @Override + public long getLong() + { + return max; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/LongMetricCombiner.java new file mode 100644 index 000000000000..6ba8b386ff06 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/LongMetricCombiner.java @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.LongColumnSelector; + +/** + * Specialization of {@link MetricCombiner} for primitive long metrics. + */ +public abstract class LongMetricCombiner implements MetricCombiner, LongColumnSelector +{ + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Usually MetricCombiner has nothing to inspect + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java index d09f23dc4b04..7e07377e12b9 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java @@ -28,6 +28,7 @@ import io.druid.math.expr.ExprMacroTable; import io.druid.math.expr.Parser; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.LongColumnSelector; import java.nio.ByteBuffer; @@ -101,6 +102,33 @@ public Object combine(Object lhs, Object rhs) return LongMinAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new LongMetricCombiner() + { + private long min; + + @Override + public void reset(ColumnValueSelector selector) + { + min = selector.getLong(); + } + + @Override + public void combine(ColumnValueSelector selector) + { + min = Math.min(min, selector.getLong()); + } + + @Override + public long getLong() + { + return min; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java index d2212c4fbeb6..22c9799a0f1f 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java @@ -100,6 +100,12 @@ public Object combine(Object lhs, Object rhs) return LongSumAggregator.combineValues(lhs, rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new LongSumMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/LongSumMetricCombiner.java new file mode 100644 index 000000000000..1bfe06a0dd60 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumMetricCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +public final class LongSumMetricCombiner extends LongMetricCombiner +{ + private long sum; + + @Override + public void reset(ColumnValueSelector selector) + { + sum = selector.getLong(); + } + + @Override + public void combine(ColumnValueSelector selector) + { + sum += selector.getLong(); + } + + @Override + public long getLong() + { + return sum; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/MetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/MetricCombiner.java new file mode 100644 index 000000000000..d8956bb09aae --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/MetricCombiner.java @@ -0,0 +1,65 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +/** + * MetricCombiner is used to combine rollup aggregation results from serveral "rows" of different indexes during index + * merging (see {@link io.druid.segment.IndexMerger}). + * + * The state of the implementations of this interface is a metric value (either a primitive or an object), that could be + * queried via {@link ColumnValueSelector}'s methods. Before {@link #reset} is ever called on a MetricCombiner, it's + * state is undefined and {@link ColumnValueSelector}'s methods could return something random, or throw an exception. + * + * @see AggregatorFactory#makeMetricCombiner() + * @see LongMetricCombiner + * @see DoubleMetricCombiner + * @see ObjectMetricCombiner + */ +public interface MetricCombiner extends ColumnValueSelector +{ + /** + * Resets this MetricCombiner's state value to the value of the given selector, e. g. after calling this method + * metricCombiner.get*() should return the same value as selector.get*(). + * + * If the selector is an {@link io.druid.segment.ObjectColumnSelector}, the object returned from {@link + * io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become a subject for modification + * during subsequent {@link #combine} calls. + */ + void reset(ColumnValueSelector selector); + + /** + * Combines this MetricCombiner's state value with the value of the given selector and saves it in this + * MetricCombiner's state, e. g. after calling metricCombiner.combine(selector), metricCombiner.get*() should return + * the value that would be the result of {@link AggregatorFactory#combine + * aggregatorFactory.combine(metricCombiner.get*(), selector.get*())} call. + * + * Unlike {@link AggregatorFactory#combine}, if the selector is an {@link io.druid.segment.ObjectColumnSelector}, the + * object returned from {@link io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become + * a subject for modification during subsequent combine() calls. + * + * Since the state of MetricCombiner is underfined before {@link #reset} is ever called on it, the effects of calling + * combine() on a MetricCombiner instance are also underfined in this case. + * + * @see AggregatorFactory#combine + */ + void combine(ColumnValueSelector selector); +} diff --git a/processing/src/main/java/io/druid/query/aggregation/ObjectMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/ObjectMetricCombiner.java new file mode 100644 index 000000000000..5e27941e35a6 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/ObjectMetricCombiner.java @@ -0,0 +1,29 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ObjectColumnSelector; + +/** + * Specialization of {@link MetricCombiner} for object metrics. + */ +public abstract class ObjectMetricCombiner implements MetricCombiner, ObjectColumnSelector +{ +} diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 4a8129e4054b..80bacc81ffc5 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -33,6 +33,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; @@ -191,6 +192,12 @@ public Object combine(Object lhs, Object rhs) return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new HyperLogLogCollectorMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorMetricCombiner.java new file mode 100644 index 000000000000..4789727868dd --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorMetricCombiner.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.cardinality; + +import io.druid.hll.HyperLogLogCollector; +import io.druid.query.aggregation.ObjectMetricCombiner; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; + +import javax.annotation.Nullable; + +public final class HyperLogLogCollectorMetricCombiner extends ObjectMetricCombiner +{ + @Nullable + private HyperLogLogCollector combined; + + @Override + public void reset(ColumnValueSelector selector) + { + combined = null; + combine(selector); + } + + @Override + public void combine(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + HyperLogLogCollector other = ((ObjectColumnSelector) selector).get(); + if (other == null) { + return; + } + if (combined == null) { + combined = HyperLogLogCollector.makeLatestCollector(); + } + combined.fold(other); + } + + @Override + public Class classOfObject() + { + return HyperLogLogCollector.class; + } + + @Nullable + @Override + public HyperLogLogCollector get() + { + return combined; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index 9a4af62eae11..1110551a26ac 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -26,11 +26,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -102,6 +104,12 @@ public Object combine(Object lhs, Object rhs) return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; } + @Override + public MetricCombiner makeMetricCombiner() + { + throw new UOE("DoubleFirstAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index a6d6823e2cdc..9385f7a19ba2 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -26,11 +26,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -102,6 +104,12 @@ public Object combine(Object lhs, Object rhs) return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; } + @Override + public MetricCombiner makeMetricCombiner() + { + throw new UOE("FloatFirstAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 1cdffb2d49b0..444b02742760 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -25,11 +25,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -95,6 +97,12 @@ public Object combine(Object lhs, Object rhs) return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; } + @Override + public MetricCombiner makeMetricCombiner() + { + throw new UOE("LongFirstAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java index bc61b4368022..84e9ab2550ed 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java @@ -30,8 +30,10 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; +import io.druid.query.aggregation.cardinality.HyperLogLogCollectorMetricCombiner; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; @@ -136,6 +138,12 @@ public Object combine(Object lhs, Object rhs) return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs); } + @Override + public MetricCombiner makeMetricCombiner() + { + return new HyperLogLogCollectorMetricCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 27aefaaffffc..0b5f2c4f5814 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -25,11 +25,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -93,6 +95,12 @@ public Object combine(Object lhs, Object rhs) return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; } + @Override + public MetricCombiner makeMetricCombiner() + { + throw new UOE("DoubleLastAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java index 6f2d55423dd2..d0f9be9167d9 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -25,11 +25,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.first.FloatFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -93,6 +95,12 @@ public Object combine(Object lhs, Object rhs) return FloatFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; } + @Override + public MetricCombiner makeMetricCombiner() + { + throw new UOE("FloatLastAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java index 32623c50b891..f0dfef65bc4c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -24,11 +24,13 @@ import com.google.common.base.Preconditions; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -91,6 +93,11 @@ public Object combine(Object lhs, Object rhs) return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; } + @Override + public MetricCombiner makeMetricCombiner() + { + throw new UOE("LongLastAggregatorFactory is not supported during ingestion for rollup"); + } @Override public AggregatorFactory getCombiningFactory() diff --git a/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java b/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java index 6dac1a42d5d4..498aa6273752 100644 --- a/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java +++ b/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java @@ -19,6 +19,8 @@ package io.druid.segment; +import javax.annotation.Nullable; + public interface ObjectColumnSelector extends ColumnValueSelector { public Class classOfObject(); @@ -28,6 +30,7 @@ public interface ObjectColumnSelector extends ColumnValueSelector * ObjectColumnSelector doesn't extend {@link io.druid.query.monomorphicprocessing.HotLoopCallee} yet. If it will, * this method should be annotated. */ + @Nullable public T get(); /** @@ -39,7 +42,11 @@ public interface ObjectColumnSelector extends ColumnValueSelector @Override default float getFloat() { - return ((Number) get()).floatValue(); + T value = get(); + if (value == null) { + return 0; + } + return ((Number) value).floatValue(); } /** @@ -51,7 +58,11 @@ default float getFloat() @Override default double getDouble() { - return ((Number) get()).doubleValue(); + T value = get(); + if (value == null) { + return 0; + } + return ((Number) value).doubleValue(); } /** @@ -63,6 +74,10 @@ default double getDouble() @Override default long getLong() { - return ((Number) get()).longValue(); + T value = get(); + if (value == null) { + return 0; + } + return ((Number) value).longValue(); } } From f4f22bbd5af92967dad46a68f906415ddacc2dab Mon Sep 17 00:00:00 2001 From: leventov Date: Thu, 10 Aug 2017 17:40:31 +0300 Subject: [PATCH 2/5] Rename MetricCombiner to AggregateCombiner --- .../DistinctCountAggregatorFactory.java | 8 ++--- .../TimestampAggregatorFactory.java | 2 +- .../theta/SketchAggregatorFactory.java | 8 ++--- ...ApproximateHistogramAggregatorFactory.java | 10 +++--- .../variance/VarianceAggregatorFactory.java | 10 +++--- ...icCombiner.java => AggregateCombiner.java} | 35 ++++++++++--------- .../query/aggregation/AggregatorFactory.java | 10 +++--- .../aggregation/CountAggregatorFactory.java | 4 +-- ...iner.java => DoubleAggregateCombiner.java} | 6 ++-- ...r.java => DoubleMaxAggregateCombiner.java} | 2 +- .../DoubleMaxAggregatorFactory.java | 4 +-- ...r.java => DoubleMinAggregateCombiner.java} | 2 +- .../DoubleMinAggregatorFactory.java | 4 +-- ...r.java => DoubleSumAggregateCombiner.java} | 2 +- .../DoubleSumAggregatorFactory.java | 4 +-- .../FilteredAggregatorFactory.java | 4 +-- .../FloatMaxAggregatorFactory.java | 4 +-- .../FloatMinAggregatorFactory.java | 4 +-- .../FloatSumAggregatorFactory.java | 4 +-- .../HistogramAggregatorFactory.java | 6 ++-- .../JavaScriptAggregatorFactory.java | 4 +-- ...mbiner.java => LongAggregateCombiner.java} | 6 ++-- .../aggregation/LongMaxAggregatorFactory.java | 4 +-- .../aggregation/LongMinAggregatorFactory.java | 4 +-- ...ner.java => LongSumAggregateCombiner.java} | 2 +- .../aggregation/LongSumAggregatorFactory.java | 4 +-- ...iner.java => ObjectAggregateCombiner.java} | 4 +-- .../CardinalityAggregatorFactory.java | 6 ++-- ...yperLogLogCollectorAggregateCombiner.java} | 4 +-- .../first/DoubleFirstAggregatorFactory.java | 4 +-- .../first/FloatFirstAggregatorFactory.java | 4 +-- .../first/LongFirstAggregatorFactory.java | 4 +-- .../HyperUniquesAggregatorFactory.java | 8 ++--- .../last/DoubleLastAggregatorFactory.java | 4 +-- .../last/FloatLastAggregatorFactory.java | 4 +-- .../last/LongLastAggregatorFactory.java | 4 +-- 36 files changed, 102 insertions(+), 101 deletions(-) rename processing/src/main/java/io/druid/query/aggregation/{MetricCombiner.java => AggregateCombiner.java} (53%) rename processing/src/main/java/io/druid/query/aggregation/{DoubleMetricCombiner.java => DoubleAggregateCombiner.java} (81%) rename processing/src/main/java/io/druid/query/aggregation/{DoubleMaxMetricCombiner.java => DoubleMaxAggregateCombiner.java} (94%) rename processing/src/main/java/io/druid/query/aggregation/{DoubleMinMetricCombiner.java => DoubleMinAggregateCombiner.java} (94%) rename processing/src/main/java/io/druid/query/aggregation/{DoubleSumMetricCombiner.java => DoubleSumAggregateCombiner.java} (94%) rename processing/src/main/java/io/druid/query/aggregation/{LongMetricCombiner.java => LongAggregateCombiner.java} (82%) rename processing/src/main/java/io/druid/query/aggregation/{LongSumMetricCombiner.java => LongSumAggregateCombiner.java} (94%) rename processing/src/main/java/io/druid/query/aggregation/{ObjectMetricCombiner.java => ObjectAggregateCombiner.java} (83%) rename processing/src/main/java/io/druid/query/aggregation/cardinality/{HyperLogLogCollectorMetricCombiner.java => HyperLogLogCollectorAggregateCombiner.java} (91%) diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java index 4030e996b873..350283cfe150 100644 --- a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java +++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java @@ -29,8 +29,8 @@ import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.LongSumAggregatorFactory; -import io.druid.query.aggregation.LongSumMetricCombiner; -import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.LongSumAggregateCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; @@ -122,10 +122,10 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { // This is likely wrong as well as combine(), see https://github.com/druid-io/druid/pull/2602#issuecomment-321224202 - return new LongSumMetricCombiner(); + return new LongSumAggregateCombiner(); } @Override diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java index 587ab6b73745..a50e9db12fc3 100644 --- a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java @@ -87,7 +87,7 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { throw new UOE("[%s] is not supported during ingestion for rollup", getClass().getSimpleName()); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 9bdca1a8bffa..c872ce0e53d5 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -27,11 +27,11 @@ import com.yahoo.sketches.theta.SetOperation; import com.yahoo.sketches.theta.Union; import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; -import io.druid.query.aggregation.ObjectMetricCombiner; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; @@ -105,9 +105,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new ObjectMetricCombiner() + return new ObjectAggregateCombiner() { private final Union union = (Union) SetOperation.builder().build(size, Family.UNION); private final SketchHolder combined = SketchHolder.of(union); diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java index 4641b04e6c81..d8a951e846c7 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java @@ -27,13 +27,13 @@ import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; -import io.druid.query.aggregation.ObjectMetricCombiner; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; @@ -116,11 +116,11 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { // ApproximateHistogramAggregatorFactory.combine() delegates to ApproximateHistogramAggregator.combineHistograms() - // and it doesn't check for nulls, so this MetricCombiner neither. - return new ObjectMetricCombiner() + // and it doesn't check for nulls, so this AggregateCombiner neither. + return new ObjectAggregateCombiner() { private final ApproximateHistogram combined = new ApproximateHistogram(); diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java index fa41ea1599c9..cc902357ca79 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java @@ -30,10 +30,10 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; -import io.druid.query.aggregation.ObjectMetricCombiner; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; @@ -145,11 +145,11 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { // VarianceAggregatorFactory.combine() delegates to VarianceAggregatorCollector.combineValues() and it doesn't check - // for nulls, so this MetricCombiner neither. - return new ObjectMetricCombiner() + // for nulls, so this AggregateCombiner neither. + return new ObjectAggregateCombiner() { private final VarianceAggregatorCollector combined = new VarianceAggregatorCollector(); diff --git a/processing/src/main/java/io/druid/query/aggregation/MetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java similarity index 53% rename from processing/src/main/java/io/druid/query/aggregation/MetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java index d8956bb09aae..3ca59c63eefc 100644 --- a/processing/src/main/java/io/druid/query/aggregation/MetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java @@ -22,23 +22,24 @@ import io.druid.segment.ColumnValueSelector; /** - * MetricCombiner is used to combine rollup aggregation results from serveral "rows" of different indexes during index + * AggregateCombiner is used to combine rollup aggregation results from serveral "rows" of different indexes during index * merging (see {@link io.druid.segment.IndexMerger}). * - * The state of the implementations of this interface is a metric value (either a primitive or an object), that could be - * queried via {@link ColumnValueSelector}'s methods. Before {@link #reset} is ever called on a MetricCombiner, it's - * state is undefined and {@link ColumnValueSelector}'s methods could return something random, or throw an exception. + * The state of the implementations of this interface is an aggregation value (either a primitive or an object), that + * could be queried via {@link ColumnValueSelector}'s methods. Before {@link #reset} is ever called on an + * AggregateCombiner, it's state is undefined and {@link ColumnValueSelector}'s methods could return something random, + * or null, or throw an exception. * - * @see AggregatorFactory#makeMetricCombiner() - * @see LongMetricCombiner - * @see DoubleMetricCombiner - * @see ObjectMetricCombiner + * @see AggregatorFactory#makeAggregateCombiner() + * @see LongAggregateCombiner + * @see DoubleAggregateCombiner + * @see ObjectAggregateCombiner */ -public interface MetricCombiner extends ColumnValueSelector +public interface AggregateCombiner extends ColumnValueSelector { /** - * Resets this MetricCombiner's state value to the value of the given selector, e. g. after calling this method - * metricCombiner.get*() should return the same value as selector.get*(). + * Resets this AggregateCombiner's state value to the value of the given selector, e. g. after calling this method + * combiner.get*() should return the same value as selector.get*(). * * If the selector is an {@link io.druid.segment.ObjectColumnSelector}, the object returned from {@link * io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become a subject for modification @@ -47,17 +48,17 @@ public interface MetricCombiner extends ColumnValueSelector void reset(ColumnValueSelector selector); /** - * Combines this MetricCombiner's state value with the value of the given selector and saves it in this - * MetricCombiner's state, e. g. after calling metricCombiner.combine(selector), metricCombiner.get*() should return - * the value that would be the result of {@link AggregatorFactory#combine - * aggregatorFactory.combine(metricCombiner.get*(), selector.get*())} call. + * Combines this AggregateCombiner's state value with the value of the given selector and saves it in this + * AggregateCombiner's state, e. g. after calling combiner.combine(selector), combiner.get*() should return the value + * that would be the result of {@link AggregatorFactory#combine + * aggregatorFactory.combine(combiner.get*(), selector.get*())} call. * * Unlike {@link AggregatorFactory#combine}, if the selector is an {@link io.druid.segment.ObjectColumnSelector}, the * object returned from {@link io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become * a subject for modification during subsequent combine() calls. * - * Since the state of MetricCombiner is underfined before {@link #reset} is ever called on it, the effects of calling - * combine() on a MetricCombiner instance are also underfined in this case. + * Since the state of AggregateCombiner is underfined before {@link #reset} is ever called on it, the effects of calling + * combine() on an AggregateCombiner instance are also underfined in this case. * * @see AggregatorFactory#combine */ diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java index 900415d50bfa..43c21e59bbe8 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java @@ -59,17 +59,17 @@ public abstract class AggregatorFactory implements Cacheable public abstract Object combine(Object lhs, Object rhs); /** - * Creates a MetricCombiner to combine rollup aggregation results from serveral "rows" of different indexes during - * index merging. MetricCombiner implements the same logic as {@link #combine}, with the difference that it uses + * Creates an AggregateCombiner to combine rollup aggregation results from serveral "rows" of different indexes during + * index merging. AggregateCombiner implements the same logic as {@link #combine}, with the difference that it uses * {@link io.druid.segment.ColumnValueSelector} and it's subinterfaces to get inputs and implements {@code * ColumnValueSelector} to provide output. * - * @see MetricCombiner + * @see AggregateCombiner * @see io.druid.segment.IndexMerger */ - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - throw new UOE("[%s] does not implement makeMetricCombiner()", this.getClass().getName()); + throw new UOE("[%s] does not implement makeAggregateCombiner()", this.getClass().getName()); } /** diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java index 3e023e122737..1e1be24b2e05 100644 --- a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java @@ -71,9 +71,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new LongSumMetricCombiner(); + return new LongSumAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleAggregateCombiner.java similarity index 81% rename from processing/src/main/java/io/druid/query/aggregation/DoubleMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/DoubleAggregateCombiner.java index 7a113a7f4b26..821a070e1b23 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleAggregateCombiner.java @@ -23,13 +23,13 @@ import io.druid.segment.DoubleColumnSelector; /** - * Specialization of {@link MetricCombiner} for primitive double metrics. + * Specialization of {@link AggregateCombiner} for primitive double aggregations. */ -public abstract class DoubleMetricCombiner implements MetricCombiner, DoubleColumnSelector +public abstract class DoubleAggregateCombiner implements AggregateCombiner, DoubleColumnSelector { @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - // Usually MetricCombiner has nothing to inspect + // Usually AggregateCombiner has nothing to inspect } } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java similarity index 94% rename from processing/src/main/java/io/druid/query/aggregation/DoubleMaxMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java index 7a4850f3f54f..2fd5306d9474 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java @@ -21,7 +21,7 @@ import io.druid.segment.ColumnValueSelector; -final class DoubleMaxMetricCombiner extends DoubleMetricCombiner +final class DoubleMaxAggregateCombiner extends DoubleAggregateCombiner { private double max; diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java index 2a3b3ebe545e..ff73f2f8c0eb 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java @@ -71,9 +71,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new DoubleMaxMetricCombiner(); + return new DoubleMaxAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java similarity index 94% rename from processing/src/main/java/io/druid/query/aggregation/DoubleMinMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java index f5d7d45cfb15..3508336613ed 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java @@ -21,7 +21,7 @@ import io.druid.segment.ColumnValueSelector; -final class DoubleMinMetricCombiner extends DoubleMetricCombiner +final class DoubleMinAggregateCombiner extends DoubleAggregateCombiner { private double min; diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java index 68110f0b3c6d..8e696a8c93d6 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java @@ -77,9 +77,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new DoubleMinMetricCombiner(); + return new DoubleMinAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java similarity index 94% rename from processing/src/main/java/io/druid/query/aggregation/DoubleSumMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java index b3bb5a83cddb..87adc6115724 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java @@ -21,7 +21,7 @@ import io.druid.segment.ColumnValueSelector; -final class DoubleSumMetricCombiner extends DoubleMetricCombiner +final class DoubleSumAggregateCombiner extends DoubleAggregateCombiner { private double sum; diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java index 32fa9dac4c17..50ec11bed556 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java @@ -71,9 +71,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new DoubleSumMetricCombiner(); + return new DoubleSumAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java index de5c4b1206c9..bb4ca9bfb08d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java @@ -80,9 +80,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return delegate.makeMetricCombiner(); + return delegate.makeAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java index 065f121fe927..d6627ef3adae 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java @@ -70,9 +70,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new DoubleMaxMetricCombiner(); + return new DoubleMaxAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java index 2028f0a31055..c4459268cfdb 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java @@ -70,9 +70,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new DoubleMinMetricCombiner(); + return new DoubleMinAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java index 591b7db8e71b..30719cefce89 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java @@ -70,9 +70,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new DoubleSumMetricCombiner(); + return new DoubleSumAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java index ca08bd0aec3f..b1fb73cb47c3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java @@ -92,11 +92,11 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { // HistogramAggregatorFactory.combine() delegates to HistogramAggregator.combineHistograms() and it doesn't check - // for nulls, so this MetricCombiner neither. - return new ObjectMetricCombiner() + // for nulls, so this AggregateCombiner neither. + return new ObjectAggregateCombiner() { private Histogram combined; diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index 598a2804a8cd..ca7ef3a9f170 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -142,9 +142,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new DoubleMetricCombiner() + return new DoubleAggregateCombiner() { private double combined; diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/LongAggregateCombiner.java similarity index 82% rename from processing/src/main/java/io/druid/query/aggregation/LongMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/LongAggregateCombiner.java index 6ba8b386ff06..949f4164a27f 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongAggregateCombiner.java @@ -23,13 +23,13 @@ import io.druid.segment.LongColumnSelector; /** - * Specialization of {@link MetricCombiner} for primitive long metrics. + * Specialization of {@link AggregateCombiner} for primitive long aggregations. */ -public abstract class LongMetricCombiner implements MetricCombiner, LongColumnSelector +public abstract class LongAggregateCombiner implements AggregateCombiner, LongColumnSelector { @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - // Usually MetricCombiner has nothing to inspect + // Usually AggregateCombiner has nothing to inspect } } diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java index 6b4ed37959e3..ba806d378094 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java @@ -102,9 +102,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new LongMetricCombiner() + return new LongAggregateCombiner() { private long max; diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java index 7e07377e12b9..75c471c4774b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java @@ -103,9 +103,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new LongMetricCombiner() + return new LongAggregateCombiner() { private long min; diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java similarity index 94% rename from processing/src/main/java/io/druid/query/aggregation/LongSumMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java index 1bfe06a0dd60..34e0ff16ae05 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java @@ -21,7 +21,7 @@ import io.druid.segment.ColumnValueSelector; -public final class LongSumMetricCombiner extends LongMetricCombiner +public final class LongSumAggregateCombiner extends LongAggregateCombiner { private long sum; diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java index 22c9799a0f1f..cec708eb4a60 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java @@ -101,9 +101,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new LongSumMetricCombiner(); + return new LongSumAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/ObjectMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/ObjectAggregateCombiner.java similarity index 83% rename from processing/src/main/java/io/druid/query/aggregation/ObjectMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/ObjectAggregateCombiner.java index 5e27941e35a6..29bc0950982f 100644 --- a/processing/src/main/java/io/druid/query/aggregation/ObjectMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/ObjectAggregateCombiner.java @@ -22,8 +22,8 @@ import io.druid.segment.ObjectColumnSelector; /** - * Specialization of {@link MetricCombiner} for object metrics. + * Specialization of {@link AggregateCombiner} for object aggregations. */ -public abstract class ObjectMetricCombiner implements MetricCombiner, ObjectColumnSelector +public abstract class ObjectAggregateCombiner implements AggregateCombiner, ObjectColumnSelector { } diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 80bacc81ffc5..177815391add 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -28,12 +28,12 @@ import io.druid.hll.HyperLogLogCollector; import io.druid.java.util.common.StringUtils; import io.druid.query.ColumnSelectorPlus; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; @@ -193,9 +193,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new HyperLogLogCollectorMetricCombiner(); + return new HyperLogLogCollectorAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorMetricCombiner.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java similarity index 91% rename from processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorMetricCombiner.java rename to processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java index 4789727868dd..c5eced672bb4 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorMetricCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java @@ -20,13 +20,13 @@ package io.druid.query.aggregation.cardinality; import io.druid.hll.HyperLogLogCollector; -import io.druid.query.aggregation.ObjectMetricCombiner; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; import javax.annotation.Nullable; -public final class HyperLogLogCollectorMetricCombiner extends ObjectMetricCombiner +public final class HyperLogLogCollectorAggregateCombiner extends ObjectAggregateCombiner { @Nullable private HyperLogLogCollector combined; diff --git a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index 1110551a26ac..783ba82d2b3b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -32,7 +32,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -105,7 +105,7 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { throw new UOE("DoubleFirstAggregatorFactory is not supported during ingestion for rollup"); } diff --git a/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index 9385f7a19ba2..e26b0228f8a2 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -32,7 +32,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -105,7 +105,7 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { throw new UOE("FloatFirstAggregatorFactory is not supported during ingestion for rollup"); } diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 444b02742760..96ff43d2792d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -31,7 +31,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -98,7 +98,7 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { throw new UOE("LongFirstAggregatorFactory is not supported during ingestion for rollup"); } diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java index 84e9ab2550ed..fd0850ce352b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java @@ -25,15 +25,15 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.Comparators; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; -import io.druid.query.aggregation.cardinality.HyperLogLogCollectorMetricCombiner; +import io.druid.query.aggregation.cardinality.HyperLogLogCollectorAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; @@ -139,9 +139,9 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { - return new HyperLogLogCollectorMetricCombiner(); + return new HyperLogLogCollectorAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 0b5f2c4f5814..ddaec8698897 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -26,12 +26,12 @@ import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; import io.druid.java.util.common.UOE; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -96,7 +96,7 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { throw new UOE("DoubleLastAggregatorFactory is not supported during ingestion for rollup"); } diff --git a/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java index d0f9be9167d9..34f818e490bf 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -31,7 +31,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.first.FloatFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -96,7 +96,7 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { throw new UOE("FloatLastAggregatorFactory is not supported during ingestion for rollup"); } diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java index f0dfef65bc4c..23bd19345231 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -30,7 +30,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.MetricCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -94,7 +94,7 @@ public Object combine(Object lhs, Object rhs) } @Override - public MetricCombiner makeMetricCombiner() + public AggregateCombiner makeAggregateCombiner() { throw new UOE("LongLastAggregatorFactory is not supported during ingestion for rollup"); } From 455820df766db28b82ae5323800cb8d4fe5170d4 Mon Sep 17 00:00:00 2001 From: leventov Date: Thu, 17 Aug 2017 21:02:48 +0300 Subject: [PATCH 3/5] Spelling --- .../java/io/druid/query/aggregation/AggregateCombiner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java index 3ca59c63eefc..410bca05a12d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java @@ -57,8 +57,8 @@ public interface AggregateCombiner extends ColumnValueSelector * object returned from {@link io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become * a subject for modification during subsequent combine() calls. * - * Since the state of AggregateCombiner is underfined before {@link #reset} is ever called on it, the effects of calling - * combine() on an AggregateCombiner instance are also underfined in this case. + * Since the state of AggregateCombiner is undefined before {@link #reset} is ever called on it, the effects of calling + * combine() on an AggregateCombiner instance are also undefined in this case. * * @see AggregatorFactory#combine */ From 8757b6ceeccfb0d45146e09497e58069c02fb118 Mon Sep 17 00:00:00 2001 From: leventov Date: Fri, 18 Aug 2017 17:54:13 +0300 Subject: [PATCH 4/5] Fix TimestampAggregatorFactory.combine() and add makeAggregateCombiner() implementation --- .../aggregation/TimestampAggregator.java | 8 +++- .../TimestampAggregatorFactory.java | 43 +++++++++++++++++-- .../aggregation/DoubleMaxAggregator.java | 4 -- .../aggregation/DoubleMinAggregator.java | 4 -- .../DoubleMinAggregatorFactory.java | 7 --- .../query/aggregation/FloatMaxAggregator.java | 4 -- .../query/aggregation/FloatMinAggregator.java | 4 -- 7 files changed, 46 insertions(+), 28 deletions(-) diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java index ecd4921fcea2..78209b1acd63 100644 --- a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java @@ -28,9 +28,13 @@ public class TimestampAggregator implements Aggregator { static final Comparator COMPARATOR = LongMaxAggregator.COMPARATOR; - static long combineValues(Object lhs, Object rhs) + static Object combineValues(Comparator comparator, Object lhs, Object rhs) { - return Math.max(((Number) lhs).longValue(), ((Number) rhs).longValue()); + if (comparator.compare(((Number) lhs).longValue(), ((Number) rhs).longValue()) > 0) { + return lhs; + } else { + return rhs; + } } private final ObjectColumnSelector selector; diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java index a50e9db12fc3..d801d9389a1b 100644 --- a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java @@ -23,8 +23,9 @@ import com.google.common.primitives.Longs; import io.druid.data.input.impl.TimestampSpec; import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.UOE; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; import org.joda.time.DateTime; import java.nio.ByteBuffer; @@ -83,13 +84,49 @@ public Comparator getComparator() @Override public Object combine(Object lhs, Object rhs) { - return TimestampAggregator.combineValues(lhs, rhs); + return TimestampAggregator.combineValues(comparator, lhs, rhs); } @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("[%s] is not supported during ingestion for rollup", getClass().getSimpleName()); + // TimestampAggregatorFactory.combine() delegates to TimestampAggregator.combineValues() and it doesn't check + // for nulls, so this AggregateCombiner neither. + return new LongAggregateCombiner() + { + private long result; + + @Override + public void reset(ColumnValueSelector selector) + { + result = getTimestamp(selector); + } + + private long getTimestamp(ColumnValueSelector selector) + { + if (selector instanceof ObjectColumnSelector) { + Object input = ((ObjectColumnSelector) selector).get(); + return convertLong(timestampSpec, input); + } else { + return selector.getLong(); + } + } + + @Override + public void combine(ColumnValueSelector selector) + { + long other = getTimestamp(selector); + if (comparator.compare(result, other) <= 0) { + result = other; + } + } + + @Override + public long getLong() + { + return result; + } + }; } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java index 6b4f384d51c7..dcf4eadc61de 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.DoubleColumnSelector; -import java.util.Comparator; - /** */ public class DoubleMaxAggregator implements Aggregator { - static final Comparator COMPARATOR = DoubleSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.max(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java index 33e0477cf294..f7592f14bbc3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.DoubleColumnSelector; -import java.util.Comparator; - /** */ public class DoubleMinAggregator implements Aggregator { - static final Comparator COMPARATOR = DoubleSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.min(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java index 8e696a8c93d6..9577e89846c3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java @@ -28,7 +28,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -64,12 +63,6 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) return new DoubleMinBufferAggregator(getDoubleColumnSelector(metricFactory, Double.POSITIVE_INFINITY)); } - @Override - public Comparator getComparator() - { - return DoubleMinAggregator.COMPARATOR; - } - @Override public Object combine(Object lhs, Object rhs) { diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java index 0efbecdb1fd8..d344729d055b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.FloatColumnSelector; -import java.util.Comparator; - /** */ public class FloatMaxAggregator implements Aggregator { - static final Comparator COMPARATOR = FloatSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.max(((Number) lhs).floatValue(), ((Number) rhs).floatValue()); diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java index 67cd93f9e9ae..cbecd26b2238 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.FloatColumnSelector; -import java.util.Comparator; - /** */ public class FloatMinAggregator implements Aggregator { - static final Comparator COMPARATOR = FloatSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.min(((Number) lhs).floatValue(), ((Number) rhs).floatValue()); From 199bb6333bac5ecb3145b74455fdda8f9c99a36c Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 21 Aug 2017 14:29:26 -0500 Subject: [PATCH 5/5] Rename AggregateCombiner.combine() to fold() --- .../TimestampAggregatorFactory.java | 2 +- .../theta/SketchAggregatorFactory.java | 4 ++-- ...ApproximateHistogramAggregatorFactory.java | 2 +- .../variance/VarianceAggregatorFactory.java | 2 +- .../query/aggregation/AggregateCombiner.java | 19 +++++++++++-------- .../query/aggregation/AggregatorFactory.java | 2 +- .../DoubleMaxAggregateCombiner.java | 2 +- .../DoubleMinAggregateCombiner.java | 2 +- .../DoubleSumAggregateCombiner.java | 2 +- .../HistogramAggregatorFactory.java | 2 +- .../JavaScriptAggregatorFactory.java | 2 +- .../aggregation/LongMaxAggregatorFactory.java | 2 +- .../aggregation/LongMinAggregatorFactory.java | 2 +- .../aggregation/LongSumAggregateCombiner.java | 2 +- ...HyperLogLogCollectorAggregateCombiner.java | 4 ++-- 15 files changed, 27 insertions(+), 24 deletions(-) diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java index d801d9389a1b..4e4c2ce34e2f 100644 --- a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java @@ -113,7 +113,7 @@ private long getTimestamp(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { long other = getTimestamp(selector); if (comparator.compare(result, other) <= 0) { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index c872ce0e53d5..0bf8ee09e5e7 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -116,11 +116,11 @@ public AggregateCombiner makeAggregateCombiner() public void reset(ColumnValueSelector selector) { union.reset(); - combine(selector); + fold(selector); } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { @SuppressWarnings("unchecked") SketchHolder other = ((ObjectColumnSelector) selector).get(); diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java index d8a951e846c7..40dffb786b20 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java @@ -133,7 +133,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { @SuppressWarnings("unchecked") ApproximateHistogram other = ((ObjectColumnSelector) selector).get(); diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java index cc902357ca79..de27f1e0c10a 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java @@ -162,7 +162,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { @SuppressWarnings("unchecked") VarianceAggregatorCollector other = ((ObjectColumnSelector) selector).get(); diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java index 410bca05a12d..e873c82ed94f 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java @@ -22,7 +22,7 @@ import io.druid.segment.ColumnValueSelector; /** - * AggregateCombiner is used to combine rollup aggregation results from serveral "rows" of different indexes during index + * AggregateCombiner is used to fold rollup aggregation results from serveral "rows" of different indexes during index * merging (see {@link io.druid.segment.IndexMerger}). * * The state of the implementations of this interface is an aggregation value (either a primitive or an object), that @@ -30,6 +30,9 @@ * AggregateCombiner, it's state is undefined and {@link ColumnValueSelector}'s methods could return something random, * or null, or throw an exception. * + * This interface would probably better be called "AggregateFolder", but somebody may confuse it with "folder" as + * "directory" synonym. + * * @see AggregatorFactory#makeAggregateCombiner() * @see LongAggregateCombiner * @see DoubleAggregateCombiner @@ -43,24 +46,24 @@ public interface AggregateCombiner extends ColumnValueSelector * * If the selector is an {@link io.druid.segment.ObjectColumnSelector}, the object returned from {@link * io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become a subject for modification - * during subsequent {@link #combine} calls. + * during subsequent {@link #fold} calls. */ void reset(ColumnValueSelector selector); /** - * Combines this AggregateCombiner's state value with the value of the given selector and saves it in this - * AggregateCombiner's state, e. g. after calling combiner.combine(selector), combiner.get*() should return the value + * Folds this AggregateCombiner's state value with the value of the given selector and saves it in this + * AggregateCombiner's state, e. g. after calling combiner.fold(selector), combiner.get*() should return the value * that would be the result of {@link AggregatorFactory#combine * aggregatorFactory.combine(combiner.get*(), selector.get*())} call. * * Unlike {@link AggregatorFactory#combine}, if the selector is an {@link io.druid.segment.ObjectColumnSelector}, the * object returned from {@link io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become - * a subject for modification during subsequent combine() calls. + * a subject for modification during subsequent fold() calls. * - * Since the state of AggregateCombiner is undefined before {@link #reset} is ever called on it, the effects of calling - * combine() on an AggregateCombiner instance are also undefined in this case. + * Since the state of AggregateCombiner is undefined before {@link #reset} is ever called on it, the effects of + * calling fold() are also undefined in this case. * * @see AggregatorFactory#combine */ - void combine(ColumnValueSelector selector); + void fold(ColumnValueSelector selector); } diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java index 43c21e59bbe8..9a37e1629556 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java @@ -59,7 +59,7 @@ public abstract class AggregatorFactory implements Cacheable public abstract Object combine(Object lhs, Object rhs); /** - * Creates an AggregateCombiner to combine rollup aggregation results from serveral "rows" of different indexes during + * Creates an AggregateCombiner to fold rollup aggregation results from serveral "rows" of different indexes during * index merging. AggregateCombiner implements the same logic as {@link #combine}, with the difference that it uses * {@link io.druid.segment.ColumnValueSelector} and it's subinterfaces to get inputs and implements {@code * ColumnValueSelector} to provide output. diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java index 2fd5306d9474..4aa4845e1d51 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java @@ -32,7 +32,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { max = Math.max(max, selector.getDouble()); } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java index 3508336613ed..e0ea8bb33c33 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java @@ -32,7 +32,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { min = Math.min(min, selector.getDouble()); } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java index 87adc6115724..60b34a5bee9e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java @@ -32,7 +32,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { sum += selector.getDouble(); } diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java index b1fb73cb47c3..1bccb8dcaed4 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java @@ -113,7 +113,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { @SuppressWarnings("unchecked") Histogram other = ((ObjectColumnSelector) selector).get(); diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index ca7ef3a9f170..afcc71bd22a3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -155,7 +155,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { combined = getCompiledScript().combine(combined, selector.getDouble()); } diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java index ba806d378094..35c25cffeb30 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java @@ -115,7 +115,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { max = Math.max(max, selector.getLong()); } diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java index 75c471c4774b..d024dc6afa1c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java @@ -116,7 +116,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { min = Math.min(min, selector.getLong()); } diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java index 34e0ff16ae05..0ceff541b1b7 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java @@ -32,7 +32,7 @@ public void reset(ColumnValueSelector selector) } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { sum += selector.getLong(); } diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java index c5eced672bb4..86d8c972c859 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java @@ -35,11 +35,11 @@ public final class HyperLogLogCollectorAggregateCombiner extends ObjectAggregate public void reset(ColumnValueSelector selector) { combined = null; - combine(selector); + fold(selector); } @Override - public void combine(ColumnValueSelector selector) + public void fold(ColumnValueSelector selector) { @SuppressWarnings("unchecked") HyperLogLogCollector other = ((ObjectColumnSelector) selector).get();