Merged
@@ -28,20 +28,19 @@
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;

import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.IdentityHashMap;

public class SketchBufferAggregator implements BufferAggregator
{
private final ObjectColumnSelector selector;
private final int size;
private final int maxIntermediateSize;

private NativeMemory nm;

-  private final Map<Integer, Union> unions = new HashMap<>(); // position in BB -> Union object
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
+  private final IdentityHashMap<ByteBuffer, NativeMemory> nmCache = new IdentityHashMap<>();

public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize)
{
@@ -53,12 +52,7 @@ public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIn
@Override
public void init(ByteBuffer buf, int position)
{
-    if (nm == null) {
-      nm = new NativeMemory(buf);
-    }
-
-    Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
-    unions.put(position, (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION));
+    createNewUnion(buf, position, false);
}

@Override
@@ -87,12 +81,27 @@ public Object get(ByteBuffer buf, int position)
//Note that this is not threadsafe and I don't think it needs to be
private Union getUnion(ByteBuffer buf, int position)
{
-    Union union = unions.get(position);
-    if (union == null) {
-      Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
-      union = (Union) SetOperation.wrap(mem);
-      unions.put(position, union);
+    Int2ObjectMap<Union> unionMap = unions.get(buf);
+    Union union = unionMap != null ? unionMap.get(position) : null;
+    if (union != null) {
+      return union;
+    }
+    return createNewUnion(buf, position, true);
}

private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
{
NativeMemory nm = getNativeMemory(buf);
Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
Union union = isWrapped
? (Union) SetOperation.wrap(mem)
: (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION);
Int2ObjectMap<Union> unionMap = unions.get(buf);
Member:
unions.computeIfAbsent(buf, Int2ObjectOpenHashMap::new).put(position, union) could be used here

Contributor Author:

computeIfAbsent() requires creating a new function object on each call, and I'm not sure whether that would impact performance.

Member:

No. In both places where I propose computeIfAbsent(), the required lambdas are non-capturing, so they create only one instance in the JVM: http://stackoverflow.com/questions/27524445/does-a-lambda-expression-create-an-object-on-the-heap-every-time-its-executed#comment43488801_27524445
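To make the two variants concrete, here is a minimal sketch (hypothetical class and method names, with plain JDK maps standing in for the fastutil types in the PR) of the hand-written get/put pattern next to the computeIfAbsent form. The lambda `k -> new HashMap<>()` captures no variables, so the JVM materializes it once, not once per call:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class UnionMapSketch {
    // Stand-in for the PR's unions map: buffer identity -> (position -> state).
    private final Map<ByteBuffer, Map<Integer, String>> unions = new IdentityHashMap<>();

    // Hand-written version, as in the PR: look up, then put on miss.
    Map<Integer, String> getHandWritten(ByteBuffer buf) {
        Map<Integer, String> m = unions.get(buf);
        if (m == null) {
            m = new HashMap<>();
            unions.put(buf, m);
        }
        return m;
    }

    // Suggested version: one map operation; the non-capturing lambda is
    // allocated once per JVM, so there is no per-call garbage.
    Map<Integer, String> getWithComputeIfAbsent(ByteBuffer buf) {
        return unions.computeIfAbsent(buf, k -> new HashMap<>());
    }
}
```

Both return the same inner map for a given buffer; the difference is one map operation instead of up to two, and less room for a get/put mismatch.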

Contributor Author:

I did a small performance test (https://gist.github.com/akashdw/9f9b9db11fca602e562ecb47ec3ea24f):

Benchmark                                Mode  Cnt    Score   Error  Units
ComputeIfAbsent.withComputeIfAbsent      avgt   25  150.218 ± 4.617  us/op
ComputeIfAbsent.withoutComputeIfAbsent   avgt   25  113.956 ± 2.037  us/op

Member:

@akashdw have you even tried to analyze the results? Or did you suppose that I should do this for you?

  • This benchmark runs computeIfAbsent() against Int2ObjectOpenHashMap rather than IdentityHashMap, as in the PR.
  • The default @Setup level is Level.Trial, which makes little sense for such code. It should be Level.Invocation.
  • The actual reason for the difference is that Int2ObjectOpenHashMap::new takes the key as the capacity of the map. That is a bug in the code I suggested; it should be k -> new Int2ObjectOpenHashMap().
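The constructor-reference pitfall in the last point is easy to reproduce with plain JDK types. This sketch (illustrative, not the PR's code) shows how a method reference passed to computeIfAbsent silently receives the map key as a constructor argument:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConstructorRefPitfall {
    public static void main(String[] args) {
        Map<Integer, List<String>> byPosition = new HashMap<>();

        // ArrayList::new here resolves to ArrayList(int initialCapacity),
        // so the key is passed as the capacity. For key -1 the constructor
        // throws; for a large key it silently allocates a huge backing array.
        try {
            byPosition.computeIfAbsent(-1, ArrayList::new);
        } catch (IllegalArgumentException e) {
            System.out.println("ArrayList::new used the key as capacity: " + e.getMessage());
        }

        // The explicit lambda ignores the key, which is what was intended.
        byPosition.computeIfAbsent(-1, k -> new ArrayList<>()).add("ok");
        System.out.println(byPosition.get(-1)); // [ok]
    }
}
```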

So, the results of the corrected benchmark (https://gist.github.com/leventov/2dabf3563e50012493e030aa69a40be4):

ComputeIfAbsent.withComputeIfAbsent     avgt   10  83.784 ± 1.476  us/op
ComputeIfAbsent.withoutComputeIfAbsent  avgt   10  85.201 ± 1.083  us/op

I.e. the versions perform identically, which is just as expected, because IdentityHashMap doesn't actually override computeIfAbsent(). The default implementation of computeIfAbsent() makes exactly the same calls as your hand-written version. However:

  • IdentityHashMap may specialize computeIfAbsent() in future versions of the JDK;
  • at some point, unions and nmCache could be changed to use e.g. HashMap, which does specialize computeIfAbsent(), and code that already uses computeIfAbsent() will automatically improve;
  • all the other advantages of computeIfAbsent() still apply.

Note: -server is the default option with JDK 8; you don't need to specify it explicitly.

Contributor Author (@akashdw, Mar 22, 2017):

@gianm Yes, SketchAggregator.updateUnion is expected to take longer than getUnion, and the requested optimization might not yield a performance difference.
If BufferAggregators are called in the order you mentioned in #4071 (comment), then nmCache and unions will contain only one key.

> whether or not this optimization is necessary right now.

I think the current changes are in a good state, no optimization is required, and they are ready to be merged.

> However instead of addressing my original small code quality comment (from last week) that would have taken less than 5 min, @akashdw started a dispute and then a super time-draining benchmark battle

@leventov Are you suggesting accepting a change request without doing validation?

To be very clear, one cannot simply accept this kind of change request without verification and validation, because it is about performance/correctness, not about requested changes that can be done in 5 minutes.

FYI, your initial recommendation had a bug (probably because you didn't validate or benchmark it), which was caught during benchmarking.
Given that SketchAggregator.updateUnion is expected to take longer than getUnion and the requested optimization might not yield a performance difference, can you please unblock this PR?

Member:

@akashdw

You started doing "performance validation" while dismissing an earlier suggestion by @himanshug (your coworker, btw), which was much better, from both performance and simplicity points of view, than either of the versions we were arguing about. When I paid attention to that dialogue between you and @himanshug, I realized immediately that the reasons why you decided @himanshug's suggestion was impossible to implement are not valid, and that the suggestion is eligible.

So instead of spending a few minutes thinking about @himanshug's suggestion, which you should have done if you cared about getUnion()'s performance, because the suggestion obviously makes getUnion() much faster, you preferred to spend significant time preparing an invalid benchmark and throwing it at me, forcing me to spend a lot of time analyzing it (a required step when doing any benchmarking).

This is a terrible waste of your time and mine, and not how communication in Druid PRs is expected to be done.

> fyi your initial recommendation had a bug (b/c probably you never validated or benchmarked it) and it was caught during benchmarking

Yes, it's my mistake, and I have noted for myself to pay more attention when leaving PR review comments in the future. However, bugs in code are completely unavoidable. The fact that you didn't notice the bug before starting to benchmark also suggests that you don't validate review suggestions functionally very attentively.

The fact that the benchmark helped find the mistake is a total coincidence and doesn't mean that the benchmarking was useful.

Unblocking the PR in its current state would mean that I'm taking full responsibility for this situation, which I'm still not going to do. (But that doesn't mean I'm imposing that it's fully your responsibility, no.) I learned some lessons from this situation and I suggest you do the same. So currently I suggest you implement @himanshug's / my suggestion. I think it's "compromise enough" because it's not what we were arguing about, and besides, it's simply better.

Contributor Author (@akashdw, Mar 22, 2017):

@leventov The BufferAggregator interface does not specify the order of execution mentioned in #4071 (comment), so BufferAggregator implementations now have two options:

  1. Favor correctness: i.e. support any execution order of BufferAggregator methods (e.g. support aggregate-related methods even without init() having been called).
  2. Favor performance: i.e. assume the execution order described in #4071 (comment) and fail aggregate-related methods if methods are not called in the correct order.

I chose correctness over performance, and it's unfair to block someone's PR because they disagree with your choice.

FYI: @himanshug already +1'ed this PR.

Contributor (@himanshug, Mar 22, 2017):

@gianm @leventov

The contract of BufferAggregator is that before aggregate(int pos, ByteBuffer buf) is called, the caller ensures that init(pos, buf) has been called for that position and buffer... and that is the end of it. Now, there may or may not be multiple ByteBuffers in the scene at a time.
However, given the current state of Druid, there is only one ByteBuffer at a time, and even groupBy-v2 follows the contract described at #4071 (comment).
But the BufferAggregator contract is more flexible than that. @akashdw made me realize that, and to prevent future bugs he stopped making that assumption in the code and created the maps. That is why I gave up on my suggestion and +1'd it.

Member:

@akashdw @himanshug this is BDUF. A possible redesign of BufferGrouper that doesn't include "rehash" would make relocate() obsolete as well, so a rework of SketchBufferAggregator will be needed anyway. And this BDUF is not really safe and couldn't easily be made safe, as I showed above. However, both versions are correct now.

@akashdw didn't you notice that you are the Druid contributor who is most reluctant to make changes suggested by (any) reviewers? The general mindset among Druid committers is to implement reviewers' comments, or compromise decisions, to achieve agreement without extra communication, with a high level of common acceptance of the code. Your mindset seems to be to persuade reviewers that your original code is the best. Moreover, your way of persuading is highly disrespectful of the reviewer's (my) time.

The shortest and least time-consuming way to resolve this situation now, for all parties, is for you to spend 15 minutes implementing expectedBuffer and throwing away the maps. That should have been clear to you since yesterday evening. Continuing to argue and draining time is entirely your choice.

if (unionMap == null) {
unionMap = new Int2ObjectOpenHashMap<>();
unions.put(buf, unionMap);
}
unionMap.put(position, union);
return union;
}
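One design note on the maps above: unions and nmCache are IdentityHashMaps because ByteBuffer's equals() and hashCode() are content-based, comparing the remaining bytes, which would be both slow for large buffers and wrong for two distinct buffers that happen to hold equal bytes. A quick JDK-only illustration of why identity semantics are the right fit for buffer keys:

```java
import java.nio.ByteBuffer;
import java.util.IdentityHashMap;

public class ByteBufferKeyDemo {
    public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.allocate(16);
        ByteBuffer b = ByteBuffer.allocate(16);

        // Two distinct buffers with identical remaining contents compare
        // equal, so a plain HashMap would treat them as the same key.
        System.out.println(a.equals(b)); // true
        System.out.println(a == b);      // false

        // An IdentityHashMap keys on reference identity instead, keeping the
        // two buffers (and any cached per-buffer state) separate.
        IdentityHashMap<ByteBuffer, String> cache = new IdentityHashMap<>();
        cache.put(a, "state for a");
        cache.put(b, "state for b");
        System.out.println(cache.size()); // 2
    }
}
```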

@@ -119,4 +128,29 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}

@Override
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
createNewUnion(newBuffer, newPosition, true);
Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
if (unionMap != null) {
unionMap.remove(oldPosition);
if (unionMap.isEmpty()) {
unions.remove(oldBuffer);
nmCache.remove(oldBuffer);
}
}
}

private NativeMemory getNativeMemory(ByteBuffer buffer)
{
NativeMemory nm = nmCache.get(buffer);
Member:

computeIfAbsent() should be used here. See #4034

Contributor Author (@akashdw, Mar 17, 2017):

Is this something that has been agreed upon? Also, please explain the advantages of computeIfAbsent() compared to this approach in this case.

Member:

  • More readable, intent is clearer: computeIfAbsent speaks for itself, and it's harder to make a mistake.
  • One operation on the Map instead of two separate operations.
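The "harder to make a mistake" point is easiest to see under concurrency. That is not a concern for this aggregator (the PR notes getUnion() is not threadsafe and doesn't need to be), but as a hypothetical illustration: on a ConcurrentHashMap, computeIfAbsent() is atomic, so exactly one value is ever installed per key, whereas a racy get-then-put from two threads can install two different values and lose one of them:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AtomicComputeIfAbsent {
    public static void main(String[] args) throws InterruptedException {
        Map<String, List<Integer>> m = new ConcurrentHashMap<>();

        // computeIfAbsent is atomic on ConcurrentHashMap: exactly one list is
        // installed for "key", so neither thread's additions are lost.
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                m.computeIfAbsent("key", k -> Collections.synchronizedList(new ArrayList<>())).add(i);
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // With a get-then-put race, one thread's freshly installed list (and
        // its elements) could be overwritten by the other's.
        System.out.println(m.get("key").size()); // 2000
    }
}
```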

Contributor Author:

It's a small method; readability and intent are pretty clear.
Also, this is not a hot method, so for now I will stick to the current implementation.

Member:

It's not a huge improvement, but it is an improvement by any metric: computeIfAbsent is clearer, shorter, safer, more efficient, and the "right" way to perform this kind of task in the Java 8+ world. I don't see a reason not to use it in completely new code.

if (nm == null) {
nm = new NativeMemory(buffer);
nmCache.put(buffer, nm);
}
return nm;
}

}
@@ -290,4 +290,16 @@ public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object.
throw new IllegalArgumentException("Unknown sketch operation " + func);
}
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return this.getSketch().equals(((SketchHolder) o).getSketch());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.datasketches.theta;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.UpdateSketch;
import io.druid.data.input.MapBasedRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.groupby.epinephelinae.BufferGrouper;
import io.druid.query.groupby.epinephelinae.Grouper;
import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;

public class BufferGrouperUsingSketchMergeAggregatorFactoryTest
{
private static BufferGrouper<Integer> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
int initialBuckets
)
{
final BufferGrouper<Integer> grouper = new BufferGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,
new AggregatorFactory[]{
new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2),
new CountAggregatorFactory("count")
},
Integer.MAX_VALUE,
0.75f,
initialBuckets
);
grouper.init();
Contributor:
nit: grouper needs to be closed after the test.

Contributor Author:

fixed.

return grouper;
}

@Test
public void testGrowingBufferGrouper()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = makeGrouper(columnSelectorFactory, 100000, 2);
try {
final int expectedMaxSize = 5;

SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16));
UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch();
updateSketch.update(1);

columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));

for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
}

updateSketch.update(3);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));

for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i));
}

Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues();

Assert.assertEquals(2.0d, ((SketchHolder) holders[0]).getEstimate(), 0);
}
finally {
grouper.close();
}
}

}
@@ -28,6 +28,7 @@
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import com.yahoo.sketches.theta.UpdateSketch;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.java.util.common.granularity.Granularities;
@@ -39,6 +40,8 @@
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
@@ -389,6 +392,23 @@ public void testSketchAggregatorFactoryComparator()
Assert.assertEquals(1, comparator.compare(SketchHolder.of(union2), SketchHolder.of(sketch1)));
}

@Test
public void testRelocation()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16));
UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch();
updateSketch.update(1);

columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));
SketchHolder[] holders = helper.runRelocateVerificationTest(
new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2),
columnSelectorFactory,
SketchHolder.class
);
Assert.assertEquals(holders[0].getEstimate(), holders[1].getEstimate(), 0);
}

private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
{
Assert.assertEquals(
@@ -22,16 +22,21 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.UpdateSketch;
import io.druid.data.input.MapBasedRow;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.datasketches.theta.SketchHolder;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
@@ -194,6 +199,23 @@ public void testSketchSetPostAggregatorSerde() throws Exception
);
}

@Test
public void testRelocation()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16));
UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch();
updateSketch.update(1);

columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("sketch", sketchHolder)));
SketchHolder[] holders = helper.runRelocateVerificationTest(
new OldSketchMergeAggregatorFactory("sketch", "sketch", 16, false),
columnSelectorFactory,
SketchHolder.class
);
Assert.assertEquals(holders[0].getEstimate(), holders[1].getEstimate(), 0);
}

private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
{
Assert.assertEquals(
@@ -126,4 +126,28 @@ public interface BufferAggregator extends HotLoopCallee
default void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}

/**
 * Relocates any cached objects.
 * If the underlying ByteBuffer used for the aggregation buffer relocates to a new ByteBuffer, positional caches
 * (if any) built on top of the old ByteBuffer cannot be used for further
 * {@link BufferAggregator#aggregate(ByteBuffer, int)} calls. This method tells the BufferAggregator that a cached
 * object at a certain location has been relocated to a different location.
 *
 * Only used if there are any positional caches/objects in the BufferAggregator implementation.
 *
 * If the relocation spans multiple new ByteBuffers (say n ByteBuffers), this method should be called multiple
 * times (n times), given that all the new/old positions exist in the new/old buffers.
 *
 * <b>Implementations must not change the position, limit or mark of the given buffer</b>
 *
 * @param oldPosition old position of a cached object before the aggregation buffer relocated to a new ByteBuffer.
 * @param newPosition new position of a cached object after the aggregation buffer relocated to a new ByteBuffer.
 * @param oldBuffer   old aggregation buffer.
 * @param newBuffer   new aggregation buffer.
 */
default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
}

}
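The relocate() contract above can be sketched with a toy aggregator (hypothetical names, plain JDK types). Note this is a simplification: the state here lives in Java objects that get moved, whereas the real SketchBufferAggregator's state lives in the ByteBuffer itself and its relocate() instead re-wraps the new memory region. The shape of the bookkeeping, a per-buffer map of per-position caches that must follow an entry when the grouper moves it, is the same:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Toy aggregator that caches a mutable object per (buffer, position),
// mirroring the per-buffer/per-position maps in SketchBufferAggregator.
class RelocatableSumSketch {
    private final Map<ByteBuffer, Map<Integer, long[]>> cache = new IdentityHashMap<>();

    void init(ByteBuffer buf, int position) {
        cache.computeIfAbsent(buf, k -> new HashMap<>()).put(position, new long[]{0});
    }

    void aggregate(ByteBuffer buf, int position, long value) {
        cache.get(buf).get(position)[0] += value;
    }

    long get(ByteBuffer buf, int position) {
        return cache.get(buf).get(position)[0];
    }

    // When the grouper moves an entry to a new buffer/position, move the
    // cached state along with it so later aggregate() calls can find it,
    // and drop the old buffer's maps once they are empty.
    void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) {
        Map<Integer, long[]> oldMap = cache.get(oldBuffer);
        long[] state = oldMap.remove(oldPosition);
        if (oldMap.isEmpty()) {
            cache.remove(oldBuffer);
        }
        cache.computeIfAbsent(newBuffer, k -> new HashMap<>()).put(newPosition, state);
    }
}
```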
@@ -430,8 +430,9 @@ private void growIfPossible()

for (int oldBucket = 0; oldBucket < buckets; oldBucket++) {
if (isUsed(oldBucket)) {
+          int oldPosition = oldBucket * bucketSize;
           entryBuffer.limit((oldBucket + 1) * bucketSize);
-          entryBuffer.position(oldBucket * bucketSize);
+          entryBuffer.position(oldPosition);
keyBuffer.limit(entryBuffer.position() + HASH_SIZE + keySize);
keyBuffer.position(entryBuffer.position() + HASH_SIZE);

@@ -442,9 +443,19 @@
throw new ISE("WTF?! Couldn't find a bucket while resizing?!");
}

-              newTableBuffer.position(newBucket * bucketSize);
+              int newPosition = newBucket * bucketSize;
+              newTableBuffer.position(newPosition);
newTableBuffer.put(entryBuffer);

for (int i = 0; i < aggregators.length; i++) {
aggregators[i].relocate(
oldPosition + aggregatorOffsets[i],
newPosition + aggregatorOffsets[i],
tableBuffer,
newTableBuffer
);
}

buffer.putInt(tableArenaSize + newSize * Ints.BYTES, newBucket * bucketSize);
newSize++;
}