Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregateCombiner;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
Expand Down Expand Up @@ -119,6 +121,13 @@ public Object combine(Object lhs, Object rhs)
return ((Number) lhs).longValue() + ((Number) rhs).longValue();
}

@Override
public AggregateCombiner makeAggregateCombiner()
{
// This is likely wrong as well as combine(), see https://github.com/druid-io/druid/pull/2602#issuecomment-321224202
return new LongSumAggregateCombiner();
}

@Override
public AggregatorFactory getCombiningFactory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ public class TimestampAggregator implements Aggregator
{
static final Comparator COMPARATOR = LongMaxAggregator.COMPARATOR;

static long combineValues(Object lhs, Object rhs)
static Object combineValues(Comparator<Long> comparator, Object lhs, Object rhs)
{
return Math.max(((Number) lhs).longValue(), ((Number) rhs).longValue());
if (comparator.compare(((Number) lhs).longValue(), ((Number) rhs).longValue()) > 0) {
return lhs;
} else {
return rhs;
}
}

private final ObjectColumnSelector selector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.ObjectColumnSelector;
import org.joda.time.DateTime;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -83,7 +85,49 @@ public Comparator getComparator()
@Override
public Object combine(Object lhs, Object rhs)
{
return TimestampAggregator.combineValues(lhs, rhs);
return TimestampAggregator.combineValues(comparator, lhs, rhs);
}

@Override
public AggregateCombiner makeAggregateCombiner()
{
// TimestampAggregatorFactory.combine() delegates to TimestampAggregator.combineValues() and it doesn't check
// for nulls, so this AggregateCombiner neither.
return new LongAggregateCombiner()
{
private long result;

@Override
public void reset(ColumnValueSelector selector)
{
result = getTimestamp(selector);
}

private long getTimestamp(ColumnValueSelector selector)
{
if (selector instanceof ObjectColumnSelector) {
Object input = ((ObjectColumnSelector) selector).get();
return convertLong(timestampSpec, input);
} else {
return selector.getLong();
}
}

@Override
public void fold(ColumnValueSelector selector)
{
long other = getTimestamp(selector);
if (comparator.compare(result, other) <= 0) {
result = other;
}
}

@Override
public long getLong()
{
return result;
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.ObjectAggregateCombiner;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.ObjectColumnSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -98,6 +104,47 @@ public Object combine(Object lhs, Object rhs)
return SketchHolder.combine(lhs, rhs, size);
}

@Override
public AggregateCombiner makeAggregateCombiner()
{
return new ObjectAggregateCombiner<SketchHolder>()
{
private final Union union = (Union) SetOperation.builder().build(size, Family.UNION);
private final SketchHolder combined = SketchHolder.of(union);

@Override
public void reset(ColumnValueSelector selector)
{
union.reset();
fold(selector);
}

@Override
public void fold(ColumnValueSelector selector)
{
@SuppressWarnings("unchecked")
SketchHolder other = ((ObjectColumnSelector<SketchHolder>) selector).get();
// SketchAggregatorFactory.combine() delegates to SketchHolder.combine() and it doesn't check for nulls, so we
// neither.
other.updateUnion(union);
combined.invalidateCache();
}

@Override
public Class<SketchHolder> classOfObject()
{
return SketchHolder.class;
}

@Nullable
@Override
public SketchHolder get()
{
return combined;
}
};
}

@Override
@JsonProperty
public String getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public SketchEstimateWithErrorBounds getEstimateWithErrorBounds(int errorBoundsS
return result;
}

public static Object combine(Object o1, Object o2, int nomEntries)
public static SketchHolder combine(Object o1, Object o2, int nomEntries)
{
SketchHolder holder1 = (SketchHolder) o1;
SketchHolder holder2 = (SketchHolder) o2;
Expand All @@ -194,7 +194,7 @@ public static Object combine(Object o1, Object o2, int nomEntries)
}
}

private void invalidateCache()
void invalidateCache()
{
cachedEstimate = null;
cachedSketch = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,11 @@ public ApproximateHistogram foldFast(ApproximateHistogram h, float[] mergedPosit
*/
public ApproximateHistogram copy(ApproximateHistogram h)
{
this.size = h.size;
this.positions = new float[size];
this.bins = new long[size];
if (h.size > this.size) {
this.size = h.size;
this.positions = new float[size];
this.bins = new long[size];
}

System.arraycopy(h.positions, 0, this.positions, 0, h.binCount);
System.arraycopy(h.bins, 0, this.bins, 0, h.binCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public int compare(Object o, Object o1)
}
};

static Object combineHistograms(Object lhs, Object rhs)
static ApproximateHistogram combineHistograms(Object lhs, Object rhs)
{
return ((ApproximateHistogram) lhs).foldFast((ApproximateHistogram) rhs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;

import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.ObjectAggregateCombiner;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.ObjectColumnSelector;
import org.apache.commons.codec.binary.Base64;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -111,6 +115,46 @@ public Object combine(Object lhs, Object rhs)
return ApproximateHistogramAggregator.combineHistograms(lhs, rhs);
}

@Override
public AggregateCombiner makeAggregateCombiner()
{
// ApproximateHistogramAggregatorFactory.combine() delegates to ApproximateHistogramAggregator.combineHistograms()
// and it doesn't check for nulls, so this AggregateCombiner neither.
return new ObjectAggregateCombiner<ApproximateHistogram>()
{
private final ApproximateHistogram combined = new ApproximateHistogram();

@Override
public void reset(ColumnValueSelector selector)
{
@SuppressWarnings("unchecked")
ApproximateHistogram first = ((ObjectColumnSelector<ApproximateHistogram>) selector).get();
combined.copy(first);
}

@Override
public void fold(ColumnValueSelector selector)
{
@SuppressWarnings("unchecked")
ApproximateHistogram other = ((ObjectColumnSelector<ApproximateHistogram>) selector).get();
combined.foldFast(other);
}

@Override
public Class<ApproximateHistogram> classOfObject()
{
return ApproximateHistogram.class;
}

@Nullable
@Override
public ApproximateHistogram get()
{
return combined;
}
};
}

@Override
public AggregatorFactory getCombiningFactory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,29 @@ public int compare(VarianceAggregatorCollector o1, VarianceAggregatorCollector o
}
};

static Object combineValues(Object lhs, Object rhs)
void fold(VarianceAggregatorCollector other)
{
final VarianceAggregatorCollector holder1 = (VarianceAggregatorCollector) lhs;
final VarianceAggregatorCollector holder2 = (VarianceAggregatorCollector) rhs;

if (holder2.count == 0) {
return holder1;
if (other.count == 0) {
return;
}
if (holder1.count == 0) {
holder1.nvariance = holder2.nvariance;
holder1.count = holder2.count;
holder1.sum = holder2.sum;
return holder1;
if (this.count == 0) {
this.nvariance = other.nvariance;
this.count = other.count;
this.sum = other.sum;
return;
}
final double ratio = this.count / (double) other.count;
final double t = this.sum / ratio - other.sum;

final double ratio = holder1.count / (double) holder2.count;
final double t = holder1.sum / ratio - holder2.sum;

holder1.nvariance += holder2.nvariance + (ratio / (holder1.count + holder2.count) * t * t);
holder1.count += holder2.count;
holder1.sum += holder2.sum;
this.nvariance += other.nvariance + (ratio / (this.count + other.count) * t * t);
this.count += other.count;
this.sum += other.sum;
}

return holder1;
static Object combineValues(Object lhs, Object rhs)
{
((VarianceAggregatorCollector) lhs).fold((VarianceAggregatorCollector) rhs);
return lhs;
}

static int getMaxIntermediateSize()
Expand All @@ -120,6 +120,13 @@ public void reset()
nvariance = 0;
}

void copyFrom(VarianceAggregatorCollector other)
{
this.count = other.count;
this.sum = other.sum;
this.nvariance = other.nvariance;
}

public VarianceAggregatorCollector(long count, double sum, double nvariance)
{
this.count = count;
Expand Down
Loading