diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java index a6578b9de2e9..350283cfe150 100644 --- a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java +++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java @@ -29,6 +29,8 @@ import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregateCombiner; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; @@ -119,6 +121,13 @@ public Object combine(Object lhs, Object rhs) return ((Number) lhs).longValue() + ((Number) rhs).longValue(); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + // This is likely wrong as well as combine(), see https://github.com/druid-io/druid/pull/2602#issuecomment-321224202 + return new LongSumAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java index ecd4921fcea2..78209b1acd63 100644 --- a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java @@ -28,9 +28,13 @@ public class TimestampAggregator implements Aggregator { static final Comparator COMPARATOR = LongMaxAggregator.COMPARATOR; - static long combineValues(Object lhs, Object rhs) + static Object combineValues(Comparator comparator, Object lhs, Object rhs) { - return Math.max(((Number) lhs).longValue(), ((Number) rhs).longValue()); + if (comparator.compare(((Number) lhs).longValue(), ((Number) rhs).longValue()) > 0) { + return lhs; + } else { + return rhs; + } } private final ObjectColumnSelector selector; diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java index 1a27163b5ea3..7b9fe52f86ad 100644 --- a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java @@ -25,6 +25,8 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.StringUtils; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; import org.joda.time.DateTime; import java.nio.ByteBuffer; @@ -83,7 +85,49 @@ public Comparator getComparator() @Override public Object combine(Object lhs, Object rhs) { - return TimestampAggregator.combineValues(lhs, rhs); + return TimestampAggregator.combineValues(comparator, lhs, rhs); + } + + @Override + public AggregateCombiner makeAggregateCombiner() + { + // TimestampAggregatorFactory.combine() delegates to TimestampAggregator.combineValues() and it doesn't check + // for nulls, so this AggregateCombiner neither. + return new LongAggregateCombiner() + { + private long result; + + @Override + public void reset(ColumnValueSelector selector) + { + result = getTimestamp(selector); + } + + private long getTimestamp(ColumnValueSelector selector) + { + if (selector instanceof ObjectColumnSelector) { + Object input = ((ObjectColumnSelector) selector).get(); + return convertLong(timestampSpec, input); + } else { + return selector.getLong(); + } + } + + @Override + public void fold(ColumnValueSelector selector) + { + long other = getTimestamp(selector); + if (comparator.compare(result, other) <= 0) { + result = other; + } + } + + @Override + public long getLong() + { + return result; + } + }; } @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index f09241409926..0bf8ee09e5e7 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -22,15 +22,21 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; +import com.yahoo.sketches.Family; import com.yahoo.sketches.Util; import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Comparator; @@ -98,6 +104,47 @@ public Object combine(Object lhs, Object rhs) return SketchHolder.combine(lhs, rhs, size); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new ObjectAggregateCombiner() + { + private final Union union = (Union) SetOperation.builder().build(size, Family.UNION); + private final SketchHolder combined = SketchHolder.of(union); + + @Override + public void reset(ColumnValueSelector selector) + { + union.reset(); + fold(selector); + } + + @Override + public void fold(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + SketchHolder other = ((ObjectColumnSelector) selector).get(); + // SketchAggregatorFactory.combine() delegates to SketchHolder.combine() and it doesn't check for nulls, so we + // neither. + other.updateUnion(union); + combined.invalidateCache(); + } + + @Override + public Class classOfObject() + { + return SketchHolder.class; + } + + @Nullable + @Override + public SketchHolder get() + { + return combined; + } + }; + } + @Override @JsonProperty public String getName() diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java index 891d39754047..33a1779b9e03 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java @@ -171,7 +171,7 @@ public SketchEstimateWithErrorBounds getEstimateWithErrorBounds(int errorBoundsS return result; } - public static Object combine(Object o1, Object o2, int nomEntries) + public static SketchHolder combine(Object o1, Object o2, int nomEntries) { SketchHolder holder1 = (SketchHolder) o1; SketchHolder holder2 = (SketchHolder) o2; @@ -194,7 +194,7 @@ public static Object combine(Object o1, Object o2, int nomEntries) } } - private void invalidateCache() + void invalidateCache() { cachedEstimate = null; cachedSketch = null; diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java index 494b18681ec3..618451272139 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java @@ -534,9 +534,11 @@ public ApproximateHistogram foldFast(ApproximateHistogram h, float[] mergedPosit */ public ApproximateHistogram copy(ApproximateHistogram h) { - this.size = h.size; - this.positions = new float[size]; - this.bins = new long[size]; + if (h.size > this.size) { + this.size = h.size; + this.positions = new float[size]; + this.bins = new long[size]; + } System.arraycopy(h.positions, 0, this.positions, 0, h.binCount); System.arraycopy(h.bins, 0, this.bins, 0, h.binCount); diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java index 3601aa5da8a5..fcc4951d19ee 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregator.java @@ -36,7 +36,7 @@ public int compare(Object o, Object o1) } }; - static Object combineHistograms(Object lhs, Object rhs) + static ApproximateHistogram combineHistograms(Object lhs, Object rhs) { return ((ApproximateHistogram) lhs).foldFast((ApproximateHistogram) rhs); } diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java index 7bfa9f05c109..40dffb786b20 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java @@ -26,16 +26,20 @@ import com.google.common.base.Preconditions; import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; - import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -111,6 +115,46 @@ public Object combine(Object lhs, Object rhs) return ApproximateHistogramAggregator.combineHistograms(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + // ApproximateHistogramAggregatorFactory.combine() delegates to ApproximateHistogramAggregator.combineHistograms() + // and it doesn't check for nulls, so this AggregateCombiner neither. + return new ObjectAggregateCombiner() + { + private final ApproximateHistogram combined = new ApproximateHistogram(); + + @Override + public void reset(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + ApproximateHistogram first = ((ObjectColumnSelector) selector).get(); + combined.copy(first); + } + + @Override + public void fold(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + ApproximateHistogram other = ((ObjectColumnSelector) selector).get(); + combined.foldFast(other); + } + + @Override + public Class classOfObject() + { + return ApproximateHistogram.class; + } + + @Nullable + @Override + public ApproximateHistogram get() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java index 3c388ad8c3a0..3d048cf835a6 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorCollector.java @@ -74,29 +74,29 @@ public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o } }; - static Object combineValues(Object lhs, Object rhs) + void fold(VarianceAggregatorCollector other) { - final VarianceAggregatorCollector holder1 = (VarianceAggregatorCollector) lhs; - final VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) rhs; - - if (holder2.count == 0) { - return holder1; + if (other.count == 0) { + return; } - if (holder1.count == 0) { - holder1.nvariance = holder2.nvariance; - holder1.count = holder2.count; - holder1.sum = holder2.sum; - return holder1; + if (this.count == 0) { + this.nvariance = other.nvariance; + this.count = other.count; + this.sum = other.sum; + return; } + final double ratio = this.count / (double) other.count; + final double t = this.sum / ratio - other.sum; - final double ratio = holder1.count / (double) holder2.count; - final double t = holder1.sum / ratio - holder2.sum; - - holder1.nvariance += holder2.nvariance + (ratio / (holder1.count + holder2.count) * t * t); - holder1.count += holder2.count; - holder1.sum += holder2.sum; + this.nvariance += other.nvariance + (ratio / (this.count + other.count) * t * t); + this.count += other.count; + this.sum += other.sum; + } - return holder1; + static Object combineValues(Object lhs, Object rhs) + { + ((VarianceAggregatorCollector) lhs).fold((VarianceAggregatorCollector) rhs); + return lhs; } static int getMaxIntermediateSize() @@ -120,6 +120,13 @@ public void reset() nvariance = 0; } + void copyFrom(VarianceAggregatorCollector other) + { + this.count = other.count; + this.sum = other.sum; + this.nvariance = other.nvariance; + } + public VarianceAggregatorCollector(long count, double sum, double nvariance) { this.count = count; diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java index 4af691543840..de27f1e0c10a 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceAggregatorFactory.java @@ -30,9 +30,12 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; @@ -135,6 +138,51 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) ); } + @Override + public Object combine(Object lhs, Object rhs) + { + return VarianceAggregatorCollector.combineValues(lhs, rhs); + } + + @Override + public AggregateCombiner makeAggregateCombiner() + { + // VarianceAggregatorFactory.combine() delegates to VarianceAggregatorCollector.combineValues() and it doesn't check + // for nulls, so this AggregateCombiner neither. + return new ObjectAggregateCombiner() + { + private final VarianceAggregatorCollector combined = new VarianceAggregatorCollector(); + + @Override + public void reset(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + VarianceAggregatorCollector first = ((ObjectColumnSelector) selector).get(); + combined.copyFrom(first); + } + + @Override + public void fold(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + VarianceAggregatorCollector other = ((ObjectColumnSelector) selector).get(); + combined.fold(other); + } + + @Override + public Class classOfObject() + { + return VarianceAggregatorCollector.class; + } + + @Override + public VarianceAggregatorCollector get() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { @@ -163,12 +211,6 @@ public Comparator getComparator() return VarianceAggregatorCollector.COMPARATOR; } - @Override - public Object combine(Object lhs, Object rhs) - { - return VarianceAggregatorCollector.combineValues(lhs, rhs); - } - @Override public Object finalizeComputation(Object object) { diff --git a/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java b/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java index 3b2a283c0052..51f6770ece50 100644 --- a/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java +++ b/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; +import javax.annotation.Nullable; import java.nio.ByteBuffer; /** @@ -358,7 +359,7 @@ public void add(short bucket, byte positionOf1) } } - public HyperLogLogCollector fold(HyperLogLogCollector other) + public HyperLogLogCollector fold(@Nullable HyperLogLogCollector other) { if (other == null || other.storageBuffer.remaining() == 0) { return this; diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java new file mode 100644 index 000000000000..e873c82ed94f --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/AggregateCombiner.java @@ -0,0 +1,69 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +/** + * AggregateCombiner is used to fold rollup aggregation results from serveral "rows" of different indexes during index + * merging (see {@link io.druid.segment.IndexMerger}). + * + * The state of the implementations of this interface is an aggregation value (either a primitive or an object), that + * could be queried via {@link ColumnValueSelector}'s methods. Before {@link #reset} is ever called on an + * AggregateCombiner, it's state is undefined and {@link ColumnValueSelector}'s methods could return something random, + * or null, or throw an exception. + * + * This interface would probably better be called "AggregateFolder", but somebody may confuse it with "folder" as + * "directory" synonym. + * + * @see AggregatorFactory#makeAggregateCombiner() + * @see LongAggregateCombiner + * @see DoubleAggregateCombiner + * @see ObjectAggregateCombiner + */ +public interface AggregateCombiner extends ColumnValueSelector +{ + /** + * Resets this AggregateCombiner's state value to the value of the given selector, e. g. after calling this method + * combiner.get*() should return the same value as selector.get*(). + * + * If the selector is an {@link io.druid.segment.ObjectColumnSelector}, the object returned from {@link + * io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become a subject for modification + * during subsequent {@link #fold} calls. + */ + void reset(ColumnValueSelector selector); + + /** + * Folds this AggregateCombiner's state value with the value of the given selector and saves it in this + * AggregateCombiner's state, e. g. after calling combiner.fold(selector), combiner.get*() should return the value + * that would be the result of {@link AggregatorFactory#combine + * aggregatorFactory.combine(combiner.get*(), selector.get*())} call. + * + * Unlike {@link AggregatorFactory#combine}, if the selector is an {@link io.druid.segment.ObjectColumnSelector}, the + * object returned from {@link io.druid.segment.ObjectColumnSelector#get()} must not be modified, and must not become + * a subject for modification during subsequent fold() calls. + * + * Since the state of AggregateCombiner is undefined before {@link #reset} is ever called on it, the effects of + * calling fold() are also undefined in this case. + * + * @see AggregatorFactory#combine + */ + void fold(ColumnValueSelector selector); +} diff --git a/processing/src/main/java/io/druid/query/aggregation/Aggregator.java b/processing/src/main/java/io/druid/query/aggregation/Aggregator.java index fb4689fe009a..847fdcb67838 100644 --- a/processing/src/main/java/io/druid/query/aggregation/Aggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/Aggregator.java @@ -27,12 +27,8 @@ * it can use to get at the next bit of data. * * Thus, an Aggregator can be thought of as a closure over some other thing that is stateful and changes between calls - * to aggregate(). This is currently (as of this documentation) implemented through the use of Offset and - * FloatColumnSelector objects. The Aggregator has a handle on a FloatColumnSelector object which has a handle on an Offset. - * QueryableIndex has both the Aggregators and the Offset object and iterates through the Offset calling the aggregate() - * method on the Aggregators for each applicable row. - * - * This interface is old and going away. It is being replaced by BufferAggregator + * to aggregate(). This is currently (as of this documentation) implemented through the use of {@link + * io.druid.segment.ColumnValueSelector} objects. */ public interface Aggregator extends Closeable { diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java index 388ad9114277..9a37e1629556 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java @@ -24,20 +24,16 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.ColumnSelectorFactory; +import javax.annotation.Nullable; import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** - * Processing related interface - * - * An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory. - * - * This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects - * without making any assumptions about how they are pulling values out of the base data. That is, the data is - * provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how - * the data is actually stored and accessed. + * AggregatorFactory is a strategy (in the terms of Design Patterns) that represents column aggregation, e. g. min, + * max, sum of metric columns, or cardinality of dimension columns (see {@link + * io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}). */ public abstract class AggregatorFactory implements Cacheable { @@ -50,10 +46,10 @@ public abstract class AggregatorFactory implements Cacheable public abstract Comparator getComparator(); /** - * A method that knows how to combine the outputs of the getIntermediate() method from the Aggregators - * produced via factorize(). Note, even though this is called combine, this method's contract *does* - * allow for mutation of the input objects. Thus, any use of lhs or rhs after calling this method is - * highly discouraged. + * A method that knows how to combine the outputs of {@link Aggregator#get} produced via {@link #factorize} or {@link + * BufferAggregator#get} produced via {@link #factorizeBuffered}. Note, even though this method is called "combine", + * this method's contract *does* allow for mutation of the input objects. Thus, any use of lhs or rhs after calling + * this method is highly discouraged. * * @param lhs The left hand side of the combine * @param rhs The right hand side of the combine @@ -62,6 +58,20 @@ public abstract class AggregatorFactory implements Cacheable */ public abstract Object combine(Object lhs, Object rhs); + /** + * Creates an AggregateCombiner to fold rollup aggregation results from serveral "rows" of different indexes during + * index merging. AggregateCombiner implements the same logic as {@link #combine}, with the difference that it uses + * {@link io.druid.segment.ColumnValueSelector} and it's subinterfaces to get inputs and implements {@code + * ColumnValueSelector} to provide output. + * + * @see AggregateCombiner + * @see io.druid.segment.IndexMerger + */ + public AggregateCombiner makeAggregateCombiner() + { + throw new UOE("[%s] does not implement makeAggregateCombiner()", this.getClass().getName()); + } + /** * Returns an AggregatorFactory that can be used to combine the output of aggregators from this factory. This * generally amounts to simply creating a new factory that is the same as the current except with its input @@ -134,6 +144,7 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre * * @return merged AggregatorFactory[] or Null if merging is not possible. */ + @Nullable public static AggregatorFactory[] mergeAggregators(List aggregatorsList) { if (aggregatorsList == null || aggregatorsList.isEmpty()) { diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java index ba7677cbcd86..1e1be24b2e05 100644 --- a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java @@ -70,6 +70,12 @@ public Object combine(Object lhs, Object rhs) return CountAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new LongSumAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleAggregateCombiner.java new file mode 100644 index 000000000000..821a070e1b23 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleAggregateCombiner.java @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.DoubleColumnSelector; + +/** + * Specialization of {@link AggregateCombiner} for primitive double aggregations. + */ +public abstract class DoubleAggregateCombiner implements AggregateCombiner, DoubleColumnSelector +{ + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Usually AggregateCombiner has nothing to inspect + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java new file mode 100644 index 000000000000..4aa4845e1d51 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregateCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +final class DoubleMaxAggregateCombiner extends DoubleAggregateCombiner +{ + private double max; + + @Override + public void reset(ColumnValueSelector selector) + { + max = selector.getDouble(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + max = Math.max(max, selector.getDouble()); + } + + @Override + public double getDouble() + { + return max; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java index 6b4f384d51c7..dcf4eadc61de 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.DoubleColumnSelector; -import java.util.Comparator; - /** */ public class DoubleMaxAggregator implements Aggregator { - static final Comparator COMPARATOR = DoubleSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.max(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java index 85267eb622a6..ff73f2f8c0eb 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java @@ -70,6 +70,12 @@ public Object combine(Object lhs, Object rhs) return DoubleMaxAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new DoubleMaxAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java new file mode 100644 index 000000000000..e0ea8bb33c33 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregateCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +final class DoubleMinAggregateCombiner extends DoubleAggregateCombiner +{ + private double min; + + @Override + public void reset(ColumnValueSelector selector) + { + min = selector.getDouble(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + min = Math.min(min, selector.getDouble()); + } + + @Override + public double getDouble() + { + return min; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java index 33e0477cf294..f7592f14bbc3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.DoubleColumnSelector; -import java.util.Comparator; - /** */ public class DoubleMinAggregator implements Aggregator { - static final Comparator COMPARATOR = DoubleSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.min(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java index ab5e4aaf2a28..9577e89846c3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java @@ -28,7 +28,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -65,15 +64,15 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) } @Override - public Comparator getComparator() + public Object combine(Object lhs, Object rhs) { - return DoubleMinAggregator.COMPARATOR; + return DoubleMinAggregator.combineValues(lhs, rhs); } @Override - public Object combine(Object lhs, Object rhs) + public AggregateCombiner makeAggregateCombiner() { - return DoubleMinAggregator.combineValues(lhs, rhs); + return new DoubleMinAggregateCombiner(); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java new file mode 100644 index 000000000000..60b34a5bee9e --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregateCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +final class DoubleSumAggregateCombiner extends DoubleAggregateCombiner +{ + private double sum; + + @Override + public void reset(ColumnValueSelector selector) + { + sum = selector.getDouble(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + sum += selector.getDouble(); + } + + @Override + public double getDouble() + { + return sum; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java index ac66d1bbdfba..50ec11bed556 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java @@ -70,6 +70,12 @@ public Object combine(Object lhs, Object rhs) return DoubleSumAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new DoubleSumAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java index 166421fc05e8..bb4ca9bfb08d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java @@ -79,6 +79,12 @@ public Object combine(Object lhs, Object rhs) return delegate.combine(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return delegate.makeAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java index 0efbecdb1fd8..d344729d055b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.FloatColumnSelector; -import java.util.Comparator; - /** */ public class FloatMaxAggregator implements Aggregator { - static final Comparator COMPARATOR = FloatSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.max(((Number) lhs).floatValue(), ((Number) rhs).floatValue()); diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java index 31a4c49d3e23..63a83da06d2d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMaxAggregatorFactory.java @@ -68,6 +68,12 @@ public Object combine(Object lhs, Object rhs) return FloatMaxAggregator.combineValues(finalizeComputation(lhs), finalizeComputation(rhs)); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new DoubleMaxAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java index 67cd93f9e9ae..cbecd26b2238 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregator.java @@ -21,14 +21,10 @@ import io.druid.segment.FloatColumnSelector; -import java.util.Comparator; - /** */ public class FloatMinAggregator implements Aggregator { - static final Comparator COMPARATOR = FloatSumAggregator.COMPARATOR; - static double combineValues(Object lhs, Object rhs) { return Math.min(((Number) lhs).floatValue(), ((Number) rhs).floatValue()); diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java index c186ed8ea234..1129e15bc761 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatMinAggregatorFactory.java @@ -68,6 +68,12 @@ public Object combine(Object lhs, Object rhs) return FloatMinAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new DoubleMinAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java index a15c7004b53c..4986a1ff6a3b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FloatSumAggregatorFactory.java @@ -68,6 +68,12 @@ public Object combine(Object lhs, Object rhs) return FloatSumAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new DoubleSumAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/Histogram.java b/processing/src/main/java/io/druid/query/aggregation/Histogram.java index 8fa7188e75e9..dc8ac1234ec4 100644 --- a/processing/src/main/java/io/druid/query/aggregation/Histogram.java +++ b/processing/src/main/java/io/druid/query/aggregation/Histogram.java @@ -58,6 +58,28 @@ public Histogram(float[] breaks, long[] bins, float min, float max) } } + public Histogram(Histogram other) + { + this.breaks = other.breaks; + this.bins = other.bins.clone(); + this.min = other.min; + this.max = other.max; + this.count = other.count; + } + + public void copyFrom(Histogram other) + { + this.breaks = other.breaks; + if (this.bins.length == other.bins.length) { + System.arraycopy(other.bins, 0, this.bins, 0, this.bins.length); + } else { + this.bins = other.bins.clone(); + } + this.min = other.min; + this.max = other.max; + this.count = other.count; + } + public void offer(float d) { if (d > max) { diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java index 665bf8c9d877..344899d7607c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregator.java @@ -35,7 +35,7 @@ public int compare(Object o, Object o1) } }; - static Object combineHistograms(Object lhs, Object rhs) + static Histogram combineHistograms(Object lhs, Object rhs) { return ((Histogram) lhs).fold((Histogram) rhs); } diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java index 80eade4092ae..1bccb8dcaed4 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java @@ -27,8 +27,11 @@ import com.google.common.primitives.Longs; import io.druid.java.util.common.StringUtils; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; @@ -88,6 +91,50 @@ public Object combine(Object lhs, Object rhs) return HistogramAggregator.combineHistograms(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + // HistogramAggregatorFactory.combine() delegates to HistogramAggregator.combineHistograms() and it doesn't check + // for nulls, so this AggregateCombiner neither. + return new ObjectAggregateCombiner() + { + private Histogram combined; + + @Override + public void reset(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + Histogram first = ((ObjectColumnSelector) selector).get(); + if (combined == null) { + combined = new Histogram(first); + } else { + combined.copyFrom(first); + } + } + + @Override + public void fold(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + Histogram other = ((ObjectColumnSelector) selector).get(); + combined.fold(other); + } + + @Override + public Class classOfObject() + { + return Histogram.class; + } + + @Nullable + @Override + public Histogram get() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index bfd4647fad42..afcc71bd22a3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -32,6 +32,7 @@ import io.druid.java.util.common.StringUtils; import io.druid.js.JavaScriptConfig; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.ObjectColumnSelector; import org.mozilla.javascript.Context; import org.mozilla.javascript.ContextAction; @@ -140,6 +141,33 @@ public Object combine(Object lhs, Object rhs) return getCompiledScript().combine(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue()); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new DoubleAggregateCombiner() + { + private double combined; + + @Override + public void reset(ColumnValueSelector selector) + { + combined = selector.getDouble(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + combined = getCompiledScript().combine(combined, selector.getDouble()); + } + + @Override + public double getDouble() + { + return combined; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/LongAggregateCombiner.java new file mode 100644 index 000000000000..949f4164a27f --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/LongAggregateCombiner.java @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.LongColumnSelector; + +/** + * Specialization of {@link AggregateCombiner} for primitive long aggregations. + */ +public abstract class LongAggregateCombiner implements AggregateCombiner, LongColumnSelector +{ + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Usually AggregateCombiner has nothing to inspect + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java index 66b07c288e1d..35c25cffeb30 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java @@ -28,6 +28,7 @@ import io.druid.math.expr.ExprMacroTable; import io.druid.math.expr.Parser; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.LongColumnSelector; import java.nio.ByteBuffer; @@ -100,6 +101,33 @@ public Object combine(Object lhs, Object rhs) return LongMaxAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new LongAggregateCombiner() + { + private long max; + + @Override + public void reset(ColumnValueSelector selector) + { + max = selector.getLong(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + max = Math.max(max, selector.getLong()); + } + + @Override + public long getLong() + { + return max; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java index d09f23dc4b04..d024dc6afa1c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java @@ -28,6 +28,7 @@ import io.druid.math.expr.ExprMacroTable; import io.druid.math.expr.Parser; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.LongColumnSelector; import java.nio.ByteBuffer; @@ -101,6 +102,33 @@ public Object combine(Object lhs, Object rhs) return LongMinAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new LongAggregateCombiner() + { + private long min; + + @Override + public void reset(ColumnValueSelector selector) + { + min = selector.getLong(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + min = Math.min(min, selector.getLong()); + } + + @Override + public long getLong() + { + return min; + } + }; + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java new file mode 100644 index 000000000000..0ceff541b1b7 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregateCombiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ColumnValueSelector; + +public final class LongSumAggregateCombiner extends LongAggregateCombiner +{ + private long sum; + + @Override + public void reset(ColumnValueSelector selector) + { + sum = selector.getLong(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + sum += selector.getLong(); + } + + @Override + public long getLong() + { + return sum; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java index d2212c4fbeb6..cec708eb4a60 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java @@ -100,6 +100,12 @@ public Object combine(Object lhs, Object rhs) return LongSumAggregator.combineValues(lhs, rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new LongSumAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/ObjectAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/ObjectAggregateCombiner.java new file mode 100644 index 000000000000..29bc0950982f --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/ObjectAggregateCombiner.java @@ -0,0 +1,29 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.segment.ObjectColumnSelector; + +/** + * Specialization of {@link AggregateCombiner} for object aggregations. + */ +public abstract class ObjectAggregateCombiner implements AggregateCombiner, ObjectColumnSelector +{ +} diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 4a8129e4054b..177815391add 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -28,6 +28,7 @@ import io.druid.hll.HyperLogLogCollector; import io.druid.java.util.common.StringUtils; import io.druid.query.ColumnSelectorPlus; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; @@ -191,6 +192,12 @@ public Object combine(Object lhs, Object rhs) return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new HyperLogLogCollectorAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java new file mode 100644 index 000000000000..86d8c972c859 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/HyperLogLogCollectorAggregateCombiner.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.cardinality; + +import io.druid.hll.HyperLogLogCollector; +import io.druid.query.aggregation.ObjectAggregateCombiner; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.ObjectColumnSelector; + +import javax.annotation.Nullable; + +public final class HyperLogLogCollectorAggregateCombiner extends ObjectAggregateCombiner +{ + @Nullable + private HyperLogLogCollector combined; + + @Override + public void reset(ColumnValueSelector selector) + { + combined = null; + fold(selector); + } + + @Override + public void fold(ColumnValueSelector selector) + { + @SuppressWarnings("unchecked") + HyperLogLogCollector other = ((ObjectColumnSelector) selector).get(); + if (other == null) { + return; + } + if (combined == null) { + combined = HyperLogLogCollector.makeLatestCollector(); + } + combined.fold(other); + } + + @Override + public Class classOfObject() + { + return HyperLogLogCollector.class; + } + + @Nullable + @Override + public HyperLogLogCollector get() + { + return combined; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index 9a4af62eae11..783ba82d2b3b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -26,11 +26,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -102,6 +104,12 @@ public Object combine(Object lhs, Object rhs) return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; } + @Override + public AggregateCombiner makeAggregateCombiner() + { + throw new UOE("DoubleFirstAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index a6d6823e2cdc..e26b0228f8a2 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -26,11 +26,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -102,6 +104,12 @@ public Object combine(Object lhs, Object rhs) return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; } + @Override + public AggregateCombiner makeAggregateCombiner() + { + throw new UOE("FloatFirstAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 1cdffb2d49b0..96ff43d2792d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -25,11 +25,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; @@ -95,6 +97,12 @@ public Object combine(Object lhs, Object rhs) return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; } + @Override + public AggregateCombiner makeAggregateCombiner() + { + throw new UOE("LongFirstAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java index bc61b4368022..fd0850ce352b 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java @@ -25,6 +25,7 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.Comparators; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; @@ -32,6 +33,7 @@ import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; +import io.druid.query.aggregation.cardinality.HyperLogLogCollectorAggregateCombiner; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; @@ -136,6 +138,12 @@ public Object combine(Object lhs, Object rhs) return ((HyperLogLogCollector) lhs).fold((HyperLogLogCollector) rhs); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new HyperLogLogCollectorAggregateCombiner(); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 27aefaaffffc..ddaec8698897 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -25,6 +25,8 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; @@ -93,6 +95,12 @@ public Object combine(Object lhs, Object rhs) return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; } + @Override + public AggregateCombiner makeAggregateCombiner() + { + throw new UOE("DoubleLastAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java index 6f2d55423dd2..34f818e490bf 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -25,11 +25,13 @@ import com.google.common.primitives.Longs; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.first.FloatFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -93,6 +95,12 @@ public Object combine(Object lhs, Object rhs) return FloatFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; } + @Override + public AggregateCombiner makeAggregateCombiner() + { + throw new UOE("FloatLastAggregatorFactory is not supported during ingestion for rollup"); + } + @Override public AggregatorFactory getCombiningFactory() { diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java index 32623c50b891..23bd19345231 100644 --- a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -24,11 +24,13 @@ import com.google.common.base.Preconditions; import com.metamx.common.StringUtils; import io.druid.collections.SerializablePair; +import io.druid.java.util.common.UOE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -91,6 +93,11 @@ public Object combine(Object lhs, Object rhs) return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; } + @Override + public AggregateCombiner makeAggregateCombiner() + { + throw new UOE("LongLastAggregatorFactory is not supported during ingestion for rollup"); + } @Override public AggregatorFactory getCombiningFactory() diff --git a/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java b/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java index 6dac1a42d5d4..498aa6273752 100644 --- a/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java +++ b/processing/src/main/java/io/druid/segment/ObjectColumnSelector.java @@ -19,6 +19,8 @@ package io.druid.segment; +import javax.annotation.Nullable; + public interface ObjectColumnSelector extends ColumnValueSelector { public Class classOfObject(); @@ -28,6 +30,7 @@ public interface ObjectColumnSelector extends ColumnValueSelector * ObjectColumnSelector doesn't extend {@link io.druid.query.monomorphicprocessing.HotLoopCallee} yet. If it will, * this method should be annotated. */ + @Nullable public T get(); /** @@ -39,7 +42,11 @@ public interface ObjectColumnSelector extends ColumnValueSelector @Override default float getFloat() { - return ((Number) get()).floatValue(); + T value = get(); + if (value == null) { + return 0; + } + return ((Number) value).floatValue(); } /** @@ -51,7 +58,11 @@ default float getFloat() @Override default double getDouble() { - return ((Number) get()).doubleValue(); + T value = get(); + if (value == null) { + return 0; + } + return ((Number) value).doubleValue(); } /** @@ -63,6 +74,10 @@ default double getDouble() @Override default long getLong() { - return ((Number) get()).longValue(); + T value = get(); + if (value == null) { + return 0; + } + return ((Number) value).longValue(); } }