Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited_num"),
new LongSumAggregatorFactory("visited_sum", "visited_num", 1),
new HyperUniquesAggregatorFactory("unique_hosts", "host2")
},
new UniformGranularitySpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public DetermineHashedPartitionsJobTest(String dataFilePath, long targetPartitio
),
Map.class
),
new AggregatorFactory[]{new DoubleSumAggregatorFactory("index", "index")},
new AggregatorFactory[]{new DoubleSumAggregatorFactory("index", "index", 1)},
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularity.NONE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public DeterminePartitionsJobTest(
),
Map.class
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num", 1)},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(new Interval(interval))
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void setUp() throws Exception
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited"),
new LongSumAggregatorFactory("visited_sum", "visited", 1),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public void setUp() throws Exception
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_num", "visited_num"),
new LongSumAggregatorFactory("visited_num", "visited_num", 1),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public void testSerde()
);

AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {
new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
new DoubleSumAggregatorFactory("m1out", "m1"),
new LongSumAggregatorFactory("m2out", "m2"),
new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in", 1),
new DoubleSumAggregatorFactory("m1out", "m1", 1),
new LongSumAggregatorFactory("m2out", "m2", 1),
new HyperUniquesAggregatorFactory("m3out", "m3")
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void setup() throws Exception
),
Map.class
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num", 1)},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void testAddInputPaths() throws Exception
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
new LongSumAggregatorFactory("visited_sum", "visited", 1)
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public InputStream openStream() throws IOException
Map.class
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory(TestIndex.METRICS[0], TestIndex.METRICS[0]),
new DoubleSumAggregatorFactory(TestIndex.METRICS[0], TestIndex.METRICS[0], 1),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
},
new UniformGranularitySpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testDeterminePartitions() throws Exception
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
new LongSumAggregatorFactory("val", "val", 1)
},
new UniformGranularitySpec(
Granularity.DAY,
Expand Down Expand Up @@ -189,7 +189,7 @@ public void testWithArbitraryGranularity() throws Exception
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
new LongSumAggregatorFactory("val", "val", 1)
},
new ArbitraryGranularitySpec(
QueryGranularity.MINUTE,
Expand Down Expand Up @@ -302,7 +302,7 @@ public void testIntervalBucketing() throws Exception
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
new LongSumAggregatorFactory("val", "val", 1)
},
new UniformGranularitySpec(
Granularity.HOUR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testIndexTaskSerde() throws Exception
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met", 1)},
new UniformGranularitySpec(
Granularity.DAY,
null,
Expand Down Expand Up @@ -119,7 +119,7 @@ public void testIndexTaskwithResourceSerde() throws Exception
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met", 1)},
new UniformGranularitySpec(
Granularity.DAY,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public static Collection<Object[]> constructorFeeder() throws IOException
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME),
new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME, 1),
new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME, 1)
}
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private static Map<String, Object> persist(File tmpDir, InputRow... rows)
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory(METRICS[0], METRICS[0])
new LongSumAggregatorFactory(METRICS[0], METRICS[0], 1)
}
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public void testIndexTask() throws Exception
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met", 1)},
new UniformGranularitySpec(
Granularity.DAY,
null,
Expand Down Expand Up @@ -565,7 +565,7 @@ public void testIndexTaskFailure() throws Exception
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met", 1)},
new UniformGranularitySpec(
Granularity.DAY,
null,
Expand Down Expand Up @@ -914,7 +914,7 @@ public void testResumeTasks() throws Exception
new DataSchema(
"foo",
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met", 1)},
new UniformGranularitySpec(
Granularity.DAY,
null,
Expand Down Expand Up @@ -1012,7 +1012,7 @@ private RealtimeIndexTask giveMeARealtimeIndexTask() {
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows", 1)},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
mapper
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Object combine(Object lhs, Object rhs)
@Override
public AggregatorFactory getCombiningFactory()
{
  // Combining partial results is a plain sum, so the exponent is pinned to 1 here.
  return new LongSumAggregatorFactory(name, name, 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,23 @@ static double combineValues(Object lhs, Object rhs)

private final FloatColumnSelector selector;
private final String name;
private final int exponent;

private double sum;

/**
 * On-heap aggregator that sums float-column values, optionally raising each value
 * to a power before adding it.
 *
 * @param name     output metric name
 * @param selector source column selector the values are read from
 * @param exponent power applied to each value before summing; 1 means a plain sum
 */
public DoubleSumAggregator(String name, FloatColumnSelector selector, int exponent)
{
  this.name = name;
  this.selector = selector;
  this.exponent = exponent;

  this.sum = 0;
}

@Override
public void aggregate()
{
  // Branch keeps the common exponent == 1 case free of the Math.pow overhead.
  sum += (exponent == 1 ? selector.get() : Math.pow(selector.get(), exponent));
}

@Override
Expand Down Expand Up @@ -93,7 +95,7 @@ public String getName()
@Override
public Aggregator clone()
{
  // Propagate exponent so the clone keeps identical aggregation semantics.
  return new DoubleSumAggregator(name, selector, exponent);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,38 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory

private final String fieldName;
private final String name;
private final int exponent;

/**
 * JSON-deserializable factory for {@code DoubleSumAggregator}.
 *
 * @param name      output metric name; must be non-null
 * @param fieldName source column name; must be non-null
 * @param exponent  optional power applied to each value before summing;
 *                  null defaults to 1 (plain sum, the pre-existing behavior)
 */
@JsonCreator
public DoubleSumAggregatorFactory(
    @JsonProperty("name") String name,
    @JsonProperty("fieldName") final String fieldName,
    @JsonProperty("exponent") final Integer exponent
)
{
  Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
  Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

  this.name = name;
  this.fieldName = fieldName;
  // A missing "exponent" property keeps backward compatibility with existing specs.
  this.exponent = exponent == null ? 1 : exponent.intValue();
  Preconditions.checkArgument(this.exponent >= 1, "exponent must be greater than or equal to 1");
}

@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleSumAggregator(
name,
metricFactory.makeFloatColumnSelector(fieldName)
metricFactory.makeFloatColumnSelector(fieldName),
exponent
);
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
  // Off-heap (buffer) aggregator; must mirror factorize()'s exponent handling.
  return new DoubleSumBufferAggregator(metricFactory.makeFloatColumnSelector(fieldName), exponent);
}

@Override
Expand All @@ -81,13 +86,13 @@ public Object combine(Object lhs, Object rhs)
@Override
public AggregatorFactory getCombiningFactory()
{
  // NOTE(review): exponent is deliberately 1 — inputs at combine time are presumably
  // already-exponentiated partial sums, so re-aggregation must be a plain sum.
  return new DoubleSumAggregatorFactory(name, name, 1);
}

@Override
public List<AggregatorFactory> getRequiredColumns()
{
  // The required-column factory carries the configured exponent, unlike the combining factory.
  return Arrays.<AggregatorFactory>asList(new DoubleSumAggregatorFactory(fieldName, fieldName, exponent));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
public class DoubleSumBufferAggregator implements BufferAggregator
{
private final FloatColumnSelector selector;
private final int exponent;

/**
 * Off-heap aggregator summing float-column values into a ByteBuffer slot.
 *
 * @param selector source column selector the values are read from
 * @param exponent power applied to each value before summing; 1 means a plain sum
 */
public DoubleSumBufferAggregator(
    FloatColumnSelector selector,
    int exponent
)
{
  this.selector = selector;
  this.exponent = exponent;
}

@Override
Expand All @@ -43,7 +46,10 @@ public void init(ByteBuffer buf, int position)
@Override
public void aggregate(ByteBuffer buf, int position)
{
  // Branch keeps the common exponent == 1 case cheap; the explicit cast preserves
  // the original float-to-double widening of the plain-sum path.
  buf.putDouble(
      position,
      buf.getDouble(position) + (exponent == 1 ? (double) selector.get() : Math.pow(selector.get(), exponent))
  );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.druid.query.aggregation;

import com.google.common.math.LongMath;
import com.google.common.primitives.Longs;
import io.druid.segment.LongColumnSelector;

Expand All @@ -41,21 +42,24 @@ static long combineValues(Object lhs, Object rhs) {

private final LongColumnSelector selector;
private final String name;
private final int exponent;

private long sum;

/**
 * On-heap aggregator that sums long-column values, optionally raising each value
 * to a power before adding it.
 *
 * @param name     output metric name
 * @param selector source column selector the values are read from
 * @param exponent power applied to each value before summing; 1 means a plain sum
 */
public LongSumAggregator(String name, LongColumnSelector selector, int exponent)
{
  this.name = name;
  this.selector = selector;
  this.exponent = exponent;

  this.sum = 0;
}

@Override
public void aggregate()
{
sum += selector.get();
//TODO: should we do LongMath.checkPow(..) instead which would fail in case of overflow?
sum += (exponent == 1 ? selector.get() : LongMath.pow(selector.get(), exponent));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we quantify the performance impact of the additional branch check?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xvrl will do that

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ran some tests using following code

@State(Scope.Benchmark)
public class LongSumAggregatorBenchmark
{
  private LongColumnSelector selector = new LongColumnSelector()
  {
    @Override
    public long get()
    {
      return 100l;
    }
  };

  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public void benchmarkAggregate(Blackhole blackhole)
  {
    LongSumAggregator aggregator = new LongSumAggregator("name", selector, 1);
    for (int i = 0; i < 10000000; i++) {
      aggregator.aggregate();
    }
    blackhole.consume(aggregator.get());
  }

  public static void main(String[] args) throws Exception
  {
    Options opt = new OptionsBuilder()
        .include(".*" + LongSumAggregatorBenchmark.class.getSimpleName() + ".*")
        .warmupIterations(5)
        .forks(1)
        .build();
    new Runner(opt).run();
  }
}

with branching..

Result "benchmarkAggregate":
  210.999 ±(99.9%) 12.971 us/op [Average]
  (min, avg, max) = (192.895, 210.999, 241.701), stdev = 14.937
  CI (99.9%): [198.028, 223.970] (assumes normal distribution)

without branching..

Result "benchmarkAggregate":
  209.661 ±(99.9%) 10.036 us/op [Average]
  (min, avg, max) = (189.288, 209.661, 234.579), stdev = 11.557
  CI (99.9%): [199.625, 219.697] (assumes normal distribution)

note that, on multiple runs, numbers for both fluctuate a bit for both cases and I can't see any major difference in performance between two... branching is OK and creating another aggregator ( #1965 (comment) ) to handle exponent case is not really needed.

}

@Override
Expand Down Expand Up @@ -91,7 +95,7 @@ public String getName()
@Override
public Aggregator clone()
{
  // Propagate exponent so the clone keeps identical aggregation semantics.
  return new LongSumAggregator(name, selector, exponent);
}

@Override
Expand Down
Loading